tqRead.c 36.2 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Haojun Liao 已提交
16
#include "tmsg.h"
H
Hongze Cheng 已提交
17
#include "tq.h"
L
Liu Jicong 已提交
18

L
Liu Jicong 已提交
19 20
bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) {
  if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__TABLE) {
21 22 23
    return true;
  }

L
Liu Jicong 已提交
24 25 26
  int16_t msgType = pHead->msgType;
  char*   body = pHead->body;
  int32_t bodyLen = pHead->bodyLen;
27

L
Liu Jicong 已提交
28 29 30 31 32
  int64_t  tbSuid = pHandle->execHandle.execTb.suid;
  int64_t  realTbSuid = 0;
  SDecoder coder;
  void*    data = POINTER_SHIFT(body, sizeof(SMsgHead));
  int32_t  len = bodyLen - sizeof(SMsgHead);
33 34 35 36 37 38 39 40 41 42 43 44 45
  tDecoderInit(&coder, data, len);

  if (msgType == TDMT_VND_CREATE_STB || msgType == TDMT_VND_ALTER_STB) {
    SVCreateStbReq req = {0};
    if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
      goto end;
    }
    realTbSuid = req.suid;
  } else if (msgType == TDMT_VND_DROP_STB) {
    SVDropStbReq req = {0};
    if (tDecodeSVDropStbReq(&coder, &req) < 0) {
      goto end;
    }
L
Liu Jicong 已提交
46
    realTbSuid = req.suid;
47 48 49 50 51 52
  } else if (msgType == TDMT_VND_CREATE_TABLE) {
    SVCreateTbBatchReq req = {0};
    if (tDecodeSVCreateTbBatchReq(&coder, &req) < 0) {
      goto end;
    }

L
Liu Jicong 已提交
53
    int32_t        needRebuild = 0;
54 55 56
    SVCreateTbReq* pCreateReq = NULL;
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
      pCreateReq = req.pReqs + iReq;
L
Liu Jicong 已提交
57
      if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid) {
58 59 60
        needRebuild++;
      }
    }
L
Liu Jicong 已提交
61
    if (needRebuild == 0) {
62
      // do nothing
L
Liu Jicong 已提交
63
    } else if (needRebuild == req.nReqs) {
64
      realTbSuid = tbSuid;
L
Liu Jicong 已提交
65
    } else {
66 67 68 69 70
      realTbSuid = tbSuid;
      SVCreateTbBatchReq reqNew = {0};
      reqNew.pArray = taosArrayInit(req.nReqs, sizeof(struct SVCreateTbReq));
      for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
        pCreateReq = req.pReqs + iReq;
L
Liu Jicong 已提交
71
        if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid) {
72 73 74 75 76
          reqNew.nReqs++;
          taosArrayPush(reqNew.pArray, pCreateReq);
        }
      }

L
Liu Jicong 已提交
77
      int     tlen;
78 79 80 81 82
      int32_t ret = 0;
      tEncodeSize(tEncodeSVCreateTbBatchReq, &reqNew, tlen, ret);
      void* buf = taosMemoryMalloc(tlen);
      if (NULL == buf) {
        taosArrayDestroy(reqNew.pArray);
83 84 85 86 87 88 89
        for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
          pCreateReq = req.pReqs + iReq;
          taosMemoryFreeClear(pCreateReq->comment);
          if (pCreateReq->type == TSDB_CHILD_TABLE) {
            taosArrayDestroy(pCreateReq->ctb.tagName);
          }
        }
90 91 92 93 94 95 96 97 98 99 100
        goto end;
      }
      SEncoder coderNew = {0};
      tEncoderInit(&coderNew, buf, tlen - sizeof(SMsgHead));
      tEncodeSVCreateTbBatchReq(&coderNew, &reqNew);
      tEncoderClear(&coderNew);
      memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
      pHead->bodyLen = tlen + sizeof(SMsgHead);
      taosMemoryFree(buf);
      taosArrayDestroy(reqNew.pArray);
    }
101 102 103 104 105 106 107 108

    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
      pCreateReq = req.pReqs + iReq;
      taosMemoryFreeClear(pCreateReq->comment);
      if (pCreateReq->type == TSDB_CHILD_TABLE) {
        taosArrayDestroy(pCreateReq->ctb.tagName);
      }
    }
109
  } else if (msgType == TDMT_VND_ALTER_TABLE) {
L
Liu Jicong 已提交
110
    SVAlterTbReq req = {0};
111 112 113 114 115 116

    if (tDecodeSVAlterTbReq(&coder, &req) < 0) {
      goto end;
    }

    SMetaReader mr = {0};
117
    metaReaderDoInit(&mr, pHandle->execHandle.pTqReader->pVnodeMeta, 0);
118 119 120 121 122 123 124 125 126 127 128 129 130 131

    if (metaGetTableEntryByName(&mr, req.tbName) < 0) {
      metaReaderClear(&mr);
      goto end;
    }
    realTbSuid = mr.me.ctbEntry.suid;
    metaReaderClear(&mr);
  } else if (msgType == TDMT_VND_DROP_TABLE) {
    SVDropTbBatchReq req = {0};

    if (tDecodeSVDropTbBatchReq(&coder, &req) < 0) {
      goto end;
    }

L
Liu Jicong 已提交
132
    int32_t      needRebuild = 0;
133 134 135 136
    SVDropTbReq* pDropReq = NULL;
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
      pDropReq = req.pReqs + iReq;

L
Liu Jicong 已提交
137
      if (pDropReq->suid == tbSuid) {
138 139 140
        needRebuild++;
      }
    }
L
Liu Jicong 已提交
141
    if (needRebuild == 0) {
142
      // do nothing
L
Liu Jicong 已提交
143
    } else if (needRebuild == req.nReqs) {
144
      realTbSuid = tbSuid;
L
Liu Jicong 已提交
145
    } else {
146 147 148 149 150
      realTbSuid = tbSuid;
      SVDropTbBatchReq reqNew = {0};
      reqNew.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbReq));
      for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
        pDropReq = req.pReqs + iReq;
L
Liu Jicong 已提交
151
        if (pDropReq->suid == tbSuid) {
152 153 154 155 156
          reqNew.nReqs++;
          taosArrayPush(reqNew.pArray, pDropReq);
        }
      }

L
Liu Jicong 已提交
157
      int     tlen;
158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181
      int32_t ret = 0;
      tEncodeSize(tEncodeSVDropTbBatchReq, &reqNew, tlen, ret);
      void* buf = taosMemoryMalloc(tlen);
      if (NULL == buf) {
        taosArrayDestroy(reqNew.pArray);
        goto end;
      }
      SEncoder coderNew = {0};
      tEncoderInit(&coderNew, buf, tlen - sizeof(SMsgHead));
      tEncodeSVDropTbBatchReq(&coderNew, &reqNew);
      tEncoderClear(&coderNew);
      memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
      pHead->bodyLen = tlen + sizeof(SMsgHead);
      taosMemoryFree(buf);
      taosArrayDestroy(reqNew.pArray);
    }
  } else if (msgType == TDMT_VND_DELETE) {
    SDeleteRes req = {0};
    if (tDecodeDeleteRes(&coder, &req) < 0) {
      goto end;
    }
    realTbSuid = req.suid;
  }

L
Liu Jicong 已提交
182
end:
183 184 185 186
  tDecoderClear(&coder);
  return tbSuid == realTbSuid;
}

187
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead, uint64_t reqId) {
L
Liu Jicong 已提交
188
  int32_t code = 0;
189
  int32_t vgId = TD_VID(pTq->pVnode);
H
Haojun Liao 已提交
190

wmmhello's avatar
wmmhello 已提交
191
  taosThreadMutexLock(&pHandle->pWalReader->mutex);
L
Liu Jicong 已提交
192 193 194
  int64_t offset = *fetchOffset;

  while (1) {
wmmhello's avatar
wmmhello 已提交
195
    if (walFetchHead(pHandle->pWalReader, offset, *ppCkHead) < 0) {
196 197
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64
              ", no more log to return, reqId:0x%" PRIx64,
198
              pHandle->consumerId, pHandle->epoch, vgId, offset, reqId);
L
Liu Jicong 已提交
199 200 201 202 203
      *fetchOffset = offset - 1;
      code = -1;
      goto END;
    }

204 205
    tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type: %s, reqId:0x%" PRIx64, vgId,
            pHandle->consumerId, offset, TMSG_INFO((*ppCkHead)->head.msgType), reqId);
206

L
Liu Jicong 已提交
207
    if ((*ppCkHead)->head.msgType == TDMT_VND_SUBMIT) {
wmmhello's avatar
wmmhello 已提交
208
      code = walFetchBody(pHandle->pWalReader, ppCkHead);
L
Liu Jicong 已提交
209 210 211 212 213 214 215 216 217 218

      if (code < 0) {
        *fetchOffset = offset;
        code = -1;
        goto END;
      }
      *fetchOffset = offset;
      code = 0;
      goto END;
    } else {
219
      if (pHandle->fetchMeta != WITH_DATA) {
L
Liu Jicong 已提交
220
        SWalCont* pHead = &((*ppCkHead)->head);
wmmhello's avatar
wmmhello 已提交
221
        if (IS_META_MSG(pHead->msgType) && !(pHead->msgType == TDMT_VND_DELETE && pHandle->fetchMeta == ONLY_META)) {
wmmhello's avatar
wmmhello 已提交
222
          code = walFetchBody(pHandle->pWalReader, ppCkHead);
L
Liu Jicong 已提交
223 224 225 226 227
          if (code < 0) {
            *fetchOffset = offset;
            code = -1;
            goto END;
          }
L
Liu Jicong 已提交
228

L
Liu Jicong 已提交
229
          if (isValValidForTable(pHandle, pHead)) {
230 231 232
            *fetchOffset = offset;
            code = 0;
            goto END;
L
Liu Jicong 已提交
233 234 235
          } else {
            offset++;
            continue;
236
          }
L
Liu Jicong 已提交
237 238
        }
      }
wmmhello's avatar
wmmhello 已提交
239
      code = walSkipFetchBody(pHandle->pWalReader, *ppCkHead);
L
Liu Jicong 已提交
240 241 242 243 244 245 246 247
      if (code < 0) {
        *fetchOffset = offset;
        code = -1;
        goto END;
      }
      offset++;
    }
  }
H
Haojun Liao 已提交
248

L
Liu Jicong 已提交
249
END:
wmmhello's avatar
wmmhello 已提交
250
  taosThreadMutexUnlock(&pHandle->pWalReader->mutex);
L
Liu Jicong 已提交
251 252 253
  return code;
}

254
STqReader* tqReaderOpen(SVnode* pVnode) {
L
Liu Jicong 已提交
255
  STqReader* pReader = taosMemoryCalloc(1, sizeof(STqReader));
L
Liu Jicong 已提交
256
  if (pReader == NULL) {
L
Liu Jicong 已提交
257 258
    return NULL;
  }
L
Liu Jicong 已提交
259

L
Liu Jicong 已提交
260 261
  pReader->pWalReader = walOpenReader(pVnode->pWal, NULL);
  if (pReader->pWalReader == NULL) {
L
Liu Jicong 已提交
262
    taosMemoryFree(pReader);
L
Liu Jicong 已提交
263 264
    return NULL;
  }
L
Liu Jicong 已提交
265 266 267 268 269 270 271

  pReader->pVnodeMeta = pVnode->pMeta;
  pReader->pColIdList = NULL;
  pReader->cachedSchemaVer = 0;
  pReader->cachedSchemaSuid = 0;
  pReader->pSchemaWrapper = NULL;
  pReader->tbIdHash = NULL;
272
  pReader->pResBlock = createDataBlock();
L
Liu Jicong 已提交
273 274 275
  return pReader;
}

276
void tqReaderClose(STqReader* pReader) {
L
Liu Jicong 已提交
277
  // close wal reader
278 279 280
  if (pReader->pWalReader) {
    walCloseReader(pReader->pWalReader);
  }
281

282
  if (pReader->pSchemaWrapper) {
283
    tDeleteSchemaWrapper(pReader->pSchemaWrapper);
284
  }
285

286 287 288
  if (pReader->pColIdList) {
    taosArrayDestroy(pReader->pColIdList);
  }
289

L
Liu Jicong 已提交
290
  // free hash
291
  blockDataDestroy(pReader->pResBlock);
292
  taosHashCleanup(pReader->tbIdHash);
293
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
L
Liu Jicong 已提交
294 295 296
  taosMemoryFree(pReader);
}

297
int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) {
298
  if (walReaderSeekVer(pReader->pWalReader, ver) < 0) {
L
Liu Jicong 已提交
299 300
    return -1;
  }
301
  tqDebug("wal reader seek to ver:%" PRId64 " %s", ver, id);
302
  return 0;
L
Liu Jicong 已提交
303 304
}

305
int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, const char* id) {
306 307 308
  int32_t code = walNextValidMsg(pReader);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
309 310 311
  }

  int64_t ver = pReader->pHead->head.version;
312
  if (ver > maxVer) {
313
    tqDebug("maxVer in WAL:%"PRId64" reached current:%"PRId64", do not scan wal anymore, %s", maxVer, ver, id);
314 315
    return TSDB_CODE_SUCCESS;
  }
316

317 318 319 320 321 322 323 324 325 326 327 328 329 330 331
  if (pReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
    void*   pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SSubmitReq2Msg));
    int32_t len = pReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);

    void* data = taosMemoryMalloc(len);
    if (data == NULL) {
      // todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0);
      return -1;
    }

    memcpy(data, pBody, len);
    SPackedData data1 = (SPackedData){.ver = ver, .msgLen = len, .msgStr = data};

dengyihao's avatar
dengyihao 已提交
332
    *pItem = (SStreamQueueItem*)streamDataSubmitNew(&data1, STREAM_INPUT__DATA_SUBMIT);
333 334 335 336 337 338 339 340 341 342
    if (*pItem == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      tqError("%s failed to create data submit for stream since out of memory", id);
      return terrno;
    }
  } else if (pReader->pHead->head.msgType == TDMT_VND_DELETE) {
    void*   pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SMsgHead));
    int32_t len = pReader->pHead->head.bodyLen - sizeof(SMsgHead);

    extractDelDataBlock(pBody, len, ver, (SStreamRefDataBlock**)pItem);
343
    tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%"PRId64, id, len, ver);
344 345
  } else {
    ASSERT(0);
346 347 348 349 350
  }

  return 0;
}

351
// todo ignore the error in wal?
352
bool tqNextBlockInWal(STqReader* pReader, const char* id) {
353 354
  SWalReader* pWalReader = pReader->pWalReader;

L
Liu Jicong 已提交
355
  while (1) {
356 357 358
    SArray* pBlockList = pReader->submit.aSubmitTbData;
    if (pBlockList == NULL || pReader->nextBlk >= taosArrayGetSize(pBlockList)) {
      // try next message in wal file
359
      // todo always retry to avoid read failure caused by wal file deletion
360
      if (walNextValidMsg(pWalReader) < 0) {
361
        return false;
L
Liu Jicong 已提交
362
      }
363

364 365 366 367 368 369
      void*   pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
      int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
      int64_t ver = pWalReader->pHead->head.version;

      SDecoder decoder = {0};
      tDecoderInit(&decoder, pBody, bodyLen);
H
Haojun Liao 已提交
370 371 372 373 374

      {
        int32_t nSubmitTbData = taosArrayGetSize(pReader->submit.aSubmitTbData);
        for (int32_t i = 0; i < nSubmitTbData; i++) {
          SSubmitTbData* pData = taosArrayGet(pReader->submit.aSubmitTbData, i);
H
Haojun Liao 已提交
375 376 377 378
          if (pData->pCreateTbReq != NULL) {
            taosArrayDestroy(pData->pCreateTbReq->ctb.tagName);
            taosMemoryFreeClear(pData->pCreateTbReq);
          }
H
Haojun Liao 已提交
379 380 381 382
          pData->aRowP = taosArrayDestroy(pData->aRowP);
        }
        pReader->submit.aSubmitTbData = taosArrayDestroy(pReader->submit.aSubmitTbData);
      }
H
Haojun Liao 已提交
383

384 385
      if (tDecodeSubmitReq(&decoder, &pReader->submit) < 0) {
        tDecoderClear(&decoder);
386
        tqError("decode wal file error, msgLen:%d, ver:%" PRId64, bodyLen, ver);
387
        return false;
388
      }
wmmhello's avatar
wmmhello 已提交
389

390 391
      tDecoderClear(&decoder);
      pReader->nextBlk = 0;
L
Liu Jicong 已提交
392 393
    }

H
Haojun Liao 已提交
394
    int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
395
    while (pReader->nextBlk < numOfBlocks) {
396
      tqTrace("tq reader next data block %d/%d, len:%d %" PRId64 " %d", pReader->nextBlk,
397
          numOfBlocks, pReader->msg.msgLen, pReader->msg.ver, pReader->nextBlk);
398 399 400 401

      SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);

      if (pReader->tbIdHash == NULL) {
402 403
        SSDataBlock* pRes = NULL;
        int32_t      code = tqRetrieveDataBlock(pReader, &pRes, NULL);
404
        if (code == TSDB_CODE_SUCCESS && pRes->info.rows > 0) {
405
          return true;
406
        }
L
Liu Jicong 已提交
407
      }
408

409 410
      void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t));
      if (ret != NULL) {
411
        tqTrace("tq reader return submit block, uid:%" PRId64 ", ver:%" PRId64, pSubmitTbData->uid, pReader->msg.ver);
412

413 414 415
        SSDataBlock* pRes = NULL;
        int32_t code = tqRetrieveDataBlock(pReader, &pRes, NULL);
        if (code == TSDB_CODE_SUCCESS && pRes->info.rows > 0) {
416
          return true;
417 418 419
        }
      } else {
        pReader->nextBlk += 1;
420
        tqTrace("tq reader discard submit block, uid:%" PRId64 ", continue", pSubmitTbData->uid);
421
      }
L
Liu Jicong 已提交
422
    }
423

424
    qTrace("stream scan return empty, all %d submit blocks consumed, %s", numOfBlocks, id);
425
    tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
426

427
    pReader->msg.msgStr = NULL;
L
Liu Jicong 已提交
428
  }
L
Liu Jicong 已提交
429 430
}

431
int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver) {
432 433 434
  pReader->msg.msgStr = msgStr;
  pReader->msg.msgLen = msgLen;
  pReader->msg.ver = ver;
L
Liu Jicong 已提交
435

L
Liu Jicong 已提交
436
  tqDebug("tq reader set msg %p %d", msgStr, msgLen);
wmmhello's avatar
wmmhello 已提交
437
  SDecoder decoder;
438 439 440

  tDecoderInit(&decoder, pReader->msg.msgStr, pReader->msg.msgLen);
  if (tDecodeSubmitReq(&decoder, &pReader->submit) < 0) {
L
Liu Jicong 已提交
441
    tDecoderClear(&decoder);
442
    tqError("DecodeSSubmitReq2 error, msgLen:%d, ver:%" PRId64, msgLen, ver);
wmmhello's avatar
wmmhello 已提交
443
    return -1;
L
Liu Jicong 已提交
444
  }
445

wmmhello's avatar
wmmhello 已提交
446
  tDecoderClear(&decoder);
L
Liu Jicong 已提交
447 448
  return 0;
}
L
Liu Jicong 已提交
449

450 451 452 453
SWalReader* tqGetWalReader(STqReader* pReader) {
  return pReader->pWalReader;
}

H
Haojun Liao 已提交
454 455 456
SSDataBlock* tqGetResultBlock (STqReader* pReader) {
  return pReader->pResBlock;
}
457

458
bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
459
  if (pReader->msg.msgStr == NULL) {
460 461 462
    return false;
  }

463 464
  int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
  while (pReader->nextBlk < numOfBlocks) {
465
    tqDebug("try next data block, len:%d ver:%" PRId64 " index:%d/%d, %s", pReader->msg.msgLen, pReader->msg.ver,
dengyihao's avatar
dengyihao 已提交
466
            pReader->nextBlk, numOfBlocks, idstr);
467

L
Liu Jicong 已提交
468
    SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
469 470 471
    if (pReader->tbIdHash == NULL) {
      return true;
    }
L
Liu Jicong 已提交
472 473 474

    void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t));
    if (ret != NULL) {
475
      tqDebug("block found, ver:%" PRId64 ", uid:%" PRId64", %s", pReader->msg.ver, pSubmitTbData->uid, idstr);
L
Liu Jicong 已提交
476
      return true;
477
    } else {
478 479
      tqDebug("discard submit block, uid:%" PRId64 ", total queried tables:%d continue %s", pSubmitTbData->uid,
          taosHashGetSize(pReader->tbIdHash), idstr);
L
Liu Jicong 已提交
480
    }
481

L
Liu Jicong 已提交
482
    pReader->nextBlk++;
L
Liu Jicong 已提交
483 484
  }

485
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
L
Liu Jicong 已提交
486
  pReader->nextBlk = 0;
487
  pReader->msg.msgStr = NULL;
L
Liu Jicong 已提交
488 489 490 491

  return false;
}

492
bool tqNextDataBlockFilterOut(STqReader* pReader, SHashObj* filterOutUids) {
493
  if (pReader->msg.msgStr == NULL) return false;
L
Liu Jicong 已提交
494

L
Liu Jicong 已提交
495
  int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
L
Liu Jicong 已提交
496
  while (pReader->nextBlk < blockSz) {
L
Liu Jicong 已提交
497
    SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
498
    if (filterOutUids == NULL) return true;
L
Liu Jicong 已提交
499

500
    void* ret = taosHashGet(filterOutUids, &pSubmitTbData->uid, sizeof(int64_t));
L
Liu Jicong 已提交
501 502 503
    if (ret == NULL) {
      return true;
    }
504
    pReader->nextBlk++;
L
Liu Jicong 已提交
505 506
  }

507
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
L
Liu Jicong 已提交
508
  pReader->nextBlk = 0;
509
  pReader->msg.msgStr = NULL;
L
Liu Jicong 已提交
510 511 512

  return false;
}
513

514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542
int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrapper* pSrc, char* mask) {
  int32_t code;

  int32_t cnt = 0;
  for (int32_t i = 0; i < pSrc->nCols; i++) {
    cnt += mask[i];
  }

  pDst->nCols = cnt;
  pDst->pSchema = taosMemoryCalloc(cnt, sizeof(SSchema));
  if (pDst->pSchema == NULL) {
    return -1;
  }

  int32_t j = 0;
  for (int32_t i = 0; i < pSrc->nCols; i++) {
    if (mask[i]) {
      pDst->pSchema[j++] = pSrc->pSchema[i];
      SColumnInfoData colInfo =
          createColumnInfoData(pSrc->pSchema[i].type, pSrc->pSchema[i].bytes, pSrc->pSchema[i].colId);
      code = blockDataAppendColInfo(pBlock, &colInfo);
      if (code != 0) {
        return -1;
      }
    }
  }
  return 0;
}

543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591
static int32_t buildResSDataBlock(SSDataBlock* pBlock, SSchemaWrapper* pSchema, const SArray* pColIdList) {
  if (blockDataGetNumOfCols(pBlock) > 0) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t numOfCols = taosArrayGetSize(pColIdList);

  if (numOfCols == 0) {  // all columns are required
    for (int32_t i = 0; i < pSchema->nCols; ++i) {
      SSchema*        pColSchema = &pSchema->pSchema[i];
      SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);

      int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
      if (code != TSDB_CODE_SUCCESS) {
        blockDataFreeRes(pBlock);
        return TSDB_CODE_OUT_OF_MEMORY;
      }
    }
  } else {
    if (numOfCols > pSchema->nCols) {
      numOfCols = pSchema->nCols;
    }

    int32_t i = 0;
    int32_t j = 0;
    while (i < pSchema->nCols && j < numOfCols) {
      SSchema* pColSchema = &pSchema->pSchema[i];
      col_id_t colIdSchema = pColSchema->colId;

      col_id_t colIdNeed = *(col_id_t*)taosArrayGet(pColIdList, j);
      if (colIdSchema < colIdNeed) {
        i++;
      } else if (colIdSchema > colIdNeed) {
        j++;
      } else {
        SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
        int32_t         code = blockDataAppendColInfo(pBlock, &colInfo);
        if (code != TSDB_CODE_SUCCESS) {
          return -1;
        }
        i++;
        j++;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610
static int32_t doSetVal(SColumnInfoData* pColumnInfoData, int32_t rowIndex, SColVal* pColVal) {
  int32_t code = TSDB_CODE_SUCCESS;

  if (IS_STR_DATA_TYPE(pColVal->type)) {
    char val[65535 + 2] = {0};
    if (pColVal->value.pData != NULL) {
      memcpy(varDataVal(val), pColVal->value.pData, pColVal->value.nData);
      varDataSetLen(val, pColVal->value.nData);
      code = colDataSetVal(pColumnInfoData, rowIndex, val, !COL_VAL_IS_VALUE(pColVal));
    } else {
      colDataSetNULL(pColumnInfoData, rowIndex);
    }
  } else {
    code = colDataSetVal(pColumnInfoData, rowIndex, (void*)&pColVal->value.val, !COL_VAL_IS_VALUE(pColVal));
  }

  return code;
}

611
int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* id) {
612
  tqTrace("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
613
  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk++);
614

615
  SSDataBlock* pBlock = pReader->pResBlock;
616 617
  *pRes = pBlock;

H
Haojun Liao 已提交
618 619
  blockDataCleanup(pBlock);

620
  int32_t vgId = pReader->pWalReader->pWal->cfg.vgId;
L
Liu Jicong 已提交
621 622 623 624 625 626
  int32_t sversion = pSubmitTbData->sver;
  int64_t suid = pSubmitTbData->suid;
  int64_t uid = pSubmitTbData->uid;
  pReader->lastBlkUid = uid;

  pBlock->info.id.uid = uid;
627
  pBlock->info.version = pReader->msg.ver;
L
Liu Jicong 已提交
628

629 630
  if ((suid != 0 && pReader->cachedSchemaSuid != suid) || (suid == 0 && pReader->cachedSchemaUid != uid) ||
      (pReader->cachedSchemaVer != sversion)) {
631
    tDeleteSchemaWrapper(pReader->pSchemaWrapper);
L
Liu Jicong 已提交
632 633 634

    pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1);
    if (pReader->pSchemaWrapper == NULL) {
635 636
      tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64
             "version %d, possibly dropped table",
637
             vgId, suid, uid, pReader->cachedSchemaVer);
L
Liu Jicong 已提交
638 639 640 641 642
      pReader->cachedSchemaSuid = 0;
      terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
      return -1;
    }

643
    pReader->cachedSchemaUid = uid;
644 645
    pReader->cachedSchemaSuid = suid;
    pReader->cachedSchemaVer = sversion;
L
Liu Jicong 已提交
646

647
    ASSERT(pReader->cachedSchemaVer == pReader->pSchemaWrapper->version);
648 649 650
    if (blockDataGetNumOfCols(pBlock) == 0) {
      int32_t code = buildResSDataBlock(pReader->pResBlock, pReader->pSchemaWrapper, pReader->pColIdList);
      if (code != TSDB_CODE_SUCCESS) {
651
        tqError("vgId:%d failed to build data block, code:%s", vgId, tstrerror(code));
652
        return code;
L
Liu Jicong 已提交
653 654
      }
    }
655
  }
L
Liu Jicong 已提交
656

657 658 659 660 661 662 663
  int32_t numOfRows = 0;
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    SColData* pCol = taosArrayGet(pSubmitTbData->aCol, 0);
    numOfRows = pCol->nVal;
  } else {
    numOfRows = taosArrayGetSize(pSubmitTbData->aRowP);
  }
L
Liu Jicong 已提交
664

665 666
  if (blockDataEnsureCapacity(pBlock, numOfRows) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
667
    return -1;
668
  }
L
Liu Jicong 已提交
669

670
  pBlock->info.rows = numOfRows;
L
Liu Jicong 已提交
671

672
  int32_t colActual = blockDataGetNumOfCols(pBlock);
L
Liu Jicong 已提交
673

674 675 676 677 678 679 680 681 682
  // convert and scan one block
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    SArray* pCols = pSubmitTbData->aCol;
    int32_t numOfCols = taosArrayGetSize(pCols);
    int32_t targetIdx = 0;
    int32_t sourceIdx = 0;
    while (targetIdx < colActual) {
      if (sourceIdx >= numOfCols) {
        tqError("tqRetrieveDataBlock sourceIdx:%d >= numOfCols:%d", sourceIdx, numOfCols);
683
        return -1;
684
      }
L
Liu Jicong 已提交
685

686 687 688 689 690 691
      SColData*        pCol = taosArrayGet(pCols, sourceIdx);
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
      SColVal          colVal;

      if (pCol->nVal != numOfRows) {
        tqError("tqRetrieveDataBlock pCol->nVal:%d != numOfRows:%d", pCol->nVal, numOfRows);
692
        return -1;
693 694 695 696 697 698 699
      }

      if (pCol->cid < pColData->info.colId) {
        sourceIdx++;
      } else if (pCol->cid == pColData->info.colId) {
        for (int32_t i = 0; i < pCol->nVal; i++) {
          tColDataGetValue(pCol, i, &colVal);
700 701 702
          int32_t code = doSetVal(pColData, i, &colVal);
          if (code != TSDB_CODE_SUCCESS) {
            return code;
703 704 705 706 707
          }
        }
        sourceIdx++;
        targetIdx++;
      } else {
708
        colDataSetNNULL(pColData, 0, pCol->nVal);
709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726
        targetIdx++;
      }
    }
  } else {
    SArray*         pRows = pSubmitTbData->aRowP;
    SSchemaWrapper* pWrapper = pReader->pSchemaWrapper;
    STSchema*       pTSchema = tBuildTSchema(pWrapper->pSchema, pWrapper->nCols, pWrapper->version);

    for (int32_t i = 0; i < numOfRows; i++) {
      SRow*   pRow = taosArrayGetP(pRows, i);
      int32_t sourceIdx = 0;

      for (int32_t j = 0; j < colActual; j++) {
        SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, j);
        while (1) {
          SColVal colVal;
          tRowGet(pRow, pTSchema, sourceIdx, &colVal);
          if (colVal.cid < pColData->info.colId) {
dengyihao's avatar
dengyihao 已提交
727 728 729
            //            tqDebug("colIndex:%d column id:%d in row, ignore, the required colId:%d, total cols in
            //            schema:%d",
            //                    sourceIdx, colVal.cid, pColData->info.colId, pTSchema->numOfCols);
730 731 732
            sourceIdx++;
            continue;
          } else if (colVal.cid == pColData->info.colId) {
733 734 735
            int32_t code = doSetVal(pColData, i, &colVal);
            if (code != TSDB_CODE_SUCCESS) {
              return code;
L
Liu Jicong 已提交
736
            }
L
Liu Jicong 已提交
737

738 739 740 741 742
            sourceIdx++;
            break;
          } else {
            colDataSetNULL(pColData, i);
            break;
L
Liu Jicong 已提交
743 744 745 746
          }
        }
      }
    }
747 748

    taosMemoryFreeClear(pTSchema);
L
Liu Jicong 已提交
749 750 751 752 753
  }

  return 0;
}

754
// todo refactor:
755
int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet) {
756
  tqDebug("tq reader retrieve data block %p, %d", pReader->msg.msgStr, pReader->nextBlk);
757 758 759 760

  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
  pReader->nextBlk++;

761 762 763 764
  if (pSubmitTbDataRet) {
    *pSubmitTbDataRet = pSubmitTbData;
  }

765 766 767 768 769
  int32_t sversion = pSubmitTbData->sver;
  int64_t suid = pSubmitTbData->suid;
  int64_t uid = pSubmitTbData->uid;
  pReader->lastBlkUid = uid;

770
  tDeleteSchemaWrapper(pReader->pSchemaWrapper);
771 772 773 774 775 776 777 778 779 780
  pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1);
  if (pReader->pSchemaWrapper == NULL) {
    tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
           pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer);
    pReader->cachedSchemaSuid = 0;
    terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
    return -1;
  }

  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
781
  int32_t         numOfRows = 0;
782 783 784 785 786 787 788 789 790 791 792 793

  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    SArray*   pCols = pSubmitTbData->aCol;
    SColData* pCol = taosArrayGet(pCols, 0);
    numOfRows = pCol->nVal;
  } else {
    SArray* pRows = pSubmitTbData->aRowP;
    numOfRows = taosArrayGetSize(pRows);
  }

  int32_t curRow = 0;
  int32_t lastRow = 0;
794
  char*   assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
795 796 797 798 799 800 801 802 803
  if (assigned == NULL) return -1;

  // convert and scan one block
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    SArray* pCols = pSubmitTbData->aCol;
    int32_t numOfCols = taosArrayGetSize(pCols);
    for (int32_t i = 0; i < numOfRows; i++) {
      bool buildNew = false;

804
      for (int32_t j = 0; j < numOfCols; j++) {
805
        SColData* pCol = taosArrayGet(pCols, j);
806
        SColVal   colVal;
807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826
        tColDataGetValue(pCol, i, &colVal);
        if (curRow == 0) {
          assigned[j] = !COL_VAL_IS_NONE(&colVal);
          buildNew = true;
        } else {
          bool currentRowAssigned = !COL_VAL_IS_NONE(&colVal);
          if (currentRowAssigned != assigned[j]) {
            assigned[j] = currentRowAssigned;
            buildNew = true;
          }
        }
      }

      if (buildNew) {
        if (taosArrayGetSize(blocks) > 0) {
          SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
          pLastBlock->info.rows = curRow - lastRow;
          lastRow = curRow;
        }

827
        SSDataBlock     block = {0};
828
        SSchemaWrapper* pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
829
        if (pSW == NULL) {
830 831 832 833 834 835
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          goto FAIL;
        }

        if (tqMaskBlock(pSW, &block, pSchemaWrapper, assigned) < 0) {
          blockDataFreeRes(&block);
836
          tDeleteSchemaWrapper(pSW);
837 838 839 840 841 842
          goto FAIL;
        }
        tqDebug("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
                (int32_t)taosArrayGetSize(block.pDataBlock));

        block.info.id.uid = uid;
843
        block.info.version = pReader->msg.ver;
844 845 846
        if (blockDataEnsureCapacity(&block, numOfRows - curRow) < 0) {
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          blockDataFreeRes(&block);
847
          tDeleteSchemaWrapper(pSW);
848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876
          goto FAIL;
        }
        taosArrayPush(blocks, &block);
        taosArrayPush(schemas, &pSW);
      }

      SSDataBlock* pBlock = taosArrayGetLast(blocks);

      tqDebug("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
              (int32_t)taosArrayGetSize(blocks));

      int32_t targetIdx = 0;
      int32_t sourceIdx = 0;
      int32_t colActual = blockDataGetNumOfCols(pBlock);
      while (targetIdx < colActual) {
        SColData*        pCol = taosArrayGet(pCols, sourceIdx);
        SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
        SColVal          colVal;

        if (pCol->cid < pColData->info.colId) {
          sourceIdx++;
        } else if (pCol->cid == pColData->info.colId) {
          tColDataGetValue(pCol, i, &colVal);

          if (IS_STR_DATA_TYPE(colVal.type)) {
            if (colVal.value.pData != NULL) {
              char val[65535 + 2];
              memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData);
              varDataSetLen(val, colVal.value.nData);
877
              if (colDataSetVal(pColData, curRow - lastRow, val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
878 879 880
                goto FAIL;
              }
            } else {
X
Xiaoyu Wang 已提交
881
              colDataSetNULL(pColData, curRow - lastRow);
882 883
            }
          } else {
884
            if (colDataSetVal(pColData, curRow - lastRow, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
885 886 887 888 889 890 891 892 893 894 895
              goto FAIL;
            }
          }
          sourceIdx++;
          targetIdx++;
        }
      }

      curRow++;
    }
  } else {
896
    SSchemaWrapper* pWrapper = pReader->pSchemaWrapper;
897 898
    STSchema*       pTSchema = tBuildTSchema(pWrapper->pSchema, pWrapper->nCols, pWrapper->version);
    SArray*         pRows = pSubmitTbData->aRowP;
899

900
    for (int32_t i = 0; i < numOfRows; i++) {
901 902
      SRow* pRow = taosArrayGetP(pRows, i);
      bool  buildNew = false;
903

904
      for (int32_t j = 0; j < pTSchema->numOfCols; j++) {
905
        SColVal colVal;
906
        tRowGet(pRow, pTSchema, j, &colVal);
907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925
        if (curRow == 0) {
          assigned[j] = !COL_VAL_IS_NONE(&colVal);
          buildNew = true;
        } else {
          bool currentRowAssigned = !COL_VAL_IS_NONE(&colVal);
          if (currentRowAssigned != assigned[j]) {
            assigned[j] = currentRowAssigned;
            buildNew = true;
          }
        }
      }

      if (buildNew) {
        if (taosArrayGetSize(blocks) > 0) {
          SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
          pLastBlock->info.rows = curRow - lastRow;
          lastRow = curRow;
        }

926
        SSDataBlock     block = {0};
927
        SSchemaWrapper* pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
928
        if (pSW == NULL) {
929 930 931 932 933 934
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          goto FAIL;
        }

        if (tqMaskBlock(pSW, &block, pSchemaWrapper, assigned) < 0) {
          blockDataFreeRes(&block);
935
          tDeleteSchemaWrapper(pSW);
936 937 938 939 940 941
          goto FAIL;
        }
        tqDebug("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
                (int32_t)taosArrayGetSize(block.pDataBlock));

        block.info.id.uid = uid;
942
        block.info.version = pReader->msg.ver;
943 944 945
        if (blockDataEnsureCapacity(&block, numOfRows - curRow) < 0) {
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          blockDataFreeRes(&block);
946
          tDeleteSchemaWrapper(pSW);
947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962
          goto FAIL;
        }
        taosArrayPush(blocks, &block);
        taosArrayPush(schemas, &pSW);
      }

      SSDataBlock* pBlock = taosArrayGetLast(blocks);

      tqDebug("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
              (int32_t)taosArrayGetSize(blocks));

      int32_t targetIdx = 0;
      int32_t sourceIdx = 0;
      int32_t colActual = blockDataGetNumOfCols(pBlock);
      while (targetIdx < colActual) {
        SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
963
        SColVal          colVal;
964
        tRowGet(pRow, pTSchema, sourceIdx, &colVal);
965 966 967 968 969 970 971 972 973

        if (colVal.cid < pColData->info.colId) {
          sourceIdx++;
        } else if (colVal.cid == pColData->info.colId) {
          if (IS_STR_DATA_TYPE(colVal.type)) {
            if (colVal.value.pData != NULL) {
              char val[65535 + 2];
              memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData);
              varDataSetLen(val, colVal.value.nData);
974
              if (colDataSetVal(pColData, curRow - lastRow, val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
975 976 977
                goto FAIL;
              }
            } else {
X
Xiaoyu Wang 已提交
978
              colDataSetNULL(pColData, curRow - lastRow);
979 980
            }
          } else {
981
            if (colDataSetVal(pColData, curRow - lastRow, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
982 983 984 985 986 987 988 989 990
              goto FAIL;
            }
          }
          sourceIdx++;
          targetIdx++;
        }
      }
      curRow++;
    }
991 992

    taosMemoryFreeClear(pTSchema);
L
Liu Jicong 已提交
993
  }
994 995 996 997 998 999 1000

  SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
  pLastBlock->info.rows = curRow - lastRow;

  taosMemoryFree(assigned);
  return 0;

1001 1002 1003 1004 1005
FAIL:
  taosMemoryFree(assigned);
  return -1;
}

L
Liu Jicong 已提交
1006
void tqReaderSetColIdList(STqReader* pReader, SArray* pColIdList) { pReader->pColIdList = pColIdList; }
H
Hongze Cheng 已提交
1007

1008
int tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList, const char* id) {
L
Liu Jicong 已提交
1009 1010 1011
  if (pReader->tbIdHash) {
    taosHashClear(pReader->tbIdHash);
  } else {
5
54liuyao 已提交
1012
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
H
Hongze Cheng 已提交
1013 1014
  }

L
Liu Jicong 已提交
1015
  if (pReader->tbIdHash == NULL) {
H
Hongze Cheng 已提交
1016 1017 1018 1019 1020 1021
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  for (int i = 0; i < taosArrayGetSize(tbUidList); i++) {
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
L
Liu Jicong 已提交
1022
    taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0);
H
Hongze Cheng 已提交
1023 1024
  }

1025
  tqDebug("s-task:%s %d tables are set to be queried target table", id, (int32_t) taosArrayGetSize(tbUidList));
H
Hongze Cheng 已提交
1026 1027 1028
  return 0;
}

1029
int tqReaderAddTbUidList(STqReader* pReader, const SArray* pTableUidList) {
L
Liu Jicong 已提交
1030
  if (pReader->tbIdHash == NULL) {
5
54liuyao 已提交
1031
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
L
Liu Jicong 已提交
1032
    if (pReader->tbIdHash == NULL) {
H
Hongze Cheng 已提交
1033 1034 1035 1036 1037
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
  }

1038 1039 1040
  int32_t numOfTables = taosArrayGetSize(pTableUidList);
  for (int i = 0; i < numOfTables; i++) {
    int64_t* pKey = (int64_t*)taosArrayGet(pTableUidList, i);
L
Liu Jicong 已提交
1041
    taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0);
H
Hongze Cheng 已提交
1042 1043 1044 1045
  }

  return 0;
}
1046

1047 1048 1049 1050 1051 1052 1053 1054
bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid) {
  return taosHashGet(pReader->tbIdHash, &uid, sizeof(uint64_t));
}

bool tqCurrentBlockConsumed(const STqReader* pReader) {
    return pReader->msg.msgStr == NULL;
}

L
Liu Jicong 已提交
1055
int tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) {
L
Liu Jicong 已提交
1056 1057
  for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
L
Liu Jicong 已提交
1058
    taosHashRemove(pReader->tbIdHash, pKey, sizeof(int64_t));
1059 1060 1061 1062
  }

  return 0;
}
L
Liu Jicong 已提交
1063

1064
// todo update the table list in wal reader
L
Liu Jicong 已提交
1065
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
1066 1067 1068 1069
  void*   pIter = NULL;
  int32_t vgId = TD_VID(pTq->pVnode);

  // update the table list for each consumer handle
L
Liu Jicong 已提交
1070
  while (1) {
1071
    pIter = taosHashIterate(pTq->pHandle, pIter);
H
Haojun Liao 已提交
1072 1073 1074 1075
    if (pIter == NULL) {
      break;
    }

1076 1077
    STqHandle* pTqHandle = (STqHandle*)pIter;
    if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
1078
      int32_t code = qUpdateTableListForStreamScanner(pTqHandle->execHandle.task, tbUidList, isAdd);
L
Liu Jicong 已提交
1079
      if (code != 0) {
1080
        tqError("update qualified table error for %s", pTqHandle->subKey);
L
Liu Jicong 已提交
1081 1082
        continue;
      }
1083
    } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
L
Liu Jicong 已提交
1084 1085 1086 1087
      if (!isAdd) {
        int32_t sz = taosArrayGetSize(tbUidList);
        for (int32_t i = 0; i < sz; i++) {
          int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
1088
          taosHashPut(pTqHandle->execHandle.execDb.pFilterOutTbUid, &tbUid, sizeof(int64_t), NULL, 0);
L
Liu Jicong 已提交
1089 1090
        }
      }
1091
    } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
1092
      if (isAdd) {
1093
        SArray* list = NULL;
wmmhello's avatar
wmmhello 已提交
1094
        int ret = qGetTableList(pTqHandle->execHandle.execTb.suid, pTq->pVnode, pTqHandle->execHandle.execTb.node, &list, pTqHandle->execHandle.task);
1095 1096 1097 1098
        if(ret != TDB_CODE_SUCCESS) {
          tqError("qGetTableList in tqUpdateTbUidList error:%d handle %s consumer:0x%" PRIx64, ret, pTqHandle->subKey, pTqHandle->consumerId);
          taosArrayDestroy(list);
          return ret;
1099
        }
H
Haojun Liao 已提交
1100
        tqReaderSetTbUidList(pTqHandle->execHandle.pTqReader, list, NULL);
1101
        taosArrayDestroy(list);
1102
      } else {
1103
        tqReaderRemoveTbUidList(pTqHandle->execHandle.pTqReader, tbUidList);
1104
      }
L
Liu Jicong 已提交
1105 1106
    }
  }
1107 1108

  // update the table list handle for each stream scanner/wal reader
1109
  taosWLockLatch(&pTq->pStreamMeta->lock);
L
Liu Jicong 已提交
1110
  while (1) {
L
Liu Jicong 已提交
1111
    pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
1112 1113 1114 1115
    if (pIter == NULL) {
      break;
    }

1116
    SStreamTask* pTask = *(SStreamTask**)pIter;
1117
    if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
1118
      int32_t code = qUpdateTableListForStreamScanner(pTask->exec.pExecutor, tbUidList, isAdd);
L
Liu Jicong 已提交
1119
      if (code != 0) {
1120
        tqError("vgId:%d, s-task:%s update qualified table error for stream task", vgId, pTask->id.idStr);
L
Liu Jicong 已提交
1121 1122
        continue;
      }
L
Liu Jicong 已提交
1123 1124
    }
  }
1125

1126
  taosWUnLockLatch(&pTq->pStreamMeta->lock);
L
Liu Jicong 已提交
1127 1128
  return 0;
}