tqMetaStore.c 18.3 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
L
Liu Jicong 已提交
15
#include "tqMetaStore.h"
L
Liu Jicong 已提交
16
// TODO: replace with an abstract file layer
L
Liu Jicong 已提交
17
#include <fcntl.h>
L
Liu Jicong 已提交
18
#include <string.h>
L
Liu Jicong 已提交
19
#include <unistd.h>
L
Liu Jicong 已提交
20
#include "osDir.h"
L
Liu Jicong 已提交
21

L
Liu Jicong 已提交
22
#define TQ_META_NAME "tq.meta"
L
Liu Jicong 已提交
23
#define TQ_IDX_NAME "tq.idx"
L
Liu Jicong 已提交
24

L
Liu Jicong 已提交
25 26
static int32_t tqHandlePutCommitted(STqMetaStore*, int64_t key, void* value);
static void*   tqHandleGetUncommitted(STqMetaStore*, int64_t key);
L
Liu Jicong 已提交
27

L
Liu Jicong 已提交
28 29
// Insert pNode directly after the sentinel of the unpersist list so that the
// next tqStorePersist() writes it out. A node whose unpersistNext is already
// non-NULL is considered linked and is left untouched.
static inline void tqLinkUnpersist(STqMetaStore* pMeta, STqMetaList* pNode) {
  if (pNode->unpersistNext != NULL) {
    return;
  }

  STqMetaList* pSentinel = pMeta->unpersistHead;
  STqMetaList* pFirst = pSentinel->unpersistNext;

  pNode->unpersistNext = pFirst;
  pNode->unpersistPrev = pSentinel;
  pFirst->unpersistPrev = pNode;
  pSentinel->unpersistNext = pNode;
}

L
Liu Jicong 已提交
37 38 39 40 41 42 43
// Move fd to the start of the page that contains the end of the file, i.e.
// the file size rounded down to a multiple of TQ_PAGE_SIZE.
// Returns the result of the final lseek().
static inline int tqSeekLastPage(int fd) {
  int fileEnd = lseek(fd, 0, SEEK_END);
  int lastPageStart = (fileEnd / TQ_PAGE_SIZE) * TQ_PAGE_SIZE;
  return lseek(fd, lastPageStart, SEEK_SET);
}

L
Liu Jicong 已提交
44 45
// TODO: the struct is tightly coupled with index entry
// Header placed at the front of every index page.
typedef struct STqIdxPageHead STqIdxPageHead;
struct STqIdxPageHead {
  // Number of bytes of the page in use, counted from the start of the page
  // (an empty page stores TQ_IDX_PAGE_HEAD_SIZE here).
  int16_t writeOffset;
  // Filler; together with writeOffset the header occupies 16 bytes.
  int8_t unused[14];
};
L
Liu Jicong 已提交
49

L
Liu Jicong 已提交
50 51 52 53
// In-memory image of one index page: the page header followed by the area
// that holds the index entries. Pages are read from and written to the index
// file through this struct.
typedef struct STqIdxPageBuf STqIdxPageBuf;
struct STqIdxPageBuf {
  STqIdxPageHead head;
  char           buffer[TQ_IDX_PAGE_BODY_SIZE];
};
L
Liu Jicong 已提交
54

L
Liu Jicong 已提交
55
// Load the last page of the index file into pBuf and leave fd positioned at
// the start of that page, so a following write overwrites it in place.
// If nothing can be read at that position, pBuf is reset to an empty page.
// Returns -1 when read() fails, otherwise the result of the final lseek().
static inline int tqReadLastPage(int fd, STqIdxPageBuf* pBuf) {
  int pageStart = tqSeekLastPage(fd);
  int nRead = read(fd, pBuf, TQ_PAGE_SIZE);

  if (nRead == -1) {
    return -1;
  }

  if (nRead != 0) {
    // a stored page must be exactly as long as its header claims
    ASSERT(nRead == pBuf->head.writeOffset);
  } else {
    memset(pBuf, 0, TQ_PAGE_SIZE);
    pBuf->head.writeOffset = TQ_IDX_PAGE_HEAD_SIZE;
  }

  return lseek(fd, pageStart, SEEK_SET);
}
L
Liu Jicong 已提交
69

L
Liu Jicong 已提交
70 71 72 73 74
STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserialize deserializer, FTqDelete deleter,
                          int32_t tqConfigFlag) {
  STqMetaStore* pMeta = malloc(sizeof(STqMetaStore));
  if (pMeta == NULL) {
    // close
L
Liu Jicong 已提交
75 76
    return NULL;
  }
L
Liu Jicong 已提交
77
  memset(pMeta, 0, sizeof(STqMetaStore));
L
Liu Jicong 已提交
78

L
Liu Jicong 已提交
79
  // concat data file name and index file name
L
Liu Jicong 已提交
80
  size_t pathLen = strlen(path);
L
Liu Jicong 已提交
81 82 83
  pMeta->dirPath = malloc(pathLen + 1);
  if (pMeta->dirPath != NULL) {
    // TODO: memory insufficient
L
Liu Jicong 已提交
84 85
  }
  strcpy(pMeta->dirPath, path);
L
Liu Jicong 已提交
86 87

  char name[pathLen + 10];
L
Liu Jicong 已提交
88

L
Liu Jicong 已提交
89
  strcpy(name, path);
90
  if (taosDirExist(name) != 0 && taosMkDir(name) != 0) {
L
Liu Jicong 已提交
91 92
    ASSERT(false);
  }
L
Liu Jicong 已提交
93
  strcat(name, "/" TQ_IDX_NAME);
L
Liu Jicong 已提交
94
  int idxFd = open(name, O_RDWR | O_CREAT, 0755);
L
Liu Jicong 已提交
95
  if (idxFd < 0) {
L
Liu Jicong 已提交
96
    ASSERT(false);
L
Liu Jicong 已提交
97 98
    // close file
    // free memory
L
Liu Jicong 已提交
99 100
    return NULL;
  }
L
Liu Jicong 已提交
101

L
Liu Jicong 已提交
102
  pMeta->idxFd = idxFd;
L
Liu Jicong 已提交
103
  pMeta->unpersistHead = malloc(sizeof(STqMetaList));
L
Liu Jicong 已提交
104
  if (pMeta->unpersistHead == NULL) {
L
Liu Jicong 已提交
105
    ASSERT(false);
L
Liu Jicong 已提交
106 107
    // close file
    // free memory
L
Liu Jicong 已提交
108 109
    return NULL;
  }
L
Liu Jicong 已提交
110
  memset(pMeta->unpersistHead, 0, sizeof(STqMetaList));
L
Liu Jicong 已提交
111
  pMeta->unpersistHead->unpersistNext = pMeta->unpersistHead->unpersistPrev = pMeta->unpersistHead;
L
Liu Jicong 已提交
112 113 114

  strcpy(name, path);
  strcat(name, "/" TQ_META_NAME);
L
Liu Jicong 已提交
115
  int fileFd = open(name, O_RDWR | O_CREAT, 0755);
L
Liu Jicong 已提交
116
  if (fileFd < 0) {
L
Liu Jicong 已提交
117 118 119
    ASSERT(false);
    return NULL;
  }
L
Liu Jicong 已提交
120 121

  pMeta->fileFd = fileFd;
L
Liu Jicong 已提交
122

L
Liu Jicong 已提交
123 124 125
  pMeta->pSerializer = serializer;
  pMeta->pDeserializer = deserializer;
  pMeta->pDeleter = deleter;
L
Liu Jicong 已提交
126
  pMeta->tqConfigFlag = tqConfigFlag;
L
Liu Jicong 已提交
127

L
Liu Jicong 已提交
128 129 130 131 132
  // read idx file and load into memory
  STqIdxPageBuf      idxBuf;
  STqSerializedHead* serializedObj = malloc(TQ_PAGE_SIZE);
  if (serializedObj == NULL) {
    // TODO:memory insufficient
L
Liu Jicong 已提交
133
  }
L
Liu Jicong 已提交
134 135
  int  idxRead;
  int  allocated = TQ_PAGE_SIZE;
L
Liu Jicong 已提交
136
  bool readEnd = false;
L
Liu Jicong 已提交
137 138 139
  while ((idxRead = read(idxFd, &idxBuf, TQ_PAGE_SIZE))) {
    if (idxRead == -1) {
      // TODO: handle error
L
Liu Jicong 已提交
140 141
      ASSERT(false);
    }
L
Liu Jicong 已提交
142
    ASSERT(idxBuf.head.writeOffset == idxRead);
L
Liu Jicong 已提交
143 144 145 146 147
    // loop read every entry
    for (int i = 0; i < idxBuf.head.writeOffset - TQ_IDX_PAGE_HEAD_SIZE; i += TQ_IDX_SIZE) {
      STqMetaList* pNode = malloc(sizeof(STqMetaList));
      if (pNode == NULL) {
        // TODO: free memory and return error
L
Liu Jicong 已提交
148
      }
L
Liu Jicong 已提交
149
      memset(pNode, 0, sizeof(STqMetaList));
L
Liu Jicong 已提交
150 151 152
      memcpy(&pNode->handle, &idxBuf.buffer[i], TQ_IDX_SIZE);

      lseek(fileFd, pNode->handle.offset, SEEK_SET);
L
Liu Jicong 已提交
153 154 155 156
      if (allocated < pNode->handle.serializedSize) {
        void* ptr = realloc(serializedObj, pNode->handle.serializedSize);
        if (ptr == NULL) {
          // TODO: memory insufficient
L
Liu Jicong 已提交
157
        }
L
Liu Jicong 已提交
158 159
        serializedObj = ptr;
        allocated = pNode->handle.serializedSize;
L
Liu Jicong 已提交
160
      }
L
Liu Jicong 已提交
161
      serializedObj->ssize = pNode->handle.serializedSize;
L
Liu Jicong 已提交
162 163
      if (read(fileFd, serializedObj, pNode->handle.serializedSize) != pNode->handle.serializedSize) {
        // TODO: read error
L
Liu Jicong 已提交
164
      }
L
Liu Jicong 已提交
165 166
      if (serializedObj->action == TQ_ACTION_INUSE) {
        if (serializedObj->ssize != sizeof(STqSerializedHead)) {
L
Liu Jicong 已提交
167
          pMeta->pDeserializer(serializedObj, &pNode->handle.valueInUse);
L
Liu Jicong 已提交
168 169 170
        } else {
          pNode->handle.valueInUse = TQ_DELETE_TOKEN;
        }
L
Liu Jicong 已提交
171 172
      } else if (serializedObj->action == TQ_ACTION_INTXN) {
        if (serializedObj->ssize != sizeof(STqSerializedHead)) {
L
Liu Jicong 已提交
173
          pMeta->pDeserializer(serializedObj, &pNode->handle.valueInTxn);
L
Liu Jicong 已提交
174 175 176
        } else {
          pNode->handle.valueInTxn = TQ_DELETE_TOKEN;
        }
L
Liu Jicong 已提交
177 178
      } else if (serializedObj->action == TQ_ACTION_INUSE_CONT) {
        if (serializedObj->ssize != sizeof(STqSerializedHead)) {
L
Liu Jicong 已提交
179
          pMeta->pDeserializer(serializedObj, &pNode->handle.valueInUse);
L
Liu Jicong 已提交
180 181 182
        } else {
          pNode->handle.valueInUse = TQ_DELETE_TOKEN;
        }
L
Liu Jicong 已提交
183 184
        STqSerializedHead* ptr = POINTER_SHIFT(serializedObj, serializedObj->ssize);
        if (ptr->ssize != sizeof(STqSerializedHead)) {
L
Liu Jicong 已提交
185
          pMeta->pDeserializer(ptr, &pNode->handle.valueInTxn);
L
Liu Jicong 已提交
186 187 188 189 190 191
        } else {
          pNode->handle.valueInTxn = TQ_DELETE_TOKEN;
        }
      } else {
        ASSERT(0);
      }
L
Liu Jicong 已提交
192

L
Liu Jicong 已提交
193 194
      // put into list
      int          bucketKey = pNode->handle.key & TQ_BUCKET_MASK;
L
Liu Jicong 已提交
195
      STqMetaList* pBucketNode = pMeta->bucket[bucketKey];
L
Liu Jicong 已提交
196
      if (pBucketNode == NULL) {
L
Liu Jicong 已提交
197
        pMeta->bucket[bucketKey] = pNode;
L
Liu Jicong 已提交
198
      } else if (pBucketNode->handle.key == pNode->handle.key) {
L
Liu Jicong 已提交
199 200 201
        pNode->next = pBucketNode->next;
        pMeta->bucket[bucketKey] = pNode;
      } else {
L
Liu Jicong 已提交
202 203
        while (pBucketNode->next && pBucketNode->next->handle.key != pNode->handle.key) {
          pBucketNode = pBucketNode->next;
L
Liu Jicong 已提交
204
        }
L
Liu Jicong 已提交
205
        if (pBucketNode->next) {
L
Liu Jicong 已提交
206
          ASSERT(pBucketNode->next->handle.key == pNode->handle.key);
L
Liu Jicong 已提交
207
          STqMetaList* pNodeFound = pBucketNode->next;
L
Liu Jicong 已提交
208 209 210
          pNode->next = pNodeFound->next;
          pBucketNode->next = pNode;
          pBucketNode = pNodeFound;
L
Liu Jicong 已提交
211
        } else {
L
Liu Jicong 已提交
212 213
          pNode->next = pMeta->bucket[bucketKey];
          pMeta->bucket[bucketKey] = pNode;
L
Liu Jicong 已提交
214 215 216
          pBucketNode = NULL;
        }
      }
L
Liu Jicong 已提交
217 218
      if (pBucketNode) {
        if (pBucketNode->handle.valueInUse && pBucketNode->handle.valueInUse != TQ_DELETE_TOKEN) {
L
Liu Jicong 已提交
219
          pMeta->pDeleter(pBucketNode->handle.valueInUse);
L
Liu Jicong 已提交
220
        }
L
Liu Jicong 已提交
221
        if (pBucketNode->handle.valueInTxn && pBucketNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
L
Liu Jicong 已提交
222
          pMeta->pDeleter(pBucketNode->handle.valueInTxn);
L
Liu Jicong 已提交
223 224 225
        }
        free(pBucketNode);
      }
L
Liu Jicong 已提交
226 227
    }
  }
L
Liu Jicong 已提交
228
  free(serializedObj);
L
Liu Jicong 已提交
229 230 231
  return pMeta;
}

L
Liu Jicong 已提交
232
// Persist all pending changes, close both files and release the store,
// including every value still owned by a node (via pDeleter).
// Always returns 0.
int32_t tqStoreClose(STqMetaStore* pMeta) {
  // commit data and idx
  tqStorePersist(pMeta);
  ASSERT(pMeta->unpersistHead && pMeta->unpersistHead->next == NULL);

  close(pMeta->fileFd);
  close(pMeta->idxFd);

  // free memory
  for (int slot = 0; slot < TQ_BUCKET_SIZE; slot++) {
    STqMetaList* pCur = pMeta->bucket[slot];
    for (STqMetaList* pFollowing = NULL; pCur != NULL; pCur = pFollowing) {
      // after a persist no node may still sit on the unpersist list
      ASSERT(pCur->unpersistNext == NULL);
      ASSERT(pCur->unpersistPrev == NULL);

      if (pCur->handle.valueInTxn && pCur->handle.valueInTxn != TQ_DELETE_TOKEN) {
        pMeta->pDeleter(pCur->handle.valueInTxn);
      }
      if (pCur->handle.valueInUse && pCur->handle.valueInUse != TQ_DELETE_TOKEN) {
        pMeta->pDeleter(pCur->handle.valueInUse);
      }

      pFollowing = pCur->next;
      free(pCur);
    }
  }

  free(pMeta->dirPath);
  free(pMeta->unpersistHead);
  free(pMeta);
  return 0;
}

L
Liu Jicong 已提交
261
// Destroy the store without persisting: close both files, release every node
// and its values, remove the store directory and free the handle.
// Always returns 0.
int32_t tqStoreDelete(STqMetaStore* pMeta) {
  close(pMeta->fileFd);
  close(pMeta->idxFd);

  // free memory
  for (int slot = 0; slot < TQ_BUCKET_SIZE; slot++) {
    STqMetaList* pCur = pMeta->bucket[slot];
    pMeta->bucket[slot] = NULL;

    for (STqMetaList* pFollowing = NULL; pCur != NULL; pCur = pFollowing) {
      if (pCur->handle.valueInTxn && pCur->handle.valueInTxn != TQ_DELETE_TOKEN) {
        pMeta->pDeleter(pCur->handle.valueInTxn);
      }
      if (pCur->handle.valueInUse && pCur->handle.valueInUse != TQ_DELETE_TOKEN) {
        pMeta->pDeleter(pCur->handle.valueInUse);
      }

      pFollowing = pCur->next;
      free(pCur);
    }
  }

  free(pMeta->unpersistHead);
  taosRemoveDir(pMeta->dirPath);
  free(pMeta->dirPath);
  free(pMeta);
  return 0;
}

L
Liu Jicong 已提交
287
// TODO: wrap in tfile
L
Liu Jicong 已提交
288
int32_t tqStorePersist(STqMetaStore* pMeta) {
L
Liu Jicong 已提交
289 290 291 292 293 294 295
  STqIdxPageBuf      idxBuf;
  int64_t*           bufPtr = (int64_t*)idxBuf.buffer;
  STqMetaList*       pHead = pMeta->unpersistHead;
  STqMetaList*       pNode = pHead->unpersistNext;
  STqSerializedHead* pSHead = malloc(sizeof(STqSerializedHead));
  if (pSHead == NULL) {
    // TODO: memory error
L
Liu Jicong 已提交
296 297 298 299
    return -1;
  }
  pSHead->ver = TQ_SVER;
  pSHead->checksum = 0;
L
Liu Jicong 已提交
300 301
  pSHead->ssize = sizeof(STqSerializedHead);
  int allocatedSize = sizeof(STqSerializedHead);
L
Liu Jicong 已提交
302
  int offset = lseek(pMeta->fileFd, 0, SEEK_CUR);
L
Liu Jicong 已提交
303 304 305

  tqReadLastPage(pMeta->idxFd, &idxBuf);

L
Liu Jicong 已提交
306
  if (idxBuf.head.writeOffset == TQ_PAGE_SIZE) {
L
Liu Jicong 已提交
307 308 309 310 311 312 313
    lseek(pMeta->idxFd, 0, SEEK_END);
    memset(&idxBuf, 0, TQ_PAGE_SIZE);
    idxBuf.head.writeOffset = TQ_IDX_PAGE_HEAD_SIZE;
  } else {
    bufPtr = POINTER_SHIFT(&idxBuf, idxBuf.head.writeOffset);
  }

L
Liu Jicong 已提交
314
  while (pHead != pNode) {
L
Liu Jicong 已提交
315 316
    int nBytes = 0;

L
Liu Jicong 已提交
317 318
    if (pNode->handle.valueInUse) {
      if (pNode->handle.valueInTxn) {
L
Liu Jicong 已提交
319 320 321 322 323
        pSHead->action = TQ_ACTION_INUSE_CONT;
      } else {
        pSHead->action = TQ_ACTION_INUSE;
      }

L
Liu Jicong 已提交
324 325
      if (pNode->handle.valueInUse == TQ_DELETE_TOKEN) {
        pSHead->ssize = sizeof(STqSerializedHead);
L
Liu Jicong 已提交
326
      } else {
L
Liu Jicong 已提交
327
        pMeta->pSerializer(pNode->handle.valueInUse, &pSHead);
L
Liu Jicong 已提交
328 329 330 331 332
      }
      nBytes = write(pMeta->fileFd, pSHead, pSHead->ssize);
      ASSERT(nBytes == pSHead->ssize);
    }

L
Liu Jicong 已提交
333
    if (pNode->handle.valueInTxn) {
L
Liu Jicong 已提交
334
      pSHead->action = TQ_ACTION_INTXN;
L
Liu Jicong 已提交
335 336
      if (pNode->handle.valueInTxn == TQ_DELETE_TOKEN) {
        pSHead->ssize = sizeof(STqSerializedHead);
L
Liu Jicong 已提交
337
      } else {
L
Liu Jicong 已提交
338
        pMeta->pSerializer(pNode->handle.valueInTxn, &pSHead);
L
Liu Jicong 已提交
339 340 341 342 343
      }
      int nBytesTxn = write(pMeta->fileFd, pSHead, pSHead->ssize);
      ASSERT(nBytesTxn == pSHead->ssize);
      nBytes += nBytesTxn;
    }
L
Liu Jicong 已提交
344 345
    pNode->handle.offset = offset;
    offset += nBytes;
L
Liu Jicong 已提交
346

L
Liu Jicong 已提交
347 348
    // write idx file
    // TODO: endian check and convert
L
Liu Jicong 已提交
349 350 351
    *(bufPtr++) = pNode->handle.key;
    *(bufPtr++) = pNode->handle.offset;
    *(bufPtr++) = (int64_t)nBytes;
L
Liu Jicong 已提交
352
    idxBuf.head.writeOffset += TQ_IDX_SIZE;
L
Liu Jicong 已提交
353

L
Liu Jicong 已提交
354
    if (idxBuf.head.writeOffset >= TQ_PAGE_SIZE) {
L
Liu Jicong 已提交
355
      nBytes = write(pMeta->idxFd, &idxBuf, TQ_PAGE_SIZE);
L
Liu Jicong 已提交
356
      // TODO: handle error with tfile
L
Liu Jicong 已提交
357 358
      ASSERT(nBytes == TQ_PAGE_SIZE);
      memset(&idxBuf, 0, TQ_PAGE_SIZE);
L
Liu Jicong 已提交
359
      idxBuf.head.writeOffset = TQ_IDX_PAGE_HEAD_SIZE;
L
Liu Jicong 已提交
360
      bufPtr = (int64_t*)&idxBuf.buffer;
L
Liu Jicong 已提交
361
    }
L
Liu Jicong 已提交
362
    // remove from unpersist list
L
Liu Jicong 已提交
363 364 365 366 367
    pHead->unpersistNext = pNode->unpersistNext;
    pHead->unpersistNext->unpersistPrev = pHead;
    pNode->unpersistPrev = pNode->unpersistNext = NULL;
    pNode = pHead->unpersistNext;

L
Liu Jicong 已提交
368 369 370
    // remove from bucket
    if (pNode->handle.valueInUse == TQ_DELETE_TOKEN && pNode->handle.valueInTxn == NULL) {
      int          bucketKey = pNode->handle.key & TQ_BUCKET_MASK;
L
Liu Jicong 已提交
371
      STqMetaList* pBucketHead = pMeta->bucket[bucketKey];
L
Liu Jicong 已提交
372
      if (pBucketHead == pNode) {
L
Liu Jicong 已提交
373
        pMeta->bucket[bucketKey] = pNode->next;
L
Liu Jicong 已提交
374
      } else {
L
Liu Jicong 已提交
375
        STqMetaList* pBucketNode = pBucketHead;
L
Liu Jicong 已提交
376 377
        while (pBucketNode->next != NULL && pBucketNode->next != pNode) {
          pBucketNode = pBucketNode->next;
L
Liu Jicong 已提交
378
        }
L
Liu Jicong 已提交
379
        // impossible for pBucket->next == NULL
L
Liu Jicong 已提交
380 381
        ASSERT(pBucketNode->next == pNode);
        pBucketNode->next = pNode->next;
L
Liu Jicong 已提交
382
      }
L
Liu Jicong 已提交
383
      free(pNode);
L
Liu Jicong 已提交
384
    }
L
Liu Jicong 已提交
385
  }
L
Liu Jicong 已提交
386

L
Liu Jicong 已提交
387
  // write left bytes
L
Liu Jicong 已提交
388
  free(pSHead);
L
Liu Jicong 已提交
389 390
  // TODO: write new version in tfile
  if ((char*)bufPtr != idxBuf.buffer) {
L
Liu Jicong 已提交
391
    int nBytes = write(pMeta->idxFd, &idxBuf, idxBuf.head.writeOffset);
L
Liu Jicong 已提交
392
    // TODO: handle error in tfile
L
Liu Jicong 已提交
393
    ASSERT(nBytes == idxBuf.head.writeOffset);
L
Liu Jicong 已提交
394
  }
L
Liu Jicong 已提交
395
  // TODO: using fsync in tfile
L
Liu Jicong 已提交
396 397
  fsync(pMeta->idxFd);
  fsync(pMeta->fileFd);
L
Liu Jicong 已提交
398 399 400
  return 0;
}

L
Liu Jicong 已提交
401
// Store `value` directly as the committed (in-use) value for `key`,
// bypassing the transaction slot. The store takes ownership of `value`.
//
// If the key already exists its previous committed value is released through
// pDeleter. Otherwise a new node is created and inserted into the key's
// bucket. In both cases the node is queued for the next tqStorePersist().
//
// Returns 0 on success, -1 if a new node cannot be allocated.
static int32_t tqHandlePutCommitted(STqMetaStore* pMeta, int64_t key, void* value) {
  int64_t      bucketKey = key & TQ_BUCKET_MASK;
  STqMetaList* pNode = pMeta->bucket[bucketKey];
  while (pNode) {
    if (pNode->handle.key == key) {
      // TODO: think about thread safety
      if (pNode->handle.valueInUse && pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
        pMeta->pDeleter(pNode->handle.valueInUse);
      }
      // change pointer ownership
      pNode->handle.valueInUse = value;
      // mark dirty, as tqHandleCommit() does after changing valueInUse
      tqLinkUnpersist(pMeta, pNode);
      return 0;
    }
    pNode = pNode->next;
  }

  STqMetaList* pNewNode = malloc(sizeof(STqMetaList));
  if (pNewNode == NULL) {
    // TODO: memory error
    return -1;
  }
  memset(pNewNode, 0, sizeof(STqMetaList));
  pNewNode->handle.key = key;
  pNewNode->handle.valueInUse = value;

  // put into bucket so lookups (and the final cleanup) can reach the node
  pNewNode->next = pMeta->bucket[bucketKey];
  pMeta->bucket[bucketKey] = pNewNode;

  // put into unpersist list
  tqLinkUnpersist(pMeta, pNewNode);
  return 0;
}

L
Liu Jicong 已提交
433
// Look up the committed (in-use) value stored for `key`.
// Returns NULL when the key is unknown, has no committed value, or its
// committed value is the delete token.
void* tqHandleGet(STqMetaStore* pMeta, int64_t key) {
  int64_t slot = key & TQ_BUCKET_MASK;

  for (STqMetaList* pCur = pMeta->bucket[slot]; pCur != NULL; pCur = pCur->next) {
    if (pCur->handle.key != key) {
      continue;
    }
    // first node with a matching key decides the result
    if (pCur->handle.valueInUse == NULL || pCur->handle.valueInUse == TQ_DELETE_TOKEN) {
      return NULL;
    }
    return pCur->handle.valueInUse;
  }

  return NULL;
}

L
Liu Jicong 已提交
450
// Like tqHandleGet(), but a node whose committed value is returned is also
// queued on the unpersist list, so it is written out by the next
// tqStorePersist().
// Returns NULL when the key is unknown, has no committed value, or its
// committed value is the delete token.
void* tqHandleTouchGet(STqMetaStore* pMeta, int64_t key) {
  int64_t slot = key & TQ_BUCKET_MASK;

  for (STqMetaList* pCur = pMeta->bucket[slot]; pCur != NULL; pCur = pCur->next) {
    if (pCur->handle.key != key) {
      continue;
    }
    if (pCur->handle.valueInUse == NULL || pCur->handle.valueInUse == TQ_DELETE_TOKEN) {
      return NULL;
    }
    tqLinkUnpersist(pMeta, pCur);
    return pCur->handle.valueInUse;
  }

  return NULL;
}

L
Liu Jicong 已提交
468
// Place `value` in the in-transaction slot of `key`, creating the node if the
// key is not present yet, and queue the node for persisting.
//
// Returns 0 on success, -2 if the key already has an in-transaction value and
// TqDupIntxnReject(pMeta->tqConfigFlag) is set, -1 if a new node cannot be
// allocated. On a non-zero return `value` has not been stored.
static inline int32_t tqHandlePutImpl(STqMetaStore* pMeta, int64_t key, void* value) {
  int64_t      slot = key & TQ_BUCKET_MASK;
  STqMetaList* pCur = pMeta->bucket[slot];

  // locate an existing node for this key
  while (pCur != NULL && pCur->handle.key != key) {
    pCur = pCur->next;
  }

  if (pCur != NULL) {
    // TODO: think about thread safety
    if (pCur->handle.valueInTxn) {
      if (TqDupIntxnReject(pMeta->tqConfigFlag)) {
        return -2;
      }
      // replace the pending value; the delete token owns no memory
      if (pCur->handle.valueInTxn != TQ_DELETE_TOKEN) {
        pMeta->pDeleter(pCur->handle.valueInTxn);
      }
    }
    pCur->handle.valueInTxn = value;
    tqLinkUnpersist(pMeta, pCur);
    return 0;
  }

  STqMetaList* pFresh = malloc(sizeof(STqMetaList));
  if (pFresh == NULL) {
    // TODO: memory error
    return -1;
  }
  memset(pFresh, 0, sizeof(STqMetaList));
  pFresh->handle.key = key;
  pFresh->handle.valueInTxn = value;

  // new nodes go to the front of their bucket
  pFresh->next = pMeta->bucket[slot];
  pMeta->bucket[slot] = pFresh;
  tqLinkUnpersist(pMeta, pFresh);
  return 0;
}

L
Liu Jicong 已提交
503
// Put `value` into the in-transaction slot of `key` without copying it; on
// success the store keeps the pointer. Return codes are those of
// tqHandlePutImpl().
int32_t tqHandleMovePut(STqMetaStore* pMeta, int64_t key, void* value) {
  int32_t code = tqHandlePutImpl(pMeta, key, value);
  return code;
}
L
Liu Jicong 已提交
504

L
Liu Jicong 已提交
505
// Put a byte-wise copy of the `vsize` bytes at `value` into the
// in-transaction slot of `key`. The caller keeps ownership of `value`.
//
// Returns 0 on success, -1 if the copy or a new node cannot be allocated,
// -2 if tqHandlePutImpl() rejects a duplicate in-transaction value.
int32_t tqHandleCopyPut(STqMetaStore* pMeta, int64_t key, void* value, size_t vsize) {
  void* vmem = malloc(vsize);
  if (vmem == NULL) {
    // TODO: memory error
    return -1;
  }
  memcpy(vmem, value, vsize);

  int32_t code = tqHandlePutImpl(pMeta, key, vmem);
  if (code != 0) {
    // the store did not take the copy, so nobody else will release it
    free(vmem);
  }
  return code;
}

L
Liu Jicong 已提交
515
// Look up the in-transaction (uncommitted) value stored for `key`.
// Returns NULL when the key is unknown, has no in-transaction value, or that
// value is the delete token.
static void* tqHandleGetUncommitted(STqMetaStore* pMeta, int64_t key) {
  int64_t slot = key & TQ_BUCKET_MASK;

  for (STqMetaList* pCur = pMeta->bucket[slot]; pCur != NULL; pCur = pCur->next) {
    if (pCur->handle.key != key) {
      continue;
    }
    if (pCur->handle.valueInTxn == NULL || pCur->handle.valueInTxn == TQ_DELETE_TOKEN) {
      return NULL;
    }
    return pCur->handle.valueInTxn;
  }

  return NULL;
}

L
Liu Jicong 已提交
532
// Promote the in-transaction value of `key` to the committed value. The old
// committed value, if any, is released through pDeleter and the node is
// queued for persisting.
//
// Returns 0 on success, -1 if the key has no in-transaction value, -2 if the
// key is not in the store.
int32_t tqHandleCommit(STqMetaStore* pMeta, int64_t key) {
  int64_t      slot = key & TQ_BUCKET_MASK;
  STqMetaList* pCur = pMeta->bucket[slot];

  while (pCur != NULL && pCur->handle.key != key) {
    pCur = pCur->next;
  }
  if (pCur == NULL) {
    return -2;
  }

  if (pCur->handle.valueInTxn == NULL) {
    return -1;
  }

  if (pCur->handle.valueInUse && pCur->handle.valueInUse != TQ_DELETE_TOKEN) {
    pMeta->pDeleter(pCur->handle.valueInUse);
  }
  // the pending value becomes the committed one; the txn slot is emptied
  pCur->handle.valueInUse = pCur->handle.valueInTxn;
  pCur->handle.valueInTxn = NULL;
  tqLinkUnpersist(pMeta, pCur);
  return 0;
}

L
Liu Jicong 已提交
554
// Discard the in-transaction value of `key`. The value is released through
// pDeleter (unless it is the delete token) and the node is queued for
// persisting.
//
// Returns 0 on success, -1 if the key has no in-transaction value, -2 if the
// key is not in the store.
int32_t tqHandleAbort(STqMetaStore* pMeta, int64_t key) {
  int64_t      slot = key & TQ_BUCKET_MASK;
  STqMetaList* pCur = pMeta->bucket[slot];

  while (pCur != NULL && pCur->handle.key != key) {
    pCur = pCur->next;
  }
  if (pCur == NULL) {
    return -2;
  }

  if (!pCur->handle.valueInTxn) {
    return -1;
  }

  if (pCur->handle.valueInTxn != TQ_DELETE_TOKEN) {
    pMeta->pDeleter(pCur->handle.valueInTxn);
  }
  pCur->handle.valueInTxn = NULL;
  tqLinkUnpersist(pMeta, pCur);
  return 0;
}

L
Liu Jicong 已提交
575
// Mark `key` as deleted inside the current transaction: its in-transaction
// slot is set to the delete token (releasing any pending value through
// pDeleter) and the node is queued for persisting.
//
// Returns 0 on success, -1 if the key is not in the store or is already
// marked for deletion.
int32_t tqHandleDel(STqMetaStore* pMeta, int64_t key) {
  int64_t      bucketKey = key & TQ_BUCKET_MASK;
  STqMetaList* pNode = pMeta->bucket[bucketKey];
  while (pNode) {
    // only the node that actually holds this key may be touched; other keys
    // sharing the bucket are skipped
    if (pNode->handle.key != key) {
      pNode = pNode->next;
      continue;
    }

    if (pNode->handle.valueInTxn == TQ_DELETE_TOKEN) {
      // deletion already pending
      return -1;
    }

    if (pNode->handle.valueInTxn) {
      pMeta->pDeleter(pNode->handle.valueInTxn);
    }
    pNode->handle.valueInTxn = TQ_DELETE_TOKEN;
    tqLinkUnpersist(pMeta, pNode);
    return 0;
  }
  // no such key
  return -1;
}

L
Liu Jicong 已提交
595 596
// TODO: clean deleted idx and data from persistent file
// Placeholder: compaction is not implemented yet, so this does nothing and
// reports success.
int32_t tqStoreCompact(STqMetaStore* pMeta) {
  (void)pMeta;
  return 0;
}