/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tq.h"

/*
 * Encode one data block into a newly allocated buffer and append it to the response.
 *
 * The buffer starts with an SRetrieveTableRsp header and is followed by the output of
 * blockEncode(). The buffer pointer is pushed onto pRsp->blockData and its length
 * (header included) onto pRsp->blockDataLen.
 *
 * pBlock     block to encode
 * pRsp       response that receives the encoded buffer and its length
 * numOfCols  column count forwarded to blockEncode()
 *
 * Returns 0 on success, -1 if the buffer cannot be allocated or an array push fails.
 * On failure the buffer is released and both arrays keep their previous length.
 */
static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols) {
  int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
  void*   buf = taosMemoryCalloc(1, dataStrLen);
  if (buf == NULL) return -1;

  SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
  pRetrieve->useconds = 0;
  pRetrieve->precision = TSDB_DEFAULT_PRECISION;
  pRetrieve->compressed = 0;
  pRetrieve->completed = 1;
  // the row count is stored in network byte order
  pRetrieve->numOfRows = htonl(pBlock->info.rows);

  // TODO enable compress
  int32_t actualLen = 0;
  blockEncode(pBlock, pRetrieve->data, &actualLen, numOfCols, false);
  actualLen += sizeof(SRetrieveTableRsp);
  ASSERT(actualLen <= dataStrLen);

  if (taosArrayPush(pRsp->blockDataLen, &actualLen) == NULL) {
    taosMemoryFree(buf);
    return -1;
  }
  if (taosArrayPush(pRsp->blockData, &buf) == NULL) {
    // drop the length pushed above so the two arrays stay the same size
    taosArrayPop(pRsp->blockDataLen);
    taosMemoryFree(buf);
    return -1;
  }
  return 0;
}

/*
 * Append a private copy of the reader's schema wrapper to pRsp->blockSchema.
 *
 * pExec  execution handle whose pExecReader->pSchemaWrapper is cloned
 * pRsp   response that receives the cloned wrapper
 *
 * Returns 0 on success, -1 if the clone or the array push fails. On a failed push the
 * clone is released instead of being leaked.
 */
static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, SMqDataRsp* pRsp) {
  SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader->pSchemaWrapper);
  if (pSW == NULL) {
    return -1;
  }

  if (taosArrayPush(pRsp->blockSchema, &pSW) == NULL) {
    // NOTE(review): assumes tDeleteSSchemaWrapper() is the release routine that pairs
    // with tCloneSSchemaWrapper() - confirm against the message header.
    tDeleteSSchemaWrapper(pSW);
    return -1;
  }
  return 0;
}

/*
 * Look up the table entry for uid and append a heap copy of its name to
 * pRsp->blockTbName.
 *
 * pTq   queue object; its vnode meta store is used for the lookup
 * uid   uid of the table to look up
 * pRsp  response that receives the copied name
 *
 * Returns 0 on success, -1 if the entry is not found, the name cannot be copied, or
 * the array push fails. Nothing is added to the response on failure.
 */
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) {
  SMetaReader mr = {0};
  metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
  // TODO add reference to guarantee success
  if (metaGetTableEntryByUid(&mr, uid) < 0) {
    metaReaderClear(&mr);
    return -1;
  }

  // copy the name first; the reader can then be released on every path below
  char* tbName = strdup(mr.me.name);
  metaReaderClear(&mr);
  if (tbName == NULL) {
    return -1;
  }

  if (taosArrayPush(pRsp->blockTbName, &tbName) == NULL) {
    // NOTE(review): assumes taosMemoryFree() may release strdup() memory - confirm
    taosMemoryFree(tbName);
    return -1;
  }
  return 0;
}

/*
 * Run the handle's stream task from pOffset and collect the result into a response.
 *
 * Data blocks produced by the task are appended to pRsp (optionally with table name and
 * schema, depending on pRsp->withTbName / pRsp->withSchema). When the offset type is
 * TMQ_OFFSET__LOG the loop keeps pulling blocks until the task returns none; for other
 * offset types it stops once more than 4096 rows have been collected. If no block was
 * collected, the task's meta message is copied into *pMetaRsp instead.
 *
 * pTq       queue object, used for table-name lookup and logging
 * pHandle   subscription handle that owns the task
 * pRsp      data response to fill; rspOffset is set when data is returned or when the
 *           scan cannot be prepared
 * pMetaRsp  receives the meta message when no data block was collected
 * pOffset   start offset; may be reset to the log offset by this function
 *
 * Returns 0 on success (including "scan could not be prepared", in which case only
 * pRsp->rspOffset is set) and -1 if the task fails or a block cannot be appended.
 * On -1, pRsp->blockNum counts only the blocks that were appended completely.
 */
int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* pOffset) {
  const STqExecHandle* pExec = &pHandle->execHandle;
  qTaskInfo_t          task = pExec->task;
  // for non-log offsets, stop batching once more rows than this have been collected
  const int32_t        maxRowsPerRsp = 4096;

  if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) {
    tqDebug("prepare scan failed, return");
    if (pOffset->type == TMQ_OFFSET__LOG) {
      pRsp->rspOffset = *pOffset;
      return 0;
    }

    // not a log offset: fall back to the log at the snapshot version and retry once
    tqOffsetResetToLog(pOffset, pHandle->snapshotVer);
    if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) {
      tqDebug("prepare scan failed, return");
      pRsp->rspOffset = *pOffset;
      return 0;
    }
  }

  int32_t rowCnt = 0;
  while (1) {
    SSDataBlock* pDataBlock = NULL;
    uint64_t     ts = 0;
    tqDebug("task start to execute");
    if (qExecTask(task, &pDataBlock, &ts) < 0) {
      ASSERT(0);
      // when assertions are compiled out, report the failure instead of carrying on
      return -1;
    }
    tqDebug("task execute end, get %p", pDataBlock);

    if (pDataBlock != NULL) {
      if (pRsp->withTbName) {
        int64_t uid = 0;
        if (pOffset->type == TMQ_OFFSET__LOG) {
          uid = pExec->pExecReader->msgIter.uid;
        } else {
          uid = pDataBlock->info.uid;
        }
        // nothing has been appended for this block yet, so it can simply be skipped
        if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) {
          continue;
        }
      }

      // From here on a failure would leave the per-block arrays out of step, so stop
      // scanning without counting the block.
      if (pRsp->withSchema) {
        if (tqAddBlockSchemaToRsp(pExec, pRsp) < 0) {
          return -1;
        }
      }
      if (tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock)) < 0) {
        return -1;
      }
      pRsp->blockNum++;

      if (pOffset->type == TMQ_OFFSET__LOG) {
        continue;
      } else {
        rowCnt += pDataBlock->info.rows;
        if (rowCnt <= maxRowsPerRsp) continue;
      }
    }

    // snapshot data produced nothing: switch over to the log and keep scanning
    if (pRsp->blockNum == 0 && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
      tqDebug("vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode),
              pHandle->snapshotVer + 1);
      tqOffsetResetToLog(pOffset, pHandle->snapshotVer);
      qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
      continue;
    }

    if (pRsp->blockNum > 0) {
      qStreamExtractOffset(task, &pRsp->rspOffset);
      tqDebug("task exec exited, get data");
      break;
    }

    // no data collected: look at the meta message kept by the task
    SMqMetaRsp* tmp = qStreamExtractMetaMsg(task);
    if (tmp->rspOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
      qStreamPrepareScan(task, &tmp->rspOffset, pHandle->execHandle.subType);
      tmp->rspOffset.type = TMQ_OFFSET__SNAPSHOT_META;
      tqDebug("task exec change to get meta");
      continue;
    }

    *pMetaRsp = *tmp;
    tqDebug("task exec exited, get meta");
    break;
  }

  return 0;
}

#if 0
/*
 * Disabled snapshot scan: everything between #if 0 and #endif is excluded from the
 * build and kept for reference only.
 *
 * Prepares the column task selected by workerId at offset (uid, ts), appends the data
 * blocks it produces to pRsp until the task returns none or at least 4096 rows were
 * collected, then stores the task's scan position in pRsp->rspOffset.
 *
 * NOTE(review): the tqAddBlockDataToRsp() call was updated to the three-argument form
 * used elsewhere in this file. The other references (execCol.task,
 * qStreamPrepareTsdbScan, qGetStreamScanStatus, tqOffsetResetToData) could not be
 * checked from here - confirm them before re-enabling.
 */
int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal offset, int32_t workerId) {
  ASSERT(pExec->subType == TOPIC_SUB_TYPE__COLUMN);
  qTaskInfo_t task = pExec->execCol.task[workerId];

  if (qStreamPrepareTsdbScan(task, offset.uid, offset.ts) < 0) {
    ASSERT(0);
  }

  int32_t rowCnt = 0;
  while (1) {
    SSDataBlock* pDataBlock = NULL;
    uint64_t     ts = 0;
    if (qExecTask(task, &pDataBlock, &ts) < 0) {
      ASSERT(0);
    }
    if (pDataBlock == NULL) break;

    ASSERT(pDataBlock->info.rows != 0);
    ASSERT(taosArrayGetSize(pDataBlock->pDataBlock) != 0);

    tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock));

    if (pRsp->withTbName) {
      // table names are switched off here; the lookup below is disabled as well
      pRsp->withTbName = 0;
#if 0
      int64_t uid;
      int64_t ts;
      if (qGetStreamScanStatus(task, &uid, &ts) < 0) {
        ASSERT(0);
      }
      tqAddTbNameToRsp(pTq, uid, pRsp);
#endif
    }
    pRsp->blockNum++;

    rowCnt += pDataBlock->info.rows;
    if (rowCnt >= 4096) break;
  }
  int64_t uid;
  int64_t ts;
  if (qGetStreamScanStatus(task, &uid, &ts) < 0) {
    ASSERT(0);
  }
  tqOffsetResetToData(&pRsp->rspOffset, uid, ts);

  return 0;
}
#endif
/*
 * Advance the reader to the next block that can be retrieved and load it into block.
 *
 * For TOPIC_SUB_TYPE__TABLE the reader is advanced with tqNextDataBlock(); for
 * TOPIC_SUB_TYPE__DB with tqNextDataBlockFilterOut() and pFilterOutTbUid. A block whose
 * retrieval fails with TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND is skipped; after any other
 * retrieval result the block is returned as is.
 *
 * Returns block, or NULL when the reader has no further block or subType is neither of
 * the two handled values.
 */
SSDataBlock* tqLogScanExec(int8_t subType, STqReader* pReader, SHashObj* pFilterOutTbUid, SSDataBlock* block) {
  const int scanTable = (subType == TOPIC_SUB_TYPE__TABLE);
  const int scanDb = (subType == TOPIC_SUB_TYPE__DB);

  if (!scanTable && !scanDb) {
    return NULL;
  }

  for (;;) {
    // both subscription kinds share the loop; only the "advance" call differs
    if (scanTable) {
      if (!tqNextDataBlock(pReader)) {
        break;
      }
    } else {
      if (!tqNextDataBlockFilterOut(pReader, pFilterOutTbUid)) {
        break;
      }
    }

    if (tqRetrieveDataBlock(block, pReader) < 0 && terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) {
      continue;
    }
    return block;
  }

  return NULL;
}