tqScan.c 11.4 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tq.h"

L
Liu Jicong 已提交
18
/**
 * Serialize one data block and append it to a data response.
 *
 * A buffer large enough for an SRetrieveTableRsp header plus the encoded block
 * is allocated, the header is filled in, the block is encoded right after the
 * header, and then the total length and the buffer pointer are appended to
 * pRsp->blockDataLen and pRsp->blockData respectively. On success the buffer
 * is referenced by pRsp->blockData and must not be freed by the caller.
 *
 * This function does not touch pRsp->blockNum; callers update it themselves.
 *
 * @param pBlock    block to serialize
 * @param pRsp      response whose blockDataLen/blockData arrays are extended
 * @param numOfCols column count forwarded to blockEncode()
 * @param precision value stored in the header's precision field
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the buffer cannot
 *         be allocated or cannot be appended to the response arrays
 */
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision) {
  int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
  void*   buf = taosMemoryCalloc(1, dataStrLen);
  if (buf == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
  pRetrieve->useconds = 0;
  pRetrieve->precision = precision;
  pRetrieve->compressed = 0;
  pRetrieve->completed = 1;
  // the row count is stored in big-endian byte order
  pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows);

  // the encoded payload lives directly behind the header
  int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols);
  actualLen += sizeof(SRetrieveTableRsp);

  if (taosArrayPush(pRsp->blockDataLen, &actualLen) == NULL) {
    taosMemoryFree(buf);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  if (taosArrayPush(pRsp->blockData, &buf) == NULL) {
    // NOTE(review): blockDataLen now holds one trailing entry without a matching
    // buffer; the caller is expected to treat the whole response as failed.
    taosMemoryFree(buf);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}

40
/**
 * Append a copy of the reader's current schema to pRsp->blockSchema.
 *
 * The schema wrapper held by pExec->pTqReader is cloned; the pointer to the
 * clone is stored in the array, so on success the clone is referenced by the
 * response.
 *
 * @param pExec execution handle whose pTqReader provides the schema wrapper
 * @param pRsp  response whose blockSchema array is extended
 * @return 0 on success, -1 if the schema could not be cloned or appended
 */
static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, STaosxRsp* pRsp) {
  SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pTqReader->pSchemaWrapper);
  if (pSW == NULL) {
    return -1;
  }

  if (taosArrayPush(pRsp->blockSchema, &pSW) == NULL) {
    // the clone never made it into the response, so release it here
    tDeleteSchemaWrapper(pSW);
    return -1;
  }

  return 0;
}

49
/**
 * Look up the table entry for uid and append n copies of its name to
 * pRsp->blockTbName.
 *
 * Each appended element is a separately duplicated string. The meta reader
 * used for the lookup is always cleared before returning.
 *
 * @param pTq  TQ instance; its vnode meta handle is used for the lookup
 * @param uid  table uid to resolve
 * @param pRsp response whose blockTbName array is extended
 * @param n    number of copies of the name to append
 * @return 0 on success, -1 if the table entry cannot be found
 */
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, STaosxRsp* pRsp, int32_t n) {
  int32_t     result = -1;
  SMetaReader reader = {0};

  metaReaderInit(&reader, pTq->pVnode->pMeta, 0);

  // TODO add reference to guarantee success
  if (metaGetTableEntryByUidCache(&reader, uid) >= 0) {
    for (int32_t remaining = n; remaining > 0; --remaining) {
      char* nameCopy = taosStrdup(reader.me.name);
      taosArrayPush(pRsp->blockTbName, &nameCopy);
    }
    result = 0;
  }

  metaReaderClear(&reader);
  return result;
}

67
/**
 * Execute the handle's task from pOffset and collect the produced data blocks
 * into pRsp.
 *
 * The task is executed repeatedly; every non-NULL block is serialized into the
 * response and pRsp->blockNum is incremented. The loop ends when the task
 * yields no block or once at least 4096 rows have been collected. Afterwards
 * the task's current offset is written to pRsp->rspOffset.
 *
 * @param pTq     TQ instance (provides the vnode id and timestamp precision)
 * @param pHandle subscription handle holding the task to execute
 * @param pRsp    response that receives the blocks and the resulting offset
 * @param pOffset offset the scan is prepared with
 * @return 0 on success, -1 if preparing or executing the task fails, or the
 *         error code returned by tqAddBlockDataToRsp()
 */
int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) {
  const int32_t        rowLimit = 4096;
  const int32_t        vgId = TD_VID(pTq->pVnode);
  const STqExecHandle* pExec = &pHandle->execHandle;
  qTaskInfo_t          task = pExec->task;
  int32_t              rowsCollected = 0;

  if (qStreamPrepareScan(task, pOffset, pExec->subType) < 0) {
    return -1;
  }

  do {
    SSDataBlock* pBlock = NULL;
    uint64_t     ts = 0;

    qStreamSetOpen(task);

    tqDebug("consumer:0x%" PRIx64 " vgId:%d, tmq one task start execute", pHandle->consumerId, vgId);
    if (TSDB_CODE_SUCCESS != qExecTask(task, &pBlock, &ts)) {
      tqError("consumer:0x%" PRIx64 " vgId:%d, task exec error since %s", pHandle->consumerId, vgId, terrstr());
      return -1;
    }

    tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq one task end executed, pDataBlock:%p", pHandle->consumerId, vgId,
            pBlock);

    // no block: stop scanning right away (the original note mentions a rebalance as the reason)
    if (NULL == pBlock) {
      break;
    }

    int32_t code = tqAddBlockDataToRsp(pBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
    if (TSDB_CODE_SUCCESS != code) {
      tqError("vgId:%d, failed to add block to rsp msg", vgId);
      return code;
    }

    pRsp->blockNum += 1;
    rowsCollected += pBlock->info.rows;
  } while (rowsCollected < rowLimit);

  tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq task executed finished, total blocks:%d, totalRows:%d",
          pHandle->consumerId, vgId, pRsp->blockNum, rowsCollected);
  qStreamExtractOffset(task, &pRsp->rspOffset);
  return 0;
}

L
Liu Jicong 已提交
118
/**
 * Execute the handle's task from pOffset and collect data blocks (optionally
 * with table names and schemas) into pRsp, or a meta message into pMetaRsp.
 *
 * For every non-empty block the table name (if pRsp->withTbName) and schema
 * (if pRsp->withSchema) are appended first, then the block data, and
 * pRsp->blockNum is incremented. When pOffset->type is TMQ_OFFSET__LOG the
 * loop keeps executing after each block; otherwise it keeps executing until
 * more than 4096 rows have been gathered. The loop ends when
 *   - the task exposes a meta message (copied into *pMetaRsp), or
 *   - the task yields no block and the extracted offset is not
 *     TMQ_OFFSET__SNAPSHOT_DATA, or
 *   - at least one block has been collected.
 *
 * @param pTq      TQ instance (provides vnode meta and timestamp precision)
 * @param pHandle  subscription handle holding the task to execute
 * @param pRsp     response that receives blocks, names, schemas and offset
 * @param pMetaRsp receives the meta message when the task produces one
 * @param pOffset  offset the scan is prepared with; overwritten with the
 *                 task's offset whenever the task yields no block
 * @return 0 on success; -1 if preparing/executing the task fails or a schema
 *         cannot be added; the error code of tqAddBlockDataToRsp() if the
 *         block cannot be serialized
 */
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* pOffset) {
  const int32_t        MAX_ROWS_TO_RETURN = 4096;
  const STqExecHandle* pExec = &pHandle->execHandle;
  qTaskInfo_t          task = pExec->task;

  if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) {
    return -1;
  }

  int32_t rowCnt = 0;
  while (1) {
    SSDataBlock* pDataBlock = NULL;
    uint64_t     ts = 0;
    tqDebug("tmqsnap task start to execute");
    if (qExecTask(task, &pDataBlock, &ts) < 0) {
      tqError("vgId:%d, task exec error since %s", pTq->pVnode->config.vgId, terrstr());
      return -1;
    }

    tqDebug("tmqsnap task execute end, get %p", pDataBlock);

    if (pDataBlock != NULL && pDataBlock->info.rows > 0) {
      if (pRsp->withTbName) {
        if (pOffset->type == TMQ_OFFSET__LOG) {
          int64_t uid = pExec->pTqReader->lastBlkUid;
          if (tqAddTbNameToRsp(pTq, uid, pRsp, 1) < 0) {
            // the table cannot be resolved: drop this block and fetch the next one
            continue;
          }
        } else {
          char* tbName = taosStrdup(qExtractTbnameFromTask(task));
          taosArrayPush(pRsp->blockTbName, &tbName);
        }
      }

      if (pRsp->withSchema) {
        if (pOffset->type == TMQ_OFFSET__LOG) {
          if (tqAddBlockSchemaToRsp(pExec, pRsp) < 0) {
            // without a schema entry the response arrays would no longer line up
            tqError("vgId:%d, failed to add block schema to rsp msg", pTq->pVnode->config.vgId);
            return -1;
          }
        } else {
          SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task));
          if (pSW == NULL) {
            tqError("vgId:%d, failed to clone block schema for rsp msg", pTq->pVnode->config.vgId);
            return -1;
          }
          taosArrayPush(pRsp->blockSchema, &pSW);
        }
      }

      // only count the block once its data really is part of the response
      int32_t code = tqAddBlockDataToRsp(pDataBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pDataBlock->pDataBlock),
                                         pTq->pVnode->config.tsdbCfg.precision);
      if (code != TSDB_CODE_SUCCESS) {
        tqError("vgId:%d, failed to add block to rsp msg", pTq->pVnode->config.vgId);
        return code;
      }
      pRsp->blockNum++;

      if (pOffset->type == TMQ_OFFSET__LOG) {
        continue;
      } else {
        rowCnt += pDataBlock->info.rows;
        if (rowCnt <= MAX_ROWS_TO_RETURN) continue;
      }
    }

    // get meta
    SMqMetaRsp* tmp = qStreamExtractMetaMsg(task);
    if (tmp->metaRspLen > 0) {
      qStreamExtractOffset(task, &tmp->rspOffset);
      *pMetaRsp = *tmp;

      tqDebug("tmqsnap task get meta");
      break;
    }

    if (pDataBlock == NULL) {
      qStreamExtractOffset(task, pOffset);
      if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
        // offset still points at snapshot data: keep executing the task
        continue;
      }
      tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode),
              pHandle->snapshotVer + 1);
      qStreamExtractOffset(task, &pRsp->rspOffset);
      break;
    }

    if (pRsp->blockNum > 0) {
      tqDebug("tmqsnap task exec exited, get data");
      qStreamExtractOffset(task, &pRsp->rspOffset);
      break;
    }
  }

  return 0;
}

201
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows) {
L
Liu Jicong 已提交
202
  STqExecHandle* pExec = &pHandle->execHandle;
X
Xiaoyu Wang 已提交
203 204
  SArray*        pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
  SArray*        pSchemas = taosArrayInit(0, sizeof(void*));
205

wmmhello's avatar
wmmhello 已提交
206
  if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
207 208
    STqReader* pReader = pExec->pTqReader;
    tqReaderSetSubmitMsg(pReader, submit.msgStr, submit.msgLen, submit.ver);
209
    while (tqNextBlockImpl(pReader)) {
210 211
      taosArrayClear(pBlocks);
      taosArrayClear(pSchemas);
212
      SSubmitTbData* pSubmitTbDataRet = NULL;
213
      if (tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas, &pSubmitTbDataRet) < 0) {
L
Liu Jicong 已提交
214 215
        if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
      }
wmmhello's avatar
wmmhello 已提交
216
      if (pRsp->withTbName) {
217
        int64_t uid = pExec->pTqReader->lastBlkUid;
218
        if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) {
219
          taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
220
          taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper);
221 222
          pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
          pSchemas = taosArrayInit(0, sizeof(void*));
wmmhello's avatar
wmmhello 已提交
223 224 225
          continue;
        }
      }
226 227 228 229 230 231 232 233 234 235 236
      if (pHandle->fetchMeta && pSubmitTbDataRet->pCreateTbReq != NULL) {
        if (pRsp->createTableNum == 0) {
          pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
          pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
        }

        int32_t  code = TSDB_CODE_SUCCESS;
        uint32_t len = 0;
        tEncodeSize(tEncodeSVCreateTbReq, pSubmitTbDataRet->pCreateTbReq, len, code);
        if (TSDB_CODE_SUCCESS != code) {
          continue;
L
Liu Jicong 已提交
237
        }
X
Xiaoyu Wang 已提交
238
        void*    createReq = taosMemoryCalloc(1, len);
239 240 241 242 243 244 245 246 247 248 249 250 251 252
        SEncoder encoder = {0};
        tEncoderInit(&encoder, createReq, len);
        code = tEncodeSVCreateTbReq(&encoder, pSubmitTbDataRet->pCreateTbReq);
        if (code < 0) {
          tEncoderClear(&encoder);
          taosMemoryFree(createReq);
          continue;
        }

        taosArrayPush(pRsp->createTableLen, &len);
        taosArrayPush(pRsp->createTableReq, &createReq);
        pRsp->createTableNum++;

        tEncoderClear(&encoder);
L
Liu Jicong 已提交
253
      }
254 255 256 257
      for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) {
        SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
        tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock),
                            pTq->pVnode->config.tsdbCfg.precision);
258
        totalRows += pBlock->info.rows;
259
        blockDataFreeRes(pBlock);
260
        SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i);
261 262 263
        taosArrayPush(pRsp->blockSchema, &pSW);
        pRsp->blockNum++;
      }
L
Liu Jicong 已提交
264
    }
wmmhello's avatar
wmmhello 已提交
265
  } else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
266 267
    STqReader* pReader = pExec->pTqReader;
    tqReaderSetSubmitMsg(pReader, submit.msgStr, submit.msgLen, submit.ver);
268
    while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) {
269 270
      taosArrayClear(pBlocks);
      taosArrayClear(pSchemas);
271
      SSubmitTbData* pSubmitTbDataRet = NULL;
272
      if (tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas, &pSubmitTbDataRet) < 0) {
L
Liu Jicong 已提交
273 274
        if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
      }
wmmhello's avatar
wmmhello 已提交
275
      if (pRsp->withTbName) {
276
        int64_t uid = pExec->pTqReader->lastBlkUid;
277
        if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) {
278
          taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
279
          taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper);
280 281
          pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
          pSchemas = taosArrayInit(0, sizeof(void*));
L
Liu Jicong 已提交
282
          continue;
wmmhello's avatar
wmmhello 已提交
283 284
        }
      }
285 286 287 288 289 290 291 292 293 294 295
      if (pHandle->fetchMeta && pSubmitTbDataRet->pCreateTbReq != NULL) {
        if (pRsp->createTableNum == 0) {
          pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
          pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
        }

        int32_t  code = TSDB_CODE_SUCCESS;
        uint32_t len = 0;
        tEncodeSize(tEncodeSVCreateTbReq, pSubmitTbDataRet->pCreateTbReq, len, code);
        if (TSDB_CODE_SUCCESS != code) {
          continue;
L
Liu Jicong 已提交
296
        }
X
Xiaoyu Wang 已提交
297
        void*    createReq = taosMemoryCalloc(1, len);
298 299 300 301 302 303 304
        SEncoder encoder = {0};
        tEncoderInit(&encoder, createReq, len);
        code = tEncodeSVCreateTbReq(&encoder, pSubmitTbDataRet->pCreateTbReq);
        if (code < 0) {
          tEncoderClear(&encoder);
          taosMemoryFree(createReq);
          continue;
L
Liu Jicong 已提交
305
        }
306 307 308 309 310 311

        taosArrayPush(pRsp->createTableLen, &len);
        taosArrayPush(pRsp->createTableReq, &createReq);
        pRsp->createTableNum++;

        tEncoderClear(&encoder);
L
Liu Jicong 已提交
312
      }
313 314 315 316
      for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) {
        SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
        tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock),
                            pTq->pVnode->config.tsdbCfg.precision);
317
        *totalRows += pBlock->info.rows;
318
        blockDataFreeRes(pBlock);
319
        SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i);
320 321 322
        taosArrayPush(pRsp->blockSchema, &pSW);
        pRsp->blockNum++;
      }
L
Liu Jicong 已提交
323
    }
L
Liu Jicong 已提交
324
  }
325 326
  taosArrayDestroy(pBlocks);
  taosArrayDestroy(pSchemas);
wmmhello's avatar
wmmhello 已提交
327
  return 0;
L
Liu Jicong 已提交
328
}