tqExec.c 6.3 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tq.h"

L
Liu Jicong 已提交
18
static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp) {
L
Liu Jicong 已提交
19 20 21 22 23 24 25 26 27 28 29 30 31
  int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
  void*   buf = taosMemoryCalloc(1, dataStrLen);
  if (buf == NULL) return -1;

  SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
  pRetrieve->useconds = 0;
  pRetrieve->precision = TSDB_DEFAULT_PRECISION;
  pRetrieve->compressed = 0;
  pRetrieve->completed = 1;
  pRetrieve->numOfRows = htonl(pBlock->info.rows);

  // TODO enable compress
  int32_t actualLen = 0;
32
  blockEncode(pBlock, pRetrieve->data, &actualLen, taosArrayGetSize(pBlock->pDataBlock), false);
L
Liu Jicong 已提交
33 34 35 36 37 38 39
  actualLen += sizeof(SRetrieveTableRsp);
  ASSERT(actualLen <= dataStrLen);
  taosArrayPush(pRsp->blockDataLen, &actualLen);
  taosArrayPush(pRsp->blockData, &buf);
  return 0;
}

L
Liu Jicong 已提交
40
static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, int32_t workerId, SMqDataRsp* pRsp) {
L
Liu Jicong 已提交
41
  SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader[workerId]->pSchemaWrapper);
L
Liu Jicong 已提交
42 43 44
  if (pSW == NULL) {
    return -1;
  }
L
Liu Jicong 已提交
45 46 47 48
  taosArrayPush(pRsp->blockSchema, &pSW);
  return 0;
}

L
Liu Jicong 已提交
49
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) {
L
Liu Jicong 已提交
50 51 52 53 54 55 56 57 58 59 60 61
  SMetaReader mr = {0};
  metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
  if (metaGetTableEntryByUid(&mr, uid) < 0) {
    ASSERT(0);
    return -1;
  }
  char* tbName = strdup(mr.me.name);
  taosArrayPush(pRsp->blockTbName, &tbName);
  metaReaderClear(&mr);
  return 0;
}

L
Liu Jicong 已提交
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
int64_t tqScanLog(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal* pOffset) {
  qTaskInfo_t task = pExec->execCol.task[0];

  if (qStreamPrepareScan1(task, pOffset) < 0) {
    ASSERT(0);
  }

  while (1) {
    SSDataBlock* pDataBlock = NULL;
    uint64_t     ts = 0;
    if (qExecTask(task, &pDataBlock, &ts) < 0) {
      ASSERT(0);
    }

    if (pDataBlock != NULL) {
      tqAddBlockDataToRsp(pDataBlock, pRsp);
      if (pRsp->withTbName) {
        int64_t uid = pExec->pExecReader[0]->msgIter.uid;
        tqAddTbNameToRsp(pTq, uid, pRsp);
      }
      pRsp->blockNum++;
      continue;
    }

    void* meta = qStreamExtractMetaMsg(task);
    if (meta != NULL) {
      // tq add meta to rsp
    }

    if (qStreamExtractOffset(task, &pRsp->rspOffset) < 0) {
      ASSERT(0);
    }
    ASSERT(pRsp->rspOffset.type != 0);

    break;
  }

  return 0;
}

L
Liu Jicong 已提交
102
int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal offset, int32_t workerId) {
L
Liu Jicong 已提交
103 104
  ASSERT(pExec->subType == TOPIC_SUB_TYPE__COLUMN);
  qTaskInfo_t task = pExec->execCol.task[workerId];
L
Liu Jicong 已提交
105

106 107 108 109
  /*if (qStreamScanSnapshot(task) < 0) {*/
  /*ASSERT(0);*/
  /*}*/

L
Liu Jicong 已提交
110
  if (qStreamPrepareTsdbScan(task, offset.uid, offset.ts) < 0) {
L
Liu Jicong 已提交
111 112 113 114
    ASSERT(0);
  }

  int32_t rowCnt = 0;
L
Liu Jicong 已提交
115 116 117 118 119 120 121 122 123
  while (1) {
    SSDataBlock* pDataBlock = NULL;
    uint64_t     ts = 0;
    if (qExecTask(task, &pDataBlock, &ts) < 0) {
      ASSERT(0);
    }
    if (pDataBlock == NULL) break;

    ASSERT(pDataBlock->info.rows != 0);
H
Haojun Liao 已提交
124
    ASSERT(taosArrayGetSize(pDataBlock->pDataBlock) != 0);
L
Liu Jicong 已提交
125 126

    tqAddBlockDataToRsp(pDataBlock, pRsp);
L
Liu Jicong 已提交
127 128

    if (pRsp->withTbName) {
L
Liu Jicong 已提交
129
      pRsp->withTbName = 0;
130
#if 0
131 132 133 134 135
      int64_t uid;
      int64_t ts;
      if (qGetStreamScanStatus(task, &uid, &ts) < 0) {
        ASSERT(0);
      }
L
Liu Jicong 已提交
136
      tqAddTbNameToRsp(pTq, uid, pRsp);
L
Liu Jicong 已提交
137
#endif
L
Liu Jicong 已提交
138
    }
L
Liu Jicong 已提交
139
    pRsp->blockNum++;
L
Liu Jicong 已提交
140 141 142 143 144 145 146 147

    rowCnt += pDataBlock->info.rows;
    if (rowCnt >= 4096) break;
  }
  int64_t uid;
  int64_t ts;
  if (qGetStreamScanStatus(task, &uid, &ts) < 0) {
    ASSERT(0);
L
Liu Jicong 已提交
148
  }
L
Liu Jicong 已提交
149
  tqOffsetResetToData(&pRsp->rspOffset, uid, ts);
L
Liu Jicong 已提交
150 151 152 153

  return 0;
}

L
Liu Jicong 已提交
154
int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp, int32_t workerId) {
L
Liu Jicong 已提交
155
  if (pExec->subType == TOPIC_SUB_TYPE__COLUMN) {
L
Liu Jicong 已提交
156
    qTaskInfo_t task = pExec->execCol.task[workerId];
L
Liu Jicong 已提交
157
    ASSERT(task);
L
Liu Jicong 已提交
158
    qSetStreamInput(task, pReq, STREAM_INPUT__DATA_SUBMIT, false);
L
Liu Jicong 已提交
159 160 161 162 163 164 165 166 167 168 169 170
    while (1) {
      SSDataBlock* pDataBlock = NULL;
      uint64_t     ts = 0;
      if (qExecTask(task, &pDataBlock, &ts) < 0) {
        ASSERT(0);
      }
      if (pDataBlock == NULL) break;

      ASSERT(pDataBlock->info.rows != 0);

      tqAddBlockDataToRsp(pDataBlock, pRsp);
      if (pRsp->withTbName) {
L
Liu Jicong 已提交
171
        int64_t uid = pExec->pExecReader[workerId]->msgIter.uid;
L
Liu Jicong 已提交
172
        tqAddTbNameToRsp(pTq, uid, pRsp);
L
Liu Jicong 已提交
173 174 175 176 177
      }
      pRsp->blockNum++;
    }
  } else if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
    pRsp->withSchema = 1;
L
Liu Jicong 已提交
178 179
    STqReader* pReader = pExec->pExecReader[workerId];
    tqReaderSetDataMsg(pReader, pReq, 0);
L
Liu Jicong 已提交
180 181
    while (tqNextDataBlock(pReader)) {
      SSDataBlock block = {0};
182
      if (tqRetrieveDataBlock(&block, pReader) < 0) {
L
Liu Jicong 已提交
183 184 185 186 187
        if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
        ASSERT(0);
      }
      tqAddBlockDataToRsp(&block, pRsp);
      if (pRsp->withTbName) {
L
Liu Jicong 已提交
188
        int64_t uid = pExec->pExecReader[workerId]->msgIter.uid;
L
Liu Jicong 已提交
189
        tqAddTbNameToRsp(pTq, uid, pRsp);
L
Liu Jicong 已提交
190 191 192 193 194 195
      }
      tqAddBlockSchemaToRsp(pExec, workerId, pRsp);
      pRsp->blockNum++;
    }
  } else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
    pRsp->withSchema = 1;
L
Liu Jicong 已提交
196 197
    STqReader* pReader = pExec->pExecReader[workerId];
    tqReaderSetDataMsg(pReader, pReq, 0);
L
Liu Jicong 已提交
198
    while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) {
L
Liu Jicong 已提交
199
      SSDataBlock block = {0};
200
      if (tqRetrieveDataBlock(&block, pReader) < 0) {
L
Liu Jicong 已提交
201 202 203 204 205
        if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
        ASSERT(0);
      }
      tqAddBlockDataToRsp(&block, pRsp);
      if (pRsp->withTbName) {
L
Liu Jicong 已提交
206
        int64_t uid = pExec->pExecReader[workerId]->msgIter.uid;
L
Liu Jicong 已提交
207
        tqAddTbNameToRsp(pTq, uid, pRsp);
L
Liu Jicong 已提交
208 209 210 211 212 213 214 215 216 217 218
      }
      tqAddBlockSchemaToRsp(pExec, workerId, pRsp);
      pRsp->blockNum++;
    }
  }
  if (pRsp->blockNum == 0) {
    pRsp->skipLogNum++;
    return -1;
  }
  return 0;
}