tqOffset.c 4.6 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#define _DEFAULT_SOURCE

H
Hongze Cheng 已提交
17
#include "tq.h"
L
Liu Jicong 已提交
18 19

struct STqOffsetStore {
20
  char*     fname;
21 22
  STQ*      pTq;
  SHashObj* pHash;  // SHashObj<subscribeKey, offset>
L
Liu Jicong 已提交
23 24
};

25
char* tqOffsetBuildFName(const char* path, int32_t fVer) {
26
  int32_t len = strlen(path);
L
Liu Jicong 已提交
27
  char*   fname = taosMemoryCalloc(1, len + 40);
28
  snprintf(fname, len + 40, "%s/offset-ver%d", path, fVer);
29 30 31
  return fname;
}

L
Liu Jicong 已提交
32
int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) {
33
  TdFilePtr pFile = taosOpenFile(fname, TD_FILE_READ);
34 35 36 37 38 39
  if (pFile != NULL) {
    STqOffsetHead head = {0};
    int64_t       code;

    while (1) {
      if ((code = taosReadFile(pFile, &head, sizeof(STqOffsetHead))) != sizeof(STqOffsetHead)) {
L
Liu Jicong 已提交
40
        if (code == 0) {
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
          break;
        } else {
          ASSERT(0);
          // TODO handle error
        }
      }
      int32_t size = htonl(head.size);
      void*   memBuf = taosMemoryCalloc(1, size);
      if ((code = taosReadFile(pFile, memBuf, size)) != size) {
        ASSERT(0);
        // TODO handle error
      }
      STqOffset offset;
      SDecoder  decoder;
      tDecoderInit(&decoder, memBuf, size);
      if (tDecodeSTqOffset(&decoder, &offset) < 0) {
        ASSERT(0);
      }
      tDecoderClear(&decoder);
      if (taosHashPut(pStore->pHash, offset.subKey, strlen(offset.subKey), &offset, sizeof(STqOffset)) < 0) {
        ASSERT(0);
        // TODO
      }
    }

    taosCloseFile(&pFile);
  }
L
Liu Jicong 已提交
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
  return 0;
}

STqOffsetStore* tqOffsetOpen(STQ* pTq) {
  STqOffsetStore* pStore = taosMemoryCalloc(1, sizeof(STqOffsetStore));
  if (pStore == NULL) {
    return NULL;
  }
  pStore->pTq = pTq;
  pTq->pOffsetStore = pStore;

  pStore->pHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
  if (pStore->pHash == NULL) {
    taosMemoryFree(pStore);
    return NULL;
  }
  char* fname = tqOffsetBuildFName(pStore->pTq->path, 0);
  if (tqOffsetRestoreFromFile(pStore, fname) < 0) {
    ASSERT(0);
  }
  taosMemoryFree(fname);
L
Liu Jicong 已提交
89 90
  return pStore;
}
91 92

void tqOffsetClose(STqOffsetStore* pStore) {
L
Liu Jicong 已提交
93
  tqOffsetCommitFile(pStore);
94
  taosHashCleanup(pStore->pHash);
95
  taosMemoryFree(pStore);
96 97 98 99 100 101 102 103 104 105
}

STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey) {
  return (STqOffset*)taosHashGet(pStore->pHash, subscribeKey, strlen(subscribeKey));
}

int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset) {
  return taosHashPut(pStore->pHash, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset));
}

L
Liu Jicong 已提交
106 107 108 109
int32_t tqOffsetDelete(STqOffsetStore* pStore, const char* subscribeKey) {
  return taosHashRemove(pStore->pHash, subscribeKey, strlen(subscribeKey));
}

L
Liu Jicong 已提交
110 111 112
int32_t tqOffsetCommitFile(STqOffsetStore* pStore) {
  // TODO file name should be with a newer version
  char*     fname = tqOffsetBuildFName(pStore->pTq->path, 0);
113
  TdFilePtr pFile = taosOpenFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
L
Liu Jicong 已提交
114
  taosMemoryFree(fname);
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
  if (pFile == NULL) {
    ASSERT(0);
    return -1;
  }
  void* pIter = NULL;
  while (1) {
    pIter = taosHashIterate(pStore->pHash, pIter);
    if (pIter == NULL) break;
    STqOffset* pOffset = (STqOffset*)pIter;
    int32_t    bodyLen;
    int32_t    code;
    tEncodeSize(tEncodeSTqOffset, pOffset, bodyLen, code);
    ASSERT(code == 0);
    if (code < 0) {
      ASSERT(0);
      taosHashCancelIterate(pStore->pHash, pIter);
      return -1;
    }

    int32_t totLen = sizeof(STqOffsetHead) + bodyLen;
    void*   buf = taosMemoryCalloc(1, totLen);
    void*   abuf = POINTER_SHIFT(buf, sizeof(STqOffsetHead));

    ((STqOffsetHead*)buf)->size = htonl(bodyLen);
    SEncoder encoder;
    tEncoderInit(&encoder, abuf, bodyLen);
    tEncodeSTqOffset(&encoder, pOffset);
    // write file
    int64_t writeLen;
L
Liu Jicong 已提交
144
    if ((writeLen = taosWriteFile(pFile, buf, totLen)) != totLen) {
145
      ASSERT(0);
S
Shengliang Guan 已提交
146
      tqError("write offset incomplete, len %d, write len %" PRId64, bodyLen, writeLen);
147
      taosHashCancelIterate(pStore->pHash, pIter);
148
      taosMemoryFree(buf);
149 150
      return -1;
    }
151
    taosMemoryFree(buf);
152 153 154 155 156
  }
  // close and rename file
  taosCloseFile(&pFile);
  return 0;
}