tsdbCacheRead.c 14.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "taoserror.h"
#include "tarray.h"
#include "tcommon.h"
#include "tsdb.h"

21
#define HASTYPE(_type, _t) (((_type) & (_t)) == (_t))
22

H
Haojun Liao 已提交
23 24
static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds,
                          void** pRes, const char* idStr) {
25 26
  int32_t numOfRows = pBlock->info.rows;

27
  if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) {
28 29
    bool allNullRow = true;

30 31 32
    for (int32_t i = 0; i < pReader->numOfCols; ++i) {
      SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
      SFirstLastRes*   p = (SFirstLastRes*)varDataVal(pRes[i]);
33

34 35 36 37 38
      if (slotIds[i] == -1) {  // the primary timestamp
        SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0);
        p->ts = pColVal->ts;
        p->bytes = TSDB_KEYSIZE;
        *(int64_t*)p->buf = pColVal->ts;
39
        allNullRow = false;
40
      } else {
41
        int32_t slotId = slotIds[i];
42
        // add check for null value, caused by the modification of table schema (new column added).
43
        if (slotId >= taosArrayGetSize(pRow)) {
44 45
          p->ts = 0;
          p->isNull = true;
46
          colDataSetNULL(pColInfoData, numOfRows);
47 48 49
          continue;
        }

50 51
        SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId);

52 53
        p->ts = pColVal->ts;
        p->isNull = !COL_VAL_IS_VALUE(&pColVal->colVal);
54 55
        allNullRow = p->isNull & allNullRow;

56 57 58 59
        if (!p->isNull) {
          if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
            varDataSetLen(p->buf, pColVal->colVal.value.nData);
            memcpy(varDataVal(p->buf), pColVal->colVal.value.pData, pColVal->colVal.value.nData);
60
            p->bytes = pColVal->colVal.value.nData + VARSTR_HEADER_SIZE;  // binary needs to plus the header size
61 62 63 64 65 66
          } else {
            memcpy(p->buf, &pColVal->colVal.value, pReader->pSchema->columns[slotId].bytes);
            p->bytes = pReader->pSchema->columns[slotId].bytes;
          }
        }
      }
67

68
      // pColInfoData->info.bytes includes the VARSTR_HEADER_SIZE, need to substruct it
69
      p->hasResult = true;
70
      varDataSetLen(pRes[i], pColInfoData->info.bytes - VARSTR_HEADER_SIZE);
71
      colDataSetVal(pColInfoData, numOfRows, (const char*)pRes[i], false);
72
    }
73

74
    pBlock->info.rows += allNullRow ? 0 : 1;
H
Haojun Liao 已提交
75
  } else if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST_ROW)) {
76 77 78 79 80
    for (int32_t i = 0; i < pReader->numOfCols; ++i) {
      SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);

      if (slotIds[i] == -1) {
        SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0);
81
        colDataSetVal(pColInfoData, numOfRows, (const char*)&pColVal->ts, false);
82
      } else {
83 84 85 86 87 88
        int32_t slotId = slotIds[i];
        // add check for null value, caused by the modification of table schema (new column added).
        if (slotId >= taosArrayGetSize(pRow)) {
          colDataSetNULL(pColInfoData, numOfRows);
          continue;
        }
89
        SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId);
90
        SColVal*  pVal = &pColVal->colVal;
91

92
        if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
93
          if (!COL_VAL_IS_VALUE(&pColVal->colVal)) {
94
            colDataSetNULL(pColInfoData, numOfRows);
95
          } else {
96 97
            varDataSetLen(pReader->transferBuf[slotId], pVal->value.nData);
            memcpy(varDataVal(pReader->transferBuf[slotId]), pVal->value.pData, pVal->value.nData);
98
            colDataSetVal(pColInfoData, numOfRows, pReader->transferBuf[slotId], false);
99
          }
100
        } else {
101
          colDataSetVal(pColInfoData, numOfRows, (const char*)&pVal->value.val, !COL_VAL_IS_VALUE(pVal));
102 103 104
        }
      }
    }
105

106
    pBlock->info.rows += 1;
H
Haojun Liao 已提交
107 108 109
  } else {
    tsdbError("invalid retrieve type:%d, %s", pReader->type, idStr);
    return TSDB_CODE_INVALID_PARA;
110
  }
H
Haojun Liao 已提交
111 112

  return TSDB_CODE_SUCCESS;
113 114
}

115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
static int32_t setTableSchema(SCacheRowsReader* p, uint64_t suid, const char* idstr) {
  int32_t numOfTables = p->numOfTables;

  if (suid != 0) {
    p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, suid, -1, 1);
    if (p->pSchema == NULL) {
      taosMemoryFree(p);
      tsdbWarn("stable:%" PRIu64 " has been dropped, failed to retrieve cached rows, %s", suid, idstr);
      return TSDB_CODE_PAR_TABLE_NOT_EXIST;
    }
  } else {
    for (int32_t i = 0; i < numOfTables; ++i) {
      uint64_t uid = p->pTableList[i].uid;
      p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, uid, -1, 1);
      if (p->pSchema != NULL) {
        break;
      }

      tsdbWarn("table:%" PRIu64 " has been dropped, failed to retrieve cached rows, %s", uid, idstr);
    }

    // all queried tables have been dropped already, return immediately.
    if (p->pSchema == NULL) {
      taosMemoryFree(p);
      tsdbWarn("all queried tables has been dropped, try next group, %s", idstr);
      return TSDB_CODE_PAR_TABLE_NOT_EXIST;
    }
  }

  return TSDB_CODE_SUCCESS;
}

147
int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols,
H
Haojun Liao 已提交
148
                                uint64_t suid, void** pReader, const char* idstr) {
149 150
  *pReader = NULL;
  SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader));
151 152 153 154
  if (p == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

155 156
  p->type = type;
  p->pVnode = pVnode;
157 158
  p->pTsdb = p->pVnode->pTsdb;
  p->verRange = (SVersionRange){.minVer = 0, .maxVer = UINT64_MAX};
159
  p->numOfCols = numOfCols;
160
  p->suid = suid;
161

162
  if (numOfTables == 0) {
163 164 165 166
    *pReader = p;
    return TSDB_CODE_SUCCESS;
  }

167
  p->pTableList = pTableIdList;
168
  p->numOfTables = numOfTables;
169

170 171 172 173 174 175
  int32_t code = setTableSchema(p, suid, idstr);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbCacherowsReaderClose(p);
    return code;
  }

176
  p->transferBuf = taosMemoryCalloc(p->pSchema->numOfCols, POINTER_BYTES);
177
  if (p->transferBuf == NULL) {
H
Haojun Liao 已提交
178
    tsdbCacherowsReaderClose(p);
H
Haojun Liao 已提交
179
    return TSDB_CODE_OUT_OF_MEMORY;
180 181
  }

182
  for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) {
M
Minglei Jin 已提交
183
    if (IS_VAR_DATA_TYPE(p->pSchema->columns[i].type)) {
184
      p->transferBuf[i] = taosMemoryMalloc(p->pSchema->columns[i].bytes);
185 186 187 188
      if (p->transferBuf[i] == NULL) {
        tsdbCacherowsReaderClose(p);
        return TSDB_CODE_OUT_OF_MEMORY;
      }
189 190
    }
  }
191

192 193
  int32_t numOfStt = ((SVnode*)pVnode)->config.sttTrigger;
  p->pLoadInfo = tCreateLastBlockLoadInfo(p->pSchema, NULL, 0, numOfStt);
194 195 196 197 198
  if (p->pLoadInfo == NULL) {
    tsdbCacherowsReaderClose(p);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

199
  p->idstr = taosStrdup(idstr);
200 201
  taosThreadMutexInit(&p->readerMutex, NULL);

202 203
  p->lastTs = INT64_MIN;

204 205 206 207
  *pReader = p;
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
208
void* tsdbCacherowsReaderClose(void* pReader) {
209
  SCacheRowsReader* p = pReader;
210

211 212 213 214 215 216 217
  if (p->pSchema != NULL) {
    for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) {
      taosMemoryFreeClear(p->transferBuf[i]);
    }

    taosMemoryFree(p->transferBuf);
    taosMemoryFree(p->pSchema);
218 219
  }

220 221
  taosMemoryFree(p->pCurrSchema);

222 223
  destroyLastBlockLoadInfo(p->pLoadInfo);

224
  taosMemoryFree((void*)p->idstr);
225 226
  taosThreadMutexDestroy(&p->readerMutex);

227
  taosMemoryFree(pReader);
H
Haojun Liao 已提交
228
  return NULL;
229 230
}

H
Haojun Liao 已提交
231
static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint64_t uid, SArray** pRow,
H
Haojun Liao 已提交
232
                                 LRUHandle** h) {
233
  int32_t code = TSDB_CODE_SUCCESS;
234
  *pRow = NULL;
235

H
Haojun Liao 已提交
236
  if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST_ROW)) {
237
    code = tsdbCacheGetLastrowH(lruCache, uid, pr, h);
238
  } else {
239
    code = tsdbCacheGetLastH(lruCache, uid, pr, h);
240
  }
241

242 243 244 245 246 247 248
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // no data in the table of Uid
  if (*h != NULL) {
    *pRow = (SArray*)taosLRUCacheValue(lruCache, *h);
249 250 251 252 253
  }

  return code;
}

254
static void freeItem(void* pItem) {
255
  SLastCol* pCol = (SLastCol*)pItem;
256 257 258 259 260
  if (IS_VAR_DATA_TYPE(pCol->colVal.type)) {
    taosMemoryFree(pCol->colVal.value.pData);
  }
}

261
static int32_t tsdbCacheQueryReseek(void* pQHandle) {
262 263 264
  int32_t           code = 0;
  SCacheRowsReader* pReader = pQHandle;

265 266 267 268
  code = taosThreadMutexTryLock(&pReader->readerMutex);
  if (code == 0) {
    // pause current reader's state if not paused, save ts & version for resuming
    // just wait for the big all tables' snapshot untaking for now
269

270
    code = TSDB_CODE_VND_QUERY_BUSY;
271

272 273 274 275 276 277 278 279
    taosThreadMutexUnlock(&pReader->readerMutex);

    return code;
  } else if (code == EBUSY) {
    return TSDB_CODE_VND_QUERY_BUSY;
  } else {
    return -1;
  }
280 281
}

282
int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds, SArray* pTableUidList) {
283
  if (pReader == NULL || pResBlock == NULL) {
284 285 286
    return TSDB_CODE_INVALID_PARA;
  }

287
  SCacheRowsReader* pr = pReader;
288

289
  int32_t    code = TSDB_CODE_SUCCESS;
290
  SLRUCache* lruCache = pr->pVnode->pTsdb->lruCache;
291
  LRUHandle* h = NULL;
H
Haojun Liao 已提交
292
  SArray*    pRow = NULL;
293
  bool       hasRes = false;
294
  SArray*    pLastCols = NULL;
295

296
  void** pRes = taosMemoryCalloc(pr->numOfCols, POINTER_BYTES);
297 298 299 300 301
  if (pRes == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

302
  for (int32_t j = 0; j < pr->numOfCols; ++j) {
X
Xiaoyu Wang 已提交
303 304
    pRes[j] = taosMemoryCalloc(
        1, sizeof(SFirstLastRes) + pr->pSchema->columns[-1 == slotIds[j] ? 0 : slotIds[j]].bytes + VARSTR_HEADER_SIZE);
305 306 307 308
    SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[j]);
    p->ts = INT64_MIN;
  }

309 310 311 312 313 314
  pLastCols = taosArrayInit(pr->pSchema->numOfCols, sizeof(SLastCol));
  if (pLastCols == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

315
  for (int32_t i = 0; i < pr->pSchema->numOfCols; ++i) {
316
    struct STColumn* pCol = &pr->pSchema->columns[i];
317
    SLastCol         p = {.ts = INT64_MIN, .colVal.type = pCol->type, .colVal.flag = CV_FLAG_NULL};
318 319 320 321 322

    if (IS_VAR_DATA_TYPE(pCol->type)) {
      p.colVal.value.pData = taosMemoryCalloc(pCol->bytes, sizeof(char));
    }
    taosArrayPush(pLastCols, &p);
H
Haojun Liao 已提交
323
  }
324

325
  taosThreadMutexLock(&pr->readerMutex);
X
Xiaoyu Wang 已提交
326
  code = tsdbTakeReadSnap((STsdbReader*)pr, tsdbCacheQueryReseek, &pr->pReadSnap);
K
kailixu 已提交
327 328 329
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }
330
  pr->pDataFReader = NULL;
331
  pr->pDataFReaderLast = NULL;
332

333
  // retrieve the only one last row of all tables in the uid list.
H
Haojun Liao 已提交
334
  if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_SINGLE)) {
335 336
    for (int32_t i = 0; i < pr->numOfTables; ++i) {
      STableKeyInfo* pKeyInfo = &pr->pTableList[i];
337

338
      code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
H
Haojun Liao 已提交
339
      if (code != TSDB_CODE_SUCCESS) {
K
kailixu 已提交
340
        goto _end;
341 342
      }

343
      if (h == NULL) {
344 345
        continue;
      }
346 347
      if (taosArrayGetSize(pRow) <= 0) {
        tsdbCacheRelease(lruCache, h);
348 349 350
        continue;
      }

H
Haojun Liao 已提交
351
      {
352 353
        bool    hasNotNullRow = true;
        int64_t minTs = INT64_MAX;
H
Haojun Liao 已提交
354
        for (int32_t k = 0; k < pr->numOfCols; ++k) {
355 356 357 358 359
          int32_t slotId = slotIds[k];

          if (slotId == -1) {  // the primary timestamp
            SLastCol* p = taosArrayGet(pLastCols, 0);
            SLastCol* pCol = (SLastCol*)taosArrayGet(pRow, 0);
360
            if (pCol->ts > p->ts) {
361
              hasRes = true;
362 363
              p->ts = pCol->ts;
              p->colVal = pCol->colVal;
364
              minTs = pCol->ts;
H
Haojun Liao 已提交
365 366 367 368 369 370 371 372 373

              // only set value for last row query
              if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST_ROW)) {
                if (taosArrayGetSize(pTableUidList) == 0) {
                  taosArrayPush(pTableUidList, &pKeyInfo->uid);
                } else {
                  taosArraySet(pTableUidList, 0, &pKeyInfo->uid);
                }
              }
H
Haojun Liao 已提交
374 375
            }
          } else {
376
            SLastCol* p = taosArrayGet(pLastCols, slotId);
H
Haojun Liao 已提交
377 378
            SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId);

379
            if (pColVal->ts > p->ts) {
380
              if (!COL_VAL_IS_VALUE(&pColVal->colVal) && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) {
381 382 383
                if (!COL_VAL_IS_VALUE(&p->colVal)) {
                  hasNotNullRow = false;
                }
384 385 386
                continue;
              }

387
              hasRes = true;
388
              p->ts = pColVal->ts;
389 390 391
              if (pColVal->ts < minTs && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) {
                minTs = pColVal->ts;
              }
H
Haojun Liao 已提交
392

H
Haojun Liao 已提交
393 394 395 396 397 398
              if (!IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
                p->colVal = pColVal->colVal;
              } else {
                if (COL_VAL_IS_VALUE(&pColVal->colVal)) {
                  memcpy(p->colVal.value.pData, pColVal->colVal.value.pData, pColVal->colVal.value.nData);
                }
H
Haojun Liao 已提交
399

H
Haojun Liao 已提交
400 401 402 403
                p->colVal.value.nData = pColVal->colVal.value.nData;
                p->colVal.type = pColVal->colVal.type;
                p->colVal.flag = pColVal->colVal.flag;
                p->colVal.cid = pColVal->colVal.cid;
H
Haojun Liao 已提交
404 405 406
              }
            }
          }
407
        }
408 409 410 411

        if (hasNotNullRow) {
          pr->lastTs = minTs;
        }
412
      }
413

414
      tsdbCacheRelease(lruCache, h);
415
    }
416 417

    if (hasRes) {
H
Haojun Liao 已提交
418
      saveOneRow(pLastCols, pResBlock, pr, slotIds, pRes, pr->idstr);
419 420
    }

H
Haojun Liao 已提交
421
  } else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) {
422 423
    for (int32_t i = pr->tableIndex; i < pr->numOfTables; ++i) {
      STableKeyInfo* pKeyInfo = &pr->pTableList[i];
424
      code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
H
Haojun Liao 已提交
425
      if (code != TSDB_CODE_SUCCESS) {
K
kailixu 已提交
426
        goto _end;
427 428
      }

429
      if (h == NULL) {
430 431
        continue;
      }
432 433
      if (taosArrayGetSize(pRow) <= 0) {
        tsdbCacheRelease(lruCache, h);
434 435 436
        continue;
      }

H
Haojun Liao 已提交
437
      saveOneRow(pRow, pResBlock, pr, slotIds, pRes, pr->idstr);
438
      // TODO reset the pRes
439

440
      taosArrayPush(pTableUidList, &pKeyInfo->uid);
441
      tsdbCacheRelease(lruCache, h);
442

443 444
      pr->tableIndex += 1;
      if (pResBlock->info.rows >= pResBlock->info.capacity) {
445
        goto _end;
446
      }
447 448
    }
  } else {
449
    code = TSDB_CODE_INVALID_PARA;
450 451
  }

452
_end:
453
  tsdbDataFReaderClose(&pr->pDataFReaderLast);
454 455
  tsdbDataFReaderClose(&pr->pDataFReader);

456
  resetLastBlockLoadInfo(pr->pLoadInfo);
457
  tsdbUntakeReadSnap((STsdbReader*)pr, pr->pReadSnap, true);
458
  taosThreadMutexUnlock(&pr->readerMutex);
459

H
Haojun Liao 已提交
460 461 462 463
  if (pRes != NULL) {
    for (int32_t j = 0; j < pr->numOfCols; ++j) {
      taosMemoryFree(pRes[j]);
    }
464 465 466
  }

  taosMemoryFree(pRes);
467
  taosArrayDestroyEx(pLastCols, freeItem);
468
  return code;
469
}