tqExec.c 9.0 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tq.h"

18
static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols) {
L
Liu Jicong 已提交
19 20 21 22 23 24 25 26 27 28 29 30 31
  int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
  void*   buf = taosMemoryCalloc(1, dataStrLen);
  if (buf == NULL) return -1;

  SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
  pRetrieve->useconds = 0;
  pRetrieve->precision = TSDB_DEFAULT_PRECISION;
  pRetrieve->compressed = 0;
  pRetrieve->completed = 1;
  pRetrieve->numOfRows = htonl(pBlock->info.rows);

  // TODO enable compress
  int32_t actualLen = 0;
32
  blockEncode(pBlock, pRetrieve->data, &actualLen, numOfCols, false);
L
Liu Jicong 已提交
33 34 35 36 37 38 39
  actualLen += sizeof(SRetrieveTableRsp);
  ASSERT(actualLen <= dataStrLen);
  taosArrayPush(pRsp->blockDataLen, &actualLen);
  taosArrayPush(pRsp->blockData, &buf);
  return 0;
}

L
Liu Jicong 已提交
40 41
static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, SMqDataRsp* pRsp) {
  SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader->pSchemaWrapper);
L
Liu Jicong 已提交
42 43 44
  if (pSW == NULL) {
    return -1;
  }
L
Liu Jicong 已提交
45 46 47 48
  taosArrayPush(pRsp->blockSchema, &pSW);
  return 0;
}

L
Liu Jicong 已提交
49
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) {
L
Liu Jicong 已提交
50 51
  SMetaReader mr = {0};
  metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
52
  // TODO add reference to gurantee success
L
Liu Jicong 已提交
53
  if (metaGetTableEntryByUid(&mr, uid) < 0) {
54
    metaReaderClear(&mr);
L
Liu Jicong 已提交
55 56 57 58 59 60 61 62
    return -1;
  }
  char* tbName = strdup(mr.me.name);
  taosArrayPush(pRsp->blockTbName, &tbName);
  metaReaderClear(&mr);
  return 0;
}

L
Liu Jicong 已提交
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
int64_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) {
  const STqExecHandle* pExec = &pHandle->execHandle;
  ASSERT(pExec->subType == TOPIC_SUB_TYPE__COLUMN);

  qTaskInfo_t task = pExec->task;

  if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) {
    tqDebug("prepare scan failed, return");
    if (pOffset->type == TMQ_OFFSET__LOG) {
      pRsp->rspOffset = *pOffset;
      return 0;
    } else {
      tqOffsetResetToLog(pOffset, pHandle->snapshotVer);
      if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) {
        tqDebug("prepare scan failed, return");
        pRsp->rspOffset = *pOffset;
        return 0;
      }
    }
  }

  int32_t rowCnt = 0;
  while (1) {
    SSDataBlock* pDataBlock = NULL;
    uint64_t     ts = 0;
L
Liu Jicong 已提交
88
    tqDebug("tmq task start to execute");
L
Liu Jicong 已提交
89 90 91
    if (qExecTask(task, &pDataBlock, &ts) < 0) {
      ASSERT(0);
    }
L
Liu Jicong 已提交
92
    tqDebug("tmq task execute end, get %p", pDataBlock);
L
Liu Jicong 已提交
93 94 95 96 97 98 99 100 101 102

    if (pDataBlock) {
      tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols);
      pRsp->blockNum++;
    }
  }

  return 0;
}

wmmhello's avatar
wmmhello 已提交
103
int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* pOffset) {
L
Liu Jicong 已提交
104
  const STqExecHandle* pExec = &pHandle->execHandle;
105
  qTaskInfo_t          task = pExec->task;
L
Liu Jicong 已提交
106

107
  if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) {
108
    tqDebug("prepare scan failed, return");
L
Liu Jicong 已提交
109 110 111 112
    if (pOffset->type == TMQ_OFFSET__LOG) {
      pRsp->rspOffset = *pOffset;
      return 0;
    } else {
113
      tqOffsetResetToLog(pOffset, pHandle->snapshotVer);
114
      if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) {
115
        tqDebug("prepare scan failed, return");
L
Liu Jicong 已提交
116 117 118 119
        pRsp->rspOffset = *pOffset;
        return 0;
      }
    }
L
Liu Jicong 已提交
120 121
  }

L
Liu Jicong 已提交
122
  int32_t rowCnt = 0;
L
Liu Jicong 已提交
123 124 125
  while (1) {
    SSDataBlock* pDataBlock = NULL;
    uint64_t     ts = 0;
wmmhello's avatar
wmmhello 已提交
126
    tqDebug("tmqsnap task start to execute");
L
Liu Jicong 已提交
127 128 129
    if (qExecTask(task, &pDataBlock, &ts) < 0) {
      ASSERT(0);
    }
wmmhello's avatar
wmmhello 已提交
130
    tqDebug("tmqsnap task execute end, get %p", pDataBlock);
L
Liu Jicong 已提交
131 132 133

    if (pDataBlock != NULL) {
      if (pRsp->withTbName) {
wmmhello's avatar
wmmhello 已提交
134
        int64_t uid = 0;
L
Liu Jicong 已提交
135
        if (pOffset->type == TMQ_OFFSET__LOG) {
wmmhello's avatar
wmmhello 已提交
136
          uid = pExec->pExecReader->msgIter.uid;
wmmhello's avatar
wmmhello 已提交
137 138 139
          if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) {
            continue;
          }
L
Liu Jicong 已提交
140 141 142
        } else {
          char* tbName = strdup(qExtractTbnameFromTask(task));
          taosArrayPush(pRsp->blockTbName, &tbName);
L
Liu Jicong 已提交
143 144
        }
      }
L
Liu Jicong 已提交
145
      if (pRsp->withSchema) {
wmmhello's avatar
wmmhello 已提交
146 147
        if (pOffset->type == TMQ_OFFSET__LOG) {
          tqAddBlockSchemaToRsp(pExec, pRsp);
L
Liu Jicong 已提交
148
        } else {
wmmhello's avatar
wmmhello 已提交
149 150 151
          SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task));
          taosArrayPush(pRsp->blockSchema, &pSW);
        }
wmmhello's avatar
wmmhello 已提交
152
      }
wmmhello's avatar
wmmhello 已提交
153

L
Liu Jicong 已提交
154
      if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
wmmhello's avatar
wmmhello 已提交
155
        tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols);
L
Liu Jicong 已提交
156
      } else {
wmmhello's avatar
wmmhello 已提交
157 158
        tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock));
      }
159
      pRsp->blockNum++;
L
Liu Jicong 已提交
160
      if (pOffset->type == TMQ_OFFSET__LOG) {
L
Liu Jicong 已提交
161 162
        continue;
      } else {
L
Liu Jicong 已提交
163
        rowCnt += pDataBlock->info.rows;
L
Liu Jicong 已提交
164
        if (rowCnt <= 4096) continue;
L
Liu Jicong 已提交
165 166 167
      }
    }

L
Liu Jicong 已提交
168 169 170
    if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN) {
      if (pDataBlock == NULL && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
        if (qStreamExtractPrepareUid(task) != 0) {
171 172 173 174 175 176
          continue;
        }
        tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode),
                pHandle->snapshotVer + 1);
        break;
      }
177

L
Liu Jicong 已提交
178
      if (pRsp->blockNum > 0) {
179 180 181
        tqDebug("tmqsnap task exec exited, get data");
        break;
      }
L
Liu Jicong 已提交
182

183
      SMqMetaRsp* tmp = qStreamExtractMetaMsg(task);
L
Liu Jicong 已提交
184
      if (tmp->rspOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
185 186 187 188 189 190 191 192 193
        tqOffsetResetToData(pOffset, tmp->rspOffset.uid, tmp->rspOffset.ts);
        qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
        tmp->rspOffset.type = TMQ_OFFSET__SNAPSHOT_META;
        tqDebug("tmqsnap task exec change to get data");
        continue;
      }

      *pMetaRsp = *tmp;
      tqDebug("tmqsnap task exec exited, get meta");
L
Liu Jicong 已提交
194
    }
195

196
    tqDebug("task exec exited");
L
Liu Jicong 已提交
197 198 199
    break;
  }

200 201 202 203 204
  if (qStreamExtractOffset(task, &pRsp->rspOffset) < 0) {
    ASSERT(0);
  }

  ASSERT(pRsp->rspOffset.type != 0);
L
Liu Jicong 已提交
205 206 207
  return 0;
}

L
Liu Jicong 已提交
208 209
int32_t tqLogScanExec(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, SMqDataRsp* pRsp) {
  STqExecHandle* pExec = &pHandle->execHandle;
wmmhello's avatar
wmmhello 已提交
210 211 212 213 214 215
  ASSERT(pExec->subType != TOPIC_SUB_TYPE__COLUMN);

  if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
    pRsp->withSchema = 1;
    STqReader* pReader = pExec->pExecReader;
    tqReaderSetDataMsg(pReader, pReq, 0);
L
Liu Jicong 已提交
216
    while (tqNextDataBlock(pReader)) {
wmmhello's avatar
wmmhello 已提交
217 218
      SSDataBlock block = {0};
      if (tqRetrieveDataBlock(&block, pReader) < 0) {
L
Liu Jicong 已提交
219 220
        if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
      }
wmmhello's avatar
wmmhello 已提交
221 222 223 224 225 226 227 228 229 230 231
      if (pRsp->withTbName) {
        int64_t uid = pExec->pExecReader->msgIter.uid;
        if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) {
          blockDataFreeRes(&block);
          continue;
        }
      }
      tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock));
      blockDataFreeRes(&block);
      tqAddBlockSchemaToRsp(pExec, pRsp);
      pRsp->blockNum++;
L
Liu Jicong 已提交
232
    }
wmmhello's avatar
wmmhello 已提交
233 234 235 236 237 238 239
  } else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
    pRsp->withSchema = 1;
    STqReader* pReader = pExec->pExecReader;
    tqReaderSetDataMsg(pReader, pReq, 0);
    while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) {
      SSDataBlock block = {0};
      if (tqRetrieveDataBlock(&block, pReader) < 0) {
L
Liu Jicong 已提交
240 241
        if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
      }
wmmhello's avatar
wmmhello 已提交
242 243 244 245 246 247 248 249 250 251 252
      if (pRsp->withTbName) {
        int64_t uid = pExec->pExecReader->msgIter.uid;
        if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) {
          blockDataFreeRes(&block);
          continue;
        }
      }
      tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock));
      blockDataFreeRes(&block);
      tqAddBlockSchemaToRsp(pExec, pRsp);
      pRsp->blockNum++;
L
Liu Jicong 已提交
253
    }
L
Liu Jicong 已提交
254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275
#if 0
    if (pHandle->fetchMeta && pRsp->blockNum) {
      SSubmitMsgIter iter = {0};
      tInitSubmitMsgIter(pReq, &iter);
      STaosxRsp* pXrsp = (STaosxRsp*)pRsp;
      while (1) {
        SSubmitBlk* pBlk = NULL;
        if (tGetSubmitMsgNext(&iter, &pBlk) < 0) return -1;
        if (pBlk->schemaLen > 0) {
          if (pXrsp->createTableNum == 0) {
            pXrsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
            pXrsp->createTableReq = taosArrayInit(0, sizeof(void*));
          }
          void* createReq = taosMemoryCalloc(1, pBlk->schemaLen);
          memcpy(createReq, pBlk->data, pBlk->schemaLen);
          taosArrayPush(pXrsp->createTableLen, &pBlk->schemaLen);
          taosArrayPush(pXrsp->createTableReq, &createReq);
          pXrsp->createTableNum++;
        }
      }
    }
#endif
L
Liu Jicong 已提交
276
  }
L
Liu Jicong 已提交
277

wmmhello's avatar
wmmhello 已提交
278 279 280 281 282
  if (pRsp->blockNum == 0) {
    return -1;
  }

  return 0;
L
Liu Jicong 已提交
283
}