tqMeta.c 11.6 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
L
Liu Jicong 已提交
15 16 17
#include "tdbInt.h"
#include "tq.h"

L
Liu Jicong 已提交
18
int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) {
L
Liu Jicong 已提交
19 20
  if (tStartEncode(pEncoder) < 0) return -1;
  if (tEncodeCStr(pEncoder, pHandle->subKey) < 0) return -1;
21
  if (tEncodeI8(pEncoder, pHandle->fetchMeta) < 0) return -1;
L
Liu Jicong 已提交
22
  if (tEncodeI64(pEncoder, pHandle->consumerId) < 0) return -1;
L
Liu Jicong 已提交
23
  if (tEncodeI64(pEncoder, pHandle->snapshotVer) < 0) return -1;
L
Liu Jicong 已提交
24 25 26
  if (tEncodeI32(pEncoder, pHandle->epoch) < 0) return -1;
  if (tEncodeI8(pEncoder, pHandle->execHandle.subType) < 0) return -1;
  if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
L
Liu Jicong 已提交
27
    if (tEncodeCStr(pEncoder, pHandle->execHandle.execCol.qmsg) < 0) return -1;
28
  } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
29 30
    int32_t size = taosHashGetSize(pHandle->execHandle.execDb.pFilterOutTbUid);
    if (tEncodeI32(pEncoder, size) < 0) return -1;
31
    void* pIter = NULL;
32
    pIter = taosHashIterate(pHandle->execHandle.execDb.pFilterOutTbUid, pIter);
33 34
    while (pIter) {
      int64_t* tbUid = (int64_t*)taosHashGetKey(pIter, NULL);
35 36 37
      if (tEncodeI64(pEncoder, *tbUid) < 0) return -1;
      pIter = taosHashIterate(pHandle->execHandle.execDb.pFilterOutTbUid, pIter);
    }
38
  } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
39
    if (tEncodeI64(pEncoder, pHandle->execHandle.execTb.suid) < 0) return -1;
40
    if (tEncodeCStr(pEncoder, pHandle->execHandle.execTb.qmsg) < 0) return -1;
L
Liu Jicong 已提交
41 42 43 44 45
  }
  tEndEncode(pEncoder);
  return pEncoder->pos;
}

L
Liu Jicong 已提交
46
int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
L
Liu Jicong 已提交
47 48
  if (tStartDecode(pDecoder) < 0) return -1;
  if (tDecodeCStrTo(pDecoder, pHandle->subKey) < 0) return -1;
49
  if (tDecodeI8(pDecoder, &pHandle->fetchMeta) < 0) return -1;
L
Liu Jicong 已提交
50
  if (tDecodeI64(pDecoder, &pHandle->consumerId) < 0) return -1;
L
Liu Jicong 已提交
51
  if (tDecodeI64(pDecoder, &pHandle->snapshotVer) < 0) return -1;
L
Liu Jicong 已提交
52 53 54
  if (tDecodeI32(pDecoder, &pHandle->epoch) < 0) return -1;
  if (tDecodeI8(pDecoder, &pHandle->execHandle.subType) < 0) return -1;
  if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
L
Liu Jicong 已提交
55
    if (tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.execCol.qmsg) < 0) return -1;
56
  } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
57
    pHandle->execHandle.execDb.pFilterOutTbUid =
wmmhello's avatar
wmmhello 已提交
58
        taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
59 60
    int32_t size = 0;
    if (tDecodeI32(pDecoder, &size) < 0) return -1;
61
    for (int32_t i = 0; i < size; i++) {
62 63 64 65
      int64_t tbUid = 0;
      if (tDecodeI64(pDecoder, &tbUid) < 0) return -1;
      taosHashPut(pHandle->execHandle.execDb.pFilterOutTbUid, &tbUid, sizeof(int64_t), NULL, 0);
    }
66
  } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
67
    if (tDecodeI64(pDecoder, &pHandle->execHandle.execTb.suid) < 0) return -1;
wmmhello's avatar
wmmhello 已提交
68 69 70
    if (!tDecodeIsEnd(pDecoder)){
      if (tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.execTb.qmsg) < 0) return -1;
    }
L
Liu Jicong 已提交
71 72 73 74 75
  }
  tEndDecode(pDecoder);
  return 0;
}

76
int32_t tqMetaOpen(STQ* pTq) {
77
  if (tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaDB, 0) < 0) {
L
Liu Jicong 已提交
78
    return -1;
L
Liu Jicong 已提交
79 80
  }

81
  if (tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pExecStore, 0) < 0) {
82 83
    return -1;
  }
L
Liu Jicong 已提交
84

85
  if (tdbTbOpen("tq.check.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pCheckStore, 0) < 0) {
86 87
    return -1;
  }
L
Liu Jicong 已提交
88

89 90 91
  if (tqMetaRestoreHandle(pTq) < 0) {
    return -1;
  }
L
Liu Jicong 已提交
92

93 94 95
  if (tqMetaRestoreCheckInfo(pTq) < 0) {
    return -1;
  }
L
Liu Jicong 已提交
96

97 98
  return 0;
}
wmmhello's avatar
wmmhello 已提交
99

100 101 102
int32_t tqMetaClose(STQ* pTq) {
  if (pTq->pExecStore) {
    tdbTbClose(pTq->pExecStore);
L
Liu Jicong 已提交
103
  }
104 105 106 107
  if (pTq->pCheckStore) {
    tdbTbClose(pTq->pCheckStore);
  }
  tdbClose(pTq->pMetaDB);
L
Liu Jicong 已提交
108 109
  return 0;
}
L
Liu Jicong 已提交
110

111
int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_t vLen) {
112
  TXN* txn;
wmmhello's avatar
wmmhello 已提交
113

114 115
  if (tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
      0) {
L
Liu Jicong 已提交
116 117
    return -1;
  }
wmmhello's avatar
wmmhello 已提交
118

119
  if (tdbTbUpsert(pTq->pCheckStore, key, strlen(key), value, vLen, txn) < 0) {
120 121
    return -1;
  }
wmmhello's avatar
wmmhello 已提交
122

123
  if (tdbCommit(pTq->pMetaDB, txn) < 0) {
L
Liu Jicong 已提交
124
    return -1;
L
Liu Jicong 已提交
125 126
  }

127
  if (tdbPostCommit(pTq->pMetaDB, txn) < 0) {
M
Minglei Jin 已提交
128 129 130
    return -1;
  }

L
Liu Jicong 已提交
131 132 133
  return 0;
}

134
int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key) {
135
  TXN* txn;
L
Liu Jicong 已提交
136

137 138
  if (tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
      0) {
L
Liu Jicong 已提交
139
    return -1;
L
Liu Jicong 已提交
140 141
  }

142
  if (tdbTbDelete(pTq->pCheckStore, key, (int)strlen(key), txn) < 0) {
L
Liu Jicong 已提交
143
    tqWarn("vgId:%d, tq try delete checkinfo failed %s", pTq->pVnode->config.vgId, key);
144 145
  }

146
  if (tdbCommit(pTq->pMetaDB, txn) < 0) {
L
Liu Jicong 已提交
147
    return -1;
L
Liu Jicong 已提交
148 149
  }

150
  if (tdbPostCommit(pTq->pMetaDB, txn) < 0) {
L
Liu Jicong 已提交
151
    return -1;
M
Minglei Jin 已提交
152 153
  }

L
Liu Jicong 已提交
154 155 156
  return 0;
}

157 158 159 160
int32_t tqMetaRestoreCheckInfo(STQ* pTq) {
  TBC* pCur = NULL;
  if (tdbTbcOpen(pTq->pCheckStore, &pCur, NULL) < 0) {
    return -1;
161
  }
162 163 164 165 166 167 168 169 170 171 172 173 174 175

  void*    pKey = NULL;
  int      kLen = 0;
  void*    pVal = NULL;
  int      vLen = 0;
  SDecoder decoder;

  tdbTbcMoveToFirst(pCur);

  while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
    STqCheckInfo info;
    tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
    if (tDecodeSTqCheckInfo(&decoder, &info) < 0) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
176
      tdbFree(pKey);
M
Minglei Jin 已提交
177
      tdbFree(pVal);
L
Liu Jicong 已提交
178
      tdbTbcClose(pCur);
179 180 181 182 183
      return -1;
    }
    tDecoderClear(&decoder);
    if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
184
      tdbFree(pKey);
M
Minglei Jin 已提交
185
      tdbFree(pVal);
L
Liu Jicong 已提交
186
      tdbTbcClose(pCur);
187 188
      return -1;
    }
189
  }
190
  tdbFree(pKey);
M
Minglei Jin 已提交
191
  tdbFree(pVal);
192
  tdbTbcClose(pCur);
L
Liu Jicong 已提交
193 194 195 196 197 198 199
  return 0;
}

int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
  int32_t code;
  int32_t vlen;
  tEncodeSize(tEncodeSTqHandle, pHandle, vlen, code);
L
Liu Jicong 已提交
200 201 202
  if (code < 0) {
    return -1;
  }
L
Liu Jicong 已提交
203

204
  tqDebug("tq save %s(%d) handle consumer:0x%" PRIx64 " epoch:%d vgId:%d", pHandle->subKey,
205
          (int32_t)strlen(pHandle->subKey), pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode));
M
Minglei Jin 已提交
206

L
Liu Jicong 已提交
207 208
  void* buf = taosMemoryCalloc(1, vlen);
  if (buf == NULL) {
L
Liu Jicong 已提交
209
    return -1;
L
Liu Jicong 已提交
210 211 212 213 214 215
  }

  SEncoder encoder;
  tEncoderInit(&encoder, buf, vlen);

  if (tEncodeSTqHandle(&encoder, pHandle) < 0) {
L
Liu Jicong 已提交
216 217 218
    tEncoderClear(&encoder);
    taosMemoryFree(buf);
    return -1;
L
Liu Jicong 已提交
219 220
  }

221
  TXN* txn;
L
Liu Jicong 已提交
222

223 224
  if (tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
      0) {
L
Liu Jicong 已提交
225 226 227
    tEncoderClear(&encoder);
    taosMemoryFree(buf);
    return -1;
L
Liu Jicong 已提交
228 229
  }

230
  if (tdbTbUpsert(pTq->pExecStore, key, (int)strlen(key), buf, vlen, txn) < 0) {
L
Liu Jicong 已提交
231 232 233
    tEncoderClear(&encoder);
    taosMemoryFree(buf);
    return -1;
L
Liu Jicong 已提交
234 235
  }

236
  if (tdbCommit(pTq->pMetaDB, txn) < 0) {
L
Liu Jicong 已提交
237 238 239
    tEncoderClear(&encoder);
    taosMemoryFree(buf);
    return -1;
L
Liu Jicong 已提交
240 241
  }

242
  if (tdbPostCommit(pTq->pMetaDB, txn) < 0) {
L
Liu Jicong 已提交
243 244 245
    tEncoderClear(&encoder);
    taosMemoryFree(buf);
    return -1;
M
Minglei Jin 已提交
246 247
  }

L
Liu Jicong 已提交
248 249 250 251 252 253
  tEncoderClear(&encoder);
  taosMemoryFree(buf);
  return 0;
}

int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) {
254
  TXN* txn;
L
Liu Jicong 已提交
255

256 257
  if (tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
      0) {
L
Liu Jicong 已提交
258
    return -1;
L
Liu Jicong 已提交
259 260
  }

261
  if (tdbTbDelete(pTq->pExecStore, key, (int)strlen(key), txn) < 0) {
L
Liu Jicong 已提交
262 263
  }

264
  if (tdbCommit(pTq->pMetaDB, txn) < 0) {
L
Liu Jicong 已提交
265
    return -1;
L
Liu Jicong 已提交
266 267
  }

268
  if (tdbPostCommit(pTq->pMetaDB, txn) < 0) {
L
Liu Jicong 已提交
269
    return -1;
M
Minglei Jin 已提交
270 271
  }

L
Liu Jicong 已提交
272 273
  return 0;
}
274 275

int32_t tqMetaRestoreHandle(STQ* pTq) {
X
Xiaoyu Wang 已提交
276
  int  code = 0;
277 278 279 280 281
  TBC* pCur = NULL;
  if (tdbTbcOpen(pTq->pExecStore, &pCur, NULL) < 0) {
    return -1;
  }

282
  int32_t  vgId = TD_VID(pTq->pVnode);
283 284 285 286 287 288 289 290 291
  void*    pKey = NULL;
  int      kLen = 0;
  void*    pVal = NULL;
  int      vLen = 0;
  SDecoder decoder;

  tdbTbcMoveToFirst(pCur);

  while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
292
    STqHandle handle = {0};
293 294
    tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
    tDecodeSTqHandle(&decoder, &handle);
L
Liu Jicong 已提交
295
    tDecoderClear(&decoder);
296 297 298

    handle.pRef = walOpenRef(pTq->pVnode->pWal);
    if (handle.pRef == NULL) {
wmmhello's avatar
wmmhello 已提交
299 300
      code = -1;
      goto end;
301
    }
wmmhello's avatar
wmmhello 已提交
302
    walSetRefVer(handle.pRef, handle.snapshotVer);
303

wmmhello's avatar
wmmhello 已提交
304 305 306 307
    SReadHandle reader = {
        .vnode = pTq->pVnode,
        .initTableReader = true,
        .initTqReader = true,
308
        .version = handle.snapshotVer
wmmhello's avatar
wmmhello 已提交
309 310
    };

H
Haojun Liao 已提交
311 312
    initStorageAPI(&reader.api);

313
    if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
L
Liu Jicong 已提交
314
      handle.execHandle.task =
315
          qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, vgId, &handle.execHandle.numOfCols, 0);
L
Liu Jicong 已提交
316 317
      if (handle.execHandle.task == NULL) {
        tqError("cannot create exec task for %s", handle.subKey);
wmmhello's avatar
wmmhello 已提交
318 319
        code = -1;
        goto end;
L
Liu Jicong 已提交
320
      }
321
      void* scanner = NULL;
wmmhello's avatar
wmmhello 已提交
322
      qExtractStreamScanner(handle.execHandle.task, &scanner);
L
Liu Jicong 已提交
323 324
      if (scanner == NULL) {
        tqError("cannot extract stream scanner for %s", handle.subKey);
wmmhello's avatar
wmmhello 已提交
325 326
        code = -1;
        goto end;
L
Liu Jicong 已提交
327
      }
328 329
      handle.execHandle.pTqReader = qExtractReaderFromStreamScanner(scanner);
      if (handle.execHandle.pTqReader == NULL) {
L
Liu Jicong 已提交
330
        tqError("cannot extract exec reader for %s", handle.subKey);
wmmhello's avatar
wmmhello 已提交
331 332
        code = -1;
        goto end;
L
Liu Jicong 已提交
333
      }
334
    } else if (handle.execHandle.subType == TOPIC_SUB_TYPE__DB) {
wmmhello's avatar
wmmhello 已提交
335
      handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
336
      handle.execHandle.pTqReader = tqReaderOpen(pTq->pVnode);
337

338
      buildSnapContext(reader.vnode, reader.version, 0, handle.execHandle.subType, handle.fetchMeta,
339
                       (SSnapContext**)(&reader.sContext));
340
      handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, 0);
341 342
    } else if (handle.execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
      handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
wmmhello's avatar
wmmhello 已提交
343

wmmhello's avatar
wmmhello 已提交
344
      if(handle.execHandle.execTb.qmsg != NULL && strcmp(handle.execHandle.execTb.qmsg, "") != 0) {
345 346 347 348
        if (nodesStringToNode(handle.execHandle.execTb.qmsg, &handle.execHandle.execTb.node) != 0) {
          tqError("nodesStringToNode error in sub stable, since %s", terrstr());
          return -1;
        }
349
      }
wmmhello's avatar
wmmhello 已提交
350 351 352
      buildSnapContext(reader.vnode, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType,
                       handle.fetchMeta, (SSnapContext**)(&reader.sContext));
      handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, 0);
353 354

      SArray* tbUidList = NULL;
wmmhello's avatar
wmmhello 已提交
355
      int ret = qGetTableList(handle.execHandle.execTb.suid, pTq->pVnode, handle.execHandle.execTb.node, &tbUidList, handle.execHandle.task);
356 357 358 359 360 361
      if(ret != TDB_CODE_SUCCESS) {
        tqError("qGetTableList error:%d handle %s consumer:0x%" PRIx64, ret, handle.subKey, handle.consumerId);
        taosArrayDestroy(tbUidList);
        goto end;
      }
      tqDebug("vgId:%d, tq try to get ctb for stb subscribe, suid:%" PRId64, pTq->pVnode->config.vgId, handle.execHandle.execTb.suid);
362
      handle.execHandle.pTqReader = tqReaderOpen(pTq->pVnode);
363
      tqReaderSetTbUidList(handle.execHandle.pTqReader, tbUidList);
364
      taosArrayDestroy(tbUidList);
365
    }
366
    tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, vgId);
wmmhello's avatar
wmmhello 已提交
367
    taosWLockLatch(&pTq->lock);
368
    taosHashPut(pTq->pHandle, pKey, kLen, &handle, sizeof(STqHandle));
wmmhello's avatar
wmmhello 已提交
369
    taosWUnLockLatch(&pTq->lock);
370 371
  }

wmmhello's avatar
wmmhello 已提交
372
end:
L
Liu Jicong 已提交
373 374
  tdbFree(pKey);
  tdbFree(pVal);
375
  tdbTbcClose(pCur);
wmmhello's avatar
wmmhello 已提交
376
  return code;
377
}