tscServer.c 87.9 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#include "os.h"
H
Haojun Liao 已提交
17
#include "tcmdtype.h"
H
hzcheng 已提交
18
#include "trpc.h"
19 20
#include "tscLocalMerge.h"
#include "tscLog.h"
H
hzcheng 已提交
21 22 23 24 25
#include "tscProfile.h"
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
#include "ttimer.h"
dengyihao's avatar
dengyihao 已提交
26
#include "tlockfree.h"
H
hzcheng 已提交
27

28 29
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};

H
hzcheng 已提交
30 31 32
int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql);
void tscProcessActivityTimer(void *handle, void *tmrId);
int tscKeepConn[TSDB_SQL_MAX] = {0};
33

B
Bomin Zhang 已提交
34
TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid, TSKEY dflt);
35 36
void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts);
void tscSaveSubscriptionProgress(void* sub);
H
hzcheng 已提交
37

S
slguan 已提交
38
static int32_t minMsgSize() { return tsRpcHeadSize + 100; }
H
Haojun Liao 已提交
39 40 41 42 43 44
static int32_t getWaitingTimeInterval(int32_t count) {
  int32_t initial = 100; // 100 ms by default
  if (count <= 1) {
    return 0;
  }

45
  return initial * ((2u)<<(count - 2));
H
Haojun Liao 已提交
46
}
H
hzcheng 已提交
47

48 49
static void tscSetDnodeEpSet(SRpcEpSet* pEpSet, SVgroupInfo* pVgroupInfo) {
  assert(pEpSet != NULL && pVgroupInfo != NULL && pVgroupInfo->numOfEps > 0);
H
Haojun Liao 已提交
50 51 52 53

  // Issue the query to one of the vnode among a vgroup randomly.
  // change the inUse property would not affect the isUse attribute of STableMeta
  pEpSet->inUse = rand() % pVgroupInfo->numOfEps;
54 55

  // apply the FQDN string length check here
56
  bool existed = false;
dengyihao's avatar
bugfix  
dengyihao 已提交
57

58 59 60
  pEpSet->numOfEps = pVgroupInfo->numOfEps;
  for(int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) {
    pEpSet->port[i] = pVgroupInfo->epAddr[i].port;
61

62 63 64 65
    int32_t len = (int32_t) strnlen(pVgroupInfo->epAddr[i].fqdn, TSDB_FQDN_LEN);
    if (len > 0) {
      tstrncpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn, tListLen(pEpSet->fqdn[i]));
      existed = true;
66
    }
67
  }
68
  assert(existed);
69
}
70

dengyihao's avatar
TD-2257  
dengyihao 已提交
71 72 73 74 75
static void tscDumpMgmtEpSet(SSqlObj *pSql) {
  SRpcCorEpSet *pCorEpSet = pSql->pTscObj->tscCorMgmtEpSet;
  taosCorBeginRead(&pCorEpSet->version);
  pSql->epSet = pCorEpSet->epSet;
  taosCorEndRead(&pCorEpSet->version);
76
}  
dengyihao's avatar
dengyihao 已提交
77 78
static void tscEpSetHtons(SRpcEpSet *s) {
   for (int32_t i = 0; i < s->numOfEps; i++) {
dengyihao's avatar
fixbug  
dengyihao 已提交
79 80 81
      s->port[i] = htons(s->port[i]);    
   }
} 
dengyihao's avatar
dengyihao 已提交
82 83
bool tscEpSetIsEqual(SRpcEpSet *s1, SRpcEpSet *s2) {
   if (s1->numOfEps != s2->numOfEps || s1->inUse != s2->inUse) {
84
     return false;
85 86
   }

dengyihao's avatar
dengyihao 已提交
87
   for (int32_t i = 0; i < s1->numOfEps; i++) {
88 89 90 91 92
     if (s1->port[i] != s2->port[i] 
        || strncmp(s1->fqdn[i], s2->fqdn[i], TSDB_FQDN_LEN) != 0)
        return false;
   }
   return true;
S
slguan 已提交
93
}
94

dengyihao's avatar
TD-2257  
dengyihao 已提交
95
void tscUpdateMgmtEpSet(SSqlObj *pSql, SRpcEpSet *pEpSet) {
96
  // no need to update if equal
dengyihao's avatar
TD-2257  
dengyihao 已提交
97 98 99 100
  SRpcCorEpSet *pCorEpSet = pSql->pTscObj->tscCorMgmtEpSet;
  taosCorBeginWrite(&pCorEpSet->version);
  pCorEpSet->epSet = *pEpSet;
  taosCorEndWrite(&pCorEpSet->version);
101
}
102

103
static void tscDumpEpSetFromVgroupInfo(SRpcEpSet *pEpSet, SNewVgroupInfo *pVgroupInfo) {
dengyihao's avatar
dengyihao 已提交
104
  if (pVgroupInfo == NULL) { return;}
dengyihao's avatar
dengyihao 已提交
105
  int8_t inUse = pVgroupInfo->inUse;
dengyihao's avatar
dengyihao 已提交
106 107 108
  pEpSet->inUse = (inUse >= 0 && inUse < TSDB_MAX_REPLICA) ? inUse: 0; 
  pEpSet->numOfEps = pVgroupInfo->numOfEps;  
  for (int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) {
109 110
    tstrncpy(pEpSet->fqdn[i], pVgroupInfo->ep[i].fqdn, sizeof(pEpSet->fqdn[i]));
    pEpSet->port[i] = pVgroupInfo->ep[i].port;
S
slguan 已提交
111 112 113
  }
}

dengyihao's avatar
dengyihao 已提交
114
static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) {
dengyihao's avatar
dengyihao 已提交
115 116
  SSqlCmd *pCmd = &pObj->cmd;
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
117 118 119
  if (pTableMetaInfo == NULL || pTableMetaInfo->pTableMeta == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
120

121
  int32_t vgId = pTableMetaInfo->pTableMeta->vgId;
122 123 124 125
  if (pTableMetaInfo->pTableMeta->tableType == TSDB_SUPER_TABLE) {
    assert(vgId == 0);
    return;
  }
126 127

  SNewVgroupInfo vgroupInfo = {.vgId = -1};
128
  taosHashGetClone(tscVgroupMap, &vgId, sizeof(vgId), NULL, &vgroupInfo, sizeof(SNewVgroupInfo));
129 130 131 132 133 134 135 136
  assert(vgroupInfo.numOfEps > 0 && vgroupInfo.vgId > 0);

  tscDebug("before: Endpoint in use:%d, numOfEps:%d", vgroupInfo.inUse, vgroupInfo.numOfEps);
  vgroupInfo.inUse    = pEpSet->inUse;
  vgroupInfo.numOfEps = pEpSet->numOfEps;
  for (int32_t i = 0; i < vgroupInfo.numOfEps; i++) {
    strncpy(vgroupInfo.ep[i].fqdn, pEpSet->fqdn[i], TSDB_FQDN_LEN);
    vgroupInfo.ep[i].port = pEpSet->port[i];
dengyihao's avatar
dengyihao 已提交
137
  }
H
Haojun Liao 已提交
138

139
  tscDebug("after: EndPoint in use:%d, numOfEps:%d", vgroupInfo.inUse, vgroupInfo.numOfEps);
140
  taosHashPut(tscVgroupMap, &vgId, sizeof(vgId), &vgroupInfo, sizeof(SNewVgroupInfo));
dengyihao's avatar
dengyihao 已提交
141
}
H
Haojun Liao 已提交
142

H
hzcheng 已提交
143 144 145
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
  STscObj *pObj = (STscObj *)param;
  if (pObj == NULL) return;
H
Haojun Liao 已提交
146

H
hzcheng 已提交
147
  if (pObj != pObj->signature) {
H
Haojun Liao 已提交
148
    tscError("heartbeat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature);
H
hzcheng 已提交
149 150 151
    return;
  }

H
Haojun Liao 已提交
152
  SSqlObj *pSql = tres;
H
hzcheng 已提交
153 154
  SSqlRes *pRes = &pSql->res;

155
  if (code == TSDB_CODE_SUCCESS) {
S
TD-1732  
Shengliang Guan 已提交
156
    SHeartBeatRsp *pRsp = (SHeartBeatRsp *)pRes->pRsp;
157
    SRpcEpSet     *epSet = &pRsp->epSet;
dengyihao's avatar
dengyihao 已提交
158 159
    if (epSet->numOfEps > 0) {
      tscEpSetHtons(epSet);
160 161 162
      if (!tscEpSetIsEqual(&pSql->pTscObj->tscCorMgmtEpSet->epSet, epSet)) {
        tscTrace("%p updating epset: numOfEps: %d, inUse: %d", pSql, epSet->numOfEps, epSet->inUse);
        for (int8_t i = 0; i < epSet->numOfEps; i++) {
163
          tscTrace("endpoint %d: fqdn=%s, port=%d", i, epSet->fqdn[i], epSet->port[i]);
164 165 166
        }
        tscUpdateMgmtEpSet(pSql, epSet);
      }
S
TD-1732  
Shengliang Guan 已提交
167
    }
S
slguan 已提交
168

S
Shengliang Guan 已提交
169 170
    pSql->pTscObj->connId = htonl(pRsp->connId);

H
hzcheng 已提交
171 172
    if (pRsp->killConnection) {
      tscKillConnection(pObj);
173
      return;
H
hzcheng 已提交
174
    } else {
175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190
      if (pRsp->queryId) {
        tscKillQuery(pObj, htonl(pRsp->queryId));
      }

      if (pRsp->streamId) {
        tscKillStream(pObj, htonl(pRsp->streamId));
      }
    }

    int32_t total  = htonl(pRsp->totalDnodes);
    int32_t online = htonl(pRsp->onlineDnodes);
    assert(online <= total);

    if (online < total) {
      tscError("HB:%p, total dnode:%d, online dnode:%d", pSql, total, online);
      pSql->res.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
H
hzcheng 已提交
191
    }
192

193
    if (pRes->length == NULL) {
194 195 196 197 198
      pRes->length = calloc(2,  sizeof(int32_t));
    }

    pRes->length[0] = total;
    pRes->length[1] = online;
H
hzcheng 已提交
199
  } else {
200
    tscDebug("%" PRId64 " heartbeat failed, code:%s", pObj->hbrid, tstrerror(code));
201
    if (pRes->length == NULL) {
202 203 204 205 206 207 208
      pRes->length = calloc(2, sizeof(int32_t));
    }

    pRes->length[1] = 0;
    if (pRes->length[0] == 0) {
      pRes->length[0] = 1; // make sure that the value of the total node is greater than the online node
    }
H
hzcheng 已提交
209 210
  }

211
  if (pObj->hbrid != 0) {
212
    int32_t waitingDuring = tsShellActivityTimer * 500;
H
Haojun Liao 已提交
213
    tscDebug("%p send heartbeat in %dms", pSql, waitingDuring);
H
Haojun Liao 已提交
214

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
215
    taosTmrReset(tscProcessActivityTimer, waitingDuring, (void *)pObj->rid, tscTmr, &pObj->pTimer);
H
Haojun Liao 已提交
216 217 218
  } else {
    tscDebug("%p start to close tscObj:%p, not send heartbeat again", pSql, pObj);
  }
H
hzcheng 已提交
219 220 221
}

void tscProcessActivityTimer(void *handle, void *tmrId) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
222 223
  int64_t rid = (int64_t) handle;
  STscObj *pObj = taosAcquireRef(tscRefId, rid);
224 225 226
  if (pObj == NULL) {
    return;
  }
H
hzcheng 已提交
227

228
  SSqlObj* pHB = taosAcquireRef(tscObjRef, pObj->hbrid);
229 230 231 232 233
  if (pHB == NULL) {
    taosReleaseRef(tscRefId, rid);
    return;
  }

234
  assert(pHB->self == pObj->hbrid);
H
Haojun Liao 已提交
235

236
  pHB->retry = 0;
H
Haojun Liao 已提交
237
  int32_t code = tscProcessSql(pHB);
238
  taosReleaseRef(tscObjRef, pObj->hbrid);
H
Haojun Liao 已提交
239 240 241

  if (code != TSDB_CODE_SUCCESS) {
    tscError("%p failed to sent HB to server, reason:%s", pHB, tstrerror(code));
H
hzcheng 已提交
242
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
243

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
244
  taosReleaseRef(tscRefId, rid);
H
hzcheng 已提交
245 246 247
}

int tscSendMsgToServer(SSqlObj *pSql) {
248
  STscObj* pObj = pSql->pTscObj;
H
hjxilinx 已提交
249 250
  SSqlCmd* pCmd = &pSql->cmd;
  
251
  char *pMsg = rpcMallocCont(pCmd->payloadLen);
S
slguan 已提交
252
  if (NULL == pMsg) {
253
    tscError("%p msg:%s malloc failed", pSql, taosMsg[pSql->cmd.msgType]);
254
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
H
hzcheng 已提交
255 256
  }

257 258
  // set the mgmt ip list
  if (pSql->cmd.command >= TSDB_SQL_MGMT) {
dengyihao's avatar
TD-2257  
dengyihao 已提交
259
    tscDumpMgmtEpSet(pSql);
J
jtao1735 已提交
260 261
  }

262
  memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
263

J
jtao1735 已提交
264
  SRpcMsg rpcMsg = {
265 266
      .msgType = pSql->cmd.msgType,
      .pCont   = pMsg,
267
      .contLen = pSql->cmd.payloadLen,
268
      .ahandle = (void*)pSql->self,
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
269
      .handle  = NULL,
H
hjxilinx 已提交
270
      .code    = 0
J
jtao1735 已提交
271
  };
H
Haojun Liao 已提交
272

273
  rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg, &pSql->rpcRid);
H
Haojun Liao 已提交
274
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
275 276
}

277
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
S
TD-1530  
Shengliang Guan 已提交
278
  TSDB_CACHE_PTR_TYPE handle = (TSDB_CACHE_PTR_TYPE) rpcMsg->ahandle;
279 280
  SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle);
  if (pSql == NULL) {
H
Haojun Liao 已提交
281
    rpcFreeCont(rpcMsg->pCont);
282 283
    return;
  }
284
  assert(pSql->self == handle);
H
Haojun Liao 已提交
285

H
Haojun Liao 已提交
286
  STscObj *pObj = pSql->pTscObj;
S
slguan 已提交
287 288
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
H
hzcheng 已提交
289

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
290
  pSql->rpcRid = -1;
H
Haojun Liao 已提交
291

H
Haojun Liao 已提交
292
  if (pObj->signature != pObj) {
293
    tscDebug("%p DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, pObj, pObj->signature);
H
Haojun Liao 已提交
294

295 296
    taosRemoveRef(tscObjRef, pSql->self);
    taosReleaseRef(tscObjRef, pSql->self);
H
Haojun Liao 已提交
297 298 299 300 301 302
    rpcFreeCont(rpcMsg->pCont);
    return;
  }

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) {
303
    tscDebug("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p",
304
        pSql, pCmd->command, pQueryInfo->type, pObj, pObj->signature);
H
Haojun Liao 已提交
305

306 307
    taosRemoveRef(tscObjRef, pSql->self);
    taosReleaseRef(tscObjRef, pSql->self);
308
    rpcFreeCont(rpcMsg->pCont);
S
slguan 已提交
309
    return;
H
hzcheng 已提交
310 311
  }

312
  if (pEpSet) { // todo update this
dengyihao's avatar
dengyihao 已提交
313
    if (!tscEpSetIsEqual(&pSql->epSet, pEpSet)) {
H
Haojun Liao 已提交
314 315
      if (pCmd->command < TSDB_SQL_MGMT) {
        tscUpdateVgroupInfo(pSql, pEpSet);
dengyihao's avatar
dengyihao 已提交
316
      } else {
dengyihao's avatar
TD-2257  
dengyihao 已提交
317
        tscUpdateMgmtEpSet(pSql, pEpSet);
H
Haojun Liao 已提交
318
      }
dengyihao's avatar
dengyihao 已提交
319
    }
J
jtao1735 已提交
320 321
  }

322
  int32_t cmd = pCmd->command;
323

324 325 326 327 328 329
  // set the flag to denote that sql string needs to be re-parsed and build submit block with table schema
  if (cmd == TSDB_SQL_INSERT && rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
    pSql->cmd.submitSchema = 1;
  }

  if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_FETCH || cmd == TSDB_SQL_UPDATE_TAGS_VAL) &&
330 331 332
      (rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID ||
       rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID ||
       rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL ||
333
       rpcMsg->code == TSDB_CODE_APP_NOT_READY)) {
S
TD-2475  
Shengliang Guan 已提交
334 335 336
        
    pSql->retry++;
    tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), pSql->retry);
337 338 339 340 341

    pSql->res.code = rpcMsg->code;  // keep the previous error code
    if (pSql->retry > pSql->maxRetry) {
      tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry);
    } else {
H
Haojun Liao 已提交
342 343
      // wait for a little bit moment and then retry
      // todo do not sleep in rpc callback thread, add this process into queueu to process
H
Haojun Liao 已提交
344 345 346 347 348
      if (rpcMsg->code == TSDB_CODE_APP_NOT_READY || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
        int32_t duration = getWaitingTimeInterval(pSql->retry);
        taosMsleep(duration);
      }

B
Bomin Zhang 已提交
349
      rpcMsg->code = tscRenewTableMeta(pSql, 0);
350 351 352

      // if there is an error occurring, proceed to the following error handling procedure.
      if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
353
        taosReleaseRef(tscObjRef, pSql->self);
354 355
        rpcFreeCont(rpcMsg->pCont);
        return;
H
hzcheng 已提交
356 357
      }
    }
S
slguan 已提交
358
  }
359

H
hzcheng 已提交
360
  pRes->rspLen = 0;
361
  
H
Haojun Liao 已提交
362
  if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
363
    tscDebug("%p query is cancelled, code:%s", pSql, tstrerror(pRes->code));
H
Haojun Liao 已提交
364 365
  } else {
    pRes->code = rpcMsg->code;
H
hzcheng 已提交
366 367
  }

S
slguan 已提交
368
  if (pRes->code == TSDB_CODE_SUCCESS) {
369
    tscDebug("%p reset retry counter to be 0 due to success rsp, old:%d", pSql, pSql->retry);
S
slguan 已提交
370 371 372
    pSql->retry = 0;
  }

373
  if (pRes->code != TSDB_CODE_TSC_QUERY_CANCELLED) {
374
    assert(rpcMsg->msgType == pCmd->msgType + 1);
375
    pRes->code    = rpcMsg->code;
376
    pRes->rspType = rpcMsg->msgType;
377
    pRes->rspLen  = rpcMsg->contLen;
H
hzcheng 已提交
378

379
    if (pRes->rspLen > 0 && rpcMsg->pCont) {
380 381
      char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
      if (tmp == NULL) {
382
        pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
383 384
      } else {
        pRes->pRsp = tmp;
385
        memcpy(pRes->pRsp, rpcMsg->pCont, pRes->rspLen);
S
slguan 已提交
386
      }
387
    } else {
H
Haojun Liao 已提交
388
      tfree(pRes->pRsp);
S
slguan 已提交
389 390
    }

H
hzcheng 已提交
391 392 393 394
    /*
     * There is not response callback function for submit response.
     * The actual inserted number of points is the first number.
     */
395
    if (rpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP && pRes->pRsp != NULL) {
396
      SShellSubmitRspMsg *pMsg = (SShellSubmitRspMsg*)pRes->pRsp;
S
slguan 已提交
397 398 399 400 401 402 403
      pMsg->code = htonl(pMsg->code);
      pMsg->numOfRows = htonl(pMsg->numOfRows);
      pMsg->affectedRows = htonl(pMsg->affectedRows);
      pMsg->failedRows = htonl(pMsg->failedRows);
      pMsg->numOfFailedBlocks = htonl(pMsg->numOfFailedBlocks);

      pRes->numOfRows += pMsg->affectedRows;
404
      tscDebug("%p SQL cmd:%s, code:%s inserted rows:%d rspLen:%d", pSql, sqlCmd[pCmd->command], 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
405
          tstrerror(pRes->code), pMsg->affectedRows, pRes->rspLen);
S
slguan 已提交
406
    } else {
407
      tscDebug("%p SQL cmd:%s, code:%s rspLen:%d", pSql, sqlCmd[pCmd->command], tstrerror(pRes->code), pRes->rspLen);
H
hzcheng 已提交
408 409
    }
  }
410
  
H
Haojun Liao 已提交
411
  if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) {
412
    rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
H
Haojun Liao 已提交
413
  }
S
Shengliang Guan 已提交
414

H
Haojun Liao 已提交
415
  bool shouldFree = tscShouldBeFreed(pSql);
416
  if (rpcMsg->code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
S
TD-1057  
Shengliang Guan 已提交
417
    rpcMsg->code = (pRes->code == TSDB_CODE_SUCCESS) ? (int32_t)pRes->numOfRows : pRes->code;
H
Haojun Liao 已提交
418
    (*pSql->fp)(pSql->param, pSql, rpcMsg->code);
H
hzcheng 已提交
419 420
  }

421
  taosReleaseRef(tscObjRef, pSql->self);
H
Haojun Liao 已提交
422

H
Haojun Liao 已提交
423
  if (shouldFree) { // in case of table-meta/vgrouplist query, automatically free it
424
    taosRemoveRef(tscObjRef, pSql->self);
D
TD-2516  
dapan1121 已提交
425
    tscDebug("%p sqlObj is automatically freed", pSql); 
H
Haojun Liao 已提交
426 427
  }

428
  rpcFreeCont(rpcMsg->pCont);
H
hzcheng 已提交
429 430
}

S
slguan 已提交
431 432 433
int doProcessSql(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;
434

H
hjxilinx 已提交
435 436 437 438 439 440 441
  if (pCmd->command == TSDB_SQL_SELECT ||
      pCmd->command == TSDB_SQL_FETCH ||
      pCmd->command == TSDB_SQL_RETRIEVE ||
      pCmd->command == TSDB_SQL_INSERT ||
      pCmd->command == TSDB_SQL_CONNECT ||
      pCmd->command == TSDB_SQL_HB ||
      pCmd->command == TSDB_SQL_META ||
442
      pCmd->command == TSDB_SQL_STABLEVGROUP) {
443 444 445 446
    pRes->code = tscBuildMsg[pCmd->command](pSql, NULL);
  }
  
  if (pRes->code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
447
    tscAsyncResultOnError(pSql);
448
    return pRes->code;
S
slguan 已提交
449
  }
450

451 452 453
  int32_t code = tscSendMsgToServer(pSql);

  // NOTE: if code is TSDB_CODE_SUCCESS, pSql may have been released here already by other threads.
454
  if (code != TSDB_CODE_SUCCESS) {
S
slguan 已提交
455
    pRes->code = code;
H
Haojun Liao 已提交
456
    tscAsyncResultOnError(pSql);
H
Haojun Liao 已提交
457
    return code;
S
slguan 已提交
458
  }
H
hjxilinx 已提交
459 460
  
  return TSDB_CODE_SUCCESS;
S
slguan 已提交
461 462 463
}

int tscProcessSql(SSqlObj *pSql) {
464
  char    *name = NULL;
465
  SSqlCmd *pCmd = &pSql->cmd;
466
  
467
  SQueryInfo     *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hjxilinx 已提交
468
  STableMetaInfo *pTableMetaInfo = NULL;
469
  uint32_t        type = 0;
470

471
  if (pQueryInfo != NULL) {
H
hjxilinx 已提交
472
    pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
473
    name = (pTableMetaInfo != NULL)? pTableMetaInfo->name:NULL;
474
    type = pQueryInfo->type;
475

H
hjxilinx 已提交
476
    // while numOfTables equals to 0, it must be Heartbeat
H
hjxilinx 已提交
477
    assert((pQueryInfo->numOfTables == 0 && pQueryInfo->command == TSDB_SQL_HB) || pQueryInfo->numOfTables > 0);
S
slguan 已提交
478
  }
479

480
  tscDebug("%p SQL cmd:%s will be processed, name:%s, type:%d", pSql, sqlCmd[pCmd->command], name, type);
H
hjxilinx 已提交
481
  if (pCmd->command < TSDB_SQL_MGMT) { // the pTableMetaInfo cannot be NULL
H
hjxilinx 已提交
482
    if (pTableMetaInfo == NULL) {
483
      pSql->res.code = TSDB_CODE_TSC_APP_ERROR;
H
hjxilinx 已提交
484 485
      return pSql->res.code;
    }
H
Haojun Liao 已提交
486
  } else if (pCmd->command >= TSDB_SQL_LOCAL) {
487
    //pSql->epSet = tscMgmtEpSet;
H
Haojun Liao 已提交
488
//  } else {  // local handler
H
hzcheng 已提交
489 490
    return (*tscProcessMsgRsp[pCmd->command])(pSql);
  }
491
  
S
slguan 已提交
492 493
  return doProcessSql(pSql);
}
H
hzcheng 已提交
494

J
jtao1735 已提交
495
int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
496
  SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *) pSql->cmd.payload;
S
slguan 已提交
497

H
Haojun Liao 已提交
498
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
499 500
  pRetrieveMsg->free    = htons(pQueryInfo->type);
  pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
H
hzcheng 已提交
501

502
  // todo valid the vgroupId at the client side
H
hjxilinx 已提交
503 504
  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  
weixin_48148422's avatar
weixin_48148422 已提交
505
  if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
H
hjxilinx 已提交
506
    int32_t vgIndex = pTableMetaInfo->vgroupIndex;
H
Haojun Liao 已提交
507 508 509 510 511 512 513 514 515
    if (pTableMetaInfo->pVgroupTables == NULL) {
      SVgroupsInfo *pVgroupInfo = pTableMetaInfo->vgroupList;
      assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups);

      pRetrieveMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId);
      tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d", pSql, pVgroupInfo->vgroups[vgIndex].vgId, vgIndex);
    } else {
      int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
      assert(vgIndex >= 0 && vgIndex < numOfVgroups);
H
Haojun Liao 已提交
516

H
Haojun Liao 已提交
517 518 519 520 521
      SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex);

      pRetrieveMsg->header.vgId = htonl(pTableIdList->vgInfo.vgId);
      tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d", pSql, pTableIdList->vgInfo.vgId, vgIndex);
    }
522
  } else {
H
hjxilinx 已提交
523
    STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
524 525
    pRetrieveMsg->header.vgId = htonl(pTableMeta->vgId);
    tscDebug("%p build fetch msg from only one vgroup, vgId:%d", pSql, pTableMeta->vgId);
526
  }
527 528

  pSql->cmd.payloadLen = sizeof(SRetrieveTableMsg);
J
jtao1735 已提交
529
  pSql->cmd.msgType = TSDB_MSG_TYPE_FETCH;
530 531 532

  pRetrieveMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg));

533
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
534 535
}

536
int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hjxilinx 已提交
537
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
H
hjxilinx 已提交
538
  STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
H
hjxilinx 已提交
539
  
540
  char* pMsg = pSql->cmd.payload;
541 542 543
  
  // NOTE: shell message size should not include SMsgDesc
  int32_t size = pSql->cmd.payloadLen - sizeof(SMsgDesc);
544

545
  SMsgDesc* pMsgDesc = (SMsgDesc*) pMsg;
546 547
  pMsgDesc->numOfVnodes = htonl(1); // always one vnode

548
  pMsg += sizeof(SMsgDesc);
549
  SSubmitMsg *pShellMsg = (SSubmitMsg *)pMsg;
550

551
  pShellMsg->header.vgId = htonl(pTableMeta->vgId);
H
Haojun Liao 已提交
552
  pShellMsg->header.contLen = htonl(size);      // the length not includes the size of SMsgDesc
553
  pShellMsg->length = pShellMsg->header.contLen;
H
hjxilinx 已提交
554
  
H
Haojun Liao 已提交
555
  pShellMsg->numOfBlocks = htonl(pSql->cmd.numOfTablesInSubmit);  // number of tables to be inserted
H
hzcheng 已提交
556

H
hjxilinx 已提交
557
  // pSql->cmd.payloadLen is set during copying data into payload
S
slguan 已提交
558
  pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
559 560

  SNewVgroupInfo vgroupInfo = {0};
561
  taosHashGetClone(tscVgroupMap, &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo, sizeof(SNewVgroupInfo));
562
  tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo);
563

564
  tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfEP:%d", pSql, pTableMeta->vgId, pSql->cmd.numOfTablesInSubmit,
565
      pSql->epSet.numOfEps);
566
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
567 568 569
}

/*
570
 * for table query, simply return the size <= 1k
H
hzcheng 已提交
571
 */
572
static int32_t tscEstimateQueryMsgSize(SSqlObj *pSql, int32_t clauseIndex) {
H
hzcheng 已提交
573
  const static int32_t MIN_QUERY_MSG_PKT_SIZE = TSDB_MAX_BYTES_PER_ROW * 5;
H
Haojun Liao 已提交
574 575 576

  SSqlCmd* pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
H
hzcheng 已提交
577

S
TD-1057  
Shengliang Guan 已提交
578
  int32_t srcColListSize = (int32_t)(taosArrayGetSize(pQueryInfo->colList) * sizeof(SColumnInfo));
H
Haojun Liao 已提交
579 580

  size_t  numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
H
Haojun Liao 已提交
581
  int32_t exprSize = (int32_t)(sizeof(SSqlFuncMsg) * numOfExprs * 2);
H
Haojun Liao 已提交
582

H
Haojun Liao 已提交
583
  int32_t tsBufSize = (pQueryInfo->tsBuf != NULL) ? pQueryInfo->tsBuf->fileSize : 0;
H
Haojun Liao 已提交
584 585
  int32_t sqlLen = (int32_t) strlen(pSql->sqlstr) + 1;

H
Haojun Liao 已提交
586 587 588 589 590 591 592 593 594

  int32_t tableSerialize = 0;
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  if (pTableMetaInfo->pVgroupTables != NULL) {
    size_t numOfGroups = taosArrayGetSize(pTableMetaInfo->pVgroupTables);

    int32_t totalTables = 0;
    for (int32_t i = 0; i < numOfGroups; ++i) {
      SVgroupTableInfo *pTableInfo = taosArrayGet(pTableMetaInfo->pVgroupTables, i);
H
Haojun Liao 已提交
595
      totalTables += (int32_t) taosArrayGetSize(pTableInfo->itemList);
H
Haojun Liao 已提交
596 597 598 599
    }

    tableSerialize = totalTables * sizeof(STableIdInfo);
  }
H
Haojun Liao 已提交
600

H
Haojun Liao 已提交
601
  return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + exprSize + tsBufSize +
602
         tableSerialize + sqlLen + 4096;
H
hzcheng 已提交
603 604
}

605
static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char *pMsg) {
606
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
B
Bomin Zhang 已提交
607
  TSKEY dfltKey = htobe64(pQueryMsg->window.skey);
608

H
hjxilinx 已提交
609
  STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
weixin_48148422's avatar
weixin_48148422 已提交
610
  if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) || pTableMetaInfo->pVgroupTables == NULL) {
611
    
612
    int32_t vgId = -1;
weixin_48148422's avatar
weixin_48148422 已提交
613
    if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
614 615
      int32_t index = pTableMetaInfo->vgroupIndex;
      assert(index >= 0);
616 617

      SVgroupInfo* pVgroupInfo = NULL;
B
Bomin Zhang 已提交
618
      if (pTableMetaInfo->vgroupList->numOfVgroups > 0) {
H
Haojun Liao 已提交
619
        assert(index < pTableMetaInfo->vgroupList->numOfVgroups);
B
Bomin Zhang 已提交
620 621
        pVgroupInfo = &pTableMetaInfo->vgroupList->vgroups[index];
      }
622 623 624

      vgId = pVgroupInfo->vgId;
      tscSetDnodeEpSet(&pSql->epSet, pVgroupInfo);
H
Haojun Liao 已提交
625
      tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, pTableMetaInfo->vgroupList->numOfVgroups);
H
hjxilinx 已提交
626
    } else {
627
      vgId = pTableMeta->vgId;
628 629

      SNewVgroupInfo vgroupInfo = {0};
630
      taosHashGetClone(tscVgroupMap, &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo, sizeof(SNewVgroupInfo));
631
      tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo);
632
    }
weixin_48148422's avatar
weixin_48148422 已提交
633

634
    pSql->epSet.inUse = rand()%pSql->epSet.numOfEps;
635

636
    pQueryMsg->head.vgId = htonl(vgId);
weixin_48148422's avatar
weixin_48148422 已提交
637

638
    STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
639 640 641
    pTableIdInfo->tid = htonl(pTableMeta->id.tid);
    pTableIdInfo->uid = htobe64(pTableMeta->id.uid);
    pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->id.uid, dfltKey));
weixin_48148422's avatar
weixin_48148422 已提交
642

643 644
    pQueryMsg->numOfTables = htonl(1);  // set the number of tables
    pMsg += sizeof(STableIdInfo);
645
  } else { // it is a subquery of the super table query, this EP info is acquired from vgroupInfo
646
    int32_t index = pTableMetaInfo->vgroupIndex;
S
TD-1057  
Shengliang Guan 已提交
647
    int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
648
    assert(index >= 0 && index < numOfVgroups);
weixin_48148422's avatar
weixin_48148422 已提交
649

650
    tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, numOfVgroups);
weixin_48148422's avatar
weixin_48148422 已提交
651

652
    SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, index);
653

dengyihao's avatar
bugfix  
dengyihao 已提交
654
    // set the vgroup info 
655
    tscSetDnodeEpSet(&pSql->epSet, &pTableIdList->vgInfo);
656 657
    pQueryMsg->head.vgId = htonl(pTableIdList->vgInfo.vgId);
    
S
TD-1057  
Shengliang Guan 已提交
658
    int32_t numOfTables = (int32_t)taosArrayGetSize(pTableIdList->itemList);
659 660 661 662 663 664 665 666 667
    pQueryMsg->numOfTables = htonl(numOfTables);  // set the number of tables
  
    // serialize each table id info
    for(int32_t i = 0; i < numOfTables; ++i) {
      STableIdInfo* pItem = taosArrayGet(pTableIdList->itemList, i);
      
      STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
      pTableIdInfo->tid = htonl(pItem->tid);
      pTableIdInfo->uid = htobe64(pItem->uid);
B
Bomin Zhang 已提交
668
      pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pItem->uid, dfltKey));
669 670 671 672
      pMsg += sizeof(STableIdInfo);
    }
  }
  
H
Haojun Liao 已提交
673 674
  tscDebug("%p vgId:%d, query on table:%s, tid:%d, uid:%" PRIu64, pSql, htonl(pQueryMsg->head.vgId), pTableMetaInfo->name,
      pTableMeta->id.tid, pTableMeta->id.uid);
H
hjxilinx 已提交
675
  
676 677 678
  return pMsg;
}

679
int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
680 681
  SSqlCmd *pCmd = &pSql->cmd;

682
  int32_t size = tscEstimateQueryMsgSize(pSql, pCmd->clauseIndex);
H
hzcheng 已提交
683

S
slguan 已提交
684 685
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for query msg", pSql);
H
Haojun Liao 已提交
686
    return TSDB_CODE_TSC_INVALID_SQL;  // todo add test for this
S
slguan 已提交
687
  }
688
  
689
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hjxilinx 已提交
690
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hjxilinx 已提交
691
  STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
692 693 694

  size_t numOfSrcCols = taosArrayGetSize(pQueryInfo->colList);
  if (numOfSrcCols <= 0 && !tscQueryTags(pQueryInfo)) {
S
Shengliang Guan 已提交
695
    tscError("%p illegal value of numOfCols in query msg: %" PRIu64 ", table cols:%d", pSql, (uint64_t)numOfSrcCols,
696 697
        tscGetNumOfColumns(pTableMeta));

H
Haojun Liao 已提交
698
    return TSDB_CODE_TSC_INVALID_SQL;
699
  }
700
  
701
  if (pQueryInfo->interval.interval < 0) {
S
TD-1530  
Shengliang Guan 已提交
702
    tscError("%p illegal value of aggregation time interval in query msg: %" PRId64, pSql, (int64_t)pQueryInfo->interval.interval);
H
Haojun Liao 已提交
703
    return TSDB_CODE_TSC_INVALID_SQL;
704 705 706 707
  }
  
  if (pQueryInfo->groupbyExpr.numOfGroupCols < 0) {
    tscError("%p illegal value of numOfGroupCols in query msg: %d", pSql, pQueryInfo->groupbyExpr.numOfGroupCols);
H
Haojun Liao 已提交
708
    return TSDB_CODE_TSC_INVALID_SQL;
709
  }
710

711
  SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload;
712
  tstrncpy(pQueryMsg->version, version, tListLen(pQueryMsg->version));
H
hzcheng 已提交
713

S
TD-1057  
Shengliang Guan 已提交
714
  int32_t numOfTags = (int32_t)taosArrayGetSize(pTableMetaInfo->tagColList);
715 716
  int32_t sqlLen = (int32_t) strlen(pSql->sqlstr);

717
  if (pQueryInfo->order.order == TSDB_ORDER_ASC) {
H
hjxilinx 已提交
718 719
    pQueryMsg->window.skey = htobe64(pQueryInfo->window.skey);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->window.ekey);
H
hzcheng 已提交
720
  } else {
H
hjxilinx 已提交
721 722
    pQueryMsg->window.skey = htobe64(pQueryInfo->window.ekey);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->window.skey);
H
hzcheng 已提交
723 724
  }

725 726
  pQueryMsg->order          = htons(pQueryInfo->order.order);
  pQueryMsg->orderColId     = htons(pQueryInfo->order.orderColId);
H
Haojun Liao 已提交
727
  pQueryMsg->fillType       = htons(pQueryInfo->fillType);
728 729
  pQueryMsg->limit          = htobe64(pQueryInfo->limit.limit);
  pQueryMsg->offset         = htobe64(pQueryInfo->limit.offset);
S
TD-1057  
Shengliang Guan 已提交
730
  pQueryMsg->numOfCols      = htons((int16_t)taosArrayGetSize(pQueryInfo->colList));
H
Haojun Liao 已提交
731 732
  pQueryMsg->interval.interval = htobe64(pQueryInfo->interval.interval);
  pQueryMsg->interval.sliding  = htobe64(pQueryInfo->interval.sliding);
733 734
  pQueryMsg->interval.offset   = htobe64(pQueryInfo->interval.offset);
  pQueryMsg->interval.intervalUnit = pQueryInfo->interval.intervalUnit;
H
Haojun Liao 已提交
735 736
  pQueryMsg->interval.slidingUnit  = pQueryInfo->interval.slidingUnit;
  pQueryMsg->interval.offsetUnit   = pQueryInfo->interval.offsetUnit;
737
  pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols);
weixin_48148422's avatar
weixin_48148422 已提交
738
  pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType);
H
Haojun Liao 已提交
739
  pQueryMsg->tbnameCondLen  = htonl(pQueryInfo->tagCond.tbnameCond.len);
740
  pQueryMsg->numOfTags      = htonl(numOfTags);
H
Haojun Liao 已提交
741
  pQueryMsg->queryType      = htonl(pQueryInfo->type);
742 743 744
  pQueryMsg->vgroupLimit    = htobe64(pQueryInfo->vgroupLimit);
  pQueryMsg->sqlstrLen      = htonl(sqlLen);

H
hjxilinx 已提交
745
  size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo);
H
Haojun Liao 已提交
746
  pQueryMsg->numOfOutput = htons((int16_t)numOfOutput);  // this is the stage one output column number
H
hzcheng 已提交
747 748

  // set column list ids
749 750
  size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
  char *pMsg = (char *)(pQueryMsg->colList) + numOfCols * sizeof(SColumnInfo);
H
hjxilinx 已提交
751
  SSchema *pSchema = tscGetTableSchema(pTableMeta);
752
  
753 754 755
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumn *pCol = taosArrayGetP(pQueryInfo->colList, i);
    SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];
H
hzcheng 已提交
756

757
    if (pCol->colIndex.columnIndex >= tscGetNumOfColumns(pTableMeta) || !isValidDataType(pColSchema->type)) {
758 759
      tscError("%p tid:%d uid:%" PRIu64" id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s",
          pSql, pTableMeta->id.tid, pTableMeta->id.uid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta), pCol->colIndex.columnIndex,
760
               pColSchema->name);
761
      return TSDB_CODE_TSC_INVALID_SQL;
762
    }
H
hzcheng 已提交
763 764 765

    pQueryMsg->colList[i].colId = htons(pColSchema->colId);
    pQueryMsg->colList[i].bytes = htons(pColSchema->bytes);
766
    pQueryMsg->colList[i].type  = htons(pColSchema->type);
S
slguan 已提交
767
    pQueryMsg->colList[i].numOfFilters = htons(pCol->numOfFilters);
H
hzcheng 已提交
768

S
slguan 已提交
769 770 771
    // append the filter information after the basic column information
    for (int32_t f = 0; f < pCol->numOfFilters; ++f) {
      SColumnFilterInfo *pColFilter = &pCol->filterInfo[f];
H
hzcheng 已提交
772

S
slguan 已提交
773
      SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)pMsg;
774
      pFilterMsg->filterstr = htons(pColFilter->filterstr);
S
slguan 已提交
775 776

      pMsg += sizeof(SColumnFilterInfo);
H
hzcheng 已提交
777

778
      if (pColFilter->filterstr) {
S
slguan 已提交
779
        pFilterMsg->len = htobe64(pColFilter->len);
780
        memcpy(pMsg, (void *)pColFilter->pz, (size_t)(pColFilter->len + 1));
S
slguan 已提交
781 782 783 784 785 786 787 788
        pMsg += (pColFilter->len + 1);  // append the additional filter binary info
      } else {
        pFilterMsg->lowerBndi = htobe64(pColFilter->lowerBndi);
        pFilterMsg->upperBndi = htobe64(pColFilter->upperBndi);
      }

      pFilterMsg->lowerRelOptr = htons(pColFilter->lowerRelOptr);
      pFilterMsg->upperRelOptr = htons(pColFilter->upperRelOptr);
H
hzcheng 已提交
789

S
slguan 已提交
790 791
      if (pColFilter->lowerRelOptr == TSDB_RELATION_INVALID && pColFilter->upperRelOptr == TSDB_RELATION_INVALID) {
        tscError("invalid filter info");
H
Haojun Liao 已提交
792
        return TSDB_CODE_TSC_INVALID_SQL;
S
slguan 已提交
793 794
      }
    }
H
hzcheng 已提交
795 796
  }

H
hjxilinx 已提交
797
  SSqlFuncMsg *pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
H
hjxilinx 已提交
798
  for (int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) {
799
    SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
H
hzcheng 已提交
800

801
    if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId, pExpr->numOfParams)) {
H
hzcheng 已提交
802
      tscError("%p table schema is not matched with parsed sql", pSql);
H
Haojun Liao 已提交
803
      return TSDB_CODE_TSC_INVALID_SQL;
H
hzcheng 已提交
804 805
    }

H
Haojun Liao 已提交
806 807
    assert(pExpr->resColId < 0);

808 809 810
    pSqlFuncExpr->colInfo.colId    = htons(pExpr->colInfo.colId);
    pSqlFuncExpr->colInfo.colIndex = htons(pExpr->colInfo.colIndex);
    pSqlFuncExpr->colInfo.flag     = htons(pExpr->colInfo.flag);
H
hzcheng 已提交
811

812
    pSqlFuncExpr->functionId  = htons(pExpr->functionId);
H
hzcheng 已提交
813
    pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams);
H
Haojun Liao 已提交
814
    pSqlFuncExpr->resColId    = htons(pExpr->resColId);
H
hjxilinx 已提交
815
    pMsg += sizeof(SSqlFuncMsg);
H
hzcheng 已提交
816 817

    for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
H
Haojun Liao 已提交
818
      // todo add log
H
hzcheng 已提交
819 820 821 822 823
      pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType);
      pSqlFuncExpr->arg[j].argBytes = htons(pExpr->param[j].nLen);

      if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) {
        memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen);
824
        pMsg += pExpr->param[j].nLen;
H
hzcheng 已提交
825
      } else {
826
        pSqlFuncExpr->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64);
H
hzcheng 已提交
827 828 829
      }
    }

H
hjxilinx 已提交
830
    pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
H
hzcheng 已提交
831
  }
H
Haojun Liao 已提交
832

H
Haojun Liao 已提交
833 834
  size_t output = tscNumOfFields(pQueryInfo);

H
Haojun Liao 已提交
835
  if (tscIsSecondStageQuery(pQueryInfo)) {
S
Shengliang Guan 已提交
836
    pQueryMsg->secondStageOutput = htonl((int32_t) output);
H
Haojun Liao 已提交
837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865

    SSqlFuncMsg *pSqlFuncExpr1 = (SSqlFuncMsg *)pMsg;

    for (int32_t i = 0; i < output; ++i) {
      SInternalField* pField = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i);
      SSqlExpr *pExpr = pField->pSqlExpr;
      if (pExpr != NULL) {
        if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId, pExpr->numOfParams)) {
          tscError("%p table schema is not matched with parsed sql", pSql);
          return TSDB_CODE_TSC_INVALID_SQL;
        }

        pSqlFuncExpr1->colInfo.colId    = htons(pExpr->colInfo.colId);
        pSqlFuncExpr1->colInfo.colIndex = htons(pExpr->colInfo.colIndex);
        pSqlFuncExpr1->colInfo.flag     = htons(pExpr->colInfo.flag);

        pSqlFuncExpr1->functionId  = htons(pExpr->functionId);
        pSqlFuncExpr1->numOfParams = htons(pExpr->numOfParams);
        pMsg += sizeof(SSqlFuncMsg);

        for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
          // todo add log
          pSqlFuncExpr1->arg[j].argType = htons((uint16_t)pExpr->param[j].nType);
          pSqlFuncExpr1->arg[j].argBytes = htons(pExpr->param[j].nLen);

          if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) {
            memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen);
            pMsg += pExpr->param[j].nLen;
          } else {
866
            pSqlFuncExpr1->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64);
H
Haojun Liao 已提交
867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899
          }
        }

        pSqlFuncExpr1 = (SSqlFuncMsg *)pMsg;
      } else {
        assert(pField->pArithExprInfo != NULL);
        SExprInfo* pExprInfo = pField->pArithExprInfo;

        pSqlFuncExpr1->colInfo.colId    = htons(pExprInfo->base.colInfo.colId);
        pSqlFuncExpr1->functionId  = htons(pExprInfo->base.functionId);
        pSqlFuncExpr1->numOfParams = htons(pExprInfo->base.numOfParams);
        pMsg += sizeof(SSqlFuncMsg);

        for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
          // todo add log
          pSqlFuncExpr1->arg[j].argType = htons((uint16_t)pExprInfo->base.arg[j].argType);
          pSqlFuncExpr1->arg[j].argBytes = htons(pExprInfo->base.arg[j].argBytes);

          if (pExprInfo->base.arg[j].argType == TSDB_DATA_TYPE_BINARY) {
            memcpy(pMsg, pExprInfo->base.arg[j].argValue.pz, pExprInfo->base.arg[j].argBytes);
            pMsg += pExprInfo->base.arg[j].argBytes;
          } else {
            pSqlFuncExpr1->arg[j].argValue.i64 = htobe64(pExprInfo->base.arg[j].argValue.i64);
          }
        }

        pSqlFuncExpr1 = (SSqlFuncMsg *)pMsg;
      }
    }
  } else {
    pQueryMsg->secondStageOutput = 0;
  }

900
  // serialize the table info (sid, uid, tags)
901 902
  pMsg = doSerializeTableInfo(pQueryMsg, pSql, pMsg);
  
903
  SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr;
904
  if (pGroupbyExpr->numOfGroupCols > 0) {
S
slguan 已提交
905
    pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex);
H
hzcheng 已提交
906 907
    pQueryMsg->orderType = htons(pGroupbyExpr->orderType);

S
slguan 已提交
908
    for (int32_t j = 0; j < pGroupbyExpr->numOfGroupCols; ++j) {
909 910
      SColIndex* pCol = taosArrayGet(pGroupbyExpr->columnInfo, j);
  
D
fix bug  
dapan1121 已提交
911
      *((int16_t *)pMsg) = htons(pCol->colId);
S
slguan 已提交
912 913
      pMsg += sizeof(pCol->colId);

D
fix bug  
dapan1121 已提交
914
      *((int16_t *)pMsg) += htons(pCol->colIndex);
915
      pMsg += sizeof(pCol->colIndex);
S
slguan 已提交
916

D
fix bug  
dapan1121 已提交
917
      *((int16_t *)pMsg) += htons(pCol->flag);
S
slguan 已提交
918
      pMsg += sizeof(pCol->flag);
H
hjxilinx 已提交
919 920 921
      
      memcpy(pMsg, pCol->name, tListLen(pCol->name));
      pMsg += tListLen(pCol->name);
S
slguan 已提交
922 923 924
    }
  }

925
  if (pQueryInfo->fillType != TSDB_FILL_NONE) {
H
Haojun Liao 已提交
926
    for (int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) {
927 928
      *((int64_t *)pMsg) = htobe64(pQueryInfo->fillVal[i]);
      pMsg += sizeof(pQueryInfo->fillVal[0]);
S
slguan 已提交
929 930
    }
  }
931 932 933 934 935 936 937 938 939
  
  if (numOfTags != 0) {
    int32_t numOfColumns = tscGetNumOfColumns(pTableMeta);
    int32_t numOfTagColumns = tscGetNumOfTags(pTableMeta);
    int32_t total = numOfTagColumns + numOfColumns;
    
    pSchema = tscGetTableTagSchema(pTableMeta);
    
    for (int32_t i = 0; i < numOfTags; ++i) {
H
hjxilinx 已提交
940
      SColumn *pCol = taosArrayGetP(pTableMetaInfo->tagColList, i);
941 942 943
      SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];

      if ((pCol->colIndex.columnIndex >= numOfTagColumns || pCol->colIndex.columnIndex < -1) ||
944
          (!isValidDataType(pColSchema->type))) {
945 946
        tscError("%p tid:%d uid:%" PRIu64 " id:%s, tag index out of range, totalCols:%d, numOfTags:%d, index:%d, column name:%s",
                 pSql, pTableMeta->id.tid, pTableMeta->id.uid, pTableMetaInfo->name, total, numOfTagColumns,
B
Bomin Zhang 已提交
947
                 pCol->colIndex.columnIndex, pColSchema->name);
948

949
        return TSDB_CODE_TSC_INVALID_SQL;
950 951 952 953 954 955 956 957 958 959 960 961
      }
  
      SColumnInfo* pTagCol = (SColumnInfo*) pMsg;
  
      pTagCol->colId = htons(pColSchema->colId);
      pTagCol->bytes = htons(pColSchema->bytes);
      pTagCol->type  = htons(pColSchema->type);
      pTagCol->numOfFilters = 0;
      
      pMsg += sizeof(SColumnInfo);
    }
  }
S
slguan 已提交
962

H
Haojun Liao 已提交
963 964 965 966
  // serialize tag column query condition
  if (pQueryInfo->tagCond.pCond != NULL && taosArrayGetSize(pQueryInfo->tagCond.pCond) > 0) {
    STagCond* pTagCond = &pQueryInfo->tagCond;
    
967
    SCond *pCond = tsGetSTableQueryCond(pTagCond, pTableMeta->id.uid);
H
Haojun Liao 已提交
968 969 970 971 972 973 974
    if (pCond != NULL && pCond->cond != NULL) {
      pQueryMsg->tagCondLen = htons(pCond->len);
      memcpy(pMsg, pCond->cond, pCond->len);
      
      pMsg += pCond->len;
    }
  }
H
Haojun Liao 已提交
975 976 977 978 979

  SCond* pCond = &pQueryInfo->tagCond.tbnameCond;
  if (pCond->len > 0) {
    strncpy(pMsg, pCond->cond, pCond->len);
    pMsg += pCond->len;
H
Haojun Liao 已提交
980 981
  }

S
slguan 已提交
982
  // compressed ts block
S
TD-1057  
Shengliang Guan 已提交
983
  pQueryMsg->tsOffset = htonl((int32_t)(pMsg - pCmd->payload));
S
slguan 已提交
984

985
  if (pQueryInfo->tsBuf != NULL) {
986 987
    // note: here used the index instead of actual vnode id.
    int32_t vnodeIndex = pTableMetaInfo->vgroupIndex;
H
Haojun Liao 已提交
988
    int32_t code = dumpFileBlockByGroupId(pQueryInfo->tsBuf, vnodeIndex, pMsg, &pQueryMsg->tsLen, &pQueryMsg->tsNumOfBlocks);
989
    if (code != TSDB_CODE_SUCCESS) {
B
Bomin Zhang 已提交
990 991
      return code;
    }
S
slguan 已提交
992

993
    pMsg += pQueryMsg->tsLen;
H
hzcheng 已提交
994

995
    pQueryMsg->tsOrder = htonl(pQueryInfo->tsBuf->tsOrder);
996 997
    pQueryMsg->tsLen   = htonl(pQueryMsg->tsLen);
    pQueryMsg->tsNumOfBlocks = htonl(pQueryMsg->tsNumOfBlocks);
H
hzcheng 已提交
998 999
  }

1000 1001 1002
  memcpy(pMsg, pSql->sqlstr, sqlLen);
  pMsg += sqlLen;

S
TD-1057  
Shengliang Guan 已提交
1003
  int32_t msgLen = (int32_t)(pMsg - pCmd->payload);
H
hzcheng 已提交
1004

H
Haojun Liao 已提交
1005
  tscDebug("%p msg built success, len:%d bytes", pSql, msgLen);
H
hzcheng 已提交
1006
  pCmd->payloadLen = msgLen;
S
slguan 已提交
1007
  pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY;
H
hjxilinx 已提交
1008
  
1009
  pQueryMsg->head.contLen = htonl(msgLen);
S
TD-1057  
Shengliang Guan 已提交
1010
  assert(msgLen + minMsgSize() <= (int32_t)pCmd->allocSize);
1011 1012

  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1013 1014
}

1015 1016
int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
S
TD-1732  
Shengliang Guan 已提交
1017
  pCmd->payloadLen = sizeof(SCreateDbMsg);
S
slguan 已提交
1018
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DB;
H
hzcheng 已提交
1019

S
TD-1732  
Shengliang Guan 已提交
1020
  SCreateDbMsg *pCreateDbMsg = (SCreateDbMsg *)pCmd->payload;
1021

1022
  assert(pCmd->numOfClause == 1);
1023
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1024
  tstrncpy(pCreateDbMsg->db, pTableMetaInfo->name, sizeof(pCreateDbMsg->db));
H
hzcheng 已提交
1025

1026
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1027 1028
}

1029 1030
int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
S
TD-1732  
Shengliang Guan 已提交
1031
  pCmd->payloadLen = sizeof(SCreateDnodeMsg);
S
slguan 已提交
1032 1033
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1034
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1035
  }
H
hzcheng 已提交
1036

S
TD-1732  
Shengliang Guan 已提交
1037
  SCreateDnodeMsg *pCreate = (SCreateDnodeMsg *)pCmd->payload;
J
jtao1735 已提交
1038 1039
  strncpy(pCreate->ep, pInfo->pDCLInfo->a[0].z, pInfo->pDCLInfo->a[0].n);
  
S
slguan 已提交
1040
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DNODE;
H
hzcheng 已提交
1041

1042
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1043 1044
}

1045 1046
int32_t tscBuildAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
S
TD-1762  
Shengliang Guan 已提交
1047
  pCmd->payloadLen = sizeof(SCreateAcctMsg);
S
slguan 已提交
1048 1049
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1050
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1051
  }
H
hzcheng 已提交
1052

S
TD-1762  
Shengliang Guan 已提交
1053
  SCreateAcctMsg *pAlterMsg = (SCreateAcctMsg *)pCmd->payload;
H
hzcheng 已提交
1054

H
Haojun Liao 已提交
1055 1056
  SStrToken *pName = &pInfo->pDCLInfo->user.user;
  SStrToken *pPwd = &pInfo->pDCLInfo->user.passwd;
H
hzcheng 已提交
1057

1058 1059
  strncpy(pAlterMsg->user, pName->z, pName->n);
  strncpy(pAlterMsg->pass, pPwd->z, pPwd->n);
S
slguan 已提交
1060

1061
  SCreateAcctSQL *pAcctOpt = &pInfo->pDCLInfo->acctOpt;
H
hzcheng 已提交
1062

1063 1064 1065 1066 1067 1068 1069 1070
  pAlterMsg->cfg.maxUsers = htonl(pAcctOpt->maxUsers);
  pAlterMsg->cfg.maxDbs = htonl(pAcctOpt->maxDbs);
  pAlterMsg->cfg.maxTimeSeries = htonl(pAcctOpt->maxTimeSeries);
  pAlterMsg->cfg.maxStreams = htonl(pAcctOpt->maxStreams);
  pAlterMsg->cfg.maxPointsPerSecond = htonl(pAcctOpt->maxPointsPerSecond);
  pAlterMsg->cfg.maxStorage = htobe64(pAcctOpt->maxStorage);
  pAlterMsg->cfg.maxQueryTime = htobe64(pAcctOpt->maxQueryTime);
  pAlterMsg->cfg.maxConnections = htonl(pAcctOpt->maxConnections);
H
hzcheng 已提交
1071

1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084
  if (pAcctOpt->stat.n == 0) {
    pAlterMsg->cfg.accessState = -1;
  } else {
    if (pAcctOpt->stat.z[0] == 'r' && pAcctOpt->stat.n == 1) {
      pAlterMsg->cfg.accessState = TSDB_VN_READ_ACCCESS;
    } else if (pAcctOpt->stat.z[0] == 'w' && pAcctOpt->stat.n == 1) {
      pAlterMsg->cfg.accessState = TSDB_VN_WRITE_ACCCESS;
    } else if (strncmp(pAcctOpt->stat.z, "all", 3) == 0 && pAcctOpt->stat.n == 3) {
      pAlterMsg->cfg.accessState = TSDB_VN_ALL_ACCCESS;
    } else if (strncmp(pAcctOpt->stat.z, "no", 2) == 0 && pAcctOpt->stat.n == 2) {
      pAlterMsg->cfg.accessState = 0;
    }
  }
H
hzcheng 已提交
1085

S
slguan 已提交
1086
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_ACCT;
1087
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1088 1089
}

1090 1091
int32_t tscBuildUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
S
TD-1732  
Shengliang Guan 已提交
1092
  pCmd->payloadLen = sizeof(SCreateUserMsg);
S
slguan 已提交
1093

S
slguan 已提交
1094 1095
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1096
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1097 1098
  }

S
TD-1732  
Shengliang Guan 已提交
1099
  SCreateUserMsg *pAlterMsg = (SCreateUserMsg *)pCmd->payload;
H
hzcheng 已提交
1100

1101 1102
  SUserInfo *pUser = &pInfo->pDCLInfo->user;
  strncpy(pAlterMsg->user, pUser->user.z, pUser->user.n);
S
TD-1057  
Shengliang Guan 已提交
1103
  pAlterMsg->flag = (int8_t)pUser->type;
H
hzcheng 已提交
1104

1105 1106 1107 1108
  if (pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
    pAlterMsg->privilege = (char)pCmd->count;
  } else if (pUser->type == TSDB_ALTER_USER_PASSWD) {
    strncpy(pAlterMsg->pass, pUser->passwd.z, pUser->passwd.n);
H
hjxilinx 已提交
1109 1110
  } else { // create user password info
    strncpy(pAlterMsg->pass, pUser->passwd.z, pUser->passwd.n);
1111
  }
H
hzcheng 已提交
1112

1113
  if (pUser->type == TSDB_ALTER_USER_PASSWD || pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
S
slguan 已提交
1114
    pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_USER;
1115
  } else {
S
slguan 已提交
1116
    pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_USER;
1117
  }
H
hzcheng 已提交
1118

1119
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1120 1121
}

1122 1123
int32_t tscBuildCfgDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
S
TD-1732  
Shengliang Guan 已提交
1124
  pCmd->payloadLen = sizeof(SCfgDnodeMsg);
guanshengliang's avatar
guanshengliang 已提交
1125
  pCmd->msgType = TSDB_MSG_TYPE_CM_CONFIG_DNODE;
1126 1127
  return TSDB_CODE_SUCCESS;
}
S
slguan 已提交
1128

1129 1130
int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
S
TD-1732  
Shengliang Guan 已提交
1131
  pCmd->payloadLen = sizeof(SDropDbMsg);
H
hzcheng 已提交
1132

S
slguan 已提交
1133 1134
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1135
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1136 1137
  }

S
TD-1732  
Shengliang Guan 已提交
1138
  SDropDbMsg *pDropDbMsg = (SDropDbMsg*)pCmd->payload;
H
hzcheng 已提交
1139

1140
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1141
  tstrncpy(pDropDbMsg->db, pTableMetaInfo->name, sizeof(pDropDbMsg->db));
1142
  pDropDbMsg->ignoreNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;
H
hzcheng 已提交
1143

S
slguan 已提交
1144
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DB;
1145
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1146 1147
}

1148 1149
int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
1150
  pCmd->payloadLen = sizeof(SCMDropTableMsg);
H
hzcheng 已提交
1151

S
slguan 已提交
1152 1153
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1154
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1155
  }
H
hzcheng 已提交
1156

1157
  SCMDropTableMsg *pDropTableMsg = (SCMDropTableMsg*)pCmd->payload;
1158
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
Haojun Liao 已提交
1159
  strcpy(pDropTableMsg->tableFname, pTableMetaInfo->name);
S
slguan 已提交
1160
  pDropTableMsg->igNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;
H
hzcheng 已提交
1161

S
slguan 已提交
1162
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_TABLE;
1163
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1164 1165
}

1166
int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1167
  SSqlCmd *pCmd = &pSql->cmd;
S
TD-1732  
Shengliang Guan 已提交
1168
  pCmd->payloadLen = sizeof(SDropDnodeMsg);
S
slguan 已提交
1169 1170
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1171
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1172
  }
H
hzcheng 已提交
1173

S
TD-1732  
Shengliang Guan 已提交
1174
  SDropDnodeMsg * pDrop = (SDropDnodeMsg *)pCmd->payload;
1175
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1176
  tstrncpy(pDrop->ep, pTableMetaInfo->name, sizeof(pDrop->ep));
S
slguan 已提交
1177
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DNODE;
H
hzcheng 已提交
1178

1179
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1180 1181
}

H
Haojun Liao 已提交
1182
int32_t tscBuildDropUserMsg(SSqlObj *pSql, SSqlInfo * UNUSED_PARAM(pInfo)) {
1183
  SSqlCmd *pCmd = &pSql->cmd;
S
TD-1732  
Shengliang Guan 已提交
1184
  pCmd->payloadLen = sizeof(SDropUserMsg);
S
slguan 已提交
1185
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_USER;
H
hzcheng 已提交
1186

S
slguan 已提交
1187 1188
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1189
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1190
  }
H
hzcheng 已提交
1191

S
TD-1732  
Shengliang Guan 已提交
1192
  SDropUserMsg *  pDropMsg = (SDropUserMsg *)pCmd->payload;
1193
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1194
  tstrncpy(pDropMsg->user, pTableMetaInfo->name, sizeof(pDropMsg->user));
H
hzcheng 已提交
1195

1196
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1197 1198
}

S
[TD-16]  
slguan 已提交
1199 1200
int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
S
TD-1732  
Shengliang Guan 已提交
1201
  pCmd->payloadLen = sizeof(SDropUserMsg);
S
[TD-16]  
slguan 已提交
1202 1203 1204 1205
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_ACCT;

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1206
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
[TD-16]  
slguan 已提交
1207 1208
  }

S
TD-1732  
Shengliang Guan 已提交
1209
  SDropUserMsg *  pDropMsg = (SDropUserMsg *)pCmd->payload;
S
[TD-16]  
slguan 已提交
1210
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1211
  tstrncpy(pDropMsg->user, pTableMetaInfo->name, sizeof(pDropMsg->user));
S
[TD-16]  
slguan 已提交
1212 1213 1214 1215

  return TSDB_CODE_SUCCESS;
}

1216 1217
int32_t tscBuildUseDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
S
TD-1732  
Shengliang Guan 已提交
1218
  pCmd->payloadLen = sizeof(SUseDbMsg);
H
hzcheng 已提交
1219

S
slguan 已提交
1220 1221
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1222
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1223
  }
1224

S
TD-1732  
Shengliang Guan 已提交
1225
  SUseDbMsg *pUseDbMsg = (SUseDbMsg *)pCmd->payload;
1226
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1227
  strcpy(pUseDbMsg->db, pTableMetaInfo->name);
S
slguan 已提交
1228
  pCmd->msgType = TSDB_MSG_TYPE_CM_USE_DB;
H
hzcheng 已提交
1229

1230
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1231 1232
}

1233
int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1234
  STscObj *pObj = pSql->pTscObj;
S
slguan 已提交
1235
  SSqlCmd *pCmd = &pSql->cmd;
S
slguan 已提交
1236
  pCmd->msgType = TSDB_MSG_TYPE_CM_SHOW;
S
TD-1732  
Shengliang Guan 已提交
1237
  pCmd->payloadLen = sizeof(SShowMsg) + 100;
H
hzcheng 已提交
1238

S
slguan 已提交
1239 1240
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1241
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1242
  }
H
hzcheng 已提交
1243

S
TD-1732  
Shengliang Guan 已提交
1244
  SShowMsg *pShowMsg = (SShowMsg *)pCmd->payload;
S
slguan 已提交
1245

1246
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1247
  size_t nameLen = strlen(pTableMetaInfo->name);
S
slguan 已提交
1248
  if (nameLen > 0) {
B
Bomin Zhang 已提交
1249
    tstrncpy(pShowMsg->db, pTableMetaInfo->name, sizeof(pShowMsg->db));  // prefix is set here
H
hzcheng 已提交
1250
  } else {
B
Bomin Zhang 已提交
1251
    tstrncpy(pShowMsg->db, pObj->db, sizeof(pShowMsg->db));
H
hzcheng 已提交
1252 1253
  }

1254 1255 1256 1257
  SShowInfo *pShowInfo = &pInfo->pDCLInfo->showOpt;
  pShowMsg->type = pShowInfo->showType;

  if (pShowInfo->showType != TSDB_MGMT_TABLE_VNODES) {
H
Haojun Liao 已提交
1258
    SStrToken *pPattern = &pShowInfo->pattern;
1259 1260 1261 1262 1263
    if (pPattern->type > 0) {  // only show tables support wildcard query
      strncpy(pShowMsg->payload, pPattern->z, pPattern->n);
      pShowMsg->payloadLen = htons(pPattern->n);
    }
  } else {
H
Haojun Liao 已提交
1264
    SStrToken *pEpAddr = &pShowInfo->prefix;
dengyihao's avatar
dengyihao 已提交
1265
    assert(pEpAddr->n > 0 && pEpAddr->type > 0);
H
hzcheng 已提交
1266

dengyihao's avatar
dengyihao 已提交
1267 1268
    strncpy(pShowMsg->payload, pEpAddr->z, pEpAddr->n);
    pShowMsg->payloadLen = htons(pEpAddr->n);
1269 1270
  }

1271
  pCmd->payloadLen = sizeof(SShowMsg) + htons(pShowMsg->payloadLen);
1272
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1273 1274
}

1275
int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1276
  SSqlCmd *pCmd = &pSql->cmd;
S
TD-1732  
Shengliang Guan 已提交
1277
  pCmd->payloadLen = sizeof(SKillQueryMsg);
H
hzcheng 已提交
1278

1279 1280
  switch (pCmd->command) {
    case TSDB_SQL_KILL_QUERY:
S
slguan 已提交
1281
      pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_QUERY;
1282 1283
      break;
    case TSDB_SQL_KILL_CONNECTION:
S
slguan 已提交
1284
      pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_CONN;
1285 1286
      break;
    case TSDB_SQL_KILL_STREAM:
S
slguan 已提交
1287
      pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_STREAM;
1288 1289 1290
      break;
  }
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1291 1292
}

1293
int tscEstimateCreateTableMsgLength(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1294
  SSqlCmd *pCmd = &(pSql->cmd);
1295
  int32_t size = minMsgSize() + sizeof(SCMCreateTableMsg) + sizeof(SCreateTableMsg);
H
hzcheng 已提交
1296

1297
  SCreateTableSQL *pCreateTableInfo = pInfo->pCreateTableInfo;
1298
  if (pCreateTableInfo->type == TSQL_CREATE_TABLE_FROM_STABLE) {
1299 1300
    int32_t numOfTables = (int32_t)taosArrayGetSize(pInfo->pCreateTableInfo->childTableInfo);
    size += numOfTables * (sizeof(SCreateTableMsg) + TSDB_MAX_TAGS_LEN);
H
hzcheng 已提交
1301
  } else {
S
slguan 已提交
1302
    size += sizeof(SSchema) * (pCmd->numOfCols + pCmd->count);
H
hzcheng 已提交
1303
  }
1304

1305 1306 1307
  if (pCreateTableInfo->pSelect != NULL) {
    size += (pCreateTableInfo->pSelect->selectToken.n + 1);
  }
H
hzcheng 已提交
1308 1309 1310 1311

  return size + TSDB_EXTRA_PAYLOAD_SIZE;
}

1312
int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1313
  int              msgLen = 0;
S
slguan 已提交
1314
  SSchema *        pSchema;
H
hzcheng 已提交
1315
  int              size = 0;
1316 1317 1318
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
H
hjxilinx 已提交
1319
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
1320 1321

  // Reallocate the payload size
1322
  size = tscEstimateCreateTableMsgLength(pSql, pInfo);
S
slguan 已提交
1323 1324
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for create table msg", pSql);
1325
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1326
  }
H
hzcheng 已提交
1327

1328
  SCMCreateTableMsg *pCreateTableMsg = (SCMCreateTableMsg *)pCmd->payload;
1329

1330 1331
  SCreateTableMsg* pCreateMsg = (SCreateTableMsg*)((char*) pCreateTableMsg + sizeof(SCMCreateTableMsg));
  char* pMsg = NULL;
H
hzcheng 已提交
1332

1333 1334 1335
  int8_t type = pInfo->pCreateTableInfo->type;
  if (type == TSQL_CREATE_TABLE_FROM_STABLE) {  // create by using super table, tags value
    SArray* list = pInfo->pCreateTableInfo->childTableInfo;
1336

H
Haojun Liao 已提交
1337
    int32_t numOfTables = (int32_t) taosArrayGetSize(list);
1338
    pCreateTableMsg->numOfTables = htonl(numOfTables);
H
hzcheng 已提交
1339

1340 1341 1342
    pMsg = (char*) pCreateMsg;
    for(int32_t i = 0; i < numOfTables; ++i) {
      SCreateTableMsg* pCreate = (SCreateTableMsg*) pMsg;
H
hzcheng 已提交
1343

1344 1345 1346 1347 1348
      pCreate->numOfColumns = htons(pCmd->numOfCols);
      pCreate->numOfTags = htons(pCmd->count);
      pMsg += sizeof(SCreateTableMsg);

      SCreatedTableInfo* p = taosArrayGet(list, i);
H
Haojun Liao 已提交
1349
      strcpy(pCreate->tableFname, p->fullname);
1350 1351 1352 1353 1354 1355
      pCreate->igExists = (p->igExist)? 1 : 0;

      // use dbinfo from table id without modifying current db info
      tscGetDBInfoFromTableFullName(p->fullname, pCreate->db);
      pMsg = serializeTagData(&p->tagdata, pMsg);

H
Haojun Liao 已提交
1356
      int32_t len = (int32_t)(pMsg - (char*) pCreate);
1357 1358
      pCreate->len = htonl(len);
    }
1359
  } else {  // create (super) table
1360 1361
    pCreateTableMsg->numOfTables = htonl(1); // only one table will be created

H
Haojun Liao 已提交
1362
    strcpy(pCreateMsg->tableFname, pTableMetaInfo->name);
1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376

    // use dbinfo from table id without modifying current db info
    tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pCreateMsg->db);

    SCreateTableSQL *pCreateTable = pInfo->pCreateTableInfo;

    pCreateMsg->igExists = pCreateTable->existCheck ? 1 : 0;
    pCreateMsg->numOfColumns = htons(pCmd->numOfCols);
    pCreateMsg->numOfTags = htons(pCmd->count);

    pCreateMsg->sqlLen = 0;
    pMsg = (char *)pCreateMsg->schema;

    pSchema = (SSchema *)pCreateMsg->schema;
1377

H
hzcheng 已提交
1378
    for (int i = 0; i < pCmd->numOfCols + pCmd->count; ++i) {
H
hjxilinx 已提交
1379
      TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
H
hzcheng 已提交
1380 1381 1382 1383

      pSchema->type = pField->type;
      strcpy(pSchema->name, pField->name);
      pSchema->bytes = htons(pField->bytes);
1384

H
hzcheng 已提交
1385 1386 1387 1388
      pSchema++;
    }

    pMsg = (char *)pSchema;
1389 1390
    if (type == TSQL_CREATE_STREAM) {  // check if it is a stream sql
      SQuerySQL *pQuerySql = pInfo->pCreateTableInfo->pSelect;
H
hzcheng 已提交
1391

1392
      strncpy(pMsg, pQuerySql->selectToken.z, pQuerySql->selectToken.n + 1);
1393
      pCreateMsg->sqlLen = htons(pQuerySql->selectToken.n + 1);
1394
      pMsg += pQuerySql->selectToken.n + 1;
H
hzcheng 已提交
1395 1396 1397
    }
  }

H
hjxilinx 已提交
1398
  tscFieldInfoClear(&pQueryInfo->fieldsInfo);
H
hzcheng 已提交
1399

S
TD-1057  
Shengliang Guan 已提交
1400
  msgLen = (int32_t)(pMsg - (char*)pCreateTableMsg);
S
slguan 已提交
1401
  pCreateTableMsg->contLen = htonl(msgLen);
H
hzcheng 已提交
1402
  pCmd->payloadLen = msgLen;
S
slguan 已提交
1403
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE;
H
hzcheng 已提交
1404 1405

  assert(msgLen + minMsgSize() <= size);
1406
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1407 1408 1409
}

int tscEstimateAlterTableMsgLength(SSqlCmd *pCmd) {
1410
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
S
TD-1732  
Shengliang Guan 已提交
1411
  return minMsgSize() + sizeof(SAlterTableMsg) + sizeof(SSchema) * tscNumOfFields(pQueryInfo) + TSDB_EXTRA_PAYLOAD_SIZE;
H
hzcheng 已提交
1412 1413
}

1414
int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
1415 1416
  char *pMsg;
  int   msgLen = 0;
H
hzcheng 已提交
1417

1418
  SSqlCmd    *pCmd = &pSql->cmd;
1419 1420
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

H
hjxilinx 已提交
1421
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
1422 1423 1424
  
  SAlterTableSQL *pAlterInfo = pInfo->pAlterInfo;
  int size = tscEstimateAlterTableMsgLength(pCmd);
S
slguan 已提交
1425 1426
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for alter table msg", pSql);
H
Haojun Liao 已提交
1427
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1428
  }
1429
  
S
TD-1732  
Shengliang Guan 已提交
1430
  SAlterTableMsg *pAlterTableMsg = (SAlterTableMsg *)pCmd->payload;
H
Haojun Liao 已提交
1431
  tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pAlterTableMsg->db);
H
hzcheng 已提交
1432

H
Haojun Liao 已提交
1433
  strcpy(pAlterTableMsg->tableFname, pTableMetaInfo->name);
1434
  pAlterTableMsg->type = htons(pAlterInfo->type);
1435

1436
  pAlterTableMsg->numOfCols = htons(tscNumOfFields(pQueryInfo));
S
slguan 已提交
1437
  SSchema *pSchema = pAlterTableMsg->schema;
1438
  for (int i = 0; i < tscNumOfFields(pQueryInfo); ++i) {
H
hjxilinx 已提交
1439
    TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
1440
  
H
hzcheng 已提交
1441 1442 1443 1444 1445 1446 1447
    pSchema->type = pField->type;
    strcpy(pSchema->name, pField->name);
    pSchema->bytes = htons(pField->bytes);
    pSchema++;
  }

  pMsg = (char *)pSchema;
1448 1449 1450
  pAlterTableMsg->tagValLen = htonl(pAlterInfo->tagData.dataLen);
  memcpy(pMsg, pAlterInfo->tagData.data, pAlterInfo->tagData.dataLen);
  pMsg += pAlterInfo->tagData.dataLen;
H
hzcheng 已提交
1451

S
TD-1057  
Shengliang Guan 已提交
1452
  msgLen = (int32_t)(pMsg - (char*)pAlterTableMsg);
1453

H
hzcheng 已提交
1454
  pCmd->payloadLen = msgLen;
S
slguan 已提交
1455
  pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_TABLE;
H
hzcheng 已提交
1456 1457

  assert(msgLen + minMsgSize() <= size);
1458

1459
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1460 1461
}

1462 1463 1464 1465
int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
  SSqlCmd* pCmd = &pSql->cmd;
  pCmd->msgType = TSDB_MSG_TYPE_UPDATE_TAG_VAL;
  
1466
  SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) pCmd->payload;
1467
  pCmd->payloadLen = htonl(pUpdateMsg->head.contLen);
1468

1469 1470 1471 1472
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMeta *pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;

  SNewVgroupInfo vgroupInfo = {.vgId = -1};
1473
  taosHashGetClone(tscVgroupMap, &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo, sizeof(SNewVgroupInfo));
1474
  assert(vgroupInfo.vgId > 0);
1475

1476
  tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo);
1477

1478 1479 1480
  return TSDB_CODE_SUCCESS;
}

1481
int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1482
  SSqlCmd *pCmd = &pSql->cmd;
S
TD-1732  
Shengliang Guan 已提交
1483
  pCmd->payloadLen = sizeof(SAlterDbMsg);
S
slguan 已提交
1484
  pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_DB;
H
hzcheng 已提交
1485

S
TD-1732  
Shengliang Guan 已提交
1486
  SAlterDbMsg *pAlterDbMsg = (SAlterDbMsg* )pCmd->payload;
1487
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1488
  tstrncpy(pAlterDbMsg->db, pTableMetaInfo->name, sizeof(pAlterDbMsg->db));
H
hzcheng 已提交
1489

1490
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1491 1492
}

1493
int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1494
  SSqlCmd *pCmd = &pSql->cmd;
J
jtao1735 已提交
1495
  pCmd->msgType = TSDB_MSG_TYPE_CM_RETRIEVE;
S
slguan 已提交
1496
  pCmd->payloadLen = sizeof(SRetrieveTableMsg);
S
slguan 已提交
1497

S
slguan 已提交
1498 1499
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1500
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
H
hzcheng 已提交
1501
  }
S
slguan 已提交
1502

S
slguan 已提交
1503 1504 1505 1506
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg*)pCmd->payload;
  pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
  pRetrieveMsg->free = htons(pQueryInfo->type);
H
hzcheng 已提交
1507

1508
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519
}

/*
 * this function can only be called once.
 * by using pRes->rspType to denote its status
 *
 * if pRes->rspType is 1, no more result
 */
static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
1520

H
hzcheng 已提交
1521 1522 1523 1524 1525 1526
  pRes->code = TSDB_CODE_SUCCESS;
  if (pRes->rspType == 0) {
    pRes->numOfRows = numOfRes;
    pRes->row = 0;
    pRes->rspType = 1;

1527 1528 1529 1530 1531 1532
    SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
    if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
      return pRes->code;
    }

    tscSetResRawPtr(pRes, pQueryInfo);
H
hzcheng 已提交
1533
  } else {
S
slguan 已提交
1534
    tscResetForNextRetrieve(pRes);
H
hzcheng 已提交
1535 1536 1537 1538 1539 1540 1541
  }

  uint8_t code = pSql->res.code;
  if (pSql->fp) {
    if (code == TSDB_CODE_SUCCESS) {
      (*pSql->fp)(pSql->param, pSql, pSql->res.numOfRows);
    } else {
H
Haojun Liao 已提交
1542
      tscAsyncResultOnError(pSql);
H
hzcheng 已提交
1543 1544 1545 1546 1547 1548 1549
    }
  }

  return code;
}

int tscProcessDescribeTableRsp(SSqlObj *pSql) {
S
slguan 已提交
1550
  SSqlCmd *       pCmd = &pSql->cmd;
1551
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hzcheng 已提交
1552

H
hjxilinx 已提交
1553
  STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
H
hjxilinx 已提交
1554 1555
  
  int32_t numOfRes = tinfo.numOfColumns + tinfo.numOfTags;
H
hzcheng 已提交
1556 1557 1558
  return tscLocalResultCommonBuilder(pSql, numOfRes);
}

H
Haojun Liao 已提交
1559 1560 1561
int tscProcessLocalRetrieveRsp(SSqlObj *pSql) {
  int32_t numOfRes = 1;
  pSql->res.completed = true;
H
hzcheng 已提交
1562 1563 1564
  return tscLocalResultCommonBuilder(pSql, numOfRes);
}

H
hjxilinx 已提交
1565
int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
H
hzcheng 已提交
1566
  SSqlRes *pRes = &pSql->res;
H
Haojun Liao 已提交
1567
  SSqlCmd* pCmd = &pSql->cmd;
H
hzcheng 已提交
1568

H
Haojun Liao 已提交
1569 1570
  int32_t code = pRes->code;
  if (pRes->code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1571
    tscAsyncResultOnError(pSql);
H
Haojun Liao 已提交
1572 1573 1574
    return code;
  }

1575
  pRes->code = tscDoLocalMerge(pSql);
H
hzcheng 已提交
1576 1577

  if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) {
1578
    SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
1579
    tscCreateResPointerInfo(pRes, pQueryInfo);
1580
    tscSetResRawPtr(pRes, pQueryInfo);
H
hzcheng 已提交
1581 1582 1583
  }

  pRes->row = 0;
1584
  pRes->completed = (pRes->numOfRows == 0);
H
hzcheng 已提交
1585

H
Haojun Liao 已提交
1586
  code = pRes->code;
H
hjxilinx 已提交
1587 1588 1589
  if (pRes->code == TSDB_CODE_SUCCESS) {
    (*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
  } else {
H
Haojun Liao 已提交
1590
    tscAsyncResultOnError(pSql);
H
hzcheng 已提交
1591 1592 1593 1594 1595
  }

  return code;
}

S
slguan 已提交
1596
int tscProcessEmptyResultRsp(SSqlObj *pSql) { return tscLocalResultCommonBuilder(pSql, 0); }
H
hzcheng 已提交
1597

1598
int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1599
  STscObj *pObj = pSql->pTscObj;
S
slguan 已提交
1600
  SSqlCmd *pCmd = &pSql->cmd;
S
slguan 已提交
1601
  pCmd->msgType = TSDB_MSG_TYPE_CM_CONNECT;
S
TD-1762  
Shengliang Guan 已提交
1602
  pCmd->payloadLen = sizeof(SConnectMsg);
H
hzcheng 已提交
1603

S
slguan 已提交
1604 1605
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1606
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1607 1608
  }

S
TD-1762  
Shengliang Guan 已提交
1609
  SConnectMsg *pConnect = (SConnectMsg*)pCmd->payload;
H
hzcheng 已提交
1610

H
Haojun Liao 已提交
1611
  // TODO refactor full_name
H
hzcheng 已提交
1612 1613 1614
  char *db;  // ugly code to move the space
  db = strstr(pObj->db, TS_PATH_DELIMITER);
  db = (db == NULL) ? pObj->db : db + 1;
B
Bomin Zhang 已提交
1615 1616 1617
  tstrncpy(pConnect->db, db, sizeof(pConnect->db));
  tstrncpy(pConnect->clientVersion, version, sizeof(pConnect->clientVersion));
  tstrncpy(pConnect->msgVersion, "", sizeof(pConnect->msgVersion));
H
hzcheng 已提交
1618

H
Haojun Liao 已提交
1619 1620 1621
  pConnect->pid = htonl(taosGetPId());
  taosGetCurrentAPPName(pConnect->appName, NULL);

1622
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1623 1624
}

H
hjxilinx 已提交
1625
int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
1626 1627 1628
  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);

H
hjxilinx 已提交
1629
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
S
slguan 已提交
1630

S
TD-1732  
Shengliang Guan 已提交
1631
  STableInfoMsg *pInfoMsg = (STableInfoMsg *)pCmd->payload;
H
Haojun Liao 已提交
1632
  strcpy(pInfoMsg->tableFname, pTableMetaInfo->name);
H
hjxilinx 已提交
1633
  pInfoMsg->createFlag = htons(pSql->cmd.autoCreated ? 1 : 0);
S
slguan 已提交
1634

S
TD-1732  
Shengliang Guan 已提交
1635
  char *pMsg = (char *)pInfoMsg + sizeof(STableInfoMsg);
H
hzcheng 已提交
1636

1637 1638
  if (pCmd->autoCreated && pCmd->tagData.dataLen != 0) {
    pMsg = serializeTagData(&pCmd->tagData, pMsg);
H
hzcheng 已提交
1639 1640
  }

S
TD-1057  
Shengliang Guan 已提交
1641
  pCmd->payloadLen = (int32_t)(pMsg - (char*)pInfoMsg);
S
slguan 已提交
1642
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLE_META;
H
hzcheng 已提交
1643

1644
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1645 1646
}

S
slguan 已提交
1647
/**
1648
 *  multi table meta req pkg format:
S
TD-1732  
Shengliang Guan 已提交
1649
 *  | SMgmtHead | SMultiTableInfoMsg | tableId0 | tableId1 | tableId2 | ......
S
slguan 已提交
1650 1651
 *      no used         4B
 **/
1652
int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
guanshengliang's avatar
guanshengliang 已提交
1653
#if 0
S
slguan 已提交
1654 1655 1656 1657 1658
  SSqlCmd *pCmd = &pSql->cmd;

  // copy payload content to temp buff
  char *tmpData = 0;
  if (pCmd->payloadLen > 0) {
H
Haojun Liao 已提交
1659
    if ((tmpData = calloc(1, pCmd->payloadLen + 1)) == NULL) return -1;
S
slguan 已提交
1660 1661 1662 1663 1664
    memcpy(tmpData, pCmd->payload, pCmd->payloadLen);
  }

  // fill head info
  SMgmtHead *pMgmt = (SMgmtHead *)(pCmd->payload + tsRpcHeadSize);
H
Haojun Liao 已提交
1665
  memset(pMgmt->db, 0, TSDB_TABLE_FNAME_LEN);  // server don't need the db
S
slguan 已提交
1666

S
TD-1732  
Shengliang Guan 已提交
1667
  SMultiTableInfoMsg *pInfoMsg = (SMultiTableInfoMsg *)(pCmd->payload + tsRpcHeadSize + sizeof(SMgmtHead));
S
slguan 已提交
1668
  pInfoMsg->numOfTables = htonl((int32_t)pCmd->count);
S
slguan 已提交
1669 1670

  if (pCmd->payloadLen > 0) {
S
slguan 已提交
1671
    memcpy(pInfoMsg->tableIds, tmpData, pCmd->payloadLen);
S
slguan 已提交
1672 1673
  }

S
TD-1848  
Shengliang Guan 已提交
1674
  tfree(tmpData);
S
slguan 已提交
1675

S
TD-1732  
Shengliang Guan 已提交
1676
  pCmd->payloadLen += sizeof(SMgmtHead) + sizeof(SMultiTableInfoMsg);
S
slguan 已提交
1677
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLES_META;
S
slguan 已提交
1678 1679 1680

  assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize);

1681
  tscDebug("%p build load multi-metermeta msg completed, numOfTables:%d, msg size:%d", pSql, pCmd->count,
S
slguan 已提交
1682 1683 1684
           pCmd->payloadLen);

  return pCmd->payloadLen;
guanshengliang's avatar
guanshengliang 已提交
1685 1686
#endif
  return 0;  
S
slguan 已提交
1687 1688
}

H
hjxilinx 已提交
1689 1690 1691 1692 1693 1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705
//static UNUSED_FUNC int32_t tscEstimateMetricMetaMsgSize(SSqlCmd *pCmd) {
////  const int32_t defaultSize =
////      minMsgSize() + sizeof(SSuperTableMetaMsg) + sizeof(SMgmtHead) + sizeof(int16_t) * TSDB_MAX_TAGS;
////  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
////
////  int32_t n = 0;
////  size_t size = taosArrayGetSize(pQueryInfo->tagCond.pCond);
////  for (int32_t i = 0; i < size; ++i) {
////    assert(0);
//////    n += strlen(pQueryInfo->tagCond.cond[i].cond);
////  }
////
////  int32_t tagLen = n * TSDB_NCHAR_SIZE;
////  if (pQueryInfo->tagCond.tbnameCond.cond != NULL) {
////    tagLen += strlen(pQueryInfo->tagCond.tbnameCond.cond) * TSDB_NCHAR_SIZE;
////  }
////
H
Haojun Liao 已提交
1706
////  int32_t joinCondLen = (TSDB_TABLE_FNAME_LEN + sizeof(int16_t)) * 2;
H
hjxilinx 已提交
1707 1708 1709 1710 1711 1712 1713 1714
////  int32_t elemSize = sizeof(SSuperTableMetaElemMsg) * pQueryInfo->numOfTables;
////
////  int32_t colSize = pQueryInfo->groupbyExpr.numOfGroupCols*sizeof(SColIndex);
////
////  int32_t len = tagLen + joinCondLen + elemSize + colSize + defaultSize;
////
////  return MAX(len, TSDB_DEFAULT_PAYLOAD_SIZE);
//}
H
hzcheng 已提交
1715

H
hjxilinx 已提交
1716 1717
int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
H
hjxilinx 已提交
1718 1719 1720
  
  char* pMsg = pCmd->payload;
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
S
TD-1732  
Shengliang Guan 已提交
1721 1722

  SSTableVgroupMsg *pStableVgroupMsg = (SSTableVgroupMsg *)pMsg;
H
hjxilinx 已提交
1723
  pStableVgroupMsg->numOfTables = htonl(pQueryInfo->numOfTables);
S
TD-1732  
Shengliang Guan 已提交
1724 1725 1726
  pMsg += sizeof(SSTableVgroupMsg);

  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
H
hjxilinx 已提交
1727
    STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i);
1728 1729 1730
    size_t size = sizeof(pTableMetaInfo->name);
    tstrncpy(pMsg, pTableMetaInfo->name, size);
    pMsg += size;
H
hjxilinx 已提交
1731
  }
H
hjxilinx 已提交
1732 1733

  pCmd->msgType = TSDB_MSG_TYPE_CM_STABLE_VGROUP;
S
TD-1057  
Shengliang Guan 已提交
1734
  pCmd->payloadLen = (int32_t)(pMsg - pCmd->payload);
H
hjxilinx 已提交
1735

1736
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1737 1738
}

1739 1740
int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
H
hzcheng 已提交
1741 1742
  STscObj *pObj = pSql->pTscObj;

1743
  pthread_mutex_lock(&pObj->mutex);
H
hzcheng 已提交
1744

S
Shengliang Guan 已提交
1745
  int32_t numOfQueries = 2;
H
hzcheng 已提交
1746 1747 1748
  SSqlObj *tpSql = pObj->sqlList;
  while (tpSql) {
    tpSql = tpSql->next;
1749
    numOfQueries++;
H
hzcheng 已提交
1750 1751
  }

S
Shengliang Guan 已提交
1752
  int32_t numOfStreams = 2;
H
hzcheng 已提交
1753 1754 1755
  SSqlStream *pStream = pObj->streamList;
  while (pStream) {
    pStream = pStream->next;
1756
    numOfStreams++;
H
hzcheng 已提交
1757 1758
  }

S
TD-1732  
Shengliang Guan 已提交
1759
  int size = numOfQueries * sizeof(SQueryDesc) + numOfStreams * sizeof(SStreamDesc) + sizeof(SHeartBeatMsg) + 100;
S
slguan 已提交
1760
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
1761
    pthread_mutex_unlock(&pObj->mutex);
H
Haojun Liao 已提交
1762
    tscError("%p failed to create heartbeat msg", pSql);
H
Haojun Liao 已提交
1763
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1764
  }
H
hzcheng 已提交
1765

H
Haojun Liao 已提交
1766
  // TODO the expired hb and client can not be identified by server till now.
S
TD-1732  
Shengliang Guan 已提交
1767
  SHeartBeatMsg *pHeartbeat = (SHeartBeatMsg *)pCmd->payload;
H
Haojun Liao 已提交
1768 1769
  tstrncpy(pHeartbeat->clientVer, version, tListLen(pHeartbeat->clientVer));

1770 1771
  pHeartbeat->numOfQueries = numOfQueries;
  pHeartbeat->numOfStreams = numOfStreams;
H
Haojun Liao 已提交
1772 1773 1774 1775

  pHeartbeat->pid = htonl(taosGetPId());
  taosGetCurrentAPPName(pHeartbeat->appName, NULL);

1776
  int msgLen = tscBuildQueryStreamDesc(pHeartbeat, pObj);
H
hzcheng 已提交
1777 1778 1779 1780

  pthread_mutex_unlock(&pObj->mutex);

  pCmd->payloadLen = msgLen;
S
slguan 已提交
1781
  pCmd->msgType = TSDB_MSG_TYPE_CM_HEARTBEAT;
H
hzcheng 已提交
1782

1783
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1784 1785
}

1786 1787
int tscProcessTableMetaRsp(SSqlObj *pSql) {
  STableMetaMsg *pMetaMsg = (STableMetaMsg *)pSql->res.pRsp;
H
hzcheng 已提交
1788

H
Haojun Liao 已提交
1789
  pMetaMsg->tid = htonl(pMetaMsg->tid);
1790
  pMetaMsg->sversion = htons(pMetaMsg->sversion);
1791
  pMetaMsg->tversion = htons(pMetaMsg->tversion);
H
hjxilinx 已提交
1792 1793
  pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId);
  
1794 1795
  pMetaMsg->uid = htobe64(pMetaMsg->uid);
  pMetaMsg->contLen = htons(pMetaMsg->contLen);
1796
  pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns);
H
hzcheng 已提交
1797

H
Haojun Liao 已提交
1798
  if ((pMetaMsg->tableType != TSDB_SUPER_TABLE) &&
H
Haojun Liao 已提交
1799
      (pMetaMsg->tid <= 0 || pMetaMsg->vgroup.vgId < 2 || pMetaMsg->vgroup.numOfEps <= 0)) {
H
Haojun Liao 已提交
1800
    tscError("invalid value in table numOfEps:%d, vgId:%d tid:%d, name:%s", pMetaMsg->vgroup.numOfEps, pMetaMsg->vgroup.vgId,
H
Haojun Liao 已提交
1801
             pMetaMsg->tid, pMetaMsg->tableFname);
1802
    return TSDB_CODE_TSC_INVALID_VALUE;
H
hzcheng 已提交
1803 1804
  }

B
Bomin Zhang 已提交
1805
  if (pMetaMsg->numOfTags > TSDB_MAX_TAGS) {
1806
    tscError("invalid numOfTags:%d", pMetaMsg->numOfTags);
1807
    return TSDB_CODE_TSC_INVALID_VALUE;
H
hzcheng 已提交
1808 1809
  }

1810 1811
  if (pMetaMsg->numOfColumns > TSDB_MAX_COLUMNS || pMetaMsg->numOfColumns <= 0) {
    tscError("invalid numOfColumns:%d", pMetaMsg->numOfColumns);
1812
    return TSDB_CODE_TSC_INVALID_VALUE;
H
hzcheng 已提交
1813 1814
  }

1815 1816
  for (int i = 0; i < pMetaMsg->vgroup.numOfEps; ++i) {
    pMetaMsg->vgroup.epAddr[i].port = htons(pMetaMsg->vgroup.epAddr[i].port);
H
hzcheng 已提交
1817 1818
  }

1819
  SSchema* pSchema = pMetaMsg->schema;
H
hzcheng 已提交
1820

1821
  int32_t numOfTotalCols = pMetaMsg->numOfColumns + pMetaMsg->numOfTags;
H
hzcheng 已提交
1822 1823 1824
  for (int i = 0; i < numOfTotalCols; ++i) {
    pSchema->bytes = htons(pSchema->bytes);
    pSchema->colId = htons(pSchema->colId);
1825 1826 1827 1828 1829

    if (pSchema->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
      assert(i == 0);
    }

1830
    assert(isValidDataType(pSchema->type));
H
hzcheng 已提交
1831 1832
    pSchema++;
  }
1833
  
1834
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
H
hjxilinx 已提交
1835
  assert(pTableMetaInfo->pTableMeta == NULL);
H
hzcheng 已提交
1836

H
Haojun Liao 已提交
1837 1838 1839 1840 1841 1842
  STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg);
  if (!isValidSchema(pTableMeta->schema, pTableMeta->tableInfo.numOfColumns, pTableMeta->tableInfo.numOfTags)) {
    tscError("%p invalid table meta from mnode, name:%s", pSql, pTableMetaInfo->name);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

1843 1844 1845
  if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
    // check if super table hashmap or not
    int32_t len = (int32_t) strnlen(pTableMeta->sTableName, TSDB_TABLE_FNAME_LEN);
H
Haojun Liao 已提交
1846

1847 1848 1849 1850 1851 1852 1853 1854 1855 1856 1857 1858 1859 1860 1861 1862 1863 1864
    // super tableMeta data alreay exists, create it according to tableMeta and add it to hash map
    STableMeta* pSupTableMeta = createSuperTableMeta(pMetaMsg);

    uint32_t size = tscGetTableMetaSize(pSupTableMeta);
    int32_t code = taosHashPut(tscTableMetaInfo, pTableMeta->sTableName, len, pSupTableMeta, size);
    assert(code == TSDB_CODE_SUCCESS);

    tfree(pSupTableMeta);

    CChildTableMeta* cMeta = tscCreateChildMeta(pTableMeta);
    taosHashPut(tscTableMetaInfo, pTableMetaInfo->name, strlen(pTableMetaInfo->name), cMeta, sizeof(CChildTableMeta));
    tfree(cMeta);
  } else {
    uint32_t s = tscGetTableMetaSize(pTableMeta);
    taosHashPut(tscTableMetaInfo, pTableMetaInfo->name, strlen(pTableMetaInfo->name), pTableMeta, s);
  }

  // update the vgroupInfo if needed
1865 1866 1867 1868 1869 1870 1871 1872 1873 1874 1875 1876 1877
  if (pTableMeta->vgId > 0) {
    int32_t vgId = pTableMeta->vgId;
    assert(pTableMeta->tableType != TSDB_SUPER_TABLE);

    SNewVgroupInfo vgroupInfo = {.inUse = -1};
    taosHashGetClone(tscVgroupMap, &vgId, sizeof(vgId), NULL, &vgroupInfo, sizeof(SNewVgroupInfo));

    if (((vgroupInfo.inUse >= 0) && !vgroupInfoIdentical(&vgroupInfo, &pMetaMsg->vgroup)) ||
        (vgroupInfo.inUse < 0)) {  // vgroup info exists, compare with it
      vgroupInfo = createNewVgroupInfo(&pMetaMsg->vgroup);
      taosHashPut(tscVgroupMap, &vgId, sizeof(vgId), &vgroupInfo, sizeof(vgroupInfo));
      tscDebug("add new VgroupInfo, vgId:%d, total:%d", vgId, (int32_t) taosHashGetSize(tscVgroupMap));
    }
1878
  }
H
hzcheng 已提交
1879

1880
  tscDebug("%p recv table meta, uid:%"PRId64 ", tid:%d, name:%s", pSql, pTableMeta->id.uid, pTableMeta->id.tid, pTableMetaInfo->name);
1881
  free(pTableMeta);
1882
  
H
hjxilinx 已提交
1883
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1884 1885
}

S
slguan 已提交
1886
/**
1887
 *  multi table meta rsp pkg format:
1888 1889
 *  | STaosRsp | SMultiTableInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | SMeterMeta2 | SSchema2
 *  |...... 1B                    4B
S
slguan 已提交
1890 1891
 **/
int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
1892
#if 0
S
slguan 已提交
1893 1894 1895 1896 1897
  char *rsp = pSql->res.pRsp;

  ieType = *rsp;
  if (ieType != TSDB_IE_TYPE_META) {
    tscError("invalid ie type:%d", ieType);
1898
    pSql->res.code = TSDB_CODE_TSC_INVALID_IE;
S
slguan 已提交
1899
    pSql->res.numOfTotal = 0;
1900
    return TSDB_CODE_TSC_APP_ERROR;
S
slguan 已提交
1901 1902 1903 1904
  }

  rsp++;

S
TD-1732  
Shengliang Guan 已提交
1905
  SMultiTableInfoMsg *pInfo = (SMultiTableInfoMsg *)rsp;
S
slguan 已提交
1906
  totalNum = htonl(pInfo->numOfTables);
S
TD-1732  
Shengliang Guan 已提交
1907
  rsp += sizeof(SMultiTableInfoMsg);
S
slguan 已提交
1908 1909

  for (i = 0; i < totalNum; i++) {
S
slguan 已提交
1910
    SMultiTableMeta *pMultiMeta = (SMultiTableMeta *)rsp;
1911
    STableMeta *     pMeta = pMultiMeta->metas;
S
slguan 已提交
1912 1913 1914

    pMeta->sid = htonl(pMeta->sid);
    pMeta->sversion = htons(pMeta->sversion);
H
hjxilinx 已提交
1915
    pMeta->vgId = htonl(pMeta->vgId);
S
slguan 已提交
1916 1917
    pMeta->uid = htobe64(pMeta->uid);

H
hjxilinx 已提交
1918 1919
    if (pMeta->sid <= 0 || pMeta->vgId < 0) {
      tscError("invalid meter vgId:%d, sid%d", pMeta->vgId, pMeta->sid);
1920
      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
S
slguan 已提交
1921
      pSql->res.numOfTotal = i;
1922
      return TSDB_CODE_TSC_APP_ERROR;
S
slguan 已提交
1923 1924
    }

H
hjxilinx 已提交
1925 1926 1927 1928
    //    pMeta->numOfColumns = htons(pMeta->numOfColumns);
    //
    //    if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
    //      tscError("invalid tag value count:%d", pMeta->numOfTags);
1929
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
H
hjxilinx 已提交
1930
    //      pSql->res.numOfTotal = i;
1931
    //      return TSDB_CODE_TSC_APP_ERROR;
H
hjxilinx 已提交
1932 1933 1934 1935
    //    }
    //
    //    if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
    //      tscError("invalid numOfTags:%d", pMeta->numOfTags);
1936
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
H
hjxilinx 已提交
1937
    //      pSql->res.numOfTotal = i;
1938
    //      return TSDB_CODE_TSC_APP_ERROR;
H
hjxilinx 已提交
1939 1940 1941 1942
    //    }
    //
    //    if (pMeta->numOfColumns > TSDB_MAX_COLUMNS || pMeta->numOfColumns < 0) {
    //      tscError("invalid numOfColumns:%d", pMeta->numOfColumns);
1943
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
H
hjxilinx 已提交
1944
    //      pSql->res.numOfTotal = i;
1945
    //      return TSDB_CODE_TSC_APP_ERROR;
H
hjxilinx 已提交
1946 1947
    //    }
    //
H
hjxilinx 已提交
1948
    //    for (int j = 0; j < TSDB_REPLICA_MAX_NUM; ++j) {
H
hjxilinx 已提交
1949 1950 1951 1952 1953 1954 1955 1956 1957 1958 1959 1960 1961 1962 1963 1964 1965 1966 1967 1968 1969 1970 1971 1972 1973 1974 1975 1976 1977 1978 1979 1980 1981
    //      pMeta->vpeerDesc[j].vnode = htonl(pMeta->vpeerDesc[j].vnode);
    //    }
    //
    //    pMeta->rowSize = 0;
    //    rsp += sizeof(SMultiTableMeta);
    //    pSchema = (SSchema *)rsp;
    //
    //    int32_t numOfTotalCols = pMeta->numOfColumns + pMeta->numOfTags;
    //    for (int j = 0; j < numOfTotalCols; ++j) {
    //      pSchema->bytes = htons(pSchema->bytes);
    //      pSchema->colId = htons(pSchema->colId);
    //
    //      // ignore the tags length
    //      if (j < pMeta->numOfColumns) {
    //        pMeta->rowSize += pSchema->bytes;
    //      }
    //      pSchema++;
    //    }
    //
    //    rsp += numOfTotalCols * sizeof(SSchema);
    //
    //    int32_t  tagLen = 0;
    //    SSchema *pTagsSchema = tscGetTableTagSchema(pMeta);
    //
    //    if (pMeta->tableType == TSDB_CHILD_TABLE) {
    //      for (int32_t j = 0; j < pMeta->numOfTags; ++j) {
    //        tagLen += pTagsSchema[j].bytes;
    //      }
    //    }
    //
    //    rsp += tagLen;
    //    int32_t size = (int32_t)(rsp - ((char *)pMeta));  // Consistent with STableMeta in cache
    //  }
S
slguan 已提交
1982
  }
H
hjxilinx 已提交
1983
  
S
slguan 已提交
1984 1985
  pSql->res.code = TSDB_CODE_SUCCESS;
  pSql->res.numOfTotal = i;
1986
  tscDebug("%p load multi-metermeta resp from complete num:%d", pSql, pSql->res.numOfTotal);
1987 1988
#endif
  
S
slguan 已提交
1989 1990 1991
  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
1992
int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
D
TD-2516  
dapan1121 已提交
1993 1994 1995 1996 1997 1998 1999 2000
  // master sqlObj locates in param
  SSqlObj* parent = (SSqlObj*)taosAcquireRef(tscObjRef, (int64_t)pSql->param);
  if(parent == NULL) {
    return pSql->res.code;
  }

  assert(parent->signature == parent && (int64_t)pSql->param == parent->self);
  
2001
  SSqlRes* pRes = &pSql->res;
H
hjxilinx 已提交
2002
  
H
hjxilinx 已提交
2003
  // NOTE: the order of several table must be preserved.
S
TD-1732  
Shengliang Guan 已提交
2004
  SSTableVgroupRspMsg *pStableVgroup = (SSTableVgroupRspMsg *)pRes->pRsp;
H
hjxilinx 已提交
2005
  pStableVgroup->numOfTables = htonl(pStableVgroup->numOfTables);
S
TD-1732  
Shengliang Guan 已提交
2006
  char *pMsg = pRes->pRsp + sizeof(SSTableVgroupRspMsg);
H
hjxilinx 已提交
2007
  
2008
  SSqlCmd* pCmd = &parent->cmd;
H
hjxilinx 已提交
2009 2010 2011
  for(int32_t i = 0; i < pStableVgroup->numOfTables; ++i) {
    STableMetaInfo *pInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i);

H
Haojun Liao 已提交
2012 2013 2014
    SVgroupsMsg *  pVgroupMsg = (SVgroupsMsg *) pMsg;
    pVgroupMsg->numOfVgroups = htonl(pVgroupMsg->numOfVgroups);

S
TD-1732  
Shengliang Guan 已提交
2015
    size_t size = sizeof(SVgroupMsg) * pVgroupMsg->numOfVgroups + sizeof(SVgroupsMsg);
H
Haojun Liao 已提交
2016

S
TD-1732  
Shengliang Guan 已提交
2017
    size_t vgroupsz = sizeof(SVgroupInfo) * pVgroupMsg->numOfVgroups + sizeof(SVgroupsInfo);
H
Haojun Liao 已提交
2018
    pInfo->vgroupList = calloc(1, vgroupsz);
H
hjxilinx 已提交
2019 2020
    assert(pInfo->vgroupList != NULL);

H
Haojun Liao 已提交
2021
    pInfo->vgroupList->numOfVgroups = pVgroupMsg->numOfVgroups;
H
hjxilinx 已提交
2022
    for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) {
dengyihao's avatar
dengyihao 已提交
2023
      //just init, no need to lock
S
TD-1732  
Shengliang Guan 已提交
2024
      SVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j];
H
Haojun Liao 已提交
2025

S
TD-1732  
Shengliang Guan 已提交
2026
      SVgroupMsg *vmsg = &pVgroupMsg->vgroups[j];
H
Haojun Liao 已提交
2027 2028
      pVgroups->vgId = htonl(vmsg->vgId);
      pVgroups->numOfEps = vmsg->numOfEps;
H
Haojun Liao 已提交
2029

H
Haojun Liao 已提交
2030
      assert(pVgroups->numOfEps >= 1 && pVgroups->vgId >= 1);
H
hjxilinx 已提交
2031

2032
      for (int32_t k = 0; k < pVgroups->numOfEps; ++k) {
H
Haojun Liao 已提交
2033 2034
        pVgroups->epAddr[k].port = htons(vmsg->epAddr[k].port);
        pVgroups->epAddr[k].fqdn = strndup(vmsg->epAddr[k].fqdn, tListLen(vmsg->epAddr[k].fqdn));
H
hjxilinx 已提交
2035
      }
2036
    }
2037 2038

    pMsg += size;
H
hjxilinx 已提交
2039
  }
D
TD-2516  
dapan1121 已提交
2040 2041

  taosReleaseRef(tscObjRef, parent->self);
H
hjxilinx 已提交
2042
  
S
slguan 已提交
2043
  return pSql->res.code;
H
hzcheng 已提交
2044 2045 2046
}

int tscProcessShowRsp(SSqlObj *pSql) {
S
TD-1732  
Shengliang Guan 已提交
2047 2048 2049
  STableMetaMsg *pMetaMsg;
  SShowRsp *     pShow;
  SSchema *      pSchema;
H
hzcheng 已提交
2050

2051 2052 2053
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

H
hjxilinx 已提交
2054
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
2055

H
hjxilinx 已提交
2056
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
2057

S
TD-1732  
Shengliang Guan 已提交
2058
  pShow = (SShowRsp *)pRes->pRsp;
S
slguan 已提交
2059
  pShow->qhandle = htobe64(pShow->qhandle);
H
hzcheng 已提交
2060 2061
  pRes->qhandle = pShow->qhandle;

S
slguan 已提交
2062
  tscResetForNextRetrieve(pRes);
H
hjxilinx 已提交
2063
  pMetaMsg = &(pShow->tableMeta);
H
hzcheng 已提交
2064

H
hjxilinx 已提交
2065
  pMetaMsg->numOfColumns = ntohs(pMetaMsg->numOfColumns);
H
hzcheng 已提交
2066

H
hjxilinx 已提交
2067
  pSchema = pMetaMsg->schema;
H
Haojun Liao 已提交
2068
  pMetaMsg->tid = ntohs(pMetaMsg->tid);
H
hjxilinx 已提交
2069
  for (int i = 0; i < pMetaMsg->numOfColumns; ++i) {
H
hzcheng 已提交
2070 2071 2072 2073
    pSchema->bytes = htons(pSchema->bytes);
    pSchema++;
  }

2074 2075
  tfree(pTableMetaInfo->pTableMeta);
  pTableMetaInfo->pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg);
H
hzcheng 已提交
2076

H
hjxilinx 已提交
2077
  SSchema *pTableSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
2078 2079 2080 2081
  if (pQueryInfo->colList == NULL) {
    pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
  }
  
H
hjxilinx 已提交
2082 2083
  SFieldInfo* pFieldInfo = &pQueryInfo->fieldsInfo;
  
S
slguan 已提交
2084
  SColumnIndex index = {0};
H
hjxilinx 已提交
2085 2086 2087
  pSchema = pMetaMsg->schema;
  
  for (int16_t i = 0; i < pMetaMsg->numOfColumns; ++i, ++pSchema) {
S
slguan 已提交
2088
    index.columnIndex = i;
2089 2090
    tscColumnListInsert(pQueryInfo->colList, &index);
    
H
hjxilinx 已提交
2091
    TAOS_FIELD f = tscCreateField(pSchema->type, pSchema->name, pSchema->bytes);
H
Haojun Liao 已提交
2092
    SInternalField* pInfo = tscFieldInfoAppend(pFieldInfo, &f);
H
hjxilinx 已提交
2093
    
H
hjxilinx 已提交
2094
    pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index,
H
Haojun Liao 已提交
2095
                     pTableSchema[i].type, pTableSchema[i].bytes, getNewResColId(pQueryInfo), pTableSchema[i].bytes, false);
H
hzcheng 已提交
2096
  }
H
hjxilinx 已提交
2097 2098
  
  pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
H
hjxilinx 已提交
2099
  tscFieldInfoUpdateOffset(pQueryInfo);
H
hzcheng 已提交
2100 2101 2102
  return 0;
}

H
Haojun Liao 已提交
2103
static void createHbObj(STscObj* pObj) {
2104
  if (pObj->hbrid != 0) {
2105 2106 2107 2108 2109 2110 2111 2112
    return;
  }

  SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
  if (NULL == pSql) return;

  pSql->fp = tscProcessHeartBeatRsp;

2113 2114
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetailSafely(&pSql->cmd, 0);
  if (pQueryInfo == NULL) {
H
Haojun Liao 已提交
2115 2116
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    tfree(pSql);
2117 2118 2119
    return;
  }

2120 2121 2122 2123
  pQueryInfo->command = TSDB_SQL_HB;

  pSql->cmd.command = pQueryInfo->command;
  if (TSDB_CODE_SUCCESS != tscAllocPayload(&(pSql->cmd), TSDB_DEFAULT_PAYLOAD_SIZE)) {
H
Haojun Liao 已提交
2124
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
S
TD-1848  
Shengliang Guan 已提交
2125
    tfree(pSql);
2126 2127 2128 2129 2130 2131 2132
    return;
  }

  pSql->param = pObj;
  pSql->pTscObj = pObj;
  pSql->signature = pSql;

2133 2134
  registerSqlObj(pSql);
  tscDebug("%p HB is allocated, pObj:%p", pSql, pObj);
2135

2136
  pObj->hbrid = pSql->self;
2137 2138
}

H
hzcheng 已提交
2139 2140 2141 2142
int tscProcessConnectRsp(SSqlObj *pSql) {
  STscObj *pObj = pSql->pTscObj;
  SSqlRes *pRes = &pSql->res;

H
Haojun Liao 已提交
2143 2144
  char temp[TSDB_TABLE_FNAME_LEN * 2] = {0};

S
TD-1762  
Shengliang Guan 已提交
2145
  SConnectRsp *pConnect = (SConnectRsp *)pRes->pRsp;
B
Bomin Zhang 已提交
2146
  tstrncpy(pObj->acctId, pConnect->acctId, sizeof(pObj->acctId));  // copy acctId from response
2147 2148
  int32_t len = sprintf(temp, "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db);

B
Bomin Zhang 已提交
2149 2150
  assert(len <= sizeof(pObj->db));
  tstrncpy(pObj->db, temp, sizeof(pObj->db));
H
hjxilinx 已提交
2151
  
dengyihao's avatar
dengyihao 已提交
2152 2153
  if (pConnect->epSet.numOfEps > 0) {
    tscEpSetHtons(&pConnect->epSet);
dengyihao's avatar
TD-2257  
dengyihao 已提交
2154
    tscUpdateMgmtEpSet(pSql, &pConnect->epSet);
2155 2156 2157 2158

    for (int i = 0; i < pConnect->epSet.numOfEps; ++i) {
      tscDebug("%p epSet.fqdn[%d]: %s, pObj:%p", pSql, i, pConnect->epSet.fqdn[i], pObj);
    }
dengyihao's avatar
dengyihao 已提交
2159
  } 
H
hzcheng 已提交
2160

S
slguan 已提交
2161
  strcpy(pObj->sversion, pConnect->serverVersion);
H
hzcheng 已提交
2162 2163
  pObj->writeAuth = pConnect->writeAuth;
  pObj->superAuth = pConnect->superAuth;
S
Shengliang Guan 已提交
2164
  pObj->connId = htonl(pConnect->connId);
2165

H
Haojun Liao 已提交
2166
  createHbObj(pObj);
H
Haojun Liao 已提交
2167 2168

  //launch a timer to send heartbeat to maintain the connection and send status to mnode
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
2169
  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, (void *)pObj->rid, tscTmr, &pObj->pTimer);
H
hzcheng 已提交
2170 2171 2172 2173 2174

  return 0;
}

int tscProcessUseDbRsp(SSqlObj *pSql) {
S
slguan 已提交
2175
  STscObj *       pObj = pSql->pTscObj;
2176
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
S
slguan 已提交
2177

B
Bomin Zhang 已提交
2178
  tstrncpy(pObj->db, pTableMetaInfo->name, sizeof(pObj->db));
H
hzcheng 已提交
2179 2180 2181
  return 0;
}

Y
TD-1039  
yihaoDeng 已提交
2182 2183
int tscProcessDropDbRsp(SSqlObj *pSql) {
  pSql->pTscObj->db[0] = 0;
2184
  taosHashEmpty(tscTableMetaInfo);
H
hzcheng 已提交
2185 2186 2187 2188
  return 0;
}

int tscProcessDropTableRsp(SSqlObj *pSql) {
2189
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
S
slguan 已提交
2190

2191 2192 2193 2194
  //The cached tableMeta is expired in this case, so clean it in hash table
  taosHashRemove(tscTableMetaInfo, pTableMetaInfo->name, strnlen(pTableMetaInfo->name, TSDB_TABLE_FNAME_LEN));
  tscDebug("%p remove table meta after drop table:%s, numOfRemain:%d", pSql, pTableMetaInfo->name,
           (int32_t) taosHashGetSize(tscTableMetaInfo));
H
hzcheng 已提交
2195

D
dapan1121 已提交
2196
  pTableMetaInfo->pTableMeta = NULL;
H
hzcheng 已提交
2197 2198 2199 2200
  return 0;
}

int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
2201
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
S
slguan 已提交
2202

2203 2204
  char* name = pTableMetaInfo->name;
  tscDebug("%p remove tableMeta in hashMap after alter-table: %s", pSql, name);
H
hzcheng 已提交
2205

2206 2207 2208
  bool isSuperTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo);
  taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
  tfree(pTableMetaInfo->pTableMeta);
H
hzcheng 已提交
2209

2210 2211
  if (isSuperTable) {  // if it is a super table, iterate the hashTable and remove all the childTableMeta
    taosHashEmpty(tscTableMetaInfo);
H
hzcheng 已提交
2212 2213 2214 2215 2216 2217 2218 2219 2220
  }

  return 0;
}

int tscProcessAlterDbMsgRsp(SSqlObj *pSql) {
  UNUSED(pSql);
  return 0;
}
2221

2222 2223 2224
int tscProcessShowCreateRsp(SSqlObj *pSql) {
  return tscLocalResultCommonBuilder(pSql, 1);
}
H
hzcheng 已提交
2225 2226 2227 2228

int tscProcessQueryRsp(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;

2229
  SQueryTableRsp *pQuery = (SQueryTableRsp *)pRes->pRsp;
S
slguan 已提交
2230 2231 2232
  pQuery->qhandle = htobe64(pQuery->qhandle);
  pRes->qhandle = pQuery->qhandle;

H
hzcheng 已提交
2233
  pRes->data = NULL;
S
slguan 已提交
2234
  tscResetForNextRetrieve(pRes);
H
hzcheng 已提交
2235 2236 2237
  return 0;
}

H
hjxilinx 已提交
2238
int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
S
slguan 已提交
2239 2240 2241
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

S
slguan 已提交
2242
  SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *)pRes->pRsp;
2243 2244 2245 2246
  if (pRetrieve == NULL) {
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return pRes->code;
  }
H
hzcheng 已提交
2247 2248 2249

  pRes->numOfRows = htonl(pRetrieve->numOfRows);
  pRes->precision = htons(pRetrieve->precision);
2250 2251
  pRes->offset    = htobe64(pRetrieve->offset);
  pRes->useconds  = htobe64(pRetrieve->useconds);
H
hjxilinx 已提交
2252
  pRes->completed = (pRetrieve->completed == 1);
2253
  pRes->data      = pRetrieve->data;
H
hjxilinx 已提交
2254
  
2255
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
2256 2257 2258
  if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
    return pRes->code;
  }
2259

H
Haojun Liao 已提交
2260 2261 2262 2263 2264 2265
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  if (pCmd->command == TSDB_SQL_RETRIEVE) {
    tscSetResRawPtr(pRes, pQueryInfo);
  } else if ((UTIL_TABLE_IS_CHILD_TABLE(pTableMetaInfo) || UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) && !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_SUBQUERY)) {
    tscSetResRawPtr(pRes, pQueryInfo);
  } else if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_QUERY) && !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
2266 2267 2268
    tscSetResRawPtr(pRes, pQueryInfo);
  }

weixin_48148422's avatar
weixin_48148422 已提交
2269
  if (pSql->pSubscription != NULL) {
H
hjxilinx 已提交
2270
    int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
H
hjxilinx 已提交
2271
    
H
hjxilinx 已提交
2272
    TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, numOfCols - 1);
H
hjxilinx 已提交
2273 2274
    int16_t     offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1);
    
weixin_48148422's avatar
weixin_48148422 已提交
2275 2276
    char* p = pRes->data + (pField->bytes + offset) * pRes->numOfRows;

S
slguan 已提交
2277
    int32_t numOfTables = htonl(*(int32_t*)p);
weixin_48148422's avatar
weixin_48148422 已提交
2278
    p += sizeof(int32_t);
S
slguan 已提交
2279
    for (int i = 0; i < numOfTables; i++) {
weixin_48148422's avatar
weixin_48148422 已提交
2280 2281
      int64_t uid = htobe64(*(int64_t*)p);
      p += sizeof(int64_t);
weixin_48148422's avatar
weixin_48148422 已提交
2282
      p += sizeof(int32_t); // skip tid
weixin_48148422's avatar
weixin_48148422 已提交
2283 2284
      TSKEY key = htobe64(*(TSKEY*)p);
      p += sizeof(TSKEY);
2285
      tscUpdateSubscriptionProgress(pSql->pSubscription, uid, key);
weixin_48148422's avatar
weixin_48148422 已提交
2286
    }
2287 2288
  }

H
hzcheng 已提交
2289
  pRes->row = 0;
H
Haojun Liao 已提交
2290
  tscDebug("%p numOfRows:%d, offset:%" PRId64 ", complete:%d", pSql, pRes->numOfRows, pRes->offset, pRes->completed);
H
hzcheng 已提交
2291 2292 2293 2294

  return 0;
}

H
hjxilinx 已提交
2295
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code);
H
hzcheng 已提交
2296

2297
static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
S
slguan 已提交
2298 2299
  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
  if (NULL == pNew) {
2300
    tscError("%p malloc failed for new sqlobj to get table meta", pSql);
2301
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
2302
  }
2303

H
hzcheng 已提交
2304 2305 2306
  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;
  pNew->cmd.command = TSDB_SQL_META;
S
slguan 已提交
2307

2308
  tscAddSubqueryInfo(&pNew->cmd);
2309

2310
  SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetailSafely(&pNew->cmd, 0);
2311

H
hjxilinx 已提交
2312
  pNew->cmd.autoCreated = pSql->cmd.autoCreated;  // create table if not exists
B
Bomin Zhang 已提交
2313
  if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE + pSql->cmd.payloadLen)) {
2314
    tscError("%p malloc failed for payload to get table meta", pSql);
2315
    tscFreeSqlObj(pNew);
2316
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
2317 2318
  }

H
hjxilinx 已提交
2319
  STableMetaInfo *pNewMeterMetaInfo = tscAddEmptyMetaInfo(pNewQueryInfo);
2320
  assert(pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);
H
hzcheng 已提交
2321

B
Bomin Zhang 已提交
2322
  tstrncpy(pNewMeterMetaInfo->name, pTableMetaInfo->name, sizeof(pNewMeterMetaInfo->name));
2323

2324 2325 2326
  if (pSql->cmd.autoCreated) {
    int32_t code = copyTagData(&pNew->cmd.tagData, &pSql->cmd.tagData);
    if (code != TSDB_CODE_SUCCESS) {
2327 2328 2329 2330 2331 2332
      tscError("%p malloc failed for new tag data to get table meta", pSql);
      tscFreeSqlObj(pNew);
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
  }

2333
  tscDebug("%p new pSqlObj:%p to get tableMeta, auto create:%d", pSql, pNew, pNew->cmd.autoCreated);
H
hzcheng 已提交
2334

D
TD-2516  
dapan1121 已提交
2335 2336
  registerSqlObj(pNew);

H
hjxilinx 已提交
2337
  pNew->fp = tscTableMetaCallBack;
D
TD-2516  
dapan1121 已提交
2338
  pNew->param = (void *)pSql->self;
H
hzcheng 已提交
2339

D
TD-2516  
dapan1121 已提交
2340 2341 2342
  tscDebug("%p metaRid from %" PRId64 " to %" PRId64 , pSql, pSql->metaRid, pNew->self);
  
  pSql->metaRid = pNew->self;
2343

H
hjxilinx 已提交
2344 2345
  int32_t code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
2346
    code = TSDB_CODE_TSC_ACTION_IN_PROGRESS;  // notify application that current process needs to be terminated
H
hzcheng 已提交
2347 2348 2349 2350 2351
  }

  return code;
}

H
hjxilinx 已提交
2352
int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
H
hjxilinx 已提交
2353
  assert(strlen(pTableMetaInfo->name) != 0);
2354
  tfree(pTableMetaInfo->pTableMeta);
S
slguan 已提交
2355

2356 2357 2358
  uint32_t size = tscGetTableMetaMaxSize();
  pTableMetaInfo->pTableMeta = calloc(1, size);

D
TD-2516  
dapan1121 已提交
2359
  pTableMetaInfo->pTableMeta->tableType = -1;
2360 2361 2362 2363 2364 2365 2366 2367 2368 2369 2370 2371 2372 2373
  pTableMetaInfo->pTableMeta->tableInfo.numOfColumns  = -1;
  int32_t len = (int32_t) strlen(pTableMetaInfo->name);

  taosHashGetClone(tscTableMetaInfo, pTableMetaInfo->name, len, NULL, pTableMetaInfo->pTableMeta, -1);

  // TODO resize the tableMeta
  STableMeta* pMeta = pTableMetaInfo->pTableMeta;
  if (pMeta->id.uid > 0) {
    if (pMeta->tableType == TSDB_CHILD_TABLE) {
      int32_t code = tscCreateTableMetaFromCChildMeta(pTableMetaInfo->pTableMeta, pTableMetaInfo->name);
      if (code != TSDB_CODE_SUCCESS) {
        return getTableMetaFromMnode(pSql, pTableMetaInfo);
      }
    }
H
hzcheng 已提交
2374 2375 2376

    return TSDB_CODE_SUCCESS;
  }
2377

2378
  return getTableMetaFromMnode(pSql, pTableMetaInfo);
H
hzcheng 已提交
2379 2380
}

2381
int tscGetTableMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool createIfNotExists) {
H
hjxilinx 已提交
2382
  pSql->cmd.autoCreated = createIfNotExists;
H
hjxilinx 已提交
2383
  return tscGetTableMeta(pSql, pTableMetaInfo);
H
hzcheng 已提交
2384 2385 2386
}

/**
H
Haojun Liao 已提交
2387
 * retrieve table meta from mnode, and update the local table meta hashmap.
H
hzcheng 已提交
2388
 * @param pSql          sql object
B
Bomin Zhang 已提交
2389
 * @param tableIndex    table index
H
hzcheng 已提交
2390 2391
 * @return              status code
 */
B
Bomin Zhang 已提交
2392
int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) {
H
hzcheng 已提交
2393
  SSqlCmd *pCmd = &pSql->cmd;
2394

H
Haojun Liao 已提交
2395
  SQueryInfo     *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
B
Bomin Zhang 已提交
2396
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex);
H
Haojun Liao 已提交
2397
  const char* name = pTableMetaInfo->name;
H
hzcheng 已提交
2398

H
Haojun Liao 已提交
2399
  STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
H
Haojun Liao 已提交
2400 2401 2402
  if (pTableMeta) {
    tscDebug("%p update table meta:%s, old meta numOfTags:%d, numOfCols:%d, uid:%" PRId64, pSql, name,
        tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->id.uid);
H
hzcheng 已提交
2403 2404
  }

H
Haojun Liao 已提交
2405 2406
  // remove stored tableMeta info in hash table
  taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
2407
  return getTableMetaFromMnode(pSql, pTableMetaInfo);
H
hzcheng 已提交
2408 2409
}

H
hjxilinx 已提交
2410
static bool allVgroupInfoRetrieved(SSqlCmd* pCmd, int32_t clauseIndex) {
2411
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
2412
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
H
hjxilinx 已提交
2413
    STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
H
hjxilinx 已提交
2414 2415
    if (pTableMetaInfo->vgroupList == NULL) {
      return false;
S
slguan 已提交
2416 2417
    }
  }
H
hjxilinx 已提交
2418 2419 2420 2421
  
  // all super tables vgroupinfo are retrieved, no need to retrieve vgroup info anymore
  return true;
}
H
hzcheng 已提交
2422

H
hjxilinx 已提交
2423
int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
2424
  int      code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
H
hjxilinx 已提交
2425 2426 2427
  SSqlCmd *pCmd = &pSql->cmd;
  
  if (allVgroupInfoRetrieved(pCmd, clauseIndex)) {
H
hzcheng 已提交
2428 2429
    return TSDB_CODE_SUCCESS;
  }
H
hjxilinx 已提交
2430

S
slguan 已提交
2431
  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
H
hzcheng 已提交
2432 2433 2434
  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;

H
hjxilinx 已提交
2435
  pNew->cmd.command = TSDB_SQL_STABLEVGROUP;
H
Haojun Liao 已提交
2436 2437

  // TODO TEST IT
2438 2439
  SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetailSafely(&pNew->cmd, 0);
  if (pNewQueryInfo == NULL) {
B
Bomin Zhang 已提交
2440
    tscFreeSqlObj(pNew);
2441 2442
    return code;
  }
2443
  
H
hjxilinx 已提交
2444
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
2445
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
H
hjxilinx 已提交
2446
    STableMetaInfo *pMInfo = tscGetMetaInfo(pQueryInfo, i);
2447
    STableMeta* pTableMeta = tscTableMetaClone(pMInfo->pTableMeta);
H
Haojun Liao 已提交
2448
    tscAddTableMetaInfo(pNewQueryInfo, pMInfo->name, pTableMeta, NULL, pMInfo->tagColList, pMInfo->pVgroupTables);
S
slguan 已提交
2449 2450 2451 2452 2453 2454
  }

  if ((code = tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
    tscFreeSqlObj(pNew);
    return code;
  }
H
hzcheng 已提交
2455

2456
  pNewQueryInfo->numOfTables = pQueryInfo->numOfTables;
2457
  registerSqlObj(pNew);
H
Haojun Liao 已提交
2458

D
TD-2516  
dapan1121 已提交
2459 2460 2461 2462 2463
  tscDebug("%p svgroupRid from %" PRId64 " to %" PRId64 , pSql, pSql->svgroupRid, pNew->self);
  
  pSql->svgroupRid = pNew->self;
  

2464
  tscDebug("%p new sqlObj:%p to get vgroupInfo, numOfTables:%d", pSql, pNew, pNewQueryInfo->numOfTables);
H
hzcheng 已提交
2465

2466
  pNew->fp = tscTableMetaCallBack;
D
TD-2516  
dapan1121 已提交
2467
  pNew->param = (void *)pSql->self;
2468 2469
  code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
2470
    code = TSDB_CODE_TSC_ACTION_IN_PROGRESS;
H
hzcheng 已提交
2471 2472 2473 2474 2475
  }

  return code;
}

2476
void tscInitMsgsFp() {
S
slguan 已提交
2477 2478
  tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
  tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg;
J
jtao1735 已提交
2479
  tscBuildMsg[TSDB_SQL_FETCH] = tscBuildFetchMsg;
H
hzcheng 已提交
2480 2481

  tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg;
2482
  tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg;
H
hzcheng 已提交
2483

2484 2485
  tscBuildMsg[TSDB_SQL_CREATE_ACCT] = tscBuildAcctMsg;
  tscBuildMsg[TSDB_SQL_ALTER_ACCT] = tscBuildAcctMsg;
H
hzcheng 已提交
2486 2487

  tscBuildMsg[TSDB_SQL_CREATE_TABLE] = tscBuildCreateTableMsg;
S
[TD-16]  
slguan 已提交
2488
  tscBuildMsg[TSDB_SQL_DROP_USER] = tscBuildDropUserMsg;
H
hzcheng 已提交
2489 2490 2491
  tscBuildMsg[TSDB_SQL_DROP_ACCT] = tscBuildDropAcctMsg;
  tscBuildMsg[TSDB_SQL_DROP_DB] = tscBuildDropDbMsg;
  tscBuildMsg[TSDB_SQL_DROP_TABLE] = tscBuildDropTableMsg;
2492
  tscBuildMsg[TSDB_SQL_ALTER_USER] = tscBuildUserMsg;
S
slguan 已提交
2493 2494 2495
  tscBuildMsg[TSDB_SQL_CREATE_DNODE] = tscBuildCreateDnodeMsg;
  tscBuildMsg[TSDB_SQL_DROP_DNODE] = tscBuildDropDnodeMsg;
  tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg;
H
hzcheng 已提交
2496
  tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg;
2497
  tscBuildMsg[TSDB_SQL_UPDATE_TAGS_VAL] = tscBuildUpdateTagMsg;
H
hzcheng 已提交
2498 2499 2500 2501
  tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg;

  tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg;
  tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg;
H
hjxilinx 已提交
2502
  tscBuildMsg[TSDB_SQL_META] = tscBuildTableMetaMsg;
H
hjxilinx 已提交
2503
  tscBuildMsg[TSDB_SQL_STABLEVGROUP] = tscBuildSTableVgroupMsg;
S
slguan 已提交
2504
  tscBuildMsg[TSDB_SQL_MULTI_META] = tscBuildMultiMeterMetaMsg;
H
hzcheng 已提交
2505 2506 2507 2508

  tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg;
  tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg;
  tscBuildMsg[TSDB_SQL_RETRIEVE] = tscBuildRetrieveFromMgmtMsg;
2509 2510 2511
  tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg;
H
hzcheng 已提交
2512 2513

  tscProcessMsgRsp[TSDB_SQL_SELECT] = tscProcessQueryRsp;
H
hjxilinx 已提交
2514
  tscProcessMsgRsp[TSDB_SQL_FETCH] = tscProcessRetrieveRspFromNode;
H
hzcheng 已提交
2515 2516 2517 2518 2519

  tscProcessMsgRsp[TSDB_SQL_DROP_DB] = tscProcessDropDbRsp;
  tscProcessMsgRsp[TSDB_SQL_DROP_TABLE] = tscProcessDropTableRsp;
  tscProcessMsgRsp[TSDB_SQL_CONNECT] = tscProcessConnectRsp;
  tscProcessMsgRsp[TSDB_SQL_USE_DB] = tscProcessUseDbRsp;
2520
  tscProcessMsgRsp[TSDB_SQL_META] = tscProcessTableMetaRsp;
H
hjxilinx 已提交
2521
  tscProcessMsgRsp[TSDB_SQL_STABLEVGROUP] = tscProcessSTableVgroupRsp;
S
slguan 已提交
2522
  tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiMeterMetaRsp;
H
hzcheng 已提交
2523 2524

  tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp;
H
hjxilinx 已提交
2525
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromNode;  // rsp handled by same function.
H
hzcheng 已提交
2526
  tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp;
2527

H
Haojun Liao 已提交
2528 2529 2530 2531 2532
  tscProcessMsgRsp[TSDB_SQL_CURRENT_DB]   = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CURRENT_USER] = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_VERSION] = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CLI_VERSION]  = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_STATUS]  = tscProcessLocalRetrieveRsp;
2533

H
hzcheng 已提交
2534 2535
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp;

H
hjxilinx 已提交
2536
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_LOCALMERGE] = tscProcessRetrieveLocalMergeRsp;
H
hzcheng 已提交
2537 2538 2539 2540

  tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp;
  tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp;

2541 2542 2543 2544
  tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_TABLE] = tscProcessShowCreateRsp;
  tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_DATABASE] = tscProcessShowCreateRsp;
  

H
hzcheng 已提交
2545 2546 2547 2548 2549 2550
  tscKeepConn[TSDB_SQL_SHOW] = 1;
  tscKeepConn[TSDB_SQL_RETRIEVE] = 1;
  tscKeepConn[TSDB_SQL_SELECT] = 1;
  tscKeepConn[TSDB_SQL_FETCH] = 1;
  tscKeepConn[TSDB_SQL_HB] = 1;
}