tqExec.c 13.1 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tq.h"

L
Liu Jicong 已提交
18
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision) {
L
Liu Jicong 已提交
19 20 21 22 23 24
  int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
  void*   buf = taosMemoryCalloc(1, dataStrLen);
  if (buf == NULL) return -1;

  SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
  pRetrieve->useconds = 0;
L
Liu Jicong 已提交
25
  pRetrieve->precision = precision;
L
Liu Jicong 已提交
26 27
  pRetrieve->compressed = 0;
  pRetrieve->completed = 1;
28
  pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows);
L
Liu Jicong 已提交
29

H
Haojun Liao 已提交
30
  int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols);
L
Liu Jicong 已提交
31 32 33 34 35 36
  actualLen += sizeof(SRetrieveTableRsp);
  taosArrayPush(pRsp->blockDataLen, &actualLen);
  taosArrayPush(pRsp->blockData, &buf);
  return 0;
}

37
static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, STaosxRsp* pRsp) {
L
Liu Jicong 已提交
38
  SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader->pSchemaWrapper);
L
Liu Jicong 已提交
39 40 41
  if (pSW == NULL) {
    return -1;
  }
L
Liu Jicong 已提交
42 43 44 45
  taosArrayPush(pRsp->blockSchema, &pSW);
  return 0;
}

46
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, STaosxRsp* pRsp, int32_t n) {
L
Liu Jicong 已提交
47 48
  SMetaReader mr = {0};
  metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
49
  // TODO add reference to gurantee success
H
Haojun Liao 已提交
50
  if (metaGetTableEntryByUidCache(&mr, uid) < 0) {
51
    metaReaderClear(&mr);
L
Liu Jicong 已提交
52 53
    return -1;
  }
54
  for (int32_t i = 0; i < n; i++) {
55
    char* tbName = taosStrdup(mr.me.name);
56 57
    taosArrayPush(pRsp->blockTbName, &tbName);
  }
L
Liu Jicong 已提交
58 59 60 61
  metaReaderClear(&mr);
  return 0;
}

62
int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) {
L
Liu Jicong 已提交
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
  const STqExecHandle* pExec = &pHandle->execHandle;

  qTaskInfo_t task = pExec->task;

  if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) {
    tqDebug("prepare scan failed, return");
    if (pOffset->type == TMQ_OFFSET__LOG) {
      pRsp->rspOffset = *pOffset;
      return 0;
    } else {
      tqOffsetResetToLog(pOffset, pHandle->snapshotVer);
      if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) {
        tqDebug("prepare scan failed, return");
        pRsp->rspOffset = *pOffset;
        return 0;
      }
    }
  }

  int32_t rowCnt = 0;
  while (1) {
    SSDataBlock* pDataBlock = NULL;
    uint64_t     ts = 0;
86
    tqDebug("vgId:%d, tmq task start to execute", pTq->pVnode->config.vgId);
L
Liu Jicong 已提交
87
    if (qExecTask(task, &pDataBlock, &ts) < 0) {
K
kailixu 已提交
88
      tqError("vgId:%d, task exec error since %s", pTq->pVnode->config.vgId, terrstr());
L
Liu Jicong 已提交
89
      return -1;
L
Liu Jicong 已提交
90
    }
91
    tqDebug("vgId:%d, tmq task executed, get %p", pTq->pVnode->config.vgId, pDataBlock);
L
Liu Jicong 已提交
92

93 94 95 96
    if (pDataBlock == NULL) {
      break;
    }

L
Liu Jicong 已提交
97
    tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
98 99 100 101 102 103 104 105 106 107 108 109
    pRsp->blockNum++;

    if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
      rowCnt += pDataBlock->info.rows;
      if (rowCnt >= 4096) break;
    }
  }

  if (qStreamExtractOffset(task, &pRsp->rspOffset) < 0) {
    return -1;
  }

L
Liu Jicong 已提交
110 111 112 113
  if (pRsp->rspOffset.type == 0) {
    tqError("expected rsp offset: type %d %" PRId64 " %" PRId64 " %" PRId64, pRsp->rspOffset.type, pRsp->rspOffset.ts,
            pRsp->rspOffset.uid, pRsp->rspOffset.version);
    return -1;
L
Liu Jicong 已提交
114
  }
115

wmmhello's avatar
wmmhello 已提交
116 117 118 119
  if(pRsp->withTbName || pRsp->withSchema){
    tqError("get column should not with meta:%d,%d", pRsp->withTbName, pRsp->withSchema);
    return -1;
  }
L
Liu Jicong 已提交
120 121 122
  return 0;
}

L
Liu Jicong 已提交
123
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* pOffset) {
L
Liu Jicong 已提交
124
  const STqExecHandle* pExec = &pHandle->execHandle;
125
  qTaskInfo_t          task = pExec->task;
L
Liu Jicong 已提交
126

127
  if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) {
128
    tqDebug("prepare scan failed, return");
L
Liu Jicong 已提交
129 130 131 132
    if (pOffset->type == TMQ_OFFSET__LOG) {
      pRsp->rspOffset = *pOffset;
      return 0;
    } else {
133
      tqOffsetResetToLog(pOffset, pHandle->snapshotVer);
134
      if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) {
135
        tqDebug("prepare scan failed, return");
L
Liu Jicong 已提交
136 137 138 139
        pRsp->rspOffset = *pOffset;
        return 0;
      }
    }
L
Liu Jicong 已提交
140 141
  }

L
Liu Jicong 已提交
142
  int32_t rowCnt = 0;
L
Liu Jicong 已提交
143 144 145
  while (1) {
    SSDataBlock* pDataBlock = NULL;
    uint64_t     ts = 0;
wmmhello's avatar
wmmhello 已提交
146
    tqDebug("tmqsnap task start to execute");
L
Liu Jicong 已提交
147
    if (qExecTask(task, &pDataBlock, &ts) < 0) {
K
kailixu 已提交
148
      tqError("vgId:%d, task exec error since %s", pTq->pVnode->config.vgId, terrstr());
L
Liu Jicong 已提交
149
      return -1;
L
Liu Jicong 已提交
150
    }
wmmhello's avatar
wmmhello 已提交
151
    tqDebug("tmqsnap task execute end, get %p", pDataBlock);
L
Liu Jicong 已提交
152

wmmhello's avatar
wmmhello 已提交
153
    if (pDataBlock != NULL && pDataBlock->info.rows > 0) {
L
Liu Jicong 已提交
154
      if (pRsp->withTbName) {
L
Liu Jicong 已提交
155
        if (pOffset->type == TMQ_OFFSET__LOG) {
L
Liu Jicong 已提交
156
          int64_t uid = pExec->pExecReader->lastBlkUid;
157
          if (tqAddTbNameToRsp(pTq, uid, pRsp, 1) < 0) {
wmmhello's avatar
wmmhello 已提交
158 159
            continue;
          }
L
Liu Jicong 已提交
160
        } else {
161
          char* tbName = taosStrdup(qExtractTbnameFromTask(task));
L
Liu Jicong 已提交
162
          taosArrayPush(pRsp->blockTbName, &tbName);
L
Liu Jicong 已提交
163 164
        }
      }
L
Liu Jicong 已提交
165
      if (pRsp->withSchema) {
wmmhello's avatar
wmmhello 已提交
166
        if (pOffset->type == TMQ_OFFSET__LOG) {
167
          tqAddBlockSchemaToRsp(pExec, pRsp);
L
Liu Jicong 已提交
168
        } else {
wmmhello's avatar
wmmhello 已提交
169 170 171
          SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task));
          taosArrayPush(pRsp->blockSchema, &pSW);
        }
wmmhello's avatar
wmmhello 已提交
172
      }
wmmhello's avatar
wmmhello 已提交
173

L
Liu Jicong 已提交
174 175
      tqAddBlockDataToRsp(pDataBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pDataBlock->pDataBlock),
                          pTq->pVnode->config.tsdbCfg.precision);
176
      pRsp->blockNum++;
L
Liu Jicong 已提交
177
      if (pOffset->type == TMQ_OFFSET__LOG) {
L
Liu Jicong 已提交
178 179
        continue;
      } else {
L
Liu Jicong 已提交
180
        rowCnt += pDataBlock->info.rows;
L
Liu Jicong 已提交
181
        if (rowCnt <= 4096) continue;
L
Liu Jicong 已提交
182 183 184
      }
    }

185 186
    if (pDataBlock == NULL && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
      if (qStreamExtractPrepareUid(task) != 0) {
187 188
        continue;
      }
189 190 191 192
      tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode),
              pHandle->snapshotVer + 1);
      break;
    }
193

194 195 196
    if (pRsp->blockNum > 0) {
      tqDebug("tmqsnap task exec exited, get data");
      break;
L
Liu Jicong 已提交
197
    }
198

199 200 201 202 203 204 205 206 207 208 209 210
    SMqMetaRsp* tmp = qStreamExtractMetaMsg(task);
    if (tmp->rspOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
      tqOffsetResetToData(pOffset, tmp->rspOffset.uid, tmp->rspOffset.ts);
      qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
      tmp->rspOffset.type = TMQ_OFFSET__SNAPSHOT_META;
      tqDebug("tmqsnap task exec change to get data");
      continue;
    }

    *pMetaRsp = *tmp;
    tqDebug("tmqsnap task exec exited, get meta");

211
    tqDebug("task exec exited");
L
Liu Jicong 已提交
212 213 214
    break;
  }

L
Liu Jicong 已提交
215 216 217 218 219 220
  qStreamExtractOffset(task, &pRsp->rspOffset);

  if (pRsp->rspOffset.type == 0) {
    tqError("expected rsp offset: type %d %" PRId64 " %" PRId64 " %" PRId64, pRsp->rspOffset.type, pRsp->rspOffset.ts,
            pRsp->rspOffset.uid, pRsp->rspOffset.version);
    return -1;
221 222
  }

L
Liu Jicong 已提交
223 224 225
  return 0;
}

L
Liu Jicong 已提交
226
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp) {
L
Liu Jicong 已提交
227
  STqExecHandle* pExec = &pHandle->execHandle;
L
Liu Jicong 已提交
228
  /*A(pExec->subType != TOPIC_SUB_TYPE__COLUMN);*/
wmmhello's avatar
wmmhello 已提交
229

230
  SArray* pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
231
  SArray* pSchemas = taosArrayInit(0, sizeof(void*));
232

wmmhello's avatar
wmmhello 已提交
233 234
  if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
    STqReader* pReader = pExec->pExecReader;
L
Liu Jicong 已提交
235 236 237
    /*tqReaderSetDataMsg(pReader, pReq, 0);*/
    tqReaderSetSubmitReq2(pReader, submit.msgStr, submit.msgLen, submit.ver);
    while (tqNextDataBlock2(pReader)) {
238 239 240 241 242 243 244
      /*SSDataBlock block = {0};*/
      /*if (tqRetrieveDataBlock(&block, pReader) < 0) {*/
      /*if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;*/
      /*}*/

      taosArrayClear(pBlocks);
      taosArrayClear(pSchemas);
245 246
      SSubmitTbData* pSubmitTbDataRet = NULL;
      if (tqRetrieveTaosxBlock2(pReader, pBlocks, pSchemas, &pSubmitTbDataRet) < 0) {
L
Liu Jicong 已提交
247 248
        if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
      }
wmmhello's avatar
wmmhello 已提交
249
      if (pRsp->withTbName) {
L
Liu Jicong 已提交
250 251
        /*int64_t uid = pExec->pExecReader->msgIter.uid;*/
        int64_t uid = pExec->pExecReader->lastBlkUid;
252
        if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) {
253
          taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
254 255 256
          taosArrayDestroyP(pSchemas, (FDelete)tDeleteSSchemaWrapper);
          pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
          pSchemas = taosArrayInit(0, sizeof(void*));
wmmhello's avatar
wmmhello 已提交
257 258 259
          continue;
        }
      }
260 261 262 263 264 265 266 267 268 269 270
      if (pHandle->fetchMeta && pSubmitTbDataRet->pCreateTbReq != NULL) {
        if (pRsp->createTableNum == 0) {
          pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
          pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
        }

        int32_t  code = TSDB_CODE_SUCCESS;
        uint32_t len = 0;
        tEncodeSize(tEncodeSVCreateTbReq, pSubmitTbDataRet->pCreateTbReq, len, code);
        if (TSDB_CODE_SUCCESS != code) {
          continue;
L
Liu Jicong 已提交
271
        }
272 273 274 275 276 277 278 279 280 281 282 283 284 285 286
        void* createReq = taosMemoryCalloc(1, len);
        SEncoder encoder = {0};
        tEncoderInit(&encoder, createReq, len);
        code = tEncodeSVCreateTbReq(&encoder, pSubmitTbDataRet->pCreateTbReq);
        if (code < 0) {
          tEncoderClear(&encoder);
          taosMemoryFree(createReq);
          continue;
        }

        taosArrayPush(pRsp->createTableLen, &len);
        taosArrayPush(pRsp->createTableReq, &createReq);
        pRsp->createTableNum++;

        tEncoderClear(&encoder);
L
Liu Jicong 已提交
287
      }
288 289 290 291 292
      for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) {
        SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
        tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock),
                            pTq->pVnode->config.tsdbCfg.precision);
        blockDataFreeRes(pBlock);
293
        SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i);
294 295 296
        taosArrayPush(pRsp->blockSchema, &pSW);
        pRsp->blockNum++;
      }
L
Liu Jicong 已提交
297
    }
wmmhello's avatar
wmmhello 已提交
298 299
  } else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
    STqReader* pReader = pExec->pExecReader;
L
Liu Jicong 已提交
300 301 302
    /*tqReaderSetDataMsg(pReader, pReq, 0);*/
    tqReaderSetSubmitReq2(pReader, submit.msgStr, submit.msgLen, submit.ver);
    while (tqNextDataBlockFilterOut2(pReader, pExec->execDb.pFilterOutTbUid)) {
303 304 305 306 307 308
      /*SSDataBlock block = {0};*/
      /*if (tqRetrieveDataBlock(&block, pReader) < 0) {*/
      /*if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;*/
      /*}*/
      taosArrayClear(pBlocks);
      taosArrayClear(pSchemas);
309 310
      SSubmitTbData* pSubmitTbDataRet = NULL;
      if (tqRetrieveTaosxBlock2(pReader, pBlocks, pSchemas, &pSubmitTbDataRet) < 0) {
L
Liu Jicong 已提交
311 312
        if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
      }
wmmhello's avatar
wmmhello 已提交
313
      if (pRsp->withTbName) {
L
Liu Jicong 已提交
314
        int64_t uid = pExec->pExecReader->lastBlkUid;
315
        if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) {
316
          taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
317 318 319
          taosArrayDestroyP(pSchemas, (FDelete)tDeleteSSchemaWrapper);
          pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
          pSchemas = taosArrayInit(0, sizeof(void*));
L
Liu Jicong 已提交
320
          continue;
wmmhello's avatar
wmmhello 已提交
321 322
        }
      }
323 324 325 326 327 328 329 330 331 332 333
      if (pHandle->fetchMeta && pSubmitTbDataRet->pCreateTbReq != NULL) {
        if (pRsp->createTableNum == 0) {
          pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
          pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
        }

        int32_t  code = TSDB_CODE_SUCCESS;
        uint32_t len = 0;
        tEncodeSize(tEncodeSVCreateTbReq, pSubmitTbDataRet->pCreateTbReq, len, code);
        if (TSDB_CODE_SUCCESS != code) {
          continue;
L
Liu Jicong 已提交
334
        }
335 336 337 338 339 340 341 342
        void* createReq = taosMemoryCalloc(1, len);
        SEncoder encoder = {0};
        tEncoderInit(&encoder, createReq, len);
        code = tEncodeSVCreateTbReq(&encoder, pSubmitTbDataRet->pCreateTbReq);
        if (code < 0) {
          tEncoderClear(&encoder);
          taosMemoryFree(createReq);
          continue;
L
Liu Jicong 已提交
343
        }
344 345 346 347 348 349

        taosArrayPush(pRsp->createTableLen, &len);
        taosArrayPush(pRsp->createTableReq, &createReq);
        pRsp->createTableNum++;

        tEncoderClear(&encoder);
L
Liu Jicong 已提交
350
      }
351 352 353 354 355 356 357 358 359 360
      /*tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock),*/
      /*pTq->pVnode->config.tsdbCfg.precision);*/
      /*blockDataFreeRes(&block);*/
      /*tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp);*/
      /*pRsp->blockNum++;*/
      for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) {
        SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
        tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock),
                            pTq->pVnode->config.tsdbCfg.precision);
        blockDataFreeRes(pBlock);
361
        SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i);
362 363 364
        taosArrayPush(pRsp->blockSchema, &pSW);
        pRsp->blockNum++;
      }
L
Liu Jicong 已提交
365
    }
L
Liu Jicong 已提交
366
  }
367 368
  taosArrayDestroy(pBlocks);
  taosArrayDestroy(pSchemas);
wmmhello's avatar
wmmhello 已提交
369 370 371
//  if (pRsp->blockNum == 0) {
//    return -1;
//  }
wmmhello's avatar
wmmhello 已提交
372 373

  return 0;
L
Liu Jicong 已提交
374
}