tqRead.c 36.3 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Haojun Liao 已提交
16
#include "tmsg.h"
H
Hongze Cheng 已提交
17
#include "tq.h"
L
Liu Jicong 已提交
18

L
Liu Jicong 已提交
19 20
bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) {
  if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__TABLE) {
21 22 23
    return true;
  }

L
Liu Jicong 已提交
24 25 26
  int16_t msgType = pHead->msgType;
  char*   body = pHead->body;
  int32_t bodyLen = pHead->bodyLen;
27

L
Liu Jicong 已提交
28 29 30 31 32
  int64_t  tbSuid = pHandle->execHandle.execTb.suid;
  int64_t  realTbSuid = 0;
  SDecoder coder;
  void*    data = POINTER_SHIFT(body, sizeof(SMsgHead));
  int32_t  len = bodyLen - sizeof(SMsgHead);
33 34 35 36 37 38 39 40 41 42 43 44 45
  tDecoderInit(&coder, data, len);

  if (msgType == TDMT_VND_CREATE_STB || msgType == TDMT_VND_ALTER_STB) {
    SVCreateStbReq req = {0};
    if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
      goto end;
    }
    realTbSuid = req.suid;
  } else if (msgType == TDMT_VND_DROP_STB) {
    SVDropStbReq req = {0};
    if (tDecodeSVDropStbReq(&coder, &req) < 0) {
      goto end;
    }
L
Liu Jicong 已提交
46
    realTbSuid = req.suid;
47 48 49 50 51 52
  } else if (msgType == TDMT_VND_CREATE_TABLE) {
    SVCreateTbBatchReq req = {0};
    if (tDecodeSVCreateTbBatchReq(&coder, &req) < 0) {
      goto end;
    }

L
Liu Jicong 已提交
53
    int32_t        needRebuild = 0;
54 55 56
    SVCreateTbReq* pCreateReq = NULL;
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
      pCreateReq = req.pReqs + iReq;
L
Liu Jicong 已提交
57
      if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid) {
58 59 60
        needRebuild++;
      }
    }
L
Liu Jicong 已提交
61
    if (needRebuild == 0) {
62
      // do nothing
L
Liu Jicong 已提交
63
    } else if (needRebuild == req.nReqs) {
64
      realTbSuid = tbSuid;
L
Liu Jicong 已提交
65
    } else {
66 67 68 69 70
      realTbSuid = tbSuid;
      SVCreateTbBatchReq reqNew = {0};
      reqNew.pArray = taosArrayInit(req.nReqs, sizeof(struct SVCreateTbReq));
      for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
        pCreateReq = req.pReqs + iReq;
L
Liu Jicong 已提交
71
        if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid) {
72 73 74 75 76
          reqNew.nReqs++;
          taosArrayPush(reqNew.pArray, pCreateReq);
        }
      }

L
Liu Jicong 已提交
77
      int     tlen;
78 79 80 81 82
      int32_t ret = 0;
      tEncodeSize(tEncodeSVCreateTbBatchReq, &reqNew, tlen, ret);
      void* buf = taosMemoryMalloc(tlen);
      if (NULL == buf) {
        taosArrayDestroy(reqNew.pArray);
83 84 85 86 87 88 89
        for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
          pCreateReq = req.pReqs + iReq;
          taosMemoryFreeClear(pCreateReq->comment);
          if (pCreateReq->type == TSDB_CHILD_TABLE) {
            taosArrayDestroy(pCreateReq->ctb.tagName);
          }
        }
90 91 92 93 94 95 96 97 98 99 100
        goto end;
      }
      SEncoder coderNew = {0};
      tEncoderInit(&coderNew, buf, tlen - sizeof(SMsgHead));
      tEncodeSVCreateTbBatchReq(&coderNew, &reqNew);
      tEncoderClear(&coderNew);
      memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
      pHead->bodyLen = tlen + sizeof(SMsgHead);
      taosMemoryFree(buf);
      taosArrayDestroy(reqNew.pArray);
    }
101 102 103 104 105 106 107 108

    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
      pCreateReq = req.pReqs + iReq;
      taosMemoryFreeClear(pCreateReq->comment);
      if (pCreateReq->type == TSDB_CHILD_TABLE) {
        taosArrayDestroy(pCreateReq->ctb.tagName);
      }
    }
109
  } else if (msgType == TDMT_VND_ALTER_TABLE) {
L
Liu Jicong 已提交
110
    SVAlterTbReq req = {0};
111 112 113 114 115 116

    if (tDecodeSVAlterTbReq(&coder, &req) < 0) {
      goto end;
    }

    SMetaReader mr = {0};
117
    metaReaderDoInit(&mr, pHandle->execHandle.pTqReader->pVnodeMeta, 0);
118 119 120 121 122 123 124 125 126 127 128 129 130 131

    if (metaGetTableEntryByName(&mr, req.tbName) < 0) {
      metaReaderClear(&mr);
      goto end;
    }
    realTbSuid = mr.me.ctbEntry.suid;
    metaReaderClear(&mr);
  } else if (msgType == TDMT_VND_DROP_TABLE) {
    SVDropTbBatchReq req = {0};

    if (tDecodeSVDropTbBatchReq(&coder, &req) < 0) {
      goto end;
    }

L
Liu Jicong 已提交
132
    int32_t      needRebuild = 0;
133 134 135 136
    SVDropTbReq* pDropReq = NULL;
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
      pDropReq = req.pReqs + iReq;

L
Liu Jicong 已提交
137
      if (pDropReq->suid == tbSuid) {
138 139 140
        needRebuild++;
      }
    }
L
Liu Jicong 已提交
141
    if (needRebuild == 0) {
142
      // do nothing
L
Liu Jicong 已提交
143
    } else if (needRebuild == req.nReqs) {
144
      realTbSuid = tbSuid;
L
Liu Jicong 已提交
145
    } else {
146 147 148 149 150
      realTbSuid = tbSuid;
      SVDropTbBatchReq reqNew = {0};
      reqNew.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbReq));
      for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
        pDropReq = req.pReqs + iReq;
L
Liu Jicong 已提交
151
        if (pDropReq->suid == tbSuid) {
152 153 154 155 156
          reqNew.nReqs++;
          taosArrayPush(reqNew.pArray, pDropReq);
        }
      }

L
Liu Jicong 已提交
157
      int     tlen;
158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181
      int32_t ret = 0;
      tEncodeSize(tEncodeSVDropTbBatchReq, &reqNew, tlen, ret);
      void* buf = taosMemoryMalloc(tlen);
      if (NULL == buf) {
        taosArrayDestroy(reqNew.pArray);
        goto end;
      }
      SEncoder coderNew = {0};
      tEncoderInit(&coderNew, buf, tlen - sizeof(SMsgHead));
      tEncodeSVDropTbBatchReq(&coderNew, &reqNew);
      tEncoderClear(&coderNew);
      memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
      pHead->bodyLen = tlen + sizeof(SMsgHead);
      taosMemoryFree(buf);
      taosArrayDestroy(reqNew.pArray);
    }
  } else if (msgType == TDMT_VND_DELETE) {
    SDeleteRes req = {0};
    if (tDecodeDeleteRes(&coder, &req) < 0) {
      goto end;
    }
    realTbSuid = req.suid;
  }

L
Liu Jicong 已提交
182
end:
183 184 185 186
  tDecoderClear(&coder);
  return tbSuid == realTbSuid;
}

187
/*
 * Scan the WAL forward from *fetchOffset until a message that should be handed
 * to the consumer is found, and load its body into *ppCkHead.
 *
 * - TDMT_VND_SUBMIT messages are always returned.
 * - When pHandle->fetchMeta != WITH_DATA, meta messages (IS_META_MSG) are
 *   returned if isValValidForTable() accepts them; TDMT_VND_DELETE is excluded
 *   when fetchMeta == ONLY_META.
 * - Every other message has its body skipped and the scan moves to the next
 *   version.
 *
 * The WAL reader mutex of `pHandle` is held for the whole scan.
 *
 * On return *fetchOffset holds the version the scan stopped at.
 * Returns 0 when a message was fetched, -1 when a WAL read failed (including
 * "no more log").
 */
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead, uint64_t reqId) {
  int32_t code = 0;
  int32_t vgId = TD_VID(pTq->pVnode);

  taosThreadMutexLock(&pHandle->pWalReader->mutex);
  int64_t offset = *fetchOffset;

  while (1) {
    if (walFetchHead(pHandle->pWalReader, offset, *ppCkHead) < 0) {
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64
              ", no more log to return, reqId:0x%" PRIx64,
              pHandle->consumerId, pHandle->epoch, vgId, offset, reqId);
      *fetchOffset = offset;
      code = -1;
      goto END;
    }

    tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type: %s, reqId:0x%" PRIx64, vgId,
            pHandle->consumerId, offset, TMSG_INFO((*ppCkHead)->head.msgType), reqId);

    if ((*ppCkHead)->head.msgType == TDMT_VND_SUBMIT) {
      code = walFetchBody(pHandle->pWalReader, ppCkHead);

      if (code < 0) {
        *fetchOffset = offset;
        code = -1;
        goto END;
      }
      *fetchOffset = offset;
      code = 0;
      goto END;
    } else {
      if (pHandle->fetchMeta != WITH_DATA) {
        SWalCont* pHead = &((*ppCkHead)->head);
        if (IS_META_MSG(pHead->msgType) && !(pHead->msgType == TDMT_VND_DELETE && pHandle->fetchMeta == ONLY_META)) {
          code = walFetchBody(pHandle->pWalReader, ppCkHead);
          if (code < 0) {
            *fetchOffset = offset;
            code = -1;
            goto END;
          }

          // walFetchBody receives the address of the head pointer, so it may replace
          // *ppCkHead (presumably to grow the buffer for the body). Re-derive pHead
          // instead of using the pointer computed before the fetch.
          pHead = &((*ppCkHead)->head);

          if (isValValidForTable(pHandle, pHead)) {
            *fetchOffset = offset;
            code = 0;
            goto END;
          } else {
            // Meta message of another table: its body is already consumed, move on.
            offset++;
            continue;
          }
        }
      }
      code = walSkipFetchBody(pHandle->pWalReader, *ppCkHead);
      if (code < 0) {
        *fetchOffset = offset;
        code = -1;
        goto END;
      }
      offset++;
    }
  }

END:
  taosThreadMutexUnlock(&pHandle->pWalReader->mutex);
  return code;
}

254
/*
 * Allocate a STqReader bound to the WAL and meta store of `pVnode`.
 *
 * The reader gets its own WAL reader and an empty result data block; the
 * schema cache, column-id list and table-id hash start out empty.
 *
 * Returns the new reader, or NULL if any allocation fails (nothing is leaked
 * in that case). Release with tqReaderClose().
 */
STqReader* tqReaderOpen(SVnode* pVnode) {
  STqReader* pReader = taosMemoryCalloc(1, sizeof(STqReader));
  if (pReader == NULL) {
    return NULL;
  }

  pReader->pWalReader = walOpenReader(pVnode->pWal, NULL);
  if (pReader->pWalReader == NULL) {
    taosMemoryFree(pReader);
    return NULL;
  }

  pReader->pVnodeMeta = pVnode->pMeta;
  pReader->pColIdList = NULL;
  pReader->cachedSchemaVer = 0;
  pReader->cachedSchemaSuid = 0;
  pReader->pSchemaWrapper = NULL;
  pReader->tbIdHash = NULL;

  // The result block is dereferenced unconditionally by tqRetrieveDataBlock, so a
  // reader without one must not be handed out.
  pReader->pResBlock = createDataBlock();
  if (pReader->pResBlock == NULL) {
    walCloseReader(pReader->pWalReader);
    taosMemoryFree(pReader);
    return NULL;
  }

  return pReader;
}

276
/*
 * Release a reader created by tqReaderOpen() together with everything it owns:
 * the WAL reader, the cached schema wrapper, the column-id list, the result
 * block, the table-id hash and the decoded submit request.
 *
 * Passing NULL is a no-op.
 */
void tqReaderClose(STqReader* pReader) {
  if (pReader == NULL) {
    return;
  }

  // close wal reader
  if (pReader->pWalReader) {
    walCloseReader(pReader->pWalReader);
  }

  if (pReader->pSchemaWrapper) {
    tDeleteSchemaWrapper(pReader->pSchemaWrapper);
  }

  if (pReader->pColIdList) {
    taosArrayDestroy(pReader->pColIdList);
  }

  // free result block, table-id hash and the decoded submit request
  blockDataDestroy(pReader->pResBlock);
  taosHashCleanup(pReader->tbIdHash);
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
  taosMemoryFree(pReader);
}

297
/*
 * Position the WAL reader of `pReader` at version `ver`.
 * `id` is only used to tag the debug log line.
 * Returns 0 on success, -1 if the underlying seek fails.
 */
int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) {
  bool seekFailed = walReaderSeekVer(pReader->pWalReader, ver) < 0;
  if (!seekFailed) {
    tqDebug("wal reader seek to ver:%" PRId64 " %s", ver, id);
    return 0;
  }

  return -1;
}

305
/*
 * Read the next valid WAL message through `pReader` and convert it into a
 * stream queue item stored in *pItem.
 *
 * - TDMT_VND_SUBMIT: the payload after the SSubmitReq2Msg header is copied to
 *   a heap buffer and wrapped with streamDataSubmitNew().
 * - TDMT_VND_DELETE: the payload after the SMsgHead is converted with
 *   extractDelDataBlock().
 * - Any other message type trips ASSERT(0).
 *
 * If the message version is greater than `maxVer`, nothing is extracted,
 * *pItem is left untouched and TSDB_CODE_SUCCESS is returned.
 *
 * `id` is only used for logging.
 *
 * Returns 0 / TSDB_CODE_SUCCESS on success; the failing call's error code (or
 * -1 when the submit copy cannot be allocated, with terrno set) otherwise.
 */
int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, const char* id) {
  int32_t code = walNextValidMsg(pReader);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  int64_t ver = pReader->pHead->head.version;
  if (ver > maxVer) {
    tqDebug("maxVer in WAL:%"PRId64" reached current:%"PRId64", do not scan wal anymore, %s", maxVer, ver, id);
    return TSDB_CODE_SUCCESS;
  }

  if (pReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
    void*   pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SSubmitReq2Msg));
    int32_t len = pReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);

    void* data = taosMemoryMalloc(len);
    if (data == NULL) {
      // todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0);
      return -1;
    }

    memcpy(data, pBody, len);
    SPackedData data1 = (SPackedData){.ver = ver, .msgLen = len, .msgStr = data};

    *pItem = (SStreamQueueItem*)streamDataSubmitNew(&data1, STREAM_INPUT__DATA_SUBMIT);
    if (*pItem == NULL) {
      // NOTE(review): ownership of `data` on this failure path is not visible here - confirm
      // whether streamDataSubmitNew releases it or whether it must be freed by the caller.
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      tqError("%s failed to create data submit for stream since out of memory", id);
      return terrno;
    }
  } else if (pReader->pHead->head.msgType == TDMT_VND_DELETE) {
    void*   pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SMsgHead));
    int32_t len = pReader->pHead->head.bodyLen - sizeof(SMsgHead);

    code = extractDelDataBlock(pBody, len, ver, (SStreamRefDataBlock**)pItem);
    if (code != TSDB_CODE_SUCCESS) {
      tqError("s-task:%s extract delete msg from WAL failed, code:%s", id, tstrerror(code));
      // Report the failure instead of claiming success without a usable item.
      return code;
    }

    tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%"PRId64, id, len, ver);
  } else {
    ASSERT(0);
  }

  return 0;
}

355
// todo ignore the error in wal?
356
/*
 * Advance `pReader` to the next non-empty data block found in the WAL.
 *
 * Whenever the currently decoded submit request is exhausted (or none has been
 * decoded yet), the next valid WAL message is read and decoded into
 * pReader->submit. The submit blocks are then visited in order:
 *   - without a table filter (tbIdHash == NULL) every block is retrieved;
 *   - with a filter only blocks whose uid is present in tbIdHash are retrieved.
 * A block is retrieved with tqRetrieveDataBlock(), which fills
 * pReader->pResBlock and advances pReader->nextBlk.
 *
 * `id` is only used for logging.
 *
 * Returns true as soon as a retrieved block has at least one row; false when
 * the WAL has no further valid message or a message cannot be decoded.
 */
bool tqNextBlockInWal(STqReader* pReader, const char* id) {
  SWalReader* pWalReader = pReader->pWalReader;

  while (1) {
    SArray* pBlockList = pReader->submit.aSubmitTbData;
    if (pBlockList == NULL || pReader->nextBlk >= taosArrayGetSize(pBlockList)) {
      // try next message in wal file
      // todo always retry to avoid read failure caused by wal file deletion
      if (walNextValidMsg(pWalReader) < 0) {
        return false;
      }

      void*   pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
      int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
      int64_t ver = pWalReader->pHead->head.version;

      SDecoder decoder = {0};
      tDecoderInit(&decoder, pBody, bodyLen);

      // Drop whatever is left of the previously decoded submit request before decoding
      // the new one into the same structure.
      {
        int32_t nSubmitTbData = taosArrayGetSize(pReader->submit.aSubmitTbData);
        for (int32_t i = 0; i < nSubmitTbData; i++) {
          SSubmitTbData* pData = taosArrayGet(pReader->submit.aSubmitTbData, i);
          if (pData->pCreateTbReq != NULL) {
            taosArrayDestroy(pData->pCreateTbReq->ctb.tagName);
            taosMemoryFreeClear(pData->pCreateTbReq);
          }
          pData->aRowP = taosArrayDestroy(pData->aRowP);
        }
        pReader->submit.aSubmitTbData = taosArrayDestroy(pReader->submit.aSubmitTbData);
      }

      if (tDecodeSubmitReq(&decoder, &pReader->submit) < 0) {
        tDecoderClear(&decoder);
        tqError("decode wal file error, msgLen:%d, ver:%" PRId64, bodyLen, ver);
        return false;
      }

      tDecoderClear(&decoder);
      pReader->nextBlk = 0;
    }

    int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
    while (pReader->nextBlk < numOfBlocks) {
      tqTrace("tq reader next data block %d/%d, len:%d %" PRId64 " %d", pReader->nextBlk,
          numOfBlocks, pReader->msg.msgLen, pReader->msg.ver, pReader->nextBlk);

      SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);

      if (pReader->tbIdHash == NULL) {
        SSDataBlock* pRes = NULL;
        int32_t      code = tqRetrieveDataBlock(pReader, &pRes, NULL);
        if (code == TSDB_CODE_SUCCESS && pRes->info.rows > 0) {
          return true;
        }

        // tqRetrieveDataBlock has already advanced nextBlk. Falling through to the
        // filter lookup below would query a NULL hash and bump nextBlk a second
        // time, silently skipping the following block.
        continue;
      }

      void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t));
      if (ret != NULL) {
        tqTrace("tq reader return submit block, uid:%" PRId64 ", ver:%" PRId64, pSubmitTbData->uid, pReader->msg.ver);

        SSDataBlock* pRes = NULL;
        int32_t code = tqRetrieveDataBlock(pReader, &pRes, NULL);
        if (code == TSDB_CODE_SUCCESS && pRes->info.rows > 0) {
          return true;
        }
      } else {
        pReader->nextBlk += 1;
        tqTrace("tq reader discard submit block, uid:%" PRId64 ", continue", pSubmitTbData->uid);
      }
    }

    qTrace("stream scan return empty, all %d submit blocks consumed, %s", numOfBlocks, id);
    tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);

    pReader->msg.msgStr = NULL;
  }
}

435
/*
 * Attach a raw submit message to `pReader` and decode it into pReader->submit.
 *
 * The message pointer, length and version are recorded in pReader->msg before
 * decoding and stay recorded even if decoding fails.
 *
 * Returns 0 on success, -1 if the message cannot be decoded.
 */
int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver) {
  pReader->msg.ver = ver;
  pReader->msg.msgLen = msgLen;
  pReader->msg.msgStr = msgStr;

  tqDebug("tq reader set msg %p %d", msgStr, msgLen);

  SDecoder decoder;
  tDecoderInit(&decoder, pReader->msg.msgStr, pReader->msg.msgLen);
  bool decodeFailed = tDecodeSubmitReq(&decoder, &pReader->submit) < 0;
  // The decoder is released on both outcomes, before anything is logged.
  tDecoderClear(&decoder);

  if (decodeFailed) {
    tqError("DecodeSSubmitReq2 error, msgLen:%d, ver:%" PRId64, msgLen, ver);
    return -1;
  }

  return 0;
}
L
Liu Jicong 已提交
453

454 455 456 457
/* Accessor: the WAL reader owned by `pReader` (ownership is not transferred). */
SWalReader* tqGetWalReader(STqReader* pReader) {
  SWalReader* pWalReader = pReader->pWalReader;
  return pWalReader;
}

H
Haojun Liao 已提交
458 459 460
/* Accessor: the result data block owned by `pReader` (ownership is not transferred). */
SSDataBlock* tqGetResultBlock(STqReader* pReader) {
  SSDataBlock* pResBlock = pReader->pResBlock;
  return pResBlock;
}
461

462
/*
 * Move pReader->nextBlk to the next submit block that passes the table filter.
 *
 * Without a filter (tbIdHash == NULL) the current block is accepted as is;
 * with a filter, blocks whose uid is not in tbIdHash are skipped.
 * nextBlk is left pointing at the accepted block (it is NOT advanced past it).
 *
 * `idstr` is only used for logging.
 *
 * Returns true if such a block exists. Returns false when no message is
 * attached or all blocks are consumed; in the latter case the decoded submit
 * request is destroyed, nextBlk is reset to 0 and the message is detached.
 */
bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
  if (pReader->msg.msgStr == NULL) {
    return false;
  }

  int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
  for (; pReader->nextBlk < numOfBlocks; pReader->nextBlk++) {
    tqDebug("try next data block, len:%d ver:%" PRId64 " index:%d/%d, %s", pReader->msg.msgLen, pReader->msg.ver,
            pReader->nextBlk, numOfBlocks, idstr);

    SSubmitTbData* pTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
    if (pReader->tbIdHash == NULL) {
      return true;
    }

    if (taosHashGet(pReader->tbIdHash, &pTbData->uid, sizeof(int64_t)) != NULL) {
      tqDebug("block found, ver:%" PRId64 ", uid:%" PRId64", %s", pReader->msg.ver, pTbData->uid, idstr);
      return true;
    }

    tqDebug("discard submit block, uid:%" PRId64 ", total queried tables:%d continue %s", pTbData->uid,
        taosHashGetSize(pReader->tbIdHash), idstr);
  }

  // Every block has been looked at: release the decoded request and detach the message.
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
  pReader->nextBlk = 0;
  pReader->msg.msgStr = NULL;

  return false;
}

496
/*
 * Move pReader->nextBlk to the next submit block whose uid is NOT contained in
 * `filterOutUids`. With filterOutUids == NULL the current block is accepted.
 * nextBlk is left pointing at the accepted block.
 *
 * Returns true if such a block exists. Returns false when no message is
 * attached or all blocks are consumed; in the latter case the decoded submit
 * request is destroyed, nextBlk is reset to 0 and the message is detached.
 */
bool tqNextDataBlockFilterOut(STqReader* pReader, SHashObj* filterOutUids) {
  if (pReader->msg.msgStr == NULL) {
    return false;
  }

  int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
  for (; pReader->nextBlk < blockSz; pReader->nextBlk++) {
    SSubmitTbData* pTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);

    // Accept the block when there is no exclusion set or its uid is not excluded.
    if (filterOutUids == NULL || taosHashGet(filterOutUids, &pTbData->uid, sizeof(int64_t)) == NULL) {
      return true;
    }
  }

  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
  pReader->nextBlk = 0;
  pReader->msg.msgStr = NULL;

  return false;
}
517

518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546
/*
 * Build a projected schema and matching block columns from `pSrc`.
 *
 * `mask` has one byte per column of `pSrc`; every column with a non-zero mask
 * byte is copied into pDst->pSchema (freshly allocated here) and appended as a
 * column to `pBlock`. pDst->nCols is set to the number of selected columns.
 *
 * Returns 0 on success, -1 if the schema array cannot be allocated or a column
 * cannot be appended. On failure pDst->pSchema may already be allocated and
 * `pBlock` may hold some columns; releasing them is left to the caller.
 */
int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrapper* pSrc, char* mask) {
  // Count selected columns with the same "non-zero" test the copy loop uses. Summing
  // the raw bytes would under-allocate pSchema for mask values other than 0/1.
  int32_t cnt = 0;
  for (int32_t i = 0; i < pSrc->nCols; i++) {
    if (mask[i]) {
      cnt++;
    }
  }

  pDst->nCols = cnt;
  pDst->pSchema = taosMemoryCalloc(cnt, sizeof(SSchema));
  if (pDst->pSchema == NULL) {
    return -1;
  }

  int32_t j = 0;
  for (int32_t i = 0; i < pSrc->nCols; i++) {
    if (!mask[i]) {
      continue;
    }

    pDst->pSchema[j++] = pSrc->pSchema[i];
    SColumnInfoData colInfo =
        createColumnInfoData(pSrc->pSchema[i].type, pSrc->pSchema[i].bytes, pSrc->pSchema[i].colId);
    if (blockDataAppendColInfo(pBlock, &colInfo) != 0) {
      return -1;
    }
  }
  return 0;
}

547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595
/*
 * Give `pBlock` its column layout, once.
 *
 * If the block already has columns nothing is done. Otherwise:
 *   - an empty `pColIdList` means "all columns": one block column is appended
 *     per schema column;
 *   - otherwise the schema columns and the requested column ids are walked in
 *     parallel (both are compared in increasing id order) and a block column is
 *     appended for every id present in both. At most pSchema->nCols requested
 *     ids are considered.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if a column cannot be
 * appended; in that case the columns added so far are released again so the
 * block is not left with a partial layout.
 */
static int32_t buildResSDataBlock(SSDataBlock* pBlock, SSchemaWrapper* pSchema, const SArray* pColIdList) {
  if (blockDataGetNumOfCols(pBlock) > 0) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t numOfCols = taosArrayGetSize(pColIdList);

  if (numOfCols == 0) {  // all columns are required
    for (int32_t i = 0; i < pSchema->nCols; ++i) {
      SSchema*        pColSchema = &pSchema->pSchema[i];
      SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);

      int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
      if (code != TSDB_CODE_SUCCESS) {
        blockDataFreeRes(pBlock);
        return TSDB_CODE_OUT_OF_MEMORY;
      }
    }
  } else {
    if (numOfCols > pSchema->nCols) {
      numOfCols = pSchema->nCols;
    }

    int32_t i = 0;
    int32_t j = 0;
    while (i < pSchema->nCols && j < numOfCols) {
      SSchema* pColSchema = &pSchema->pSchema[i];
      col_id_t colIdSchema = pColSchema->colId;

      col_id_t colIdNeed = *(col_id_t*)taosArrayGet(pColIdList, j);
      if (colIdSchema < colIdNeed) {
        i++;
      } else if (colIdSchema > colIdNeed) {
        // requested column does not exist in this schema, skip the request
        j++;
      } else {
        SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
        int32_t         code = blockDataAppendColInfo(pBlock, &colInfo);
        if (code != TSDB_CODE_SUCCESS) {
          // Same failure handling as the all-columns path: the caller passes the
          // result to tstrerror(), so return a real error code rather than -1.
          blockDataFreeRes(pBlock);
          return TSDB_CODE_OUT_OF_MEMORY;
        }
        i++;
        j++;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614
/*
 * Store one column value into row `rowIndex` of `pColumnInfoData`.
 *
 * Fixed-size types are written straight from pColVal->value.val. String types
 * are first wrapped into a length-prefixed buffer (varDataSetLen / varDataVal);
 * a string value without a data pointer is stored as NULL.
 *
 * Returns the result of colDataSetVal(), or TSDB_CODE_SUCCESS for the
 * NULL-string case.
 */
static int32_t doSetVal(SColumnInfoData* pColumnInfoData, int32_t rowIndex, SColVal* pColVal) {
  if (!IS_STR_DATA_TYPE(pColVal->type)) {
    return colDataSetVal(pColumnInfoData, rowIndex, (void*)&pColVal->value.val, !COL_VAL_IS_VALUE(pColVal));
  }

  if (pColVal->value.pData == NULL) {
    colDataSetNULL(pColumnInfoData, rowIndex);
    return TSDB_CODE_SUCCESS;
  }

  // Scratch buffer for the length-prefixed form; only needed when there is data.
  char varBuf[65535 + 2] = {0};
  memcpy(varDataVal(varBuf), pColVal->value.pData, pColVal->value.nData);
  varDataSetLen(varBuf, pColVal->value.nData);
  return colDataSetVal(pColumnInfoData, rowIndex, varBuf, !COL_VAL_IS_VALUE(pColVal));
}

615
/*
 * Convert the submit block at pReader->nextBlk into pReader->pResBlock and
 * advance nextBlk by one (also when the conversion fails).
 *
 * *pRes is set to pReader->pResBlock. The table schema is (re)loaded from the
 * meta store when the cached suid/uid/version does not match the block; the
 * column layout of the result block is built on first use. Both submit formats
 * are handled: column format (SUBMIT_REQ_COLUMN_DATA_FORMAT) and row format.
 * Result columns without a matching source column are filled with NULL.
 *
 * `id` is currently unused.
 *
 * Returns 0 on success. On failure returns -1 (terrno is set for the schema
 * lookup and allocation failures) or the error code of the failing helper.
 */
int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* id) {
  tqTrace("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk++);

  SSDataBlock* pBlock = pReader->pResBlock;
  *pRes = pBlock;

  blockDataCleanup(pBlock);

  int32_t vgId = pReader->pWalReader->pWal->cfg.vgId;
  int32_t sversion = pSubmitTbData->sver;
  int64_t suid = pSubmitTbData->suid;
  int64_t uid = pSubmitTbData->uid;
  pReader->lastBlkUid = uid;

  pBlock->info.id.uid = uid;
  pBlock->info.version = pReader->msg.ver;

  // Reload the schema when the cache belongs to another (super) table or version.
  if ((suid != 0 && pReader->cachedSchemaSuid != suid) || (suid == 0 && pReader->cachedSchemaUid != uid) ||
      (pReader->cachedSchemaVer != sversion)) {
    tDeleteSchemaWrapper(pReader->pSchemaWrapper);

    pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1);
    if (pReader->pSchemaWrapper == NULL) {
      tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64
             "version %d, possibly dropped table",
             vgId, suid, uid, pReader->cachedSchemaVer);
      // The cached wrapper is gone: invalidate both cache keys so that a later block of
      // the previously cached table cannot skip the reload and use a NULL wrapper.
      pReader->cachedSchemaSuid = 0;
      pReader->cachedSchemaUid = 0;
      terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
      return -1;
    }

    pReader->cachedSchemaUid = uid;
    pReader->cachedSchemaSuid = suid;
    pReader->cachedSchemaVer = sversion;

    ASSERT(pReader->cachedSchemaVer == pReader->pSchemaWrapper->version);
    if (blockDataGetNumOfCols(pBlock) == 0) {
      int32_t code = buildResSDataBlock(pReader->pResBlock, pReader->pSchemaWrapper, pReader->pColIdList);
      if (code != TSDB_CODE_SUCCESS) {
        tqError("vgId:%d failed to build data block, code:%s", vgId, tstrerror(code));
        return code;
      }
    }
  }

  int32_t numOfRows = 0;
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    SColData* pCol = taosArrayGet(pSubmitTbData->aCol, 0);
    numOfRows = pCol->nVal;
  } else {
    numOfRows = taosArrayGetSize(pSubmitTbData->aRowP);
  }

  if (blockDataEnsureCapacity(pBlock, numOfRows) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pBlock->info.rows = numOfRows;

  int32_t colActual = blockDataGetNumOfCols(pBlock);

  // convert and scan one block
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    SArray* pCols = pSubmitTbData->aCol;
    int32_t numOfCols = taosArrayGetSize(pCols);
    int32_t targetIdx = 0;
    int32_t sourceIdx = 0;
    // Walk source and target columns in parallel by column id.
    while (targetIdx < colActual) {
      if (sourceIdx >= numOfCols) {
        tqError("tqRetrieveDataBlock sourceIdx:%d >= numOfCols:%d", sourceIdx, numOfCols);
        return -1;
      }

      SColData*        pCol = taosArrayGet(pCols, sourceIdx);
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
      SColVal          colVal;

      if (pCol->nVal != numOfRows) {
        tqError("tqRetrieveDataBlock pCol->nVal:%d != numOfRows:%d", pCol->nVal, numOfRows);
        return -1;
      }

      if (pCol->cid < pColData->info.colId) {
        sourceIdx++;
      } else if (pCol->cid == pColData->info.colId) {
        for (int32_t i = 0; i < pCol->nVal; i++) {
          tColDataGetValue(pCol, i, &colVal);
          int32_t code = doSetVal(pColData, i, &colVal);
          if (code != TSDB_CODE_SUCCESS) {
            return code;
          }
        }
        sourceIdx++;
        targetIdx++;
      } else {
        // target column is missing from the submitted data
        colDataSetNNULL(pColData, 0, pCol->nVal);
        targetIdx++;
      }
    }
  } else {
    SArray*         pRows = pSubmitTbData->aRowP;
    SSchemaWrapper* pWrapper = pReader->pSchemaWrapper;
    STSchema*       pTSchema = tBuildTSchema(pWrapper->pSchema, pWrapper->nCols, pWrapper->version);
    if (pTSchema == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    int32_t code = TSDB_CODE_SUCCESS;
    for (int32_t i = 0; i < numOfRows && code == TSDB_CODE_SUCCESS; i++) {
      SRow*   pRow = taosArrayGetP(pRows, i);
      int32_t sourceIdx = 0;

      for (int32_t j = 0; j < colActual && code == TSDB_CODE_SUCCESS; j++) {
        SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, j);
        while (1) {
          if (sourceIdx >= pTSchema->numOfCols) {
            // The row schema is exhausted: never index past it, the target has no value.
            colDataSetNULL(pColData, i);
            break;
          }

          SColVal colVal;
          tRowGet(pRow, pTSchema, sourceIdx, &colVal);
          if (colVal.cid < pColData->info.colId) {
            sourceIdx++;
            continue;
          } else if (colVal.cid == pColData->info.colId) {
            code = doSetVal(pColData, i, &colVal);
            if (code == TSDB_CODE_SUCCESS) {
              sourceIdx++;
            }
            break;
          } else {
            colDataSetNULL(pColData, i);
            break;
          }
        }
      }
    }

    // Single release point: the temporary schema is freed on success and on error.
    taosMemoryFreeClear(pTSchema);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  return 0;
}

758
// todo refactor:
759
int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet) {
760
  tqDebug("tq reader retrieve data block %p, %d", pReader->msg.msgStr, pReader->nextBlk);
761 762 763 764

  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
  pReader->nextBlk++;

765 766 767 768
  if (pSubmitTbDataRet) {
    *pSubmitTbDataRet = pSubmitTbData;
  }

769 770 771 772 773
  int32_t sversion = pSubmitTbData->sver;
  int64_t suid = pSubmitTbData->suid;
  int64_t uid = pSubmitTbData->uid;
  pReader->lastBlkUid = uid;

774
  tDeleteSchemaWrapper(pReader->pSchemaWrapper);
775 776 777 778 779 780 781 782 783 784
  pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1);
  if (pReader->pSchemaWrapper == NULL) {
    tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
           pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer);
    pReader->cachedSchemaSuid = 0;
    terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
    return -1;
  }

  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
785
  int32_t         numOfRows = 0;
786 787 788 789 790 791 792 793 794 795 796 797

  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    SArray*   pCols = pSubmitTbData->aCol;
    SColData* pCol = taosArrayGet(pCols, 0);
    numOfRows = pCol->nVal;
  } else {
    SArray* pRows = pSubmitTbData->aRowP;
    numOfRows = taosArrayGetSize(pRows);
  }

  int32_t curRow = 0;
  int32_t lastRow = 0;
798
  char*   assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
799 800 801 802 803 804 805 806 807
  if (assigned == NULL) return -1;

  // convert and scan one block
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    SArray* pCols = pSubmitTbData->aCol;
    int32_t numOfCols = taosArrayGetSize(pCols);
    for (int32_t i = 0; i < numOfRows; i++) {
      bool buildNew = false;

808
      for (int32_t j = 0; j < numOfCols; j++) {
809
        SColData* pCol = taosArrayGet(pCols, j);
810
        SColVal   colVal;
811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830
        tColDataGetValue(pCol, i, &colVal);
        if (curRow == 0) {
          assigned[j] = !COL_VAL_IS_NONE(&colVal);
          buildNew = true;
        } else {
          bool currentRowAssigned = !COL_VAL_IS_NONE(&colVal);
          if (currentRowAssigned != assigned[j]) {
            assigned[j] = currentRowAssigned;
            buildNew = true;
          }
        }
      }

      if (buildNew) {
        if (taosArrayGetSize(blocks) > 0) {
          SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
          pLastBlock->info.rows = curRow - lastRow;
          lastRow = curRow;
        }

831
        SSDataBlock     block = {0};
832
        SSchemaWrapper* pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
833
        if (pSW == NULL) {
834 835 836 837 838 839
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          goto FAIL;
        }

        if (tqMaskBlock(pSW, &block, pSchemaWrapper, assigned) < 0) {
          blockDataFreeRes(&block);
840
          tDeleteSchemaWrapper(pSW);
841 842 843 844 845 846
          goto FAIL;
        }
        tqDebug("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
                (int32_t)taosArrayGetSize(block.pDataBlock));

        block.info.id.uid = uid;
847
        block.info.version = pReader->msg.ver;
848 849 850
        if (blockDataEnsureCapacity(&block, numOfRows - curRow) < 0) {
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          blockDataFreeRes(&block);
851
          tDeleteSchemaWrapper(pSW);
852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880
          goto FAIL;
        }
        taosArrayPush(blocks, &block);
        taosArrayPush(schemas, &pSW);
      }

      SSDataBlock* pBlock = taosArrayGetLast(blocks);

      tqDebug("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
              (int32_t)taosArrayGetSize(blocks));

      int32_t targetIdx = 0;
      int32_t sourceIdx = 0;
      int32_t colActual = blockDataGetNumOfCols(pBlock);
      while (targetIdx < colActual) {
        SColData*        pCol = taosArrayGet(pCols, sourceIdx);
        SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
        SColVal          colVal;

        if (pCol->cid < pColData->info.colId) {
          sourceIdx++;
        } else if (pCol->cid == pColData->info.colId) {
          tColDataGetValue(pCol, i, &colVal);

          if (IS_STR_DATA_TYPE(colVal.type)) {
            if (colVal.value.pData != NULL) {
              char val[65535 + 2];
              memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData);
              varDataSetLen(val, colVal.value.nData);
881
              if (colDataSetVal(pColData, curRow - lastRow, val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
882 883 884
                goto FAIL;
              }
            } else {
X
Xiaoyu Wang 已提交
885
              colDataSetNULL(pColData, curRow - lastRow);
886 887
            }
          } else {
888
            if (colDataSetVal(pColData, curRow - lastRow, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
889 890 891 892 893 894 895 896 897 898 899
              goto FAIL;
            }
          }
          sourceIdx++;
          targetIdx++;
        }
      }

      curRow++;
    }
  } else {
900
    SSchemaWrapper* pWrapper = pReader->pSchemaWrapper;
901 902
    STSchema*       pTSchema = tBuildTSchema(pWrapper->pSchema, pWrapper->nCols, pWrapper->version);
    SArray*         pRows = pSubmitTbData->aRowP;
903

904
    for (int32_t i = 0; i < numOfRows; i++) {
905 906
      SRow* pRow = taosArrayGetP(pRows, i);
      bool  buildNew = false;
907

908
      for (int32_t j = 0; j < pTSchema->numOfCols; j++) {
909
        SColVal colVal;
910
        tRowGet(pRow, pTSchema, j, &colVal);
911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929
        if (curRow == 0) {
          assigned[j] = !COL_VAL_IS_NONE(&colVal);
          buildNew = true;
        } else {
          bool currentRowAssigned = !COL_VAL_IS_NONE(&colVal);
          if (currentRowAssigned != assigned[j]) {
            assigned[j] = currentRowAssigned;
            buildNew = true;
          }
        }
      }

      if (buildNew) {
        if (taosArrayGetSize(blocks) > 0) {
          SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
          pLastBlock->info.rows = curRow - lastRow;
          lastRow = curRow;
        }

930
        SSDataBlock     block = {0};
931
        SSchemaWrapper* pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
932
        if (pSW == NULL) {
933 934 935 936 937 938
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          goto FAIL;
        }

        if (tqMaskBlock(pSW, &block, pSchemaWrapper, assigned) < 0) {
          blockDataFreeRes(&block);
939
          tDeleteSchemaWrapper(pSW);
940 941 942 943 944 945
          goto FAIL;
        }
        tqDebug("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
                (int32_t)taosArrayGetSize(block.pDataBlock));

        block.info.id.uid = uid;
946
        block.info.version = pReader->msg.ver;
947 948 949
        if (blockDataEnsureCapacity(&block, numOfRows - curRow) < 0) {
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          blockDataFreeRes(&block);
950
          tDeleteSchemaWrapper(pSW);
951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966
          goto FAIL;
        }
        taosArrayPush(blocks, &block);
        taosArrayPush(schemas, &pSW);
      }

      SSDataBlock* pBlock = taosArrayGetLast(blocks);

      tqDebug("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
              (int32_t)taosArrayGetSize(blocks));

      int32_t targetIdx = 0;
      int32_t sourceIdx = 0;
      int32_t colActual = blockDataGetNumOfCols(pBlock);
      while (targetIdx < colActual) {
        SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
967
        SColVal          colVal;
968
        tRowGet(pRow, pTSchema, sourceIdx, &colVal);
969 970 971 972 973 974 975 976 977

        if (colVal.cid < pColData->info.colId) {
          sourceIdx++;
        } else if (colVal.cid == pColData->info.colId) {
          if (IS_STR_DATA_TYPE(colVal.type)) {
            if (colVal.value.pData != NULL) {
              char val[65535 + 2];
              memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData);
              varDataSetLen(val, colVal.value.nData);
978
              if (colDataSetVal(pColData, curRow - lastRow, val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
979 980 981
                goto FAIL;
              }
            } else {
X
Xiaoyu Wang 已提交
982
              colDataSetNULL(pColData, curRow - lastRow);
983 984
            }
          } else {
985
            if (colDataSetVal(pColData, curRow - lastRow, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
986 987 988 989 990 991 992 993 994
              goto FAIL;
            }
          }
          sourceIdx++;
          targetIdx++;
        }
      }
      curRow++;
    }
995 996

    taosMemoryFreeClear(pTSchema);
L
Liu Jicong 已提交
997
  }
998 999 1000 1001 1002 1003 1004

  SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
  pLastBlock->info.rows = curRow - lastRow;

  taosMemoryFree(assigned);
  return 0;

1005 1006 1007 1008 1009
FAIL:
  taosMemoryFree(assigned);
  return -1;
}

L
Liu Jicong 已提交
1010
// Records `pColIdList` as the reader's column-id list. Only the pointer is stored here;
// the array itself is neither copied nor inspected.
void tqReaderSetColIdList(STqReader* pReader, SArray* pColIdList) {
  pReader->pColIdList = pColIdList;
}
H
Hongze Cheng 已提交
1011

1012
// Replaces the reader's set of table uids with the entries of `tbUidList`.
//
// An existing `tbIdHash` is cleared and reused; otherwise a new hash keyed by 64-bit
// integers is created. Every element of `tbUidList` is read as an int64_t and inserted as a
// key with no value.
//
// `id` is only used to tag the debug log line and may be NULL.
//
// Returns 0 on success, or -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the hash cannot be
// created.
int tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList, const char* id) {
  if (pReader->tbIdHash != NULL) {
    taosHashClear(pReader->tbIdHash);
  } else {
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
    if (pReader->tbIdHash == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
  }

  // the list is not modified below, so its size is read once
  int32_t numOfTables = (int32_t)taosArrayGetSize(tbUidList);
  for (int32_t i = 0; i < numOfTables; i++) {
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
    taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0);
  }

  // a NULL pointer must not reach "%s"; callers in this file do pass NULL
  tqDebug("s-task:%s %d tables are set to be queried target table", (id != NULL) ? id : "(null)", numOfTables);
  return 0;
}

1033
// Adds every uid in `pTableUidList` to the reader's table-uid set, creating the set first
// when the reader does not have one yet. Existing entries are kept.
//
// Returns 0 on success, or -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the set cannot be
// created.
int tqReaderAddTbUidList(STqReader* pReader, const SArray* pTableUidList) {
  // lazily create the uid set on first use
  if (pReader->tbIdHash == NULL) {
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
  }
  if (pReader->tbIdHash == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  int32_t total = taosArrayGetSize(pTableUidList);
  int     idx = 0;
  while (idx < total) {
    // each element is an int64_t uid, stored as a key without a value
    int64_t* pUid = (int64_t*)taosArrayGet(pTableUidList, idx);
    taosHashPut(pReader->tbIdHash, pUid, sizeof(int64_t), NULL, 0);
    idx++;
  }

  return 0;
}
1050

1051 1052 1053 1054 1055 1056 1057 1058
// Returns true when `uid` is present as a key in the reader's table-uid hash.
bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid) {
  void* pEntry = taosHashGet(pReader->tbIdHash, &uid, sizeof(uint64_t));
  return pEntry != NULL;
}

// Returns true when the reader holds no current message, i.e. `msg.msgStr` is NULL.
bool tqCurrentBlockConsumed(const STqReader* pReader) {
  const bool hasPendingMsg = (pReader->msg.msgStr != NULL);
  return !hasPendingMsg;
}

L
Liu Jicong 已提交
1059
// Removes every uid listed in `tbUidList` from the reader's table-uid hash.
// Always returns 0; the result of the individual removals is not inspected.
int tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) {
  int32_t idx = 0;

  while (idx < taosArrayGetSize(tbUidList)) {
    // each element is an int64_t uid used directly as the hash key
    int64_t* pUid = (int64_t*)taosArrayGet(tbUidList, idx);
    taosHashRemove(pReader->tbIdHash, pUid, sizeof(int64_t));
    idx++;
  }

  return 0;
}
L
Liu Jicong 已提交
1067

1068
// todo update the table list in wal reader
L
Liu Jicong 已提交
1069
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
1070 1071 1072 1073
  void*   pIter = NULL;
  int32_t vgId = TD_VID(pTq->pVnode);

  // update the table list for each consumer handle
L
Liu Jicong 已提交
1074
  while (1) {
1075
    pIter = taosHashIterate(pTq->pHandle, pIter);
H
Haojun Liao 已提交
1076 1077 1078 1079
    if (pIter == NULL) {
      break;
    }

1080 1081
    STqHandle* pTqHandle = (STqHandle*)pIter;
    if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
1082
      int32_t code = qUpdateTableListForStreamScanner(pTqHandle->execHandle.task, tbUidList, isAdd);
L
Liu Jicong 已提交
1083
      if (code != 0) {
1084
        tqError("update qualified table error for %s", pTqHandle->subKey);
L
Liu Jicong 已提交
1085 1086
        continue;
      }
1087
    } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
L
Liu Jicong 已提交
1088 1089 1090 1091
      if (!isAdd) {
        int32_t sz = taosArrayGetSize(tbUidList);
        for (int32_t i = 0; i < sz; i++) {
          int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
1092
          taosHashPut(pTqHandle->execHandle.execDb.pFilterOutTbUid, &tbUid, sizeof(int64_t), NULL, 0);
L
Liu Jicong 已提交
1093 1094
        }
      }
1095
    } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
1096
      if (isAdd) {
1097
        SArray* list = NULL;
wmmhello's avatar
wmmhello 已提交
1098
        int ret = qGetTableList(pTqHandle->execHandle.execTb.suid, pTq->pVnode, pTqHandle->execHandle.execTb.node, &list, pTqHandle->execHandle.task);
1099 1100 1101 1102
        if(ret != TDB_CODE_SUCCESS) {
          tqError("qGetTableList in tqUpdateTbUidList error:%d handle %s consumer:0x%" PRIx64, ret, pTqHandle->subKey, pTqHandle->consumerId);
          taosArrayDestroy(list);
          return ret;
1103
        }
H
Haojun Liao 已提交
1104
        tqReaderSetTbUidList(pTqHandle->execHandle.pTqReader, list, NULL);
1105
        taosArrayDestroy(list);
1106
      } else {
1107
        tqReaderRemoveTbUidList(pTqHandle->execHandle.pTqReader, tbUidList);
1108
      }
L
Liu Jicong 已提交
1109 1110
    }
  }
1111 1112

  // update the table list handle for each stream scanner/wal reader
1113
  taosWLockLatch(&pTq->pStreamMeta->lock);
L
Liu Jicong 已提交
1114
  while (1) {
L
Liu Jicong 已提交
1115
    pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
1116 1117 1118 1119
    if (pIter == NULL) {
      break;
    }

1120
    SStreamTask* pTask = *(SStreamTask**)pIter;
1121
    if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
1122
      int32_t code = qUpdateTableListForStreamScanner(pTask->exec.pExecutor, tbUidList, isAdd);
L
Liu Jicong 已提交
1123
      if (code != 0) {
1124
        tqError("vgId:%d, s-task:%s update qualified table error for stream task", vgId, pTask->id.idStr);
L
Liu Jicong 已提交
1125 1126
        continue;
      }
L
Liu Jicong 已提交
1127 1128
    }
  }
1129

1130
  taosWUnLockLatch(&pTq->pStreamMeta->lock);
L
Liu Jicong 已提交
1131 1132
  return 0;
}