tqMetaStore.c 18.3 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
L
Liu Jicong 已提交
15
#include "tqMetaStore.h"
L
Liu Jicong 已提交
16
//TODO:replace by an abstract file layer
L
Liu Jicong 已提交
17
#include "osDir.h"
L
Liu Jicong 已提交
18
#include <fcntl.h>
L
Liu Jicong 已提交
19
#include <string.h>
L
Liu Jicong 已提交
20 21
#include <unistd.h>

L
Liu Jicong 已提交
22 23 24
#define TQ_META_NAME "tq.meta"
#define TQ_IDX_NAME  "tq.idx"

L
Liu Jicong 已提交
25

L
Liu Jicong 已提交
26 27
static int32_t tqHandlePutCommitted(TqMetaStore*, int64_t key, void* value);
static void*   tqHandleGetUncommitted(TqMetaStore*, int64_t key);
L
Liu Jicong 已提交
28

L
Liu Jicong 已提交
29
static inline void tqLinkUnpersist(TqMetaStore *pMeta, TqMetaList* pNode) {
L
Liu Jicong 已提交
30 31 32 33 34 35
  if(pNode->unpersistNext == NULL) {
    pNode->unpersistNext = pMeta->unpersistHead->unpersistNext;
    pNode->unpersistPrev = pMeta->unpersistHead;
    pMeta->unpersistHead->unpersistNext->unpersistPrev = pNode;
    pMeta->unpersistHead->unpersistNext = pNode;
  }
L
Liu Jicong 已提交
36 37
}

L
Liu Jicong 已提交
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
static inline int tqSeekLastPage(int fd) {
  int offset = lseek(fd, 0, SEEK_END);
  int pageNo = offset / TQ_PAGE_SIZE;
  int curPageOffset = pageNo * TQ_PAGE_SIZE;
  return lseek(fd, curPageOffset, SEEK_SET);
}

//TODO: the struct is tightly coupled with index entry
typedef struct TqIdxPageHead {
  int16_t writeOffset;
  int8_t  unused[14];
} TqIdxPageHead;

typedef struct TqIdxPageBuf {
  TqIdxPageHead head;
  char buffer[TQ_IDX_PAGE_BODY_SIZE];
} TqIdxPageBuf;

static inline int tqReadLastPage(int fd, TqIdxPageBuf* pBuf) {
  int offset = tqSeekLastPage(fd);
  int nBytes;
  if((nBytes = read(fd, pBuf, TQ_PAGE_SIZE)) == -1) {
    return -1;
  }
  if(nBytes == 0) {
    memset(pBuf, 0, TQ_PAGE_SIZE);
    pBuf->head.writeOffset = TQ_IDX_PAGE_HEAD_SIZE;
  }
  ASSERT(nBytes == 0 || nBytes == pBuf->head.writeOffset);

  return lseek(fd, offset, SEEK_SET);
}
L
Liu Jicong 已提交
70

L
Liu Jicong 已提交
71
TqMetaStore* tqStoreOpen(const char* path,
L
Liu Jicong 已提交
72 73
    int serializer(const void* pObj, TqSerializedHead** ppHead),
    const void* deserializer(const TqSerializedHead* pHead, void** ppObj),
L
Liu Jicong 已提交
74
    void deleter(void* pObj)) {
L
Liu Jicong 已提交
75 76 77 78 79
  TqMetaStore* pMeta = malloc(sizeof(TqMetaStore)); 
  if(pMeta == NULL) {
    //close
    return NULL;
  }
L
Liu Jicong 已提交
80
  memset(pMeta, 0, sizeof(TqMetaStore));
L
Liu Jicong 已提交
81 82 83

  //concat data file name and index file name
  size_t pathLen = strlen(path);
L
Liu Jicong 已提交
84 85 86 87 88 89
  pMeta->dirPath = malloc(pathLen+1);
  if(pMeta->dirPath != NULL) {
    //TODO: memory insufficient
  }
  strcpy(pMeta->dirPath, path);
  
L
Liu Jicong 已提交
90 91
  char name[pathLen+10];

L
Liu Jicong 已提交
92
  strcpy(name, path);
L
Liu Jicong 已提交
93 94 95
  if(!taosDirExist(name) && !taosMkDir(name)) {
    ASSERT(false);
  }
L
Liu Jicong 已提交
96
  strcat(name, "/" TQ_IDX_NAME);
L
Liu Jicong 已提交
97
  int idxFd = open(name, O_RDWR | O_CREAT, 0755);
L
Liu Jicong 已提交
98
  if(idxFd < 0) {
L
Liu Jicong 已提交
99
    ASSERT(false);
L
Liu Jicong 已提交
100 101 102 103
    //close file
    //free memory
    return NULL;
  }
L
Liu Jicong 已提交
104

L
Liu Jicong 已提交
105 106 107
  pMeta->idxFd = idxFd;
  pMeta->unpersistHead = malloc(sizeof(TqMetaList));
  if(pMeta->unpersistHead == NULL) {
L
Liu Jicong 已提交
108
    ASSERT(false);
L
Liu Jicong 已提交
109 110 111 112
    //close file
    //free memory
    return NULL;
  }
L
Liu Jicong 已提交
113 114 115 116
  memset(pMeta->unpersistHead, 0, sizeof(TqMetaList));
  pMeta->unpersistHead->unpersistNext
    = pMeta->unpersistHead->unpersistPrev
    = pMeta->unpersistHead;
L
Liu Jicong 已提交
117 118 119

  strcpy(name, path);
  strcat(name, "/" TQ_META_NAME);
L
Liu Jicong 已提交
120 121 122 123 124
  int fileFd = open(name, O_RDWR | O_CREAT, 0755);
  if(fileFd < 0){
    ASSERT(false);
    return NULL;
  }
L
Liu Jicong 已提交
125 126 127

  pMeta->fileFd = fileFd;
  
L
Liu Jicong 已提交
128 129 130
  pMeta->serializer = serializer;
  pMeta->deserializer = deserializer;
  pMeta->deleter = deleter;
L
Liu Jicong 已提交
131 132

  //read idx file and load into memory
L
Liu Jicong 已提交
133
  TqIdxPageBuf idxBuf;
L
Liu Jicong 已提交
134 135
  TqSerializedHead* serializedObj = malloc(TQ_PAGE_SIZE);
  if(serializedObj == NULL) {
L
Liu Jicong 已提交
136 137
    //TODO:memory insufficient
  }
L
Liu Jicong 已提交
138 139
  int idxRead;
  int allocated = TQ_PAGE_SIZE;
L
Liu Jicong 已提交
140 141
  bool readEnd = false;
  while((idxRead = read(idxFd, &idxBuf, TQ_PAGE_SIZE))) {
L
Liu Jicong 已提交
142 143 144 145
    if(idxRead == -1) {
      //TODO: handle error
      ASSERT(false);
    }
L
Liu Jicong 已提交
146
    ASSERT(idxBuf.head.writeOffset == idxRead);
L
Liu Jicong 已提交
147
    //loop read every entry
L
Liu Jicong 已提交
148
    for(int i = 0; i < idxBuf.head.writeOffset - TQ_IDX_PAGE_HEAD_SIZE; i += TQ_IDX_SIZE) {
L
Liu Jicong 已提交
149
      TqMetaList *pNode = malloc(sizeof(TqMetaList));
L
Liu Jicong 已提交
150 151 152
      if(pNode == NULL) {
        //TODO: free memory and return error
      }
L
Liu Jicong 已提交
153
      memset(pNode, 0, sizeof(TqMetaList));
L
Liu Jicong 已提交
154 155 156
      memcpy(&pNode->handle, &idxBuf.buffer[i], TQ_IDX_SIZE);

      lseek(fileFd, pNode->handle.offset, SEEK_SET);
L
Liu Jicong 已提交
157 158
      if(allocated < pNode->handle.serializedSize) {
        void *ptr = realloc(serializedObj, pNode->handle.serializedSize);
L
Liu Jicong 已提交
159 160 161
        if(ptr == NULL) {
          //TODO: memory insufficient
        }
L
Liu Jicong 已提交
162 163
        serializedObj = ptr;
        allocated = pNode->handle.serializedSize;
L
Liu Jicong 已提交
164
      }
L
Liu Jicong 已提交
165 166
      serializedObj->ssize = pNode->handle.serializedSize;
      if(read(fileFd, serializedObj, pNode->handle.serializedSize) != pNode->handle.serializedSize) {
L
Liu Jicong 已提交
167 168
        //TODO: read error
      }
L
Liu Jicong 已提交
169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186
      if(serializedObj->action == TQ_ACTION_INUSE) {
        if(serializedObj->ssize != sizeof(TqSerializedHead)) {
          pMeta->deserializer(serializedObj, &pNode->handle.valueInUse);
        } else {
          pNode->handle.valueInUse = TQ_DELETE_TOKEN;
        }
      } else if(serializedObj->action == TQ_ACTION_INTXN) {
        if(serializedObj->ssize != sizeof(TqSerializedHead)) {
          pMeta->deserializer(serializedObj, &pNode->handle.valueInTxn);
        } else {
          pNode->handle.valueInTxn = TQ_DELETE_TOKEN;
        }
      } else if(serializedObj->action == TQ_ACTION_INUSE_CONT) {
        if(serializedObj->ssize != sizeof(TqSerializedHead)) {
          pMeta->deserializer(serializedObj, &pNode->handle.valueInUse);
        } else {
          pNode->handle.valueInUse = TQ_DELETE_TOKEN;
        }
L
Liu Jicong 已提交
187 188 189
        TqSerializedHead* ptr = POINTER_SHIFT(serializedObj, serializedObj->ssize);
        if(ptr->ssize != sizeof(TqSerializedHead)) {
          pMeta->deserializer(ptr, &pNode->handle.valueInTxn);
L
Liu Jicong 已提交
190 191 192 193 194 195
        } else {
          pNode->handle.valueInTxn = TQ_DELETE_TOKEN;
        }
      } else {
        ASSERT(0);
      }
L
Liu Jicong 已提交
196 197

      //put into list
L
Liu Jicong 已提交
198
      int bucketKey = pNode->handle.key & TQ_BUCKET_MASK;
L
Liu Jicong 已提交
199 200 201 202 203 204 205 206
      TqMetaList* pBucketNode = pMeta->bucket[bucketKey];
      if(pBucketNode == NULL) {
        pMeta->bucket[bucketKey] = pNode;
      } else if(pBucketNode->handle.key == pNode->handle.key) {
        pNode->next = pBucketNode->next;
        pMeta->bucket[bucketKey] = pNode;
      } else {
        while(pBucketNode->next &&
L
Liu Jicong 已提交
207
            pBucketNode->next->handle.key != pNode->handle.key) {
L
Liu Jicong 已提交
208 209 210 211
          pBucketNode = pBucketNode->next; 
        }
        if(pBucketNode->next) {
          ASSERT(pBucketNode->next->handle.key == pNode->handle.key);
L
Liu Jicong 已提交
212 213 214 215
          TqMetaList *pNodeFound = pBucketNode->next;
          pNode->next = pNodeFound->next;
          pBucketNode->next = pNode;
          pBucketNode = pNodeFound;
L
Liu Jicong 已提交
216
        } else {
L
Liu Jicong 已提交
217 218
          pNode->next = pMeta->bucket[bucketKey];
          pMeta->bucket[bucketKey] = pNode;
L
Liu Jicong 已提交
219 220 221 222 223 224 225 226 227 228 229 230 231 232
          pBucketNode = NULL;
        }
      }
      if(pBucketNode) {
        if(pBucketNode->handle.valueInUse
            && pBucketNode->handle.valueInUse != TQ_DELETE_TOKEN) {
          pMeta->deleter(pBucketNode->handle.valueInUse);
        }
        if(pBucketNode->handle.valueInTxn
            && pBucketNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
          pMeta->deleter(pBucketNode->handle.valueInTxn);
        }
        free(pBucketNode);
      }
L
Liu Jicong 已提交
233 234
    }
  }
L
Liu Jicong 已提交
235
  free(serializedObj);
L
Liu Jicong 已提交
236 237 238 239 240
  return pMeta;
}

int32_t tqStoreClose(TqMetaStore* pMeta) {
  //commit data and idx
L
Liu Jicong 已提交
241 242 243 244
  tqStorePersist(pMeta);
  ASSERT(pMeta->unpersistHead && pMeta->unpersistHead->next==NULL);
  close(pMeta->fileFd);
  close(pMeta->idxFd);
L
Liu Jicong 已提交
245
  //free memory
L
Liu Jicong 已提交
246
  for(int i = 0; i < TQ_BUCKET_SIZE; i++) {
L
Liu Jicong 已提交
247 248 249 250
    TqMetaList* pNode = pMeta->bucket[i];
    while(pNode) {
      ASSERT(pNode->unpersistNext == NULL);
      ASSERT(pNode->unpersistPrev == NULL);
L
Liu Jicong 已提交
251 252
      if(pNode->handle.valueInTxn
          && pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
L
Liu Jicong 已提交
253
        pMeta->deleter(pNode->handle.valueInTxn);
L
Liu Jicong 已提交
254
      }
L
Liu Jicong 已提交
255 256
      if(pNode->handle.valueInUse
          && pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
L
Liu Jicong 已提交
257
        pMeta->deleter(pNode->handle.valueInUse);
L
Liu Jicong 已提交
258
      }
L
Liu Jicong 已提交
259 260 261
      TqMetaList* next = pNode->next;
      free(pNode);
      pNode = next;
L
Liu Jicong 已提交
262 263
    }
  }
L
Liu Jicong 已提交
264
  free(pMeta->dirPath);
L
Liu Jicong 已提交
265
  free(pMeta->unpersistHead);
L
Liu Jicong 已提交
266
  free(pMeta);
L
Liu Jicong 已提交
267 268 269 270
  return 0;
}

int32_t tqStoreDelete(TqMetaStore* pMeta) {
L
Liu Jicong 已提交
271 272
  close(pMeta->fileFd);
  close(pMeta->idxFd);
L
Liu Jicong 已提交
273
  //free memory
L
Liu Jicong 已提交
274 275 276 277
  for(int i = 0; i < TQ_BUCKET_SIZE; i++) {
    TqMetaList* pNode = pMeta->bucket[i];
    pMeta->bucket[i] = NULL;
    while(pNode) {
L
Liu Jicong 已提交
278 279
      if(pNode->handle.valueInTxn
          && pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
L
Liu Jicong 已提交
280 281
        pMeta->deleter(pNode->handle.valueInTxn);
      }
L
Liu Jicong 已提交
282 283
      if(pNode->handle.valueInUse
          && pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
L
Liu Jicong 已提交
284 285 286 287 288 289 290 291 292 293 294
        pMeta->deleter(pNode->handle.valueInUse);
      }
      TqMetaList* next = pNode->next;
      free(pNode);
      pNode = next;
    }
  }
  free(pMeta->unpersistHead);
  taosRemoveDir(pMeta->dirPath);
  free(pMeta->dirPath);
  free(pMeta);
L
Liu Jicong 已提交
295 296 297
  return 0;
}

L
Liu Jicong 已提交
298
//TODO: wrap in tfile
L
Liu Jicong 已提交
299
int32_t tqStorePersist(TqMetaStore* pMeta) {
L
Liu Jicong 已提交
300 301
  TqIdxPageBuf idxBuf;
  int64_t* bufPtr = (int64_t*)idxBuf.buffer;
L
Liu Jicong 已提交
302 303
  TqMetaList *pHead = pMeta->unpersistHead;
  TqMetaList *pNode = pHead->unpersistNext;
L
Liu Jicong 已提交
304 305 306 307 308 309 310 311 312 313
  TqSerializedHead *pSHead = malloc(sizeof(TqSerializedHead));
  if(pSHead == NULL) {
    //TODO: memory error
    return -1;
  }
  pSHead->ver = TQ_SVER;
  pSHead->checksum = 0;
  pSHead->ssize = sizeof(TqSerializedHead);
  int allocatedSize = sizeof(TqSerializedHead);
  int offset = lseek(pMeta->fileFd, 0, SEEK_CUR);
L
Liu Jicong 已提交
314 315 316 317 318 319 320 321 322 323 324

  tqReadLastPage(pMeta->idxFd, &idxBuf);

  if(idxBuf.head.writeOffset == TQ_PAGE_SIZE) {
    lseek(pMeta->idxFd, 0, SEEK_END);
    memset(&idxBuf, 0, TQ_PAGE_SIZE);
    idxBuf.head.writeOffset = TQ_IDX_PAGE_HEAD_SIZE;
  } else {
    bufPtr = POINTER_SHIFT(&idxBuf, idxBuf.head.writeOffset);
  }

L
Liu Jicong 已提交
325
  while(pHead != pNode) {
L
Liu Jicong 已提交
326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354
    int nBytes = 0;

    if(pNode->handle.valueInUse) {
      if(pNode->handle.valueInTxn) {
        pSHead->action = TQ_ACTION_INUSE_CONT;
      } else {
        pSHead->action = TQ_ACTION_INUSE;
      }

      if(pNode->handle.valueInUse == TQ_DELETE_TOKEN) {
        pSHead->ssize = sizeof(TqSerializedHead);
      } else {
        pMeta->serializer(pNode->handle.valueInUse, &pSHead);
      }
      nBytes = write(pMeta->fileFd, pSHead, pSHead->ssize);
      ASSERT(nBytes == pSHead->ssize);
    }

    if(pNode->handle.valueInTxn) {
      pSHead->action = TQ_ACTION_INTXN;
      if(pNode->handle.valueInTxn == TQ_DELETE_TOKEN) {
        pSHead->ssize = sizeof(TqSerializedHead);
      } else {
        pMeta->serializer(pNode->handle.valueInTxn, &pSHead);
      }
      int nBytesTxn = write(pMeta->fileFd, pSHead, pSHead->ssize);
      ASSERT(nBytesTxn == pSHead->ssize);
      nBytes += nBytesTxn;
    }
L
Liu Jicong 已提交
355 356
    pNode->handle.offset = offset;
    offset += nBytes;
L
Liu Jicong 已提交
357

L
Liu Jicong 已提交
358 359 360 361 362
    //write idx file
    //TODO: endian check and convert
    *(bufPtr++) = pNode->handle.key;
    *(bufPtr++) = pNode->handle.offset;
    *(bufPtr++) = (int64_t)nBytes;
L
Liu Jicong 已提交
363
    idxBuf.head.writeOffset += TQ_IDX_SIZE;
L
Liu Jicong 已提交
364

L
Liu Jicong 已提交
365 366
    if(idxBuf.head.writeOffset >= TQ_PAGE_SIZE) {
      nBytes = write(pMeta->idxFd, &idxBuf, TQ_PAGE_SIZE);
L
Liu Jicong 已提交
367
      //TODO: handle error with tfile
L
Liu Jicong 已提交
368 369
      ASSERT(nBytes == TQ_PAGE_SIZE);
      memset(&idxBuf, 0, TQ_PAGE_SIZE);
L
Liu Jicong 已提交
370
      idxBuf.head.writeOffset = TQ_IDX_PAGE_HEAD_SIZE;
L
Liu Jicong 已提交
371
      bufPtr = (int64_t*)&idxBuf.buffer;
L
Liu Jicong 已提交
372 373 374 375 376 377 378 379 380 381 382
    }
    //remove from unpersist list
    pHead->unpersistNext = pNode->unpersistNext;
    pHead->unpersistNext->unpersistPrev = pHead;
    pNode->unpersistPrev = pNode->unpersistNext = NULL;
    pNode = pHead->unpersistNext;

    //remove from bucket
    if(pNode->handle.valueInUse == TQ_DELETE_TOKEN &&
        pNode->handle.valueInTxn == NULL
        ) {
L
Liu Jicong 已提交
383
      int bucketKey = pNode->handle.key & TQ_BUCKET_MASK;
L
Liu Jicong 已提交
384 385
      TqMetaList* pBucketHead = pMeta->bucket[bucketKey];
      if(pBucketHead == pNode) {
L
Liu Jicong 已提交
386
        pMeta->bucket[bucketKey] = pNode->next;
L
Liu Jicong 已提交
387 388 389 390 391 392
      } else {
        TqMetaList* pBucketNode = pBucketHead;
        while(pBucketNode->next != NULL
            && pBucketNode->next != pNode) {
          pBucketNode = pBucketNode->next; 
        }
L
Liu Jicong 已提交
393 394 395
        //impossible for pBucket->next == NULL
        ASSERT(pBucketNode->next == pNode);
        pBucketNode->next = pNode->next;
L
Liu Jicong 已提交
396
      }
L
Liu Jicong 已提交
397
      free(pNode);
L
Liu Jicong 已提交
398
    }
L
Liu Jicong 已提交
399
  }
L
Liu Jicong 已提交
400

L
Liu Jicong 已提交
401
  //write left bytes
L
Liu Jicong 已提交
402
  free(pSHead);
L
Liu Jicong 已提交
403 404 405
  //TODO: write new version in tfile
  if((char*)bufPtr != idxBuf.buffer) {
    int nBytes = write(pMeta->idxFd, &idxBuf, idxBuf.head.writeOffset);
L
Liu Jicong 已提交
406
    //TODO: handle error in tfile
L
Liu Jicong 已提交
407
    ASSERT(nBytes == idxBuf.head.writeOffset);
L
Liu Jicong 已提交
408 409 410 411
  }
  //TODO: using fsync in tfile
  fsync(pMeta->idxFd);
  fsync(pMeta->fileFd);
L
Liu Jicong 已提交
412 413 414
  return 0;
}

L
Liu Jicong 已提交
415
static int32_t tqHandlePutCommitted(TqMetaStore* pMeta, int64_t key, void* value) {
L
Liu Jicong 已提交
416
  int64_t bucketKey = key & TQ_BUCKET_MASK;
L
Liu Jicong 已提交
417 418 419 420
  TqMetaList* pNode = pMeta->bucket[bucketKey];
  while(pNode) {
    if(pNode->handle.key == key) {
      //TODO: think about thread safety
L
Liu Jicong 已提交
421 422
      if(pNode->handle.valueInUse
          && pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
L
Liu Jicong 已提交
423 424
        pMeta->deleter(pNode->handle.valueInUse);
      }
L
Liu Jicong 已提交
425 426
      //change pointer ownership
      pNode->handle.valueInUse = value;
L
Liu Jicong 已提交
427
      return 0;
L
Liu Jicong 已提交
428 429 430 431
    } else {
      pNode = pNode->next;
    }
  }
L
Liu Jicong 已提交
432 433 434 435 436 437 438 439 440 441 442 443 444
  TqMetaList *pNewNode = malloc(sizeof(TqMetaList));
  if(pNewNode == NULL) {
    //TODO: memory error
    return -1;
  }
  memset(pNewNode, 0, sizeof(TqMetaList));
  pNewNode->handle.key = key;
  pNewNode->handle.valueInUse = value;
  //put into unpersist list
  pNewNode->unpersistPrev = pMeta->unpersistHead;
  pNewNode->unpersistNext = pMeta->unpersistHead->unpersistNext;
  pMeta->unpersistHead->unpersistNext->unpersistPrev = pNewNode;
  pMeta->unpersistHead->unpersistNext = pNewNode;
L
Liu Jicong 已提交
445 446 447
  return 0;
}

L
Liu Jicong 已提交
448
void* tqHandleGet(TqMetaStore* pMeta, int64_t key) {
L
Liu Jicong 已提交
449
  int64_t bucketKey = key & TQ_BUCKET_MASK;
L
Liu Jicong 已提交
450 451 452
  TqMetaList* pNode = pMeta->bucket[bucketKey];
  while(pNode) {
    if(pNode->handle.key == key) {
L
Liu Jicong 已提交
453 454
      if(pNode->handle.valueInUse != NULL
          && pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
L
Liu Jicong 已提交
455
        return pNode->handle.valueInUse;
L
Liu Jicong 已提交
456 457 458 459 460 461 462
      } else {
        return NULL;
      }
    } else {
      pNode = pNode->next;
    }
  }
L
Liu Jicong 已提交
463 464 465
  return NULL;
}

L
Liu Jicong 已提交
466
int32_t tqHandleMovePut(TqMetaStore* pMeta, int64_t key, void* value) {
L
Liu Jicong 已提交
467
  int64_t bucketKey = key & TQ_BUCKET_MASK;
L
Liu Jicong 已提交
468 469 470 471
  TqMetaList* pNode = pMeta->bucket[bucketKey];
  while(pNode) {
    if(pNode->handle.key == key) {
      //TODO: think about thread safety
L
Liu Jicong 已提交
472 473
      if(pNode->handle.valueInTxn
          && pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
L
Liu Jicong 已提交
474 475
        pMeta->deleter(pNode->handle.valueInTxn);
      }
L
Liu Jicong 已提交
476 477
      //change pointer ownership
      pNode->handle.valueInTxn = value;
L
Liu Jicong 已提交
478
      tqLinkUnpersist(pMeta, pNode);
L
Liu Jicong 已提交
479
      return 0;
L
Liu Jicong 已提交
480 481 482 483
    } else {
      pNode = pNode->next;
    }
  }
L
Liu Jicong 已提交
484 485 486 487 488 489 490 491
  TqMetaList *pNewNode = malloc(sizeof(TqMetaList));
  if(pNewNode == NULL) {
    //TODO: memory error
    return -1;
  }
  memset(pNewNode, 0, sizeof(TqMetaList));
  pNewNode->handle.key = key;
  pNewNode->handle.valueInTxn = value;
L
Liu Jicong 已提交
492
  pNewNode->next = pMeta->bucket[bucketKey];
L
Liu Jicong 已提交
493
  pMeta->bucket[bucketKey] = pNewNode;
L
Liu Jicong 已提交
494
  tqLinkUnpersist(pMeta, pNewNode);
L
Liu Jicong 已提交
495 496 497 498 499 500 501 502 503 504
  return 0;
}

int32_t tqHandleCopyPut(TqMetaStore* pMeta, int64_t key, void* value, size_t vsize) {
  void *vmem = malloc(vsize);
  if(vmem == NULL) {
    //TODO: memory error
    return -1;
  }
  memcpy(vmem, value, vsize);
L
Liu Jicong 已提交
505
  int64_t bucketKey = key & TQ_BUCKET_MASK;
L
Liu Jicong 已提交
506 507 508 509
  TqMetaList* pNode = pMeta->bucket[bucketKey];
  while(pNode) {
    if(pNode->handle.key == key) {
      //TODO: think about thread safety
L
Liu Jicong 已提交
510 511
      if(pNode->handle.valueInTxn
          && pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
L
Liu Jicong 已提交
512 513 514 515
        pMeta->deleter(pNode->handle.valueInTxn);
      }
      //change pointer ownership
      pNode->handle.valueInTxn = vmem;
L
Liu Jicong 已提交
516
      tqLinkUnpersist(pMeta, pNode);
L
Liu Jicong 已提交
517 518 519 520 521 522 523 524 525 526 527 528 529 530
      return 0;
    } else {
      pNode = pNode->next;
    }
  }
  TqMetaList *pNewNode = malloc(sizeof(TqMetaList));
  if(pNewNode == NULL) {
    //TODO: memory error
    return -1;
  }
  memset(pNewNode, 0, sizeof(TqMetaList));
  pNewNode->handle.key = key;
  pNewNode->handle.valueInTxn = vmem;
  pNewNode->next = pMeta->bucket[bucketKey];
L
Liu Jicong 已提交
531
  pMeta->bucket[bucketKey] = pNewNode;
L
Liu Jicong 已提交
532
  tqLinkUnpersist(pMeta, pNewNode);
L
Liu Jicong 已提交
533 534 535
  return 0;
}

L
Liu Jicong 已提交
536
static void* tqHandleGetUncommitted(TqMetaStore* pMeta, int64_t key) {
L
Liu Jicong 已提交
537
  int64_t bucketKey = key & TQ_BUCKET_MASK;
L
Liu Jicong 已提交
538 539 540
  TqMetaList* pNode = pMeta->bucket[bucketKey];
  while(pNode) {
    if(pNode->handle.key == key) {
L
Liu Jicong 已提交
541 542
      if(pNode->handle.valueInTxn != NULL
          && pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
L
Liu Jicong 已提交
543
        return pNode->handle.valueInTxn;
L
Liu Jicong 已提交
544 545 546 547 548 549 550
      } else {
        return NULL;
      }
    } else {
      pNode = pNode->next;
    }
  }
L
Liu Jicong 已提交
551 552 553 554
  return NULL;
}

int32_t tqHandleCommit(TqMetaStore* pMeta, int64_t key) {
L
Liu Jicong 已提交
555
  int64_t bucketKey = key & TQ_BUCKET_MASK;
L
Liu Jicong 已提交
556 557 558
  TqMetaList* pNode = pMeta->bucket[bucketKey];
  while(pNode) {
    if(pNode->handle.key == key) {
L
Liu Jicong 已提交
559 560 561
      if(pNode->handle.valueInTxn == NULL) {
        return -1;
      }
L
Liu Jicong 已提交
562 563
      if(pNode->handle.valueInUse
          && pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
L
Liu Jicong 已提交
564 565 566
        pMeta->deleter(pNode->handle.valueInUse);
      }
      pNode->handle.valueInUse = pNode->handle.valueInTxn;
L
Liu Jicong 已提交
567
      pNode->handle.valueInTxn = NULL;
L
Liu Jicong 已提交
568
      tqLinkUnpersist(pMeta, pNode);
L
Liu Jicong 已提交
569 570 571 572 573
      return 0;
    } else {
      pNode = pNode->next;
    }
  }
L
Liu Jicong 已提交
574
  return -2;
L
Liu Jicong 已提交
575 576 577
}

int32_t tqHandleAbort(TqMetaStore* pMeta, int64_t key) {
L
Liu Jicong 已提交
578
  int64_t bucketKey = key & TQ_BUCKET_MASK;
L
Liu Jicong 已提交
579 580 581
  TqMetaList* pNode = pMeta->bucket[bucketKey];
  while(pNode) {
    if(pNode->handle.key == key) {
L
Liu Jicong 已提交
582 583 584 585
      if(pNode->handle.valueInTxn) {
        if(pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
          pMeta->deleter(pNode->handle.valueInTxn);
        }
L
Liu Jicong 已提交
586
        pNode->handle.valueInTxn = NULL;
L
Liu Jicong 已提交
587
        tqLinkUnpersist(pMeta, pNode);
L
Liu Jicong 已提交
588 589 590 591 592 593 594 595
        return 0;
      }
      return -1;
    } else {
      pNode = pNode->next;
    }
  }
  return -2;
L
Liu Jicong 已提交
596 597 598
}

int32_t tqHandleDel(TqMetaStore* pMeta, int64_t key) {
L
Liu Jicong 已提交
599
  int64_t bucketKey = key & TQ_BUCKET_MASK;
L
Liu Jicong 已提交
600 601
  TqMetaList* pNode = pMeta->bucket[bucketKey];
  while(pNode) {
L
Liu Jicong 已提交
602 603 604 605
    if(pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
      if(pNode->handle.valueInTxn) {
        pMeta->deleter(pNode->handle.valueInTxn);
      }
L
Liu Jicong 已提交
606 607
      pNode->handle.valueInTxn = TQ_DELETE_TOKEN;
      tqLinkUnpersist(pMeta, pNode);
L
Liu Jicong 已提交
608
      return 0;
L
Liu Jicong 已提交
609 610 611 612
    } else {
      pNode = pNode->next;
    }
  }
L
Liu Jicong 已提交
613 614
  //no such key
  return -1;
L
Liu Jicong 已提交
615 616
}

L
Liu Jicong 已提交
617 618 619 620
//TODO: clean deleted idx and data from persistent file
int32_t tqStoreCompact(TqMetaStore *pMeta) {
  return 0;
}