vnodeQuery.c 7.9 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "vnd.h"
D
dapan1121 已提交
17

S
Shengliang Guan 已提交
18
int vnodeQueryOpen(SVnode *pVnode) {
H
refact  
Hongze Cheng 已提交
19
  return qWorkerInit(NODE_TYPE_VNODE, TD_VID(pVnode), NULL, (void **)&pVnode->pQuery, &pVnode->msgCb);
S
Shengliang Guan 已提交
20
}
D
dapan1121 已提交
21

L
Liu Jicong 已提交
22
void vnodeQueryClose(SVnode *pVnode) { qWorkerDestroy((void **)&pVnode->pQuery); }
D
dapan1121 已提交
23

H
Hongze Cheng 已提交
24
int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
H
Hongze Cheng 已提交
25 26 27 28 29
  STableInfoReq  infoReq = {0};
  STableMetaRsp  metaRsp = {0};
  SMetaReader    mer1 = {0};
  SMetaReader    mer2 = {0};
  char           tableFName[TSDB_TABLE_FNAME_LEN];
dengyihao's avatar
dengyihao 已提交
30
  SRpcMsg        rpcMsg = {0};
H
Hongze Cheng 已提交
31 32
  int32_t        code = 0;
  int32_t        rspLen = 0;
dengyihao's avatar
dengyihao 已提交
33
  void *         pRsp = NULL;
H
Hongze Cheng 已提交
34 35
  SSchemaWrapper schema = {0};
  SSchemaWrapper schemaTag = {0};
H
Hongze Cheng 已提交
36 37 38 39 40 41 42

  // decode req
  if (tDeserializeSTableInfoReq(pMsg->pCont, pMsg->contLen, &infoReq) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

H
Hongze Cheng 已提交
43
  metaRsp.dbId = pVnode->config.dbId;
D
dapan1121 已提交
44
  strcpy(metaRsp.tbName, infoReq.tbName);
H
Hongze Cheng 已提交
45
  memcpy(metaRsp.dbFName, infoReq.dbFName, sizeof(metaRsp.dbFName));
H
Hongze Cheng 已提交
46

D
dapan1121 已提交
47
  sprintf(tableFName, "%s.%s", infoReq.dbFName, infoReq.tbName);
H
Hongze Cheng 已提交
48
  code = vnodeValidateTableHash(pVnode, tableFName);
D
dapan1121 已提交
49 50 51 52
  if (code) {
    goto _exit;
  }

H
Hongze Cheng 已提交
53
  // query meta
H
Hongze Cheng 已提交
54
  metaReaderInit(&mer1, pVnode->pMeta, 0);
H
more  
Hongze Cheng 已提交
55

H
Hongze Cheng 已提交
56
  if (metaGetTableEntryByName(&mer1, infoReq.tbName) < 0) {
H
Hongze Cheng 已提交
57
    code = terrno;
H
more  
Hongze Cheng 已提交
58
    goto _exit;
H
more  
Hongze Cheng 已提交
59 60
  }

H
Hongze Cheng 已提交
61
  metaRsp.tableType = mer1.me.type;
H
Hongze Cheng 已提交
62
  metaRsp.vgId = TD_VID(pVnode);
H
Hongze Cheng 已提交
63 64 65
  metaRsp.tuid = mer1.me.uid;

  if (mer1.me.type == TSDB_SUPER_TABLE) {
H
Hongze Cheng 已提交
66
    strcpy(metaRsp.stbName, mer1.me.name);
67
    schema = mer1.me.stbEntry.schemaRow;
H
Hongze Cheng 已提交
68 69 70
    schemaTag = mer1.me.stbEntry.schemaTag;
    metaRsp.suid = mer1.me.uid;
  } else if (mer1.me.type == TSDB_CHILD_TABLE) {
H
Hongze Cheng 已提交
71
    metaReaderInit(&mer2, pVnode->pMeta, 0);
H
Hongze Cheng 已提交
72
    if (metaGetTableEntryByUid(&mer2, mer1.me.ctbEntry.suid) < 0) goto _exit;
H
Hongze Cheng 已提交
73

H
Hongze Cheng 已提交
74
    strcpy(metaRsp.stbName, mer2.me.name);
H
Hongze Cheng 已提交
75
    metaRsp.suid = mer2.me.uid;
76
    schema = mer2.me.stbEntry.schemaRow;
H
Hongze Cheng 已提交
77 78
    schemaTag = mer2.me.stbEntry.schemaTag;
  } else if (mer1.me.type == TSDB_NORMAL_TABLE) {
79
    schema = mer1.me.ntbEntry.schemaRow;
H
Hongze Cheng 已提交
80 81
  } else {
    ASSERT(0);
H
Hongze Cheng 已提交
82 83
  }

H
Hongze Cheng 已提交
84 85 86
  metaRsp.numOfTags = schemaTag.nCols;
  metaRsp.numOfColumns = schema.nCols;
  metaRsp.precision = pVnode->config.tsdbCfg.precision;
87
  metaRsp.sversion = schema.version;
H
Hongze Cheng 已提交
88 89 90 91 92 93 94
  metaRsp.pSchemas = (SSchema *)taosMemoryMalloc(sizeof(SSchema) * (metaRsp.numOfColumns + metaRsp.numOfTags));

  memcpy(metaRsp.pSchemas, schema.pSchema, sizeof(SSchema) * schema.nCols);
  if (schemaTag.nCols) {
    memcpy(metaRsp.pSchemas + schema.nCols, schemaTag.pSchema, sizeof(SSchema) * schemaTag.nCols);
  }

H
Hongze Cheng 已提交
95
  // encode and send response
D
dapan1121 已提交
96
  rspLen = tSerializeSTableMetaRsp(NULL, 0, &metaRsp);
S
Shengliang Guan 已提交
97 98 99 100 101
  if (rspLen < 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

D
dapan1121 已提交
102
  pRsp = rpcMallocCont(rspLen);
S
Shengliang Guan 已提交
103 104 105
  if (pRsp == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
H
Hongze Cheng 已提交
106
  }
S
Shengliang Guan 已提交
107
  tSerializeSTableMetaRsp(pRsp, rspLen, &metaRsp);
H
more  
Hongze Cheng 已提交
108

H
Hongze Cheng 已提交
109
_exit:
S
Shengliang Guan 已提交
110
  rpcMsg.info = pMsg->info;
S
Shengliang Guan 已提交
111 112
  rpcMsg.pCont = pRsp;
  rpcMsg.contLen = rspLen;
D
dapan1121 已提交
113
  rpcMsg.code = code;
dengyihao's avatar
dengyihao 已提交
114
  rpcMsg.msgType = pMsg->msgType;
D
dapan1121 已提交
115

D
dapan1121 已提交
116 117 118 119
  if (code) {
    qError("get table %s meta failed cause of %s", infoReq.tbName, tstrerror(code));
  }

S
shm  
Shengliang Guan 已提交
120
  tmsgSendRsp(&rpcMsg);
H
Hongze Cheng 已提交
121

H
Hongze Cheng 已提交
122
  taosMemoryFree(metaRsp.pSchemas);
H
Hongze Cheng 已提交
123 124
  metaReaderClear(&mer2);
  metaReaderClear(&mer1);
D
dapan1121 已提交
125 126 127 128 129 130 131 132 133
  return TSDB_CODE_SUCCESS;
}

int vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg) {
  STableCfgReq   cfgReq = {0};
  STableCfgRsp   cfgRsp = {0};
  SMetaReader    mer1 = {0};
  SMetaReader    mer2 = {0};
  char           tableFName[TSDB_TABLE_FNAME_LEN];
dengyihao's avatar
dengyihao 已提交
134
  SRpcMsg        rpcMsg = {0};
D
dapan1121 已提交
135 136
  int32_t        code = 0;
  int32_t        rspLen = 0;
dengyihao's avatar
dengyihao 已提交
137
  void *         pRsp = NULL;
D
dapan1121 已提交
138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166
  SSchemaWrapper schema = {0};
  SSchemaWrapper schemaTag = {0};

  // decode req
  if (tDeserializeSTableCfgReq(pMsg->pCont, pMsg->contLen, &cfgReq) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  strcpy(cfgRsp.tbName, cfgReq.tbName);
  memcpy(cfgRsp.dbFName, cfgReq.dbFName, sizeof(cfgRsp.dbFName));

  sprintf(tableFName, "%s.%s", cfgReq.dbFName, cfgReq.tbName);
  code = vnodeValidateTableHash(pVnode, tableFName);
  if (code) {
    goto _exit;
  }

  // query meta
  metaReaderInit(&mer1, pVnode->pMeta, 0);

  if (metaGetTableEntryByName(&mer1, cfgReq.tbName) < 0) {
    code = terrno;
    goto _exit;
  }

  cfgRsp.tableType = mer1.me.type;

  if (mer1.me.type == TSDB_SUPER_TABLE) {
D
dapan1121 已提交
167 168
    code = TSDB_CODE_VND_HASH_MISMATCH;
    goto _exit;
D
dapan1121 已提交
169 170 171 172 173 174 175
  } else if (mer1.me.type == TSDB_CHILD_TABLE) {
    metaReaderInit(&mer2, pVnode->pMeta, 0);
    if (metaGetTableEntryByUid(&mer2, mer1.me.ctbEntry.suid) < 0) goto _exit;

    strcpy(cfgRsp.stbName, mer2.me.name);
    schema = mer2.me.stbEntry.schemaRow;
    schemaTag = mer2.me.stbEntry.schemaTag;
D
dapan1121 已提交
176 177 178 179 180 181 182 183 184
    cfgRsp.ttl = mer1.me.ctbEntry.ttlDays;
    cfgRsp.commentLen = mer1.me.ctbEntry.commentLen;
    if (mer1.me.ctbEntry.commentLen > 0) {
      cfgRsp.pComment = strdup(mer1.me.ctbEntry.comment);
    }
    STag *pTag = (STag *)mer1.me.ctbEntry.pTags;
    cfgRsp.tagsLen = pTag->len;
    cfgRsp.pTags = taosMemoryMalloc(cfgRsp.tagsLen);
    memcpy(cfgRsp.pTags, pTag, cfgRsp.tagsLen);
D
dapan1121 已提交
185 186
  } else if (mer1.me.type == TSDB_NORMAL_TABLE) {
    schema = mer1.me.ntbEntry.schemaRow;
D
dapan1121 已提交
187 188 189 190 191
    cfgRsp.ttl = mer1.me.ntbEntry.ttlDays;
    cfgRsp.commentLen = mer1.me.ntbEntry.commentLen;
    if (mer1.me.ntbEntry.commentLen > 0) {
      cfgRsp.pComment = strdup(mer1.me.ntbEntry.comment);
    }
D
dapan1121 已提交
192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223
  } else {
    ASSERT(0);
  }

  cfgRsp.numOfTags = schemaTag.nCols;
  cfgRsp.numOfColumns = schema.nCols;
  cfgRsp.pSchemas = (SSchema *)taosMemoryMalloc(sizeof(SSchema) * (cfgRsp.numOfColumns + cfgRsp.numOfTags));

  memcpy(cfgRsp.pSchemas, schema.pSchema, sizeof(SSchema) * schema.nCols);
  if (schemaTag.nCols) {
    memcpy(cfgRsp.pSchemas + schema.nCols, schemaTag.pSchema, sizeof(SSchema) * schemaTag.nCols);
  }

  // encode and send response
  rspLen = tSerializeSTableCfgRsp(NULL, 0, &cfgRsp);
  if (rspLen < 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }
  tSerializeSTableCfgRsp(pRsp, rspLen, &cfgRsp);

_exit:
  rpcMsg.info = pMsg->info;
  rpcMsg.pCont = pRsp;
  rpcMsg.contLen = rspLen;
  rpcMsg.code = code;
dengyihao's avatar
dengyihao 已提交
224
  rpcMsg.msgType = pMsg->msgType;
D
dapan1121 已提交
225 226 227 228 229 230 231

  if (code) {
    qError("get table %s cfg failed cause of %s", cfgReq.tbName, tstrerror(code));
  }

  tmsgSendRsp(&rpcMsg);

D
dapan1121 已提交
232
  tFreeSTableCfgRsp(&cfgRsp);
D
dapan1121 已提交
233 234
  metaReaderClear(&mer2);
  metaReaderClear(&mer1);
D
dapan 已提交
235
  return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
236
}
H
Hongze Cheng 已提交
237 238

int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
H
refact  
Hongze Cheng 已提交
239
  pLoad->vgId = TD_VID(pVnode);
S
Shengliang Guan 已提交
240
  pLoad->syncState = syncGetMyRole(pVnode->sync);
H
Hongze Cheng 已提交
241 242 243 244 245 246 247 248 249 250 251 252
  pLoad->numOfTables = metaGetTbNum(pVnode->pMeta);
  pLoad->numOfTimeSeries = 400;
  pLoad->totalStorage = 300;
  pLoad->compStorage = 200;
  pLoad->pointsWritten = 100;
  pLoad->numOfSelectReqs = 1;
  pLoad->numOfInsertReqs = 3;
  pLoad->numOfInsertSuccessReqs = 2;
  pLoad->numOfBatchInsertReqs = 5;
  pLoad->numOfBatchInsertSuccessReqs = 4;
  return 0;
}
253 254 255 256 257 258 259 260 261

void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId) {
  if (dbname) {
    *dbname = pVnode->config.dbname;
  }

  if (vgId) {
    *vgId = TD_VID(pVnode);
  }
262 263 264
}

// wrapper of tsdb read interface
dengyihao's avatar
dengyihao 已提交
265
tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *tableList, uint64_t qId,
266 267 268 269 270
                               void *pMemRef) {
#if 0
  return tsdbQueryCacheLastT(pVnode->pTsdb, pCond, groupList, qId, pMemRef);
#endif
  return 0;
D
dapan1121 已提交
271
}