tqMetaStore.c 20.0 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
H
Hongze Cheng 已提交
15
#include "tq.h"
H
Hongze Cheng 已提交
16 17 18 19
// #include <fcntl.h>
// #include <string.h>
// #include <unistd.h>
// #include "osDir.h"
L
Liu Jicong 已提交
20

L
Liu Jicong 已提交
21
// File name (inside the store directory) of the data file that tqStoreOpen opens as pMeta->pFile.
#define TQ_META_NAME "tq.meta"
H
Hongze Cheng 已提交
22
// File name (inside the store directory) of the index file that tqStoreOpen opens as pMeta->pIdxFile.
#define TQ_IDX_NAME  "tq.idx"
L
Liu Jicong 已提交
23

L
Liu Jicong 已提交
24 25
// Store `value` directly as the in-use value of `key` (defined further down in this file).
static int32_t tqHandlePutCommitted(STqMetaStore*, int64_t key, void* value);
// Return the in-transaction value of `key`, or NULL when there is none or it is the delete token.
static void*   tqHandleGetUncommitted(STqMetaStore*, int64_t key);
L
Liu Jicong 已提交
26

L
Liu Jicong 已提交
27 28
// Queue pNode on the store's circular "unpersisted" list, right after the head node.
// A node whose unpersistNext is already set is treated as queued and left alone.
static inline void tqLinkUnpersist(STqMetaStore* pMeta, STqMetaList* pNode) {
  if (pNode->unpersistNext != NULL) {
    return;
  }

  STqMetaList* pListHead = pMeta->unpersistHead;
  STqMetaList* pOldFirst = pListHead->unpersistNext;

  pNode->unpersistNext = pOldFirst;
  pNode->unpersistPrev = pListHead;
  pOldFirst->unpersistPrev = pNode;
  pListHead->unpersistNext = pNode;
}

36 37
// Position pFile at the start of its last page (the page containing the end of file).
// When the file size is an exact multiple of TQ_PAGE_SIZE this is the end of file itself.
//
// Returns the resulting file offset, or a negative value if a seek failed.
static inline int64_t tqSeekLastPage(TdFilePtr pFile) {
  // keep the offset in 64 bits: an int would truncate on large files
  int64_t fileSize = taosLSeekFile(pFile, 0, SEEK_END);
  if (fileSize < 0) {
    return fileSize;
  }

  int64_t lastPageOffset = (fileSize / TQ_PAGE_SIZE) * TQ_PAGE_SIZE;
  return taosLSeekFile(pFile, lastPageOffset, SEEK_SET);
}

L
Liu Jicong 已提交
43 44
// Header found at the start of every index page.
// TODO: the struct is tightly coupled with index entry
typedef struct STqIdxPageHead {
  // number of bytes of the page in use, header included; an empty page holds TQ_IDX_PAGE_HEAD_SIZE
  int16_t writeOffset;
  // never referenced in this file; brings the header to 16 bytes
  int8_t  unused[14];
} STqIdxPageHead;
L
Liu Jicong 已提交
48

L
Liu Jicong 已提交
49 50 51 52
// In-memory image of one index-file page; it is read from and written to the index file as a unit.
typedef struct STqIdxPageBuf {
  STqIdxPageHead head;                           // page header, see STqIdxPageHead
  char           buffer[TQ_IDX_PAGE_BODY_SIZE];  // index entries, TQ_IDX_SIZE bytes each
} STqIdxPageBuf;
L
Liu Jicong 已提交
53

54 55
// Load the last page of the index file into pBuf and leave the file positioned at the
// start of that page, so the caller can rewrite it in place.
//
// When nothing can be read at that position (empty file, or the file ends exactly on a
// page boundary) pBuf is initialised as an empty page.
//
// Returns 0 on success, -1 on a seek or read failure (terrno is set).
static inline int tqReadLastPage(TdFilePtr pFile, STqIdxPageBuf* pBuf) {
  // 64-bit on purpose: file offsets and byte counts do not fit an int on large files
  int64_t offset = tqSeekLastPage(pFile);
  if (offset < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  int64_t nBytes = taosReadFile(pFile, pBuf, TQ_PAGE_SIZE);
  if (nBytes < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  if (nBytes == 0) {
    memset(pBuf, 0, TQ_PAGE_SIZE);
    pBuf->head.writeOffset = TQ_IDX_PAGE_HEAD_SIZE;
  }
  // a stored page records its own length in the header
  ASSERT(nBytes == 0 || nBytes == pBuf->head.writeOffset);

  // the read moved the file position; go back to the start of the page
  if (taosLSeekFile(pFile, offset, SEEK_SET) < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  return 0;
}
L
Liu Jicong 已提交
69

L
Liu Jicong 已提交
70 71
STqMetaStore* tqStoreOpen(STQ* pTq, const char* path, FTqSerialize serializer, FTqDeserialize deserializer,
                          FTqDelete deleter, int32_t tqConfigFlag) {
wafwerar's avatar
wafwerar 已提交
72
  STqMetaStore* pMeta = taosMemoryCalloc(1, sizeof(STqMetaStore));
L
Liu Jicong 已提交
73
  if (pMeta == NULL) {
L
Liu Jicong 已提交
74
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
L
Liu Jicong 已提交
75 76
    return NULL;
  }
L
Liu Jicong 已提交
77
  pMeta->pTq = pTq;
L
Liu Jicong 已提交
78

L
Liu Jicong 已提交
79
  // concat data file name and index file name
L
Liu Jicong 已提交
80
  size_t pathLen = strlen(path);
wafwerar's avatar
wafwerar 已提交
81
  pMeta->dirPath = taosMemoryMalloc(pathLen + 1);
L
Liu Jicong 已提交
82 83
  if (pMeta->dirPath == NULL) {
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
wafwerar's avatar
wafwerar 已提交
84
    taosMemoryFree(pMeta);
L
Liu Jicong 已提交
85
    return NULL;
L
Liu Jicong 已提交
86 87
  }
  strcpy(pMeta->dirPath, path);
L
Liu Jicong 已提交
88

H
Hongze Cheng 已提交
89
  char* name = taosMemoryMalloc(pathLen + 10);
L
Liu Jicong 已提交
90

L
Liu Jicong 已提交
91
  strcpy(name, path);
92
  if (!taosDirExist(name) && taosMkDir(name) != 0) {
L
Liu Jicong 已提交
93 94
    terrno = TSDB_CODE_TQ_FAILED_TO_CREATE_DIR;
    tqError("failed to create dir:%s since %s ", name, terrstr());
L
Liu Jicong 已提交
95
  }
L
Liu Jicong 已提交
96
  strcat(name, "/" TQ_IDX_NAME);
97
  TdFilePtr pIdxFile = taosOpenFile(name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ);
98
  if (pIdxFile == NULL) {
L
Liu Jicong 已提交
99 100
    terrno = TAOS_SYSTEM_ERROR(errno);
    tqError("failed to open file:%s since %s ", name, terrstr());
L
Liu Jicong 已提交
101
    // free memory
wafwerar's avatar
wafwerar 已提交
102
    taosMemoryFree(name);
L
Liu Jicong 已提交
103 104
    return NULL;
  }
L
Liu Jicong 已提交
105

106
  pMeta->pIdxFile = pIdxFile;
wafwerar's avatar
wafwerar 已提交
107
  pMeta->unpersistHead = taosMemoryCalloc(1, sizeof(STqMetaList));
L
Liu Jicong 已提交
108
  if (pMeta->unpersistHead == NULL) {
L
Liu Jicong 已提交
109
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
wafwerar's avatar
wafwerar 已提交
110
    taosMemoryFree(name);
L
Liu Jicong 已提交
111 112
    return NULL;
  }
L
Liu Jicong 已提交
113
  pMeta->unpersistHead->unpersistNext = pMeta->unpersistHead->unpersistPrev = pMeta->unpersistHead;
L
Liu Jicong 已提交
114 115 116

  strcpy(name, path);
  strcat(name, "/" TQ_META_NAME);
117
  TdFilePtr pFile = taosOpenFile(name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ);
118
  if (pFile == NULL) {
L
Liu Jicong 已提交
119 120
    terrno = TAOS_SYSTEM_ERROR(errno);
    tqError("failed to open file:%s since %s", name, terrstr());
wafwerar's avatar
wafwerar 已提交
121
    taosMemoryFree(name);
L
Liu Jicong 已提交
122 123
    return NULL;
  }
wafwerar's avatar
wafwerar 已提交
124
  taosMemoryFree(name);
L
Liu Jicong 已提交
125

126
  pMeta->pFile = pFile;
L
Liu Jicong 已提交
127

L
Liu Jicong 已提交
128 129 130
  pMeta->pSerializer = serializer;
  pMeta->pDeserializer = deserializer;
  pMeta->pDeleter = deleter;
L
Liu Jicong 已提交
131
  pMeta->tqConfigFlag = tqConfigFlag;
L
Liu Jicong 已提交
132

L
Liu Jicong 已提交
133 134
  // read idx file and load into memory
  STqIdxPageBuf      idxBuf;
wafwerar's avatar
wafwerar 已提交
135
  STqSerializedHead* serializedObj = taosMemoryMalloc(TQ_PAGE_SIZE);
L
Liu Jicong 已提交
136
  if (serializedObj == NULL) {
L
Liu Jicong 已提交
137
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
L
Liu Jicong 已提交
138
  }
L
Liu Jicong 已提交
139 140
  int  idxRead;
  int  allocated = TQ_PAGE_SIZE;
L
Liu Jicong 已提交
141
  bool readEnd = false;
142
  while ((idxRead = taosReadFile(pIdxFile, &idxBuf, TQ_PAGE_SIZE))) {
L
Liu Jicong 已提交
143 144
    if (idxRead == -1) {
      // TODO: handle error
L
Liu Jicong 已提交
145 146
      terrno = TAOS_SYSTEM_ERROR(errno);
      tqError("failed to read tq index file since %s", terrstr());
L
Liu Jicong 已提交
147
    }
L
Liu Jicong 已提交
148
    ASSERT(idxBuf.head.writeOffset == idxRead);
L
Liu Jicong 已提交
149 150
    // loop read every entry
    for (int i = 0; i < idxBuf.head.writeOffset - TQ_IDX_PAGE_HEAD_SIZE; i += TQ_IDX_SIZE) {
wafwerar's avatar
wafwerar 已提交
151
      STqMetaList* pNode = taosMemoryCalloc(1, sizeof(STqMetaList));
L
Liu Jicong 已提交
152
      if (pNode == NULL) {
L
Liu Jicong 已提交
153 154
        terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
        // TODO: free memory
L
Liu Jicong 已提交
155
      }
L
Liu Jicong 已提交
156 157
      memcpy(&pNode->handle, &idxBuf.buffer[i], TQ_IDX_SIZE);

158
      taosLSeekFile(pFile, pNode->handle.offset, SEEK_SET);
L
Liu Jicong 已提交
159
      if (allocated < pNode->handle.serializedSize) {
wafwerar's avatar
wafwerar 已提交
160
        void* ptr = taosMemoryRealloc(serializedObj, pNode->handle.serializedSize);
L
Liu Jicong 已提交
161
        if (ptr == NULL) {
L
Liu Jicong 已提交
162 163
          terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
          // TODO: free memory
L
Liu Jicong 已提交
164
        }
L
Liu Jicong 已提交
165 166
        serializedObj = ptr;
        allocated = pNode->handle.serializedSize;
L
Liu Jicong 已提交
167
      }
L
Liu Jicong 已提交
168
      serializedObj->ssize = pNode->handle.serializedSize;
169
      if (taosReadFile(pFile, serializedObj, pNode->handle.serializedSize) != pNode->handle.serializedSize) {
L
Liu Jicong 已提交
170
        // TODO: read error
L
Liu Jicong 已提交
171
      }
L
Liu Jicong 已提交
172 173
      if (serializedObj->action == TQ_ACTION_INUSE) {
        if (serializedObj->ssize != sizeof(STqSerializedHead)) {
L
Liu Jicong 已提交
174
          pMeta->pDeserializer(pTq, serializedObj, &pNode->handle.valueInUse);
L
Liu Jicong 已提交
175 176 177
        } else {
          pNode->handle.valueInUse = TQ_DELETE_TOKEN;
        }
L
Liu Jicong 已提交
178 179
      } else if (serializedObj->action == TQ_ACTION_INTXN) {
        if (serializedObj->ssize != sizeof(STqSerializedHead)) {
L
Liu Jicong 已提交
180
          pMeta->pDeserializer(pTq, serializedObj, &pNode->handle.valueInTxn);
L
Liu Jicong 已提交
181 182 183
        } else {
          pNode->handle.valueInTxn = TQ_DELETE_TOKEN;
        }
L
Liu Jicong 已提交
184 185
      } else if (serializedObj->action == TQ_ACTION_INUSE_CONT) {
        if (serializedObj->ssize != sizeof(STqSerializedHead)) {
L
Liu Jicong 已提交
186
          pMeta->pDeserializer(pTq, serializedObj, &pNode->handle.valueInUse);
L
Liu Jicong 已提交
187 188 189
        } else {
          pNode->handle.valueInUse = TQ_DELETE_TOKEN;
        }
L
Liu Jicong 已提交
190 191
        STqSerializedHead* ptr = POINTER_SHIFT(serializedObj, serializedObj->ssize);
        if (ptr->ssize != sizeof(STqSerializedHead)) {
L
Liu Jicong 已提交
192
          pMeta->pDeserializer(pTq, ptr, &pNode->handle.valueInTxn);
L
Liu Jicong 已提交
193 194 195 196 197 198
        } else {
          pNode->handle.valueInTxn = TQ_DELETE_TOKEN;
        }
      } else {
        ASSERT(0);
      }
L
Liu Jicong 已提交
199

L
Liu Jicong 已提交
200 201
      // put into list
      int          bucketKey = pNode->handle.key & TQ_BUCKET_MASK;
L
Liu Jicong 已提交
202
      STqMetaList* pBucketNode = pMeta->bucket[bucketKey];
L
Liu Jicong 已提交
203
      if (pBucketNode == NULL) {
L
Liu Jicong 已提交
204
        pMeta->bucket[bucketKey] = pNode;
L
Liu Jicong 已提交
205
      } else if (pBucketNode->handle.key == pNode->handle.key) {
L
Liu Jicong 已提交
206 207 208
        pNode->next = pBucketNode->next;
        pMeta->bucket[bucketKey] = pNode;
      } else {
L
Liu Jicong 已提交
209 210
        while (pBucketNode->next && pBucketNode->next->handle.key != pNode->handle.key) {
          pBucketNode = pBucketNode->next;
L
Liu Jicong 已提交
211
        }
L
Liu Jicong 已提交
212
        if (pBucketNode->next) {
L
Liu Jicong 已提交
213
          ASSERT(pBucketNode->next->handle.key == pNode->handle.key);
L
Liu Jicong 已提交
214
          STqMetaList* pNodeFound = pBucketNode->next;
L
Liu Jicong 已提交
215 216 217
          pNode->next = pNodeFound->next;
          pBucketNode->next = pNode;
          pBucketNode = pNodeFound;
L
Liu Jicong 已提交
218
        } else {
L
Liu Jicong 已提交
219 220
          pNode->next = pMeta->bucket[bucketKey];
          pMeta->bucket[bucketKey] = pNode;
L
Liu Jicong 已提交
221 222 223
          pBucketNode = NULL;
        }
      }
L
Liu Jicong 已提交
224 225
      if (pBucketNode) {
        if (pBucketNode->handle.valueInUse && pBucketNode->handle.valueInUse != TQ_DELETE_TOKEN) {
L
Liu Jicong 已提交
226
          pMeta->pDeleter(pBucketNode->handle.valueInUse);
L
Liu Jicong 已提交
227
        }
L
Liu Jicong 已提交
228
        if (pBucketNode->handle.valueInTxn && pBucketNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
L
Liu Jicong 已提交
229
          pMeta->pDeleter(pBucketNode->handle.valueInTxn);
L
Liu Jicong 已提交
230
        }
wafwerar's avatar
wafwerar 已提交
231
        taosMemoryFree(pBucketNode);
L
Liu Jicong 已提交
232
      }
L
Liu Jicong 已提交
233 234
    }
  }
wafwerar's avatar
wafwerar 已提交
235
  taosMemoryFree(serializedObj);
L
Liu Jicong 已提交
236 237 238
  return pMeta;
}

L
Liu Jicong 已提交
239
// Persist pending changes, close both files and release the store with everything it owns.
//
// The store is always released. The return value is the result of the final
// tqStorePersist call: 0 on success, otherwise its error (previously this was dropped).
int32_t tqStoreClose(STqMetaStore* pMeta) {
  // commit data and idx
  int32_t code = tqStorePersist(pMeta);
  ASSERT(pMeta->unpersistHead && pMeta->unpersistHead->next == NULL);
  taosCloseFile(&pMeta->pFile);
  taosCloseFile(&pMeta->pIdxFile);
  // free memory
  for (int i = 0; i < TQ_BUCKET_SIZE; i++) {
    STqMetaList* pNode = pMeta->bucket[i];
    while (pNode) {
      // after a successful persist no node may still be queued as unpersisted
      ASSERT(code != 0 || pNode->unpersistNext == NULL);
      ASSERT(code != 0 || pNode->unpersistPrev == NULL);
      if (pNode->handle.valueInTxn && pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
        pMeta->pDeleter(pNode->handle.valueInTxn);
      }
      if (pNode->handle.valueInUse && pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
        pMeta->pDeleter(pNode->handle.valueInUse);
      }
      STqMetaList* next = pNode->next;
      taosMemoryFree(pNode);
      pNode = next;
    }
  }
  taosMemoryFree(pMeta->dirPath);
  taosMemoryFree(pMeta->unpersistHead);
  taosMemoryFree(pMeta);
  return code;
}

L
Liu Jicong 已提交
268
// Discard the store: close both files, release every cached node and value without
// persisting anything, remove the store directory and free the store object.
// Always returns 0.
int32_t tqStoreDelete(STqMetaStore* pMeta) {
  taosCloseFile(&pMeta->pFile);
  taosCloseFile(&pMeta->pIdxFile);

  // drop the in-memory table bucket by bucket
  for (int slot = 0; slot < TQ_BUCKET_SIZE; slot++) {
    STqMetaList* pCur = pMeta->bucket[slot];
    pMeta->bucket[slot] = NULL;

    for (STqMetaList* pFollowing = NULL; pCur != NULL; pCur = pFollowing) {
      // the delete token is a marker, not an owned value
      if (pCur->handle.valueInTxn != NULL && pCur->handle.valueInTxn != TQ_DELETE_TOKEN) {
        pMeta->pDeleter(pCur->handle.valueInTxn);
      }
      if (pCur->handle.valueInUse != NULL && pCur->handle.valueInUse != TQ_DELETE_TOKEN) {
        pMeta->pDeleter(pCur->handle.valueInUse);
      }
      pFollowing = pCur->next;
      taosMemoryFree(pCur);
    }
  }

  taosMemoryFree(pMeta->unpersistHead);
  // the path is still needed to remove the directory, so free it afterwards
  taosRemoveDir(pMeta->dirPath);
  taosMemoryFree(pMeta->dirPath);
  taosMemoryFree(pMeta);
  return 0;
}

L
Liu Jicong 已提交
294
// Write every node queued on the unpersist list to disk.
//
// For each queued node the serialized value(s) are written to the data file at its current
// position and one index entry (key, offset, size) is added to the last page of the index
// file; full pages are flushed as they fill up. Each node is taken off the unpersist list
// once written. A node whose in-use value is the delete token and which has no in-txn
// value is additionally removed from its hash bucket and freed.
//
// Returns 0 on success, -1 with terrno set when the scratch header cannot be allocated or
// the files cannot be positioned/read. Write failures are still only caught by ASSERT.
int32_t tqStorePersist(STqMetaStore* pMeta) {
  STqIdxPageBuf      idxBuf;
  int64_t*           bufPtr = (int64_t*)idxBuf.buffer;
  STqMetaList*       pHead = pMeta->unpersistHead;
  STqMetaList*       pNode = pHead->unpersistNext;
  STqSerializedHead* pSHead = taosMemoryMalloc(sizeof(STqSerializedHead));
  if (pSHead == NULL) {
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return -1;
  }
  pSHead->ver = TQ_SVER;
  pSHead->checksum = 0;
  pSHead->ssize = sizeof(STqSerializedHead);

  // 64-bit on purpose: the data file offset does not fit an int on large files
  int64_t offset = taosLSeekFile(pMeta->pFile, 0, SEEK_CUR);
  if (offset < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    taosMemoryFree(pSHead);
    return -1;
  }

  // without a valid last page idxBuf would be used uninitialised below
  if (tqReadLastPage(pMeta->pIdxFile, &idxBuf) < 0) {
    taosMemoryFree(pSHead);
    return -1;
  }

  if (idxBuf.head.writeOffset == TQ_PAGE_SIZE) {
    // last page is full: start a fresh one at the end of the index file
    taosLSeekFile(pMeta->pIdxFile, 0, SEEK_END);
    memset(&idxBuf, 0, TQ_PAGE_SIZE);
    idxBuf.head.writeOffset = TQ_IDX_PAGE_HEAD_SIZE;
  } else {
    // continue right after the entries already on the page
    bufPtr = POINTER_SHIFT(&idxBuf, idxBuf.head.writeOffset);
  }

  while (pHead != pNode) {
    int64_t nBytes = 0;

    if (pNode->handle.valueInUse) {
      if (pNode->handle.valueInTxn) {
        pSHead->action = TQ_ACTION_INUSE_CONT;
      } else {
        pSHead->action = TQ_ACTION_INUSE;
      }

      if (pNode->handle.valueInUse == TQ_DELETE_TOKEN) {
        // a bare header stands for the delete token
        pSHead->ssize = sizeof(STqSerializedHead);
      } else {
        pMeta->pSerializer(pNode->handle.valueInUse, &pSHead);
      }
      nBytes = taosWriteFile(pMeta->pFile, pSHead, pSHead->ssize);
      ASSERT(nBytes == pSHead->ssize);
    }

    if (pNode->handle.valueInTxn) {
      pSHead->action = TQ_ACTION_INTXN;
      if (pNode->handle.valueInTxn == TQ_DELETE_TOKEN) {
        pSHead->ssize = sizeof(STqSerializedHead);
      } else {
        pMeta->pSerializer(pNode->handle.valueInTxn, &pSHead);
      }
      int64_t nBytesTxn = taosWriteFile(pMeta->pFile, pSHead, pSHead->ssize);
      ASSERT(nBytesTxn == pSHead->ssize);
      nBytes += nBytesTxn;
    }
    pNode->handle.offset = offset;
    offset += nBytes;

    // write idx file
    // TODO: endian check and convert
    *(bufPtr++) = pNode->handle.key;
    *(bufPtr++) = pNode->handle.offset;
    *(bufPtr++) = nBytes;
    idxBuf.head.writeOffset += TQ_IDX_SIZE;

    if (idxBuf.head.writeOffset >= TQ_PAGE_SIZE) {
      int64_t nPageBytes = taosWriteFile(pMeta->pIdxFile, &idxBuf, TQ_PAGE_SIZE);
      // TODO: handle error with tfile
      ASSERT(nPageBytes == TQ_PAGE_SIZE);
      memset(&idxBuf, 0, TQ_PAGE_SIZE);
      idxBuf.head.writeOffset = TQ_IDX_PAGE_HEAD_SIZE;
      bufPtr = (int64_t*)&idxBuf.buffer;
    }

    // remove from unpersist list; remember the node just written before moving on
    STqMetaList* pPersisted = pNode;
    pHead->unpersistNext = pPersisted->unpersistNext;
    pHead->unpersistNext->unpersistPrev = pHead;
    pPersisted->unpersistPrev = pPersisted->unpersistNext = NULL;
    pNode = pHead->unpersistNext;

    // remove from bucket: this must look at the node that was just persisted, not at the
    // next queued node, which is still going to be used by the loop
    if (pPersisted->handle.valueInUse == TQ_DELETE_TOKEN && pPersisted->handle.valueInTxn == NULL) {
      int           bucketKey = pPersisted->handle.key & TQ_BUCKET_MASK;
      STqMetaList** ppLink = &pMeta->bucket[bucketKey];
      while (*ppLink != NULL && *ppLink != pPersisted) {
        ppLink = &(*ppLink)->next;
      }
      if (*ppLink == pPersisted) {
        *ppLink = pPersisted->next;
      }
      taosMemoryFree(pPersisted);
    }
  }

  // write left bytes
  taosMemoryFree(pSHead);
  // TODO: write new version in tfile
  if ((char*)bufPtr != idxBuf.buffer) {
    int64_t nLeftBytes = taosWriteFile(pMeta->pIdxFile, &idxBuf, idxBuf.head.writeOffset);
    // TODO: handle error in tfile
    ASSERT(nLeftBytes == idxBuf.head.writeOffset);
  }
  // TODO: using fsync in tfile
  taosFsyncFile(pMeta->pIdxFile);
  taosFsyncFile(pMeta->pFile);
  return 0;
}

L
Liu Jicong 已提交
407
// Store `value` directly as the in-use value of `key`, bypassing the in-txn slot.
//
// An existing in-use value is released with the store's deleter first. The node is queued
// for persistence in both the update and the insert case, and a newly created node is
// linked into its hash bucket so later lookups can find it.
//
// Returns 0 on success; -1 with terrno = TSDB_CODE_TQ_OUT_OF_MEMORY when a new node cannot
// be allocated (the store does not keep `value` in that case).
static int32_t tqHandlePutCommitted(STqMetaStore* pMeta, int64_t key, void* value) {
  int64_t      bucketKey = key & TQ_BUCKET_MASK;
  STqMetaList* pNode = pMeta->bucket[bucketKey];
  while (pNode) {
    if (pNode->handle.key == key) {
      if (pNode->handle.valueInUse && pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
        pMeta->pDeleter(pNode->handle.valueInUse);
      }
      // change pointer ownership
      pNode->handle.valueInUse = value;
      // the new value only reaches disk if the node is queued
      tqLinkUnpersist(pMeta, pNode);
      return 0;
    }
    pNode = pNode->next;
  }

  STqMetaList* pNewNode = taosMemoryCalloc(1, sizeof(STqMetaList));
  if (pNewNode == NULL) {
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return -1;
  }
  pNewNode->handle.key = key;
  pNewNode->handle.valueInUse = value;
  // push to the front of the bucket chain
  pNewNode->next = pMeta->bucket[bucketKey];
  pMeta->bucket[bucketKey] = pNewNode;
  // put into unpersist list
  tqLinkUnpersist(pMeta, pNewNode);
  return 0;
}

L
Liu Jicong 已提交
438
// Look up the in-use value stored for `key`.
// Returns NULL when the key is unknown, has no in-use value, or is marked with the delete token.
void* tqHandleGet(STqMetaStore* pMeta, int64_t key) {
  int64_t slot = key & TQ_BUCKET_MASK;

  for (STqMetaList* pCur = pMeta->bucket[slot]; pCur != NULL; pCur = pCur->next) {
    if (pCur->handle.key != key) {
      continue;
    }
    // the first node with a matching key decides the result
    if (pCur->handle.valueInUse == NULL || pCur->handle.valueInUse == TQ_DELETE_TOKEN) {
      return NULL;
    }
    return pCur->handle.valueInUse;
  }
  return NULL;
}

L
Liu Jicong 已提交
455
// Same lookup as tqHandleGet, but a node whose value is returned is also queued on the
// unpersist list so that the next persist writes it out again.
// Returns NULL when the key is unknown, has no in-use value, or is marked with the delete token.
void* tqHandleTouchGet(STqMetaStore* pMeta, int64_t key) {
  int64_t slot = key & TQ_BUCKET_MASK;

  for (STqMetaList* pCur = pMeta->bucket[slot]; pCur != NULL; pCur = pCur->next) {
    if (pCur->handle.key != key) {
      continue;
    }
    if (pCur->handle.valueInUse == NULL || pCur->handle.valueInUse == TQ_DELETE_TOKEN) {
      return NULL;
    }
    tqLinkUnpersist(pMeta, pCur);
    return pCur->handle.valueInUse;
  }
  return NULL;
}

L
Liu Jicong 已提交
473
// Place `value` in the in-txn slot of `key` and queue the node for persistence.
//
// When the key already has an in-txn value: with tqDupIntxnReject(tqConfigFlag) set the
// call fails with TSDB_CODE_TQ_META_KEY_DUP_IN_TXN, otherwise the old value is released
// (unless it is the delete token) and replaced. Unknown keys get a new node at the front
// of their bucket.
//
// Returns 0 on success, -1 with terrno set on failure (`value` is not stored then).
static inline int32_t tqHandlePutImpl(STqMetaStore* pMeta, int64_t key, void* value) {
  int64_t slot = key & TQ_BUCKET_MASK;

  for (STqMetaList* pCur = pMeta->bucket[slot]; pCur != NULL; pCur = pCur->next) {
    if (pCur->handle.key != key) {
      continue;
    }

    if (pCur->handle.valueInTxn != NULL) {
      if (tqDupIntxnReject(pMeta->tqConfigFlag)) {
        terrno = TSDB_CODE_TQ_META_KEY_DUP_IN_TXN;
        return -1;
      }
      if (pCur->handle.valueInTxn != TQ_DELETE_TOKEN) {
        pMeta->pDeleter(pCur->handle.valueInTxn);
      }
    }
    pCur->handle.valueInTxn = value;
    tqLinkUnpersist(pMeta, pCur);
    return 0;
  }

  // key not present: create its node
  STqMetaList* pFresh = taosMemoryCalloc(1, sizeof(STqMetaList));
  if (pFresh == NULL) {
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return -1;
  }
  pFresh->handle.key = key;
  pFresh->handle.valueInTxn = value;

  pFresh->next = pMeta->bucket[slot];
  pMeta->bucket[slot] = pFresh;

  tqLinkUnpersist(pMeta, pFresh);
  return 0;
}

L
Liu Jicong 已提交
507
// Put `value` into the in-txn slot of `key`. The pointer itself is stored; no copy is made.
// Returns whatever tqHandlePutImpl returns (0 on success, -1 with terrno set on failure).
int32_t tqHandleMovePut(STqMetaStore* pMeta, int64_t key, void* value) {
  int32_t code = tqHandlePutImpl(pMeta, key, value);
  return code;
}
L
Liu Jicong 已提交
508

L
Liu Jicong 已提交
509
// Put a private copy of the `vsize` bytes at `value` into the in-txn slot of `key`.
// The caller keeps ownership of `value`.
//
// Returns 0 on success. Returns -1 with terrno set when the copy cannot be allocated or
// when tqHandlePutImpl rejects the put; in the latter case the copy is freed here, since
// the store did not keep it.
int32_t tqHandleCopyPut(STqMetaStore* pMeta, int64_t key, void* value, size_t vsize) {
  void* vmem = taosMemoryMalloc(vsize);
  if (vmem == NULL) {
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return -1;
  }
  memcpy(vmem, value, vsize);

  int32_t code = tqHandlePutImpl(pMeta, key, vmem);
  if (code != 0) {
    taosMemoryFree(vmem);
  }
  return code;
}

L
Liu Jicong 已提交
519
// Look up the in-txn value stored for `key`.
// Returns NULL when the key is unknown, has no in-txn value, or that value is the delete token.
static void* tqHandleGetUncommitted(STqMetaStore* pMeta, int64_t key) {
  int64_t slot = key & TQ_BUCKET_MASK;

  for (STqMetaList* pCur = pMeta->bucket[slot]; pCur != NULL; pCur = pCur->next) {
    if (pCur->handle.key != key) {
      continue;
    }
    if (pCur->handle.valueInTxn == NULL || pCur->handle.valueInTxn == TQ_DELETE_TOKEN) {
      return NULL;
    }
    return pCur->handle.valueInTxn;
  }
  return NULL;
}

L
Liu Jicong 已提交
536
// Commit the pending change of `key`: the in-txn value becomes the in-use value, the
// previous in-use value is released, and the node is queued for persistence.
//
// Returns 0 on success. Returns -1 with terrno = TSDB_CODE_TQ_META_KEY_NOT_IN_TXN when the
// key has no in-txn value, or TSDB_CODE_TQ_META_NO_SUCH_KEY when the key is unknown.
int32_t tqHandleCommit(STqMetaStore* pMeta, int64_t key) {
  int64_t      slot = key & TQ_BUCKET_MASK;
  STqMetaList* pCur = pMeta->bucket[slot];

  // find the node for this key
  while (pCur != NULL && pCur->handle.key != key) {
    pCur = pCur->next;
  }
  if (pCur == NULL) {
    terrno = TSDB_CODE_TQ_META_NO_SUCH_KEY;
    return -1;
  }

  if (pCur->handle.valueInTxn == NULL) {
    terrno = TSDB_CODE_TQ_META_KEY_NOT_IN_TXN;
    return -1;
  }

  // the delete token is a marker, not an owned value
  if (pCur->handle.valueInUse != NULL && pCur->handle.valueInUse != TQ_DELETE_TOKEN) {
    pMeta->pDeleter(pCur->handle.valueInUse);
  }
  pCur->handle.valueInUse = pCur->handle.valueInTxn;
  pCur->handle.valueInTxn = NULL;
  tqLinkUnpersist(pMeta, pCur);
  return 0;
}

L
Liu Jicong 已提交
560
// Drop the pending change of `key`: the in-txn value is released (unless it is the delete
// token), the slot is cleared, and the node is queued for persistence.
//
// Returns 0 on success. Returns -1 with terrno = TSDB_CODE_TQ_META_KEY_NOT_IN_TXN when the
// key has no in-txn value, or TSDB_CODE_TQ_META_NO_SUCH_KEY when the key is unknown.
int32_t tqHandleAbort(STqMetaStore* pMeta, int64_t key) {
  int64_t      slot = key & TQ_BUCKET_MASK;
  STqMetaList* pCur = pMeta->bucket[slot];

  // find the node for this key
  while (pCur != NULL && pCur->handle.key != key) {
    pCur = pCur->next;
  }
  if (pCur == NULL) {
    terrno = TSDB_CODE_TQ_META_NO_SUCH_KEY;
    return -1;
  }

  if (pCur->handle.valueInTxn == NULL) {
    terrno = TSDB_CODE_TQ_META_KEY_NOT_IN_TXN;
    return -1;
  }

  if (pCur->handle.valueInTxn != TQ_DELETE_TOKEN) {
    pMeta->pDeleter(pCur->handle.valueInTxn);
  }
  pCur->handle.valueInTxn = NULL;
  tqLinkUnpersist(pMeta, pCur);
  return 0;
}

L
Liu Jicong 已提交
583
// Mark `key` as deleted inside the current transaction: any pending in-txn value is
// released and replaced by the delete token, and the node is queued for persistence.
//
// Returns 0 on success. Returns -1 with terrno = TSDB_CODE_TQ_META_NO_SUCH_KEY when the
// key is unknown or is already marked deleted in the transaction.
int32_t tqHandleDel(STqMetaStore* pMeta, int64_t key) {
  int64_t      bucketKey = key & TQ_BUCKET_MASK;
  STqMetaList* pNode = pMeta->bucket[bucketKey];
  while (pNode) {
    if (pNode->handle.key == key) {
      if (pNode->handle.valueInTxn == TQ_DELETE_TOKEN) {
        // already marked: leave the loop (it used to spin here forever, because the
        // cursor was neither advanced nor the function left)
        break;
      }
      if (pNode->handle.valueInTxn) {
        pMeta->pDeleter(pNode->handle.valueInTxn);
      }

      pNode->handle.valueInTxn = TQ_DELETE_TOKEN;
      tqLinkUnpersist(pMeta, pNode);
      return 0;
    }
    pNode = pNode->next;
  }
  terrno = TSDB_CODE_TQ_META_NO_SUCH_KEY;
  return -1;
}
L
Liu Jicong 已提交
604

L
Liu Jicong 已提交
605 606 607 608 609 610
// Mark the in-use value of `key` as deleted right away, without going through a
// transaction, and queue the node for persistence. The value previously held in the
// in-use slot is released with the store's deleter, as every other mutator in this file
// does before overwriting a slot; otherwise it would be unreachable and leak.
//
// Returns 0 on success; -1 with terrno = TSDB_CODE_TQ_META_NO_SUCH_KEY when the key is unknown.
int32_t tqHandlePurge(STqMetaStore* pMeta, int64_t key) {
  int64_t      bucketKey = key & TQ_BUCKET_MASK;
  STqMetaList* pNode = pMeta->bucket[bucketKey];
  while (pNode) {
    if (pNode->handle.key == key) {
      if (pNode->handle.valueInUse && pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
        pMeta->pDeleter(pNode->handle.valueInUse);
      }
      pNode->handle.valueInUse = TQ_DELETE_TOKEN;
      tqLinkUnpersist(pMeta, pNode);
      return 0;
    }
    pNode = pNode->next;
  }
  terrno = TSDB_CODE_TQ_META_NO_SUCH_KEY;
  return -1;
}

L
Liu Jicong 已提交
621 622
// TODO: clean deleted idx and data from persistent file
// Placeholder: does nothing with pMeta yet and always returns 0.
int32_t tqStoreCompact(STqMetaStore* pMeta) {
  return 0;
}