tqRead.c 6.3 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

X
Xiaoyu Wang 已提交
16
#include "tdatablock.h"
L
Liu Jicong 已提交
17
#include "vnode.h"
L
Liu Jicong 已提交
18 19

/**
 * Allocate and initialise a read handle used to scan submit messages.
 *
 * The whole structure is zeroed before the individual members are set, so
 * that members this function does not assign explicitly (for example pBlock,
 * msgIter and blkIter, which other functions in this file read or pass on)
 * never hold indeterminate values.
 *
 * @param pMeta  meta handle stored in the new handle as pVnodeMeta.
 * @return the new handle, or NULL when the allocation fails.
 */
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) {
  STqReadHandle* pReadHandle = taosMemoryMalloc(sizeof(STqReadHandle));
  if (pReadHandle == NULL) {
    return NULL;
  }
  memset(pReadHandle, 0, sizeof(STqReadHandle));

  pReadHandle->pVnodeMeta = pMeta;
  pReadHandle->pMsg = NULL;
  pReadHandle->pColIdList = NULL;
  pReadHandle->pSchema = NULL;
  pReadHandle->pSchemaWrapper = NULL;
  pReadHandle->tbIdHash = NULL;

  // -1 marks "no message version / no schema version loaded yet"; the schema
  // version is compared against the requested one in tqRetrieveDataBlock.
  pReadHandle->ver = -1;
  pReadHandle->sver = -1;

  return pReadHandle;
}

L
Liu Jicong 已提交
35
/**
 * Attach a submit message to the read handle and prepare it for iteration.
 *
 * The message header and the header fields of every block are byte-swapped
 * IN PLACE (htonl / htons / htobe64), so pMsg is modified by this call.
 * After the conversion pass the message iterator is re-initialised so that
 * tqNextDataBlock starts from the first block again, and the block iterator
 * is cleared.
 *
 * @param pReadHandle  handle that receives the message.
 * @param pMsg         submit message; modified in place.
 * @param ver          version recorded in the handle on success.
 * @return 0 on success, -1 when initialising or advancing the message
 *         iterator fails. On failure pReadHandle->pMsg has already been set
 *         and part of the message may already have been converted.
 */
int32_t tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t ver) {
  pReadHandle->pMsg = pMsg;

  // message header
  pMsg->length = htonl(pMsg->length);
  pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);

  // first pass: walk every block and convert its header fields
  if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) {
    return -1;
  }

  for (;;) {
    if (tGetSubmitMsgNext(&pReadHandle->msgIter, &pReadHandle->pBlock) < 0) {
      return -1;
    }
    if (pReadHandle->pBlock == NULL) {
      break;  // no more blocks
    }

    pReadHandle->pBlock->uid = htobe64(pReadHandle->pBlock->uid);
    pReadHandle->pBlock->suid = htobe64(pReadHandle->pBlock->suid);
    pReadHandle->pBlock->sversion = htonl(pReadHandle->pBlock->sversion);
    pReadHandle->pBlock->dataLen = htonl(pReadHandle->pBlock->dataLen);
    pReadHandle->pBlock->schemaLen = htonl(pReadHandle->pBlock->schemaLen);
    pReadHandle->pBlock->numOfRows = htons(pReadHandle->pBlock->numOfRows);
  }

  // rewind the message iterator for the consumer
  if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) {
    return -1;
  }

  pReadHandle->ver = ver;
  memset(&pReadHandle->blkIter, 0, sizeof(SSubmitBlkIter));
  return 0;
}

/**
 * Advance the handle to the next block whose table uid is present in
 * pHandle->tbIdHash.
 *
 * Blocks whose uid is not found in the hash are skipped. The block header
 * fields are not converted here; tqReadHandleSetMsg already did that.
 *
 * @param pHandle  read handle; tbIdHash must be set before a block is
 *                 examined (enforced with ASSERT).
 * @return true when pHandle->pBlock points at a matching block; false when
 *         the iterator reports an error or there are no more blocks.
 */
bool tqNextDataBlock(STqReadHandle* pHandle) {
  for (;;) {
    // iterator failure and end-of-message both terminate the scan
    if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0 || pHandle->pBlock == NULL) {
      return false;
    }

    ASSERT(pHandle->tbIdHash);
    if (taosHashGet(pHandle->tbIdHash, &pHandle->pBlock->uid, sizeof(int64_t)) != NULL) {
      return true;
    }
    // uid not subscribed: keep looking
  }
}

/**
 * Fill in the block info for the block the handle currently points at.
 *
 * Only two members of pBlockInfo are written: numOfCols (the size of the
 * handle's requested column id list) and rows (the current block's row
 * count). The uid is intentionally not written: per the original author's
 * note, the uid can not be assigned to the block data here.
 *
 * @param pHandle     read handle; pBlock is dereferenced without a check.
 * @param pBlockInfo  output structure.
 * @return always 0.
 */
int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) {
  // column count comes from the request, row count from the block itself
  pBlockInfo->numOfCols = taosArrayGetSize(pHandle->pColIdList);
  pBlockInfo->rows = pHandle->pBlock->numOfRows;

  return 0;
}

/**
 * Build the column data for the block the handle currently points at.
 *
 * Steps:
 *   1. If the cached schema version differs from the wanted one, look up the
 *      table's STSchema and its SSchemaWrapper (for a child table, the
 *      wrapper is fetched with the super table's uid).
 *   2. Walk the wrapper's columns and the requested column id list in
 *      lock-step and create one SColumnInfoData per column id present in
 *      both. The walk only advances forward, so it relies on both lists
 *      being in ascending column id order.
 *   3. Iterate over every row of the block and append each wanted column's
 *      value (or NULL marker) to the matching SColumnInfoData.
 *
 * @param pHandle  read handle positioned on a block (pBlock is dereferenced
 *                 without a check).
 * @return an SArray of SColumnInfoData, or NULL when a meta lookup fails, an
 *         allocation fails, or appending a value fails.
 *
 * NOTE(review): the schema cache is keyed only on sversion, which is
 * currently hard-coded to 0, so the schema loaded for the first block is
 * reused for later blocks even if they belong to a different table — confirm
 * this is intended.
 */
SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
  // TODO set to real sversion (pHandle->pBlock->sversion)
  int32_t sversion = 0;
  if (pHandle->sver != sversion) {
    pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->pBlock->uid, sversion);
    if (pHandle->pSchema == NULL) {
      return NULL;
    }

    // NOTE(review): pTbCfg may be owned by the caller and need releasing;
    // the matching free routine is not visible here — confirm.
    STbCfg* pTbCfg = metaGetTbInfoByUid(pHandle->pVnodeMeta, pHandle->pBlock->uid);
    if (pTbCfg == NULL) {
      return NULL;
    }

    // a child table's column schema is looked up via its super table
    tb_uid_t quid;
    if (pTbCfg->type == META_CHILD_TABLE) {
      quid = pTbCfg->ctbCfg.suid;
    } else {
      quid = pHandle->pBlock->uid;
    }

    pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, quid, sversion, true);
    if (pHandle->pSchemaWrapper == NULL) {
      return NULL;
    }

    // only mark the cache valid once every lookup has succeeded, so a failed
    // attempt is retried on the next call
    pHandle->sver = sversion;
  }

  STSchema*       pTschema = pHandle->pSchema;
  SSchemaWrapper* pSchemaWrapper = pHandle->pSchemaWrapper;
  if (pTschema == NULL || pSchemaWrapper == NULL) {
    return NULL;
  }

  int32_t numOfRows = pHandle->pBlock->numOfRows;
  int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList);

  if (colNumNeed > pSchemaWrapper->nCols) {
    colNumNeed = pSchemaWrapper->nCols;
  }

  SArray* pArray = taosArrayInit(colNumNeed, sizeof(SColumnInfoData));
  if (pArray == NULL) {
    return NULL;
  }

  // merge-style walk over the schema columns and the requested column ids
  int32_t colMeta = 0;
  int32_t colNeed = 0;
  while (colMeta < pSchemaWrapper->nCols && colNeed < colNumNeed) {
    SSchema* pColSchema = &pSchemaWrapper->pSchema[colMeta];
    col_id_t colIdSchema = pColSchema->colId;
    col_id_t colIdNeed = *(col_id_t*)taosArrayGet(pHandle->pColIdList, colNeed);
    if (colIdSchema < colIdNeed) {
      colMeta++;
    } else if (colIdSchema > colIdNeed) {
      // requested column does not exist in the schema: skip it
      colNeed++;
    } else {
      SColumnInfoData colInfo = {0};
      colInfo.info.bytes = pColSchema->bytes;
      colInfo.info.colId = pColSchema->colId;
      colInfo.info.type = pColSchema->type;

      if (colInfoDataEnsureCapacity(&colInfo, numOfRows) < 0) {
        // NOTE(review): the array elements are SColumnInfoData, yet the
        // destructor passed here is tDeleteSSDataBlock — confirm this is the
        // right element destructor.
        taosArrayDestroyEx(pArray, (void (*)(void*))tDeleteSSDataBlock);
        return NULL;
      }
      taosArrayPush(pArray, &colInfo);
      colMeta++;
      colNeed++;
    }
  }

  // the set of output columns is fixed from here on
  int32_t colTot = taosArrayGetSize(pArray);

  STSRowIter iter = {0};
  tdSTSRowIterInit(&iter, pTschema);
  STSRow* row;
  int32_t curRow = 0;
  tInitSubmitBlkIter(pHandle->pBlock, &pHandle->blkIter);
  while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) {
    tdSTSRowIterReset(&iter, row);
    // get all wanted col of that block
    for (int32_t i = 0; i < colTot; i++) {
      SColumnInfoData* pColData = taosArrayGet(pArray, i);
      SCellVal         sVal = {0};
      // when the row iterator returns false, the remaining columns of this
      // row are left unfilled
      if (!tdSTSRowIterNext(&iter, pColData->info.colId, pColData->info.type, &sVal)) {
        break;
      }
      if (colDataAppend(pColData, curRow, sVal.val, sVal.valType == TD_VTYPE_NULL) < 0) {
        // NOTE(review): same destructor concern as above.
        taosArrayDestroyEx(pArray, (void (*)(void*))tDeleteSSDataBlock);
        return NULL;
      }
    }
    curRow++;
  }
  return pArray;
}