tqOffset.c 5.6 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#define _DEFAULT_SOURCE

H
Hongze Cheng 已提交
17
#include "tq.h"
L
Liu Jicong 已提交
18 19

struct STqOffsetStore {
20 21
  STQ*      pTq;
  SHashObj* pHash;  // SHashObj<subscribeKey, offset>
22
  int8_t    needCommit;
L
Liu Jicong 已提交
23 24
};

25
/*
 * Build the name of the offset file with version `fVer` inside directory
 * `path`, i.e. "<path>/offset-ver<fVer>".
 *
 * Returns a heap-allocated, NUL-terminated string that the caller releases
 * with taosMemoryFree(), or NULL when the allocation fails.
 */
char* tqOffsetBuildFName(const char* path, int32_t fVer) {
  // 40 spare bytes cover "/offset-ver", the decimal digits of an int32_t and the terminating NUL.
  size_t cap = strlen(path) + 40;
  char*  fname = taosMemoryCalloc(1, cap);
  if (fname == NULL) {
    return NULL;
  }
  snprintf(fname, cap, "%s/offset-ver%d", path, fVer);
  return fname;
}

L
Liu Jicong 已提交
32
/*
 * Load every offset record stored in file `fname` into pStore->pHash.
 *
 * The file is read as a sequence of records: an STqOffsetHead whose `size`
 * field gives the length of the encoded STqOffset that immediately follows.
 * For offsets of type TMQ_OFFSET__LOG whose subscribe key has a handle in
 * pStore->pTq->pHandle, the version is additionally referenced through
 * walRefVer(); a failure there is only logged.
 *
 * Returns 0 on success, including when the file cannot be opened (nothing to
 * restore). Returns -1 on a short read, a non-positive record size, an
 * allocation failure, a decode failure or a hash insertion failure. The file
 * handle and the record buffer are released on every path.
 */
int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) {
  TdFilePtr pFile = taosOpenFile(fname, TD_FILE_READ);
  if (pFile == NULL) {
    return 0;
  }

  int32_t ret = -1;
  void*   memBuf = NULL;

  while (1) {
    STqOffsetHead head = {0};
    int64_t       nRead = taosReadFile(pFile, &head, sizeof(STqOffsetHead));
    if (nRead == 0) {
      // clean end of file: every record has been consumed
      ret = 0;
      break;
    }
    if (nRead != (int64_t)sizeof(STqOffsetHead)) {
      break;
    }

    // tqOffsetCommitFile stores the size with htonl(); applying the same
    // conversion again yields the host-order value.
    int32_t size = htonl(head.size);
    if (size <= 0) {
      // a record body can never be empty or negative: the file is corrupted
      break;
    }

    memBuf = taosMemoryCalloc(1, size);
    if (memBuf == NULL) {
      break;
    }
    if (taosReadFile(pFile, memBuf, size) != size) {
      break;
    }

    STqOffset offset;
    SDecoder  decoder;
    tDecoderInit(&decoder, memBuf, size);
    int32_t decodeCode = tDecodeSTqOffset(&decoder, &offset);
    tDecoderClear(&decoder);
    if (decodeCode < 0) {
      break;
    }

    if (taosHashPut(pStore->pHash, offset.subKey, strlen(offset.subKey), &offset, sizeof(STqOffset)) < 0) {
      break;
    }

    if (offset.val.type == TMQ_OFFSET__LOG) {
      STqHandle* pHandle = taosHashGet(pStore->pTq->pHandle, offset.subKey, strlen(offset.subKey));
      if (pHandle) {
        if (walRefVer(pHandle->pRef, offset.val.version) < 0) {
          tqError("vgId: %d, tq handle %s ref ver %" PRId64 " error", pStore->pTq->pVnode->config.vgId,
                  pHandle->subKey, offset.val.version);
        }
      }
    }

    taosMemoryFree(memBuf);
    memBuf = NULL;
  }

  // memBuf is non-NULL only when the loop was left in the middle of a record
  if (memBuf != NULL) {
    taosMemoryFree(memBuf);
  }
  taosCloseFile(&pFile);
  return ret;
}

/*
 * Create the offset store of `pTq` and populate it from the version-0 offset
 * file under pTq->path.
 *
 * On success the new store is returned and also published in
 * pTq->pOffsetStore. On any failure (allocation, hash creation, file name
 * construction or restore) everything allocated here is released,
 * pTq->pOffsetStore is reset to NULL so it never refers to freed memory, and
 * NULL is returned.
 */
STqOffsetStore* tqOffsetOpen(STQ* pTq) {
  STqOffsetStore* pStore = taosMemoryCalloc(1, sizeof(STqOffsetStore));
  if (pStore == NULL) {
    return NULL;
  }
  char* fname = NULL;

  pStore->pTq = pTq;
  pStore->needCommit = 0;
  pTq->pOffsetStore = pStore;

  pStore->pHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
  if (pStore->pHash == NULL) {
    goto _err;
  }

  fname = tqOffsetBuildFName(pStore->pTq->path, 0);
  if (fname == NULL) {
    goto _err;
  }

  int32_t code = tqOffsetRestoreFromFile(pStore, fname);
  taosMemoryFree(fname);
  if (code < 0) {
    goto _err;
  }
  return pStore;

_err:
  if (pStore->pHash != NULL) {
    taosHashCleanup(pStore->pHash);
  }
  pTq->pOffsetStore = NULL;
  taosMemoryFree(pStore);
  return NULL;
}
110 111

/*
 * Flush pending offsets with tqOffsetCommitFile(), then destroy the hash table
 * and the store itself. A NULL `pStore` is accepted and ignored.
 *
 * The commit is best effort: a failure is logged and the store is still
 * released, since the function has no way to report an error.
 */
void tqOffsetClose(STqOffsetStore* pStore) {
  if (pStore == NULL) {
    return;
  }
  if (tqOffsetCommitFile(pStore) < 0) {
    tqError("vgId:%d, failed to commit offset file when closing offset store", pStore->pTq->pVnode->config.vgId);
  }
  taosHashCleanup(pStore->pHash);
  taosMemoryFree(pStore);
}

/*
 * Look up the offset stored under `subscribeKey` in pStore->pHash.
 *
 * Returns whatever taosHashGet() yields for that key, viewed as an STqOffset
 * (presumably NULL when the key is absent; confirm against taosHashGet).
 */
STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey) {
  size_t keyLen = strlen(subscribeKey);
  void*  pEntry = taosHashGet(pStore->pHash, subscribeKey, keyLen);
  return (STqOffset*)pEntry;
}

/*
 * Store `*pOffset` in pStore->pHash under its own subKey and flag the store as
 * needing a commit.
 *
 * Returns the result of taosHashPut().
 */
int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset) {
  // The dirty flag is raised before the insertion, so it stays set even when the insertion fails.
  pStore->needCommit = 1;

  const char* key = pOffset->subKey;
  int32_t     code = taosHashPut(pStore->pHash, key, strlen(key), pOffset, sizeof(STqOffset));
  return code;
}

L
Liu Jicong 已提交
126 127 128 129
/*
 * Remove the entry stored under `subscribeKey` from pStore->pHash.
 *
 * Returns the result of taosHashRemove(). The needCommit flag is not touched.
 */
int32_t tqOffsetDelete(STqOffsetStore* pStore, const char* subscribeKey) {
  size_t  keyLen = strlen(subscribeKey);
  int32_t code = taosHashRemove(pStore->pHash, subscribeKey, keyLen);
  return code;
}

L
Liu Jicong 已提交
130
/*
 * Write every offset held in pStore->pHash to the version-0 offset file under
 * pStore->pTq->path. Does nothing when needCommit is not set.
 *
 * Each entry is written as one record: an STqOffsetHead whose `size` field
 * holds the encoded body length converted with htonl(), followed by the
 * encoded STqOffset.
 *
 * Returns 0 on success (needCommit is then cleared) and -1 on failure to
 * build the file name, open the file, allocate, encode or write. The file is
 * closed and an unfinished hash iteration is cancelled on every path.
 *
 * NOTE(review): the file is opened with TD_FILE_APPEND, so each commit
 * presumably adds all entries after the records of earlier commits, and keys
 * removed through tqOffsetDelete() would remain in the file. Confirm whether a
 * rewrite into a fresh file (see the "close and rename" remark below) is intended.
 */
int32_t tqOffsetCommitFile(STqOffsetStore* pStore) {
  if (!pStore->needCommit) {
    return 0;
  }

  // TODO file name should be with a newer version
  char* fname = tqOffsetBuildFName(pStore->pTq->path, 0);
  if (fname == NULL) {
    return -1;
  }

  TdFilePtr pFile = taosOpenFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
  if (pFile == NULL) {
    // capture errno immediately, before any other call can overwrite it
    int32_t sysErr = errno;
    terrno = TAOS_SYSTEM_ERROR(sysErr);
    tqError("vgId:%d, cannot open file %s when commit offset since %s", pStore->pTq->pVnode->config.vgId, fname,
            strerror(sysErr));
    taosMemoryFree(fname);
    return -1;
  }
  taosMemoryFree(fname);

  int32_t ret = 0;
  void*   pIter = NULL;
  while ((pIter = taosHashIterate(pStore->pHash, pIter)) != NULL) {
    STqOffset* pOffset = (STqOffset*)pIter;
    int32_t    bodyLen;
    int32_t    code;
    tEncodeSize(tEncodeSTqOffset, pOffset, bodyLen, code);
    if (code < 0) {
      ret = -1;
      break;
    }

    int32_t totLen = sizeof(STqOffsetHead) + bodyLen;
    void*   buf = taosMemoryCalloc(1, totLen);
    if (buf == NULL) {
      ret = -1;
      break;
    }
    // the encoded body starts right after the record head
    void* abuf = POINTER_SHIFT(buf, sizeof(STqOffsetHead));

    ((STqOffsetHead*)buf)->size = htonl(bodyLen);
    SEncoder encoder;
    tEncoderInit(&encoder, abuf, bodyLen);
    if (tEncodeSTqOffset(&encoder, pOffset) < 0) {
      taosMemoryFree(buf);
      ret = -1;
      break;
    }

    // write file
    int64_t writeLen = taosWriteFile(pFile, buf, totLen);
    taosMemoryFree(buf);
    if (writeLen != totLen) {
      tqError("write offset incomplete, len %d, write len %" PRId64, bodyLen, writeLen);
      ret = -1;
      break;
    }
  }

  if (ret < 0) {
    // the loop was left before the iterator was exhausted
    taosHashCancelIterate(pStore->pHash, pIter);
  }

  // close and rename file
  taosCloseFile(&pFile);
  if (ret == 0) {
    pStore->needCommit = 0;
  }
  return ret;
}