tqExec.c 6.7 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tq.h"

L
Liu Jicong 已提交
18
static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp) {
L
Liu Jicong 已提交
19 20 21 22 23 24 25 26 27 28 29 30 31
  int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
  void*   buf = taosMemoryCalloc(1, dataStrLen);
  if (buf == NULL) return -1;

  SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
  pRetrieve->useconds = 0;
  pRetrieve->precision = TSDB_DEFAULT_PRECISION;
  pRetrieve->compressed = 0;
  pRetrieve->completed = 1;
  pRetrieve->numOfRows = htonl(pBlock->info.rows);

  // TODO enable compress
  int32_t actualLen = 0;
32
  blockEncode(pBlock, pRetrieve->data, &actualLen, taosArrayGetSize(pBlock->pDataBlock), false);
L
Liu Jicong 已提交
33 34 35 36 37 38 39
  actualLen += sizeof(SRetrieveTableRsp);
  ASSERT(actualLen <= dataStrLen);
  taosArrayPush(pRsp->blockDataLen, &actualLen);
  taosArrayPush(pRsp->blockData, &buf);
  return 0;
}

L
Liu Jicong 已提交
40
static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, int32_t workerId, SMqDataRsp* pRsp) {
L
Liu Jicong 已提交
41
  SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader[workerId]->pSchemaWrapper);
L
Liu Jicong 已提交
42 43 44
  if (pSW == NULL) {
    return -1;
  }
L
Liu Jicong 已提交
45 46 47 48
  taosArrayPush(pRsp->blockSchema, &pSW);
  return 0;
}

L
Liu Jicong 已提交
49
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) {
L
Liu Jicong 已提交
50 51 52 53 54 55 56 57 58 59 60 61
  SMetaReader mr = {0};
  metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
  if (metaGetTableEntryByUid(&mr, uid) < 0) {
    ASSERT(0);
    return -1;
  }
  char* tbName = strdup(mr.me.name);
  taosArrayPush(pRsp->blockTbName, &tbName);
  metaReaderClear(&mr);
  return 0;
}

L
Liu Jicong 已提交
62
int64_t tqScan(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal* pOffset) {
L
Liu Jicong 已提交
63 64
  qTaskInfo_t task = pExec->execCol.task[0];

L
Liu Jicong 已提交
65
  if (qStreamPrepareScan(task, pOffset) < 0) {
L
Liu Jicong 已提交
66
    ASSERT(pOffset->type == TMQ_OFFSET__LOG);
L
Liu Jicong 已提交
67 68 69
    pRsp->rspOffset = *pOffset;
    pRsp->rspOffset.version--;
    return 0;
L
Liu Jicong 已提交
70 71
  }

L
Liu Jicong 已提交
72
  int32_t rowCnt = 0;
L
Liu Jicong 已提交
73 74 75 76 77 78 79 80 81
  while (1) {
    SSDataBlock* pDataBlock = NULL;
    uint64_t     ts = 0;
    if (qExecTask(task, &pDataBlock, &ts) < 0) {
      ASSERT(0);
    }

    if (pDataBlock != NULL) {
      tqAddBlockDataToRsp(pDataBlock, pRsp);
L
Liu Jicong 已提交
82
      pRsp->blockNum++;
L
Liu Jicong 已提交
83
      if (pRsp->withTbName) {
L
Liu Jicong 已提交
84 85 86 87 88 89 90 91
        if (pOffset->type == TMQ_OFFSET__LOG) {
          int64_t uid = pExec->pExecReader[0]->msgIter.uid;
          tqAddTbNameToRsp(pTq, uid, pRsp);
        } else {
          pRsp->withTbName = 0;
        }
      }
      if (pOffset->type == TMQ_OFFSET__LOG) {
L
Liu Jicong 已提交
92 93
        continue;
      } else {
L
Liu Jicong 已提交
94
        rowCnt += pDataBlock->info.rows;
L
Liu Jicong 已提交
95
        if (rowCnt <= 4096) continue;
L
Liu Jicong 已提交
96 97 98 99 100 101 102 103 104 105 106
      }
    }

    void* meta = qStreamExtractMetaMsg(task);
    if (meta != NULL) {
      // tq add meta to rsp
    }

    if (qStreamExtractOffset(task, &pRsp->rspOffset) < 0) {
      ASSERT(0);
    }
L
Liu Jicong 已提交
107

L
Liu Jicong 已提交
108 109
    ASSERT(pRsp->rspOffset.type != 0);

L
Liu Jicong 已提交
110 111 112 113
    if (pRsp->rspOffset.type == TMQ_OFFSET__LOG) {
      ASSERT(pRsp->rspOffset.version + 1 >= pRsp->reqOffset.version);
    }

L
Liu Jicong 已提交
114 115 116 117 118 119
    break;
  }

  return 0;
}

L
Liu Jicong 已提交
120
#if 0
L
Liu Jicong 已提交
121
int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal offset, int32_t workerId) {
L
Liu Jicong 已提交
122 123
  ASSERT(pExec->subType == TOPIC_SUB_TYPE__COLUMN);
  qTaskInfo_t task = pExec->execCol.task[workerId];
L
Liu Jicong 已提交
124

L
Liu Jicong 已提交
125
  if (qStreamPrepareTsdbScan(task, offset.uid, offset.ts) < 0) {
L
Liu Jicong 已提交
126 127 128 129
    ASSERT(0);
  }

  int32_t rowCnt = 0;
L
Liu Jicong 已提交
130 131 132 133 134 135 136 137 138
  while (1) {
    SSDataBlock* pDataBlock = NULL;
    uint64_t     ts = 0;
    if (qExecTask(task, &pDataBlock, &ts) < 0) {
      ASSERT(0);
    }
    if (pDataBlock == NULL) break;

    ASSERT(pDataBlock->info.rows != 0);
H
Haojun Liao 已提交
139
    ASSERT(taosArrayGetSize(pDataBlock->pDataBlock) != 0);
L
Liu Jicong 已提交
140 141

    tqAddBlockDataToRsp(pDataBlock, pRsp);
L
Liu Jicong 已提交
142 143

    if (pRsp->withTbName) {
L
Liu Jicong 已提交
144
      pRsp->withTbName = 0;
145
#if 0
146 147 148 149 150
      int64_t uid;
      int64_t ts;
      if (qGetStreamScanStatus(task, &uid, &ts) < 0) {
        ASSERT(0);
      }
L
Liu Jicong 已提交
151
      tqAddTbNameToRsp(pTq, uid, pRsp);
L
Liu Jicong 已提交
152
#endif
L
Liu Jicong 已提交
153
    }
L
Liu Jicong 已提交
154
    pRsp->blockNum++;
L
Liu Jicong 已提交
155 156 157 158 159 160 161 162

    rowCnt += pDataBlock->info.rows;
    if (rowCnt >= 4096) break;
  }
  int64_t uid;
  int64_t ts;
  if (qGetStreamScanStatus(task, &uid, &ts) < 0) {
    ASSERT(0);
L
Liu Jicong 已提交
163
  }
L
Liu Jicong 已提交
164
  tqOffsetResetToData(&pRsp->rspOffset, uid, ts);
L
Liu Jicong 已提交
165 166 167

  return 0;
}
L
Liu Jicong 已提交
168
#endif
L
Liu Jicong 已提交
169

L
Liu Jicong 已提交
170
int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp, int32_t workerId) {
L
Liu Jicong 已提交
171
  if (pExec->subType == TOPIC_SUB_TYPE__COLUMN) {
L
Liu Jicong 已提交
172
    qTaskInfo_t task = pExec->execCol.task[workerId];
L
Liu Jicong 已提交
173
    ASSERT(task);
L
Liu Jicong 已提交
174
    qSetStreamInput(task, pReq, STREAM_INPUT__DATA_SUBMIT, false);
L
Liu Jicong 已提交
175 176 177 178 179 180 181 182 183 184 185 186
    while (1) {
      SSDataBlock* pDataBlock = NULL;
      uint64_t     ts = 0;
      if (qExecTask(task, &pDataBlock, &ts) < 0) {
        ASSERT(0);
      }
      if (pDataBlock == NULL) break;

      ASSERT(pDataBlock->info.rows != 0);

      tqAddBlockDataToRsp(pDataBlock, pRsp);
      if (pRsp->withTbName) {
L
Liu Jicong 已提交
187
        int64_t uid = pExec->pExecReader[workerId]->msgIter.uid;
L
Liu Jicong 已提交
188
        tqAddTbNameToRsp(pTq, uid, pRsp);
L
Liu Jicong 已提交
189 190 191 192 193
      }
      pRsp->blockNum++;
    }
  } else if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
    pRsp->withSchema = 1;
L
Liu Jicong 已提交
194 195
    STqReader* pReader = pExec->pExecReader[workerId];
    tqReaderSetDataMsg(pReader, pReq, 0);
L
Liu Jicong 已提交
196 197
    while (tqNextDataBlock(pReader)) {
      SSDataBlock block = {0};
198
      if (tqRetrieveDataBlock(&block, pReader) < 0) {
L
Liu Jicong 已提交
199 200 201 202 203
        if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
        ASSERT(0);
      }
      tqAddBlockDataToRsp(&block, pRsp);
      if (pRsp->withTbName) {
L
Liu Jicong 已提交
204
        int64_t uid = pExec->pExecReader[workerId]->msgIter.uid;
L
Liu Jicong 已提交
205
        tqAddTbNameToRsp(pTq, uid, pRsp);
L
Liu Jicong 已提交
206 207 208 209 210 211
      }
      tqAddBlockSchemaToRsp(pExec, workerId, pRsp);
      pRsp->blockNum++;
    }
  } else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
    pRsp->withSchema = 1;
L
Liu Jicong 已提交
212 213
    STqReader* pReader = pExec->pExecReader[workerId];
    tqReaderSetDataMsg(pReader, pReq, 0);
L
Liu Jicong 已提交
214
    while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) {
L
Liu Jicong 已提交
215
      SSDataBlock block = {0};
216
      if (tqRetrieveDataBlock(&block, pReader) < 0) {
L
Liu Jicong 已提交
217 218 219 220 221
        if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
        ASSERT(0);
      }
      tqAddBlockDataToRsp(&block, pRsp);
      if (pRsp->withTbName) {
L
Liu Jicong 已提交
222
        int64_t uid = pExec->pExecReader[workerId]->msgIter.uid;
L
Liu Jicong 已提交
223
        tqAddTbNameToRsp(pTq, uid, pRsp);
L
Liu Jicong 已提交
224 225 226 227 228 229 230 231 232 233 234
      }
      tqAddBlockSchemaToRsp(pExec, workerId, pRsp);
      pRsp->blockNum++;
    }
  }
  if (pRsp->blockNum == 0) {
    pRsp->skipLogNum++;
    return -1;
  }
  return 0;
}