tpagedbuf.c 19.9 KB
Newer Older
S
Shengliang Guan 已提交
1
#define _DEFAULT_SOURCE
H
Haojun Liao 已提交
2
#include "tpagedbuf.h"
3 4
#include "taoserror.h"
#include "tcompression.h"
H
Haojun Liao 已提交
5
#include "thash.h"
S
Shengliang Guan 已提交
6
#include "tlog.h"
7

S
Shengliang Guan 已提交
8
// Address of the payload inside a page buffer: skips the POINTER_BYTES-wide slot at the start of (_p)->pData.
#define GET_DATA_PAYLOAD(_p)          ((char*)(_p)->pData + POINTER_BYTES)
9 10
// True when the LRU list of (_b) already holds at least (_b)->inMemPages entries.
#define NO_IN_MEM_AVAILABLE_PAGES(_b) (listNEles((_b)->lruList) >= (_b)->inMemPages)

11
// A contiguous region of the backing file, given by its start offset and its length in bytes.
// The same layout is used for the entries of the free-area array (SFreeListItem).
typedef struct SPageDiskInfo {
  int64_t offset;  // start position of the region in the file
  int32_t length;  // size of the region in bytes
} SPageDiskInfo, SFreeListItem;
15

H
Haojun Liao 已提交
16
struct SPageInfo {
17
  SListNode* pn;  // point to list node struct
S
Shengliang Guan 已提交
18 19 20
  void*      pData;
  int64_t    offset;
  int32_t    pageId;
21
  int32_t    length : 29;
S
Shengliang Guan 已提交
22 23
  bool       used : 1;   // set current page is in used
  bool       dirty : 1;  // set current buffer page is dirty or not
H
Haojun Liao 已提交
24
};
25

H
Haojun Liao 已提交
26
struct SDiskbasedBuf {
27 28
  int32_t   numOfPages;
  int64_t   totalBufSize;
S
Shengliang Guan 已提交
29
  uint64_t  fileSize;  // disk file size
30
  TdFilePtr pFile;
S
Shengliang Guan 已提交
31 32 33 34
  int32_t   allocateId;  // allocated page id
  char*     path;        // file path
  int32_t   pageSize;    // current used page size
  int32_t   inMemPages;  // numOfPages that are allocated in memory
35 36
  SList*    freePgList;  // free page list
  SHashObj* groupSet;    // id hash table, todo remove it
37 38
  SHashObj* all;
  SList*    lruList;
S
Shengliang Guan 已提交
39 40 41 42 43 44
  void*     emptyDummyIdList;  // dummy id list
  void*     assistBuf;         // assistant buffer for compress/decompress data
  SArray*   pFree;             // free area in file
  bool      comp;              // compressed before flushed to disk
  uint64_t  nextPos;           // next page flush position

H
Haojun Liao 已提交
45 46
  char*     id;          // for debug purpose
  bool      printStatis;  // Print statistics info when closing this buffer.
H
Haojun Liao 已提交
47
  SDiskbasedBufStatis statis;
H
Haojun Liao 已提交
48
};
49

H
Haojun Liao 已提交
50
// Open the backing file at pBuf->path and store the handle in pBuf->pFile.
// Returns TSDB_CODE_SUCCESS, or the system error derived from errno when the open fails.
static int32_t createDiskFile(SDiskbasedBuf* pBuf) {
  TdFilePtr fp =
      taosOpenFile(pBuf->path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL);
  pBuf->pFile = fp;

  if (fp != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  return TAOS_SYSTEM_ERROR(errno);
}

S
Shengliang Guan 已提交
59
/**
 * Prepare a page payload for writing to disk.
 *
 * When compression is disabled the payload is left untouched. Otherwise the payload is compressed into
 * pBuf->assistBuf and the result is copied back over the payload.
 *
 * @param data     page payload, overwritten in place when compression is enabled
 * @param srcSize  number of payload bytes
 * @param dst      receives the number of bytes to write (srcSize if uncompressed, otherwise the value returned
 *                 by tsCompressString)
 * @param pBuf     the buffer that owns the page
 * @return data
 */
static char* doCompressData(void* data, int32_t srcSize, int32_t* dst, SDiskbasedBuf* pBuf) {
  if (!pBuf->comp) {
    *dst = srcSize;
    return data;
  }

  *dst = tsCompressString(data, srcSize, 1, pBuf->assistBuf, srcSize, ONE_STAGE_COMP, NULL, 0);

  // Same guard as doDecompressData: a non-positive length must not reach memcpy, where a negative value
  // would be converted to a huge size_t.
  if (*dst > 0) {
    memcpy(data, pBuf->assistBuf, *dst);
  }
  return data;
}

S
Shengliang Guan 已提交
71
/**
 * Restore a page payload that was read from disk.
 *
 * Without compression the payload is used as is. With compression it is expanded into pBuf->assistBuf and,
 * if tsDecompressString reports a positive size, copied back over the payload.
 *
 * @param data     bytes read from disk, overwritten in place
 * @param srcSize  number of bytes read
 * @param dst      receives srcSize (no compression) or the value returned by tsDecompressString
 * @param pBuf     the buffer that owns the page
 * @return data
 */
static char* doDecompressData(void* data, int32_t srcSize, int32_t* dst, SDiskbasedBuf* pBuf) {
  if (pBuf->comp) {
    *dst = tsDecompressString(data, srcSize, 1, pBuf->assistBuf, pBuf->pageSize, ONE_STAGE_COMP, NULL, 0);

    int32_t rawLen = *dst;
    if (rawLen > 0) {
      memcpy(data, pBuf->assistBuf, rawLen);
    }
  } else {
    *dst = srcSize;
  }

  return data;
}

H
Haojun Liao 已提交
84 85 86
/**
 * Pick the file position at which `size` bytes can be written.
 *
 * The free-area array is scanned first; the first area that is large enough is shrunk from its front and its
 * old start offset is returned. If no area fits (or there is no free-area array) pBuf->nextPos is returned.
 * This function never changes pBuf->nextPos itself.
 *
 * @param pBuf  the buffer whose file is written
 * @param size  number of bytes needed
 * @return the file offset to write at
 */
static uint64_t allocatePositionInFile(SDiskbasedBuf* pBuf, size_t size) {
  if (pBuf->pFree != NULL) {
    size_t num = taosArrayGetSize(pBuf->pFree);
    for (int32_t i = 0; (size_t)i < num; ++i) {
      SFreeListItem* pi = taosArrayGet(pBuf->pFree, i);

      // compare as unsigned only after ruling out a negative length, which would otherwise look huge
      if (pi->length >= 0 && (size_t)pi->length >= size) {
        // keep the full 64-bit offset; an int32_t would truncate positions beyond 2 GB
        int64_t offset = pi->offset;
        pi->offset += (int64_t)size;
        pi->length -= (int32_t)size;

        return (uint64_t)offset;
      }
    }
  }

  // no available recycle space, allocate new area in file
  return pBuf->nextPos;
}

107
// Mark the page as no longer held in memory by dropping its buffer pointer (the buffer itself is not freed).
static void setPageNotInBuf(SPageInfo* pPageInfo) {
  pPageInfo->pData = NULL;
}
H
Haojun Liao 已提交
108

109
// Number of bytes to allocate for one in-memory page: the payload, the leading POINTER_BYTES slot and
// 2 extra bytes.
static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) {
  size_t total = pageSize + POINTER_BYTES + 2;
  return total;
}
H
Haojun Liao 已提交
110

H
Haojun Liao 已提交
111 112 113 114 115 116 117 118
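/**
 * Layout of an in-memory page buffer (its total size is computed by getAllocPageSize):
 *
 *   +----------------------------+--------------------+---------------+
 *   | PTR to SPageInfo (8 bytes) | Payload (pageSize) | 2 extra bytes |
 *   +----------------------------+--------------------+---------------+
 *
 * @param pBuf  the disk-based buffer that owns the page
 * @param pg    the page to flush; it must not be in use and must currently be held in memory
 * @return      the zeroed page buffer, ready for reuse, or NULL if seeking or writing failed
 */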
H
Haojun Liao 已提交
119
/**
 * Write a page to the backing file if it is dirty, then zero its in-memory buffer and return that buffer.
 *
 * A dirty page gets a file position on its first flush, or a new one when its (possibly compressed) size no
 * longer fits into the slot it occupied before; in the latter case the old slot is added to the free-area
 * array. A page that is not dirty is not written; its on-disk length is kept.
 *
 * @param pBuf  the buffer that owns the page
 * @param pg    page to flush; must not be in use and must have a buffer attached
 * @return the zeroed page buffer (still referenced by pg->pData), or NULL with terrno set when the seek or the
 *         write failed
 */
static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
  assert(!pg->used && pg->pData != NULL);

  int32_t size = pBuf->pageSize;
  char*   t = NULL;
  if (pg->offset == -1 || pg->dirty) {
    void* payload = GET_DATA_PAYLOAD(pg);
    t = doCompressData(payload, pBuf->pageSize, &size, pBuf);
    assert(size >= 0);
  }

  if (pg->dirty) {
    // this page is flushed to disk for the first time
    bool relocate = (pg->offset == -1);

    // length becomes greater, current space is not enough: release it and allocate a new place
    if (!relocate && pg->length < size) {
      SPageDiskInfo dinfo = {.length = pg->length, .offset = pg->offset};
      taosArrayPush(pBuf->pFree, &dinfo);
      relocate = true;
    }

    if (relocate) {
      uint64_t pos = allocatePositionInFile(pBuf, size);

      // Only an allocation at the append position extends the file. A recycled free area lies below
      // nextPos, and advancing nextPos for it as well would leave an unused hole of `size` bytes.
      if (pos == pBuf->nextPos) {
        pBuf->nextPos += size;
      }
      pg->offset = pos;
    }

    // The seek result is compared directly instead of being stored in an int32_t first.
    // NOTE(review): assumes taosLSeekFile reports failure as -1, as the original check did.
    if (taosLSeekFile(pBuf->pFile, pg->offset, SEEK_SET) == -1) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      return NULL;
    }

    int32_t ret = (int32_t)taosWriteFile(pBuf->pFile, t, size);
    if (ret != size) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      return NULL;
    }

    if (pBuf->fileSize < pg->offset + size) {
      pBuf->fileSize = pg->offset + size;
    }

    pBuf->statis.flushBytes += size;
    pBuf->statis.flushPages += 1;
  } else {  // NOTE: the size may be -1, then this recycle page has not been flushed to disk yet.
    size = pg->length;
  }

  ASSERT(size > 0 || (pg->offset == -1 && pg->length == -1));

  char* pDataBuf = pg->pData;
  memset(pDataBuf, 0, getAllocPageSize(pBuf->pageSize));

  pg->length = size;  // on disk size
  return pDataBuf;
}

H
Haojun Liao 已提交
201
/**
 * Flush a page to disk, creating the backing file on first use, and detach its in-memory buffer.
 *
 * @param pBuf  the buffer that owns the page
 * @param pg    page to flush
 * @return the detached, zeroed page buffer, which the caller may reuse; NULL with terrno set on failure.
 *         On failure the page is left untouched: it keeps its buffer and its dirty flag.
 */
static char* flushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
  assert(((int64_t)pBuf->numOfPages * pBuf->pageSize) == pBuf->totalBufSize && pBuf->numOfPages >= pBuf->inMemPages);

  if (pBuf->pFile == NULL) {
    int32_t ret = createDiskFile(pBuf);
    if (ret != TSDB_CODE_SUCCESS) {
      terrno = ret;
      return NULL;
    }
  }

  char* p = doFlushPageToDisk(pBuf, pg);
  if (p == NULL) {
    // The write failed. Detaching the buffer here would lose the only pointer to it (a leak) and
    // clearing the dirty flag would silently drop the unwritten content, so keep both, exactly like the
    // file-creation failure path above does.
    return NULL;
  }

  setPageNotInBuf(pg);
  pg->dirty = false;

  return p;
}

// load file block data in disk
H
Haojun Liao 已提交
220
/**
 * Read a page's on-disk bytes back into its in-memory buffer and undo the compression, if any.
 *
 * @param pBuf  the buffer that owns the page; its file must be open
 * @param pg    page to load; pg->pData must already point to a page buffer
 * @return 0 on success, otherwise the system error derived from errno
 */
static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
  // The seek result is compared directly instead of being stored in an int32_t first.
  // NOTE(review): assumes taosLSeekFile reports failure as -1, as the original check did; if it returns
  // the new 64-bit offset on success, narrowing it could turn a valid large offset into -1.
  if (taosLSeekFile(pBuf->pFile, pg->offset, SEEK_SET) == -1) {
    return TAOS_SYSTEM_ERROR(errno);
  }

  void*   pPage = (void*)GET_DATA_PAYLOAD(pg);
  int32_t ret = (int32_t)taosReadFile(pBuf->pFile, pPage, pg->length);
  if (ret != pg->length) {
    return TAOS_SYSTEM_ERROR(errno);
  }

  pBuf->statis.loadBytes += pg->length;
  pBuf->statis.loadPages += 1;

  // fullSize receives the decompressed size; it is not checked here
  int32_t fullSize = 0;
  doDecompressData(pPage, pg->length, &fullSize, pBuf);
  return 0;
}

H
Haojun Liao 已提交
242
// Create the (initially empty) page list of a group that is not known yet and store it in pBuf->groupSet
// under groupId. Returns the new list.
static SIDList addNewGroup(SDiskbasedBuf* pBuf, int32_t groupId) {
  const char* key = (const char*)&groupId;
  assert(taosHashGet(pBuf->groupSet, key, sizeof(int32_t)) == NULL);

  SArray* pageList = taosArrayInit(1, POINTER_BYTES);

  int32_t code = taosHashPut(pBuf->groupSet, key, sizeof(int32_t), &pageList, POINTER_BYTES);
  assert(code == 0);

  return pageList;
}

H
Haojun Liao 已提交
252
/**
 * Create the bookkeeping record of a new page and append it to the page list of its group.
 *
 * The record starts without buffer and without file position (offset and length are -1) and is marked used.
 * pBuf->numOfPages is increased only once the record has been stored.
 *
 * @param pBuf     the buffer the page belongs to
 * @param groupId  group of the page; the group is created if it does not exist yet
 * @param pageId   id of the new page
 * @return the new page record, or NULL with terrno set to TSDB_CODE_OUT_OF_MEMORY if an allocation failed
 */
static SPageInfo* registerPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t pageId) {
  SIDList list = NULL;

  char** p = taosHashGet(pBuf->groupSet, (const char*)&groupId, sizeof(int32_t));
  if (p == NULL) {  // it is a new group id
    list = addNewGroup(pBuf, groupId);
  } else {
    list = (SIDList)(*p);
  }

  if (list == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  SPageInfo* ppi = taosMemoryMalloc(sizeof(SPageInfo));
  if (ppi == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  ppi->pageId = pageId;
  ppi->pData  = NULL;
  ppi->offset = -1;
  ppi->length = -1;
  ppi->used   = true;
  ppi->pn     = NULL;
  ppi->dirty  = false;

  // the push result is dereferenced by the original code, so a failed push must be caught first
  if (taosArrayPush(list, &ppi) == NULL) {
    taosMemoryFreeClear(ppi);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pBuf->numOfPages += 1;
  return ppi;
}

H
Haojun Liao 已提交
277
// Walk the LRU list backwards and return the first node whose page is not in use, or NULL if every page
// in the list is in use.
static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) {
  SListIter iter = {0};
  tdListInitIter(pBuf->lruList, &iter, TD_LIST_BACKWARD);

  for (SListNode* node = tdListNext(&iter); node != NULL; node = tdListNext(&iter)) {
    SPageInfo* candidate = *(SPageInfo**)node->data;
    assert(candidate->pageId >= 0 && candidate->pn == node);

    if (!candidate->used) {
      return node;
    }
  }

  return NULL;
}

H
Haojun Liao 已提交
307
// Evict the eldest page that is not in use: unlink it from the LRU list, flush it to disk and return its
// buffer for reuse. Returns NULL if the flush failed or if every page in memory is in use; in the latter
// case the in-memory page limit is raised by 50%.
static char* evacOneDataPage(SDiskbasedBuf* pBuf) {
  SListNode* victim = getEldestUnrefedPage(pBuf);

  // all pages are referenced by user, try to allocate new space
  if (victim == NULL) {
    assert(0);

    // increase by 50% of previous mem pages
    pBuf->inMemPages = (int32_t)(pBuf->inMemPages * 1.5f);
    return NULL;
  }

  tdListPopNode(pBuf->lruList, victim);

  SPageInfo* page = *(SPageInfo**)victim->data;
  assert(page->pn == victim);

  page->pn = NULL;
  taosMemoryFreeClear(victim);

  return flushPageToDisk(pBuf, page);
}

S
Shengliang Guan 已提交
336
// Insert the page at the head of the list and remember the new head node in the page record.
static void lruListPushFront(SList* pList, SPageInfo* pi) {
  tdListPrepend(pList, &pi);
  pi->pn = tdListGetHead(pList);
}

S
Shengliang Guan 已提交
342
// Unlink the page's node from the list and link it again at the head.
static void lruListMoveToFront(SList* pList, SPageInfo* pi) {
  SListNode* node = pi->pn;

  tdListPopNode(pList, node);
  tdListPrependNode(pList, node);
}

H
Haojun Liao 已提交
347
static SPageInfo* getPageInfoFromPayload(void* page) {
348
  char*   p = (char *)page - POINTER_BYTES;
H
Haojun Liao 已提交
349

350
  SPageInfo* ppi = ((SPageInfo**)p)[0];
H
Haojun Liao 已提交
351
  return ppi;
352 353
}

H
Haojun Liao 已提交
354
/**
 * Create a disk-based paged buffer. The backing file is not opened here.
 *
 * @param pBuf          receives the new buffer; set to NULL when the creation fails
 * @param pagesize      payload size of a page in bytes, must be positive
 * @param inMemBufSize  memory budget in bytes, must cover at least two pages
 * @param id            identifier used in debug output; it is copied
 * @param dir           handed to taosGetTmpfilePath to build the path of the backing file
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if any allocation failed (nothing is leaked then)
 */
int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, const char* id,
                           const char* dir) {
  // Checked before anything else because the page count below divides by pagesize.
  // at least more than 2 pages must be in memory
  assert(pagesize > 0 && inMemBufSize >= pagesize * 2);

  *pBuf = taosMemoryCalloc(1, sizeof(SDiskbasedBuf));

  SDiskbasedBuf* pPBuf = *pBuf;
  if (pPBuf == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pPBuf->pageSize     = pagesize;
  pPBuf->numOfPages   = 0;  // all pages are in buffer in the first place
  pPBuf->totalBufSize = 0;
  pPBuf->inMemPages   = inMemBufSize / pagesize;  // maximum allowed pages, it is a soft limit.
  pPBuf->allocateId   = -1;
  pPBuf->pFile        = NULL;
  pPBuf->fileSize     = 0;

  char path[PATH_MAX] = {0};
  taosGetTmpfilePath(dir, "paged-buf", path);

  // init id hash table
  _hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT);

  pPBuf->id               = strdup(id);
  pPBuf->path             = strdup(path);
  pPBuf->pFree            = taosArrayInit(4, sizeof(SFreeListItem));
  pPBuf->freePgList       = tdListNew(POINTER_BYTES);
  pPBuf->lruList          = tdListNew(POINTER_BYTES);
  pPBuf->groupSet         = taosHashInit(10, fn, true, false);
  pPBuf->all              = taosHashInit(10, fn, true, false);
  pPBuf->assistBuf        = taosMemoryMalloc(pPBuf->pageSize + 2);  // EXTRA BYTES
  pPBuf->emptyDummyIdList = taosArrayInit(1, sizeof(int32_t));

  bool complete = pPBuf->id != NULL && pPBuf->path != NULL && pPBuf->pFree != NULL && pPBuf->freePgList != NULL &&
                  pPBuf->lruList != NULL && pPBuf->groupSet != NULL && pPBuf->all != NULL &&
                  pPBuf->assistBuf != NULL && pPBuf->emptyDummyIdList != NULL;
  if (complete) {
    return TSDB_CODE_SUCCESS;
  }

  // A member could not be allocated: release whatever exists instead of handing out a half-built buffer.
  if (pPBuf->emptyDummyIdList != NULL) {
    taosArrayDestroy(pPBuf->emptyDummyIdList);
  }
  if (pPBuf->pFree != NULL) {
    taosArrayDestroy(pPBuf->pFree);
  }
  if (pPBuf->lruList != NULL) {
    tdListFree(pPBuf->lruList);
  }
  if (pPBuf->freePgList != NULL) {
    tdListFree(pPBuf->freePgList);
  }
  if (pPBuf->groupSet != NULL) {
    taosHashCleanup(pPBuf->groupSet);
  }
  if (pPBuf->all != NULL) {
    taosHashCleanup(pPBuf->all);
  }
  if (pPBuf->assistBuf != NULL) {
    taosMemoryFreeClear(pPBuf->assistBuf);
  }
  if (pPBuf->path != NULL) {
    taosMemoryFreeClear(pPBuf->path);
  }
  if (pPBuf->id != NULL) {
    taosMemoryFreeClear(pPBuf->id);
  }
  taosMemoryFreeClear(pPBuf);

  *pBuf = NULL;
  return TSDB_CODE_OUT_OF_MEMORY;
}

H
Haojun Liao 已提交
398
/**
 * Hand out a new, zero-filled page.
 *
 * If the in-memory limit is reached, one unused page is evicted first and its buffer is reused. The page
 * record comes from the list of recycled pages when one is available, otherwise a new page id is allocated
 * and registered in the given group.
 *
 * @param pBuf     the buffer to take the page from
 * @param groupId  group to register a newly created page in
 * @param pageId   receives the id of the returned page
 * @return pointer to the page payload, or NULL if eviction or a memory allocation failed
 */
void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId) {
  pBuf->statis.getPages += 1;

  char* availablePage = NULL;
  if (NO_IN_MEM_AVAILABLE_PAGES(pBuf)) {
    availablePage = evacOneDataPage(pBuf);

    // Failed to allocate a new buffer page, and there is an error occurs.
    if (availablePage == NULL) {
      assert(0);
      return NULL;
    }
  }

  // Get the page buffer before any bookkeeping is changed, so that running out of memory leaves the
  // page lists untouched. The extra bytes are there in case a zipped buffer grows.
  if (availablePage == NULL) {
    availablePage = taosMemoryCalloc(1, getAllocPageSize(pBuf->pageSize));
    if (availablePage == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
  }

  SPageInfo* pi = NULL;
  if (listNEles(pBuf->freePgList) != 0) {
    SListNode* pItem = tdListPopHead(pBuf->freePgList);
    pi = *(SPageInfo**)pItem->data;
    pi->used = true;
    *pageId = pi->pageId;
    taosMemoryFreeClear(pItem);
  } else {  // create a new pageinfo
    // the id is committed only after the page has been registered
    int32_t newId = pBuf->allocateId + 1;

    // register page id info
    pi = registerPage(pBuf, groupId, newId);
    if (pi == NULL) {
      taosMemoryFreeClear(availablePage);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }

    pBuf->allocateId = newId;
    *pageId = newId;

    // add to hash map
    taosHashPut(pBuf->all, pageId, sizeof(int32_t), &pi, POINTER_BYTES);
    pBuf->totalBufSize += pBuf->pageSize;
  }

  // add to LRU list
  assert(listNEles(pBuf->lruList) < pBuf->inMemPages && pBuf->inMemPages > 0);
  lruListPushFront(pBuf->lruList, pi);

  // attach the buffer and store the back pointer to the page record in front of the payload
  pi->pData = availablePage;
  ((void**)pi->pData)[0] = pi;
  return (void*)(GET_DATA_PAYLOAD(pi));
}

H
Haojun Liao 已提交
446
/**
 * Return the payload of an existing page, bringing it back into memory if necessary.
 *
 * A page that is in memory is moved to the front of the LRU list. A page that is not in memory gets a
 * buffer (evicting another page if the in-memory limit is reached) and, if it has been written to disk
 * before, its content is loaded from the file.
 *
 * @param pBuf  the buffer that owns the page
 * @param id    id of the page, as handed out by getNewBufPage
 * @return pointer to the page payload with the page marked used, or NULL on failure; when loading from
 *         disk fails the page is put back into its "not in memory" state and terrno holds the error
 */
void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
  assert(pBuf != NULL && id >= 0);
  pBuf->statis.getPages += 1;

  SPageInfo** pi = taosHashGet(pBuf->all, &id, sizeof(int32_t));
  assert(pi != NULL && *pi != NULL);
  if (pi == NULL || *pi == NULL) {  // unknown page id (only reachable when asserts are compiled out)
    return NULL;
  }

  SPageInfo* pg = *pi;

  if (pg->pData != NULL) {  // it is in memory
    // no need to update the LRU list if only one page exists
    if (pBuf->numOfPages == 1) {
      pg->used = true;
      return (void*)(GET_DATA_PAYLOAD(pg));
    }

    SPageInfo** pInfo = (SPageInfo**)(pg->pn->data);
    assert(*pInfo == pg);

    lruListMoveToFront(pBuf->lruList, pg);
    pg->used = true;

    return (void*)(GET_DATA_PAYLOAD(pg));
  }

  // not in memory
  assert(pg->pData == NULL && pg->pn == NULL &&
         ((pg->length >= 0 && pg->offset >= 0) || (pg->length == -1 && pg->offset == -1)));

  char* availablePage = NULL;
  if (NO_IN_MEM_AVAILABLE_PAGES(pBuf)) {
    availablePage = evacOneDataPage(pBuf);
    if (availablePage == NULL) {
      return NULL;
    }
  }

  if (availablePage == NULL) {
    availablePage = taosMemoryCalloc(1, getAllocPageSize(pBuf->pageSize));
    if (availablePage == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
  }

  // attach the buffer and set the ptr to the SPageInfo in front of the payload
  pg->pData = availablePage;
  ((void**)(pg->pData))[0] = pg;

  lruListPushFront(pBuf->lruList, pg);
  pg->used = true;

  // some data has been flushed to disk, and needs to be loaded into buffer again.
  if (pg->length > 0 && pg->offset >= 0) {
    int32_t code = loadPageFromDisk(pBuf, pg);
    if (code != 0) {
      // Undo the steps above. Otherwise the page would stay in the LRU list, marked used, with a buffer
      // that does not hold its content, and the next lookup would return that buffer as valid data.
      SListNode* pNode = tdListPopNode(pBuf->lruList, pg->pn);
      taosMemoryFreeClear(pNode);
      pg->pn = NULL;

      taosMemoryFreeClear(pg->pData);
      pg->pData = NULL;
      pg->used = false;

      terrno = code;
      return NULL;
    }
  }

  return (void*)(GET_DATA_PAYLOAD(pg));
}

H
Haojun Liao 已提交
502 503
// Release a page given a pointer to its payload.
void releaseBufPage(SDiskbasedBuf* pBuf, void* page) {
  assert(pBuf != NULL && page != NULL);
  releaseBufPageInfo(pBuf, getPageInfoFromPayload(page));
}

H
Haojun Liao 已提交
508
// Mark a page that is held in memory as no longer in use and count the release in the statistics.
void releaseBufPageInfo(SDiskbasedBuf* pBuf, SPageInfo* pi) {
  assert(pi->pData != NULL && pi->used == true);

  pBuf->statis.releasePages += 1;
  pi->used = false;
}

H
Haojun Liao 已提交
515
// Number of entries in the group table of the buffer.
size_t getNumOfBufGroupId(const SDiskbasedBuf* pBuf) {
  return taosHashGetSize(pBuf->groupSet);
}
516

H
Haojun Liao 已提交
517
// Total size of all registered pages, in bytes.
size_t getTotalBufSize(const SDiskbasedBuf* pBuf) {
  return (size_t)pBuf->totalBufSize;
}
518

H
Haojun Liao 已提交
519 520
// Return the page list of a group. For a group id that is not known, the shared empty dummy list is
// returned instead.
SIDList getDataBufPagesIdList(SDiskbasedBuf* pBuf, int32_t groupId) {
  assert(pBuf != NULL);

  char** slot = taosHashGet(pBuf->groupSet, (const char*)&groupId, sizeof(int32_t));
  if (slot != NULL) {
    return (SArray*)(*slot);
  }

  // it is a new group id
  return pBuf->emptyDummyIdList;
}

H
Haojun Liao 已提交
530
/**
 * Destroy a paged buffer: log its statistics, close and remove the backing file, and free every page
 * together with all bookkeeping structures. A NULL buffer is ignored.
 *
 * @param pBuf  the buffer to destroy; it must not be used afterwards
 */
void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) {
  if (pBuf == NULL) {
    return;
  }

  dBufPrintStatis(pBuf);

  if (pBuf->pFile == NULL) {
    uDebug("Paged buffer closed, total:%.2f Kb, no file created, %s", pBuf->totalBufSize / 1024.0, pBuf->id);
  } else {
    uDebug(
        "Paged buffer closed, total:%.2f Kb (%d Pages), inmem size:%.2f Kb (%d Pages), file size:%.2f Kb, page "
        "size:%.2f Kb, %s\n",
        pBuf->totalBufSize / 1024.0, pBuf->numOfPages, listNEles(pBuf->lruList) * pBuf->pageSize / 1024.0,
        listNEles(pBuf->lruList), pBuf->fileSize / 1024.0, pBuf->pageSize / 1024.0f, pBuf->id);

    taosCloseFile(&pBuf->pFile);
  }

  // print the statistics information; the average page size is only reported once a page has been loaded
  SDiskbasedBufStatis* ps = &pBuf->statis;
  if (ps->loadPages != 0) {
    uDebug(
        "Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages), avgPageSize:%.2f Kb",
        ps->getPages, ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f,
        ps->loadPages, ps->loadBytes / (1024.0 * ps->loadPages));
  } else {
    uDebug(
        "Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages)",
        ps->getPages, ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f, ps->loadPages);
  }

  taosRemoveFile(pBuf->path);
  taosMemoryFreeClear(pBuf->path);

  // free every page of every group, then the group's list itself
  for (SArray** ppList = taosHashIterate(pBuf->groupSet, NULL); ppList != NULL;
       ppList = taosHashIterate(pBuf->groupSet, ppList)) {
    size_t numOfGroupPages = taosArrayGetSize(*ppList);
    for (int32_t i = 0; i < numOfGroupPages; ++i) {
      SPageInfo* pi = taosArrayGetP(*ppList, i);
      taosMemoryFreeClear(pi->pData);
      taosMemoryFreeClear(pi);
    }

    taosArrayDestroy(*ppList);
  }

  tdListFree(pBuf->lruList);
  tdListFree(pBuf->freePgList);

  taosArrayDestroy(pBuf->emptyDummyIdList);
  taosArrayDestroy(pBuf->pFree);

  taosHashCleanup(pBuf->groupSet);
  taosHashCleanup(pBuf->all);

  taosMemoryFreeClear(pBuf->id);
  taosMemoryFreeClear(pBuf->assistBuf);
  taosMemoryFreeClear(pBuf);
}

594
/**
 * Return the last page record of a page list.
 *
 * @param pList  page list, e.g. as returned by getDataBufPagesIdList
 * @return the last entry, or NULL if the list is empty
 */
SPageInfo* getLastPageInfo(SIDList pList) {
  size_t size = taosArrayGetSize(pList);

  // For an empty list `size - 1` would wrap around to the largest size_t value and index far outside
  // the array.
  if (size == 0) {
    return NULL;
  }

  SPageInfo* pPgInfo = taosArrayGetP(pList, size - 1);
  return pPgInfo;
}

600
// Id of the page described by the given (non-NULL) page record.
int32_t getPageId(const SPageInfo* pPgInfo) {
  ASSERT(pPgInfo != NULL);

  int32_t id = pPgInfo->pageId;
  return id;
}

S
Shengliang Guan 已提交
605
// Payload size of one page of this buffer, in bytes.
int32_t getBufPageSize(const SDiskbasedBuf* pBuf) {
  return pBuf->pageSize;
}
H
Haojun Liao 已提交
606

S
Shengliang Guan 已提交
607
// Current limit on the number of pages held in memory.
int32_t getNumOfInMemBufPages(const SDiskbasedBuf* pBuf) {
  return pBuf->inMemPages;
}
H
Haojun Liao 已提交
608

S
Shengliang Guan 已提交
609
// True as long as the recorded size of the disk file is zero.
bool isAllDataInMemBuf(const SDiskbasedBuf* pBuf) {
  return pBuf->fileSize == 0;
}
H
Haojun Liao 已提交
610

H
Haojun Liao 已提交
611
void setBufPageDirty(void* pPage, bool dirty) {
H
Haojun Liao 已提交
612
  SPageInfo* ppi = getPageInfoFromPayload(pPage);
H
Haojun Liao 已提交
613
  ppi->dirty = dirty;
614 615
}

616
// Choose whether pages are compressed before they are flushed to disk.
void setBufPageCompressOnDisk(SDiskbasedBuf* pBuf, bool comp) {
  pBuf->comp = comp;
}
H
Haojun Liao 已提交
617

618
/**
 * Give a page back for reuse: it is removed from the LRU list, its buffer is freed and its record is
 * appended to the list of recycled pages.
 *
 * @param pBuf   the buffer that owns the page
 * @param pPage  payload pointer of the page; it is invalid after this call
 */
void dBufSetBufPageRecycled(SDiskbasedBuf* pBuf, void* pPage) {
  SPageInfo* ppi = getPageInfoFromPayload(pPage);

  ppi->used = false;
  ppi->dirty = false;

  // unlink the page from the LRU list; the node is freed below, so the record must not keep pointing at it
  SListNode* pNode = tdListPopNode(pBuf->lruList, ppi->pn);
  ppi->pn = NULL;

  taosMemoryFreeClear(ppi->pData);
  taosMemoryFreeClear(pNode);

  // add this pageinfo into the free page info list
  tdListAppend(pBuf->freePgList, &ppi);
}
H
Haojun Liao 已提交
631

632
// Request that the statistics are printed when this buffer is closed.
void dBufSetPrintInfo(SDiskbasedBuf* pBuf) {
  pBuf->printStatis = true;
}
633

S
Shengliang Guan 已提交
634
// Return a copy of the buffer's statistics counters.
SDiskbasedBufStatis getDBufStatis(const SDiskbasedBuf* pBuf) {
  return pBuf->statis;
}
H
Haojun Liao 已提交
635

H
Haojun Liao 已提交
636
void dBufPrintStatis(const SDiskbasedBuf* pBuf) {
H
Haojun Liao 已提交
637 638 639 640 641 642 643 644
  if (!pBuf->printStatis) {
    return;
  }

  const SDiskbasedBufStatis* ps = &pBuf->statis;

  printf(
      "Paged buffer closed, total:%.2f Kb (%d Pages), inmem size:%.2f Kb (%d Pages), file size:%.2f Kb, page size:%.2f "
H
Haojun Liao 已提交
645
      "Kb, %s\n",
H
Haojun Liao 已提交
646
      pBuf->totalBufSize / 1024.0, pBuf->numOfPages, listNEles(pBuf->lruList) * pBuf->pageSize / 1024.0,
H
Haojun Liao 已提交
647
      listNEles(pBuf->lruList), pBuf->fileSize / 1024.0, pBuf->pageSize / 1024.0f, pBuf->id);
H
Haojun Liao 已提交
648 649 650 651 652

  printf(
      "Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages), avgPageSize:%.2f Kb\n",
      ps->getPages, ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f, ps->loadPages,
      ps->loadBytes / (1024.0 * ps->loadPages));
653
}
5
54liuyao 已提交
654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681

/**
 * Drop every page of the buffer but keep the buffer itself usable: all page records and page buffers are
 * freed, the lists and hash tables are emptied and the page/file counters are reset. The backing file, if
 * one has been created, stays open; the statistics are kept.
 *
 * @param pBuf  the buffer to clear
 */
void clearDiskbasedBuf(SDiskbasedBuf* pBuf) {
  SArray** p = taosHashIterate(pBuf->groupSet, NULL);
  while (p) {
    size_t n = taosArrayGetSize(*p);
    for (int32_t i = 0; i < n; ++i) {
      SPageInfo* pi = taosArrayGetP(*p, i);
      taosMemoryFreeClear(pi->pData);
      taosMemoryFreeClear(pi);
    }
    taosArrayDestroy(*p);
    p = taosHashIterate(pBuf->groupSet, p);
  }

  tdListEmpty(pBuf->lruList);
  tdListEmpty(pBuf->freePgList);

  taosArrayClear(pBuf->emptyDummyIdList);
  taosArrayClear(pBuf->pFree);

  taosHashClear(pBuf->groupSet);
  taosHashClear(pBuf->all);

  pBuf->numOfPages   = 0;  // all pages are in buffer in the first place
  pBuf->totalBufSize = 0;
  pBuf->allocateId = -1;
  pBuf->fileSize = 0;

  // No page refers to a file position any more, so new pages are appended from the start of the file
  // again; leaving nextPos as it was would contradict fileSize == 0 and keep growing the file.
  pBuf->nextPos = 0;
}