You need to sign in or sign up before continuing.
tqMeta.c 10.4 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
L
Liu Jicong 已提交
15 16 17
#include "tdbInt.h"
#include "tq.h"

L
Liu Jicong 已提交
18
/*
 * Serialize a topic-queue handle into pEncoder.
 *
 * Field order written: subKey, fetchMeta, consumerId, snapshotVer, epoch,
 * execHandle.subType, followed by a payload that depends on the subscription
 * type:
 *   - TOPIC_SUB_TYPE__COLUMN: the query message string (execCol.qmsg)
 *   - TOPIC_SUB_TYPE__DB:     an int32 count, then every key of
 *                             execDb.pFilterOutTbUid as an int64
 *   - TOPIC_SUB_TYPE__TABLE:  execTb.suid
 *
 * Returns -1 as soon as any encode step fails, otherwise pEncoder->pos after
 * the frame has been closed with tEndEncode().
 */
int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) {
  /* Fixed-layout header; || short-circuits, so encoding stops at the first failure. */
  if (tStartEncode(pEncoder) < 0 ||
      tEncodeCStr(pEncoder, pHandle->subKey) < 0 ||
      tEncodeI8(pEncoder, pHandle->fetchMeta) < 0 ||
      tEncodeI64(pEncoder, pHandle->consumerId) < 0 ||
      tEncodeI64(pEncoder, pHandle->snapshotVer) < 0 ||
      tEncodeI32(pEncoder, pHandle->epoch) < 0 ||
      tEncodeI8(pEncoder, pHandle->execHandle.subType) < 0) {
    return -1;
  }

  switch (pHandle->execHandle.subType) {
    case TOPIC_SUB_TYPE__COLUMN:
      if (tEncodeCStr(pEncoder, pHandle->execHandle.execCol.qmsg) < 0) return -1;
      break;

    case TOPIC_SUB_TYPE__DB: {
      int32_t numOfUids = taosHashGetSize(pHandle->execHandle.execDb.pFilterOutTbUid);
      if (tEncodeI32(pEncoder, numOfUids) < 0) return -1;

      /* The table uid is the hash key; the stored value is not read here. */
      /* NOTE(review): the early return below leaves the iteration unfinished; */
      /* confirm whether the hash API requires the iterator to be cancelled.   */
      for (void* pEntry = taosHashIterate(pHandle->execHandle.execDb.pFilterOutTbUid, NULL); pEntry != NULL;
           pEntry = taosHashIterate(pHandle->execHandle.execDb.pFilterOutTbUid, pEntry)) {
        int64_t* pUid = (int64_t*)taosHashGetKey(pEntry, NULL);
        if (tEncodeI64(pEncoder, *pUid) < 0) return -1;
      }
      break;
    }

    case TOPIC_SUB_TYPE__TABLE:
      if (tEncodeI64(pEncoder, pHandle->execHandle.execTb.suid) < 0) return -1;
      break;

    default:
      /* Unknown subscription types carry no extra payload. */
      break;
  }

  tEndEncode(pEncoder);
  return pEncoder->pos;
}

L
Liu Jicong 已提交
45
/*
 * Deserialize a topic-queue handle previously written by tEncodeSTqHandle().
 *
 * Fields are read in the same order they were written: subKey, fetchMeta,
 * consumerId, snapshotVer, epoch, execHandle.subType, then the payload for
 * the decoded subscription type. For TOPIC_SUB_TYPE__DB a fresh hash set is
 * created in execDb.pFilterOutTbUid and filled with the decoded table uids.
 *
 * Returns 0 on success and -1 if any decode step fails or the filter hash
 * cannot be created. On failure pHandle may be partially filled.
 * NOTE(review): anything already allocated into pHandle (qmsg, the filter
 * hash) is not released here on failure - confirm who owns that cleanup.
 */
int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
  if (tStartDecode(pDecoder) < 0) return -1;
  if (tDecodeCStrTo(pDecoder, pHandle->subKey) < 0) return -1;
  if (tDecodeI8(pDecoder, &pHandle->fetchMeta) < 0) return -1;
  if (tDecodeI64(pDecoder, &pHandle->consumerId) < 0) return -1;
  if (tDecodeI64(pDecoder, &pHandle->snapshotVer) < 0) return -1;
  if (tDecodeI32(pDecoder, &pHandle->epoch) < 0) return -1;
  if (tDecodeI8(pDecoder, &pHandle->execHandle.subType) < 0) return -1;
  if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
    if (tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.execCol.qmsg) < 0) return -1;
  } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
    pHandle->execHandle.execDb.pFilterOutTbUid =
        taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
    /* Fail the decode instead of inserting into a hash that was never created. */
    if (pHandle->execHandle.execDb.pFilterOutTbUid == NULL) return -1;

    int32_t size = 0;
    if (tDecodeI32(pDecoder, &size) < 0) return -1;
    for (int32_t i = 0; i < size; i++) {
      int64_t tbUid = 0;
      if (tDecodeI64(pDecoder, &tbUid) < 0) return -1;
      /* Used as a set: the uid is the key and no value is stored. */
      taosHashPut(pHandle->execHandle.execDb.pFilterOutTbUid, &tbUid, sizeof(int64_t), NULL, 0);
    }
  } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
    if (tDecodeI64(pDecoder, &pHandle->execHandle.execTb.suid) < 0) return -1;
  }
  tEndDecode(pDecoder);
  return 0;
}

72
/*
 * Open the TQ metadata database under pTq->path and load its contents.
 *
 * Steps, in order:
 *   1. open the TDB environment into pTq->pMetaDB
 *   2. open table "tq.db"       into pTq->pExecStore
 *   3. open table "tq.check.db" into pTq->pCheckStore
 *   4. restore persisted handles, then persisted check infos
 *
 * Returns 0 on success, -1 on the first step that fails. Failures while
 * opening the database or its tables additionally trip ASSERT(0).
 */
int32_t tqMetaOpen(STQ* pTq) {
  int rc;

  /* NOTE(review): 16 * 1024 and 1 look like page size / page count - confirm against tdbOpen. */
  rc = tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaDB, 0);
  if (rc < 0) {
    ASSERT(0);
    return -1;
  }

  rc = tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pExecStore, 0);
  if (rc < 0) {
    ASSERT(0);
    return -1;
  }

  rc = tdbTbOpen("tq.check.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pCheckStore, 0);
  if (rc < 0) {
    ASSERT(0);
    return -1;
  }

  /* Handles are restored before check infos; the second call is skipped if the first fails. */
  if (tqMetaRestoreHandle(pTq) < 0) return -1;
  if (tqMetaRestoreCheckInfo(pTq) < 0) return -1;

  return 0;
}
wmmhello's avatar
wmmhello 已提交
98

99 100 101
/*
 * Close the TQ metadata tables and the database that holds them.
 *
 * Each table is closed only if its pointer is set; pTq->pMetaDB is passed to
 * tdbClose() unconditionally. Always returns 0.
 */
int32_t tqMetaClose(STQ* pTq) {
  if (pTq->pExecStore != NULL) tdbTbClose(pTq->pExecStore);
  if (pTq->pCheckStore != NULL) tdbTbClose(pTq->pCheckStore);

  tdbClose(pTq->pMetaDB);
  return 0;
}
L
Liu Jicong 已提交
109

110 111 112
/*
 * Persist one check-info record in the check store.
 *
 * key   - NUL-terminated record key; its strlen() is used as the key length
 * value - already-serialized record bytes
 * vLen  - number of bytes in value
 *
 * The upsert runs in its own write transaction on pTq->pMetaDB:
 * open -> begin -> upsert -> commit -> post-commit.
 * Returns 0 on success, -1 as soon as any of those steps fails.
 */
int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_t vLen) {
  TXN       txn;
  const int txnFlags = TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED;

  if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, txnFlags) < 0) return -1;
  if (tdbBegin(pTq->pMetaDB, &txn) < 0) return -1;

  /* Insert the record, or overwrite the one already stored under this key. */
  if (tdbTbUpsert(pTq->pCheckStore, key, strlen(key), value, vLen, &txn) < 0) return -1;

  if (tdbCommit(pTq->pMetaDB, &txn) < 0) return -1;
  if (tdbPostCommit(pTq->pMetaDB, &txn) < 0) return -1;

  return 0;
}

135 136 137 138
/*
 * Remove one check-info record from the check store.
 *
 * key - NUL-terminated record key; its strlen() is used as the key length
 *
 * The delete runs in its own write transaction on pTq->pMetaDB. A failure of
 * the delete itself is tolerated and the transaction is still committed.
 * Any failure to open, begin, commit or post-commit the transaction trips
 * ASSERT(0) and - when assertions are compiled out - returns -1 instead of
 * carrying on and reporting success.
 *
 * Returns 0 on success, -1 on a transaction failure.
 */
int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key) {
  TXN txn;

  if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
    ASSERT(0);
    return -1;
  }

  if (tdbBegin(pTq->pMetaDB, &txn) < 0) {
    ASSERT(0);
    return -1;
  }

  if (tdbTbDelete(pTq->pCheckStore, key, (int)strlen(key), &txn) < 0) {
    /* Deliberately best-effort: a failed delete is not treated as an error. */
  }

  if (tdbCommit(pTq->pMetaDB, &txn) < 0) {
    ASSERT(0);
    return -1;
  }

  if (tdbPostCommit(pTq->pMetaDB, &txn) < 0) {
    ASSERT(0);
    return -1;
  }

  return 0;
}

161 162 163 164 165
/*
 * Load every record of the check store into pTq->pCheckInfo.
 *
 * Walks the check store with a cursor from the first record, decodes each
 * value as an STqCheckInfo and inserts it into the pTq->pCheckInfo hash keyed
 * by its topic name.
 *
 * Returns 0 on success. Returns -1 if the cursor cannot be opened, or - with
 * terrno set to TSDB_CODE_OUT_OF_MEMORY - if a record fails to decode or
 * cannot be inserted. The key and value buffers filled in by tdbTbcNext()
 * and the cursor are released on every path after the cursor is opened.
 */
int32_t tqMetaRestoreCheckInfo(STQ* pTq) {
  TBC* pCur = NULL;
  if (tdbTbcOpen(pTq->pCheckStore, &pCur, NULL) < 0) {
    ASSERT(0);
    return -1;
  }

  int32_t  code = -1;
  void*    pKey = NULL;
  int      kLen = 0;
  void*    pVal = NULL;
  int      vLen = 0;
  SDecoder decoder;

  tdbTbcMoveToFirst(pCur);

  while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
    STqCheckInfo info;
    tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
    int32_t decodeCode = tDecodeSTqCheckInfo(&decoder, &info);
    /* Release the decoder whether or not decoding succeeded. */
    tDecoderClear(&decoder);
    if (decodeCode < 0) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto cleanup;
    }
    if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto cleanup;
    }
  }
  code = 0;

cleanup:
  /* Both buffers are handed back by tdbTbcNext(); free the value as well as the key. */
  tdbFree(pKey);
  tdbFree(pVal);
  tdbTbcClose(pCur);
  return code;
}

/*
 * Persist one handle in the exec store.
 *
 * key     - NUL-terminated record key; its strlen() is used as the key length
 * pHandle - handle to serialize with tEncodeSTqHandle()
 *
 * The handle is encoded into a temporary buffer sized by tEncodeSize() and
 * upserted inside its own write transaction on pTq->pMetaDB.
 *
 * Every failure still trips the original ASSERT; when assertions are compiled
 * out the function now returns -1 instead of continuing with a bad buffer or
 * a failed transaction. The encode buffer and encoder are released on every
 * path after they are created.
 *
 * Returns 0 on success, -1 on failure.
 */
int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
  int32_t code = 0;
  int32_t vlen = 0;
  tEncodeSize(tEncodeSTqHandle, pHandle, vlen, code);
  ASSERT(code == 0);
  if (code < 0) {
    return -1;
  }

  tqDebug("tq save %s(%d) consumer %" PRId64 " vgId:%d", pHandle->subKey, (int32_t)strlen(pHandle->subKey),
          pHandle->consumerId, TD_VID(pTq->pVnode));

  void* buf = taosMemoryCalloc(1, vlen);
  if (buf == NULL) {
    ASSERT(0);
    return -1;
  }

  int32_t  ret = -1;
  SEncoder encoder;
  tEncoderInit(&encoder, buf, vlen);

  if (tEncodeSTqHandle(&encoder, pHandle) < 0) {
    ASSERT(0);
    goto cleanup;
  }

  TXN txn;

  if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
    ASSERT(0);
    goto cleanup;
  }

  if (tdbBegin(pTq->pMetaDB, &txn) < 0) {
    ASSERT(0);
    goto cleanup;
  }

  if (tdbTbUpsert(pTq->pExecStore, key, (int)strlen(key), buf, vlen, &txn) < 0) {
    ASSERT(0);
    goto cleanup;
  }

  if (tdbCommit(pTq->pMetaDB, &txn) < 0) {
    ASSERT(0);
    goto cleanup;
  }

  if (tdbPostCommit(pTq->pMetaDB, &txn) < 0) {
    ASSERT(0);
    goto cleanup;
  }

  ret = 0;

cleanup:
  /* Shared exit: the encoder and its buffer are released on success and failure alike. */
  tEncoderClear(&encoder);
  taosMemoryFree(buf);
  return ret;
}

/*
 * Remove one handle record from the exec store.
 *
 * key - NUL-terminated record key; its strlen() is used as the key length
 *
 * The delete runs in its own write transaction on pTq->pMetaDB. A failure of
 * the delete itself is tolerated and the transaction is still committed.
 * Any failure to open, begin, commit or post-commit the transaction trips
 * ASSERT(0) and - when assertions are compiled out - returns -1 instead of
 * carrying on and reporting success.
 *
 * Returns 0 on success, -1 on a transaction failure.
 */
int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) {
  TXN txn;

  if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
    ASSERT(0);
    return -1;
  }

  if (tdbBegin(pTq->pMetaDB, &txn) < 0) {
    ASSERT(0);
    return -1;
  }

  if (tdbTbDelete(pTq->pExecStore, key, (int)strlen(key), &txn) < 0) {
    /* Deliberately best-effort: a failed delete is not treated as an error. */
  }

  if (tdbCommit(pTq->pMetaDB, &txn) < 0) {
    ASSERT(0);
    return -1;
  }

  if (tdbPostCommit(pTq->pMetaDB, &txn) < 0) {
    ASSERT(0);
    return -1;
  }

  return 0;
}
271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298

/*
 * Rebuild every persisted handle from the exec store into pTq->pHandle.
 *
 * For each stored record the handle is decoded, a WAL reference is opened and
 * pinned at the handle's snapshotVer, and the runtime objects for its
 * subscription type are recreated:
 *   - TOPIC_SUB_TYPE__COLUMN: an exec task built from the stored query
 *     message, plus the reader extracted from that task's stream scanner
 *   - TOPIC_SUB_TYPE__DB:     a WAL reader, a tq reader, a snapshot context
 *     and an exec task
 *   - TOPIC_SUB_TYPE__TABLE:  as for DB, with the tq reader restricted to the
 *     child tables of execTb.suid
 * The finished handle is copied into the pTq->pHandle hash under the stored
 * key.
 *
 * Returns 0 on success, -1 if the cursor cannot be opened, a record fails to
 * decode, or a WAL reference cannot be opened. The cursor and the key/value
 * buffers filled in by tdbTbcNext() are released on every path after the
 * cursor is opened.
 * NOTE(review): on the error paths, resources already attached to the
 * partially built handle (qmsg, filter hash, WAL ref) are not released here.
 */
int32_t tqMetaRestoreHandle(STQ* pTq) {
  TBC* pCur = NULL;
  if (tdbTbcOpen(pTq->pExecStore, &pCur, NULL) < 0) {
    ASSERT(0);
    return -1;
  }

  int32_t  code = -1;
  void*    pKey = NULL;
  int      kLen = 0;
  void*    pVal = NULL;
  int      vLen = 0;
  SDecoder decoder;

  tdbTbcMoveToFirst(pCur);

  while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
    /* Zero the handle so fields the chosen branch does not set are not stack garbage in the hash. */
    STqHandle handle = {0};
    tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
    int32_t decodeCode = tDecodeSTqHandle(&decoder, &handle);
    tDecoderClear(&decoder);
    if (decodeCode < 0) {
      /* Do not build runtime state from a record that failed to decode. */
      goto cleanup;
    }

    handle.pRef = walOpenRef(pTq->pVnode->pWal);
    if (handle.pRef == NULL) {
      ASSERT(0);
      goto cleanup;
    }
    walRefVer(handle.pRef, handle.snapshotVer);

    SReadHandle reader = {
        .meta = pTq->pVnode->pMeta,
        .vnode = pTq->pVnode,
        .initTableReader = true,
        .initTqReader = true,
        .version = handle.snapshotVer,
    };

    if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      handle.execHandle.task =
          qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, NULL);
      ASSERT(handle.execHandle.task);
      void* scanner = NULL;
      qExtractStreamScanner(handle.execHandle.task, &scanner);
      ASSERT(scanner);
      handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
      ASSERT(handle.execHandle.pExecReader);
    } else if (handle.execHandle.subType == TOPIC_SUB_TYPE__DB) {
      handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
      handle.execHandle.pExecReader = tqOpenReader(pTq->pVnode);

      buildSnapContext(reader.meta, reader.version, 0, handle.execHandle.subType, handle.fetchMeta,
                       (SSnapContext**)(&reader.sContext));
      handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, NULL, NULL);
    } else if (handle.execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
      handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);

      /* Collect the child-table uids of the subscribed super table and hand them to the reader. */
      SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
      vnodeGetCtbIdList(pTq->pVnode, handle.execHandle.execTb.suid, tbUidList);
      tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pTq->pVnode->config.vgId, handle.execHandle.execTb.suid);
      for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
        int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
        tqDebug("vgId:%d, idx %d, uid:%" PRId64, TD_VID(pTq->pVnode), i, tbUid);
      }
      handle.execHandle.pExecReader = tqOpenReader(pTq->pVnode);
      tqReaderSetTbUidList(handle.execHandle.pExecReader, tbUidList);
      taosArrayDestroy(tbUidList);

      buildSnapContext(reader.meta, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType,
                       handle.fetchMeta, (SSnapContext**)(&reader.sContext));
      handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, NULL, NULL);
    }
    tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, TD_VID(pTq->pVnode));
    /* The hash receives a by-value copy of the handle struct. */
    taosHashPut(pTq->pHandle, pKey, kLen, &handle, sizeof(STqHandle));
  }
  code = 0;

cleanup:
  tdbFree(pKey);
  tdbFree(pVal);
  tdbTbcClose(pCur);
  return code;
}