querymsg.c 9.9 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tmsg.h"
D
dapan1121 已提交
17
#include "queryInt.h"
D
dapan1121 已提交
18
#include "query.h"
D
catalog  
dapan1121 已提交
19
#include "trpc.h"
D
dapan 已提交
20

S
Shengliang Guan 已提交
21 22 23
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wformat-truncation"

S
Shengliang Guan 已提交
24 25
// Per-message-type dispatch tables for the query module. Both hold TDMT_MAX slots that start out NULL;
// initQueryModuleMsgHandle() fills the slots it supports at index TMSG_INDEX(<message type>).
//
// queryBuildMsg[i]: builds a request from `input`, handing back the buffer through *msg and its
// length through *msgLen.
int32_t (*queryBuildMsg[TDMT_MAX])(void *input, char **msg, int32_t msgSize, int32_t *msgLen) = {0};
// queryProcessMsgRsp[i]: decodes the response held in `msg` (msgSize bytes) into `output`.
int32_t (*queryProcessMsgRsp[TDMT_MAX])(void *output, char *msg, int32_t msgSize) = {0};
D
dapan 已提交
26

X
Xiaoyu Wang 已提交
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
/*
 * Fill a SUseDbOutput from a decoded use-db response.
 *
 * Copies the db name and uid, allocates pOut->dbVgroup, and inserts every entry of
 * usedbRsp->pVgroupInfos into a new hash keyed by vgId while summing numOfTable.
 *
 * On failure everything allocated here is released and pOut->dbVgroup is reset to NULL,
 * so the caller never sees a half-built vgroup table.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_INVALID_INPUT for NULL arguments,
 * TSDB_CODE_TSC_INVALID_VALUE when a vgroup entry cannot be read, or
 * TSDB_CODE_TSC_OUT_OF_MEMORY when an allocation or hash insertion fails.
 */
int32_t queryBuildUseDbOutput(SUseDbOutput *pOut, SUseDbRsp *usedbRsp) {
  if (NULL == pOut || NULL == usedbRsp) {
    return TSDB_CODE_TSC_INVALID_INPUT;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  memcpy(pOut->db, usedbRsp->db, TSDB_DB_FNAME_LEN);
  pOut->dbId = usedbRsp->uid;
  pOut->dbVgroup = calloc(1, sizeof(SDBVgInfo));
  if (NULL == pOut->dbVgroup) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  pOut->dbVgroup->vgVersion = usedbRsp->vgVersion;
  pOut->dbVgroup->hashMethod = usedbRsp->hashMethod;
  pOut->dbVgroup->vgHash =
      taosHashInit(usedbRsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
  if (NULL == pOut->dbVgroup->vgHash) {
    code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto BUILD_USEDB_FAILED;
  }

  for (int32_t i = 0; i < usedbRsp->vgNum; ++i) {
    SVgroupInfo *pVgInfo = taosArrayGet(usedbRsp->pVgroupInfos, i);
    if (NULL == pVgInfo) {
      // vgNum promised more entries than the array actually holds
      code = TSDB_CODE_TSC_INVALID_VALUE;
      goto BUILD_USEDB_FAILED;
    }

    pOut->dbVgroup->numOfTable += pVgInfo->numOfTable;
    if (0 != taosHashPut(pOut->dbVgroup->vgHash, &pVgInfo->vgId, sizeof(int32_t), pVgInfo, sizeof(SVgroupInfo))) {
      code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      goto BUILD_USEDB_FAILED;
    }
  }

  return TSDB_CODE_SUCCESS;

BUILD_USEDB_FAILED:
  // vgHash may still be NULL here (taosHashInit failure); queryProcessUseDBRsp's cleanup
  // makes the same call on a possibly-NULL hash.
  taosHashCleanup(pOut->dbVgroup->vgHash);
  tfree(pOut->dbVgroup);
  pOut->dbVgroup = NULL;
  return code;
}

S
Shengliang Guan 已提交
54 55
int32_t queryBuildTableMetaReqMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen) {
  SBuildTableMetaInput *pInput = input;
D
dapan1121 已提交
56 57 58 59
  if (NULL == input || NULL == msg || NULL == msgLen) {
    return TSDB_CODE_TSC_INVALID_INPUT;
  }

S
Shengliang Guan 已提交
60 61
  STableInfoReq infoReq = {0};
  infoReq.header.vgId = pInput->vgId;
S
Shengliang Guan 已提交
62
  if (pInput->dbFName) {
S
Shengliang Guan 已提交
63
    tstrncpy(infoReq.dbFName, pInput->dbFName, TSDB_DB_FNAME_LEN);
D
dapan1121 已提交
64
  }
S
Shengliang Guan 已提交
65
  tstrncpy(infoReq.tbName, pInput->tbName, TSDB_TABLE_NAME_LEN);
D
dapan1121 已提交
66

S
Shengliang Guan 已提交
67 68 69 70 71 72
  int32_t bufLen = tSerializeSTableInfoReq(NULL, 0, &infoReq);
  void   *pBuf = rpcMallocCont(bufLen);
  tSerializeSTableInfoReq(pBuf, bufLen, &infoReq);

  *msg = pBuf;
  *msgLen = bufLen;
D
dapan1121 已提交
73 74 75 76

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
77
int32_t queryBuildUseDbMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen) {
S
Shengliang Guan 已提交
78 79
  SBuildUseDBInput *pInput = input;
  if (NULL == pInput || NULL == msg || NULL == msgLen) {
D
dapan1121 已提交
80 81 82
    return TSDB_CODE_TSC_INVALID_INPUT;
  }

S
Shengliang Guan 已提交
83
  SUseDbReq usedbReq = {0};
S
Shengliang Guan 已提交
84
  strncpy(usedbReq.db, pInput->db, sizeof(usedbReq.db));
S
Shengliang Guan 已提交
85
  usedbReq.db[sizeof(usedbReq.db) - 1] = 0;
S
Shengliang Guan 已提交
86
  usedbReq.vgVersion = pInput->vgVersion;
X
Xiaoyu Wang 已提交
87
  usedbReq.dbId = pInput->dbId;
D
dapan 已提交
88
  usedbReq.numOfTable = pInput->numOfTable;
D
dapan1121 已提交
89

S
Shengliang Guan 已提交
90 91 92
  int32_t bufLen = tSerializeSUseDbReq(NULL, 0, &usedbReq);
  void   *pBuf = rpcMallocCont(bufLen);
  tSerializeSUseDbReq(pBuf, bufLen, &usedbReq);
D
dapan1121 已提交
93

S
Shengliang Guan 已提交
94 95
  *msg = pBuf;
  *msgLen = bufLen;
D
dapan1121 已提交
96

S
Shengliang Guan 已提交
97
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
98 99
}

D
dapan1121 已提交
100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
int32_t queryBuildQnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen) {
  if (NULL == msg || NULL == msgLen) {
    return TSDB_CODE_TSC_INVALID_INPUT;
  }

  SQnodeListReq qnodeListReq = {0};
  qnodeListReq.rowNum = -1;

  int32_t bufLen = tSerializeSQnodeListReq(NULL, 0, &qnodeListReq);
  void   *pBuf = rpcMallocCont(bufLen);
  tSerializeSQnodeListReq(pBuf, bufLen, &qnodeListReq);

  *msg = pBuf;
  *msgLen = bufLen;

  return TSDB_CODE_SUCCESS;
}


S
Shengliang Guan 已提交
119
int32_t queryProcessUseDBRsp(void *output, char *msg, int32_t msgSize) {
S
Shengliang Guan 已提交
120 121 122 123
  SUseDbOutput *pOut = output;
  SUseDbRsp     usedbRsp = {0};
  int32_t       code = -1;

D
dapan1121 已提交
124
  if (NULL == output || NULL == msg || msgSize <= 0) {
S
Shengliang Guan 已提交
125 126
    code = TSDB_CODE_TSC_INVALID_INPUT;
    goto PROCESS_USEDB_OVER;
D
dapan1121 已提交
127 128
  }

S
Shengliang Guan 已提交
129 130
  if (tDeserializeSUseDbRsp(msg, msgSize, &usedbRsp) != 0) {
    qError("invalid use db rsp msg, msgSize:%d", msgSize);
S
Shengliang Guan 已提交
131 132
    code = TSDB_CODE_INVALID_MSG;
    goto PROCESS_USEDB_OVER;
D
dapan1121 已提交
133 134
  }

S
Shengliang Guan 已提交
135 136
  if (usedbRsp.vgNum < 0) {
    qError("invalid db[%s] vgroup number[%d]", usedbRsp.db, usedbRsp.vgNum);
S
Shengliang Guan 已提交
137 138
    code = TSDB_CODE_TSC_INVALID_VALUE;
    goto PROCESS_USEDB_OVER;
D
dapan1121 已提交
139 140
  }

X
Xiaoyu Wang 已提交
141
  code = queryBuildUseDbOutput(pOut, &usedbRsp);
142

S
Shengliang Guan 已提交
143
PROCESS_USEDB_OVER:
X
Xiaoyu Wang 已提交
144

S
Shengliang Guan 已提交
145 146 147 148 149 150
  if (code != 0) {
    if (pOut) {
      if (pOut->dbVgroup) taosHashCleanup(pOut->dbVgroup->vgHash);
      tfree(pOut->dbVgroup);
    }
    qError("failed to process usedb rsp since %s", terrstr());
D
dapan1121 已提交
151
  }
S
Shengliang Guan 已提交
152

S
Shengliang Guan 已提交
153
  tFreeSUsedbRsp(&usedbRsp);
D
dapan1121 已提交
154 155 156
  return code;
}

S
Shengliang Guan 已提交
157
/*
 * Sanity-check the fields of a decoded table-meta response.
 *
 * Checks run in a fixed order (tag count, column count, table type, schema version,
 * tag version, first column id); the first violation is logged and reported as
 * TSDB_CODE_TSC_INVALID_VALUE. Returns TSDB_CODE_SUCCESS when every check passes.
 */
static int32_t queryConvertTableMetaMsg(STableMetaRsp *pMetaMsg) {
  // Tags: zero is acceptable, anything outside [0, TSDB_MAX_TAGS] is not
  const bool badTagCount = (pMetaMsg->numOfTags > TSDB_MAX_TAGS) || (pMetaMsg->numOfTags < 0);
  if (badTagCount) {
    qError("invalid numOfTags[%d] in table meta rsp msg", pMetaMsg->numOfTags);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  // Columns: at least one is required, at most TSDB_MAX_COLUMNS
  const bool badColumnCount = (pMetaMsg->numOfColumns <= 0) || (pMetaMsg->numOfColumns > TSDB_MAX_COLUMNS);
  if (badColumnCount) {
    qError("invalid numOfColumns[%d] in table meta rsp msg", pMetaMsg->numOfColumns);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  // Only super, child and normal tables are accepted
  switch (pMetaMsg->tableType) {
    case TSDB_SUPER_TABLE:
    case TSDB_CHILD_TABLE:
    case TSDB_NORMAL_TABLE:
      break;
    default:
      qError("invalid tableType[%d] in table meta rsp msg", pMetaMsg->tableType);
      return TSDB_CODE_TSC_INVALID_VALUE;
  }

  if (0 > pMetaMsg->sversion) {
    qError("invalid sversion[%d] in table meta rsp msg", pMetaMsg->sversion);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  if (0 > pMetaMsg->tversion) {
    qError("invalid tversion[%d] in table meta rsp msg", pMetaMsg->tversion);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  // The first schema entry must carry the primary-key timestamp column id
  if (PRIMARYKEY_TIMESTAMP_COL_ID != pMetaMsg->pSchemas->colId) {
    qError("invalid colId[%d] for the first column in table meta rsp msg", pMetaMsg->pSchemas->colId);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
192
/*
 * Allocate a STableMeta and populate it from a decoded table-meta response.
 *
 * msg          - decoded response; its numOfColumns + numOfTags schema entries are copied.
 * isSuperTable - when true the result describes the super table: vgId is 0, tableType is
 *                TSDB_SUPER_TABLE and uid is taken from msg->suid instead of msg->tuid.
 * pMeta        - out: receives the calloc()'ed STableMeta; the caller owns it on success.
 *
 * tableInfo.rowSize is the sum of `bytes` over the first numOfColumns schema entries.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_INVALID_INPUT for NULL arguments,
 * TSDB_CODE_TSC_INVALID_VALUE for negative counts or a missing schema array, or
 * TSDB_CODE_TSC_OUT_OF_MEMORY. *pMeta is left untouched on failure.
 */
int32_t queryCreateTableMetaFromMsg(STableMetaRsp *msg, bool isSuperTable, STableMeta **pMeta) {
  if (NULL == msg || NULL == pMeta) {
    return TSDB_CODE_TSC_INVALID_INPUT;
  }

  // Negative counts would corrupt the allocation size and the copy length below
  if (msg->numOfColumns < 0 || msg->numOfTags < 0) {
    qError("invalid numOfColumns[%d] or numOfTags[%d] in table meta rsp msg", msg->numOfColumns, msg->numOfTags);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  int32_t total = msg->numOfColumns + msg->numOfTags;
  if (total > 0 && NULL == msg->pSchemas) {
    qError("missing schema for %d columns in table meta rsp msg", total);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  int32_t metaSize = sizeof(STableMeta) + sizeof(SSchema) * total;

  STableMeta *pTableMeta = calloc(1, metaSize);
  if (NULL == pTableMeta) {
    qError("calloc size[%d] failed", metaSize);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  pTableMeta->vgId = isSuperTable ? 0 : msg->vgId;
  pTableMeta->tableType = isSuperTable ? TSDB_SUPER_TABLE : msg->tableType;
  pTableMeta->uid = isSuperTable ? msg->suid : msg->tuid;
  pTableMeta->suid = msg->suid;
  pTableMeta->sversion = msg->sversion;
  pTableMeta->tversion = msg->tversion;

  pTableMeta->tableInfo.numOfTags = msg->numOfTags;
  pTableMeta->tableInfo.precision = msg->precision;
  pTableMeta->tableInfo.numOfColumns = msg->numOfColumns;

  if (total > 0) {
    memcpy(pTableMeta->schema, msg->pSchemas, sizeof(SSchema) * total);
  }

  // rowSize starts at zero thanks to calloc; only data columns contribute, tags do not
  for (int32_t i = 0; i < msg->numOfColumns; ++i) {
    pTableMeta->tableInfo.rowSize += pTableMeta->schema[i].bytes;
  }

  *pMeta = pTableMeta;
  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
223
int32_t queryProcessTableMetaRsp(void *output, char *msg, int32_t msgSize) {
D
dapan1121 已提交
224
  int32_t       code = 0;
S
Shengliang Guan 已提交
225 226 227 228 229 230
  STableMetaRsp metaRsp = {0};

  if (NULL == output || NULL == msg || msgSize <= 0) {
    code = TSDB_CODE_TSC_INVALID_INPUT;
    goto PROCESS_META_OVER;
  }
D
dapan1121 已提交
231

S
Shengliang Guan 已提交
232 233 234 235 236 237
  if (tDeserializeSTableMetaRsp(msg, msgSize, &metaRsp) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto PROCESS_META_OVER;
  }

  code = queryConvertTableMetaMsg(&metaRsp);
D
dapan1121 已提交
238
  if (code != TSDB_CODE_SUCCESS) {
S
Shengliang Guan 已提交
239
    goto PROCESS_META_OVER;
D
dapan1121 已提交
240 241
  }

D
dapan1121 已提交
242
  if (0 != strcmp(metaRsp.dbFName, TSDB_INFORMATION_SCHEMA_DB) && !tIsValidSchema(metaRsp.pSchemas, metaRsp.numOfColumns, metaRsp.numOfTags)) {
S
Shengliang Guan 已提交
243 244
    code = TSDB_CODE_TSC_INVALID_VALUE;
    goto PROCESS_META_OVER;
D
dapan1121 已提交
245 246
  }

S
Shengliang Guan 已提交
247 248 249
  STableMetaOutput *pOut = output;
  strcpy(pOut->dbFName, metaRsp.dbFName);
  pOut->dbId = metaRsp.dbId;
D
dapan1121 已提交
250

S
Shengliang Guan 已提交
251
  if (metaRsp.tableType == TSDB_CHILD_TABLE) {
D
dapan1121 已提交
252
    SET_META_TYPE_BOTH_TABLE(pOut->metaType);
D
dapan1121 已提交
253

S
Shengliang Guan 已提交
254 255
    strcpy(pOut->ctbName, metaRsp.tbName);
    strcpy(pOut->tbName, metaRsp.stbName);
D
dapan1121 已提交
256

S
Shengliang Guan 已提交
257 258 259 260 261 262
    pOut->ctbMeta.vgId = metaRsp.vgId;
    pOut->ctbMeta.tableType = metaRsp.tableType;
    pOut->ctbMeta.uid = metaRsp.tuid;
    pOut->ctbMeta.suid = metaRsp.suid;

    code = queryCreateTableMetaFromMsg(&metaRsp, true, &pOut->tbMeta);
D
dapan1121 已提交
263
  } else {
D
dapan1121 已提交
264
    SET_META_TYPE_TABLE(pOut->metaType);
S
Shengliang Guan 已提交
265 266 267 268 269 270
    strcpy(pOut->tbName, metaRsp.tbName);
    code = queryCreateTableMetaFromMsg(&metaRsp, (metaRsp.tableType == TSDB_SUPER_TABLE), &pOut->tbMeta);
  }

PROCESS_META_OVER:
  if (code != 0) {
D
dapan1121 已提交
271
    qError("failed to process table meta rsp since %s", tstrerror(code));
D
dapan1121 已提交
272
  }
S
Shengliang Guan 已提交
273 274

  tFreeSTableMetaRsp(&metaRsp);
D
dapan1121 已提交
275 276 277
  return code;
}

D
dapan1121 已提交
278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306

int32_t queryProcessQnodeListRsp(void *output, char *msg, int32_t msgSize) {
  SQnodeListRsp out = {0};
  int32_t       code = -1;

  if (NULL == output || NULL == msg || msgSize <= 0) {
    code = TSDB_CODE_TSC_INVALID_INPUT;
    goto PROCESS_QLIST_OVER;
  }

  if (tDeserializeSQnodeListRsp(msg, msgSize, &out) != 0) {
    qError("invalid qnode list rsp msg, msgSize:%d", msgSize);
    code = TSDB_CODE_INVALID_MSG;
    goto PROCESS_QLIST_OVER;
  }

PROCESS_QLIST_OVER:

  if (code != 0) {
    tFreeSQnodeListRsp(&out);
    out.epSetList = NULL;
  }

  *(SArray **)output = out.epSetList;

  return code;
}


307
void initQueryModuleMsgHandle() {
D
catalog  
dapan1121 已提交
308
  queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryBuildTableMetaReqMsg;
D
dapan1121 已提交
309
  queryBuildMsg[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryBuildTableMetaReqMsg;
D
catalog  
dapan1121 已提交
310
  queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)] = queryBuildUseDbMsg;
D
dapan1121 已提交
311
  queryBuildMsg[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryBuildQnodeListMsg;
D
dapan1121 已提交
312

D
catalog  
dapan1121 已提交
313
  queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryProcessTableMetaRsp;
D
dapan1121 已提交
314
  queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryProcessTableMetaRsp;
D
catalog  
dapan1121 已提交
315
  queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)] = queryProcessUseDBRsp;
D
dapan1121 已提交
316
  queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryProcessQnodeListRsp;
D
dapan 已提交
317 318
}

D
dapan1121 已提交
319
#pragma GCC diagnostic pop