tpagedbuf.c 16.8 KB
Newer Older
H
Haojun Liao 已提交
1 2
#include "os.h"
#include "ulog.h"
H
Haojun Liao 已提交
3
#include "tpagedbuf.h"
4 5
#include "taoserror.h"
#include "tcompression.h"
H
Haojun Liao 已提交
6
#include "thash.h"
7 8 9 10

// Address of the caller-visible part of a page: the raw buffer (_p)->pData begins with a
// pointer-sized slot (POINTER_BYTES), and the payload starts right after that slot.
#define GET_DATA_PAYLOAD(_p) ((char *)(_p)->pData + POINTER_BYTES)
// True when the LRU list already holds at least inMemPages entries, i.e. a page has to be
// evicted before another one can be brought into memory.
#define NO_IN_MEM_AVAILABLE_PAGES(_b) (listNEles((_b)->lruList) >= (_b)->inMemPages)

11 12 13 14 15 16
// One entry of the free list consulted by allocatePositionInFile(): a reusable region of the
// temporary file. Allocation carves space from the front of the region (offset grows, len shrinks).
typedef struct SFreeListItem {
  int32_t offset;  // start of the free region; presumably a byte offset in the file -- TODO confirm
  int32_t len;     // remaining size of the free region
} SFreeListItem;

// Position and size of a page image inside the temporary file. doFlushPageToDisk() fills one of
// these from a page's previous (offset, length) before pushing it onto the free list.
typedef struct SPageDiskInfo {
  int64_t  offset;  // file offset of the page image
  int32_t  length;  // number of bytes occupied at that offset
} SPageDiskInfo;

// Bookkeeping record for a single page, whether it currently lives in memory or only on disk.
typedef struct SPageInfo {
  SListNode*    pn;         // node of this page in the LRU list, NULL while the page is not in the list
  void*         pData;      // raw in-memory buffer, NULL while the page is not held in memory
  int64_t       offset;     // position of the page image in the file, -1 until it is flushed once
  int32_t       pageId;     // id handed out by getNewDataBuf()
  int32_t       length:30;  // size of the page image written to the file, -1 until it is flushed once
  bool          used:1;     // the page is currently referenced by a caller and must not be evicted
  bool          dirty:1;    // the in-memory content has to be written out before eviction
} SPageInfo;

typedef struct SDiskbasedBuf {
  int32_t   numOfPages;
  int64_t   totalBufSize;
H
Haojun Liao 已提交
34
  uint64_t  fileSize;            // disk file size
35 36 37 38 39 40 41 42 43 44 45 46
  FILE*     file;
  int32_t   allocateId;          // allocated page id
  char*     path;                // file path
  int32_t   pageSize;            // current used page size
  int32_t   inMemPages;          // numOfPages that are allocated in memory
  SHashObj* groupSet;            // id hash table
  SHashObj* all;
  SList*    lruList;
  void*     emptyDummyIdList;    // dummy id list
  void*     assistBuf;           // assistant buffer for compress/decompress data
  SArray*   pFree;               // free area in file
  bool      comp;                // compressed before flushed to disk
H
Haojun Liao 已提交
47
  uint64_t  nextPos;             // next page flush position
48 49

  uint64_t  qId;                 // for debug purpose
H
Haojun Liao 已提交
50
  bool      printStatis;         // Print statistics info when closing this buffer.
H
Haojun Liao 已提交
51
  SDiskbasedBufStatis statis;
52 53
} SDiskbasedBuf;

H
Haojun Liao 已提交
54
// Forward declaration; the function is defined later in this file and prints the buffer
// statistics to stdout when pBuf->printStatis is set.
static void printStatisData(const SDiskbasedBuf* pBuf);
55

H
Haojun Liao 已提交
56 57 58 59
  /*
   * Create a disk-backed paged buffer.
   *
   * pBuf          [out] receives the new buffer; set to NULL when creation fails
   * pagesize      size of one page payload, must be positive
   * inMemBufSize  memory budget; must cover at least two pages
   * qId           owner id, only used for logging
   * dir           passed to taosGetTmpfilePath() to build the temporary file path
   *
   * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when any allocation fails. On failure
   * everything allocated so far is released, so the caller never receives a half-built object.
   */
  int32_t createDiskbasedBuffer(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir) {
  // Validate before pagesize is used as a divisor; at least two pages must fit in memory.
  assert(pagesize > 0 && inMemBufSize >= pagesize * 2);

  *pBuf = NULL;

  SDiskbasedBuf* pResBuf = calloc(1, sizeof(SDiskbasedBuf));
  if (pResBuf == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pResBuf->pageSize     = pagesize;
  pResBuf->numOfPages   = 0;                        // all pages are in buffer in the first place
  pResBuf->totalBufSize = 0;
  pResBuf->inMemPages   = inMemBufSize/pagesize;    // maximum allowed pages, it is a soft limit.
  pResBuf->allocateId   = -1;
  pResBuf->comp         = true;
  pResBuf->file         = NULL;
  pResBuf->qId          = qId;
  pResBuf->fileSize     = 0;

  pResBuf->lruList = tdListNew(POINTER_BYTES);

  // init id hash table
  pResBuf->groupSet  = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
  pResBuf->assistBuf = malloc(pResBuf->pageSize + 2); // EXTRA BYTES
  pResBuf->all = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);

  char path[PATH_MAX] = {0};
  taosGetTmpfilePath(dir, "qbuf", path);
  pResBuf->path = strdup(path);

  pResBuf->emptyDummyIdList = taosArrayInit(1, sizeof(int32_t));

  if (pResBuf->lruList == NULL || pResBuf->groupSet == NULL || pResBuf->assistBuf == NULL || pResBuf->all == NULL ||
      pResBuf->path == NULL || pResBuf->emptyDummyIdList == NULL) {
    // Release whatever was created; each member is checked because only some of them exist.
    if (pResBuf->lruList != NULL) {
      tdListFree(pResBuf->lruList);
    }
    if (pResBuf->groupSet != NULL) {
      taosHashCleanup(pResBuf->groupSet);
    }
    if (pResBuf->all != NULL) {
      taosHashCleanup(pResBuf->all);
    }
    if (pResBuf->emptyDummyIdList != NULL) {
      taosArrayDestroy(pResBuf->emptyDummyIdList);
    }

    tfree(pResBuf->assistBuf);
    tfree(pResBuf->path);
    tfree(pResBuf);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  *pBuf = pResBuf;
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
96 97 98 99
// Opens (creating or truncating) the temporary file at pBuf->path for binary read/write and
// stores the stream in pBuf->file. Returns TSDB_CODE_SUCCESS, or the errno of the failed
// fopen() wrapped by TAOS_SYSTEM_ERROR.
static int32_t createDiskFile(SDiskbasedBuf* pBuf) {
  FILE* fp = fopen(pBuf->path, "wb+");
  pBuf->file = fp;

  if (fp != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  return TAOS_SYSTEM_ERROR(errno);
}

H
Haojun Liao 已提交
106 107
// Compresses srcSize bytes of data in place, using pBuf->assistBuf as scratch space, and stores
// the resulting size in *dst. When compression is disabled for this buffer the data is left
// untouched and *dst is set to srcSize. Always returns data.
static char* doCompressData(void* data, int32_t srcSize, int32_t *dst, SDiskbasedBuf* pBuf) {
  if (pBuf->comp) {
    *dst = tsCompressString(data, srcSize, 1, pBuf->assistBuf, srcSize, ONE_STAGE_COMP, NULL, 0);

    // move the compressed image back over the original bytes
    memcpy(data, pBuf->assistBuf, *dst);
  } else {
    *dst = srcSize;
  }

  return data;
}

H
Haojun Liao 已提交
118 119
// Decompresses srcSize bytes of data in place, using pBuf->assistBuf as scratch space, and
// stores the value returned by the decompressor in *dst. The result is copied back over data
// only when *dst is positive; otherwise data is left as it was. When compression is disabled
// for this buffer nothing is done and *dst is set to srcSize. Always returns data.
static char* doDecompressData(void* data, int32_t srcSize, int32_t *dst, SDiskbasedBuf* pBuf) {
  if (!pBuf->comp) {
    *dst = srcSize;
    return data;
  }

  // createDiskbasedBuffer() allocates assistBuf with pageSize + 2 bytes, so that is the capacity
  // the decompressor may be told about. Announcing a larger size would let a corrupted page
  // image overrun the scratch buffer.
  const int32_t assistBufSize = pBuf->pageSize + 2;

  *dst = tsDecompressString(data, srcSize, 1, pBuf->assistBuf, assistBufSize, ONE_STAGE_COMP, NULL, 0);
  if (*dst > 0) {
    memcpy(data, pBuf->assistBuf, *dst);
  }

  return data;
}

H
Haojun Liao 已提交
131 132 133
// Picks the file position for a page image of the given size. The first free-list entry that is
// large enough is used and shrunk from its front; when there is no free list, or no entry fits,
// the append position pBuf->nextPos is returned. The caller is responsible for advancing nextPos.
static uint64_t allocatePositionInFile(SDiskbasedBuf* pBuf, size_t size) {
  if (pBuf->pFree != NULL) {
    size_t numOfItems = taosArrayGetSize(pBuf->pFree);

    for(int32_t idx = 0; idx < numOfItems; ++idx) {
      SFreeListItem* item = taosArrayGet(pBuf->pFree, idx);
      if (item->len < size) {
        continue;  // too small, try the next region
      }

      // hand out the front of this region and keep the remainder in the list
      int32_t reused = item->offset;
      item->offset += (int32_t)size;
      item->len -= (int32_t)size;

      return reused;
    }
  }

  // no available recycle space, allocate new area in file
  return pBuf->nextPos;
}

H
Haojun Liao 已提交
154
// Evicts one unreferenced in-memory page and hands its buffer back for reuse.
//
// The page image is written to the file when the page has never been flushed (offset == -1) or
// when it is dirty. A clean page that already has an image on disk is dropped without any I/O,
// and its recorded offset/length are left untouched so that it can be loaded again later.
//
// Returns the zeroed raw buffer that used to back the page (ownership passes to the caller), or
// NULL with terrno set when seeking or writing fails. On failure the page keeps its buffer.
static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
  assert(!pg->used && pg->pData != NULL);

  const bool neverFlushed = (pg->offset == -1);
  if (neverFlushed) {
    // a page without an image on disk is expected to have been marked dirty by its writer
    assert(pg->dirty == true);
  }

  if (neverFlushed || pg->dirty) {
    int32_t    size = -1;
    SFilePage* pPage = (SFilePage*) GET_DATA_PAYLOAD(pg);
    char*      t = doCompressData(pPage->data, pBuf->pageSize, &size, pBuf);
    assert(size >= 0);

    if (neverFlushed) {
      // this page is flushed to disk for the first time
      pg->offset = allocatePositionInFile(pBuf, size);
      pBuf->nextPos += size;
    } else if (pg->length < size) {
      // length becomes greater, current space is not enough, allocate new place
      // 1. add current space to free list
      // NOTE(review): pFree is not allocated anywhere in the visible code, and the element pushed
      // here (SPageDiskInfo) is not the type allocatePositionInFile() reads back (SFreeListItem)
      // -- confirm the intended free-list element type.
      SPageDiskInfo dinfo = {.length = pg->length, .offset = pg->offset};
      taosArrayPush(pBuf->pFree, &dinfo);

      // 2. allocate new position, and update the info
      pg->offset = allocatePositionInFile(pBuf, size);
      pBuf->nextPos += size;
    }

    // 3. write to disk.
    if (fseek(pBuf->file, pg->offset, SEEK_SET) != 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      return NULL;
    }

    int32_t ret = (int32_t) fwrite(t, 1, size, pBuf->file);
    if (ret != size) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      return NULL;
    }

    if (pBuf->fileSize < pg->offset + size) {
      pBuf->fileSize = pg->offset + size;
    }

    pBuf->statis.flushBytes += size;
    pBuf->statis.flushPages += 1;

    // Record the on-disk size only when an image was actually written. A clean page must keep
    // the length of its existing image, otherwise the next load would read a bogus size.
    pg->length = size;
  }

  char* pDataBuf = pg->pData;
  memset(pDataBuf, 0, pBuf->pageSize + sizeof(SFilePage));

  pg->pData  = NULL;  // this means the data is not in buffer
  pg->dirty  = false;

  return pDataBuf;
}

H
Haojun Liao 已提交
232
// Makes sure the temporary file exists (it is created on the first flush) and then evicts the
// given page through doFlushPageToDisk(). Returns the released page buffer, or NULL with terrno
// set when the file cannot be created or the flush fails.
static char* flushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
  assert(((int64_t) pBuf->numOfPages * pBuf->pageSize) == pBuf->totalBufSize && pBuf->numOfPages >= pBuf->inMemPages);

  if (pBuf->file == NULL) {
    int32_t code = createDiskFile(pBuf);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      return NULL;
    }
  }

  return doFlushPageToDisk(pBuf, pg);
}

// load file block data in disk
H
Haojun Liao 已提交
247 248 249 250 251 252 253
// Reads the page image recorded in pg (offset, length) from the file into the payload of
// pg->pData and decompresses it in place.
//
// Returns 0 on success. Returns a TAOS_SYSTEM_ERROR code when the recorded length is invalid,
// when seeking or reading fails, or when a compressed image cannot be expanded.
static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
  // fread() takes a size_t: a negative length would turn into a huge count and overrun the page.
  if (pg->length < 0) {
    return TAOS_SYSTEM_ERROR(EIO);
  }

  if (fseek(pBuf->file, pg->offset, SEEK_SET) != 0) {
    return TAOS_SYSTEM_ERROR(errno);
  }

  SFilePage* pPage = (SFilePage*) GET_DATA_PAYLOAD(pg);
  size_t     nread = fread(pPage->data, 1, pg->length, pBuf->file);
  if (nread != (size_t) pg->length) {
    // errno is only meaningful when the stream reports an error; a short read caused by
    // end-of-file is reported as an I/O error instead of a stale errno value.
    return ferror(pBuf->file) ? TAOS_SYSTEM_ERROR(errno) : TAOS_SYSTEM_ERROR(EIO);
  }

  pBuf->statis.loadBytes += pg->length;
  pBuf->statis.loadPages += 1;

  int32_t fullSize = 0;
  doDecompressData(pPage->data, pg->length, &fullSize, pBuf);

  // doDecompressData() copies the expanded data back only when the reported size is positive;
  // anything else leaves the compressed bytes in the page, which must not be returned as valid.
  if (pBuf->comp && fullSize <= 0) {
    return TAOS_SYSTEM_ERROR(EIO);
  }

  return 0;
}

H
Haojun Liao 已提交
269 270
// Creates the page list of a group that has no entry in pBuf->groupSet yet, stores it in the
// hash table under groupId and returns it.
static SIDList addNewGroup(SDiskbasedBuf* pBuf, int32_t groupId) {
  const char* key = (const char*) &groupId;
  assert(taosHashGet(pBuf->groupSet, key, sizeof(int32_t)) == NULL);

  SArray* pageList = taosArrayInit(1, POINTER_BYTES);

  int32_t code = taosHashPut(pBuf->groupSet, key, sizeof(int32_t), &pageList, POINTER_BYTES);
  assert(code == 0);

  return pageList;
}

H
Haojun Liao 已提交
279
// Creates the bookkeeping record of a new page and appends it to the page list of its group
// (the group is created on first use).
//
// The new record is referenced (used == true), clean, has no buffer and no image on disk
// (offset == length == -1). Returns the record, or NULL with terrno set to
// TSDB_CODE_OUT_OF_MEMORY when the record cannot be allocated or stored; numOfPages is only
// incremented on success.
static SPageInfo* registerPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t pageId) {
  SIDList list = NULL;

  char** p = taosHashGet(pBuf->groupSet, (const char*)&groupId, sizeof(int32_t));
  if (p == NULL) {  // it is a new group id
    list = addNewGroup(pBuf, groupId);
  } else {
    list = (SIDList) (*p);
  }

  SPageInfo* ppi = malloc(sizeof(SPageInfo));
  if (ppi == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  ppi->pageId = pageId;
  ppi->pData  = NULL;
  ppi->offset = -1;
  ppi->length = -1;
  ppi->used   = true;
  ppi->dirty  = false;  // malloc() does not clear the record; the flush path reads this flag
  ppi->pn     = NULL;

  SPageInfo** slot = taosArrayPush(list, &ppi);
  if (slot == NULL) {
    tfree(ppi);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pBuf->numOfPages += 1;
  return *slot;
}

H
Haojun Liao 已提交
303
// Walks the LRU list from its tail (least recently used) towards its head and returns the first
// node whose page is not referenced by a caller, or NULL when every in-memory page is in use.
static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) {
  SListIter iter = {0};
  tdListInitIter(pBuf->lruList, &iter, TD_LIST_BACKWARD);

  for (SListNode* node = tdListNext(&iter); node != NULL; node = tdListNext(&iter)) {
    SPageInfo* pageInfo = *(SPageInfo**) node->data;
    assert(pageInfo->pageId >= 0 && pageInfo->pn == node);

    if (!pageInfo->used) {
      return node;
    }
  }

  return NULL;
}

H
Haojun Liao 已提交
322
// Evicts the least recently used unreferenced page and returns its buffer for reuse.
//
// Returns NULL when the flush fails (terrno is set by the flush path) or when every in-memory
// page is referenced. In the latter case the soft limit inMemPages is raised by 50%, but no
// buffer is produced by this call.
static char* evacOneDataPage(SDiskbasedBuf* pBuf) {
  SListNode* pn = getEldestUnrefedPage(pBuf);

  // all pages are referenced by user, try to allocate new space
  if (pn == NULL) {
    assert(0);

    // increase by 50% of previous mem pages
    pBuf->inMemPages = (int32_t)(pBuf->inMemPages * 1.5f);
    return NULL;
  }

  SPageInfo* d = *(SPageInfo**) pn->data;
  assert(d->pn == pn);

  // Flush before unlinking: when the flush fails the page still owns its buffer, so it has to
  // stay in the LRU list. Otherwise it would be an in-memory page without a list node, which
  // getBufPage() dereferences.
  char* bufPage = flushPageToDisk(pBuf, d);
  if (bufPage == NULL) {
    return NULL;
  }

  tdListPopNode(pBuf->lruList, pn);
  d->pn = NULL;
  tfree(pn);

  return bufPage;
}

// Inserts the page at the head of the LRU list and remembers the new head node in pi->pn.
static void lruListPushFront(SList *pList, SPageInfo* pi) {
  tdListPrepend(pList, &pi);

  // the node that was just prepended is the current head
  pi->pn = tdListGetHead(pList);
}

// Marks the page as most recently used by unlinking its node and re-inserting it at the head.
static void lruListMoveToFront(SList *pList, SPageInfo* pi) {
  SListNode* node = pi->pn;

  tdListPopNode(pList, node);
  tdListPrependNode(pList, node);
}

// Number of bytes to allocate for one in-memory page: the payload plus the pointer-sized slot in
// front of it, the SFilePage header and two spare bytes.
static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) {
  size_t allocSize = pageSize + POINTER_BYTES + 2 + sizeof(SFilePage);
  return allocSize;
}

H
Haojun Liao 已提交
366 367
// Allocates a new page in the given group and returns its payload; the new page id is stored in
// *pageId and the page is returned in the referenced (used) state.
//
// The page buffer is obtained first, either by evicting an unreferenced page when the in-memory
// limit is reached or by allocating a fresh zeroed buffer, so that a failure leaves the group
// list, the LRU list and the id map untouched. Returns NULL when no buffer can be obtained or
// the page cannot be registered.
SFilePage* getNewDataBuf(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId) {
  pBuf->statis.getPages += 1;

  char* pageBuf = NULL;
  if (NO_IN_MEM_AVAILABLE_PAGES(pBuf)) {
    pageBuf = evacOneDataPage(pBuf);

    // Failed to allocate a new buffer page, and there is an error occurs.
    if (pageBuf == NULL) {
      return NULL;
    }
  } else {
    pageBuf = calloc(1, getAllocPageSize(pBuf->pageSize));  // add extract bytes in case of zipped buffer increased.
    if (pageBuf == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
  }

  // register new id in this group
  *pageId = (++pBuf->allocateId);

  // register page id info
  SPageInfo* pi = registerPage(pBuf, groupId, *pageId);
  if (pi == NULL) {
    tfree(pageBuf);
    return NULL;
  }

  // add to LRU list
  assert(listNEles(pBuf->lruList) < pBuf->inMemPages && pBuf->inMemPages > 0);
  lruListPushFront(pBuf->lruList, pi);

  // add to hash map
  taosHashPut(pBuf->all, pageId, sizeof(int32_t), &pi, POINTER_BYTES);

  pi->pData = pageBuf;
  pBuf->totalBufSize += pBuf->pageSize;

  // the first pointer-sized slot of the buffer refers back to the page record
  ((void**)pi->pData)[0] = pi;
  pi->used = true;

  return (void *)(GET_DATA_PAYLOAD(pi));
}

H
Haojun Liao 已提交
407 408 409
// Returns the payload of the page with the given id and marks the page as referenced.
//
// A page that is already in memory is moved to the head of the LRU list. A page that only exists
// on disk gets a buffer (recycled from an evicted page when the in-memory limit is reached,
// freshly allocated otherwise) and is loaded from the file. It enters the LRU list only after
// the load succeeded. Returns NULL when no buffer can be obtained or the load fails; in that
// case the page stays in its on-disk state and terrno describes the failure.
SFilePage* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
  assert(pBuf != NULL && id >= 0);
  pBuf->statis.getPages += 1;

  SPageInfo** pi = taosHashGet(pBuf->all, &id, sizeof(int32_t));
  assert(pi != NULL && *pi != NULL);

  SPageInfo* pg = *pi;

  if (pg->pData != NULL) { // it is in memory
    // no need to update the LRU list if only one page exists
    if (pBuf->numOfPages != 1) {
      SPageInfo** pInfo = (SPageInfo**) (pg->pn->data);
      assert(*pInfo == pg);

      lruListMoveToFront(pBuf->lruList, pg);
    }

    pg->used = true;
    return (void *)(GET_DATA_PAYLOAD(pg));
  }

  // not in memory
  assert(pg->pData == NULL && pg->pn == NULL && pg->length >= 0 && pg->offset >= 0);

  char* pageBuf = NULL;
  if (NO_IN_MEM_AVAILABLE_PAGES(pBuf)) {
    pageBuf = evacOneDataPage(pBuf);
    if (pageBuf == NULL) {
      return NULL;
    }
  } else {
    pageBuf = calloc(1, getAllocPageSize(pBuf->pageSize));
    if (pageBuf == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
  }

  pg->pData = pageBuf;
  ((void**)(pg->pData))[0] = pg;  // back reference used by releaseBufPage()/setBufPageDirty()

  int32_t code = loadPageFromDisk(pBuf, pg);
  if (code != 0) {
    // Roll back: without valid content the page must not look like an in-memory page.
    tfree(pageBuf);
    pg->pData = NULL;
    terrno = code;
    return NULL;
  }

  lruListPushFront(pBuf->lruList, pg);
  pg->used = true;

  return (void *)(GET_DATA_PAYLOAD(pg));
}

H
Haojun Liao 已提交
460 461 462 463
// Releases the reference a caller holds on a page. page must be a payload pointer previously
// returned by getNewDataBuf() or getBufPage().
void releaseBufPage(SDiskbasedBuf* pBuf, void* page) {
  assert(pBuf != NULL && page != NULL);

  // The payload starts POINTER_BYTES behind the raw buffer (see GET_DATA_PAYLOAD), and that
  // leading slot stores the address of the owning page record. Step back by the same amount,
  // on a char pointer because arithmetic on void* is not standard C.
  char* p = (char*) page - POINTER_BYTES;

  SPageInfo* ppi = ((SPageInfo**) p)[0];
  releaseBufPageInfo(pBuf, ppi);
}

H
Haojun Liao 已提交
469
// Drops the caller's reference on an in-memory page so that it becomes eligible for eviction,
// and counts the release in the statistics.
void releaseBufPageInfo(SDiskbasedBuf* pBuf, SPageInfo* pi) {
  assert(pi->pData != NULL && pi->used);

  pBuf->statis.releasePages += 1;
  pi->used = false;
}

H
Haojun Liao 已提交
476
// Number of group ids that own at least one registered page list.
size_t getNumOfResultBufGroupId(const SDiskbasedBuf* pBuf) {
  size_t numOfGroups = taosHashGetSize(pBuf->groupSet);
  return numOfGroups;
}
477

H
Haojun Liao 已提交
478
// Total payload size of all registered pages, in bytes.
size_t getTotalBufSize(const SDiskbasedBuf* pBuf) {
  size_t total = (size_t)pBuf->totalBufSize;
  return total;
}
479

H
Haojun Liao 已提交
480 481
// Returns the page list of the given group, or the shared empty list when the group id is
// unknown. The returned list is owned by the buffer.
SIDList getDataBufPagesIdList(SDiskbasedBuf* pBuf, int32_t groupId) {
  assert(pBuf != NULL);

  char** entry = taosHashGet(pBuf->groupSet, (const char*)&groupId, sizeof(int32_t));
  if (entry != NULL) {
    return (SArray*) (*entry);
  }

  // it is a new group id
  return pBuf->emptyDummyIdList;
}

H
Haojun Liao 已提交
491 492
// Destroys a paged buffer: logs its statistics, closes and removes the temporary file, and
// releases every page record, page buffer and container owned by the buffer. Accepts NULL.
void destroyResultBuf(SDiskbasedBuf* pBuf) {
  if (pBuf == NULL) {
    return;
  }

  printStatisData(pBuf);

  if (pBuf->file != NULL) {
    uDebug("Paged buffer closed, total:%.2f Kb (%d Pages), inmem size:%.2f Kb (%d Pages), file size:%.2f Kb, page size:%.2f Kb, %"PRIx64"\n",
        pBuf->totalBufSize/1024.0, pBuf->numOfPages, listNEles(pBuf->lruList) * pBuf->pageSize / 1024.0,
        listNEles(pBuf->lruList), pBuf->fileSize/1024.0, pBuf->pageSize/1024.0f, pBuf->qId);

    fclose(pBuf->file);
  } else {
    uDebug("Paged buffer closed, total:%.2f Kb, no file created, %"PRIx64, pBuf->totalBufSize/1024.0, pBuf->qId);
  }

  // print the statistics information
  {
    SDiskbasedBufStatis *ps = &pBuf->statis;

    // no page may have been loaded at all; do not divide by a zero page count
    double avgPageSize = (ps->loadPages != 0) ? ps->loadBytes/(1024.0 * ps->loadPages) : 0.0;

    uDebug("Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages), avgPageSize:%.2f Kb\n"
        , ps->getPages, ps->releasePages, ps->flushBytes/1024.0f, ps->flushPages, ps->loadBytes/1024.0f, ps->loadPages
        , avgPageSize);
  }

  if (pBuf->path != NULL) {
    remove(pBuf->path);
  }
  tfree(pBuf->path);

  // every group owns an array of page records; each record owns its in-memory buffer, if any
  SArray** p = taosHashIterate(pBuf->groupSet, NULL);
  while(p) {
    size_t n = taosArrayGetSize(*p);
    for(int32_t i = 0; i < n; ++i) {
      SPageInfo* pi = taosArrayGetP(*p, i);
      tfree(pi->pData);
      tfree(pi);
    }

    taosArrayDestroy(*p);
    p = taosHashIterate(pBuf->groupSet, p);
  }

  tdListFree(pBuf->lruList);
  taosArrayDestroy(pBuf->emptyDummyIdList);
  taosHashCleanup(pBuf->groupSet);
  taosHashCleanup(pBuf->all);

  if (pBuf->pFree != NULL) {
    taosArrayDestroy(pBuf->pFree);
  }

  tfree(pBuf->assistBuf);
  tfree(pBuf);
}

541
// Returns the most recently registered page of a group's page list, or NULL when the list is
// empty (for example the empty list getDataBufPagesIdList() returns for an unknown group).
SPageInfo* getLastPageInfo(SIDList pList) {
  size_t size = taosArrayGetSize(pList);
  if (size == 0) {
    // size - 1 would wrap around and index far outside the array
    return NULL;
  }

  SPageInfo* pPgInfo = taosArrayGetP(pList, size - 1);
  return pPgInfo;
}

547
int32_t getPageId(const SPageInfo* pPgInfo) {
548 549 550 551
  ASSERT(pPgInfo != NULL);
  return pPgInfo->pageId;
}

H
Haojun Liao 已提交
552 553 554 555
// Returns the page payload size this buffer was created with.
int32_t getBufPageSize(const SDiskbasedBuf* pBuf) {
  int32_t pageSize = pBuf->pageSize;
  return pageSize;
}

H
Haojun Liao 已提交
556
int32_t getNumOfInMemBufPages(const SDiskbasedBuf* pBuf) {
H
Haojun Liao 已提交
557
  return pBuf->inMemPages;
H
Haojun Liao 已提交
558 559
}

H
Haojun Liao 已提交
560 561 562 563 564 565 566 567 568 569
// True while nothing has been written to the temporary file, i.e. every page is still in memory.
bool isAllDataInMemBuf(const SDiskbasedBuf* pBuf) {
  bool nothingOnDisk = (pBuf->fileSize == 0);
  return nothingOnDisk;
}

// Sets or clears the dirty flag of the page that owns the given payload. pPage must be a payload
// pointer previously returned by getNewDataBuf() or getBufPage().
void setBufPageDirty(SFilePage* pPage, bool dirty) {
  // The payload starts POINTER_BYTES behind the raw buffer (see GET_DATA_PAYLOAD), and that
  // leading slot stores the address of the owning page record. Use the same constant here
  // instead of a struct offset that only happens to have the same value.
  char* p = (char*)pPage - POINTER_BYTES;

  SPageInfo* ppi = ((SPageInfo**) p)[0];
  ppi->dirty = dirty;
}

H
Haojun Liao 已提交
572 573
// Requests that the statistics are printed to stdout when the buffer is destroyed.
void printStatisBeforeClose(SDiskbasedBuf* pBuf) {
  const bool enabled = true;
  pBuf->printStatis = enabled;
}
575

H
Haojun Liao 已提交
576 577 578 579
// Returns a copy of the buffer's current statistics counters.
SDiskbasedBufStatis getDBufStatis(const SDiskbasedBuf* pBuf) {
  SDiskbasedBufStatis snapshot = pBuf->statis;
  return snapshot;
}

H
Haojun Liao 已提交
580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596
// Prints the size summary and the get/release/flush/load counters of the buffer to stdout.
// Does nothing unless printing was requested through printStatisBeforeClose().
static void printStatisData(const SDiskbasedBuf* pBuf) {
  if (!pBuf->printStatis) {
    return;
  }

  const SDiskbasedBufStatis* ps = &pBuf->statis;

  // no page may have been loaded at all; do not divide by a zero page count
  const double avgPageSize = (ps->loadPages != 0) ? ps->loadBytes / (1024.0 * ps->loadPages) : 0.0;

  printf(
      "Paged buffer closed, total:%.2f Kb (%d Pages), inmem size:%.2f Kb (%d Pages), file size:%.2f Kb, page size:%.2f "
      "Kb, %" PRIx64 "\n",
      pBuf->totalBufSize / 1024.0, pBuf->numOfPages, listNEles(pBuf->lruList) * pBuf->pageSize / 1024.0,
      listNEles(pBuf->lruList), pBuf->fileSize / 1024.0, pBuf->pageSize / 1024.0f, pBuf->qId);

  printf(
      "Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages), avgPageSize:%.2f Kb\n",
      ps->getPages, ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f, ps->loadPages,
      avgPageSize);
}