tpagedbuf.c 18.7 KB
Newer Older
S
Shengliang Guan 已提交
1
#define _DEFAULT_SOURCE
H
Haojun Liao 已提交
2
#include "tpagedbuf.h"
3 4
#include "taoserror.h"
#include "tcompression.h"
H
Haojun Liao 已提交
5
#include "thash.h"
S
Shengliang Guan 已提交
6
#include "tlog.h"
7

S
Shengliang Guan 已提交
8
// Address of the user-visible payload of a page: the in-memory buffer starts with a POINTER_BYTES-sized
// header (a back pointer to the owning SPageInfo), and the payload follows immediately after it.
#define GET_DATA_PAYLOAD(_p)          ((char*)(_p)->pData + POINTER_BYTES)
9 10
// True when the LRU list already holds inMemPages (or more) entries, i.e. a page has to be evicted
// before another one can be brought into memory.
#define NO_IN_MEM_AVAILABLE_PAGES(_b) (listNEles((_b)->lruList) >= (_b)->inMemPages)

11
// A contiguous region of the backing file. The same layout doubles as SFreeListItem, the element
// type of the free-area array (SDiskbasedBuf::pFree).
typedef struct SPageDiskInfo {
  int64_t offset;  // byte offset of the region inside the file
  int32_t length;  // size of the region in bytes
} SPageDiskInfo, SFreeListItem;
15

H
Haojun Liao 已提交
16
struct SPageInfo {
17
  SListNode* pn;  // point to list node struct
S
Shengliang Guan 已提交
18 19 20
  void*      pData;
  int64_t    offset;
  int32_t    pageId;
21
  int32_t    length : 29;
S
Shengliang Guan 已提交
22 23
  bool       used : 1;   // set current page is in used
  bool       dirty : 1;  // set current buffer page is dirty or not
H
Haojun Liao 已提交
24
};
25

H
Haojun Liao 已提交
26
struct SDiskbasedBuf {
27 28
  int32_t   numOfPages;
  int64_t   totalBufSize;
S
Shengliang Guan 已提交
29
  uint64_t  fileSize;  // disk file size
30
  TdFilePtr pFile;
S
Shengliang Guan 已提交
31 32 33 34
  int32_t   allocateId;  // allocated page id
  char*     path;        // file path
  int32_t   pageSize;    // current used page size
  int32_t   inMemPages;  // numOfPages that are allocated in memory
35 36
  SList*    freePgList;  // free page list
  SHashObj* groupSet;    // id hash table, todo remove it
37 38
  SHashObj* all;
  SList*    lruList;
S
Shengliang Guan 已提交
39 40 41 42 43 44 45 46
  void*     emptyDummyIdList;  // dummy id list
  void*     assistBuf;         // assistant buffer for compress/decompress data
  SArray*   pFree;             // free area in file
  bool      comp;              // compressed before flushed to disk
  uint64_t  nextPos;           // next page flush position

  uint64_t            qId;          // for debug purpose
  bool                printStatis;  // Print statistics info when closing this buffer.
H
Haojun Liao 已提交
47
  SDiskbasedBufStatis statis;
H
Haojun Liao 已提交
48
};
49

H
Haojun Liao 已提交
50
/**
 * Open the backing file named by pBuf->path and remember the handle in pBuf->pFile.
 *
 * @param pBuf paged buffer whose file is to be opened
 * @return TSDB_CODE_SUCCESS when the file was opened, otherwise the system error derived from errno
 */
static int32_t createDiskFile(SDiskbasedBuf* pBuf) {
  TdFilePtr fp = taosOpenFile(pBuf->path, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC);
  pBuf->pFile = fp;

  return (fp != NULL) ? TSDB_CODE_SUCCESS : TAOS_SYSTEM_ERROR(errno);
}

S
Shengliang Guan 已提交
61
/**
 * Compress a page payload in place when compression is enabled for this buffer.
 *
 * @param data    payload to compress; overwritten with the compressed bytes on success
 * @param srcSize number of payload bytes
 * @param dst     receives the resulting size: srcSize when compression is disabled, otherwise the
 *                value reported by tsCompressString
 * @param pBuf    owning buffer (provides the comp flag and the scratch buffer)
 * @return data
 */
static char* doCompressData(void* data, int32_t srcSize, int32_t* dst, SDiskbasedBuf* pBuf) {
  if (!pBuf->comp) {
    *dst = srcSize;
    return data;
  }

  *dst = tsCompressString(data, srcSize, 1, pBuf->assistBuf, srcSize, ONE_STAGE_COMP, NULL, 0);

  // Only copy back a positive byte count: a negative result converted to size_t would make memcpy
  // run far past both buffers. This mirrors the guard in doDecompressData.
  if (*dst > 0) {
    memcpy(data, pBuf->assistBuf, *dst);
  }

  return data;
}

S
Shengliang Guan 已提交
73
/**
 * Decompress a page payload in place when compression is enabled for this buffer.
 *
 * @param data    compressed bytes; overwritten with the decompressed payload when the result is positive
 * @param srcSize number of compressed bytes
 * @param dst     receives the resulting size: srcSize when compression is disabled, otherwise the
 *                value reported by tsDecompressString
 * @param pBuf    owning buffer (provides the comp flag, the page size and the scratch buffer)
 * @return data
 */
static char* doDecompressData(void* data, int32_t srcSize, int32_t* dst, SDiskbasedBuf* pBuf) {
  if (pBuf->comp) {
    int32_t plainSize = tsDecompressString(data, srcSize, 1, pBuf->assistBuf, pBuf->pageSize, ONE_STAGE_COMP, NULL, 0);
    *dst = plainSize;

    if (plainSize > 0) {
      memcpy(data, pBuf->assistBuf, plainSize);
    }
  } else {
    *dst = srcSize;
  }

  return data;
}

H
Haojun Liao 已提交
86 87 88
/**
 * Pick the file position at which `size` bytes can be written.
 *
 * The first free area that is large enough is used and shrunk accordingly; when none fits (or there
 * is no free-area array) the current append position pBuf->nextPos is returned. The caller is
 * responsible for advancing nextPos.
 *
 * @param pBuf owning buffer
 * @param size number of bytes that will be written
 * @return byte offset in the file
 */
static uint64_t allocatePositionInFile(SDiskbasedBuf* pBuf, size_t size) {
  if (pBuf->pFree == NULL) {
    return pBuf->nextPos;
  }

  size_t num = taosArrayGetSize(pBuf->pFree);
  for (size_t i = 0; i < num; ++i) {
    SFreeListItem* pi = taosArrayGet(pBuf->pFree, i);

    // a negative length must never be mistaken for a huge area once converted to size_t
    if (pi->length >= 0 && (size_t)pi->length >= size) {
      // keep the full 64-bit offset; a 32-bit temporary would truncate positions beyond 2GB
      uint64_t offset = (uint64_t)pi->offset;

      pi->offset += (int32_t)size;
      pi->length -= (int32_t)size;
      return offset;
    }
  }

  // no available recycle space, allocate new area in file
  return pBuf->nextPos;
}

109
// Mark the page as not resident in memory by dropping its buffer pointer (the buffer is not freed here).
static void setPageNotInBuf(SPageInfo* pPageInfo) {
  pPageInfo->pData = NULL;
}
H
Haojun Liao 已提交
110

111
// Bytes to allocate for one in-memory page: header pointer + payload + 2 extra bytes.
static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) {
  const int32_t extraBytes = 2;
  return pageSize + POINTER_BYTES + extraBytes;
}
H
Haojun Liao 已提交
112

H
Haojun Liao 已提交
113 114 115 116 117 118 119 120
/**
 *   +--------------------------+-------------------+--------------+
 *   | PTR to SPageInfo (8bytes)| Payload (PageSize)| 2 Extra Bytes|
 *   +--------------------------+-------------------+--------------+
 * @param pBuf
 * @param pg
 * @return
 */
H
Haojun Liao 已提交
121
static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
122 123
  assert(!pg->used && pg->pData != NULL);

H
Haojun Liao 已提交
124
  int32_t size = pBuf->pageSize;
H
Haojun Liao 已提交
125 126
  char*   t = NULL;
  if (pg->offset == -1 || pg->dirty) {
H
Haojun Liao 已提交
127 128 129
    void* payload = GET_DATA_PAYLOAD(pg);
    t = doCompressData(payload, pBuf->pageSize, &size, pBuf);
    assert(size >= 0);
H
Haojun Liao 已提交
130
  }
131 132

  // this page is flushed to disk for the first time
H
Haojun Liao 已提交
133 134 135
  if (pg->dirty) {
    if (pg->offset == -1) {
      assert(pg->dirty == true);
H
Haojun Liao 已提交
136

H
Haojun Liao 已提交
137 138
      pg->offset = allocatePositionInFile(pBuf, size);
      pBuf->nextPos += size;
139

140 141 142 143 144
      int32_t ret = taosLSeekFile(pBuf->pFile, pg->offset, SEEK_SET);
      if (ret != 0) {
        terrno = TAOS_SYSTEM_ERROR(errno);
        return NULL;
      }
145

146 147 148 149 150
      ret = (int32_t)taosWriteFile(pBuf->pFile, t, size);
      if (ret != size) {
        terrno = TAOS_SYSTEM_ERROR(errno);
        return NULL;
      }
151

H
Haojun Liao 已提交
152 153 154
      if (pBuf->fileSize < pg->offset + size) {
        pBuf->fileSize = pg->offset + size;
      }
H
Haojun Liao 已提交
155

H
Haojun Liao 已提交
156 157 158 159 160 161 162 163 164 165 166 167 168
      pBuf->statis.flushBytes += size;
      pBuf->statis.flushPages += 1;
    } else {
      // length becomes greater, current space is not enough, allocate new place, otherwise, do nothing
      if (pg->length < size) {
        // 1. add current space to free list
        SPageDiskInfo dinfo = {.length = pg->length, .offset = pg->offset};
        taosArrayPush(pBuf->pFree, &dinfo);

        // 2. allocate new position, and update the info
        pg->offset = allocatePositionInFile(pBuf, size);
        pBuf->nextPos += size;
      }
169

170 171 172 173 174 175
      // 3. write to disk.
      int32_t ret = taosLSeekFile(pBuf->pFile, pg->offset, SEEK_SET);
      if (ret != 0) {
        terrno = TAOS_SYSTEM_ERROR(errno);
        return NULL;
      }
176

177 178 179 180 181
      ret = (int32_t)taosWriteFile(pBuf->pFile, t, size);
      if (ret != size) {
        terrno = TAOS_SYSTEM_ERROR(errno);
        return NULL;
      }
182

H
Haojun Liao 已提交
183 184 185
      if (pBuf->fileSize < pg->offset + size) {
        pBuf->fileSize = pg->offset + size;
      }
186

H
Haojun Liao 已提交
187 188
      pBuf->statis.flushBytes += size;
      pBuf->statis.flushPages += 1;
189
    }
190
  } else {  // NOTE: the size may be -1, the this recycle page has not been flushed to disk yet.
H
Haojun Liao 已提交
191
    size = pg->length;
H
Haojun Liao 已提交
192 193
    if (size == -1) {
      printf("----\n");
194 195 196
    }
  }

H
Haojun Liao 已提交
197
  ASSERT(size > 0 || (pg->offset == -1 && pg->length == -1));
198

H
Haojun Liao 已提交
199
  char* pDataBuf = pg->pData;
H
Haojun Liao 已提交
200
  memset(pDataBuf, 0, getAllocPageSize(pBuf->pageSize));
201

H
Haojun Liao 已提交
202
  pg->length = size;  // on disk size
H
Haojun Liao 已提交
203
  return pDataBuf;
204 205
}

H
Haojun Liao 已提交
206
/**
 * Flush a page to disk, creating the backing file on first use, and detach the page from memory.
 *
 * @param pBuf owning buffer
 * @param pg   page to flush
 * @return the page's former buffer for reuse by the caller, or NULL with terrno set on failure
 */
static char* flushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
  assert(((int64_t)pBuf->numOfPages * pBuf->pageSize) == pBuf->totalBufSize && pBuf->numOfPages >= pBuf->inMemPages);

  if (pBuf->pFile == NULL) {
    int32_t ret = createDiskFile(pBuf);
    if (ret != TSDB_CODE_SUCCESS) {
      terrno = ret;
      return NULL;
    }
  }

  char* p = doFlushPageToDisk(pBuf, pg);
  if (p == NULL) {
    // The flush failed, so the buffer is not handed over to the caller. Release it here: clearing
    // pData below would otherwise drop the only reference to it.
    tfree(pg->pData);
  }

  setPageNotInBuf(pg);
  pg->dirty = false;

  return p;
}

// load file block data in disk
H
Haojun Liao 已提交
225
static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
226
  int32_t ret = taosLSeekFile(pBuf->pFile, pg->offset, SEEK_SET);
H
Haojun Liao 已提交
227 228 229 230 231
  if (ret != 0) {
    ret = TAOS_SYSTEM_ERROR(errno);
    return ret;
  }

232
  void* pPage = (void*)GET_DATA_PAYLOAD(pg);
H
Haojun Liao 已提交
233
  ret = (int32_t)taosReadFile(pBuf->pFile, pPage, pg->length);
234
  if (ret != pg->length) {
H
Haojun Liao 已提交
235 236
    ret = TAOS_SYSTEM_ERROR(errno);
    return ret;
237 238
  }

H
Haojun Liao 已提交
239 240
  pBuf->statis.loadBytes += pg->length;
  pBuf->statis.loadPages += 1;
241 242

  int32_t fullSize = 0;
H
Haojun Liao 已提交
243
  doDecompressData(pPage, pg->length, &fullSize, pBuf);
H
Haojun Liao 已提交
244
  return 0;
245 246
}

H
Haojun Liao 已提交
247
/**
 * Create the (initially empty) page list of a group that is not registered yet and store it in the
 * group hash table.
 *
 * @param pBuf    owning buffer
 * @param groupId id of the new group; must not be present in pBuf->groupSet
 * @return the new page list
 */
static SIDList addNewGroup(SDiskbasedBuf* pBuf, int32_t groupId) {
  const char* key = (const char*)&groupId;
  assert(taosHashGet(pBuf->groupSet, key, sizeof(int32_t)) == NULL);

  SArray* pageList = taosArrayInit(1, POINTER_BYTES);

  int32_t code = taosHashPut(pBuf->groupSet, key, sizeof(int32_t), &pageList, POINTER_BYTES);
  assert(code == 0);

  return pageList;
}

H
Haojun Liao 已提交
257
/**
 * Create the bookkeeping record of a new page and append it to the page list of its group,
 * creating the group on demand.
 *
 * @param pBuf    owning buffer
 * @param groupId group the page belongs to
 * @param pageId  id of the new page
 * @return the new page record (in use, clean, not resident, never flushed), or NULL with terrno
 *         set to TSDB_CODE_OUT_OF_MEMORY when it could not be allocated or stored
 */
static SPageInfo* registerPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t pageId) {
  SIDList list = NULL;

  char** p = taosHashGet(pBuf->groupSet, (const char*)&groupId, sizeof(int32_t));
  if (p == NULL) {  // it is a new group id
    list = addNewGroup(pBuf, groupId);
  } else {
    list = (SIDList)(*p);
  }

  SPageInfo* ppi = malloc(sizeof(SPageInfo));
  if (ppi == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  ppi->pageId = pageId;
  ppi->pData = NULL;
  ppi->offset = -1;
  ppi->length = -1;
  ppi->used = true;
  ppi->dirty = false;  // malloc does not clear the record; the flush path reads this flag
  ppi->pn = NULL;

  SPageInfo** slot = taosArrayPush(list, &ppi);
  if (slot == NULL) {
    tfree(ppi);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // count the page only once it is really registered
  pBuf->numOfPages += 1;
  return *slot;
}

H
Haojun Liao 已提交
281
/**
 * Walk the LRU list from its tail and find the first page that is not in use.
 *
 * @param pBuf owning buffer
 * @return the list node of that page, or NULL when every page in the list is in use
 */
static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) {
  SListIter iter = {0};
  tdListInitIter(pBuf->lruList, &iter, TD_LIST_BACKWARD);

  for (SListNode* node = tdListNext(&iter); node != NULL; node = tdListNext(&iter)) {
    SPageInfo* info = *(SPageInfo**)node->data;
    assert(info->pageId >= 0 && info->pn == node);

    if (!info->used) {
      return node;
    }
  }

  return NULL;
}

H
Haojun Liao 已提交
311
/**
 * Evict the least recently used page that is not in use: unlink it from the LRU list and flush it.
 *
 * @param pBuf owning buffer
 * @return the evicted page's buffer for reuse, or NULL when no page could be evicted or the flush failed
 */
static char* evacOneDataPage(SDiskbasedBuf* pBuf) {
  SListNode* victim = getEldestUnrefedPage(pBuf);

  // all pages are referenced by user, try to allocate new space
  if (victim == NULL) {
    assert(0);
    int32_t prev = pBuf->inMemPages;

    // increase by 50% of previous mem pages
    pBuf->inMemPages = (int32_t)(pBuf->inMemPages * 1.5f);
    return NULL;
  }

  tdListPopNode(pBuf->lruList, victim);

  SPageInfo* info = *(SPageInfo**)victim->data;
  assert(info->pn == victim);

  info->pn = NULL;
  tfree(victim);

  return flushPageToDisk(pBuf, info);
}

S
Shengliang Guan 已提交
340
// Insert the page at the head of the LRU list and remember the node that now carries it.
static void lruListPushFront(SList* pList, SPageInfo* pi) {
  tdListPrepend(pList, &pi);
  pi->pn = tdListGetHead(pList);
}

S
Shengliang Guan 已提交
346
// Unlink the page's node from the LRU list and re-insert the same node at the head.
static void lruListMoveToFront(SList* pList, SPageInfo* pi) {
  SListNode* node = pi->pn;

  tdListPopNode(pList, node);
  tdListPrependNode(pList, node);
}

H
Haojun Liao 已提交
351 352
static SPageInfo* getPageInfoFromPayload(void* page) {
  int32_t offset = offsetof(SPageInfo, pData);
353
  char*   p = page - offset;
H
Haojun Liao 已提交
354

355
  SPageInfo* ppi = ((SPageInfo**)p)[0];
H
Haojun Liao 已提交
356
  return ppi;
357 358
}

359 360
/**
 * Create a paged buffer that keeps at most inMemBufSize / pagesize pages in memory (a soft limit)
 * and spills further pages to a temporary file whose path is derived from `dir`.
 *
 * @param pBuf         receives the new buffer; set to NULL on failure
 * @param pagesize     size of one page payload in bytes; must be positive
 * @param inMemBufSize memory budget in bytes; must hold at least two pages
 * @param qId          id used in debug output
 * @param dir          passed to taosGetTmpfilePath to build the temporary file path
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when any part of the buffer could not be allocated
 */
int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId,
                           const char* dir) {
  // at least more than 2 pages must be in memory; checked first because pagesize is a divisor below
  assert(pagesize > 0 && inMemBufSize >= pagesize * 2);

  *pBuf = calloc(1, sizeof(SDiskbasedBuf));

  SDiskbasedBuf* pPBuf = *pBuf;
  if (pPBuf == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pPBuf->pageSize = pagesize;
  pPBuf->numOfPages = 0;  // all pages are in buffer in the first place
  pPBuf->totalBufSize = 0;
  pPBuf->inMemPages = inMemBufSize / pagesize;  // maximum allowed pages, it is a soft limit.
  pPBuf->allocateId = -1;
  pPBuf->comp = true;
  pPBuf->pFile = NULL;
  pPBuf->qId = qId;
  pPBuf->fileSize = 0;
  pPBuf->pFree = taosArrayInit(4, sizeof(SFreeListItem));
  pPBuf->freePgList = tdListNew(POINTER_BYTES);
  pPBuf->lruList = tdListNew(POINTER_BYTES);

  // init id hash table
  _hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT);
  pPBuf->groupSet = taosHashInit(10, fn, true, false);
  pPBuf->assistBuf = malloc(pPBuf->pageSize + 2);  // EXTRA BYTES
  pPBuf->all = taosHashInit(10, fn, true, false);

  char path[PATH_MAX] = {0};
  taosGetTmpfilePath(dir, "paged-buf", path);
  pPBuf->path = strdup(path);

  pPBuf->emptyDummyIdList = taosArrayInit(1, sizeof(int32_t));

  bool complete = pPBuf->pFree != NULL && pPBuf->freePgList != NULL && pPBuf->lruList != NULL &&
                  pPBuf->groupSet != NULL && pPBuf->assistBuf != NULL && pPBuf->all != NULL && pPBuf->path != NULL &&
                  pPBuf->emptyDummyIdList != NULL;
  if (!complete) {
    // release whatever was created so far instead of returning a half-initialized buffer
    if (pPBuf->pFree != NULL) taosArrayDestroy(pPBuf->pFree);
    if (pPBuf->emptyDummyIdList != NULL) taosArrayDestroy(pPBuf->emptyDummyIdList);
    if (pPBuf->freePgList != NULL) tdListFree(pPBuf->freePgList);
    if (pPBuf->lruList != NULL) tdListFree(pPBuf->lruList);
    if (pPBuf->groupSet != NULL) taosHashCleanup(pPBuf->groupSet);
    if (pPBuf->all != NULL) taosHashCleanup(pPBuf->all);
    tfree(pPBuf->assistBuf);
    tfree(pPBuf->path);
    tfree(pPBuf);

    *pBuf = NULL;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
404
/**
 * Hand out a new, in-use page. A recycled page record from the free page list is reused when one
 * is available; otherwise a new page id is allocated and registered under groupId.
 *
 * @param pBuf    owning buffer
 * @param groupId group a newly registered page is attached to
 * @param pageId  receives the id of the returned page
 * @return pointer to the page payload, or NULL when no buffer could be obtained (eviction failed
 *         or memory is exhausted)
 */
void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId) {
  pBuf->statis.getPages += 1;

  char* availablePage = NULL;
  if (NO_IN_MEM_AVAILABLE_PAGES(pBuf)) {
    availablePage = evacOneDataPage(pBuf);

    // Failed to allocate a new buffer page, and there is an error occurs.
    if (availablePage == NULL) {
      assert(0);
      return NULL;
    }
  }

  // Obtain the buffer before touching any bookkeeping, so that running out of memory leaves the
  // page tables and the LRU list untouched.
  if (availablePage == NULL) {
    // add extract bytes in case of zipped buffer increased.
    availablePage = calloc(1, getAllocPageSize(pBuf->pageSize));
    if (availablePage == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
  }

  SPageInfo* pi = NULL;
  if (listNEles(pBuf->freePgList) != 0) {
    SListNode* pItem = tdListPopHead(pBuf->freePgList);
    pi = *(SPageInfo**)pItem->data;
    pi->used = true;
    *pageId = pi->pageId;
    tfree(pItem);
  } else {  // create a new pageinfo
    // register new id in this group
    int32_t newId = pBuf->allocateId + 1;

    // register page id info
    pi = registerPage(pBuf, groupId, newId);
    if (pi == NULL) {
      tfree(availablePage);
      return NULL;
    }

    pBuf->allocateId = newId;
    *pageId = newId;

    // add to hash map
    taosHashPut(pBuf->all, pageId, sizeof(int32_t), &pi, POINTER_BYTES);
    pBuf->totalBufSize += pBuf->pageSize;
  }

  // add to LRU list
  assert(listNEles(pBuf->lruList) < pBuf->inMemPages && pBuf->inMemPages > 0);
  lruListPushFront(pBuf->lruList, pi);

  // attach the buffer and store the back pointer in its header
  pi->pData = availablePage;
  ((void**)pi->pData)[0] = pi;

  return (void*)(GET_DATA_PAYLOAD(pi));
}

H
Haojun Liao 已提交
452
/**
 * Get an existing page by id and mark it as in use, loading it from disk when it is not resident.
 *
 * @param pBuf owning buffer
 * @param id   page id previously returned through getNewBufPage
 * @return pointer to the page payload, or NULL when no buffer could be obtained or the page could
 *         not be read back (terrno is set for allocation and load failures)
 */
void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
  assert(pBuf != NULL && id >= 0);
  pBuf->statis.getPages += 1;

  SPageInfo** pi = taosHashGet(pBuf->all, &id, sizeof(int32_t));
  assert(pi != NULL && *pi != NULL);

  SPageInfo* pg = *pi;

  if (pg->pData != NULL) {  // it is in memory
    // no need to update the LRU list if only one page exists
    if (pBuf->numOfPages != 1) {
      SPageInfo** pInfo = (SPageInfo**)(pg->pn->data);
      assert(*pInfo == pg);

      lruListMoveToFront(pBuf->lruList, pg);
    }

    pg->used = true;
    return (void*)(GET_DATA_PAYLOAD(pg));
  }

  // not in memory
  assert(pg->pn == NULL && pg->length >= 0 && pg->offset >= 0);

  char* availablePage = NULL;
  if (NO_IN_MEM_AVAILABLE_PAGES(pBuf)) {
    availablePage = evacOneDataPage(pBuf);
    if (availablePage == NULL) {
      return NULL;
    }
  }

  if (availablePage == NULL) {
    availablePage = calloc(1, getAllocPageSize(pBuf->pageSize));
    if (availablePage == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
  }

  // set the ptr to the new SPageInfo
  pg->pData = availablePage;
  ((void**)(pg->pData))[0] = pg;

  // Load before publishing the page: on failure it must not stay in the LRU list pinned as "used"
  // with unreadable content.
  int32_t code = loadPageFromDisk(pBuf, pg);
  if (code != 0) {
    tfree(pg->pData);
    terrno = code;
    return NULL;
  }

  lruListPushFront(pBuf->lruList, pg);
  pg->used = true;

  return (void*)(GET_DATA_PAYLOAD(pg));
}

H
Haojun Liao 已提交
505 506
/**
 * Release a page identified by its payload pointer so that it becomes eligible for eviction.
 *
 * @param pBuf owning buffer
 * @param page payload pointer returned by getNewBufPage/getBufPage
 */
void releaseBufPage(SDiskbasedBuf* pBuf, void* page) {
  assert(pBuf != NULL && page != NULL);

  releaseBufPageInfo(pBuf, getPageInfoFromPayload(page));
}

H
Haojun Liao 已提交
511
/**
 * Release a page identified by its record: clear the in-use flag and count the release.
 *
 * @param pBuf owning buffer
 * @param pi   page record; the page must be resident and currently in use
 */
void releaseBufPageInfo(SDiskbasedBuf* pBuf, SPageInfo* pi) {
  assert(pi->pData != NULL && pi->used == true);

  pBuf->statis.releasePages += 1;
  pi->used = false;
}

H
Haojun Liao 已提交
518
// Number of entries in the group hash table.
size_t getNumOfBufGroupId(const SDiskbasedBuf* pBuf) {
  return taosHashGetSize(pBuf->groupSet);
}
519

H
Haojun Liao 已提交
520
// Total size in bytes of all registered pages (in memory and on disk).
size_t getTotalBufSize(const SDiskbasedBuf* pBuf) {
  return (size_t)pBuf->totalBufSize;
}
521

H
Haojun Liao 已提交
522 523
/**
 * Look up the page list of a group.
 *
 * @param pBuf    owning buffer
 * @param groupId group to look up
 * @return the group's page list, or the buffer's shared empty list when the group is unknown
 */
SIDList getDataBufPagesIdList(SDiskbasedBuf* pBuf, int32_t groupId) {
  assert(pBuf != NULL);

  char** slot = taosHashGet(pBuf->groupSet, (const char*)&groupId, sizeof(int32_t));
  if (slot != NULL) {
    return (SArray*)(*slot);
  }

  // it is a new group id
  return pBuf->emptyDummyIdList;
}

H
Haojun Liao 已提交
533
/**
 * Destroy a paged buffer: log its statistics, close and remove the backing file, and free every
 * page record, page buffer and container owned by the buffer. Passing NULL is a no-op.
 *
 * @param pBuf buffer to destroy
 */
void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) {
  if (pBuf == NULL) {
    return;
  }

  dBufPrintStatis(pBuf);

  if (pBuf->pFile != NULL) {
    uDebug(
        "Paged buffer closed, total:%.2f Kb (%d Pages), inmem size:%.2f Kb (%d Pages), file size:%.2f Kb, page "
        "size:%.2f Kb, %" PRIx64 "\n",
        pBuf->totalBufSize / 1024.0, pBuf->numOfPages, listNEles(pBuf->lruList) * pBuf->pageSize / 1024.0,
        listNEles(pBuf->lruList), pBuf->fileSize / 1024.0, pBuf->pageSize / 1024.0f, pBuf->qId);

    taosCloseFile(&pBuf->pFile);
  } else {
    uDebug("Paged buffer closed, total:%.2f Kb, no file created, %" PRIx64, pBuf->totalBufSize / 1024.0, pBuf->qId);
  }

  // print the statistics information
  {
    SDiskbasedBufStatis* ps = &pBuf->statis;

    // no page loaded yet: report 0 instead of computing 0/0
    double avgPageSize = (ps->loadPages != 0) ? ps->loadBytes / (1024.0 * ps->loadPages) : 0.0;

    uDebug(
        "Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages), avgPageSize:%.2f "
        "Kb\n",
        ps->getPages, ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f,
        ps->loadPages, avgPageSize);
  }

  remove(pBuf->path);
  tfree(pBuf->path);

  // every page record is referenced from exactly one group list; free records and buffers from there
  SArray** p = taosHashIterate(pBuf->groupSet, NULL);
  while (p) {
    size_t n = taosArrayGetSize(*p);
    for (int32_t i = 0; i < n; ++i) {
      SPageInfo* pi = taosArrayGetP(*p, i);
      tfree(pi->pData);
      tfree(pi);
    }

    taosArrayDestroy(*p);
    p = taosHashIterate(pBuf->groupSet, p);
  }

  tdListFree(pBuf->lruList);
  tdListFree(pBuf->freePgList);

  taosArrayDestroy(pBuf->emptyDummyIdList);
  taosArrayDestroy(pBuf->pFree);

  taosHashCleanup(pBuf->groupSet);
  taosHashCleanup(pBuf->all);

  tfree(pBuf->assistBuf);
  tfree(pBuf);
}

591
/**
 * Get the record of the last page in a page list.
 *
 * @param pList page list of a group
 * @return the last page record, or NULL when the list is empty
 */
SPageInfo* getLastPageInfo(SIDList pList) {
  size_t size = taosArrayGetSize(pList);
  if (size == 0) {
    // size - 1 would wrap around and index far outside the array
    return NULL;
  }

  return taosArrayGetP(pList, size - 1);
}

597
// Id of the page described by pPgInfo, which must not be NULL.
int32_t getPageId(const SPageInfo* pPgInfo) {
  ASSERT(pPgInfo != NULL);

  int32_t id = pPgInfo->pageId;
  return id;
}

S
Shengliang Guan 已提交
602
// Payload size in bytes of one page of this buffer.
int32_t getBufPageSize(const SDiskbasedBuf* pBuf) {
  return pBuf->pageSize;
}
H
Haojun Liao 已提交
603

S
Shengliang Guan 已提交
604
// Number of pages this buffer may keep in memory (the configured limit, not the current count).
int32_t getNumOfInMemBufPages(const SDiskbasedBuf* pBuf) {
  return pBuf->inMemPages;
}
H
Haojun Liao 已提交
605

S
Shengliang Guan 已提交
606
// True while nothing has been written to the backing file, i.e. its recorded size is still zero.
bool isAllDataInMemBuf(const SDiskbasedBuf* pBuf) {
  const bool nothingOnDisk = (pBuf->fileSize == 0);
  return nothingOnDisk;
}
H
Haojun Liao 已提交
607

H
Haojun Liao 已提交
608
void setBufPageDirty(void* pPage, bool dirty) {
H
Haojun Liao 已提交
609
  SPageInfo* ppi = getPageInfoFromPayload(pPage);
H
Haojun Liao 已提交
610
  ppi->dirty = dirty;
611 612
}

613
// Enable or disable compression of pages before they are flushed to disk.
void setBufPageCompressOnDisk(SDiskbasedBuf* pBuf, bool comp) {
  pBuf->comp = comp;
}
H
Haojun Liao 已提交
614

615
/**
 * Recycle a page: drop it from the LRU list, free its buffer and park its record on the free page
 * list, from where getNewBufPage reuses it.
 *
 * @param pBuf  owning buffer
 * @param pPage payload pointer of the page to recycle; invalid after the call
 */
void dBufSetBufPageRecycled(SDiskbasedBuf* pBuf, void* pPage) {
  SPageInfo* ppi = getPageInfoFromPayload(pPage);

  ppi->used = false;
  ppi->dirty = false;

  // add this pageinfo into the free page info list
  SListNode* pNode = tdListPopNode(pBuf->lruList, ppi->pn);
  tfree(ppi->pData);
  tfree(pNode);

  // the node is gone; do not leave a dangling pointer to it in the record
  ppi->pn = NULL;

  tdListAppend(pBuf->freePgList, &ppi);
}
H
Haojun Liao 已提交
628

629
// Request that statistics are printed by dBufPrintStatis (which destroyDiskbasedBuf calls).
void dBufSetPrintInfo(SDiskbasedBuf* pBuf) {
  pBuf->printStatis = true;
}
630

S
Shengliang Guan 已提交
631
// Copy of the buffer's statistics counters.
SDiskbasedBufStatis getDBufStatis(const SDiskbasedBuf* pBuf) {
  SDiskbasedBufStatis snapshot = pBuf->statis;
  return snapshot;
}
H
Haojun Liao 已提交
632

H
Haojun Liao 已提交
633
void dBufPrintStatis(const SDiskbasedBuf* pBuf) {
H
Haojun Liao 已提交
634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649
  if (!pBuf->printStatis) {
    return;
  }

  const SDiskbasedBufStatis* ps = &pBuf->statis;

  printf(
      "Paged buffer closed, total:%.2f Kb (%d Pages), inmem size:%.2f Kb (%d Pages), file size:%.2f Kb, page size:%.2f "
      "Kb, %" PRIx64 "\n",
      pBuf->totalBufSize / 1024.0, pBuf->numOfPages, listNEles(pBuf->lruList) * pBuf->pageSize / 1024.0,
      listNEles(pBuf->lruList), pBuf->fileSize / 1024.0, pBuf->pageSize / 1024.0f, pBuf->qId);

  printf(
      "Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages), avgPageSize:%.2f Kb\n",
      ps->getPages, ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f, ps->loadPages,
      ps->loadBytes / (1024.0 * ps->loadPages));
650
}