querymsg.c 8.7 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tmsg.h"
D
dapan1121 已提交
17
#include "queryInt.h"
D
dapan1121 已提交
18
#include "query.h"
D
catalog  
dapan1121 已提交
19
#include "trpc.h"
D
dapan 已提交
20

S
Shengliang Guan 已提交
21 22 23
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wformat-truncation"

H
Hongze Cheng 已提交
24
int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen) = {0};
D
dapan 已提交
25

H
Hongze Cheng 已提交
26
int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char *msg, int32_t msgSize) = {0};
D
dapan 已提交
27

D
dapan1121 已提交
28
int32_t queryBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int32_t *msgLen) {
D
dapan1121 已提交
29 30 31 32 33 34
  if (NULL == input || NULL == msg || NULL == msgLen) {
    return TSDB_CODE_TSC_INVALID_INPUT;
  }

  SBuildTableMetaInput* bInput = (SBuildTableMetaInput *)input;

S
Shengliang Guan 已提交
35
  int32_t estimateSize = sizeof(STableInfoReq);
D
dapan1121 已提交
36 37
  if (NULL == *msg || msgSize < estimateSize) {
    tfree(*msg);
D
catalog  
dapan1121 已提交
38
    *msg = rpcMallocCont(estimateSize);
D
dapan1121 已提交
39 40 41 42 43
    if (NULL == *msg) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
  }

S
Shengliang Guan 已提交
44
  STableInfoReq *bMsg = (STableInfoReq *)*msg;
D
dapan1121 已提交
45

D
dapan1121 已提交
46
  bMsg->header.vgId = htonl(bInput->vgId);
D
dapan1121 已提交
47

D
dapan1121 已提交
48 49
  if (bInput->dbFName) {
    tstrncpy(bMsg->dbFName, bInput->dbFName, tListLen(bMsg->dbFName));
D
dapan1121 已提交
50 51
  }

D
dapan1121 已提交
52
  tstrncpy(bMsg->tbName, bInput->tbName, tListLen(bMsg->tbName));
D
dapan1121 已提交
53 54 55 56 57

  *msgLen = (int32_t)sizeof(*bMsg);
  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
58
int32_t queryBuildUseDbMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen) {
D
dapan1121 已提交
59 60 61 62
  if (NULL == input || NULL == msg || NULL == msgLen) {
    return TSDB_CODE_TSC_INVALID_INPUT;
  }

S
Shengliang Guan 已提交
63
  SBuildUseDBInput *bInput = input;
D
dapan1121 已提交
64

S
Shengliang Guan 已提交
65 66 67 68
  SUseDbReq usedbReq = {0};
  strncpy(usedbReq.db, bInput->db, sizeof(usedbReq.db));
  usedbReq.db[sizeof(usedbReq.db) - 1] = 0;
  usedbReq.vgVersion = bInput->vgVersion;
D
dapan1121 已提交
69

S
Shengliang Guan 已提交
70 71 72
  int32_t bufLen = tSerializeSUseDbReq(NULL, 0, &usedbReq);
  void   *pBuf = rpcMallocCont(bufLen);
  tSerializeSUseDbReq(pBuf, bufLen, &usedbReq);
D
dapan1121 已提交
73

S
Shengliang Guan 已提交
74 75
  *msg = pBuf;
  *msgLen = bufLen;
D
dapan1121 已提交
76

S
Shengliang Guan 已提交
77
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
78 79
}

S
Shengliang Guan 已提交
80
int32_t queryProcessUseDBRsp(void *output, char *msg, int32_t msgSize) {
D
dapan1121 已提交
81 82 83 84 85
  if (NULL == output || NULL == msg || msgSize <= 0) {
    return TSDB_CODE_TSC_INVALID_INPUT;
  }

  SUseDbOutput *pOut = (SUseDbOutput *)output;
S
Shengliang Guan 已提交
86
  int32_t       code = 0;
D
dapan1121 已提交
87

S
Shengliang Guan 已提交
88 89 90 91
  SUseDbRsp usedbRsp = {0};
  if (tDeserializeSUseDbRsp(msg, msgSize, &usedbRsp) != 0) {
    qError("invalid use db rsp msg, msgSize:%d", msgSize);
    return TSDB_CODE_INVALID_MSG;
D
dapan1121 已提交
92 93
  }

S
Shengliang Guan 已提交
94 95
  if (usedbRsp.vgNum < 0) {
    qError("invalid db[%s] vgroup number[%d]", usedbRsp.db, usedbRsp.vgNum);
D
dapan1121 已提交
96 97 98
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

D
dapan1121 已提交
99
  pOut->dbVgroup = calloc(1, sizeof(SDBVgInfo));
D
dapan1121 已提交
100
  if (NULL == pOut->dbVgroup) {
D
dapan1121 已提交
101
    qError("calloc %d failed", (int32_t)sizeof(SDBVgInfo));
D
dapan1121 已提交
102 103 104
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

S
Shengliang Guan 已提交
105 106 107 108 109
  pOut->dbId = usedbRsp.uid;
  pOut->dbVgroup->vgVersion = usedbRsp.vgVersion;
  pOut->dbVgroup->hashMethod = usedbRsp.hashMethod;
  pOut->dbVgroup->vgHash =
      taosHashInit(usedbRsp.vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
D
dapan1121 已提交
110
  if (NULL == pOut->dbVgroup->vgHash) {
S
Shengliang Guan 已提交
111
    qError("taosHashInit %d failed", usedbRsp.vgNum);
D
dapan1121 已提交
112
    tfree(pOut->dbVgroup);
113
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
D
dapan1121 已提交
114 115
  }

S
Shengliang Guan 已提交
116 117
  for (int32_t i = 0; i < usedbRsp.vgNum; ++i) {
    SVgroupInfo *pVgInfo = taosArrayGet(usedbRsp.pVgroupInfos, i);
D
dapan1121 已提交
118

S
Shengliang Guan 已提交
119
    if (0 != taosHashPut(pOut->dbVgroup->vgHash, &pVgInfo->vgId, sizeof(int32_t), pVgInfo, sizeof(SVgroupInfo))) {
D
dapan1121 已提交
120
      qError("taosHashPut failed");
121
      goto _return;
D
dapan1121 已提交
122 123 124
    }
  }

S
Shengliang Guan 已提交
125
  memcpy(pOut->db, usedbRsp.db, TSDB_DB_FNAME_LEN);
126

D
dapan1121 已提交
127 128
  return code;

129
_return:
S
Shengliang Guan 已提交
130
  tFreeSUsedbRsp(&usedbRsp);
D
dapan1121 已提交
131

132
  if (pOut) {
D
dapan1121 已提交
133 134
    taosHashCleanup(pOut->dbVgroup->vgHash);
    tfree(pOut->dbVgroup);
D
dapan1121 已提交
135
  }
S
Shengliang Guan 已提交
136

D
dapan1121 已提交
137 138 139
  return code;
}

S
Shengliang Guan 已提交
140
static int32_t queryConvertTableMetaMsg(STableMetaRsp* pMetaMsg) {
D
dapan1121 已提交
141
  pMetaMsg->dbId = be64toh(pMetaMsg->dbId);
D
dapan1121 已提交
142 143 144 145
  pMetaMsg->numOfTags = ntohl(pMetaMsg->numOfTags);
  pMetaMsg->numOfColumns = ntohl(pMetaMsg->numOfColumns);
  pMetaMsg->sversion = ntohl(pMetaMsg->sversion);
  pMetaMsg->tversion = ntohl(pMetaMsg->tversion);
D
dapan1121 已提交
146 147
  pMetaMsg->tuid = be64toh(pMetaMsg->tuid);
  pMetaMsg->suid = be64toh(pMetaMsg->suid);
D
dapan1121 已提交
148
  pMetaMsg->vgId = ntohl(pMetaMsg->vgId);
D
dapan1121 已提交
149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178

  if (pMetaMsg->numOfTags < 0 || pMetaMsg->numOfTags > TSDB_MAX_TAGS) {
    qError("invalid numOfTags[%d] in table meta rsp msg", pMetaMsg->numOfTags);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  if (pMetaMsg->numOfColumns > TSDB_MAX_COLUMNS || pMetaMsg->numOfColumns <= 0) {
    qError("invalid numOfColumns[%d] in table meta rsp msg", pMetaMsg->numOfColumns);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  if (pMetaMsg->tableType != TSDB_SUPER_TABLE && pMetaMsg->tableType != TSDB_CHILD_TABLE && pMetaMsg->tableType != TSDB_NORMAL_TABLE) {
    qError("invalid tableType[%d] in table meta rsp msg", pMetaMsg->tableType);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  if (pMetaMsg->sversion < 0) {
    qError("invalid sversion[%d] in table meta rsp msg", pMetaMsg->sversion);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  if (pMetaMsg->tversion < 0) {
    qError("invalid tversion[%d] in table meta rsp msg", pMetaMsg->tversion);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }
  
  SSchema* pSchema = pMetaMsg->pSchema;

  int32_t numOfTotalCols = pMetaMsg->numOfColumns + pMetaMsg->numOfTags;
  for (int i = 0; i < numOfTotalCols; ++i) {
D
dapan1121 已提交
179 180
    pSchema->bytes = ntohl(pSchema->bytes);
    pSchema->colId = ntohl(pSchema->colId);
D
dapan1121 已提交
181 182 183 184 185 186 187 188 189 190 191 192

    pSchema++;
  }

  if (pMetaMsg->pSchema[0].colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
    qError("invalid colId[%d] for the first column in table meta rsp msg", pMetaMsg->pSchema[0].colId);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
193
int32_t queryCreateTableMetaFromMsg(STableMetaRsp* msg, bool isSuperTable, STableMeta **pMeta) {
D
dapan1121 已提交
194 195 196 197 198 199 200 201
  int32_t total = msg->numOfColumns + msg->numOfTags;
  int32_t metaSize = sizeof(STableMeta) + sizeof(SSchema) * total;
  
  STableMeta* pTableMeta = calloc(1, metaSize);
  if (NULL == pTableMeta) {
    qError("calloc size[%d] failed", metaSize);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
D
dapan1121 已提交
202 203

  pTableMeta->vgId = isSuperTable ? 0 : msg->vgId;
D
dapan1121 已提交
204
  pTableMeta->tableType = isSuperTable ? TSDB_SUPER_TABLE : msg->tableType;
D
dapan1121 已提交
205
  pTableMeta->uid  = isSuperTable ? msg->suid : msg->tuid;
D
dapan1121 已提交
206 207 208 209 210 211 212 213
  pTableMeta->suid = msg->suid;
  pTableMeta->sversion = msg->sversion;
  pTableMeta->tversion = msg->tversion;

  pTableMeta->tableInfo.numOfTags = msg->numOfTags;
  pTableMeta->tableInfo.precision = msg->precision;
  pTableMeta->tableInfo.numOfColumns = msg->numOfColumns;

D
dapan1121 已提交
214 215
  memcpy(pTableMeta->schema, msg->pSchema, sizeof(SSchema) * total);

D
dapan1121 已提交
216 217 218 219 220 221 222 223 224 225 226
  for(int32_t i = 0; i < msg->numOfColumns; ++i) {
    pTableMeta->tableInfo.rowSize += pTableMeta->schema[i].bytes;
  }

  *pMeta = pTableMeta;
  
  return TSDB_CODE_SUCCESS;
}


int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) {
S
Shengliang Guan 已提交
227
  STableMetaRsp *pMetaMsg = (STableMetaRsp *)msg;
D
dapan1121 已提交
228 229 230 231 232 233 234 235 236 237 238 239
  int32_t code = queryConvertTableMetaMsg(pMetaMsg);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  STableMetaOutput *pOut = (STableMetaOutput *)output;
  
  if (!tIsValidSchema(pMetaMsg->pSchema, pMetaMsg->numOfColumns, pMetaMsg->numOfTags)) {
    qError("validate table meta schema in rsp msg failed");
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

D
dapan1121 已提交
240
  strcpy(pOut->dbFName, pMetaMsg->dbFName);
D
dapan1121 已提交
241 242
  
  pOut->dbId = pMetaMsg->dbId;
D
dapan1121 已提交
243

D
dapan1121 已提交
244
  if (pMetaMsg->tableType == TSDB_CHILD_TABLE) {
D
dapan1121 已提交
245
    SET_META_TYPE_BOTH_TABLE(pOut->metaType);
D
dapan1121 已提交
246

D
dapan1121 已提交
247 248
    strcpy(pOut->ctbName, pMetaMsg->tbName);
    strcpy(pOut->tbName, pMetaMsg->stbName);
D
dapan1121 已提交
249 250 251 252 253 254 255 256
    
    pOut->ctbMeta.vgId = pMetaMsg->vgId;
    pOut->ctbMeta.tableType = pMetaMsg->tableType;
    pOut->ctbMeta.uid = pMetaMsg->tuid;
    pOut->ctbMeta.suid = pMetaMsg->suid;

    code = queryCreateTableMetaFromMsg(pMetaMsg, true, &pOut->tbMeta);
  } else {
D
dapan1121 已提交
257
    SET_META_TYPE_TABLE(pOut->metaType);
D
dapan1121 已提交
258
    
D
dapan1121 已提交
259
    strcpy(pOut->tbName, pMetaMsg->tbName);
D
dapan1121 已提交
260
    
261
    code = queryCreateTableMetaFromMsg(pMetaMsg, (pMetaMsg->tableType == TSDB_SUPER_TABLE), &pOut->tbMeta);
D
dapan1121 已提交
262 263 264 265 266
  }
  
  return code;
}

D
dapan1121 已提交
267

268
void initQueryModuleMsgHandle() {
D
catalog  
dapan1121 已提交
269 270 271
  queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryBuildTableMetaReqMsg;
  queryBuildMsg[TMSG_INDEX(TDMT_MND_STB_META)] = queryBuildTableMetaReqMsg;
  queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)] = queryBuildUseDbMsg;
D
dapan1121 已提交
272

D
catalog  
dapan1121 已提交
273 274 275
  queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryProcessTableMetaRsp;
  queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_STB_META)] = queryProcessTableMetaRsp;
  queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)] = queryProcessUseDBRsp;
D
dapan 已提交
276 277
}

D
dapan1121 已提交
278
#pragma GCC diagnostic pop