tqScan.c 11.5 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tq.h"

L
Liu Jicong 已提交
18
/*
 * Encode one data block and append it to a response.
 *
 * A buffer of sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) bytes is
 * allocated; the SRetrieveTableRsp header is filled in and the block is
 * encoded right after it with blockEncode(). The buffer pointer is appended to
 * pRsp->blockData and its actual length (header included) to
 * pRsp->blockDataLen. On success the buffer is referenced from
 * pRsp->blockData and is not freed here.
 *
 * @param pBlock     block to encode
 * @param pRsp       response whose blockData / blockDataLen arrays are extended
 * @param numOfCols  column count handed to blockEncode()
 * @param precision  value stored in the header's precision field
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the buffer cannot
 *         be allocated or one of the array pushes fails; in the failure case
 *         nothing is left appended to pRsp and the buffer is released.
 */
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision) {
  int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
  void*   buf = taosMemoryCalloc(1, dataStrLen);
  if (buf == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
  pRetrieve->useconds = 0;
  pRetrieve->precision = precision;
  pRetrieve->compressed = 0;
  pRetrieve->completed = 1;
  // the row count is stored in big-endian byte order
  pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows);

  int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols);
  actualLen += sizeof(SRetrieveTableRsp);

  if (taosArrayPush(pRsp->blockDataLen, &actualLen) == NULL) {
    taosMemoryFree(buf);
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  if (taosArrayPush(pRsp->blockData, &buf) == NULL) {
    // keep blockDataLen and blockData the same length: undo the length entry
    taosArrayPop(pRsp->blockDataLen);
    taosMemoryFree(buf);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}

40
/*
 * Clone the schema wrapper held by the handle's reader and append the clone
 * (as a pointer) to pRsp->blockSchema.
 *
 * @param pExec  execution handle; pExec->pTqReader->pSchemaWrapper is cloned
 * @param pRsp   response whose blockSchema array receives the clone
 * @return 0 when the clone was appended, -1 when cloning returned NULL
 */
static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, STaosxRsp* pRsp) {
  SSchemaWrapper* pSchemaCopy = tCloneSSchemaWrapper(pExec->pTqReader->pSchemaWrapper);
  if (pSchemaCopy != NULL) {
    taosArrayPush(pRsp->blockSchema, &pSchemaCopy);
    return 0;
  }
  return -1;
}

49
/*
 * Look up a table by uid and append n duplicated copies of its name to
 * pRsp->blockTbName.
 *
 * @param pTq   tq instance; its vnode's meta is used for the lookup
 * @param uid   uid of the table whose name is wanted
 * @param pRsp  response whose blockTbName array is extended
 * @param n     number of name copies to append
 * @return 0 on success, -1 when the table entry lookup fails (nothing is
 *         appended in that case)
 */
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, STaosxRsp* pRsp, int32_t n) {
  SMetaReader reader = {0};
  metaReaderInit(&reader, pTq->pVnode->pMeta, 0);

  // TODO add reference to guarantee success
  int32_t ret = -1;
  if (metaReaderGetTableEntryByUidCache(&reader, uid) >= 0) {
    // each entry gets its own heap copy of the name
    for (int32_t left = n; left > 0; left--) {
      char* nameCopy = taosStrdup(reader.me.name);
      taosArrayPush(pRsp->blockTbName, &nameCopy);
    }
    ret = 0;
  }

  // the reader is cleared on both the success and the failure path
  metaReaderClear(&reader);
  return ret;
}

67
/*
 * Run the handle's stream task and collect the produced data blocks in pRsp.
 *
 * The scan is prepared at pOffset, then the task is executed repeatedly. Every
 * block it yields is appended through tqAddBlockDataToRsp() and counted in
 * pRsp->blockNum. The loop ends when the task yields no block or when at least
 * MAX_ROWS_TO_RETURN rows have been gathered; the task's offset is then
 * extracted into pRsp->rspOffset.
 *
 * @return 0 on success; -1 when preparing the scan fails or when executing the
 *         task fails (terrno is set to the execution code in the latter case);
 *         the code returned by tqAddBlockDataToRsp() when appending fails.
 */
int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) {
  const int32_t MAX_ROWS_TO_RETURN = 4096;

  const STqExecHandle* pExec = &pHandle->execHandle;
  qTaskInfo_t          task = pExec->task;
  int32_t              vgId = TD_VID(pTq->pVnode);
  int32_t              rowsSoFar = 0;

  if (qStreamPrepareScan(task, pOffset, pExec->subType) < 0) {
    return -1;
  }

  do {
    SSDataBlock* pBlock = NULL;
    uint64_t     ts = 0;
    qStreamSetOpen(task);

    tqDebug("consumer:0x%" PRIx64 " vgId:%d, tmq one task start execute", pHandle->consumerId, vgId);
    int32_t code = qExecTask(task, &pBlock, &ts);
    if (code != TSDB_CODE_SUCCESS) {
      tqError("consumer:0x%" PRIx64 " vgId:%d, task exec error since %s", pHandle->consumerId, vgId, tstrerror(code));
      terrno = code;
      return -1;
    }

    tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq one task end executed, pDataBlock:%p", pHandle->consumerId, vgId,
            pBlock);

    // no block produced: stop scanning
    // (original note: current scan should be stopped asap, since the rebalance occurs)
    if (pBlock == NULL) {
      break;
    }

    code = tqAddBlockDataToRsp(pBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
    if (code != TSDB_CODE_SUCCESS) {
      tqError("vgId:%d, failed to add block to rsp msg", vgId);
      return code;
    }

    pRsp->blockNum++;
    rowsSoFar += pBlock->info.rows;
  } while (rowsSoFar < MAX_ROWS_TO_RETURN);

  tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq task executed finished, total blocks:%d, totalRows:%d",
          pHandle->consumerId, vgId, pRsp->blockNum, rowsSoFar);
  qStreamExtractOffset(task, &pRsp->rspOffset);
  return 0;
}

L
Liu Jicong 已提交
120
/*
 * Run the handle's stream task for a taosx consumer, collecting data blocks
 * (with optional table names and schemas) in pRsp, or a meta message in
 * pMetaRsp.
 *
 * For every non-empty block the task yields:
 *   - if pRsp->withTbName is set, a table name is appended (looked up by the
 *     reader's lastBlkUid for a log offset, otherwise taken from the task);
 *   - if pRsp->withSchema is set, a schema clone is appended (from the reader
 *     for a log offset, otherwise from the task);
 *   - the block is encoded into pRsp and pRsp->blockNum is incremented.
 * With a log offset the loop keeps going after each block; otherwise it keeps
 * going until more than MAX_SNAPSHOT_ROWS rows have been gathered.
 *
 * The function returns once the task has a meta message (copied to *pMetaRsp),
 * once the task yields no block and the offset type is not
 * TMQ_OFFSET__SNAPSHOT_DATA, or once pRsp holds at least one block.
 *
 * @return 0 normally; -1 when preparing the scan or executing the task fails
 *         (terrno is set to the execution code in the latter case).
 */
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* pOffset) {
  const int32_t MAX_SNAPSHOT_ROWS = 4096;

  const STqExecHandle* pExec = &pHandle->execHandle;
  qTaskInfo_t          task = pExec->task;
  int32_t              gatheredRows = 0;

  if (qStreamPrepareScan(task, pOffset, pExec->subType) < 0) {
    return -1;
  }

  for (;;) {
    SSDataBlock* pBlock = NULL;
    uint64_t     ts = 0;

    tqDebug("tmqsnap task start to execute");
    int32_t code = qExecTask(task, &pBlock, &ts);
    if (code != 0) {
      tqError("vgId:%d, task exec error since %s", pTq->pVnode->config.vgId, tstrerror(code));
      terrno = code;
      return -1;
    }

    tqDebug("tmqsnap task execute end, get %p", pBlock);

    if (pBlock != NULL && pBlock->info.rows > 0) {
      if (pRsp->withTbName) {
        if (pOffset->type != TMQ_OFFSET__LOG) {
          char* nameCopy = taosStrdup(qExtractTbnameFromTask(task));
          taosArrayPush(pRsp->blockTbName, &nameCopy);
        } else if (tqAddTbNameToRsp(pTq, pExec->pTqReader->lastBlkUid, pRsp, 1) < 0) {
          // name lookup failed: this block is skipped and the task is run again
          continue;
        }
      }

      if (pRsp->withSchema) {
        if (pOffset->type != TMQ_OFFSET__LOG) {
          SSchemaWrapper* pSchemaCopy = tCloneSSchemaWrapper(qExtractSchemaFromTask(task));
          taosArrayPush(pRsp->blockSchema, &pSchemaCopy);
        } else {
          tqAddBlockSchemaToRsp(pExec, pRsp);
        }
      }

      tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock),
                          pTq->pVnode->config.tsdbCfg.precision);
      pRsp->blockNum++;

      if (pOffset->type == TMQ_OFFSET__LOG) {
        continue;
      }
      gatheredRows += pBlock->info.rows;
      if (gatheredRows <= MAX_SNAPSHOT_ROWS) {
        continue;
      }
      // row budget exceeded: fall through to the exit checks below
    }

    // get meta
    SMqMetaRsp* pTaskMeta = qStreamExtractMetaMsg(task);
    if (pTaskMeta->metaRspLen > 0) {
      qStreamExtractOffset(task, &pTaskMeta->rspOffset);
      *pMetaRsp = *pTaskMeta;

      tqDebug("tmqsnap task get meta");
      return 0;
    }

    if (pBlock == NULL) {
      // refresh the caller's offset; while it is still snapshot data, keep scanning
      qStreamExtractOffset(task, pOffset);
      if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
        continue;
      }
      tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode),
              pHandle->snapshotVer + 1);
      qStreamExtractOffset(task, &pRsp->rspOffset);
      return 0;
    }

    if (pRsp->blockNum > 0) {
      tqDebug("tmqsnap task exec exited, get data");
      qStreamExtractOffset(task, &pRsp->rspOffset);
      return 0;
    }
  }
}

205
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows) {
L
Liu Jicong 已提交
206
  STqExecHandle* pExec = &pHandle->execHandle;
X
Xiaoyu Wang 已提交
207 208
  SArray*        pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
  SArray*        pSchemas = taosArrayInit(0, sizeof(void*));
209

wmmhello's avatar
wmmhello 已提交
210
  if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
211 212
    STqReader* pReader = pExec->pTqReader;
    tqReaderSetSubmitMsg(pReader, submit.msgStr, submit.msgLen, submit.ver);
213
    while (tqNextBlockImpl(pReader, NULL)) {
214 215
      taosArrayClear(pBlocks);
      taosArrayClear(pSchemas);
216
      SSubmitTbData* pSubmitTbDataRet = NULL;
217
      if (tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas, &pSubmitTbDataRet) < 0) {
L
Liu Jicong 已提交
218 219
        if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
      }
wmmhello's avatar
wmmhello 已提交
220
      if (pRsp->withTbName) {
221
        int64_t uid = pExec->pTqReader->lastBlkUid;
222
        if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) {
223
          taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
224
          taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper);
225 226
          pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
          pSchemas = taosArrayInit(0, sizeof(void*));
wmmhello's avatar
wmmhello 已提交
227 228 229
          continue;
        }
      }
230 231 232 233 234 235 236 237 238 239 240
      if (pHandle->fetchMeta && pSubmitTbDataRet->pCreateTbReq != NULL) {
        if (pRsp->createTableNum == 0) {
          pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
          pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
        }

        int32_t  code = TSDB_CODE_SUCCESS;
        uint32_t len = 0;
        tEncodeSize(tEncodeSVCreateTbReq, pSubmitTbDataRet->pCreateTbReq, len, code);
        if (TSDB_CODE_SUCCESS != code) {
          continue;
L
Liu Jicong 已提交
241
        }
X
Xiaoyu Wang 已提交
242
        void*    createReq = taosMemoryCalloc(1, len);
243 244 245 246 247 248 249 250 251 252 253 254 255 256
        SEncoder encoder = {0};
        tEncoderInit(&encoder, createReq, len);
        code = tEncodeSVCreateTbReq(&encoder, pSubmitTbDataRet->pCreateTbReq);
        if (code < 0) {
          tEncoderClear(&encoder);
          taosMemoryFree(createReq);
          continue;
        }

        taosArrayPush(pRsp->createTableLen, &len);
        taosArrayPush(pRsp->createTableReq, &createReq);
        pRsp->createTableNum++;

        tEncoderClear(&encoder);
L
Liu Jicong 已提交
257
      }
258 259 260 261
      for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) {
        SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
        tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock),
                            pTq->pVnode->config.tsdbCfg.precision);
262
        totalRows += pBlock->info.rows;
263
        blockDataFreeRes(pBlock);
264
        SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i);
265 266 267
        taosArrayPush(pRsp->blockSchema, &pSW);
        pRsp->blockNum++;
      }
L
Liu Jicong 已提交
268
    }
wmmhello's avatar
wmmhello 已提交
269
  } else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
270 271
    STqReader* pReader = pExec->pTqReader;
    tqReaderSetSubmitMsg(pReader, submit.msgStr, submit.msgLen, submit.ver);
272
    while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) {
273 274
      taosArrayClear(pBlocks);
      taosArrayClear(pSchemas);
275
      SSubmitTbData* pSubmitTbDataRet = NULL;
276
      if (tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas, &pSubmitTbDataRet) < 0) {
L
Liu Jicong 已提交
277 278
        if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
      }
wmmhello's avatar
wmmhello 已提交
279
      if (pRsp->withTbName) {
280
        int64_t uid = pExec->pTqReader->lastBlkUid;
281
        if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) {
282
          taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
283
          taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper);
284 285
          pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
          pSchemas = taosArrayInit(0, sizeof(void*));
L
Liu Jicong 已提交
286
          continue;
wmmhello's avatar
wmmhello 已提交
287 288
        }
      }
289 290 291 292 293 294 295 296 297 298 299
      if (pHandle->fetchMeta && pSubmitTbDataRet->pCreateTbReq != NULL) {
        if (pRsp->createTableNum == 0) {
          pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
          pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
        }

        int32_t  code = TSDB_CODE_SUCCESS;
        uint32_t len = 0;
        tEncodeSize(tEncodeSVCreateTbReq, pSubmitTbDataRet->pCreateTbReq, len, code);
        if (TSDB_CODE_SUCCESS != code) {
          continue;
L
Liu Jicong 已提交
300
        }
X
Xiaoyu Wang 已提交
301
        void*    createReq = taosMemoryCalloc(1, len);
302 303 304 305 306 307 308
        SEncoder encoder = {0};
        tEncoderInit(&encoder, createReq, len);
        code = tEncodeSVCreateTbReq(&encoder, pSubmitTbDataRet->pCreateTbReq);
        if (code < 0) {
          tEncoderClear(&encoder);
          taosMemoryFree(createReq);
          continue;
L
Liu Jicong 已提交
309
        }
310 311 312 313 314 315

        taosArrayPush(pRsp->createTableLen, &len);
        taosArrayPush(pRsp->createTableReq, &createReq);
        pRsp->createTableNum++;

        tEncoderClear(&encoder);
L
Liu Jicong 已提交
316
      }
317 318 319 320
      for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) {
        SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
        tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock),
                            pTq->pVnode->config.tsdbCfg.precision);
321
        *totalRows += pBlock->info.rows;
322
        blockDataFreeRes(pBlock);
323
        SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i);
324 325 326
        taosArrayPush(pRsp->blockSchema, &pSW);
        pRsp->blockNum++;
      }
L
Liu Jicong 已提交
327
    }
L
Liu Jicong 已提交
328
  }
329 330
  taosArrayDestroy(pBlocks);
  taosArrayDestroy(pSchemas);
wmmhello's avatar
wmmhello 已提交
331
  return 0;
L
Liu Jicong 已提交
332
}