tqMeta.c 7.7 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
L
Liu Jicong 已提交
15 16 17
#include "tdbInt.h"
#include "tq.h"

L
Liu Jicong 已提交
18
/*
 * Serialize a STqHandle through pEncoder.
 *
 * Field order on the wire: subKey, consumerId, snapshotVer, epoch, subType,
 * and -- only for TOPIC_SUB_TYPE__COLUMN subscriptions -- the qmsg string.
 * tDecodeSTqHandle reads the fields back in the same order.
 *
 * Returns -1 as soon as any encode step reports an error; otherwise returns
 * pEncoder->pos as it stands after tEndEncode.
 */
int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) {
  // Fixed part of the record; || short-circuits, so the steps run in order
  // and stop at the first failure.
  if (tStartEncode(pEncoder) < 0 ||
      tEncodeCStr(pEncoder, pHandle->subKey) < 0 ||
      tEncodeI64(pEncoder, pHandle->consumerId) < 0 ||
      tEncodeI64(pEncoder, pHandle->snapshotVer) < 0 ||
      tEncodeI32(pEncoder, pHandle->epoch) < 0 ||
      tEncodeI8(pEncoder, pHandle->execHandle.subType) < 0) {
    return -1;
  }

  // Optional part: the query message is written for column subscriptions only.
  if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN &&
      tEncodeCStr(pEncoder, pHandle->execHandle.execCol.qmsg) < 0) {
    return -1;
  }

  tEndEncode(pEncoder);
  return pEncoder->pos;
}

L
Liu Jicong 已提交
32
/*
 * Deserialize a STqHandle from pDecoder, mirroring tEncodeSTqHandle.
 *
 * Fills subKey, consumerId, snapshotVer, epoch and execHandle.subType; for
 * TOPIC_SUB_TYPE__COLUMN it additionally fills execHandle.execCol.qmsg via
 * tDecodeCStrAlloc. No other member of *pHandle is written.
 *
 * Returns 0 on success, -1 as soon as any decode step reports an error.
 */
int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
  // Fixed part of the record; || short-circuits, so the steps run in order
  // and stop at the first failure.
  if (tStartDecode(pDecoder) < 0 ||
      tDecodeCStrTo(pDecoder, pHandle->subKey) < 0 ||
      tDecodeI64(pDecoder, &pHandle->consumerId) < 0 ||
      tDecodeI64(pDecoder, &pHandle->snapshotVer) < 0 ||
      tDecodeI32(pDecoder, &pHandle->epoch) < 0 ||
      tDecodeI8(pDecoder, &pHandle->execHandle.subType) < 0) {
    return -1;
  }

  // Optional part: the query message exists for column subscriptions only.
  if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN &&
      tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.execCol.qmsg) < 0) {
    return -1;
  }

  tEndDecode(pDecoder);
  return 0;
}

46 47
/*
 * Open the TQ metadata storage and load its persisted state.
 *
 * Steps, in order:
 *   1. open the tdb environment at pTq->path into pTq->pMetaDB;
 *   2. open table "tq.db" into pTq->pExecStore;
 *   3. open table "tq.check.db" into pTq->pCheckStore;
 *   4. run tqMetaRestoreHandle, then tqMetaRestoreCheckInfo.
 *
 * Returns 0 on success, -1 at the first failing step. A failure in steps 1-3
 * also trips ASSERT(0). Nothing opened by an earlier step is closed here when
 * a later step fails.
 */
int32_t tqMetaOpen(STQ* pTq) {
  if (tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaDB) < 0) {
    ASSERT(0);
    return -1;
  }

  if (tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pExecStore) < 0) {
    ASSERT(0);
    return -1;
  }

  if (tdbTbOpen("tq.check.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pCheckStore) < 0) {
    ASSERT(0);
    return -1;
  }

  // Handles are restored before check info; check info is skipped if the
  // handle restore fails.
  if (tqMetaRestoreHandle(pTq) < 0) return -1;

  return (tqMetaRestoreCheckInfo(pTq) < 0) ? -1 : 0;
}
wmmhello's avatar
wmmhello 已提交
72

73 74 75
/*
 * Close the TQ metadata storage.
 *
 * Each of the two tables is closed only if its pointer is non-NULL; the tdb
 * environment in pTq->pMetaDB is then closed unconditionally. Always
 * returns 0.
 */
int32_t tqMetaClose(STQ* pTq) {
  if (pTq->pExecStore != NULL) tdbTbClose(pTq->pExecStore);
  if (pTq->pCheckStore != NULL) tdbTbClose(pTq->pCheckStore);

  tdbClose(pTq->pMetaDB);
  return 0;
}
L
Liu Jicong 已提交
83

84 85 86
/*
 * Persist one check-info record.
 *
 * Upserts (key -> value[0..vLen)) into the check store inside its own write
 * transaction on pTq->pMetaDB.
 *
 * key   - NUL-terminated record key; its strlen() is used as the key length.
 * value - already-encoded record bytes.
 * vLen  - number of bytes in value.
 *
 * Returns 0 on success, -1 if opening, beginning, writing or committing the
 * transaction fails.
 */
int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_t vLen) {
  TXN txn;
  if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
    return -1;
  }

  if (tdbBegin(pTq->pMetaDB, &txn) < 0) {
    return -1;
  }

  // Must target pCheckStore: tqMetaDeleteCheckInfo and tqMetaRestoreCheckInfo
  // both operate on that table. Writing to pExecStore would make the record
  // invisible to them and put a non-handle record into the handle table.
  if (tdbTbUpsert(pTq->pCheckStore, key, (int)strlen(key), value, vLen, &txn) < 0) {
    return -1;
  }

  if (tdbCommit(pTq->pMetaDB, &txn) < 0) {
    return -1;
  }

  return 0;
}

105 106 107 108
/*
 * Delete the check-info record stored under `key` from the check store,
 * inside its own write transaction on pTq->pMetaDB.
 *
 * key - NUL-terminated record key; its strlen() is used as the key length.
 *
 * Returns 0 on success. Returns -1 (after tripping ASSERT(0)) if the
 * transaction cannot be opened, begun or committed, so that builds where
 * ASSERT is compiled out do not continue on an unusable transaction and
 * report success. A failure of the delete itself is deliberately ignored.
 */
int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key) {
  TXN txn;

  if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
    ASSERT(0);
    return -1;
  }

  if (tdbBegin(pTq->pMetaDB, &txn) < 0) {
    ASSERT(0);
    return -1;
  }

  // Best effort: the result of the delete is intentionally not treated as an
  // error (the original assertion here was disabled).
  (void)tdbTbDelete(pTq->pCheckStore, key, (int)strlen(key), &txn);

  if (tdbCommit(pTq->pMetaDB, &txn) < 0) {
    ASSERT(0);
    return -1;
  }

  return 0;
}

127 128 129 130 131
/*
 * Load every record of the check store into the pTq->pCheckInfo hash.
 *
 * Iterates the table with a cursor, decodes each value as a STqCheckInfo and
 * inserts it into the hash keyed by info.topic.
 *
 * Returns 0 on success. Returns -1 if the cursor cannot be opened (also trips
 * ASSERT(0)), or if decoding or the hash insert fails; the latter two set
 * terrno to TSDB_CODE_OUT_OF_MEMORY. The cursor is closed on every path after
 * it has been opened.
 */
int32_t tqMetaRestoreCheckInfo(STQ* pTq) {
  TBC* pCur = NULL;
  if (tdbTbcOpen(pTq->pCheckStore, &pCur, NULL) < 0) {
    ASSERT(0);
    return -1;
  }

  int32_t  code = 0;
  void*    pKey = NULL;
  int      kLen = 0;
  void*    pVal = NULL;
  int      vLen = 0;
  SDecoder decoder;

  tdbTbcMoveToFirst(pCur);

  while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
    STqCheckInfo info;
    tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
    if (tDecodeSTqCheckInfo(&decoder, &info) < 0) {
      // Clear first so the decoder is released on the failure path too, then
      // set terrno so the clear cannot disturb it.
      tDecoderClear(&decoder);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      code = -1;
      break;
    }
    tDecoderClear(&decoder);

    if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      code = -1;
      break;
    }
  }

  // Single exit: error paths leave the loop with break so the cursor is
  // always closed.
  // NOTE(review): pKey/pVal are filled in by tdbTbcNext; whether the caller
  // must release them is not visible in this file -- confirm against tdb.
  tdbTbcClose(pCur);
  return code;
}

/*
 * Persist one subscription handle.
 *
 * Encodes *pHandle with tEncodeSTqHandle into a temporary buffer and upserts
 * (key -> encoded bytes) into the exec store inside its own write transaction
 * on pTq->pMetaDB.
 *
 * key     - NUL-terminated record key; its strlen() is used as the key length.
 * pHandle - handle to encode; not modified.
 *
 * Returns 0 on success. Every failure trips an ASSERT and, where ASSERT is
 * compiled out, returns -1 instead of carrying on with a NULL buffer or an
 * unusable transaction. The temporary buffer and the encoder are released on
 * every path after they have been set up.
 */
int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
  int32_t code;
  int32_t vlen;
  tEncodeSize(tEncodeSTqHandle, pHandle, vlen, code);
  ASSERT(code == 0);
  if (code != 0) {
    return -1;
  }

  // strlen() yields size_t; cast it so the argument matches the %d conversion.
  tqDebug("tq save %s(%d) consumer %" PRId64 " vgId:%d", pHandle->subKey, (int)strlen(pHandle->subKey),
          pHandle->consumerId, TD_VID(pTq->pVnode));

  void* buf = taosMemoryCalloc(1, vlen);
  if (buf == NULL) {
    ASSERT(0);
    return -1;
  }

  int32_t  ret = -1;
  TXN      txn;
  SEncoder encoder;
  tEncoderInit(&encoder, buf, vlen);

  if (tEncodeSTqHandle(&encoder, pHandle) < 0) {
    ASSERT(0);
    goto _exit;
  }

  if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
    ASSERT(0);
    goto _exit;
  }

  if (tdbBegin(pTq->pMetaDB, &txn) < 0) {
    ASSERT(0);
    goto _exit;
  }

  if (tdbTbUpsert(pTq->pExecStore, key, (int)strlen(key), buf, vlen, &txn) < 0) {
    ASSERT(0);
    goto _exit;
  }

  if (tdbCommit(pTq->pMetaDB, &txn) < 0) {
    ASSERT(0);
    goto _exit;
  }

  ret = 0;

_exit:
  tEncoderClear(&encoder);
  taosMemoryFree(buf);
  return ret;
}

/*
 * Delete the handle record stored under `key` from the exec store, inside
 * its own write transaction on pTq->pMetaDB.
 *
 * key - NUL-terminated record key; its strlen() is used as the key length.
 *
 * Returns 0 on success. Returns -1 (after tripping ASSERT(0)) if the
 * transaction cannot be opened, begun or committed, so that builds where
 * ASSERT is compiled out do not continue on an unusable transaction and
 * report success. A failure of the delete itself is deliberately ignored.
 */
int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) {
  TXN txn;

  if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
    ASSERT(0);
    return -1;
  }

  if (tdbBegin(pTq->pMetaDB, &txn) < 0) {
    ASSERT(0);
    return -1;
  }

  // Best effort: the result of the delete is intentionally not treated as an
  // error (the original assertion here was disabled).
  (void)tdbTbDelete(pTq->pExecStore, key, (int)strlen(key), &txn);

  if (tdbCommit(pTq->pMetaDB, &txn) < 0) {
    ASSERT(0);
    return -1;
  }

  return 0;
}
224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251

/*
 * Rebuild the in-memory handle table from the exec store.
 *
 * For every record in pTq->pExecStore:
 *   - decode it into a STqHandle;
 *   - open a WAL reference on the vnode's WAL and pin it at snapshotVer;
 *   - for TOPIC_SUB_TYPE__COLUMN, create the exec task from the stored qmsg
 *     and extract its stream scanner and reader;
 *   - otherwise, open a WAL reader, create the filter-out table-uid hash,
 *     build a snapshot context and create an exec task without a qmsg;
 *   - insert the handle into pTq->pHandle under the record's key.
 *
 * Returns 0 on success. Returns -1 if the cursor cannot be opened, a record
 * fails to decode, or walOpenRef fails (the first and last also trip
 * ASSERT(0)). The cursor is closed on every path after it has been opened.
 */
int32_t tqMetaRestoreHandle(STQ* pTq) {
  TBC* pCur = NULL;
  if (tdbTbcOpen(pTq->pExecStore, &pCur, NULL) < 0) {
    ASSERT(0);
    return -1;
  }

  int32_t  code = 0;
  void*    pKey = NULL;
  int      kLen = 0;
  void*    pVal = NULL;
  int      vLen = 0;
  SDecoder decoder;

  tdbTbcMoveToFirst(pCur);

  while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
    // Zero the handle: the decoder fills only part of it, yet fetchMeta is
    // read below and the whole struct is copied into the hash.
    STqHandle handle = {0};

    tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
    if (tDecodeSTqHandle(&decoder, &handle) < 0) {
      // Do not build runtime state from a record that failed to decode.
      tDecoderClear(&decoder);
      code = -1;
      break;
    }
    // Clear the decoder once decoding is done, as tqMetaRestoreCheckInfo does.
    // NOTE(review): qmsg comes from tDecodeCStrAlloc and is presumably an
    // allocation independent of the decoder -- confirm.
    tDecoderClear(&decoder);

    handle.pRef = walOpenRef(pTq->pVnode->pWal);
    if (handle.pRef == NULL) {
      ASSERT(0);
      code = -1;
      break;
    }
    walRefVer(handle.pRef, handle.snapshotVer);

    SReadHandle reader = {
        .meta = pTq->pVnode->pMeta,
        .vnode = pTq->pVnode,
        .initTableReader = true,
        .initTqReader = true,
        .version = handle.snapshotVer,
    };

    if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      handle.execHandle.task = qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, NULL,
                                                        &handle.execHandle.pSchemaWrapper);
      ASSERT(handle.execHandle.task);

      void* scanner = NULL;
      qExtractStreamScanner(handle.execHandle.task, &scanner);
      ASSERT(scanner);

      handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
      ASSERT(handle.execHandle.pExecReader);
    } else {
      handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
      handle.execHandle.execDb.pFilterOutTbUid =
          taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);

      buildSnapContext(reader.meta, reader.version, 0, handle.execHandle.subType, handle.fetchMeta,
                       (SSnapContext**)(&reader.sContext));

      handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, NULL, NULL);
    }

    tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, TD_VID(pTq->pVnode));
    taosHashPut(pTq->pHandle, pKey, kLen, &handle, sizeof(STqHandle));
  }

  // Single exit: error paths leave the loop with break so the cursor is
  // always closed.
  tdbTbcClose(pCur);
  return code;
}