querymsg.c 8.9 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tmsg.h"
D
dapan1121 已提交
17
#include "queryInt.h"
D
dapan1121 已提交
18
#include "query.h"
D
catalog  
dapan1121 已提交
19
#include "trpc.h"
D
dapan 已提交
20

S
Shengliang Guan 已提交
21 22 23
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wformat-truncation"

H
Hongze Cheng 已提交
24
/* Request-builder dispatch table with TDMT_MAX slots, all initially NULL.
 * initQueryModuleMsgHandle() fills the slots it supports, addressing them
 * with TMSG_INDEX(<message type>). */
int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen) = {0};
D
dapan 已提交
25

H
Hongze Cheng 已提交
26
/* Response-handler dispatch table with TDMT_MAX slots, all initially NULL.
 * initQueryModuleMsgHandle() fills the slots it supports, addressing them
 * with TMSG_INDEX(<message type>). */
int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char *msg, int32_t msgSize) = {0};
D
dapan 已提交
27

D
dapan1121 已提交
28
int32_t queryBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int32_t *msgLen) {
D
dapan1121 已提交
29 30 31 32 33 34
  if (NULL == input || NULL == msg || NULL == msgLen) {
    return TSDB_CODE_TSC_INVALID_INPUT;
  }

  SBuildTableMetaInput* bInput = (SBuildTableMetaInput *)input;

S
Shengliang Guan 已提交
35
  int32_t estimateSize = sizeof(STableInfoReq);
D
dapan1121 已提交
36 37
  if (NULL == *msg || msgSize < estimateSize) {
    tfree(*msg);
D
catalog  
dapan1121 已提交
38
    *msg = rpcMallocCont(estimateSize);
D
dapan1121 已提交
39 40 41 42 43
    if (NULL == *msg) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
  }

S
Shengliang Guan 已提交
44
  STableInfoReq *bMsg = (STableInfoReq *)*msg;
D
dapan1121 已提交
45

D
dapan1121 已提交
46
  bMsg->header.vgId = htonl(bInput->vgId);
D
dapan1121 已提交
47

D
dapan1121 已提交
48 49
  if (bInput->dbFName) {
    tstrncpy(bMsg->dbFName, bInput->dbFName, tListLen(bMsg->dbFName));
D
dapan1121 已提交
50 51
  }

D
dapan1121 已提交
52
  tstrncpy(bMsg->tbName, bInput->tbName, tListLen(bMsg->tbName));
D
dapan1121 已提交
53 54 55 56 57

  *msgLen = (int32_t)sizeof(*bMsg);
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
58 59 60 61 62 63 64
int32_t queryBuildUseDbMsg(void* input, char **msg, int32_t msgSize, int32_t *msgLen) {
  if (NULL == input || NULL == msg || NULL == msgLen) {
    return TSDB_CODE_TSC_INVALID_INPUT;
  }

  SBuildUseDBInput* bInput = (SBuildUseDBInput *)input;

S
Shengliang Guan 已提交
65
  int32_t estimateSize = sizeof(SUseDbReq);
D
dapan1121 已提交
66 67
  if (NULL == *msg || msgSize < estimateSize) {
    tfree(*msg);
D
catalog  
dapan1121 已提交
68
    *msg = rpcMallocCont(estimateSize);
D
dapan1121 已提交
69 70 71 72 73
    if (NULL == *msg) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
  }

S
Shengliang Guan 已提交
74
  SUseDbReq *bMsg = (SUseDbReq *)*msg;
D
dapan1121 已提交
75 76 77 78

  strncpy(bMsg->db, bInput->db, sizeof(bMsg->db));
  bMsg->db[sizeof(bMsg->db) - 1] = 0;

79
  bMsg->vgVersion = bInput->vgVersion;
D
dapan1121 已提交
80 81 82 83 84 85

  *msgLen = (int32_t)sizeof(*bMsg);

  return TSDB_CODE_SUCCESS;  
}

S
Shengliang Guan 已提交
86
int32_t queryProcessUseDBRsp(void *output, char *msg, int32_t msgSize) {
D
dapan1121 已提交
87 88 89 90 91
  if (NULL == output || NULL == msg || msgSize <= 0) {
    return TSDB_CODE_TSC_INVALID_INPUT;
  }

  SUseDbOutput *pOut = (SUseDbOutput *)output;
S
Shengliang Guan 已提交
92
  int32_t       code = 0;
D
dapan1121 已提交
93

S
Shengliang Guan 已提交
94 95 96 97
  SUseDbRsp usedbRsp = {0};
  if (tDeserializeSUseDbRsp(msg, msgSize, &usedbRsp) != 0) {
    qError("invalid use db rsp msg, msgSize:%d", msgSize);
    return TSDB_CODE_INVALID_MSG;
D
dapan1121 已提交
98 99
  }

S
Shengliang Guan 已提交
100 101
  if (usedbRsp.vgNum < 0) {
    qError("invalid db[%s] vgroup number[%d]", usedbRsp.db, usedbRsp.vgNum);
D
dapan1121 已提交
102 103 104
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

D
dapan1121 已提交
105 106 107 108 109 110
  pOut->dbVgroup = calloc(1, sizeof(SDBVgroupInfo));
  if (NULL == pOut->dbVgroup) {
    qError("calloc %d failed", (int32_t)sizeof(SDBVgroupInfo));
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

S
Shengliang Guan 已提交
111 112 113 114 115
  pOut->dbId = usedbRsp.uid;
  pOut->dbVgroup->vgVersion = usedbRsp.vgVersion;
  pOut->dbVgroup->hashMethod = usedbRsp.hashMethod;
  pOut->dbVgroup->vgHash =
      taosHashInit(usedbRsp.vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
D
dapan1121 已提交
116
  if (NULL == pOut->dbVgroup->vgHash) {
S
Shengliang Guan 已提交
117
    qError("taosHashInit %d failed", usedbRsp.vgNum);
D
dapan1121 已提交
118
    tfree(pOut->dbVgroup);
119
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
D
dapan1121 已提交
120 121
  }

S
Shengliang Guan 已提交
122 123
  for (int32_t i = 0; i < usedbRsp.vgNum; ++i) {
    SVgroupInfo *pVgInfo = taosArrayGet(usedbRsp.pVgroupInfos, i);
D
dapan1121 已提交
124

S
Shengliang Guan 已提交
125
    if (0 != taosHashPut(pOut->dbVgroup->vgHash, &pVgInfo->vgId, sizeof(int32_t), pVgInfo, sizeof(SVgroupInfo))) {
D
dapan1121 已提交
126
      qError("taosHashPut failed");
127
      goto _return;
D
dapan1121 已提交
128 129 130
    }
  }

S
Shengliang Guan 已提交
131
  memcpy(pOut->db, usedbRsp.db, TSDB_DB_FNAME_LEN);
132

D
dapan1121 已提交
133 134
  return code;

135
_return:
S
Shengliang Guan 已提交
136
  taosArrayDestroy(usedbRsp.pVgroupInfos);
D
dapan1121 已提交
137

138
  if (pOut) {
D
dapan1121 已提交
139 140
    taosHashCleanup(pOut->dbVgroup->vgHash);
    tfree(pOut->dbVgroup);
D
dapan1121 已提交
141
  }
S
Shengliang Guan 已提交
142

D
dapan1121 已提交
143 144 145
  return code;
}

S
Shengliang Guan 已提交
146
/**
 * Convert a received STableMetaRsp to host byte order in place and
 * sanity-check its contents.
 *
 * The header fields are converted first and unconditionally, so the message
 * stays modified even when a later check rejects it. The per-column schema
 * entries are converted only after all header checks have passed.
 *
 * NOTE(review): the message length is not available here, so the schema loop
 * relies solely on numOfColumns/numOfTags being within their limits --
 * confirm the caller guarantees the buffer really holds that many entries.
 *
 * @param pMetaMsg Message to convert; modified in place.
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_INVALID_VALUE when a field is
 *         out of range or the first column is not the primary timestamp.
 */
static int32_t queryConvertTableMetaMsg(STableMetaRsp* pMetaMsg) {
  // 64-bit identifiers.
  pMetaMsg->dbId = be64toh(pMetaMsg->dbId);
  pMetaMsg->tuid = be64toh(pMetaMsg->tuid);
  pMetaMsg->suid = be64toh(pMetaMsg->suid);

  // 32-bit counters, versions and vgroup id.
  pMetaMsg->numOfTags = ntohl(pMetaMsg->numOfTags);
  pMetaMsg->numOfColumns = ntohl(pMetaMsg->numOfColumns);
  pMetaMsg->sversion = ntohl(pMetaMsg->sversion);
  pMetaMsg->tversion = ntohl(pMetaMsg->tversion);
  pMetaMsg->vgId = ntohl(pMetaMsg->vgId);

  if (pMetaMsg->numOfTags < 0 || pMetaMsg->numOfTags > TSDB_MAX_TAGS) {
    qError("invalid numOfTags[%d] in table meta rsp msg", pMetaMsg->numOfTags);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  if (pMetaMsg->numOfColumns <= 0 || pMetaMsg->numOfColumns > TSDB_MAX_COLUMNS) {
    qError("invalid numOfColumns[%d] in table meta rsp msg", pMetaMsg->numOfColumns);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  switch (pMetaMsg->tableType) {
    case TSDB_SUPER_TABLE:
    case TSDB_CHILD_TABLE:
    case TSDB_NORMAL_TABLE:
      break;
    default:
      qError("invalid tableType[%d] in table meta rsp msg", pMetaMsg->tableType);
      return TSDB_CODE_TSC_INVALID_VALUE;
  }

  if (pMetaMsg->sversion < 0) {
    qError("invalid sversion[%d] in table meta rsp msg", pMetaMsg->sversion);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  if (pMetaMsg->tversion < 0) {
    qError("invalid tversion[%d] in table meta rsp msg", pMetaMsg->tversion);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  // Columns and tags share one schema array; convert every entry.
  SSchema* schemas = pMetaMsg->pSchema;
  int32_t  schemaCnt = pMetaMsg->numOfColumns + pMetaMsg->numOfTags;
  for (int32_t idx = 0; idx < schemaCnt; ++idx) {
    schemas[idx].bytes = ntohl(schemas[idx].bytes);
    schemas[idx].colId = ntohl(schemas[idx].colId);
  }

  if (schemas[0].colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
    qError("invalid colId[%d] for the first column in table meta rsp msg", schemas[0].colId);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
199
/**
 * Allocate an STableMeta and populate it from an already converted
 * STableMetaRsp.
 *
 * @param msg          Source message (host byte order).
 * @param isSuperTable When true the result describes the super table: vgId is
 *                     0, tableType is TSDB_SUPER_TABLE and uid is msg->suid.
 *                     Otherwise vgId, tableType and tuid are taken from msg.
 * @param pMeta        Out: receives the calloc'ed STableMeta on success.
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the
 *         allocation fails (*pMeta is then left untouched).
 */
int32_t queryCreateTableMetaFromMsg(STableMetaRsp* msg, bool isSuperTable, STableMeta **pMeta) {
  int32_t schemaCnt = msg->numOfColumns + msg->numOfTags;
  int32_t allocSize = sizeof(STableMeta) + sizeof(SSchema) * schemaCnt;

  STableMeta* pNew = calloc(1, allocSize);
  if (NULL == pNew) {
    qError("calloc size[%d] failed", allocSize);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  // Identity fields depend on whether the super-table view was requested.
  if (isSuperTable) {
    pNew->vgId = 0;
    pNew->tableType = TSDB_SUPER_TABLE;
    pNew->uid = msg->suid;
  } else {
    pNew->vgId = msg->vgId;
    pNew->tableType = msg->tableType;
    pNew->uid = msg->tuid;
  }

  pNew->suid = msg->suid;
  pNew->sversion = msg->sversion;
  pNew->tversion = msg->tversion;

  pNew->tableInfo.numOfColumns = msg->numOfColumns;
  pNew->tableInfo.numOfTags = msg->numOfTags;
  pNew->tableInfo.precision = msg->precision;

  // Columns and tags are stored back to back behind the fixed part.
  memcpy(pNew->schema, msg->pSchema, sizeof(SSchema) * schemaCnt);

  // rowSize starts at 0 (calloc) and sums the widths of the columns only.
  for (int32_t col = 0; col < msg->numOfColumns; ++col) {
    pNew->tableInfo.rowSize += pNew->schema[col].bytes;
  }

  *pMeta = pNew;

  return TSDB_CODE_SUCCESS;
}


int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) {
S
Shengliang Guan 已提交
233
  STableMetaRsp *pMetaMsg = (STableMetaRsp *)msg;
D
dapan1121 已提交
234 235 236 237 238 239 240 241 242 243 244 245
  int32_t code = queryConvertTableMetaMsg(pMetaMsg);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  STableMetaOutput *pOut = (STableMetaOutput *)output;
  
  if (!tIsValidSchema(pMetaMsg->pSchema, pMetaMsg->numOfColumns, pMetaMsg->numOfTags)) {
    qError("validate table meta schema in rsp msg failed");
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

D
dapan1121 已提交
246
  strcpy(pOut->dbFName, pMetaMsg->dbFName);
D
dapan1121 已提交
247 248
  
  pOut->dbId = pMetaMsg->dbId;
D
dapan1121 已提交
249

D
dapan1121 已提交
250
  if (pMetaMsg->tableType == TSDB_CHILD_TABLE) {
D
dapan1121 已提交
251
    SET_META_TYPE_BOTH_TABLE(pOut->metaType);
D
dapan1121 已提交
252

D
dapan1121 已提交
253 254
    strcpy(pOut->ctbName, pMetaMsg->tbName);
    strcpy(pOut->tbName, pMetaMsg->stbName);
D
dapan1121 已提交
255 256 257 258 259 260 261 262
    
    pOut->ctbMeta.vgId = pMetaMsg->vgId;
    pOut->ctbMeta.tableType = pMetaMsg->tableType;
    pOut->ctbMeta.uid = pMetaMsg->tuid;
    pOut->ctbMeta.suid = pMetaMsg->suid;

    code = queryCreateTableMetaFromMsg(pMetaMsg, true, &pOut->tbMeta);
  } else {
D
dapan1121 已提交
263
    SET_META_TYPE_TABLE(pOut->metaType);
D
dapan1121 已提交
264
    
D
dapan1121 已提交
265
    strcpy(pOut->tbName, pMetaMsg->tbName);
D
dapan1121 已提交
266
    
267
    code = queryCreateTableMetaFromMsg(pMetaMsg, (pMetaMsg->tableType == TSDB_SUPER_TABLE), &pOut->tbMeta);
D
dapan1121 已提交
268 269 270 271 272
  }
  
  return code;
}

D
dapan1121 已提交
273

274
void initQueryModuleMsgHandle() {
D
catalog  
dapan1121 已提交
275 276 277
  queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryBuildTableMetaReqMsg;
  queryBuildMsg[TMSG_INDEX(TDMT_MND_STB_META)] = queryBuildTableMetaReqMsg;
  queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)] = queryBuildUseDbMsg;
D
dapan1121 已提交
278

D
catalog  
dapan1121 已提交
279 280 281
  queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryProcessTableMetaRsp;
  queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_STB_META)] = queryProcessTableMetaRsp;
  queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)] = queryProcessUseDBRsp;
D
dapan 已提交
282 283
}

D
dapan1121 已提交
284
#pragma GCC diagnostic pop