tqScan.c 11.4 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tq.h"

L
Liu Jicong 已提交
18
/*
 * Serialize one data block and append it to a poll response.
 *
 * A buffer of sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) bytes is
 * allocated, the SRetrieveTableRsp header is filled in (numOfRows is stored
 * via htobe64), and the block is encoded right behind the header. The buffer
 * pointer is pushed to pRsp->blockData and its actual length (header plus
 * encoded bytes) to pRsp->blockDataLen.
 *
 * pBlock     block to encode
 * pRsp       response whose blockData / blockDataLen arrays are extended
 * numOfCols  column count handed to blockEncode()
 * precision  value stored in the header's precision field
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the buffer cannot
 * be allocated or either array cannot grow. On failure neither array is
 * changed and the buffer is released, so the two arrays always stay the same
 * length. pRsp->blockNum is NOT touched here; the caller maintains it.
 */
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision) {
  int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
  void*   buf = taosMemoryCalloc(1, dataStrLen);
  if (buf == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
  pRetrieve->useconds = 0;
  pRetrieve->precision = precision;
  pRetrieve->compressed = 0;
  pRetrieve->completed = 1;
  pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows);

  int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols);
  actualLen += sizeof(SRetrieveTableRsp);

  if (taosArrayPush(pRsp->blockDataLen, &actualLen) == NULL) {
    taosMemoryFree(buf);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  if (taosArrayPush(pRsp->blockData, &buf) == NULL) {
    // undo the length entry pushed above so both arrays keep the same size
    (void)taosArrayPop(pRsp->blockDataLen);
    taosMemoryFree(buf);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}

40
/*
 * Append a private copy of the reader's current schema to pRsp->blockSchema.
 *
 * The schema is cloned from pExec->pTqReader->pSchemaWrapper; once pushed, the
 * clone is owned by the response array.
 *
 * Returns 0 on success, -1 when the clone cannot be created or the array
 * cannot grow. Nothing is added to the response on failure.
 */
static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, STaosxRsp* pRsp) {
  SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pTqReader->pSchemaWrapper);
  if (pSW == NULL) {
    return -1;
  }

  if (taosArrayPush(pRsp->blockSchema, &pSW) == NULL) {
    // the array did not take ownership, so release the clone here
    tDeleteSSchemaWrapper(pSW);
    return -1;
  }

  return 0;
}

49
/*
 * Look up the name of table `uid` and append n heap copies of it to
 * pRsp->blockTbName (one copy per block the caller is about to add).
 *
 * pTq   TQ instance; its vnode's meta store is used for the lookup
 * uid   table uid to resolve
 * pRsp  response whose blockTbName array is extended
 * n     number of copies to append
 *
 * Returns 0 on success and -1 when the table entry cannot be fetched, a copy
 * of the name cannot be allocated, or the array cannot grow. The operation is
 * all-or-nothing: on failure every name appended by this call is removed
 * again, so blockTbName is left exactly as it was on entry.
 */
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, STaosxRsp* pRsp, int32_t n) {
  SMetaReader mr = {0};
  metaReaderInit(&mr, pTq->pVnode->pMeta, 0);

  // TODO add reference to guarantee success
  if (metaGetTableEntryByUidCache(&mr, uid) < 0) {
    metaReaderClear(&mr);
    return -1;
  }

  int32_t code = 0;
  int32_t numAdded = 0;
  for (; numAdded < n; numAdded++) {
    char* tbName = taosStrdup(mr.me.name);
    if (tbName == NULL) {
      code = -1;
      break;
    }
    if (taosArrayPush(pRsp->blockTbName, &tbName) == NULL) {
      taosMemoryFree(tbName);
      code = -1;
      break;
    }
  }

  if (code != 0) {
    // roll back the names appended so far; a partial set would not line up with the blocks
    while (numAdded-- > 0) {
      char** ppName = taosArrayPop(pRsp->blockTbName);
      if (ppName == NULL) {
        break;
      }
      taosMemoryFree(*ppName);
    }
  }

  metaReaderClear(&mr);
  return code;
}

67
/*
 * Run the handle's exec task from pOffset and collect the produced data blocks
 * into pRsp.
 *
 * Blocks are pulled one at a time with qExecTask() and appended through
 * tqAddBlockDataToRsp(); pRsp->blockNum is incremented for each block added.
 * Collection stops when the task yields no block or once at least 4096 rows
 * have been gathered. The task's offset is then extracted into
 * pRsp->rspOffset.
 *
 * Returns 0 on success, -1 when the scan cannot be prepared or the task fails,
 * or the error code of tqAddBlockDataToRsp() when a block cannot be appended.
 */
int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) {
  const int32_t        rowLimit = 4096;
  const int32_t        vnodeId = TD_VID(pTq->pVnode);
  const STqExecHandle* pExecHandle = &pHandle->execHandle;
  qTaskInfo_t          pTask = pExecHandle->task;
  int32_t              rowsCollected = 0;

  if (qStreamPrepareScan(pTask, pOffset, pExecHandle->subType) < 0) {
    return -1;
  }

  do {
    SSDataBlock* pResBlock = NULL;
    uint64_t     execTs = 0;

    qStreamSetOpen(pTask);
    tqDebug("consumer:0x%" PRIx64 " vgId:%d, tmq one task start execute", pHandle->consumerId, vnodeId);

    int32_t execCode = qExecTask(pTask, &pResBlock, &execTs);
    if (execCode != TSDB_CODE_SUCCESS) {
      tqError("consumer:0x%" PRIx64 " vgId:%d, task exec error since %s", pHandle->consumerId, vnodeId, terrstr());
      return -1;
    }

    tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq one task end executed, pDataBlock:%p", pHandle->consumerId, vnodeId,
            pResBlock);

    // no block returned: stop the scan right away (original note: the scan
    // should be stopped asap when a rebalance occurs)
    if (pResBlock == NULL) {
      break;
    }

    int32_t addCode =
        tqAddBlockDataToRsp(pResBlock, pRsp, pExecHandle->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
    if (addCode != TSDB_CODE_SUCCESS) {
      tqError("vgId:%d, failed to add block to rsp msg", vnodeId);
      return addCode;
    }

    pRsp->blockNum += 1;
    rowsCollected += pResBlock->info.rows;
  } while (rowsCollected < rowLimit);

  tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq task executed finished, total blocks:%d, totalRows:%d",
          pHandle->consumerId, vnodeId, pRsp->blockNum, rowsCollected);
  qStreamExtractOffset(pTask, &pRsp->rspOffset);
  return 0;
}

L
Liu Jicong 已提交
116
/*
 * Run the handle's exec task from pOffset and fill either a data response
 * (pRsp) or a meta response (pMetaRsp).
 *
 * For every non-empty block returned by qExecTask() the function appends, in
 * this order: the table name (when pRsp->withTbName), the schema (when
 * pRsp->withSchema) and the encoded block, then increments pRsp->blockNum.
 * While pOffset->type is TMQ_OFFSET__LOG it keeps pulling blocks; otherwise it
 * keeps pulling until more than 4096 rows have been collected.
 *
 * The loop ends when
 *   - the task exposes a meta message (copied to *pMetaRsp together with the
 *     task's offset),
 *   - the task returns no block and the extracted offset is not
 *     TMQ_OFFSET__SNAPSHOT_DATA, or
 *   - at least one block has been collected.
 *
 * Returns 0 on success and -1 on failure. Failure to append a table name for a
 * LOG offset only skips that block, as before. Failure to append a schema or
 * the block data now aborts with -1 instead of being ignored; previously
 * blockNum was still incremented, leaving it larger than the number of
 * entries in blockData. On -1 pRsp may be partially filled and should be
 * discarded by the caller.
 */
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* pOffset) {
  const int32_t        MAX_ROWS_TO_RETURN = 4096;
  const STqExecHandle* pExec = &pHandle->execHandle;
  qTaskInfo_t          task = pExec->task;

  if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) {
    return -1;
  }

  int32_t rowCnt = 0;
  while (1) {
    SSDataBlock* pDataBlock = NULL;
    uint64_t     ts = 0;
    tqDebug("tmqsnap task start to execute");
    if (qExecTask(task, &pDataBlock, &ts) < 0) {
      tqError("vgId:%d, task exec error since %s", pTq->pVnode->config.vgId, terrstr());
      return -1;
    }
    tqDebug("tmqsnap task execute end, get %p", pDataBlock);

    if (pDataBlock != NULL && pDataBlock->info.rows > 0) {
      if (pRsp->withTbName) {
        if (pOffset->type == TMQ_OFFSET__LOG) {
          int64_t uid = pExec->pTqReader->lastBlkUid;
          if (tqAddTbNameToRsp(pTq, uid, pRsp, 1) < 0) {
            // table name could not be resolved: skip this block (existing best-effort behavior)
            continue;
          }
        } else {
          char* tbName = taosStrdup(qExtractTbnameFromTask(task));
          if (tbName == NULL) {
            tqError("vgId:%d, failed to add table name to rsp msg", pTq->pVnode->config.vgId);
            terrno = TSDB_CODE_OUT_OF_MEMORY;
            return -1;
          }
          if (taosArrayPush(pRsp->blockTbName, &tbName) == NULL) {
            taosMemoryFree(tbName);
            tqError("vgId:%d, failed to add table name to rsp msg", pTq->pVnode->config.vgId);
            terrno = TSDB_CODE_OUT_OF_MEMORY;
            return -1;
          }
        }
      }

      if (pRsp->withSchema) {
        if (pOffset->type == TMQ_OFFSET__LOG) {
          if (tqAddBlockSchemaToRsp(pExec, pRsp) < 0) {
            tqError("vgId:%d, failed to add block schema to rsp msg", pTq->pVnode->config.vgId);
            terrno = TSDB_CODE_OUT_OF_MEMORY;
            return -1;
          }
        } else {
          SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task));
          if (pSW == NULL) {
            tqError("vgId:%d, failed to add block schema to rsp msg", pTq->pVnode->config.vgId);
            terrno = TSDB_CODE_OUT_OF_MEMORY;
            return -1;
          }
          if (taosArrayPush(pRsp->blockSchema, &pSW) == NULL) {
            tDeleteSSchemaWrapper(pSW);
            tqError("vgId:%d, failed to add block schema to rsp msg", pTq->pVnode->config.vgId);
            terrno = TSDB_CODE_OUT_OF_MEMORY;
            return -1;
          }
        }
      }

      int32_t code = tqAddBlockDataToRsp(pDataBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pDataBlock->pDataBlock),
                                         pTq->pVnode->config.tsdbCfg.precision);
      if (code != TSDB_CODE_SUCCESS) {
        // do not count a block that was never stored in blockData
        tqError("vgId:%d, failed to add block to rsp msg", pTq->pVnode->config.vgId);
        terrno = code;
        return -1;
      }
      pRsp->blockNum++;

      if (pOffset->type == TMQ_OFFSET__LOG) {
        continue;
      } else {
        rowCnt += pDataBlock->info.rows;
        if (rowCnt <= MAX_ROWS_TO_RETURN) continue;
      }
    }

    // get meta
    SMqMetaRsp* tmp = qStreamExtractMetaMsg(task);
    if (tmp->metaRspLen > 0) {
      qStreamExtractOffset(task, &tmp->rspOffset);
      *pMetaRsp = *tmp;

      tqDebug("tmqsnap task get meta");
      break;
    }

    if (pDataBlock == NULL) {
      qStreamExtractOffset(task, pOffset);
      if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
        continue;
      }
      tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode),
              pHandle->snapshotVer + 1);
      qStreamExtractOffset(task, &pRsp->rspOffset);
      break;
    }

    if (pRsp->blockNum > 0) {
      tqDebug("tmqsnap task exec exited, get data");
      qStreamExtractOffset(task, &pRsp->rspOffset);
      break;
    }
  }

  return 0;
}

198
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows) {
L
Liu Jicong 已提交
199
  STqExecHandle* pExec = &pHandle->execHandle;
X
Xiaoyu Wang 已提交
200 201
  SArray*        pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
  SArray*        pSchemas = taosArrayInit(0, sizeof(void*));
202

wmmhello's avatar
wmmhello 已提交
203
  if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
204 205 206
    STqReader* pReader = pExec->pTqReader;
    tqReaderSetSubmitMsg(pReader, submit.msgStr, submit.msgLen, submit.ver);
    while (tqNextDataBlock(pReader)) {
207 208
      taosArrayClear(pBlocks);
      taosArrayClear(pSchemas);
209 210
      SSubmitTbData* pSubmitTbDataRet = NULL;
      if (tqRetrieveTaosxBlock2(pReader, pBlocks, pSchemas, &pSubmitTbDataRet) < 0) {
L
Liu Jicong 已提交
211 212
        if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
      }
wmmhello's avatar
wmmhello 已提交
213
      if (pRsp->withTbName) {
214
        int64_t uid = pExec->pTqReader->lastBlkUid;
215
        if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) {
216
          taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
217 218 219
          taosArrayDestroyP(pSchemas, (FDelete)tDeleteSSchemaWrapper);
          pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
          pSchemas = taosArrayInit(0, sizeof(void*));
wmmhello's avatar
wmmhello 已提交
220 221 222
          continue;
        }
      }
223 224 225 226 227 228 229 230 231 232 233
      if (pHandle->fetchMeta && pSubmitTbDataRet->pCreateTbReq != NULL) {
        if (pRsp->createTableNum == 0) {
          pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
          pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
        }

        int32_t  code = TSDB_CODE_SUCCESS;
        uint32_t len = 0;
        tEncodeSize(tEncodeSVCreateTbReq, pSubmitTbDataRet->pCreateTbReq, len, code);
        if (TSDB_CODE_SUCCESS != code) {
          continue;
L
Liu Jicong 已提交
234
        }
X
Xiaoyu Wang 已提交
235
        void*    createReq = taosMemoryCalloc(1, len);
236 237 238 239 240 241 242 243 244 245 246 247 248 249
        SEncoder encoder = {0};
        tEncoderInit(&encoder, createReq, len);
        code = tEncodeSVCreateTbReq(&encoder, pSubmitTbDataRet->pCreateTbReq);
        if (code < 0) {
          tEncoderClear(&encoder);
          taosMemoryFree(createReq);
          continue;
        }

        taosArrayPush(pRsp->createTableLen, &len);
        taosArrayPush(pRsp->createTableReq, &createReq);
        pRsp->createTableNum++;

        tEncoderClear(&encoder);
L
Liu Jicong 已提交
250
      }
251 252 253 254
      for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) {
        SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
        tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock),
                            pTq->pVnode->config.tsdbCfg.precision);
255
        totalRows += pBlock->info.rows;
256
        blockDataFreeRes(pBlock);
257
        SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i);
258 259 260
        taosArrayPush(pRsp->blockSchema, &pSW);
        pRsp->blockNum++;
      }
L
Liu Jicong 已提交
261
    }
wmmhello's avatar
wmmhello 已提交
262
  } else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
263 264
    STqReader* pReader = pExec->pTqReader;
    tqReaderSetSubmitMsg(pReader, submit.msgStr, submit.msgLen, submit.ver);
L
Liu Jicong 已提交
265
    while (tqNextDataBlockFilterOut2(pReader, pExec->execDb.pFilterOutTbUid)) {
266 267
      taosArrayClear(pBlocks);
      taosArrayClear(pSchemas);
268 269
      SSubmitTbData* pSubmitTbDataRet = NULL;
      if (tqRetrieveTaosxBlock2(pReader, pBlocks, pSchemas, &pSubmitTbDataRet) < 0) {
L
Liu Jicong 已提交
270 271
        if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
      }
wmmhello's avatar
wmmhello 已提交
272
      if (pRsp->withTbName) {
273
        int64_t uid = pExec->pTqReader->lastBlkUid;
274
        if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) {
275
          taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
276 277 278
          taosArrayDestroyP(pSchemas, (FDelete)tDeleteSSchemaWrapper);
          pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
          pSchemas = taosArrayInit(0, sizeof(void*));
L
Liu Jicong 已提交
279
          continue;
wmmhello's avatar
wmmhello 已提交
280 281
        }
      }
282 283 284 285 286 287 288 289 290 291 292
      if (pHandle->fetchMeta && pSubmitTbDataRet->pCreateTbReq != NULL) {
        if (pRsp->createTableNum == 0) {
          pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
          pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
        }

        int32_t  code = TSDB_CODE_SUCCESS;
        uint32_t len = 0;
        tEncodeSize(tEncodeSVCreateTbReq, pSubmitTbDataRet->pCreateTbReq, len, code);
        if (TSDB_CODE_SUCCESS != code) {
          continue;
L
Liu Jicong 已提交
293
        }
X
Xiaoyu Wang 已提交
294
        void*    createReq = taosMemoryCalloc(1, len);
295 296 297 298 299 300 301
        SEncoder encoder = {0};
        tEncoderInit(&encoder, createReq, len);
        code = tEncodeSVCreateTbReq(&encoder, pSubmitTbDataRet->pCreateTbReq);
        if (code < 0) {
          tEncoderClear(&encoder);
          taosMemoryFree(createReq);
          continue;
L
Liu Jicong 已提交
302
        }
303 304 305 306 307 308

        taosArrayPush(pRsp->createTableLen, &len);
        taosArrayPush(pRsp->createTableReq, &createReq);
        pRsp->createTableNum++;

        tEncoderClear(&encoder);
L
Liu Jicong 已提交
309
      }
310 311 312 313
      for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) {
        SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
        tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock),
                            pTq->pVnode->config.tsdbCfg.precision);
314
        *totalRows += pBlock->info.rows;
315
        blockDataFreeRes(pBlock);
316
        SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i);
317 318 319
        taosArrayPush(pRsp->blockSchema, &pSW);
        pRsp->blockNum++;
      }
L
Liu Jicong 已提交
320
    }
L
Liu Jicong 已提交
321
  }
322 323
  taosArrayDestroy(pBlocks);
  taosArrayDestroy(pSchemas);
wmmhello's avatar
wmmhello 已提交
324
  return 0;
L
Liu Jicong 已提交
325
}