tqRead.c 18.3 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tq.h"
L
Liu Jicong 已提交
17

L
Liu Jicong 已提交
18 19
bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) {
  if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__TABLE) {
20 21 22
    return true;
  }

L
Liu Jicong 已提交
23 24 25
  int16_t msgType = pHead->msgType;
  char*   body = pHead->body;
  int32_t bodyLen = pHead->bodyLen;
26

L
Liu Jicong 已提交
27 28 29 30 31
  int64_t  tbSuid = pHandle->execHandle.execTb.suid;
  int64_t  realTbSuid = 0;
  SDecoder coder;
  void*    data = POINTER_SHIFT(body, sizeof(SMsgHead));
  int32_t  len = bodyLen - sizeof(SMsgHead);
32 33 34 35 36 37 38 39 40 41 42 43 44
  tDecoderInit(&coder, data, len);

  if (msgType == TDMT_VND_CREATE_STB || msgType == TDMT_VND_ALTER_STB) {
    SVCreateStbReq req = {0};
    if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
      goto end;
    }
    realTbSuid = req.suid;
  } else if (msgType == TDMT_VND_DROP_STB) {
    SVDropStbReq req = {0};
    if (tDecodeSVDropStbReq(&coder, &req) < 0) {
      goto end;
    }
L
Liu Jicong 已提交
45
    realTbSuid = req.suid;
46 47 48 49 50 51
  } else if (msgType == TDMT_VND_CREATE_TABLE) {
    SVCreateTbBatchReq req = {0};
    if (tDecodeSVCreateTbBatchReq(&coder, &req) < 0) {
      goto end;
    }

L
Liu Jicong 已提交
52
    int32_t        needRebuild = 0;
53 54 55
    SVCreateTbReq* pCreateReq = NULL;
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
      pCreateReq = req.pReqs + iReq;
L
Liu Jicong 已提交
56
      if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid) {
57 58 59
        needRebuild++;
      }
    }
L
Liu Jicong 已提交
60
    if (needRebuild == 0) {
61
      // do nothing
L
Liu Jicong 已提交
62
    } else if (needRebuild == req.nReqs) {
63
      realTbSuid = tbSuid;
L
Liu Jicong 已提交
64
    } else {
65 66 67 68 69
      realTbSuid = tbSuid;
      SVCreateTbBatchReq reqNew = {0};
      reqNew.pArray = taosArrayInit(req.nReqs, sizeof(struct SVCreateTbReq));
      for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
        pCreateReq = req.pReqs + iReq;
L
Liu Jicong 已提交
70
        if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid) {
71 72 73 74 75
          reqNew.nReqs++;
          taosArrayPush(reqNew.pArray, pCreateReq);
        }
      }

L
Liu Jicong 已提交
76
      int     tlen;
77 78 79 80 81
      int32_t ret = 0;
      tEncodeSize(tEncodeSVCreateTbBatchReq, &reqNew, tlen, ret);
      void* buf = taosMemoryMalloc(tlen);
      if (NULL == buf) {
        taosArrayDestroy(reqNew.pArray);
82 83 84 85 86 87 88
        for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
          pCreateReq = req.pReqs + iReq;
          taosMemoryFreeClear(pCreateReq->comment);
          if (pCreateReq->type == TSDB_CHILD_TABLE) {
            taosArrayDestroy(pCreateReq->ctb.tagName);
          }
        }
89 90 91 92 93 94 95 96 97 98 99
        goto end;
      }
      SEncoder coderNew = {0};
      tEncoderInit(&coderNew, buf, tlen - sizeof(SMsgHead));
      tEncodeSVCreateTbBatchReq(&coderNew, &reqNew);
      tEncoderClear(&coderNew);
      memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
      pHead->bodyLen = tlen + sizeof(SMsgHead);
      taosMemoryFree(buf);
      taosArrayDestroy(reqNew.pArray);
    }
100 101 102 103 104 105 106 107

    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
      pCreateReq = req.pReqs + iReq;
      taosMemoryFreeClear(pCreateReq->comment);
      if (pCreateReq->type == TSDB_CHILD_TABLE) {
        taosArrayDestroy(pCreateReq->ctb.tagName);
      }
    }
108
  } else if (msgType == TDMT_VND_ALTER_TABLE) {
L
Liu Jicong 已提交
109
    SVAlterTbReq req = {0};
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130

    if (tDecodeSVAlterTbReq(&coder, &req) < 0) {
      goto end;
    }

    SMetaReader mr = {0};
    metaReaderInit(&mr, pHandle->execHandle.pExecReader->pVnodeMeta, 0);

    if (metaGetTableEntryByName(&mr, req.tbName) < 0) {
      metaReaderClear(&mr);
      goto end;
    }
    realTbSuid = mr.me.ctbEntry.suid;
    metaReaderClear(&mr);
  } else if (msgType == TDMT_VND_DROP_TABLE) {
    SVDropTbBatchReq req = {0};

    if (tDecodeSVDropTbBatchReq(&coder, &req) < 0) {
      goto end;
    }

L
Liu Jicong 已提交
131
    int32_t      needRebuild = 0;
132 133 134 135
    SVDropTbReq* pDropReq = NULL;
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
      pDropReq = req.pReqs + iReq;

L
Liu Jicong 已提交
136
      if (pDropReq->suid == tbSuid) {
137 138 139
        needRebuild++;
      }
    }
L
Liu Jicong 已提交
140
    if (needRebuild == 0) {
141
      // do nothing
L
Liu Jicong 已提交
142
    } else if (needRebuild == req.nReqs) {
143
      realTbSuid = tbSuid;
L
Liu Jicong 已提交
144
    } else {
145 146 147 148 149
      realTbSuid = tbSuid;
      SVDropTbBatchReq reqNew = {0};
      reqNew.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbReq));
      for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
        pDropReq = req.pReqs + iReq;
L
Liu Jicong 已提交
150
        if (pDropReq->suid == tbSuid) {
151 152 153 154 155
          reqNew.nReqs++;
          taosArrayPush(reqNew.pArray, pDropReq);
        }
      }

L
Liu Jicong 已提交
156
      int     tlen;
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178
      int32_t ret = 0;
      tEncodeSize(tEncodeSVDropTbBatchReq, &reqNew, tlen, ret);
      void* buf = taosMemoryMalloc(tlen);
      if (NULL == buf) {
        taosArrayDestroy(reqNew.pArray);
        goto end;
      }
      SEncoder coderNew = {0};
      tEncoderInit(&coderNew, buf, tlen - sizeof(SMsgHead));
      tEncodeSVDropTbBatchReq(&coderNew, &reqNew);
      tEncoderClear(&coderNew);
      memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
      pHead->bodyLen = tlen + sizeof(SMsgHead);
      taosMemoryFree(buf);
      taosArrayDestroy(reqNew.pArray);
    }
  } else if (msgType == TDMT_VND_DELETE) {
    SDeleteRes req = {0};
    if (tDecodeDeleteRes(&coder, &req) < 0) {
      goto end;
    }
    realTbSuid = req.suid;
L
Liu Jicong 已提交
179
  } else {
180 181 182
    ASSERT(0);
  }

L
Liu Jicong 已提交
183
end:
184 185 186 187
  tDecoderClear(&coder);
  return tbSuid == realTbSuid;
}

wmmhello's avatar
wmmhello 已提交
188
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead) {
L
Liu Jicong 已提交
189
  int32_t code = 0;
wmmhello's avatar
wmmhello 已提交
190
  taosThreadMutexLock(&pHandle->pWalReader->mutex);
L
Liu Jicong 已提交
191 192 193
  int64_t offset = *fetchOffset;

  while (1) {
wmmhello's avatar
wmmhello 已提交
194 195 196
    if (walFetchHead(pHandle->pWalReader, offset, *ppCkHead) < 0) {
      tqDebug("tmq poll: consumer:%" PRId64 ", (epoch %d) vgId:%d offset %" PRId64 ", no more log to return",
              pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode), offset);
L
Liu Jicong 已提交
197 198 199 200 201
      *fetchOffset = offset - 1;
      code = -1;
      goto END;
    }

L
Liu Jicong 已提交
202
    if ((*ppCkHead)->head.msgType == TDMT_VND_SUBMIT) {
wmmhello's avatar
wmmhello 已提交
203
      code = walFetchBody(pHandle->pWalReader, ppCkHead);
L
Liu Jicong 已提交
204 205 206 207 208 209 210 211 212 213 214

      if (code < 0) {
        ASSERT(0);
        *fetchOffset = offset;
        code = -1;
        goto END;
      }
      *fetchOffset = offset;
      code = 0;
      goto END;
    } else {
wmmhello's avatar
wmmhello 已提交
215
      if (pHandle->fetchMeta) {
L
Liu Jicong 已提交
216
        SWalCont* pHead = &((*ppCkHead)->head);
L
Liu Jicong 已提交
217
        if (IS_META_MSG(pHead->msgType)) {
wmmhello's avatar
wmmhello 已提交
218
          code = walFetchBody(pHandle->pWalReader, ppCkHead);
L
Liu Jicong 已提交
219 220 221 222 223 224 225

          if (code < 0) {
            ASSERT(0);
            *fetchOffset = offset;
            code = -1;
            goto END;
          }
L
Liu Jicong 已提交
226
          if (isValValidForTable(pHandle, pHead)) {
227 228 229 230
            *fetchOffset = offset;
            code = 0;
            goto END;
          }
L
Liu Jicong 已提交
231 232
        }
      }
wmmhello's avatar
wmmhello 已提交
233
      code = walSkipFetchBody(pHandle->pWalReader, *ppCkHead);
L
Liu Jicong 已提交
234 235 236 237 238 239 240 241 242
      if (code < 0) {
        ASSERT(0);
        *fetchOffset = offset;
        code = -1;
        goto END;
      }
      offset++;
    }
  }
L
Liu Jicong 已提交
243
END:
wmmhello's avatar
wmmhello 已提交
244
  taosThreadMutexUnlock(&pHandle->pWalReader->mutex);
L
Liu Jicong 已提交
245 246 247
  return code;
}

L
Liu Jicong 已提交
248 249 250
STqReader* tqOpenReader(SVnode* pVnode) {
  STqReader* pReader = taosMemoryMalloc(sizeof(STqReader));
  if (pReader == NULL) {
L
Liu Jicong 已提交
251 252
    return NULL;
  }
L
Liu Jicong 已提交
253

L
Liu Jicong 已提交
254 255
  pReader->pWalReader = walOpenReader(pVnode->pWal, NULL);
  if (pReader->pWalReader == NULL) {
L
Liu Jicong 已提交
256
    taosMemoryFree(pReader);
L
Liu Jicong 已提交
257 258
    return NULL;
  }
L
Liu Jicong 已提交
259 260 261 262 263 264 265 266 267 268 269 270 271 272 273

  pReader->pVnodeMeta = pVnode->pMeta;
  pReader->pMsg = NULL;
  pReader->ver = -1;
  pReader->pColIdList = NULL;
  pReader->cachedSchemaVer = 0;
  pReader->cachedSchemaSuid = 0;
  pReader->pSchema = NULL;
  pReader->pSchemaWrapper = NULL;
  pReader->tbIdHash = NULL;
  return pReader;
}

void tqCloseReader(STqReader* pReader) {
  // close wal reader
274 275 276
  if (pReader->pWalReader) {
    walCloseReader(pReader->pWalReader);
  }
L
Liu Jicong 已提交
277
  // free cached schema
278 279 280 281 282 283 284 285 286
  if (pReader->pSchema) {
    taosMemoryFree(pReader->pSchema);
  }
  if (pReader->pSchemaWrapper) {
    tDeleteSSchemaWrapper(pReader->pSchemaWrapper);
  }
  if (pReader->pColIdList) {
    taosArrayDestroy(pReader->pColIdList);
  }
L
Liu Jicong 已提交
287
  // free hash
288
  taosHashCleanup(pReader->tbIdHash);
L
Liu Jicong 已提交
289 290 291
  taosMemoryFree(pReader);
}

L
Liu Jicong 已提交
292
int32_t tqSeekVer(STqReader* pReader, int64_t ver) {
L
Liu Jicong 已提交
293 294 295 296 297 298 299
  if (walReadSeekVer(pReader->pWalReader, ver) < 0) {
    ASSERT(pReader->pWalReader->curInvalid);
    ASSERT(pReader->pWalReader->curVersion == ver);
    return -1;
  }
  ASSERT(pReader->pWalReader->curVersion == ver);
  return 0;
L
Liu Jicong 已提交
300 301
}

L
Liu Jicong 已提交
302 303 304 305 306 307
int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
  bool fromProcessedMsg = pReader->pMsg != NULL;

  while (1) {
    if (!fromProcessedMsg) {
      if (walNextValidMsg(pReader->pWalReader) < 0) {
308 309
        pReader->ver =
            pReader->pWalReader->curVersion - (pReader->pWalReader->curInvalid | pReader->pWalReader->curStopped);
L
Liu Jicong 已提交
310 311
        ret->offset.type = TMQ_OFFSET__LOG;
        ret->offset.version = pReader->ver;
L
Liu Jicong 已提交
312
        ret->fetchType = FETCH_TYPE__NONE;
S
Shengliang Guan 已提交
313
        tqDebug("return offset %" PRId64 ", no more valid", ret->offset.version);
L
Liu Jicong 已提交
314
        ASSERT(ret->offset.version >= 0);
L
Liu Jicong 已提交
315 316 317
        return -1;
      }
      void* body = pReader->pWalReader->pHead->head.body;
L
Liu Jicong 已提交
318
#if 0
L
Liu Jicong 已提交
319 320 321 322 323 324
      if (pReader->pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) {
        // TODO do filter
        ret->fetchType = FETCH_TYPE__META;
        ret->meta = pReader->pWalReader->pHead->head.body;
        return 0;
      } else {
L
Liu Jicong 已提交
325 326 327
#endif
      tqReaderSetDataMsg(pReader, body, pReader->pWalReader->pHead->head.version);
#if 0
L
Liu Jicong 已提交
328
      }
L
Liu Jicong 已提交
329
#endif
L
Liu Jicong 已提交
330 331 332
    }

    while (tqNextDataBlock(pReader)) {
L
Liu Jicong 已提交
333
      // TODO mem free
L
Liu Jicong 已提交
334 335 336
      memset(&ret->data, 0, sizeof(SSDataBlock));
      int32_t code = tqRetrieveDataBlock(&ret->data, pReader);
      if (code != 0 || ret->data.info.rows == 0) {
L
Liu Jicong 已提交
337
        ASSERT(0);
L
Liu Jicong 已提交
338
        continue;
L
Liu Jicong 已提交
339 340
      }
      ret->fetchType = FETCH_TYPE__DATA;
L
Liu Jicong 已提交
341
      tqDebug("return data rows %d", ret->data.info.rows);
L
Liu Jicong 已提交
342 343 344 345
      return 0;
    }

    if (fromProcessedMsg) {
L
Liu Jicong 已提交
346 347
      ret->offset.type = TMQ_OFFSET__LOG;
      ret->offset.version = pReader->ver;
L
Liu Jicong 已提交
348
      ASSERT(pReader->ver >= 0);
L
Liu Jicong 已提交
349
      ret->fetchType = FETCH_TYPE__SEP;
S
Shengliang Guan 已提交
350
      tqDebug("return offset %" PRId64 ", processed finish", ret->offset.version);
L
Liu Jicong 已提交
351 352 353
      return 0;
    }
  }
L
Liu Jicong 已提交
354 355
}

L
Liu Jicong 已提交
356
int32_t tqReaderSetDataMsg(STqReader* pReader, const SSubmitReq* pMsg, int64_t ver) {
L
Liu Jicong 已提交
357
  pReader->pMsg = pMsg;
L
Liu Jicong 已提交
358

L
Liu Jicong 已提交
359
  if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) return -1;
L
Liu Jicong 已提交
360
  while (true) {
L
Liu Jicong 已提交
361 362
    if (tGetSubmitMsgNext(&pReader->msgIter, &pReader->pBlock) < 0) return -1;
    if (pReader->pBlock == NULL) break;
L
Liu Jicong 已提交
363 364
  }

L
Liu Jicong 已提交
365 366 367
  if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) return -1;
  pReader->ver = ver;
  memset(&pReader->blkIter, 0, sizeof(SSubmitBlkIter));
L
Liu Jicong 已提交
368
  return 0;
L
Liu Jicong 已提交
369 370
}

L
Liu Jicong 已提交
371 372
bool tqNextDataBlock(STqReader* pReader) {
  if (pReader->pMsg == NULL) return false;
L
Liu Jicong 已提交
373
  while (1) {
L
Liu Jicong 已提交
374
    if (tGetSubmitMsgNext(&pReader->msgIter, &pReader->pBlock) < 0) {
L
Liu Jicong 已提交
375 376
      return false;
    }
L
Liu Jicong 已提交
377 378
    if (pReader->pBlock == NULL) {
      pReader->pMsg = NULL;
5
54liuyao 已提交
379 380
      return false;
    }
L
Liu Jicong 已提交
381

L
Liu Jicong 已提交
382
    if (pReader->tbIdHash == NULL) {
L
Liu Jicong 已提交
383 384
      return true;
    }
L
Liu Jicong 已提交
385
    void* ret = taosHashGet(pReader->tbIdHash, &pReader->msgIter.uid, sizeof(int64_t));
S
Shengliang Guan 已提交
386
    /*tqDebug("search uid %" PRId64, pHandle->msgIter.uid);*/
L
Liu Jicong 已提交
387
    if (ret != NULL) {
S
Shengliang Guan 已提交
388
      /*tqDebug("find   uid %" PRId64, pHandle->msgIter.uid);*/
L
Liu Jicong 已提交
389
      return true;
390 391 392 393 394
    }
  }
  return false;
}

L
Liu Jicong 已提交
395
bool tqNextDataBlockFilterOut(STqReader* pHandle, SHashObj* filterOutUids) {
396 397 398 399 400 401 402 403 404 405
  while (1) {
    if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) {
      return false;
    }
    if (pHandle->pBlock == NULL) return false;

    ASSERT(pHandle->tbIdHash == NULL);
    void* ret = taosHashGet(filterOutUids, &pHandle->msgIter.uid, sizeof(int64_t));
    if (ret == NULL) {
      return true;
L
Liu Jicong 已提交
406 407 408 409 410
    }
  }
  return false;
}

L
Liu Jicong 已提交
411
int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader) {
412
  // TODO: cache multiple schema
L
Liu Jicong 已提交
413 414 415 416
  int32_t sversion = htonl(pReader->pBlock->sversion);
  if (pReader->cachedSchemaSuid == 0 || pReader->cachedSchemaVer != sversion ||
      pReader->cachedSchemaSuid != pReader->msgIter.suid) {
    if (pReader->pSchema) taosMemoryFree(pReader->pSchema);
417
    pReader->pSchema = metaGetTbTSchema(pReader->pVnodeMeta, pReader->msgIter.uid, sversion, 1);
L
Liu Jicong 已提交
418
    if (pReader->pSchema == NULL) {
S
Shengliang Guan 已提交
419
      tqWarn("cannot found tsschema for table: uid:%" PRId64 " (suid:%" PRId64 "), version %d, possibly dropped table",
L
Liu Jicong 已提交
420
             pReader->msgIter.uid, pReader->msgIter.suid, pReader->cachedSchemaVer);
L
Liu Jicong 已提交
421
      /*ASSERT(0);*/
422
      terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
L
Liu Jicong 已提交
423 424
      return -1;
    }
L
Liu Jicong 已提交
425

L
Liu Jicong 已提交
426
    if (pReader->pSchemaWrapper) tDeleteSSchemaWrapper(pReader->pSchemaWrapper);
427
    pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, pReader->msgIter.uid, sversion, 1);
L
Liu Jicong 已提交
428
    if (pReader->pSchemaWrapper == NULL) {
S
Shengliang Guan 已提交
429
      tqWarn("cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
L
Liu Jicong 已提交
430
             pReader->msgIter.uid, pReader->cachedSchemaVer);
L
Liu Jicong 已提交
431 432 433 434
      /*ASSERT(0);*/
      terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
      return -1;
    }
L
Liu Jicong 已提交
435 436
    pReader->cachedSchemaVer = sversion;
    pReader->cachedSchemaSuid = pReader->msgIter.suid;
L
Liu Jicong 已提交
437 438
  }

L
Liu Jicong 已提交
439 440
  STSchema*       pTschema = pReader->pSchema;
  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
L
Liu Jicong 已提交
441

L
Liu Jicong 已提交
442
  int32_t colNumNeed = taosArrayGetSize(pReader->pColIdList);
L
Liu Jicong 已提交
443

L
Liu Jicong 已提交
444 445 446 447
  if (colNumNeed == 0) {
    int32_t colMeta = 0;
    while (colMeta < pSchemaWrapper->nCols) {
      SSchema*        pColSchema = &pSchemaWrapper->pSchema[colMeta];
448
      SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
L
Liu Jicong 已提交
449
      int32_t         code = blockDataAppendColInfo(pBlock, &colInfo);
450
      if (code != TSDB_CODE_SUCCESS) {
L
Liu Jicong 已提交
451
        goto FAIL;
L
Liu Jicong 已提交
452
      }
L
Liu Jicong 已提交
453
      colMeta++;
L
Liu Jicong 已提交
454 455 456 457 458 459 460 461 462 463 464
    }
  } else {
    if (colNumNeed > pSchemaWrapper->nCols) {
      colNumNeed = pSchemaWrapper->nCols;
    }

    int32_t colMeta = 0;
    int32_t colNeed = 0;
    while (colMeta < pSchemaWrapper->nCols && colNeed < colNumNeed) {
      SSchema* pColSchema = &pSchemaWrapper->pSchema[colMeta];
      col_id_t colIdSchema = pColSchema->colId;
L
Liu Jicong 已提交
465
      col_id_t colIdNeed = *(col_id_t*)taosArrayGet(pReader->pColIdList, colNeed);
L
Liu Jicong 已提交
466 467 468 469 470
      if (colIdSchema < colIdNeed) {
        colMeta++;
      } else if (colIdSchema > colIdNeed) {
        colNeed++;
      } else {
471
        SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
L
Liu Jicong 已提交
472
        int32_t         code = blockDataAppendColInfo(pBlock, &colInfo);
473
        if (code != TSDB_CODE_SUCCESS) {
L
Liu Jicong 已提交
474 475 476 477 478
          goto FAIL;
        }
        colMeta++;
        colNeed++;
      }
L
Liu Jicong 已提交
479 480
    }
  }
L
Liu Jicong 已提交
481

L
Liu Jicong 已提交
482
  if (blockDataEnsureCapacity(pBlock, pReader->msgIter.numOfRows) < 0) {
483
    terrno = TSDB_CODE_OUT_OF_MEMORY;
484 485 486 487
    goto FAIL;
  }

  int32_t colActual = blockDataGetNumOfCols(pBlock);
L
Liu Jicong 已提交
488

L
Liu Jicong 已提交
489 490 491 492
  STSRowIter iter = {0};
  tdSTSRowIterInit(&iter, pTschema);
  STSRow* row;
  int32_t curRow = 0;
493

L
Liu Jicong 已提交
494
  tInitSubmitBlkIter(&pReader->msgIter, pReader->pBlock, &pReader->blkIter);
495

L
Liu Jicong 已提交
496 497
  pBlock->info.uid = pReader->msgIter.uid;
  pBlock->info.rows = pReader->msgIter.numOfRows;
498
  pBlock->info.version = pReader->pMsg->version;
499

L
Liu Jicong 已提交
500
  while ((row = tGetSubmitBlkNext(&pReader->blkIter)) != NULL) {
L
Liu Jicong 已提交
501 502
    tdSTSRowIterReset(&iter, row);
    // get all wanted col of that block
L
Liu Jicong 已提交
503
    for (int32_t i = 0; i < colActual; i++) {
504
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
L
Liu Jicong 已提交
505
      SCellVal         sVal = {0};
C
Cary Xu 已提交
506
      if (!tdSTSRowIterFetch(&iter, pColData->info.colId, pColData->info.type, &sVal)) {
L
Liu Jicong 已提交
507 508
        break;
      }
509
      if (colDataAppend(pColData, curRow, sVal.val, sVal.valType != TD_VTYPE_NORM) < 0) {
L
Liu Jicong 已提交
510
        goto FAIL;
L
Liu Jicong 已提交
511
      }
L
Liu Jicong 已提交
512 513 514
    }
    curRow++;
  }
L
Liu Jicong 已提交
515
  return 0;
516

L
Liu Jicong 已提交
517
FAIL:
518
  blockDataFreeRes(pBlock);
L
Liu Jicong 已提交
519
  return -1;
L
Liu Jicong 已提交
520
}
H
Hongze Cheng 已提交
521

L
Liu Jicong 已提交
522
void tqReaderSetColIdList(STqReader* pReader, SArray* pColIdList) { pReader->pColIdList = pColIdList; }
H
Hongze Cheng 已提交
523

L
Liu Jicong 已提交
524 525 526 527
int tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList) {
  if (pReader->tbIdHash) {
    taosHashClear(pReader->tbIdHash);
  } else {
5
54liuyao 已提交
528
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
H
Hongze Cheng 已提交
529 530
  }

L
Liu Jicong 已提交
531
  if (pReader->tbIdHash == NULL) {
H
Hongze Cheng 已提交
532 533 534 535 536 537
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  for (int i = 0; i < taosArrayGetSize(tbUidList); i++) {
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
L
Liu Jicong 已提交
538
    taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0);
H
Hongze Cheng 已提交
539 540 541 542 543
  }

  return 0;
}

L
Liu Jicong 已提交
544 545
int tqReaderAddTbUidList(STqReader* pReader, const SArray* tbUidList) {
  if (pReader->tbIdHash == NULL) {
5
54liuyao 已提交
546
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
L
Liu Jicong 已提交
547
    if (pReader->tbIdHash == NULL) {
H
Hongze Cheng 已提交
548 549 550 551 552 553 554
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
  }

  for (int i = 0; i < taosArrayGetSize(tbUidList); i++) {
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
L
Liu Jicong 已提交
555
    taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0);
H
Hongze Cheng 已提交
556 557 558 559
  }

  return 0;
}
560

L
Liu Jicong 已提交
561 562
int tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) {
  ASSERT(pReader->tbIdHash != NULL);
563

L
Liu Jicong 已提交
564 565
  for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
L
Liu Jicong 已提交
566
    taosHashRemove(pReader->tbIdHash, pKey, sizeof(int64_t));
567 568 569 570
  }

  return 0;
}
L
Liu Jicong 已提交
571 572 573 574

int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
  void* pIter = NULL;
  while (1) {
575
    pIter = taosHashIterate(pTq->pHandle, pIter);
L
Liu Jicong 已提交
576 577 578
    if (pIter == NULL) break;
    STqHandle* pExec = (STqHandle*)pIter;
    if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
579
      int32_t code = qUpdateQualifiedTableId(pExec->execHandle.task, tbUidList, isAdd);
L
Liu Jicong 已提交
580
      ASSERT(code == 0);
L
Liu Jicong 已提交
581 582 583 584 585
    } else if (pExec->execHandle.subType == TOPIC_SUB_TYPE__DB) {
      if (!isAdd) {
        int32_t sz = taosArrayGetSize(tbUidList);
        for (int32_t i = 0; i < sz; i++) {
          int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
L
Liu Jicong 已提交
586
          taosHashPut(pExec->execHandle.execDb.pFilterOutTbUid, &tbUid, sizeof(int64_t), NULL, 0);
L
Liu Jicong 已提交
587 588 589 590 591 592 593
        }
      }
    } else {
      // tq update id
    }
  }
  while (1) {
L
Liu Jicong 已提交
594
    pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
L
Liu Jicong 已提交
595
    if (pIter == NULL) break;
596
    SStreamTask* pTask = *(SStreamTask**)pIter;
597
    if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
L
Liu Jicong 已提交
598 599 600 601 602 603
      int32_t code = qUpdateQualifiedTableId(pTask->exec.executor, tbUidList, isAdd);
      ASSERT(code == 0);
    }
  }
  return 0;
}