tqOffset.c 5.8 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#define _DEFAULT_SOURCE

H
Hongze Cheng 已提交
17
#include "tq.h"
L
Liu Jicong 已提交
18 19

struct STqOffsetStore {
20 21
  STQ*      pTq;
  SHashObj* pHash;  // SHashObj<subscribeKey, offset>
22
  int8_t    needCommit;
L
Liu Jicong 已提交
23 24
};

25
char* tqOffsetBuildFName(const char* path, int32_t fVer) {
26
  int32_t len = strlen(path);
L
Liu Jicong 已提交
27
  char*   fname = taosMemoryCalloc(1, len + 40);
28
  snprintf(fname, len + 40, "%s/offset-ver%d", path, fVer);
29 30 31
  return fname;
}

L
Liu Jicong 已提交
32
int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) {
33
  TdFilePtr pFile = taosOpenFile(fname, TD_FILE_READ);
34 35 36 37 38 39 40 41 42 43 44 45 46 47
  if (pFile == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t vgId = TD_VID(pStore->pTq->pVnode);
  int64_t code = 0;

  STqOffsetHead head = {0};

  while (1) {
    if ((code = taosReadFile(pFile, &head, sizeof(STqOffsetHead))) != sizeof(STqOffsetHead)) {
      if (code == 0) {
        break;
      } else {
L
Liu Jicong 已提交
48
        return -1;
49
      }
50 51 52 53 54 55 56 57 58
    }

    int32_t size = htonl(head.size);
    void*   pMemBuf = taosMemoryCalloc(1, size);
    if (pMemBuf == NULL) {
      tqError("vgId:%d failed to restore offset from file, since out of memory, malloc size:%d", vgId, size);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
L
Liu Jicong 已提交
59

60 61 62 63 64 65 66 67 68 69
    if ((code = taosReadFile(pFile, pMemBuf, size)) != size) {
      taosMemoryFree(pMemBuf);
      return -1;
    }

    STqOffset offset;
    SDecoder  decoder;
    tDecoderInit(&decoder, pMemBuf, size);
    if (tDecodeSTqOffset(&decoder, &offset) < 0) {
      taosMemoryFree(pMemBuf);
70
      tDecoderClear(&decoder);
71 72 73 74 75 76 77
      return code;
    }

    tDecoderClear(&decoder);
    if (taosHashPut(pStore->pHash, offset.subKey, strlen(offset.subKey), &offset, sizeof(STqOffset)) < 0) {
      return -1;
    }
L
Liu Jicong 已提交
78

79 80
    // todo remove this
    if (offset.val.type == TMQ_OFFSET__LOG) {
wmmhello's avatar
wmmhello 已提交
81
      taosWLockLatch(&pStore->pTq->lock);
82 83
      STqHandle* pHandle = taosHashGet(pStore->pTq->pHandle, offset.subKey, strlen(offset.subKey));
      if (pHandle) {
wmmhello's avatar
wmmhello 已提交
84
        if (walSetRefVer(pHandle->pRef, offset.val.version) < 0) {
85 86
//          tqError("vgId: %d, tq handle %s ref ver %" PRId64 "error", pStore->pTq->pVnode->config.vgId, pHandle->subKey,
//                  offset.val.version);
L
Liu Jicong 已提交
87 88
        }
      }
wmmhello's avatar
wmmhello 已提交
89
      taosWUnLockLatch(&pStore->pTq->lock);
90 91
    }

92
    taosMemoryFree(pMemBuf);
93
  }
94 95 96

  taosCloseFile(&pFile);
  return TSDB_CODE_SUCCESS;
L
Liu Jicong 已提交
97 98 99 100 101 102 103
}

STqOffsetStore* tqOffsetOpen(STQ* pTq) {
  STqOffsetStore* pStore = taosMemoryCalloc(1, sizeof(STqOffsetStore));
  if (pStore == NULL) {
    return NULL;
  }
104

L
Liu Jicong 已提交
105
  pStore->pTq = pTq;
106
  pStore->needCommit = 0;
L
Liu Jicong 已提交
107 108 109 110 111 112 113
  pTq->pOffsetStore = pStore;

  pStore->pHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
  if (pStore->pHash == NULL) {
    taosMemoryFree(pStore);
    return NULL;
  }
114

L
Liu Jicong 已提交
115 116
  char* fname = tqOffsetBuildFName(pStore->pTq->path, 0);
  if (tqOffsetRestoreFromFile(pStore, fname) < 0) {
L
Liu Jicong 已提交
117 118 119
    taosMemoryFree(fname);
    taosMemoryFree(pStore);
    return NULL;
L
Liu Jicong 已提交
120
  }
121

L
Liu Jicong 已提交
122
  taosMemoryFree(fname);
L
Liu Jicong 已提交
123 124
  return pStore;
}
125 126

void tqOffsetClose(STqOffsetStore* pStore) {
L
Liu Jicong 已提交
127
  tqOffsetCommitFile(pStore);
128
  taosHashCleanup(pStore->pHash);
129
  taosMemoryFree(pStore);
130 131 132 133 134 135 136
}

STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey) {
  return (STqOffset*)taosHashGet(pStore->pHash, subscribeKey, strlen(subscribeKey));
}

int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset) {
137
  pStore->needCommit = 1;
138 139 140
  return taosHashPut(pStore->pHash, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset));
}

L
Liu Jicong 已提交
141 142 143 144
int32_t tqOffsetDelete(STqOffsetStore* pStore, const char* subscribeKey) {
  return taosHashRemove(pStore->pHash, subscribeKey, strlen(subscribeKey));
}

L
Liu Jicong 已提交
145
int32_t tqOffsetCommitFile(STqOffsetStore* pStore) {
146 147 148 149
  if (!pStore->needCommit) {
    return 0;
  }

L
Liu Jicong 已提交
150 151
  // TODO file name should be with a newer version
  char*     fname = tqOffsetBuildFName(pStore->pTq->path, 0);
152
  TdFilePtr pFile = taosOpenFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
153
  if (pFile == NULL) {
154
    terrno = TAOS_SYSTEM_ERROR(errno);
155 156
    const char* err = strerror(errno);
    tqError("vgId:%d, failed to open offset file %s, since %s", TD_VID(pStore->pTq->pVnode), fname, err);
L
Liu Jicong 已提交
157
    taosMemoryFree(fname);
158 159
    return -1;
  }
160

161
  taosMemoryFree(fname);
162

163 164 165
  void* pIter = NULL;
  while (1) {
    pIter = taosHashIterate(pStore->pHash, pIter);
166 167 168 169
    if (pIter == NULL) {
      break;
    }

170 171 172 173
    STqOffset* pOffset = (STqOffset*)pIter;
    int32_t    bodyLen;
    int32_t    code;
    tEncodeSize(tEncodeSTqOffset, pOffset, bodyLen, code);
174

175 176 177 178 179 180 181 182 183 184 185 186 187
    if (code < 0) {
      taosHashCancelIterate(pStore->pHash, pIter);
      return -1;
    }

    int32_t totLen = sizeof(STqOffsetHead) + bodyLen;
    void*   buf = taosMemoryCalloc(1, totLen);
    void*   abuf = POINTER_SHIFT(buf, sizeof(STqOffsetHead));

    ((STqOffsetHead*)buf)->size = htonl(bodyLen);
    SEncoder encoder;
    tEncoderInit(&encoder, abuf, bodyLen);
    tEncodeSTqOffset(&encoder, pOffset);
188

189 190
    // write file
    int64_t writeLen;
L
Liu Jicong 已提交
191
    if ((writeLen = taosWriteFile(pFile, buf, totLen)) != totLen) {
S
Shengliang Guan 已提交
192
      tqError("write offset incomplete, len %d, write len %" PRId64, bodyLen, writeLen);
193
      taosHashCancelIterate(pStore->pHash, pIter);
194
      taosMemoryFree(buf);
195 196
      return -1;
    }
197

198
    taosMemoryFree(buf);
199
  }
200

201 202
  // close and rename file
  taosCloseFile(&pFile);
203
  pStore->needCommit = 0;
204 205
  return 0;
}