tqOffset.c 5.7 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#define _DEFAULT_SOURCE

H
Hongze Cheng 已提交
17
#include "tq.h"
L
Liu Jicong 已提交
18 19

struct STqOffsetStore {
20 21
  STQ*      pTq;
  SHashObj* pHash;  // SHashObj<subscribeKey, offset>
22
  int8_t    needCommit;
L
Liu Jicong 已提交
23 24
};

25
/**
 * Build the name of the offset file with version `fVer` located under `path`.
 *
 * @param path  NUL-terminated directory path
 * @param fVer  version number appended to the file name
 * @return newly allocated string "<path>/offset-ver<fVer>" that the caller releases with
 *         taosMemoryFree(), or NULL when the allocation fails
 */
char* tqOffsetBuildFName(const char* path, int32_t fVer) {
  // 40 extra bytes cover "/offset-ver", the decimal form of any int32_t and the terminator
  size_t cap = strlen(path) + 40;
  char*  fname = taosMemoryCalloc(1, cap);
  if (fname == NULL) {
    // do not hand a NULL buffer to snprintf; let the caller deal with the failure
    return NULL;
  }

  snprintf(fname, cap, "%s/offset-ver%d", path, fVer);
  return fname;
}

L
Liu Jicong 已提交
32
/**
 * Load the offset records stored in file `fname` into pStore->pHash.
 *
 * The file is consumed as a sequence of records. Each record is an STqOffsetHead whose
 * `size` field gives the byte length of the encoded STqOffset that follows it. The size
 * is converted with htonl(), the same call tqOffsetCommitFile uses when writing it.
 * Every decoded offset is stored in the hash under its subKey; an offset read later for
 * the same key is put again under that key.
 *
 * @param pStore  store whose hash receives the offsets
 * @param fname   path of the offset file
 * @return TSDB_CODE_SUCCESS when the file could not be opened (nothing to restore) or was
 *         read up to its end; -1 on a short/failed read, an invalid record size, an
 *         allocation failure (terrno is set), a decode failure or a hash insertion failure.
 *         The file is closed and the record buffer released on every path.
 */
int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) {
  TdFilePtr pFile = taosOpenFile(fname, TD_FILE_READ);
  if (pFile == NULL) {
    // a file that cannot be opened for reading is treated as "no saved offsets"
    return TSDB_CODE_SUCCESS;
  }

  int32_t       vgId = TD_VID(pStore->pTq->pVnode);
  int32_t       ret = -1;
  void*         pMemBuf = NULL;
  STqOffsetHead head = {0};

  while (1) {
    int64_t nRead = taosReadFile(pFile, &head, sizeof(STqOffsetHead));
    if (nRead == 0) {
      // nothing left to read: all records have been restored
      ret = TSDB_CODE_SUCCESS;
      goto _exit;
    }
    if (nRead != (int64_t)sizeof(STqOffsetHead)) {
      // read error or truncated record header
      goto _exit;
    }

    int32_t size = htonl(head.size);
    if (size <= 0) {
      // a record body can never be empty or negative; the file is corrupted
      tqError("vgId:%d failed to restore offset from file, since invalid record size:%d", vgId, size);
      goto _exit;
    }

    pMemBuf = taosMemoryCalloc(1, size);
    if (pMemBuf == NULL) {
      tqError("vgId:%d failed to restore offset from file, since out of memory, malloc size:%d", vgId, size);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _exit;
    }

    if (taosReadFile(pFile, pMemBuf, size) != size) {
      // read error or truncated record body
      goto _exit;
    }

    STqOffset offset;
    SDecoder  decoder;
    tDecoderInit(&decoder, pMemBuf, size);
    int32_t decodeRet = tDecodeSTqOffset(&decoder, &offset);
    tDecoderClear(&decoder);
    if (decodeRet < 0) {
      // must be reported as a failure; returning the byte count read would look like success
      goto _exit;
    }

    if (taosHashPut(pStore->pHash, offset.subKey, strlen(offset.subKey), &offset, sizeof(STqOffset)) < 0) {
      goto _exit;
    }

    // todo remove this
    if (offset.val.type == TMQ_OFFSET__LOG) {
      STqHandle* pHandle = taosHashGet(pStore->pTq->pHandle, offset.subKey, strlen(offset.subKey));
      if (pHandle) {
        if (walSetRefVer(pHandle->pRef, offset.val.version) < 0) {
          // a failure to move the WAL reference is ignored: the offset itself was restored
        }
      }
    }

    taosMemoryFree(pMemBuf);
    pMemBuf = NULL;
  }

_exit:
  if (pMemBuf != NULL) {
    taosMemoryFree(pMemBuf);
  }
  taosCloseFile(&pFile);
  return ret;
}

/**
 * Create the offset store of `pTq` and fill it from the version-0 offset file found
 * under pTq->path.
 *
 * On success the new store is also recorded in pTq->pOffsetStore. On failure everything
 * allocated here (file name, hash, store) is released and pTq->pOffsetStore is reset to
 * NULL so that it never points at freed memory.
 *
 * @param pTq  STQ instance the store belongs to
 * @return the new store, or NULL when an allocation or the restore from file fails
 */
STqOffsetStore* tqOffsetOpen(STQ* pTq) {
  STqOffsetStore* pStore = taosMemoryCalloc(1, sizeof(STqOffsetStore));
  if (pStore == NULL) {
    return NULL;
  }

  char* fname = NULL;

  pStore->pTq = pTq;
  pStore->needCommit = 0;
  pTq->pOffsetStore = pStore;

  pStore->pHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
  if (pStore->pHash == NULL) {
    goto _err;
  }

  fname = tqOffsetBuildFName(pStore->pTq->path, 0);
  if (fname == NULL) {
    goto _err;
  }

  if (tqOffsetRestoreFromFile(pStore, fname) < 0) {
    goto _err;
  }

  taosMemoryFree(fname);
  return pStore;

_err:
  if (fname != NULL) {
    taosMemoryFree(fname);
  }
  if (pStore->pHash != NULL) {
    // the hash may already hold offsets restored before the failure
    taosHashCleanup(pStore->pHash);
  }
  pTq->pOffsetStore = NULL;
  taosMemoryFree(pStore);
  return NULL;
}
123 124

/**
 * Commit pending offsets to the offset file, then destroy the store.
 *
 * The result of tqOffsetCommitFile is not propagated (this function returns void); the
 * hash and the store are released regardless.
 *
 * @param pStore  store to close; NULL is accepted and ignored, which lets callers close
 *                unconditionally even when tqOffsetOpen returned NULL
 */
void tqOffsetClose(STqOffsetStore* pStore) {
  if (pStore == NULL) {
    return;
  }

  tqOffsetCommitFile(pStore);
  taosHashCleanup(pStore->pHash);
  taosMemoryFree(pStore);
}

/**
 * Look up the offset stored for `subscribeKey`.
 *
 * @param pStore        store to search
 * @param subscribeKey  NUL-terminated subscribe key
 * @return whatever taosHashGet yields for the key, viewed as an STqOffset
 *         (presumably NULL when the key has no entry -- confirm against taosHashGet)
 */
STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey) {
  size_t keyLen = strlen(subscribeKey);
  void*  pEntry = taosHashGet(pStore->pHash, subscribeKey, keyLen);
  return (STqOffset*)pEntry;
}

/**
 * Store `pOffset` in the hash under its own subKey and mark the store as needing a commit.
 *
 * @param pStore   store to update
 * @param pOffset  offset to save; sizeof(STqOffset) bytes are handed to taosHashPut
 * @return the result of taosHashPut
 */
int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset) {
  size_t keyLen;

  // the flag is raised before the insertion, so it is set whatever taosHashPut returns
  pStore->needCommit = 1;

  keyLen = strlen(pOffset->subKey);
  return taosHashPut(pStore->pHash, pOffset->subKey, keyLen, pOffset, sizeof(STqOffset));
}

L
Liu Jicong 已提交
139 140 141 142
/**
 * Remove the offset stored for `subscribeKey` from the hash.
 * The needCommit flag is left as it is.
 *
 * @param pStore        store to update
 * @param subscribeKey  NUL-terminated subscribe key
 * @return the result of taosHashRemove
 */
int32_t tqOffsetDelete(STqOffsetStore* pStore, const char* subscribeKey) {
  size_t keyLen = strlen(subscribeKey);
  return taosHashRemove(pStore->pHash, subscribeKey, keyLen);
}

L
Liu Jicong 已提交
143
/**
 * Write every offset held in pStore->pHash to the version-0 offset file.
 *
 * Nothing is done when needCommit is 0. Otherwise the file is opened with
 * TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND and one record is written per hash
 * entry: an STqOffsetHead whose `size` is htonl(encoded length), followed by the encoded
 * STqOffset. needCommit is cleared only when every record has been written.
 *
 * @param pStore  store to commit
 * @return 0 on success or when there is nothing to commit; -1 when the file name cannot
 *         be built, the file cannot be opened (terrno is set), an offset cannot be
 *         encoded, the record buffer cannot be allocated, or a write is incomplete.
 *         The hash iteration is cancelled and the file closed on every path.
 */
int32_t tqOffsetCommitFile(STqOffsetStore* pStore) {
  if (!pStore->needCommit) {
    return 0;
  }

  // TODO file name should be with a newer version
  char* fname = tqOffsetBuildFName(pStore->pTq->path, 0);
  if (fname == NULL) {
    return -1;
  }

  TdFilePtr pFile = taosOpenFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
  if (pFile == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    const char* err = strerror(errno);
    tqError("vgId:%d, failed to open offset file %s, since %s", TD_VID(pStore->pTq->pVnode), fname, err);
    taosMemoryFree(fname);
    return -1;
  }

  taosMemoryFree(fname);

  int32_t ret = -1;
  void*   buf = NULL;
  void*   pIter = NULL;
  while (1) {
    pIter = taosHashIterate(pStore->pHash, pIter);
    if (pIter == NULL) {
      break;
    }

    STqOffset* pOffset = (STqOffset*)pIter;
    int32_t    bodyLen = 0;
    int32_t    code = 0;
    tEncodeSize(tEncodeSTqOffset, pOffset, bodyLen, code);

    if (code < 0) {
      goto _exit;
    }

    int32_t totLen = sizeof(STqOffsetHead) + bodyLen;
    buf = taosMemoryCalloc(1, totLen);
    if (buf == NULL) {
      tqError("vgId:%d, failed to commit offset, since out of memory, malloc size:%d", TD_VID(pStore->pTq->pVnode),
              totLen);
      goto _exit;
    }

    // record layout: [STqOffsetHead][encoded STqOffset of bodyLen bytes]
    void* abuf = POINTER_SHIFT(buf, sizeof(STqOffsetHead));
    ((STqOffsetHead*)buf)->size = htonl(bodyLen);

    SEncoder encoder;
    tEncoderInit(&encoder, abuf, bodyLen);
    if (tEncodeSTqOffset(&encoder, pOffset) < 0) {
      goto _exit;
    }

    // write file
    int64_t writeLen;
    if ((writeLen = taosWriteFile(pFile, buf, totLen)) != totLen) {
      tqError("write offset incomplete, len %d, write len %" PRId64, bodyLen, writeLen);
      goto _exit;
    }

    taosMemoryFree(buf);
    buf = NULL;
  }

  // every entry was written
  pStore->needCommit = 0;
  ret = 0;

_exit:
  if (pIter != NULL) {
    // leaving in the middle of the iteration
    taosHashCancelIterate(pStore->pHash, pIter);
  }
  if (buf != NULL) {
    taosMemoryFree(buf);
  }
  // close file
  taosCloseFile(&pFile);
  return ret;
}