vnodeQuery.c 5.3 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "vnodeQuery.h"
H
more  
Hongze Cheng 已提交
17
#include "vnodeDef.h"
D
dapan1121 已提交
18

H
Haojun Liao 已提交
19 20
/* Forward declaration: vnodeGetTableList is defined further down in this file
 * but is called earlier, from vnodeProcessFetchReq. */
static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg);

H
more  
Hongze Cheng 已提交
21
/**
 * Initialise the query worker attached to a vnode.
 *
 * Calls qWorkerInit() with a NULL first argument and the address of
 * pVnode->pQuery, which receives the worker handle.
 *
 * @param pVnode vnode whose pQuery field is to be initialised
 * @return whatever qWorkerInit() returns
 */
int vnodeQueryOpen(SVnode *pVnode) {
  int code = qWorkerInit(NULL, &pVnode->pQuery);
  return code;
}
D
dapan1121 已提交
22 23 24

/**
 * Process a query request by forwarding it to the vnode's query worker.
 *
 * @param pVnode vnode that received the request; its pQuery worker handles it
 * @param pMsg   the incoming request message
 * @param pRsp   not used by this function
 * @return the result of qWorkerProcessQueryMsg()
 */
int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
  vInfo("query message is processed");

  int code = qWorkerProcessQueryMsg(pVnode, pVnode->pQuery, pMsg);
  return code;
}

/**
 * Dispatch a fetch-type request to the handler matching pMsg->msgType.
 *
 * Most message types are forwarded to the query worker (pVnode->pQuery);
 * TDMT_VND_SHOW_TABLES_FETCH is answered locally by vnodeGetTableList().
 *
 * @param pVnode vnode that received the request
 * @param pMsg   the incoming request; msgType selects the handler
 * @param pRsp   not used by this function
 * @return the selected handler's return value, or TSDB_CODE_VND_APP_ERROR
 *         when the message type is not recognised
 */
int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
  int code;

  vInfo("fetch message is processed");

  switch (pMsg->msgType) {
    case TDMT_VND_FETCH:
      code = qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg);
      break;

    case TDMT_VND_RES_READY:
      code = qWorkerProcessReadyMsg(pVnode, pVnode->pQuery, pMsg);
      break;

    case TDMT_VND_TASKS_STATUS:
      code = qWorkerProcessStatusMsg(pVnode, pVnode->pQuery, pMsg);
      break;

    case TDMT_VND_CANCEL_TASK:
      code = qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg);
      break;

    case TDMT_VND_DROP_TASK:
      code = qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg);
      break;

    case TDMT_VND_SHOW_TABLES:
      code = qWorkerProcessShowMsg(pVnode, pVnode->pQuery, pMsg);
      break;

    case TDMT_VND_SHOW_TABLES_FETCH:
      /* Served by the local table-list builder; a qWorkerProcessShowFetchMsg
       * call was previously considered here and left disabled. */
      code = vnodeGetTableList(pVnode, pMsg);
      break;

    default:
      vError("unknown msg type:%d in fetch queue", pMsg->msgType);
      code = TSDB_CODE_VND_APP_ERROR;
      break;
  }

  return code;
}

H
more  
Hongze Cheng 已提交
52
/**
 * Build a table-meta message for the table named in the request.
 *
 * The table is looked up by pReq->tableFname. Its column schema is fetched
 * (from the super table when the table is a child table), then the table
 * names, ids, column/tag counts and the column + tag schemas are written into
 * a newly calloc'ed STableMetaMsg, with multi-byte fields converted through
 * htonl()/htobe64().
 *
 * Tag columns come from the table's own config for a super table and from the
 * parent super table's config for a child table; other table types carry no
 * tags.
 *
 * @param pVnode vnode whose meta store (pVnode->pMeta) is queried
 * @param pMsg   request; pMsg->pCont is interpreted as an STableInfoMsg
 * @param pRsp   not touched by this function
 * @return 0 on success, -1 when a lookup or the allocation fails
 */
static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
  STableInfoMsg * pReq = (STableInfoMsg *)(pMsg->pCont);
  STbCfg *        pTbCfg = NULL;
  STbCfg *        pStbCfg = NULL;
  tb_uid_t        uid;
  int32_t         nCols;
  int32_t         nTagCols;
  SSchemaWrapper *pSW = NULL;
  STableMetaMsg * pTbMetaMsg = NULL;
  SSchema *       pTagSchema = NULL;

  pTbCfg = metaGetTbInfoByName(pVnode->pMeta, pReq->tableFname, &uid);
  if (pTbCfg == NULL) {
    return -1;
  }

  if (pTbCfg->type == META_CHILD_TABLE) {
    /* A child table shares its parent's schema and tag definition. */
    pStbCfg = metaGetTbInfoByUid(pVnode->pMeta, pTbCfg->ctbCfg.suid);
    if (pStbCfg == NULL) {
      return -1;
    }

    pSW = metaGetTableSchema(pVnode->pMeta, pTbCfg->ctbCfg.suid, 0, true);
  } else {
    pSW = metaGetTableSchema(pVnode->pMeta, uid, 0, true);
  }

  /* Guard the dereferences below against a failed schema lookup. */
  if (pSW == NULL) {
    return -1;
  }

  nCols = pSW->nCols;
  if (pTbCfg->type == META_SUPER_TABLE) {
    nTagCols = pTbCfg->stbCfg.nTagCols;
    pTagSchema = pTbCfg->stbCfg.pTagSchema;
  } else if (pTbCfg->type == META_CHILD_TABLE) {
    /* pStbCfg is non-NULL here: it was fetched and checked above for child
     * tables. (This branch used to re-test META_SUPER_TABLE and was therefore
     * unreachable, so child tables always reported zero tags.) */
    nTagCols = pStbCfg->stbCfg.nTagCols;
    pTagSchema = pStbCfg->stbCfg.pTagSchema;
  } else {
    nTagCols = 0;
    pTagSchema = NULL;
  }

  /* Header followed by nCols column schemas and then nTagCols tag schemas. */
  pTbMetaMsg = (STableMetaMsg *)calloc(1, sizeof(STableMetaMsg) + sizeof(SSchema) * (nCols + nTagCols));
  if (pTbMetaMsg == NULL) {
    return -1;
  }

  strcpy(pTbMetaMsg->tbFname, pTbCfg->name);
  if (pTbCfg->type == META_CHILD_TABLE) {
    strcpy(pTbMetaMsg->stbFname, pStbCfg->name);
    pTbMetaMsg->suid = htobe64(pTbCfg->ctbCfg.suid);
  }
  pTbMetaMsg->numOfTags = htonl(nTagCols);
  pTbMetaMsg->numOfColumns = htonl(nCols);
  pTbMetaMsg->tableType = pTbCfg->type;
  pTbMetaMsg->tuid = htobe64(uid);
  pTbMetaMsg->vgId = htonl(pVnode->vgId);

  memcpy(pTbMetaMsg->pSchema, pSW->pSchema, sizeof(SSchema) * pSW->nCols);
  if (nTagCols) {
    /* Tag schemas are appended directly after the column schemas. */
    memcpy(POINTER_SHIFT(pTbMetaMsg->pSchema, sizeof(SSchema) * pSW->nCols), pTagSchema, sizeof(SSchema) * nTagCols);
  }

  /* Convert the per-column fields of every copied schema entry. */
  for (int i = 0; i < nCols + nTagCols; i++) {
    SSchema *pSch = pTbMetaMsg->pSchema + i;
    pSch->colId = htonl(pSch->colId);
    pSch->bytes = htonl(pSch->bytes);
  }

  /* NOTE(review): pTbMetaMsg is neither stored through pRsp, sent, nor freed
   * in this function, so the built message is unreachable after return.
   * Ownership of pTbCfg, pStbCfg and pSW is likewise not visible here.
   * Confirm the intended hand-off/release before relying on this function. */
  return 0;
}

/**
 * Reply to a show-tables fetch request with the names of the tables in this
 * vnode.
 *
 * Walks the meta table cursor to collect every table name, allocates an
 * SVShowTablesFetchRsp with one fixed-size row per table, writes each name as
 * a var-string at the start of its row (the rest of the row stays zeroed),
 * and sends the response with rpcSendResponse().
 *
 * @param pVnode vnode whose meta store (pVnode->pMeta) is enumerated
 * @param pMsg   the request; its handle and ahandle are echoed in the response
 * @return 0 once the response has been handed to rpcSendResponse(), -1 when
 *         the cursor, the name array or the response buffer cannot be
 *         obtained (no response is sent in that case)
 */
static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg) {
  SMTbCursor *pCur = metaOpenTbCursor(pVnode->pMeta);
  if (pCur == NULL) {
    return -1;
  }

  SArray *pArray = taosArrayInit(10, POINTER_BYTES);
  if (pArray == NULL) {
    metaCloseTbCursor(pCur);
    return -1;
  }

  /* Collect the name pointers first so the row count is known before the
   * response buffer is sized. */
  char *name = NULL;
  while ((name = metaTbCursorNext(pCur)) != NULL) {
    taosArrayPush(pArray, &name);
  }

  metaCloseTbCursor(pCur);

  int32_t rowLen = (TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE) + 8 + 4 + (TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE) + 8 + 4;
  int32_t numOfTables = (int32_t)taosArrayGetSize(pArray);

  int32_t payloadLen = rowLen * numOfTables;

  SVShowTablesFetchRsp *pFetchRsp = (SVShowTablesFetchRsp *)rpcMallocCont(sizeof(SVShowTablesFetchRsp) + payloadLen);
  if (pFetchRsp == NULL) {
    /* Do not memset/write through a failed allocation. */
    return -1;
  }
  memset(pFetchRsp, 0, sizeof(struct SVShowTablesFetchRsp) + payloadLen);

  /* Only the leading name field of each row is populated. */
  char *p = pFetchRsp->data;
  for (int32_t i = 0; i < numOfTables; ++i) {
    char *n = taosArrayGetP(pArray, i);
    STR_TO_VARSTR(p, n);

    p += rowLen;
  }

  pFetchRsp->numOfRows = htonl(numOfTables);
  pFetchRsp->precision = 0;

  SRpcMsg rpcMsg = {
      .handle  = pMsg->handle,
      .ahandle = pMsg->ahandle,
      .pCont   = pFetchRsp,
      .contLen = sizeof(SVShowTablesFetchRsp) + payloadLen,
      .code    = 0,
  };

  rpcSendResponse(&rpcMsg);

  /* NOTE(review): pArray (and possibly the name strings it points to) is
   * never released on any path in this function — confirm ownership of the
   * strings returned by metaTbCursorNext() and destroy the array here. */
  return 0;
}