tqMeta.c 13.4 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
L
Liu Jicong 已提交
15 16 17
#include "tdbInt.h"
#include "tq.h"

L
Liu Jicong 已提交
18
int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) {
L
Liu Jicong 已提交
19 20
  if (tStartEncode(pEncoder) < 0) return -1;
  if (tEncodeCStr(pEncoder, pHandle->subKey) < 0) return -1;
21
  if (tEncodeI8(pEncoder, pHandle->fetchMeta) < 0) return -1;
L
Liu Jicong 已提交
22
  if (tEncodeI64(pEncoder, pHandle->consumerId) < 0) return -1;
L
Liu Jicong 已提交
23
  if (tEncodeI64(pEncoder, pHandle->snapshotVer) < 0) return -1;
L
Liu Jicong 已提交
24 25 26
  if (tEncodeI32(pEncoder, pHandle->epoch) < 0) return -1;
  if (tEncodeI8(pEncoder, pHandle->execHandle.subType) < 0) return -1;
  if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
L
Liu Jicong 已提交
27
    if (tEncodeCStr(pEncoder, pHandle->execHandle.execCol.qmsg) < 0) return -1;
28
  } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
29 30
    int32_t size = taosHashGetSize(pHandle->execHandle.execDb.pFilterOutTbUid);
    if (tEncodeI32(pEncoder, size) < 0) return -1;
31
    void* pIter = NULL;
32
    pIter = taosHashIterate(pHandle->execHandle.execDb.pFilterOutTbUid, pIter);
33 34
    while (pIter) {
      int64_t* tbUid = (int64_t*)taosHashGetKey(pIter, NULL);
35 36 37
      if (tEncodeI64(pEncoder, *tbUid) < 0) return -1;
      pIter = taosHashIterate(pHandle->execHandle.execDb.pFilterOutTbUid, pIter);
    }
38
  } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
39
    if (tEncodeI64(pEncoder, pHandle->execHandle.execTb.suid) < 0) return -1;
wmmhello's avatar
wmmhello 已提交
40 41 42
    if (pHandle->execHandle.execTb.qmsg != NULL){
      if (tEncodeCStr(pEncoder, pHandle->execHandle.execTb.qmsg) < 0) return -1;
    }
L
Liu Jicong 已提交
43 44 45 46 47
  }
  tEndEncode(pEncoder);
  return pEncoder->pos;
}

L
Liu Jicong 已提交
48
int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
L
Liu Jicong 已提交
49 50
  if (tStartDecode(pDecoder) < 0) return -1;
  if (tDecodeCStrTo(pDecoder, pHandle->subKey) < 0) return -1;
51
  if (tDecodeI8(pDecoder, &pHandle->fetchMeta) < 0) return -1;
L
Liu Jicong 已提交
52
  if (tDecodeI64(pDecoder, &pHandle->consumerId) < 0) return -1;
L
Liu Jicong 已提交
53
  if (tDecodeI64(pDecoder, &pHandle->snapshotVer) < 0) return -1;
L
Liu Jicong 已提交
54 55 56
  if (tDecodeI32(pDecoder, &pHandle->epoch) < 0) return -1;
  if (tDecodeI8(pDecoder, &pHandle->execHandle.subType) < 0) return -1;
  if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
L
Liu Jicong 已提交
57
    if (tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.execCol.qmsg) < 0) return -1;
58
  } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
59
    pHandle->execHandle.execDb.pFilterOutTbUid =
wmmhello's avatar
wmmhello 已提交
60
        taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
61 62
    int32_t size = 0;
    if (tDecodeI32(pDecoder, &size) < 0) return -1;
63
    for (int32_t i = 0; i < size; i++) {
64 65 66 67
      int64_t tbUid = 0;
      if (tDecodeI64(pDecoder, &tbUid) < 0) return -1;
      taosHashPut(pHandle->execHandle.execDb.pFilterOutTbUid, &tbUid, sizeof(int64_t), NULL, 0);
    }
68
  } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
69
    if (tDecodeI64(pDecoder, &pHandle->execHandle.execTb.suid) < 0) return -1;
wmmhello's avatar
wmmhello 已提交
70 71 72
    if (!tDecodeIsEnd(pDecoder)){
      if (tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.execTb.qmsg) < 0) return -1;
    }
L
Liu Jicong 已提交
73 74 75 76 77
  }
  tEndDecode(pDecoder);
  return 0;
}

78
int32_t tqMetaOpen(STQ* pTq) {
79
  if (tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaDB, 0) < 0) {
L
Liu Jicong 已提交
80
    return -1;
L
Liu Jicong 已提交
81 82
  }

83
  if (tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pExecStore, 0) < 0) {
84 85
    return -1;
  }
L
Liu Jicong 已提交
86

87
  if (tdbTbOpen("tq.check.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pCheckStore, 0) < 0) {
88 89
    return -1;
  }
L
Liu Jicong 已提交
90

91 92 93
//  if (tqMetaRestoreHandle(pTq) < 0) {
//    return -1;
//  }
L
Liu Jicong 已提交
94

95 96 97
  if (tqMetaRestoreCheckInfo(pTq) < 0) {
    return -1;
  }
L
Liu Jicong 已提交
98

99 100
  return 0;
}
wmmhello's avatar
wmmhello 已提交
101

102 103 104
int32_t tqMetaClose(STQ* pTq) {
  if (pTq->pExecStore) {
    tdbTbClose(pTq->pExecStore);
L
Liu Jicong 已提交
105
  }
106 107 108 109
  if (pTq->pCheckStore) {
    tdbTbClose(pTq->pCheckStore);
  }
  tdbClose(pTq->pMetaDB);
L
Liu Jicong 已提交
110 111
  return 0;
}
L
Liu Jicong 已提交
112

113
int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_t vLen) {
114
  TXN* txn;
wmmhello's avatar
wmmhello 已提交
115

116 117
  if (tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
      0) {
L
Liu Jicong 已提交
118 119
    return -1;
  }
wmmhello's avatar
wmmhello 已提交
120

121
  if (tdbTbUpsert(pTq->pCheckStore, key, strlen(key), value, vLen, txn) < 0) {
122 123
    return -1;
  }
wmmhello's avatar
wmmhello 已提交
124

125
  if (tdbCommit(pTq->pMetaDB, txn) < 0) {
L
Liu Jicong 已提交
126
    return -1;
L
Liu Jicong 已提交
127 128
  }

129
  if (tdbPostCommit(pTq->pMetaDB, txn) < 0) {
M
Minglei Jin 已提交
130 131 132
    return -1;
  }

L
Liu Jicong 已提交
133 134 135
  return 0;
}

136
int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key) {
137
  TXN* txn;
L
Liu Jicong 已提交
138

139 140
  if (tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
      0) {
L
Liu Jicong 已提交
141
    return -1;
L
Liu Jicong 已提交
142 143
  }

144
  if (tdbTbDelete(pTq->pCheckStore, key, (int)strlen(key), txn) < 0) {
L
Liu Jicong 已提交
145
    tqWarn("vgId:%d, tq try delete checkinfo failed %s", pTq->pVnode->config.vgId, key);
146 147
  }

148
  if (tdbCommit(pTq->pMetaDB, txn) < 0) {
L
Liu Jicong 已提交
149
    return -1;
L
Liu Jicong 已提交
150 151
  }

152
  if (tdbPostCommit(pTq->pMetaDB, txn) < 0) {
L
Liu Jicong 已提交
153
    return -1;
M
Minglei Jin 已提交
154 155
  }

L
Liu Jicong 已提交
156 157 158
  return 0;
}

159 160 161 162
int32_t tqMetaRestoreCheckInfo(STQ* pTq) {
  TBC* pCur = NULL;
  if (tdbTbcOpen(pTq->pCheckStore, &pCur, NULL) < 0) {
    return -1;
163
  }
164 165 166 167 168 169 170 171 172 173 174 175 176 177

  void*    pKey = NULL;
  int      kLen = 0;
  void*    pVal = NULL;
  int      vLen = 0;
  SDecoder decoder;

  tdbTbcMoveToFirst(pCur);

  while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
    STqCheckInfo info;
    tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
    if (tDecodeSTqCheckInfo(&decoder, &info) < 0) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
178
      tdbFree(pKey);
M
Minglei Jin 已提交
179
      tdbFree(pVal);
L
Liu Jicong 已提交
180
      tdbTbcClose(pCur);
181 182 183 184 185
      return -1;
    }
    tDecoderClear(&decoder);
    if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
186
      tdbFree(pKey);
M
Minglei Jin 已提交
187
      tdbFree(pVal);
L
Liu Jicong 已提交
188
      tdbTbcClose(pCur);
189 190
      return -1;
    }
191
  }
192
  tdbFree(pKey);
M
Minglei Jin 已提交
193
  tdbFree(pVal);
194
  tdbTbcClose(pCur);
L
Liu Jicong 已提交
195 196 197 198 199 200 201
  return 0;
}

int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
  int32_t code;
  int32_t vlen;
  tEncodeSize(tEncodeSTqHandle, pHandle, vlen, code);
L
Liu Jicong 已提交
202 203 204
  if (code < 0) {
    return -1;
  }
L
Liu Jicong 已提交
205

206
  tqDebug("tq save %s(%d) handle consumer:0x%" PRIx64 " epoch:%d vgId:%d", pHandle->subKey,
207
          (int32_t)strlen(pHandle->subKey), pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode));
M
Minglei Jin 已提交
208

L
Liu Jicong 已提交
209 210
  void* buf = taosMemoryCalloc(1, vlen);
  if (buf == NULL) {
L
Liu Jicong 已提交
211
    return -1;
L
Liu Jicong 已提交
212 213 214 215 216 217
  }

  SEncoder encoder;
  tEncoderInit(&encoder, buf, vlen);

  if (tEncodeSTqHandle(&encoder, pHandle) < 0) {
L
Liu Jicong 已提交
218 219 220
    tEncoderClear(&encoder);
    taosMemoryFree(buf);
    return -1;
L
Liu Jicong 已提交
221 222
  }

223
  TXN* txn;
L
Liu Jicong 已提交
224

225 226
  if (tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
      0) {
L
Liu Jicong 已提交
227 228 229
    tEncoderClear(&encoder);
    taosMemoryFree(buf);
    return -1;
L
Liu Jicong 已提交
230 231
  }

232
  if (tdbTbUpsert(pTq->pExecStore, key, (int)strlen(key), buf, vlen, txn) < 0) {
L
Liu Jicong 已提交
233 234 235
    tEncoderClear(&encoder);
    taosMemoryFree(buf);
    return -1;
L
Liu Jicong 已提交
236 237
  }

238
  if (tdbCommit(pTq->pMetaDB, txn) < 0) {
L
Liu Jicong 已提交
239 240 241
    tEncoderClear(&encoder);
    taosMemoryFree(buf);
    return -1;
L
Liu Jicong 已提交
242 243
  }

244
  if (tdbPostCommit(pTq->pMetaDB, txn) < 0) {
L
Liu Jicong 已提交
245 246 247
    tEncoderClear(&encoder);
    taosMemoryFree(buf);
    return -1;
M
Minglei Jin 已提交
248 249
  }

L
Liu Jicong 已提交
250 251 252 253 254 255
  tEncoderClear(&encoder);
  taosMemoryFree(buf);
  return 0;
}

int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) {
256
  TXN* txn;
L
Liu Jicong 已提交
257

258 259
  if (tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
      0) {
L
Liu Jicong 已提交
260
    return -1;
L
Liu Jicong 已提交
261 262
  }

263
  if (tdbTbDelete(pTq->pExecStore, key, (int)strlen(key), txn) < 0) {
L
Liu Jicong 已提交
264 265
  }

266
  if (tdbCommit(pTq->pMetaDB, txn) < 0) {
L
Liu Jicong 已提交
267
    return -1;
L
Liu Jicong 已提交
268 269
  }

270
  if (tdbPostCommit(pTq->pMetaDB, txn) < 0) {
L
Liu Jicong 已提交
271
    return -1;
M
Minglei Jin 已提交
272 273
  }

L
Liu Jicong 已提交
274 275
  return 0;
}
276

277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381
static int buildHandle(STQ* pTq, STqHandle* handle){
  SVnode* pVnode = pTq->pVnode;
  int32_t vgId = TD_VID(pVnode);

  handle->pRef = walOpenRef(pVnode->pWal);
  if (handle->pRef == NULL) {
    return -1;
  }
  walSetRefVer(handle->pRef, handle->snapshotVer);

  SReadHandle reader = {
      .vnode = pVnode,
      .initTableReader = true,
      .initTqReader = true,
      .version = handle->snapshotVer,
  };

  initStorageAPI(&reader.api);

  if (handle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
    handle->execHandle.task =
        qCreateQueueExecTaskInfo(handle->execHandle.execCol.qmsg, &reader, vgId, &handle->execHandle.numOfCols, handle->consumerId);
    if (handle->execHandle.task == NULL) {
      tqError("cannot create exec task for %s", handle->subKey);
      return -1;
    }
    void* scanner = NULL;
    qExtractStreamScanner(handle->execHandle.task, &scanner);
    if (scanner == NULL) {
      tqError("cannot extract stream scanner for %s", handle->subKey);
      return -1;
    }
    handle->execHandle.pTqReader = qExtractReaderFromStreamScanner(scanner);
    if (handle->execHandle.pTqReader == NULL) {
      tqError("cannot extract exec reader for %s", handle->subKey);
      return -1;
    }
  } else if (handle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
    handle->pWalReader = walOpenReader(pVnode->pWal, NULL);
    handle->execHandle.pTqReader = tqReaderOpen(pVnode);

    buildSnapContext(reader.vnode, reader.version, 0, handle->execHandle.subType, handle->fetchMeta,
                     (SSnapContext**)(&reader.sContext));
    handle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, handle->consumerId);
  } else if (handle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
    handle->pWalReader = walOpenReader(pVnode->pWal, NULL);

    if(handle->execHandle.execTb.qmsg != NULL && strcmp(handle->execHandle.execTb.qmsg, "") != 0) {
      if (nodesStringToNode(handle->execHandle.execTb.qmsg, &handle->execHandle.execTb.node) != 0) {
        tqError("nodesStringToNode error in sub stable, since %s", terrstr());
        return -1;
      }
    }
    buildSnapContext(reader.vnode, reader.version, handle->execHandle.execTb.suid, handle->execHandle.subType,
                     handle->fetchMeta, (SSnapContext**)(&reader.sContext));
    handle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, handle->consumerId);

    SArray* tbUidList = NULL;
    int ret = qGetTableList(handle->execHandle.execTb.suid, pVnode, handle->execHandle.execTb.node, &tbUidList, handle->execHandle.task);
    if(ret != TDB_CODE_SUCCESS) {
      tqError("qGetTableList error:%d handle %s consumer:0x%" PRIx64, ret, handle->subKey, handle->consumerId);
      taosArrayDestroy(tbUidList);
      return -1;
    }
    tqDebug("vgId:%d, tq try to get ctb for stb subscribe, suid:%" PRId64, pVnode->config.vgId, handle->execHandle.execTb.suid);
    handle->execHandle.pTqReader = tqReaderOpen(pVnode);
    tqReaderSetTbUidList(handle->execHandle.pTqReader, tbUidList, NULL);
    taosArrayDestroy(tbUidList);
  }
  return 0;
}

static int restoreHandle(STQ* pTq, void* pVal, int vLen, STqHandle* handle){
  int32_t  vgId = TD_VID(pTq->pVnode);
  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
  tDecodeSTqHandle(&decoder, handle);
  tDecoderClear(&decoder);

  if(buildHandle(pTq, handle) < 0){
    return -1;
  }
  tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle->subKey, handle->consumerId, vgId);
  return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle));
}

int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){
  int32_t  vgId = TD_VID(pTq->pVnode);

  memcpy(handle->subKey, req->subKey, TSDB_SUBSCRIBE_KEY_LEN);
  handle->consumerId = req->newConsumerId;
  handle->epoch = -1;

  handle->execHandle.subType = req->subType;
  handle->fetchMeta = req->withMeta;
  if(req->subType == TOPIC_SUB_TYPE__COLUMN){
    handle->execHandle.execCol.qmsg = taosStrdup(req->qmsg);
  }else if(req->subType == TOPIC_SUB_TYPE__DB){
    handle->execHandle.execDb.pFilterOutTbUid =
        taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
  }else if(req->subType == TOPIC_SUB_TYPE__TABLE){
    handle->execHandle.execTb.suid = req->suid;
    handle->execHandle.execTb.qmsg = taosStrdup(req->qmsg);
  }

wmmhello's avatar
wmmhello 已提交
382
  handle->snapshotVer = walGetCommittedVer(pTq->pVnode->pWal);
383 384 385 386 387 388 389 390

  if(buildHandle(pTq, handle) < 0){
    return -1;
  }
  tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle->subKey, handle->consumerId, vgId);
  return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle));
}

391
int32_t tqMetaRestoreHandle(STQ* pTq) {
X
Xiaoyu Wang 已提交
392
  int  code = 0;
393 394 395 396 397 398 399 400 401 402 403 404 405
  TBC* pCur = NULL;
  if (tdbTbcOpen(pTq->pExecStore, &pCur, NULL) < 0) {
    return -1;
  }

  void*    pKey = NULL;
  int      kLen = 0;
  void*    pVal = NULL;
  int      vLen = 0;

  tdbTbcMoveToFirst(pCur);

  while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
406
    STqHandle handle = {0};
407
    code = restoreHandle(pTq, pVal, vLen, &handle);
D
dapan1121 已提交
408
    if (code < 0) {
409 410
      tqDestroyTqHandle(&handle);
      break;
411 412 413
    }
  }

L
Liu Jicong 已提交
414 415
  tdbFree(pKey);
  tdbFree(pVal);
416
  tdbTbcClose(pCur);
wmmhello's avatar
wmmhello 已提交
417
  return code;
418
}
419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434

int32_t tqMetaGetHandle(STQ* pTq, const char* key) {
  void*    pVal = NULL;
  int      vLen = 0;

  if (tdbTbGet(pTq->pExecStore, key, (int)strlen(key), &pVal, &vLen) < 0) {
    return -1;
  }
  STqHandle handle = {0};
  int code = restoreHandle(pTq, pVal, vLen, &handle);
  if (code < 0){
    tqDestroyTqHandle(&handle);
  }
  tdbFree(pVal);
  return code;
}