tqOffset.c 5.0 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#define _DEFAULT_SOURCE

H
Hongze Cheng 已提交
17
#include "tq.h"
L
Liu Jicong 已提交
18 19

struct STqOffsetStore {
20 21
  STQ*      pTq;
  SHashObj* pHash;  // SHashObj<subscribeKey, offset>
22
  int8_t    needCommit;
L
Liu Jicong 已提交
23 24
};

25
char* tqOffsetBuildFName(const char* path, int32_t fVer) {
26
  int32_t len = strlen(path);
L
Liu Jicong 已提交
27
  char*   fname = taosMemoryCalloc(1, len + 40);
28
  snprintf(fname, len + 40, "%s/offset-ver%d", path, fVer);
29 30 31
  return fname;
}

L
Liu Jicong 已提交
32
int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) {
33
  TdFilePtr pFile = taosOpenFile(fname, TD_FILE_READ);
34 35 36 37 38 39
  if (pFile != NULL) {
    STqOffsetHead head = {0};
    int64_t       code;

    while (1) {
      if ((code = taosReadFile(pFile, &head, sizeof(STqOffsetHead))) != sizeof(STqOffsetHead)) {
L
Liu Jicong 已提交
40
        if (code == 0) {
41 42
          break;
        } else {
L
Liu Jicong 已提交
43
          return -1;
44 45 46 47 48
        }
      }
      int32_t size = htonl(head.size);
      void*   memBuf = taosMemoryCalloc(1, size);
      if ((code = taosReadFile(pFile, memBuf, size)) != size) {
L
Liu Jicong 已提交
49
        return -1;
50 51 52 53 54
      }
      STqOffset offset;
      SDecoder  decoder;
      tDecoderInit(&decoder, memBuf, size);
      if (tDecodeSTqOffset(&decoder, &offset) < 0) {
L
Liu Jicong 已提交
55
        return -1;
56 57 58
      }
      tDecoderClear(&decoder);
      if (taosHashPut(pStore->pHash, offset.subKey, strlen(offset.subKey), &offset, sizeof(STqOffset)) < 0) {
L
Liu Jicong 已提交
59
        return -1;
60
      }
L
Liu Jicong 已提交
61
      taosMemoryFree(memBuf);
62 63 64 65
    }

    taosCloseFile(&pFile);
  }
L
Liu Jicong 已提交
66 67 68 69 70 71 72 73 74
  return 0;
}

STqOffsetStore* tqOffsetOpen(STQ* pTq) {
  STqOffsetStore* pStore = taosMemoryCalloc(1, sizeof(STqOffsetStore));
  if (pStore == NULL) {
    return NULL;
  }
  pStore->pTq = pTq;
75
  pStore->needCommit = 0;
L
Liu Jicong 已提交
76 77 78 79 80 81 82 83 84
  pTq->pOffsetStore = pStore;

  pStore->pHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
  if (pStore->pHash == NULL) {
    taosMemoryFree(pStore);
    return NULL;
  }
  char* fname = tqOffsetBuildFName(pStore->pTq->path, 0);
  if (tqOffsetRestoreFromFile(pStore, fname) < 0) {
L
Liu Jicong 已提交
85 86 87
    taosMemoryFree(fname);
    taosMemoryFree(pStore);
    return NULL;
L
Liu Jicong 已提交
88 89
  }
  taosMemoryFree(fname);
L
Liu Jicong 已提交
90 91
  return pStore;
}
92 93

void tqOffsetClose(STqOffsetStore* pStore) {
L
Liu Jicong 已提交
94
  tqOffsetCommitFile(pStore);
95
  taosHashCleanup(pStore->pHash);
96
  taosMemoryFree(pStore);
97 98 99 100 101 102 103
}

STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey) {
  return (STqOffset*)taosHashGet(pStore->pHash, subscribeKey, strlen(subscribeKey));
}

int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset) {
104
  pStore->needCommit = 1;
105 106 107
  return taosHashPut(pStore->pHash, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset));
}

L
Liu Jicong 已提交
108 109 110 111
int32_t tqOffsetDelete(STqOffsetStore* pStore, const char* subscribeKey) {
  return taosHashRemove(pStore->pHash, subscribeKey, strlen(subscribeKey));
}

L
Liu Jicong 已提交
112
int32_t tqOffsetCommitFile(STqOffsetStore* pStore) {
113
  if (!pStore->needCommit) return 0;
L
Liu Jicong 已提交
114 115
  // TODO file name should be with a newer version
  char*     fname = tqOffsetBuildFName(pStore->pTq->path, 0);
116
  TdFilePtr pFile = taosOpenFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
117
  if (pFile == NULL) {
118 119 120 121 122 123 124 125
    terrno = TAOS_SYSTEM_ERROR(errno);

    int32_t     err = terrno;
    const char* errStr = tstrerror(err);
    int32_t     sysErr = errno;
    const char* sysErrStr = strerror(errno);
    tqError("vgId:%d, cannot open file %s when commit offset since %s", pStore->pTq->pVnode->config.vgId, fname,
            sysErrStr);
126 127
    return -1;
  }
128
  taosMemoryFree(fname);
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151
  void* pIter = NULL;
  while (1) {
    pIter = taosHashIterate(pStore->pHash, pIter);
    if (pIter == NULL) break;
    STqOffset* pOffset = (STqOffset*)pIter;
    int32_t    bodyLen;
    int32_t    code;
    tEncodeSize(tEncodeSTqOffset, pOffset, bodyLen, code);
    if (code < 0) {
      taosHashCancelIterate(pStore->pHash, pIter);
      return -1;
    }

    int32_t totLen = sizeof(STqOffsetHead) + bodyLen;
    void*   buf = taosMemoryCalloc(1, totLen);
    void*   abuf = POINTER_SHIFT(buf, sizeof(STqOffsetHead));

    ((STqOffsetHead*)buf)->size = htonl(bodyLen);
    SEncoder encoder;
    tEncoderInit(&encoder, abuf, bodyLen);
    tEncodeSTqOffset(&encoder, pOffset);
    // write file
    int64_t writeLen;
L
Liu Jicong 已提交
152
    if ((writeLen = taosWriteFile(pFile, buf, totLen)) != totLen) {
S
Shengliang Guan 已提交
153
      tqError("write offset incomplete, len %d, write len %" PRId64, bodyLen, writeLen);
154
      taosHashCancelIterate(pStore->pHash, pIter);
155
      taosMemoryFree(buf);
156 157
      return -1;
    }
158
    taosMemoryFree(buf);
159 160 161
  }
  // close and rename file
  taosCloseFile(&pFile);
162
  pStore->needCommit = 0;
163 164
  return 0;
}