vnodeQuery.c 4.9 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "vnd.h"
D
dapan1121 已提交
17

S
Shengliang Guan 已提交
18
int vnodeQueryOpen(SVnode *pVnode) {
H
refact  
Hongze Cheng 已提交
19
  return qWorkerInit(NODE_TYPE_VNODE, TD_VID(pVnode), NULL, (void **)&pVnode->pQuery, &pVnode->msgCb);
S
Shengliang Guan 已提交
20
}
D
dapan1121 已提交
21

L
Liu Jicong 已提交
22
void vnodeQueryClose(SVnode *pVnode) { qWorkerDestroy((void **)&pVnode->pQuery); }
D
dapan1121 已提交
23

H
Hongze Cheng 已提交
24
int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
H
Hongze Cheng 已提交
25 26 27 28 29 30 31 32 33 34 35
  STableInfoReq  infoReq = {0};
  STableMetaRsp  metaRsp = {0};
  SMetaReader    mer1 = {0};
  SMetaReader    mer2 = {0};
  char           tableFName[TSDB_TABLE_FNAME_LEN];
  SRpcMsg        rpcMsg;
  int32_t        code = 0;
  int32_t        rspLen = 0;
  void          *pRsp = NULL;
  SSchemaWrapper schema = {0};
  SSchemaWrapper schemaTag = {0};
H
Hongze Cheng 已提交
36 37 38 39 40 41 42

  // decode req
  if (tDeserializeSTableInfoReq(pMsg->pCont, pMsg->contLen, &infoReq) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

H
Hongze Cheng 已提交
43
  metaRsp.dbId = pVnode->config.dbId;
D
dapan1121 已提交
44
  strcpy(metaRsp.tbName, infoReq.tbName);
H
Hongze Cheng 已提交
45
  memcpy(metaRsp.dbFName, infoReq.dbFName, sizeof(metaRsp.dbFName));
H
Hongze Cheng 已提交
46

D
dapan1121 已提交
47
  sprintf(tableFName, "%s.%s", infoReq.dbFName, infoReq.tbName);
H
Hongze Cheng 已提交
48
  code = vnodeValidateTableHash(pVnode, tableFName);
D
dapan1121 已提交
49 50 51 52
  if (code) {
    goto _exit;
  }

H
Hongze Cheng 已提交
53
  // query meta
H
Hongze Cheng 已提交
54
  metaReaderInit(&mer1, pVnode->pMeta, 0);
H
more  
Hongze Cheng 已提交
55

H
Hongze Cheng 已提交
56
  if (metaGetTableEntryByName(&mer1, infoReq.tbName) < 0) {
H
Hongze Cheng 已提交
57
    code = terrno;
H
more  
Hongze Cheng 已提交
58
    goto _exit;
H
more  
Hongze Cheng 已提交
59 60
  }

H
Hongze Cheng 已提交
61
  metaRsp.tableType = mer1.me.type;
H
Hongze Cheng 已提交
62
  metaRsp.vgId = TD_VID(pVnode);
H
Hongze Cheng 已提交
63 64 65
  metaRsp.tuid = mer1.me.uid;

  if (mer1.me.type == TSDB_SUPER_TABLE) {
H
Hongze Cheng 已提交
66
    strcpy(metaRsp.stbName, mer1.me.name);
H
Hongze Cheng 已提交
67 68 69 70
    schema = mer1.me.stbEntry.schema;
    schemaTag = mer1.me.stbEntry.schemaTag;
    metaRsp.suid = mer1.me.uid;
  } else if (mer1.me.type == TSDB_CHILD_TABLE) {
H
Hongze Cheng 已提交
71
    metaReaderInit(&mer2, pVnode->pMeta, 0);
H
Hongze Cheng 已提交
72
    if (metaGetTableEntryByUid(&mer2, mer1.me.ctbEntry.suid) < 0) goto _exit;
H
Hongze Cheng 已提交
73

H
Hongze Cheng 已提交
74
    strcpy(metaRsp.stbName, mer2.me.name);
H
Hongze Cheng 已提交
75 76 77 78 79
    metaRsp.suid = mer2.me.uid;
    schema = mer2.me.stbEntry.schema;
    schemaTag = mer2.me.stbEntry.schemaTag;
  } else if (mer1.me.type == TSDB_NORMAL_TABLE) {
    schema = mer1.me.ntbEntry.schema;
H
Hongze Cheng 已提交
80 81
  } else {
    ASSERT(0);
H
Hongze Cheng 已提交
82 83
  }

H
Hongze Cheng 已提交
84 85 86 87 88 89 90 91 92 93 94
  metaRsp.numOfTags = schemaTag.nCols;
  metaRsp.numOfColumns = schema.nCols;
  metaRsp.precision = pVnode->config.tsdbCfg.precision;
  metaRsp.sversion = schema.sver;
  metaRsp.pSchemas = (SSchema *)taosMemoryMalloc(sizeof(SSchema) * (metaRsp.numOfColumns + metaRsp.numOfTags));

  memcpy(metaRsp.pSchemas, schema.pSchema, sizeof(SSchema) * schema.nCols);
  if (schemaTag.nCols) {
    memcpy(metaRsp.pSchemas + schema.nCols, schemaTag.pSchema, sizeof(SSchema) * schemaTag.nCols);
  }

H
Hongze Cheng 已提交
95
  // encode and send response
D
dapan1121 已提交
96
  rspLen = tSerializeSTableMetaRsp(NULL, 0, &metaRsp);
S
Shengliang Guan 已提交
97 98 99 100 101
  if (rspLen < 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

D
dapan1121 已提交
102
  pRsp = rpcMallocCont(rspLen);
S
Shengliang Guan 已提交
103 104 105
  if (pRsp == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
H
Hongze Cheng 已提交
106
  }
S
Shengliang Guan 已提交
107
  tSerializeSTableMetaRsp(pRsp, rspLen, &metaRsp);
H
more  
Hongze Cheng 已提交
108

H
Hongze Cheng 已提交
109
_exit:
S
Shengliang Guan 已提交
110 111 112
  rpcMsg.info.handle = pMsg->info.handle;
  rpcMsg.info.ahandle = pMsg->info.ahandle;
  rpcMsg.info.refId = pMsg->info.refId;
S
Shengliang Guan 已提交
113 114
  rpcMsg.pCont = pRsp;
  rpcMsg.contLen = rspLen;
D
dapan1121 已提交
115
  rpcMsg.code = code;
D
dapan1121 已提交
116

S
shm  
Shengliang Guan 已提交
117
  tmsgSendRsp(&rpcMsg);
H
Hongze Cheng 已提交
118

H
Hongze Cheng 已提交
119
  taosMemoryFree(metaRsp.pSchemas);
H
Hongze Cheng 已提交
120 121
  metaReaderClear(&mer2);
  metaReaderClear(&mer1);
D
dapan 已提交
122
  return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
123
}
H
Hongze Cheng 已提交
124 125

int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
H
refact  
Hongze Cheng 已提交
126
  pLoad->vgId = TD_VID(pVnode);
S
Shengliang Guan 已提交
127
  pLoad->syncState = syncGetMyRole(pVnode->sync);
H
Hongze Cheng 已提交
128 129 130 131 132 133 134 135 136 137 138 139
  pLoad->numOfTables = metaGetTbNum(pVnode->pMeta);
  pLoad->numOfTimeSeries = 400;
  pLoad->totalStorage = 300;
  pLoad->compStorage = 200;
  pLoad->pointsWritten = 100;
  pLoad->numOfSelectReqs = 1;
  pLoad->numOfInsertReqs = 3;
  pLoad->numOfInsertSuccessReqs = 2;
  pLoad->numOfBatchInsertReqs = 5;
  pLoad->numOfBatchInsertSuccessReqs = 4;
  return 0;
}
140 141 142 143 144 145 146 147 148

void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId) {
  if (dbname) {
    *dbname = pVnode->config.dbname;
  }

  if (vgId) {
    *vgId = TD_VID(pVnode);
  }
149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
}

// wrapper of tsdb read interface
tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STableGroupInfo *groupList, uint64_t qId,
                               void *pMemRef) {
#if 0
  return tsdbQueryCacheLastT(pVnode->pTsdb, pCond, groupList, qId, pMemRef);
#endif
  return 0;
}
int32_t tsdbGetTableGroupFromIdList(SVnode *pVnode, SArray *pTableIdList, STableGroupInfo *pGroupInfo) {
#if 0
  return tsdbGetTableGroupFromIdListT(pVnode->pTsdb, pTableIdList, pGroupInfo);
#endif
  return 0;
164
}