tqExec.c 9.3 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tq.h"

18
static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols) {
L
Liu Jicong 已提交
19 20 21 22 23 24 25 26 27 28 29 30 31
  int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
  void*   buf = taosMemoryCalloc(1, dataStrLen);
  if (buf == NULL) return -1;

  SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
  pRetrieve->useconds = 0;
  pRetrieve->precision = TSDB_DEFAULT_PRECISION;
  pRetrieve->compressed = 0;
  pRetrieve->completed = 1;
  pRetrieve->numOfRows = htonl(pBlock->info.rows);

  // TODO enable compress
  int32_t actualLen = 0;
32
  blockEncode(pBlock, pRetrieve->data, &actualLen, numOfCols, false);
L
Liu Jicong 已提交
33 34 35 36 37 38 39
  actualLen += sizeof(SRetrieveTableRsp);
  ASSERT(actualLen <= dataStrLen);
  taosArrayPush(pRsp->blockDataLen, &actualLen);
  taosArrayPush(pRsp->blockData, &buf);
  return 0;
}

L
Liu Jicong 已提交
40 41
static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, SMqDataRsp* pRsp) {
  SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader->pSchemaWrapper);
L
Liu Jicong 已提交
42 43 44
  if (pSW == NULL) {
    return -1;
  }
L
Liu Jicong 已提交
45 46 47 48
  taosArrayPush(pRsp->blockSchema, &pSW);
  return 0;
}

L
Liu Jicong 已提交
49
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) {
L
Liu Jicong 已提交
50 51
  SMetaReader mr = {0};
  metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
52
  // TODO add reference to gurantee success
L
Liu Jicong 已提交
53
  if (metaGetTableEntryByUid(&mr, uid) < 0) {
54
    metaReaderClear(&mr);
L
Liu Jicong 已提交
55 56 57 58 59 60 61 62
    return -1;
  }
  char* tbName = strdup(mr.me.name);
  taosArrayPush(pRsp->blockTbName, &tbName);
  metaReaderClear(&mr);
  return 0;
}

63
int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) {
L
Liu Jicong 已提交
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
  const STqExecHandle* pExec = &pHandle->execHandle;
  ASSERT(pExec->subType == TOPIC_SUB_TYPE__COLUMN);

  qTaskInfo_t task = pExec->task;

  if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) {
    tqDebug("prepare scan failed, return");
    if (pOffset->type == TMQ_OFFSET__LOG) {
      pRsp->rspOffset = *pOffset;
      return 0;
    } else {
      tqOffsetResetToLog(pOffset, pHandle->snapshotVer);
      if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) {
        tqDebug("prepare scan failed, return");
        pRsp->rspOffset = *pOffset;
        return 0;
      }
    }
  }

  int32_t rowCnt = 0;
  while (1) {
    SSDataBlock* pDataBlock = NULL;
    uint64_t     ts = 0;
L
Liu Jicong 已提交
88
    tqDebug("tmq task start to execute");
L
Liu Jicong 已提交
89 90 91
    if (qExecTask(task, &pDataBlock, &ts) < 0) {
      ASSERT(0);
    }
92
    tqDebug("tmq task executed, get %p", pDataBlock);
L
Liu Jicong 已提交
93

94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
    if (pDataBlock == NULL) {
      break;
    }

    tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols);
    pRsp->blockNum++;

    if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
      rowCnt += pDataBlock->info.rows;
      if (rowCnt >= 4096) break;
    }
  }

  if (qStreamExtractOffset(task, &pRsp->rspOffset) < 0) {
    ASSERT(0);
    return -1;
  }
  ASSERT(pRsp->rspOffset.type != 0);

  if (pRsp->withTbName) {
    if (pRsp->rspOffset.type == TMQ_OFFSET__LOG) {
      int64_t uid = pExec->pExecReader->msgIter.uid;
      tqAddTbNameToRsp(pTq, uid, pRsp);
    } else {
      pRsp->withTbName = false;
L
Liu Jicong 已提交
119 120
    }
  }
121
  ASSERT(pRsp->withSchema == false);
L
Liu Jicong 已提交
122 123 124 125

  return 0;
}

126
int32_t tqScan(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* pOffset) {
L
Liu Jicong 已提交
127
  const STqExecHandle* pExec = &pHandle->execHandle;
128
  qTaskInfo_t          task = pExec->task;
L
Liu Jicong 已提交
129

130
  if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) {
131
    tqDebug("prepare scan failed, return");
L
Liu Jicong 已提交
132 133 134 135
    if (pOffset->type == TMQ_OFFSET__LOG) {
      pRsp->rspOffset = *pOffset;
      return 0;
    } else {
136
      tqOffsetResetToLog(pOffset, pHandle->snapshotVer);
137
      if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) {
138
        tqDebug("prepare scan failed, return");
L
Liu Jicong 已提交
139 140 141 142
        pRsp->rspOffset = *pOffset;
        return 0;
      }
    }
L
Liu Jicong 已提交
143 144
  }

L
Liu Jicong 已提交
145
  int32_t rowCnt = 0;
L
Liu Jicong 已提交
146 147 148
  while (1) {
    SSDataBlock* pDataBlock = NULL;
    uint64_t     ts = 0;
wmmhello's avatar
wmmhello 已提交
149
    tqDebug("tmqsnap task start to execute");
L
Liu Jicong 已提交
150 151 152
    if (qExecTask(task, &pDataBlock, &ts) < 0) {
      ASSERT(0);
    }
wmmhello's avatar
wmmhello 已提交
153
    tqDebug("tmqsnap task execute end, get %p", pDataBlock);
L
Liu Jicong 已提交
154 155 156

    if (pDataBlock != NULL) {
      if (pRsp->withTbName) {
wmmhello's avatar
wmmhello 已提交
157
        int64_t uid = 0;
L
Liu Jicong 已提交
158
        if (pOffset->type == TMQ_OFFSET__LOG) {
wmmhello's avatar
wmmhello 已提交
159
          uid = pExec->pExecReader->msgIter.uid;
160
          if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp) < 0) {
wmmhello's avatar
wmmhello 已提交
161 162
            continue;
          }
L
Liu Jicong 已提交
163 164 165
        } else {
          char* tbName = strdup(qExtractTbnameFromTask(task));
          taosArrayPush(pRsp->blockTbName, &tbName);
L
Liu Jicong 已提交
166 167
        }
      }
L
Liu Jicong 已提交
168
      if (pRsp->withSchema) {
wmmhello's avatar
wmmhello 已提交
169
        if (pOffset->type == TMQ_OFFSET__LOG) {
170
          tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp);
L
Liu Jicong 已提交
171
        } else {
wmmhello's avatar
wmmhello 已提交
172 173 174
          SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task));
          taosArrayPush(pRsp->blockSchema, &pSW);
        }
wmmhello's avatar
wmmhello 已提交
175
      }
wmmhello's avatar
wmmhello 已提交
176

177
      tqAddBlockDataToRsp(pDataBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pDataBlock->pDataBlock));
178
      pRsp->blockNum++;
L
Liu Jicong 已提交
179
      if (pOffset->type == TMQ_OFFSET__LOG) {
L
Liu Jicong 已提交
180 181
        continue;
      } else {
L
Liu Jicong 已提交
182
        rowCnt += pDataBlock->info.rows;
L
Liu Jicong 已提交
183
        if (rowCnt <= 4096) continue;
L
Liu Jicong 已提交
184 185 186
      }
    }

187 188
    if (pDataBlock == NULL && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
      if (qStreamExtractPrepareUid(task) != 0) {
189 190
        continue;
      }
191 192 193 194
      tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode),
              pHandle->snapshotVer + 1);
      break;
    }
195

196 197 198
    if (pRsp->blockNum > 0) {
      tqDebug("tmqsnap task exec exited, get data");
      break;
L
Liu Jicong 已提交
199
    }
200

201 202 203 204 205 206 207 208 209 210 211 212
    SMqMetaRsp* tmp = qStreamExtractMetaMsg(task);
    if (tmp->rspOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
      tqOffsetResetToData(pOffset, tmp->rspOffset.uid, tmp->rspOffset.ts);
      qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
      tmp->rspOffset.type = TMQ_OFFSET__SNAPSHOT_META;
      tqDebug("tmqsnap task exec change to get data");
      continue;
    }

    *pMetaRsp = *tmp;
    tqDebug("tmqsnap task exec exited, get meta");

213
    tqDebug("task exec exited");
L
Liu Jicong 已提交
214 215 216
    break;
  }

217 218 219 220 221
  if (qStreamExtractOffset(task, &pRsp->rspOffset) < 0) {
    ASSERT(0);
  }

  ASSERT(pRsp->rspOffset.type != 0);
L
Liu Jicong 已提交
222 223 224
  return 0;
}

225
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp* pRsp) {
L
Liu Jicong 已提交
226
  STqExecHandle* pExec = &pHandle->execHandle;
wmmhello's avatar
wmmhello 已提交
227 228 229 230 231
  ASSERT(pExec->subType != TOPIC_SUB_TYPE__COLUMN);

  if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
    STqReader* pReader = pExec->pExecReader;
    tqReaderSetDataMsg(pReader, pReq, 0);
L
Liu Jicong 已提交
232
    while (tqNextDataBlock(pReader)) {
wmmhello's avatar
wmmhello 已提交
233 234
      SSDataBlock block = {0};
      if (tqRetrieveDataBlock(&block, pReader) < 0) {
L
Liu Jicong 已提交
235 236
        if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
      }
wmmhello's avatar
wmmhello 已提交
237 238
      if (pRsp->withTbName) {
        int64_t uid = pExec->pExecReader->msgIter.uid;
239
        if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp) < 0) {
wmmhello's avatar
wmmhello 已提交
240 241 242 243
          blockDataFreeRes(&block);
          continue;
        }
      }
244
      tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock));
wmmhello's avatar
wmmhello 已提交
245
      blockDataFreeRes(&block);
246
      tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp);
wmmhello's avatar
wmmhello 已提交
247
      pRsp->blockNum++;
L
Liu Jicong 已提交
248
    }
wmmhello's avatar
wmmhello 已提交
249 250 251 252 253 254
  } else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
    STqReader* pReader = pExec->pExecReader;
    tqReaderSetDataMsg(pReader, pReq, 0);
    while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) {
      SSDataBlock block = {0};
      if (tqRetrieveDataBlock(&block, pReader) < 0) {
L
Liu Jicong 已提交
255 256
        if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
      }
wmmhello's avatar
wmmhello 已提交
257 258
      if (pRsp->withTbName) {
        int64_t uid = pExec->pExecReader->msgIter.uid;
259
        if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp) < 0) {
wmmhello's avatar
wmmhello 已提交
260 261 262 263
          blockDataFreeRes(&block);
          continue;
        }
      }
264
      tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock));
wmmhello's avatar
wmmhello 已提交
265
      blockDataFreeRes(&block);
266
      tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp);
wmmhello's avatar
wmmhello 已提交
267
      pRsp->blockNum++;
L
Liu Jicong 已提交
268
    }
269
#if 1
L
Liu Jicong 已提交
270 271 272 273 274 275
    if (pHandle->fetchMeta && pRsp->blockNum) {
      SSubmitMsgIter iter = {0};
      tInitSubmitMsgIter(pReq, &iter);
      STaosxRsp* pXrsp = (STaosxRsp*)pRsp;
      while (1) {
        SSubmitBlk* pBlk = NULL;
L
Liu Jicong 已提交
276 277
        if (tGetSubmitMsgNext(&iter, &pBlk) < 0) break;
        if (pBlk == NULL) break;
L
Liu Jicong 已提交
278 279 280 281 282 283 284 285 286 287 288 289 290 291
        if (pBlk->schemaLen > 0) {
          if (pXrsp->createTableNum == 0) {
            pXrsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
            pXrsp->createTableReq = taosArrayInit(0, sizeof(void*));
          }
          void* createReq = taosMemoryCalloc(1, pBlk->schemaLen);
          memcpy(createReq, pBlk->data, pBlk->schemaLen);
          taosArrayPush(pXrsp->createTableLen, &pBlk->schemaLen);
          taosArrayPush(pXrsp->createTableReq, &createReq);
          pXrsp->createTableNum++;
        }
      }
    }
#endif
L
Liu Jicong 已提交
292
  }
L
Liu Jicong 已提交
293

wmmhello's avatar
wmmhello 已提交
294 295 296 297 298
  if (pRsp->blockNum == 0) {
    return -1;
  }

  return 0;
L
Liu Jicong 已提交
299
}