tqRead.c 35.9 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Haojun Liao 已提交
16
#include "tmsg.h"
H
Hongze Cheng 已提交
17
#include "tq.h"
L
Liu Jicong 已提交
18

L
Liu Jicong 已提交
19 20
bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) {
  if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__TABLE) {
21 22 23
    return true;
  }

L
Liu Jicong 已提交
24 25 26
  int16_t msgType = pHead->msgType;
  char*   body = pHead->body;
  int32_t bodyLen = pHead->bodyLen;
27

L
Liu Jicong 已提交
28 29 30 31 32
  int64_t  tbSuid = pHandle->execHandle.execTb.suid;
  int64_t  realTbSuid = 0;
  SDecoder coder;
  void*    data = POINTER_SHIFT(body, sizeof(SMsgHead));
  int32_t  len = bodyLen - sizeof(SMsgHead);
33 34 35 36 37 38 39 40 41 42 43 44 45
  tDecoderInit(&coder, data, len);

  if (msgType == TDMT_VND_CREATE_STB || msgType == TDMT_VND_ALTER_STB) {
    SVCreateStbReq req = {0};
    if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
      goto end;
    }
    realTbSuid = req.suid;
  } else if (msgType == TDMT_VND_DROP_STB) {
    SVDropStbReq req = {0};
    if (tDecodeSVDropStbReq(&coder, &req) < 0) {
      goto end;
    }
L
Liu Jicong 已提交
46
    realTbSuid = req.suid;
47 48 49 50 51 52
  } else if (msgType == TDMT_VND_CREATE_TABLE) {
    SVCreateTbBatchReq req = {0};
    if (tDecodeSVCreateTbBatchReq(&coder, &req) < 0) {
      goto end;
    }

L
Liu Jicong 已提交
53
    int32_t        needRebuild = 0;
54 55 56
    SVCreateTbReq* pCreateReq = NULL;
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
      pCreateReq = req.pReqs + iReq;
L
Liu Jicong 已提交
57
      if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid) {
58 59 60
        needRebuild++;
      }
    }
L
Liu Jicong 已提交
61
    if (needRebuild == 0) {
62
      // do nothing
L
Liu Jicong 已提交
63
    } else if (needRebuild == req.nReqs) {
64
      realTbSuid = tbSuid;
L
Liu Jicong 已提交
65
    } else {
66 67 68 69 70
      realTbSuid = tbSuid;
      SVCreateTbBatchReq reqNew = {0};
      reqNew.pArray = taosArrayInit(req.nReqs, sizeof(struct SVCreateTbReq));
      for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
        pCreateReq = req.pReqs + iReq;
L
Liu Jicong 已提交
71
        if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid) {
72 73 74 75 76
          reqNew.nReqs++;
          taosArrayPush(reqNew.pArray, pCreateReq);
        }
      }

L
Liu Jicong 已提交
77
      int     tlen;
78 79 80 81 82
      int32_t ret = 0;
      tEncodeSize(tEncodeSVCreateTbBatchReq, &reqNew, tlen, ret);
      void* buf = taosMemoryMalloc(tlen);
      if (NULL == buf) {
        taosArrayDestroy(reqNew.pArray);
83 84 85 86 87 88 89
        for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
          pCreateReq = req.pReqs + iReq;
          taosMemoryFreeClear(pCreateReq->comment);
          if (pCreateReq->type == TSDB_CHILD_TABLE) {
            taosArrayDestroy(pCreateReq->ctb.tagName);
          }
        }
90 91 92 93 94 95 96 97 98 99 100
        goto end;
      }
      SEncoder coderNew = {0};
      tEncoderInit(&coderNew, buf, tlen - sizeof(SMsgHead));
      tEncodeSVCreateTbBatchReq(&coderNew, &reqNew);
      tEncoderClear(&coderNew);
      memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
      pHead->bodyLen = tlen + sizeof(SMsgHead);
      taosMemoryFree(buf);
      taosArrayDestroy(reqNew.pArray);
    }
101 102 103 104 105 106 107 108

    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
      pCreateReq = req.pReqs + iReq;
      taosMemoryFreeClear(pCreateReq->comment);
      if (pCreateReq->type == TSDB_CHILD_TABLE) {
        taosArrayDestroy(pCreateReq->ctb.tagName);
      }
    }
109
  } else if (msgType == TDMT_VND_ALTER_TABLE) {
L
Liu Jicong 已提交
110
    SVAlterTbReq req = {0};
111 112 113 114 115 116

    if (tDecodeSVAlterTbReq(&coder, &req) < 0) {
      goto end;
    }

    SMetaReader mr = {0};
117
    metaReaderDoInit(&mr, pHandle->execHandle.pTqReader->pVnodeMeta, 0);
118 119 120 121 122 123 124 125 126 127 128 129 130 131

    if (metaGetTableEntryByName(&mr, req.tbName) < 0) {
      metaReaderClear(&mr);
      goto end;
    }
    realTbSuid = mr.me.ctbEntry.suid;
    metaReaderClear(&mr);
  } else if (msgType == TDMT_VND_DROP_TABLE) {
    SVDropTbBatchReq req = {0};

    if (tDecodeSVDropTbBatchReq(&coder, &req) < 0) {
      goto end;
    }

L
Liu Jicong 已提交
132
    int32_t      needRebuild = 0;
133 134 135 136
    SVDropTbReq* pDropReq = NULL;
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
      pDropReq = req.pReqs + iReq;

L
Liu Jicong 已提交
137
      if (pDropReq->suid == tbSuid) {
138 139 140
        needRebuild++;
      }
    }
L
Liu Jicong 已提交
141
    if (needRebuild == 0) {
142
      // do nothing
L
Liu Jicong 已提交
143
    } else if (needRebuild == req.nReqs) {
144
      realTbSuid = tbSuid;
L
Liu Jicong 已提交
145
    } else {
146 147 148 149 150
      realTbSuid = tbSuid;
      SVDropTbBatchReq reqNew = {0};
      reqNew.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbReq));
      for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
        pDropReq = req.pReqs + iReq;
L
Liu Jicong 已提交
151
        if (pDropReq->suid == tbSuid) {
152 153 154 155 156
          reqNew.nReqs++;
          taosArrayPush(reqNew.pArray, pDropReq);
        }
      }

L
Liu Jicong 已提交
157
      int     tlen;
158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181
      int32_t ret = 0;
      tEncodeSize(tEncodeSVDropTbBatchReq, &reqNew, tlen, ret);
      void* buf = taosMemoryMalloc(tlen);
      if (NULL == buf) {
        taosArrayDestroy(reqNew.pArray);
        goto end;
      }
      SEncoder coderNew = {0};
      tEncoderInit(&coderNew, buf, tlen - sizeof(SMsgHead));
      tEncodeSVDropTbBatchReq(&coderNew, &reqNew);
      tEncoderClear(&coderNew);
      memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
      pHead->bodyLen = tlen + sizeof(SMsgHead);
      taosMemoryFree(buf);
      taosArrayDestroy(reqNew.pArray);
    }
  } else if (msgType == TDMT_VND_DELETE) {
    SDeleteRes req = {0};
    if (tDecodeDeleteRes(&coder, &req) < 0) {
      goto end;
    }
    realTbSuid = req.suid;
  }

L
Liu Jicong 已提交
182
end:
183 184 185 186
  tDecoderClear(&coder);
  return tbSuid == realTbSuid;
}

187
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead, uint64_t reqId) {
L
Liu Jicong 已提交
188
  int32_t code = 0;
189
  int32_t vgId = TD_VID(pTq->pVnode);
H
Haojun Liao 已提交
190

wmmhello's avatar
wmmhello 已提交
191
  taosThreadMutexLock(&pHandle->pWalReader->mutex);
L
Liu Jicong 已提交
192 193 194
  int64_t offset = *fetchOffset;

  while (1) {
wmmhello's avatar
wmmhello 已提交
195
    if (walFetchHead(pHandle->pWalReader, offset, *ppCkHead) < 0) {
196 197
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64
              ", no more log to return, reqId:0x%" PRIx64,
198
              pHandle->consumerId, pHandle->epoch, vgId, offset, reqId);
L
Liu Jicong 已提交
199 200 201 202 203
      *fetchOffset = offset - 1;
      code = -1;
      goto END;
    }

204 205
    tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type: %s, reqId:0x%" PRIx64, vgId,
            pHandle->consumerId, offset, TMSG_INFO((*ppCkHead)->head.msgType), reqId);
206

L
Liu Jicong 已提交
207
    if ((*ppCkHead)->head.msgType == TDMT_VND_SUBMIT) {
wmmhello's avatar
wmmhello 已提交
208
      code = walFetchBody(pHandle->pWalReader, ppCkHead);
L
Liu Jicong 已提交
209 210 211 212 213 214 215 216 217 218

      if (code < 0) {
        *fetchOffset = offset;
        code = -1;
        goto END;
      }
      *fetchOffset = offset;
      code = 0;
      goto END;
    } else {
219
      if (pHandle->fetchMeta != WITH_DATA) {
L
Liu Jicong 已提交
220
        SWalCont* pHead = &((*ppCkHead)->head);
wmmhello's avatar
wmmhello 已提交
221
        if (IS_META_MSG(pHead->msgType) && !(pHead->msgType == TDMT_VND_DELETE && pHandle->fetchMeta == ONLY_META)) {
wmmhello's avatar
wmmhello 已提交
222
          code = walFetchBody(pHandle->pWalReader, ppCkHead);
L
Liu Jicong 已提交
223 224 225 226 227
          if (code < 0) {
            *fetchOffset = offset;
            code = -1;
            goto END;
          }
L
Liu Jicong 已提交
228

L
Liu Jicong 已提交
229
          if (isValValidForTable(pHandle, pHead)) {
230 231 232
            *fetchOffset = offset;
            code = 0;
            goto END;
L
Liu Jicong 已提交
233 234 235
          } else {
            offset++;
            continue;
236
          }
L
Liu Jicong 已提交
237 238
        }
      }
wmmhello's avatar
wmmhello 已提交
239
      code = walSkipFetchBody(pHandle->pWalReader, *ppCkHead);
L
Liu Jicong 已提交
240 241 242 243 244 245 246 247
      if (code < 0) {
        *fetchOffset = offset;
        code = -1;
        goto END;
      }
      offset++;
    }
  }
H
Haojun Liao 已提交
248

L
Liu Jicong 已提交
249
END:
wmmhello's avatar
wmmhello 已提交
250
  taosThreadMutexUnlock(&pHandle->pWalReader->mutex);
L
Liu Jicong 已提交
251 252 253
  return code;
}

254
STqReader* tqReaderOpen(SVnode* pVnode) {
L
Liu Jicong 已提交
255
  STqReader* pReader = taosMemoryCalloc(1, sizeof(STqReader));
L
Liu Jicong 已提交
256
  if (pReader == NULL) {
L
Liu Jicong 已提交
257 258
    return NULL;
  }
L
Liu Jicong 已提交
259

L
Liu Jicong 已提交
260 261
  pReader->pWalReader = walOpenReader(pVnode->pWal, NULL);
  if (pReader->pWalReader == NULL) {
L
Liu Jicong 已提交
262
    taosMemoryFree(pReader);
L
Liu Jicong 已提交
263 264
    return NULL;
  }
L
Liu Jicong 已提交
265 266 267 268 269 270 271

  pReader->pVnodeMeta = pVnode->pMeta;
  pReader->pColIdList = NULL;
  pReader->cachedSchemaVer = 0;
  pReader->cachedSchemaSuid = 0;
  pReader->pSchemaWrapper = NULL;
  pReader->tbIdHash = NULL;
272
  pReader->pResBlock = createDataBlock();
L
Liu Jicong 已提交
273 274 275
  return pReader;
}

276
void tqReaderClose(STqReader* pReader) {
L
Liu Jicong 已提交
277
  // close wal reader
278 279 280
  if (pReader->pWalReader) {
    walCloseReader(pReader->pWalReader);
  }
281

282
  if (pReader->pSchemaWrapper) {
283
    tDeleteSchemaWrapper(pReader->pSchemaWrapper);
284
  }
285

286 287 288
  if (pReader->pColIdList) {
    taosArrayDestroy(pReader->pColIdList);
  }
289

L
Liu Jicong 已提交
290
  // free hash
291
  blockDataDestroy(pReader->pResBlock);
292
  taosHashCleanup(pReader->tbIdHash);
293
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
L
Liu Jicong 已提交
294 295 296
  taosMemoryFree(pReader);
}

297
int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) {
298
  if (walReaderSeekVer(pReader->pWalReader, ver) < 0) {
L
Liu Jicong 已提交
299 300
    return -1;
  }
301
  tqDebug("wal reader seek to ver:%" PRId64 " %s", ver, id);
302
  return 0;
L
Liu Jicong 已提交
303 304
}

305 306 307 308
int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, const char* id) {
  int32_t code = walNextValidMsg(pReader);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
309 310 311 312
  }

  int64_t ver = pReader->pHead->head.version;

313 314 315 316 317 318 319 320 321 322 323 324 325 326 327
  if (pReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
    void*   pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SSubmitReq2Msg));
    int32_t len = pReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);

    void* data = taosMemoryMalloc(len);
    if (data == NULL) {
      // todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0);
      return -1;
    }

    memcpy(data, pBody, len);
    SPackedData data1 = (SPackedData){.ver = ver, .msgLen = len, .msgStr = data};

dengyihao's avatar
dengyihao 已提交
328
    *pItem = (SStreamQueueItem*)streamDataSubmitNew(&data1, STREAM_INPUT__DATA_SUBMIT);
329 330 331 332 333 334 335 336 337 338 339 340
    if (*pItem == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      tqError("%s failed to create data submit for stream since out of memory", id);
      return terrno;
    }
  } else if (pReader->pHead->head.msgType == TDMT_VND_DELETE) {
    void*   pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SMsgHead));
    int32_t len = pReader->pHead->head.bodyLen - sizeof(SMsgHead);

    extractDelDataBlock(pBody, len, ver, (SStreamRefDataBlock**)pItem);
  } else {
    ASSERT(0);
341 342 343 344 345
  }

  return 0;
}

346
// todo ignore the error in wal?
347
bool tqNextBlockInWal(STqReader* pReader, const char* id) {
348 349
  SWalReader* pWalReader = pReader->pWalReader;

L
Liu Jicong 已提交
350
  while (1) {
351 352 353
    SArray* pBlockList = pReader->submit.aSubmitTbData;
    if (pBlockList == NULL || pReader->nextBlk >= taosArrayGetSize(pBlockList)) {
      // try next message in wal file
354
      // todo always retry to avoid read failure caused by wal file deletion
355
      if (walNextValidMsg(pWalReader) < 0) {
356
        return false;
L
Liu Jicong 已提交
357
      }
358

359 360 361 362 363 364
      void*   pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
      int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
      int64_t ver = pWalReader->pHead->head.version;

      SDecoder decoder = {0};
      tDecoderInit(&decoder, pBody, bodyLen);
H
Haojun Liao 已提交
365 366 367 368 369

      {
        int32_t nSubmitTbData = taosArrayGetSize(pReader->submit.aSubmitTbData);
        for (int32_t i = 0; i < nSubmitTbData; i++) {
          SSubmitTbData* pData = taosArrayGet(pReader->submit.aSubmitTbData, i);
H
Haojun Liao 已提交
370 371 372 373
          if (pData->pCreateTbReq != NULL) {
            taosArrayDestroy(pData->pCreateTbReq->ctb.tagName);
            taosMemoryFreeClear(pData->pCreateTbReq);
          }
H
Haojun Liao 已提交
374 375 376 377
          pData->aRowP = taosArrayDestroy(pData->aRowP);
        }
        pReader->submit.aSubmitTbData = taosArrayDestroy(pReader->submit.aSubmitTbData);
      }
H
Haojun Liao 已提交
378

379 380
      if (tDecodeSubmitReq(&decoder, &pReader->submit) < 0) {
        tDecoderClear(&decoder);
381
        tqError("decode wal file error, msgLen:%d, ver:%" PRId64, bodyLen, ver);
382
        return false;
383
      }
wmmhello's avatar
wmmhello 已提交
384

385 386
      tDecoderClear(&decoder);
      pReader->nextBlk = 0;
L
Liu Jicong 已提交
387 388
    }

H
Haojun Liao 已提交
389
    int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
390
    while (pReader->nextBlk < numOfBlocks) {
391
      tqTrace("tq reader next data block %d/%d, len:%d %" PRId64 " %d", pReader->nextBlk,
392
          numOfBlocks, pReader->msg.msgLen, pReader->msg.ver, pReader->nextBlk);
393 394 395 396

      SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);

      if (pReader->tbIdHash == NULL) {
397 398
        SSDataBlock* pRes = NULL;
        int32_t      code = tqRetrieveDataBlock(pReader, &pRes, NULL);
399
        if (code == TSDB_CODE_SUCCESS && pRes->info.rows > 0) {
400
          return true;
401
        }
L
Liu Jicong 已提交
402
      }
403

404 405
      void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t));
      if (ret != NULL) {
406
        tqTrace("tq reader return submit block, uid:%" PRId64 ", ver:%" PRId64, pSubmitTbData->uid, pReader->msg.ver);
407

408 409 410
        SSDataBlock* pRes = NULL;
        int32_t code = tqRetrieveDataBlock(pReader, &pRes, NULL);
        if (code == TSDB_CODE_SUCCESS && pRes->info.rows > 0) {
411
          return true;
412 413 414
        }
      } else {
        pReader->nextBlk += 1;
415
        tqTrace("tq reader discard submit block, uid:%" PRId64 ", continue", pSubmitTbData->uid);
416
      }
L
Liu Jicong 已提交
417
    }
418

419
    qTrace("stream scan return empty, all %d submit blocks consumed, %s", numOfBlocks, id);
420
    tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
421

422
    pReader->msg.msgStr = NULL;
L
Liu Jicong 已提交
423
  }
L
Liu Jicong 已提交
424 425
}

426
int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver) {
427 428 429
  pReader->msg.msgStr = msgStr;
  pReader->msg.msgLen = msgLen;
  pReader->msg.ver = ver;
L
Liu Jicong 已提交
430

L
Liu Jicong 已提交
431
  tqDebug("tq reader set msg %p %d", msgStr, msgLen);
wmmhello's avatar
wmmhello 已提交
432
  SDecoder decoder;
433 434 435

  tDecoderInit(&decoder, pReader->msg.msgStr, pReader->msg.msgLen);
  if (tDecodeSubmitReq(&decoder, &pReader->submit) < 0) {
L
Liu Jicong 已提交
436
    tDecoderClear(&decoder);
437
    tqError("DecodeSSubmitReq2 error, msgLen:%d, ver:%" PRId64, msgLen, ver);
wmmhello's avatar
wmmhello 已提交
438
    return -1;
L
Liu Jicong 已提交
439
  }
440

wmmhello's avatar
wmmhello 已提交
441
  tDecoderClear(&decoder);
L
Liu Jicong 已提交
442 443
  return 0;
}
L
Liu Jicong 已提交
444

445 446 447 448
SWalReader* tqGetWalReader(STqReader* pReader) {
  return pReader->pWalReader;
}

H
Haojun Liao 已提交
449 450 451
SSDataBlock* tqGetResultBlock (STqReader* pReader) {
  return pReader->pResBlock;
}
452

453
bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
454
  if (pReader->msg.msgStr == NULL) {
455 456 457
    return false;
  }

458 459
  int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
  while (pReader->nextBlk < numOfBlocks) {
460
    tqDebug("try next data block, len:%d ver:%" PRId64 " index:%d/%d, %s", pReader->msg.msgLen, pReader->msg.ver,
dengyihao's avatar
dengyihao 已提交
461
            pReader->nextBlk, numOfBlocks, idstr);
462

L
Liu Jicong 已提交
463
    SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
464 465 466
    if (pReader->tbIdHash == NULL) {
      return true;
    }
L
Liu Jicong 已提交
467 468 469

    void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t));
    if (ret != NULL) {
470
      tqDebug("block found, ver:%" PRId64 ", uid:%" PRId64", %s", pReader->msg.ver, pSubmitTbData->uid, idstr);
L
Liu Jicong 已提交
471
      return true;
472
    } else {
473 474
      tqDebug("discard submit block, uid:%" PRId64 ", total queried tables:%d continue %s", pSubmitTbData->uid,
          taosHashGetSize(pReader->tbIdHash), idstr);
L
Liu Jicong 已提交
475
    }
476

L
Liu Jicong 已提交
477
    pReader->nextBlk++;
L
Liu Jicong 已提交
478 479
  }

480
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
L
Liu Jicong 已提交
481
  pReader->nextBlk = 0;
482
  pReader->msg.msgStr = NULL;
L
Liu Jicong 已提交
483 484 485 486

  return false;
}

487
bool tqNextDataBlockFilterOut(STqReader* pReader, SHashObj* filterOutUids) {
488
  if (pReader->msg.msgStr == NULL) return false;
L
Liu Jicong 已提交
489

L
Liu Jicong 已提交
490
  int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
L
Liu Jicong 已提交
491
  while (pReader->nextBlk < blockSz) {
L
Liu Jicong 已提交
492
    SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
493
    if (filterOutUids == NULL) return true;
L
Liu Jicong 已提交
494

495
    void* ret = taosHashGet(filterOutUids, &pSubmitTbData->uid, sizeof(int64_t));
L
Liu Jicong 已提交
496 497 498
    if (ret == NULL) {
      return true;
    }
499
    pReader->nextBlk++;
L
Liu Jicong 已提交
500 501
  }

502
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
L
Liu Jicong 已提交
503
  pReader->nextBlk = 0;
504
  pReader->msg.msgStr = NULL;
L
Liu Jicong 已提交
505 506 507

  return false;
}
508

509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537
int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrapper* pSrc, char* mask) {
  int32_t code;

  int32_t cnt = 0;
  for (int32_t i = 0; i < pSrc->nCols; i++) {
    cnt += mask[i];
  }

  pDst->nCols = cnt;
  pDst->pSchema = taosMemoryCalloc(cnt, sizeof(SSchema));
  if (pDst->pSchema == NULL) {
    return -1;
  }

  int32_t j = 0;
  for (int32_t i = 0; i < pSrc->nCols; i++) {
    if (mask[i]) {
      pDst->pSchema[j++] = pSrc->pSchema[i];
      SColumnInfoData colInfo =
          createColumnInfoData(pSrc->pSchema[i].type, pSrc->pSchema[i].bytes, pSrc->pSchema[i].colId);
      code = blockDataAppendColInfo(pBlock, &colInfo);
      if (code != 0) {
        return -1;
      }
    }
  }
  return 0;
}

538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586
static int32_t buildResSDataBlock(SSDataBlock* pBlock, SSchemaWrapper* pSchema, const SArray* pColIdList) {
  if (blockDataGetNumOfCols(pBlock) > 0) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t numOfCols = taosArrayGetSize(pColIdList);

  if (numOfCols == 0) {  // all columns are required
    for (int32_t i = 0; i < pSchema->nCols; ++i) {
      SSchema*        pColSchema = &pSchema->pSchema[i];
      SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);

      int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
      if (code != TSDB_CODE_SUCCESS) {
        blockDataFreeRes(pBlock);
        return TSDB_CODE_OUT_OF_MEMORY;
      }
    }
  } else {
    if (numOfCols > pSchema->nCols) {
      numOfCols = pSchema->nCols;
    }

    int32_t i = 0;
    int32_t j = 0;
    while (i < pSchema->nCols && j < numOfCols) {
      SSchema* pColSchema = &pSchema->pSchema[i];
      col_id_t colIdSchema = pColSchema->colId;

      col_id_t colIdNeed = *(col_id_t*)taosArrayGet(pColIdList, j);
      if (colIdSchema < colIdNeed) {
        i++;
      } else if (colIdSchema > colIdNeed) {
        j++;
      } else {
        SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
        int32_t         code = blockDataAppendColInfo(pBlock, &colInfo);
        if (code != TSDB_CODE_SUCCESS) {
          return -1;
        }
        i++;
        j++;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605
static int32_t doSetVal(SColumnInfoData* pColumnInfoData, int32_t rowIndex, SColVal* pColVal) {
  int32_t code = TSDB_CODE_SUCCESS;

  if (IS_STR_DATA_TYPE(pColVal->type)) {
    char val[65535 + 2] = {0};
    if (pColVal->value.pData != NULL) {
      memcpy(varDataVal(val), pColVal->value.pData, pColVal->value.nData);
      varDataSetLen(val, pColVal->value.nData);
      code = colDataSetVal(pColumnInfoData, rowIndex, val, !COL_VAL_IS_VALUE(pColVal));
    } else {
      colDataSetNULL(pColumnInfoData, rowIndex);
    }
  } else {
    code = colDataSetVal(pColumnInfoData, rowIndex, (void*)&pColVal->value.val, !COL_VAL_IS_VALUE(pColVal));
  }

  return code;
}

606
int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* id) {
607
  tqTrace("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
608
  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk++);
609

610
  SSDataBlock* pBlock = pReader->pResBlock;
611 612
  *pRes = pBlock;

H
Haojun Liao 已提交
613 614
  blockDataCleanup(pBlock);

615
  int32_t vgId = pReader->pWalReader->pWal->cfg.vgId;
L
Liu Jicong 已提交
616 617 618 619 620 621
  int32_t sversion = pSubmitTbData->sver;
  int64_t suid = pSubmitTbData->suid;
  int64_t uid = pSubmitTbData->uid;
  pReader->lastBlkUid = uid;

  pBlock->info.id.uid = uid;
622
  pBlock->info.version = pReader->msg.ver;
L
Liu Jicong 已提交
623

624 625
  if ((suid != 0 && pReader->cachedSchemaSuid != suid) || (suid == 0 && pReader->cachedSchemaUid != uid) ||
      (pReader->cachedSchemaVer != sversion)) {
626
    tDeleteSchemaWrapper(pReader->pSchemaWrapper);
L
Liu Jicong 已提交
627 628 629

    pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1);
    if (pReader->pSchemaWrapper == NULL) {
630 631
      tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64
             "version %d, possibly dropped table",
632
             vgId, suid, uid, pReader->cachedSchemaVer);
L
Liu Jicong 已提交
633 634 635 636 637
      pReader->cachedSchemaSuid = 0;
      terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
      return -1;
    }

638
    pReader->cachedSchemaUid = uid;
639 640
    pReader->cachedSchemaSuid = suid;
    pReader->cachedSchemaVer = sversion;
L
Liu Jicong 已提交
641

642
    ASSERT(pReader->cachedSchemaVer == pReader->pSchemaWrapper->version);
643 644 645
    if (blockDataGetNumOfCols(pBlock) == 0) {
      int32_t code = buildResSDataBlock(pReader->pResBlock, pReader->pSchemaWrapper, pReader->pColIdList);
      if (code != TSDB_CODE_SUCCESS) {
646
        tqError("vgId:%d failed to build data block, code:%s", vgId, tstrerror(code));
647
        return code;
L
Liu Jicong 已提交
648 649
      }
    }
650
  }
L
Liu Jicong 已提交
651

652 653 654 655 656 657 658
  int32_t numOfRows = 0;
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    SColData* pCol = taosArrayGet(pSubmitTbData->aCol, 0);
    numOfRows = pCol->nVal;
  } else {
    numOfRows = taosArrayGetSize(pSubmitTbData->aRowP);
  }
L
Liu Jicong 已提交
659

660 661
  if (blockDataEnsureCapacity(pBlock, numOfRows) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
662
    return -1;
663
  }
L
Liu Jicong 已提交
664

665
  pBlock->info.rows = numOfRows;
L
Liu Jicong 已提交
666

667
  int32_t colActual = blockDataGetNumOfCols(pBlock);
L
Liu Jicong 已提交
668

669 670 671 672 673 674 675 676 677
  // convert and scan one block
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    SArray* pCols = pSubmitTbData->aCol;
    int32_t numOfCols = taosArrayGetSize(pCols);
    int32_t targetIdx = 0;
    int32_t sourceIdx = 0;
    while (targetIdx < colActual) {
      if (sourceIdx >= numOfCols) {
        tqError("tqRetrieveDataBlock sourceIdx:%d >= numOfCols:%d", sourceIdx, numOfCols);
678
        return -1;
679
      }
L
Liu Jicong 已提交
680

681 682 683 684 685 686
      SColData*        pCol = taosArrayGet(pCols, sourceIdx);
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
      SColVal          colVal;

      if (pCol->nVal != numOfRows) {
        tqError("tqRetrieveDataBlock pCol->nVal:%d != numOfRows:%d", pCol->nVal, numOfRows);
687
        return -1;
688 689 690 691 692 693 694
      }

      if (pCol->cid < pColData->info.colId) {
        sourceIdx++;
      } else if (pCol->cid == pColData->info.colId) {
        for (int32_t i = 0; i < pCol->nVal; i++) {
          tColDataGetValue(pCol, i, &colVal);
695 696 697
          int32_t code = doSetVal(pColData, i, &colVal);
          if (code != TSDB_CODE_SUCCESS) {
            return code;
698 699 700 701 702
          }
        }
        sourceIdx++;
        targetIdx++;
      } else {
703
        colDataSetNNULL(pColData, 0, pCol->nVal);
704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721
        targetIdx++;
      }
    }
  } else {
    SArray*         pRows = pSubmitTbData->aRowP;
    SSchemaWrapper* pWrapper = pReader->pSchemaWrapper;
    STSchema*       pTSchema = tBuildTSchema(pWrapper->pSchema, pWrapper->nCols, pWrapper->version);

    for (int32_t i = 0; i < numOfRows; i++) {
      SRow*   pRow = taosArrayGetP(pRows, i);
      int32_t sourceIdx = 0;

      for (int32_t j = 0; j < colActual; j++) {
        SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, j);
        while (1) {
          SColVal colVal;
          tRowGet(pRow, pTSchema, sourceIdx, &colVal);
          if (colVal.cid < pColData->info.colId) {
dengyihao's avatar
dengyihao 已提交
722 723 724
            //            tqDebug("colIndex:%d column id:%d in row, ignore, the required colId:%d, total cols in
            //            schema:%d",
            //                    sourceIdx, colVal.cid, pColData->info.colId, pTSchema->numOfCols);
725 726 727
            sourceIdx++;
            continue;
          } else if (colVal.cid == pColData->info.colId) {
728 729 730
            int32_t code = doSetVal(pColData, i, &colVal);
            if (code != TSDB_CODE_SUCCESS) {
              return code;
L
Liu Jicong 已提交
731
            }
L
Liu Jicong 已提交
732

733 734 735 736 737
            sourceIdx++;
            break;
          } else {
            colDataSetNULL(pColData, i);
            break;
L
Liu Jicong 已提交
738 739 740 741
          }
        }
      }
    }
742 743

    taosMemoryFreeClear(pTSchema);
L
Liu Jicong 已提交
744 745 746 747 748
  }

  return 0;
}

749
// todo refactor:
750
int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet) {
751
  tqDebug("tq reader retrieve data block %p, %d", pReader->msg.msgStr, pReader->nextBlk);
752 753 754 755

  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
  pReader->nextBlk++;

756 757 758 759
  if (pSubmitTbDataRet) {
    *pSubmitTbDataRet = pSubmitTbData;
  }

760 761 762 763 764
  int32_t sversion = pSubmitTbData->sver;
  int64_t suid = pSubmitTbData->suid;
  int64_t uid = pSubmitTbData->uid;
  pReader->lastBlkUid = uid;

765
  tDeleteSchemaWrapper(pReader->pSchemaWrapper);
766 767 768 769 770 771 772 773 774 775
  pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1);
  if (pReader->pSchemaWrapper == NULL) {
    tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
           pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer);
    pReader->cachedSchemaSuid = 0;
    terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
    return -1;
  }

  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
776
  int32_t         numOfRows = 0;
777 778 779 780 781 782 783 784 785 786 787 788

  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    SArray*   pCols = pSubmitTbData->aCol;
    SColData* pCol = taosArrayGet(pCols, 0);
    numOfRows = pCol->nVal;
  } else {
    SArray* pRows = pSubmitTbData->aRowP;
    numOfRows = taosArrayGetSize(pRows);
  }

  int32_t curRow = 0;
  int32_t lastRow = 0;
789
  char*   assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
790 791 792 793 794 795 796 797 798
  if (assigned == NULL) return -1;

  // convert and scan one block
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    SArray* pCols = pSubmitTbData->aCol;
    int32_t numOfCols = taosArrayGetSize(pCols);
    for (int32_t i = 0; i < numOfRows; i++) {
      bool buildNew = false;

799
      for (int32_t j = 0; j < numOfCols; j++) {
800
        SColData* pCol = taosArrayGet(pCols, j);
801
        SColVal   colVal;
802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821
        tColDataGetValue(pCol, i, &colVal);
        if (curRow == 0) {
          assigned[j] = !COL_VAL_IS_NONE(&colVal);
          buildNew = true;
        } else {
          bool currentRowAssigned = !COL_VAL_IS_NONE(&colVal);
          if (currentRowAssigned != assigned[j]) {
            assigned[j] = currentRowAssigned;
            buildNew = true;
          }
        }
      }

      if (buildNew) {
        if (taosArrayGetSize(blocks) > 0) {
          SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
          pLastBlock->info.rows = curRow - lastRow;
          lastRow = curRow;
        }

822
        SSDataBlock     block = {0};
823
        SSchemaWrapper* pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
824
        if (pSW == NULL) {
825 826 827 828 829 830
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          goto FAIL;
        }

        if (tqMaskBlock(pSW, &block, pSchemaWrapper, assigned) < 0) {
          blockDataFreeRes(&block);
831
          tDeleteSchemaWrapper(pSW);
832 833 834 835 836 837
          goto FAIL;
        }
        tqDebug("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
                (int32_t)taosArrayGetSize(block.pDataBlock));

        block.info.id.uid = uid;
838
        block.info.version = pReader->msg.ver;
839 840 841
        if (blockDataEnsureCapacity(&block, numOfRows - curRow) < 0) {
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          blockDataFreeRes(&block);
842
          tDeleteSchemaWrapper(pSW);
843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871
          goto FAIL;
        }
        taosArrayPush(blocks, &block);
        taosArrayPush(schemas, &pSW);
      }

      SSDataBlock* pBlock = taosArrayGetLast(blocks);

      tqDebug("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
              (int32_t)taosArrayGetSize(blocks));

      int32_t targetIdx = 0;
      int32_t sourceIdx = 0;
      int32_t colActual = blockDataGetNumOfCols(pBlock);
      while (targetIdx < colActual) {
        SColData*        pCol = taosArrayGet(pCols, sourceIdx);
        SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
        SColVal          colVal;

        if (pCol->cid < pColData->info.colId) {
          sourceIdx++;
        } else if (pCol->cid == pColData->info.colId) {
          tColDataGetValue(pCol, i, &colVal);

          if (IS_STR_DATA_TYPE(colVal.type)) {
            if (colVal.value.pData != NULL) {
              char val[65535 + 2];
              memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData);
              varDataSetLen(val, colVal.value.nData);
872
              if (colDataSetVal(pColData, curRow - lastRow, val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
873 874 875
                goto FAIL;
              }
            } else {
X
Xiaoyu Wang 已提交
876
              colDataSetNULL(pColData, curRow - lastRow);
877 878
            }
          } else {
879
            if (colDataSetVal(pColData, curRow - lastRow, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
880 881 882 883 884 885 886 887 888 889 890
              goto FAIL;
            }
          }
          sourceIdx++;
          targetIdx++;
        }
      }

      curRow++;
    }
  } else {
891
    SSchemaWrapper* pWrapper = pReader->pSchemaWrapper;
892 893
    STSchema*       pTSchema = tBuildTSchema(pWrapper->pSchema, pWrapper->nCols, pWrapper->version);
    SArray*         pRows = pSubmitTbData->aRowP;
894

895
    for (int32_t i = 0; i < numOfRows; i++) {
896 897
      SRow* pRow = taosArrayGetP(pRows, i);
      bool  buildNew = false;
898

899
      for (int32_t j = 0; j < pTSchema->numOfCols; j++) {
900
        SColVal colVal;
901
        tRowGet(pRow, pTSchema, j, &colVal);
902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920
        if (curRow == 0) {
          assigned[j] = !COL_VAL_IS_NONE(&colVal);
          buildNew = true;
        } else {
          bool currentRowAssigned = !COL_VAL_IS_NONE(&colVal);
          if (currentRowAssigned != assigned[j]) {
            assigned[j] = currentRowAssigned;
            buildNew = true;
          }
        }
      }

      if (buildNew) {
        if (taosArrayGetSize(blocks) > 0) {
          SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
          pLastBlock->info.rows = curRow - lastRow;
          lastRow = curRow;
        }

921
        SSDataBlock     block = {0};
922
        SSchemaWrapper* pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
923
        if (pSW == NULL) {
924 925 926 927 928 929
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          goto FAIL;
        }

        if (tqMaskBlock(pSW, &block, pSchemaWrapper, assigned) < 0) {
          blockDataFreeRes(&block);
930
          tDeleteSchemaWrapper(pSW);
931 932 933 934 935 936
          goto FAIL;
        }
        tqDebug("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
                (int32_t)taosArrayGetSize(block.pDataBlock));

        block.info.id.uid = uid;
937
        block.info.version = pReader->msg.ver;
938 939 940
        if (blockDataEnsureCapacity(&block, numOfRows - curRow) < 0) {
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          blockDataFreeRes(&block);
941
          tDeleteSchemaWrapper(pSW);
942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957
          goto FAIL;
        }
        taosArrayPush(blocks, &block);
        taosArrayPush(schemas, &pSW);
      }

      SSDataBlock* pBlock = taosArrayGetLast(blocks);

      tqDebug("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
              (int32_t)taosArrayGetSize(blocks));

      int32_t targetIdx = 0;
      int32_t sourceIdx = 0;
      int32_t colActual = blockDataGetNumOfCols(pBlock);
      while (targetIdx < colActual) {
        SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
958
        SColVal          colVal;
959
        tRowGet(pRow, pTSchema, sourceIdx, &colVal);
960 961 962 963 964 965 966 967 968

        if (colVal.cid < pColData->info.colId) {
          sourceIdx++;
        } else if (colVal.cid == pColData->info.colId) {
          if (IS_STR_DATA_TYPE(colVal.type)) {
            if (colVal.value.pData != NULL) {
              char val[65535 + 2];
              memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData);
              varDataSetLen(val, colVal.value.nData);
969
              if (colDataSetVal(pColData, curRow - lastRow, val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
970 971 972
                goto FAIL;
              }
            } else {
X
Xiaoyu Wang 已提交
973
              colDataSetNULL(pColData, curRow - lastRow);
974 975
            }
          } else {
976
            if (colDataSetVal(pColData, curRow - lastRow, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
977 978 979 980 981 982 983 984 985
              goto FAIL;
            }
          }
          sourceIdx++;
          targetIdx++;
        }
      }
      curRow++;
    }
986 987

    taosMemoryFreeClear(pTSchema);
L
Liu Jicong 已提交
988
  }
989 990 991 992 993 994 995

  SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
  pLastBlock->info.rows = curRow - lastRow;

  taosMemoryFree(assigned);
  return 0;

996 997 998 999 1000
FAIL:
  taosMemoryFree(assigned);
  return -1;
}

L
Liu Jicong 已提交
1001
void tqReaderSetColIdList(STqReader* pReader, SArray* pColIdList) { pReader->pColIdList = pColIdList; }
H
Hongze Cheng 已提交
1002

1003
int tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList, const char* id) {
L
Liu Jicong 已提交
1004 1005 1006
  if (pReader->tbIdHash) {
    taosHashClear(pReader->tbIdHash);
  } else {
5
54liuyao 已提交
1007
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
H
Hongze Cheng 已提交
1008 1009
  }

L
Liu Jicong 已提交
1010
  if (pReader->tbIdHash == NULL) {
H
Hongze Cheng 已提交
1011 1012 1013 1014 1015 1016
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  for (int i = 0; i < taosArrayGetSize(tbUidList); i++) {
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
L
Liu Jicong 已提交
1017
    taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0);
H
Hongze Cheng 已提交
1018 1019
  }

1020
  tqDebug("s-task:%s %d tables are set to be queried target table", id, (int32_t) taosArrayGetSize(tbUidList));
H
Hongze Cheng 已提交
1021 1022 1023
  return 0;
}

1024
int tqReaderAddTbUidList(STqReader* pReader, const SArray* pTableUidList) {
L
Liu Jicong 已提交
1025
  if (pReader->tbIdHash == NULL) {
5
54liuyao 已提交
1026
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
L
Liu Jicong 已提交
1027
    if (pReader->tbIdHash == NULL) {
H
Hongze Cheng 已提交
1028 1029 1030 1031 1032
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
  }

1033 1034 1035
  int32_t numOfTables = taosArrayGetSize(pTableUidList);
  for (int i = 0; i < numOfTables; i++) {
    int64_t* pKey = (int64_t*)taosArrayGet(pTableUidList, i);
L
Liu Jicong 已提交
1036
    taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0);
H
Hongze Cheng 已提交
1037 1038 1039 1040
  }

  return 0;
}
1041

1042 1043 1044 1045 1046 1047 1048 1049
bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid) {
  return taosHashGet(pReader->tbIdHash, &uid, sizeof(uint64_t));
}

bool tqCurrentBlockConsumed(const STqReader* pReader) {
    return pReader->msg.msgStr == NULL;
}

L
Liu Jicong 已提交
1050
int tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) {
L
Liu Jicong 已提交
1051 1052
  for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
L
Liu Jicong 已提交
1053
    taosHashRemove(pReader->tbIdHash, pKey, sizeof(int64_t));
1054 1055 1056 1057
  }

  return 0;
}
L
Liu Jicong 已提交
1058

1059
// todo update the table list in wal reader
L
Liu Jicong 已提交
1060
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
1061 1062 1063 1064
  void*   pIter = NULL;
  int32_t vgId = TD_VID(pTq->pVnode);

  // update the table list for each consumer handle
L
Liu Jicong 已提交
1065
  while (1) {
1066
    pIter = taosHashIterate(pTq->pHandle, pIter);
H
Haojun Liao 已提交
1067 1068 1069 1070
    if (pIter == NULL) {
      break;
    }

1071 1072
    STqHandle* pTqHandle = (STqHandle*)pIter;
    if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
1073
      int32_t code = qUpdateTableListForStreamScanner(pTqHandle->execHandle.task, tbUidList, isAdd);
L
Liu Jicong 已提交
1074
      if (code != 0) {
1075
        tqError("update qualified table error for %s", pTqHandle->subKey);
L
Liu Jicong 已提交
1076 1077
        continue;
      }
1078
    } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
L
Liu Jicong 已提交
1079 1080 1081 1082
      if (!isAdd) {
        int32_t sz = taosArrayGetSize(tbUidList);
        for (int32_t i = 0; i < sz; i++) {
          int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
1083
          taosHashPut(pTqHandle->execHandle.execDb.pFilterOutTbUid, &tbUid, sizeof(int64_t), NULL, 0);
L
Liu Jicong 已提交
1084 1085
        }
      }
1086
    } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
1087
      if (isAdd) {
1088
        SArray* list = NULL;
wmmhello's avatar
wmmhello 已提交
1089
        int ret = qGetTableList(pTqHandle->execHandle.execTb.suid, pTq->pVnode, pTqHandle->execHandle.execTb.node, &list, pTqHandle->execHandle.task);
1090 1091 1092 1093
        if(ret != TDB_CODE_SUCCESS) {
          tqError("qGetTableList in tqUpdateTbUidList error:%d handle %s consumer:0x%" PRIx64, ret, pTqHandle->subKey, pTqHandle->consumerId);
          taosArrayDestroy(list);
          return ret;
1094
        }
H
Haojun Liao 已提交
1095
        tqReaderSetTbUidList(pTqHandle->execHandle.pTqReader, list, NULL);
1096
        taosArrayDestroy(list);
1097
      } else {
1098
        tqReaderRemoveTbUidList(pTqHandle->execHandle.pTqReader, tbUidList);
1099
      }
L
Liu Jicong 已提交
1100 1101
    }
  }
1102 1103

  // update the table list handle for each stream scanner/wal reader
1104
  taosWLockLatch(&pTq->pStreamMeta->lock);
L
Liu Jicong 已提交
1105
  while (1) {
L
Liu Jicong 已提交
1106
    pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
1107 1108 1109 1110
    if (pIter == NULL) {
      break;
    }

1111
    SStreamTask* pTask = *(SStreamTask**)pIter;
1112
    if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
1113
      int32_t code = qUpdateTableListForStreamScanner(pTask->exec.pExecutor, tbUidList, isAdd);
L
Liu Jicong 已提交
1114
      if (code != 0) {
1115
        tqError("vgId:%d, s-task:%s update qualified table error for stream task", vgId, pTask->id.idStr);
L
Liu Jicong 已提交
1116 1117
        continue;
      }
L
Liu Jicong 已提交
1118 1119
    }
  }
1120

1121
  taosWUnLockLatch(&pTq->pStreamMeta->lock);
L
Liu Jicong 已提交
1122 1123
  return 0;
}