tscServer.c 79.6 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#include "os.h"
H
hzcheng 已提交
17
#include "tcache.h"
H
Haojun Liao 已提交
18
#include "tcmdtype.h"
H
hzcheng 已提交
19
#include "trpc.h"
20 21
#include "tscLocalMerge.h"
#include "tscLog.h"
H
hzcheng 已提交
22 23 24 25 26 27
#include "tscProfile.h"
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
#include "ttimer.h"
#include "tutil.h"
dengyihao's avatar
dengyihao 已提交
28
#include "tlockfree.h"
H
hzcheng 已提交
29 30 31

#define TSC_MGMT_VNODE 999

32
SRpcCorEpSet  tscMgmtEpSet;
33
SRpcEpSet  tscDnodeEpSet;
S
slguan 已提交
34

35 36
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};

H
hzcheng 已提交
37 38 39
int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql);
void tscProcessActivityTimer(void *handle, void *tmrId);
int tscKeepConn[TSDB_SQL_MAX] = {0};
40

B
Bomin Zhang 已提交
41
TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid, TSKEY dflt);
42 43
void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts);
void tscSaveSubscriptionProgress(void* sub);
H
hzcheng 已提交
44

S
slguan 已提交
45
// lower bound of a message: the rpc head size plus a fixed 100-byte reserve
static int32_t minMsgSize() {
  const int32_t reserved = 100;
  return tsRpcHeadSize + reserved;
}
H
Haojun Liao 已提交
46 47 48 49 50 51 52 53
/*
 * Compute the back-off delay (in milliseconds) to wait before the given retry attempt.
 *
 *   count <= 1 : no delay, 0 is returned
 *   count >= 2 : 100 * 2^(count-1), i.e. 200, 400, 800, ...
 *
 * The shift amount is clamped so that neither the shift nor the multiplication can overflow
 * int32_t (both are undefined behavior for signed integers). For very large retry counts the
 * delay therefore saturates at 100 * (2 << 23) = 1677721600 ms instead of overflowing.
 */
static int32_t getWaitingTimeInterval(int32_t count) {
  const int32_t initial  = 100;  // 100 ms by default
  const int32_t maxShift = 23;   // largest shift for which initial * (2 << shift) fits in int32_t

  if (count <= 1) {
    return 0;
  }

  int32_t shift = count - 2;
  if (shift > maxShift) {
    shift = maxShift;
  }

  return initial * (2 << shift);
}
H
hzcheng 已提交
54

55
/*
 * Fill pSql->epSet from the endpoint list of the given vgroup.
 *
 * The endpoint in use is reset to the first one, and every (fqdn, port) pair of the
 * vgroup is copied. At least one endpoint with a non-empty FQDN is required.
 *
 * @param pSql         sql object whose epSet is overwritten, must not be NULL
 * @param pVgroupInfo  source vgroup, must not be NULL and must have numOfEps > 0
 */
static void tscSetDnodeEpSet(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) {
  assert(pSql != NULL && pVgroupInfo != NULL && pVgroupInfo->numOfEps > 0);

  SRpcEpSet* pEpSet = &pSql->epSet;
  pEpSet->inUse = 0;
  pEpSet->numOfEps = pVgroupInfo->numOfEps;

  bool hasFqdn = false;
  for (int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) {
    // apply the FQDN string length check here: bound the copy by the destination buffer, in the
    // same way tscDumpEpSetFromVgroupInfo does, instead of an unbounded strcpy
    tstrncpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn, sizeof(pEpSet->fqdn[i]));
    pEpSet->port[i] = pVgroupInfo->epAddr[i].port;

    if (!hasFqdn) {
      hasFqdn = (strlen(pEpSet->fqdn[i]) > 0);
    }
  }

  assert(hasFqdn);
}
76

dengyihao's avatar
dengyihao 已提交
77 78 79 80
// copy the global mgmt EP set into *epSet inside a taosCorBeginRead/taosCorEndRead section
static void tscDumpMgmtEpSet(SRpcEpSet *epSet) {
  SRpcCorEpSet *pSrc = &tscMgmtEpSet;

  taosCorBeginRead(&pSrc->version);
  *epSet = pSrc->epSet;
  taosCorEndRead(&pSrc->version);
}
dengyihao's avatar
dengyihao 已提交
82 83
// apply htons() in place to each of the first numOfEps ports of the EP set
static void tscEpSetHtons(SRpcEpSet *s) {
  int32_t idx = 0;
  while (idx < s->numOfEps) {
    s->port[idx] = htons(s->port[idx]);
    ++idx;
  }
}
dengyihao's avatar
dengyihao 已提交
87 88
/*
 * Compare two EP sets.
 * Returns true only if the number of endpoints, the endpoint in use, and every
 * (port, fqdn) pair of the first numOfEps entries are identical.
 */
bool tscEpSetIsEqual(SRpcEpSet *s1, SRpcEpSet *s2) {
  bool sameHeader = (s1->numOfEps == s2->numOfEps) && (s1->inUse == s2->inUse);
  if (!sameHeader) {
    return false;
  }

  for (int32_t idx = 0; idx < s1->numOfEps; idx++) {
    if (s1->port[idx] != s2->port[idx]) {
      return false;
    }

    if (strncmp(s1->fqdn[idx], s2->fqdn[idx], TSDB_FQDN_LEN) != 0) {
      return false;
    }
  }

  return true;
}
dengyihao's avatar
dengyihao 已提交
98
/*
 * Overwrite the global mgmt EP set with *pEpSet inside a taosCorBeginWrite/taosCorEndWrite section.
 * The new value is always stored; no comparison with the current value is made here.
 */
void tscUpdateMgmtEpSet(SRpcEpSet *pEpSet) {
  SRpcCorEpSet *pDst = &tscMgmtEpSet;

  taosCorBeginWrite(&pDst->version);
  pDst->epSet = *pEpSet;
  taosCorEndWrite(&pDst->version);
}
dengyihao's avatar
dengyihao 已提交
104
/*
 * Copy the endpoints kept in pVgroupInfo into pEpSet inside a taosCorBeginRead/taosCorEndRead
 * section. Nothing is done if pVgroupInfo is NULL. An inUse value outside of
 * [0, TSDB_MAX_REPLICA) is replaced by 0.
 */
static void tscDumpEpSetFromVgroupInfo(SCMCorVgroupInfo *pVgroupInfo, SRpcEpSet *pEpSet) {
  if (pVgroupInfo == NULL) {
    return;
  }

  taosCorBeginRead(&pVgroupInfo->version);

  int8_t current = pVgroupInfo->inUse;
  if (current < 0 || current >= TSDB_MAX_REPLICA) {
    current = 0;
  }

  pEpSet->inUse = current;
  pEpSet->numOfEps = pVgroupInfo->numOfEps;

  for (int32_t idx = 0; idx < pVgroupInfo->numOfEps; ++idx) {
    tstrncpy(pEpSet->fqdn[idx], pVgroupInfo->epAddr[idx].fqdn, sizeof(pEpSet->fqdn[idx]));
    pEpSet->port[idx] = pVgroupInfo->epAddr[idx].port;
  }

  taosCorEndRead(&pVgroupInfo->version);
}

dengyihao's avatar
dengyihao 已提交
117
/*
 * Store the EP set reported for a request into the vgroup info of the table meta that the
 * command of pObj refers to (clause pCmd->clauseIndex, table 0).
 * Nothing is done when no table meta info / table meta is available.
 */
static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) {
  SSqlCmd *pCmd = &pObj->cmd;

  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  if (pMetaInfo == NULL) {
    return;
  }

  if (pMetaInfo->pTableMeta == NULL) {
    return;
  }

  SCMCorVgroupInfo *pTarget = &pMetaInfo->pTableMeta->corVgroupInfo;

  taosCorBeginWrite(&pTarget->version);
  tscDebug("before: Endpoint in use: %d", pTarget->inUse);

  pTarget->inUse = pEpSet->inUse;
  pTarget->numOfEps = pEpSet->numOfEps;

  int32_t idx = 0;
  while (idx < pTarget->numOfEps) {
    tstrncpy(pTarget->epAddr[idx].fqdn, pEpSet->fqdn[idx], sizeof(pEpSet->fqdn[idx]));
    pTarget->epAddr[idx].port = pEpSet->port[idx];
    idx++;
  }

  tscDebug("after: EndPoint in use: %d", pTarget->inUse);
  taosCorEndWrite(&pTarget->version);
}
134 135 136 137
void tscPrintMgmtEp() {
  SRpcEpSet dump;
  tscDumpMgmtEpSet(&dump);
  if (dump.numOfEps <= 0) {
dengyihao's avatar
dengyihao 已提交
138
    tscError("invalid mnode EP list:%d", dump.numOfEps);
S
slguan 已提交
139
  } else {
140
    for (int i = 0; i < dump.numOfEps; ++i) {
141
      tscDebug("mnode index:%d %s:%d", i, dump.fqdn[i], dump.port[i]);
S
slguan 已提交
142
    }
S
slguan 已提交
143
  }
S
slguan 已提交
144 145
}

H
hzcheng 已提交
146 147 148 149 150 151 152 153 154 155 156 157
/*
 * Callback invoked with the result of a heart beat request.
 *
 * On success (code == 0) the response kept in the heart beat sql object is applied:
 * a non-empty EP set replaces the mgmt EP set, the connection id is stored, and the
 * connection / query / stream named in the response is killed on request.
 * Unless the connection is killed, the activity timer is re-armed at the end.
 *
 * @param param  the STscObj of the connection
 * @param tres   not used
 * @param code   result code of the heart beat request
 */
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
  STscObj *pObj = (STscObj *)param;
  if (pObj == NULL) {
    return;
  }

  if (pObj->signature != pObj) {
    tscError("heart beat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature);
    return;
  }

  SSqlObj *pHbSql = pObj->pHb;

  if (code != 0) {
    tscDebug("heart beat failed, code:%s", tstrerror(code));
  } else {
    SCMHeartBeatRsp *pHbRsp = (SCMHeartBeatRsp *)pHbSql->res.pRsp;

    SRpcEpSet *pMgmtEps = &pHbRsp->epSet;
    if (pMgmtEps->numOfEps > 0) {
      tscEpSetHtons(pMgmtEps);
      tscUpdateMgmtEpSet(pMgmtEps);
    }

    pHbSql->pTscObj->connId = htonl(pHbRsp->connId);

    if (pHbRsp->killConnection) {
      // the timer is not re-armed for a killed connection
      tscKillConnection(pObj);
      return;
    }

    if (pHbRsp->queryId) {
      tscKillQuery(pObj, htonl(pHbRsp->queryId));
    }

    if (pHbRsp->streamId) {
      tscKillStream(pObj, htonl(pHbRsp->streamId));
    }
  }

  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
}

void tscProcessActivityTimer(void *handle, void *tmrId) {
  STscObj *pObj = (STscObj *)handle;
183 184 185
  if (pObj == NULL || pObj->signature != pObj) {
    return;
  }
H
hzcheng 已提交
186

187 188 189
  SSqlObj* pHB = pObj->pHb;
  if (pObj->pTimer != tmrId || pHB == NULL) {
    return;
H
hzcheng 已提交
190 191
  }

192 193 194
  if (tscShouldFreeHeartBeat(pHB)) {
    tscDebug("%p free HB object and release connection", pHB);
    tscFreeSqlObj(pHB);
H
hzcheng 已提交
195
    tscCloseTscObj(pObj);
H
Haojun Liao 已提交
196
  } else {
197
    int32_t code = tscProcessSql(pHB);
H
Haojun Liao 已提交
198
    if (code != TSDB_CODE_SUCCESS) {
199
      tscError("%p failed to sent HB to server, reason:%s", pHB, tstrerror(code));
H
Haojun Liao 已提交
200
    }
H
hzcheng 已提交
201 202 203 204
  }
}

/*
 * Copy the payload of the command into a buffer obtained from rpcMallocCont and hand it
 * to rpcSendRequest, using pSql->epSet as the target.
 *
 * @return TSDB_CODE_TSC_OUT_OF_MEMORY if the buffer cannot be allocated,
 *         TSDB_CODE_SUCCESS once the request has been passed to rpcSendRequest.
 */
int tscSendMsgToServer(SSqlObj *pSql) {
  STscObj *pTscObj = pSql->pTscObj;
  SSqlCmd *pCmd = &pSql->cmd;

  char *pCont = rpcMallocCont(pCmd->payloadLen);
  if (pCont == NULL) {
    tscError("%p msg:%s malloc failed", pSql, taosMsg[pCmd->msgType]);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  // commands at or above TSDB_SQL_MGMT are sent to the current mgmt EP set
  if (pCmd->command >= TSDB_SQL_MGMT) {
    tscDumpMgmtEpSet(&pSql->epSet);
  }

  memcpy(pCont, pCmd->payload, pCmd->payloadLen);

  SRpcMsg request = {0};
  request.msgType = pCmd->msgType;
  request.pCont   = pCont;
  request.contLen = pCmd->payloadLen;
  request.ahandle = pSql;
  request.handle  = &pSql->pRpcCtx;
  request.code    = 0;

  // NOTE: the rpc context should be acquired before sending data to server.
  // Otherwise, the pSql object may have been released already during the response function, which is
  // processMsgFromServer function. In the meanwhile, the assignment of the rpc context to sql object will
  // absolutely cause crash.
  rpcSendRequest(pTscObj->pDnodeConn, &pSql->epSet, &request);
  return TSDB_CODE_SUCCESS;
}

238
/*
 * Callback invoked for every response (or failure) of a request sent by tscSendMsgToServer.
 *
 * Steps:
 *  1. validate the sql object and the connection object; release the sql object if the
 *     connection is gone or the query only exists to free resources.
 *  2. remember a changed EP set (vgroup info for commands below TSDB_SQL_MGMT, mgmt EP set otherwise).
 *  3. for select/fetch/insert/update-tag commands that failed with a code that indicates stale
 *     meta data, renew the table meta and retry, up to pSql->maxRetry times.
 *  4. copy the response into pSql->res, run the per-command response handler and finally
 *     invoke the user callback pSql->fp.
 *
 * The response buffer rpcMsg->pCont is released with rpcFreeCont on every exit path.
 *
 * @param rpcMsg  response message, rpcMsg->ahandle is the SSqlObj of the request
 * @param pEpSet  EP set reported for this response, may be NULL
 */
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
  SSqlObj *pSql = (SSqlObj *)rpcMsg->ahandle;
  if (pSql == NULL || pSql->signature != pSql) {
    tscError("%p sql is already released", pSql);
    rpcFreeCont(rpcMsg->pCont);  // do not leak the response buffer, it is released on all other exit paths
    return;
  }

  STscObj *pObj = pSql->pTscObj;
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  if (pObj->signature != pObj) {
    tscDebug("%p DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, pObj, pObj->signature);

    tscFreeSqlObj(pSql);
    rpcFreeCont(rpcMsg->pCont);
    return;
  }

  pSql->pRpcCtx = NULL;    // clear the rpcCtx

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) {
    tscDebug("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p",
        pSql, pCmd->command, pQueryInfo->type, pObj, pObj->signature);

    tscFreeSqlObj(pSql);
    rpcFreeCont(rpcMsg->pCont);
    return;
  }

  if (pEpSet) {
    if (!tscEpSetIsEqual(&pSql->epSet, pEpSet)) {
      if (pCmd->command < TSDB_SQL_MGMT) {
        tscUpdateVgroupInfo(pSql, pEpSet);
      } else {
        tscUpdateMgmtEpSet(pEpSet);
      }
    }
  }

  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);

  int32_t cmd = pCmd->command;
  if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_FETCH || cmd == TSDB_SQL_INSERT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) &&
      (rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID ||
       rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID ||
       rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL ||
       rpcMsg->code == TSDB_CODE_APP_NOT_READY ||
       rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE)) {
    // count the retry outside of the log call, so that the counter does not depend on
    // whether (or how often) the logging facility evaluates its arguments
    ++pSql->retry;
    tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), pSql->retry);

    // set the flag to denote that sql string needs to be re-parsed and build submit block with table schema
    if (rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
      pSql->cmd.submitSchema = 1;
    }

    pSql->res.code = rpcMsg->code;  // keep the previous error code
    if (pSql->retry > pSql->maxRetry) {
      tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry);
    } else {
      // wait for a little bit moment and then retry
      if (rpcMsg->code == TSDB_CODE_APP_NOT_READY || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
        int32_t duration = getWaitingTimeInterval(pSql->retry);
        taosMsleep(duration);
      }

      rpcMsg->code = tscRenewTableMeta(pSql, pTableMetaInfo->name);

      // if there is an error occurring, proceed to the following error handling procedure.
      if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
        rpcFreeCont(rpcMsg->pCont);
        return;
      }
    }
  }

  pRes->rspLen = 0;

  if (pRes->code != TSDB_CODE_TSC_QUERY_CANCELLED) {
    // take over the code of the response as it is, so that a successful response is
    // recognized below and the retry counter is actually reset
    pRes->code = rpcMsg->code;
  } else {
    tscDebug("%p query is cancelled, code:%s", pSql, tstrerror(pRes->code));
  }

  if (pRes->code == TSDB_CODE_SUCCESS) {
    tscDebug("%p reset retry counter to be 0 due to success rsp, old:%d", pSql, pSql->retry);
    pSql->retry = 0;
  }

  if (pRes->code != TSDB_CODE_TSC_QUERY_CANCELLED) {
    assert(rpcMsg->msgType == pCmd->msgType + 1);
    pRes->code    = rpcMsg->code;
    pRes->rspType = rpcMsg->msgType;
    pRes->rspLen  = rpcMsg->contLen;

    if (pRes->rspLen > 0 && rpcMsg->pCont) {
      char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
      if (tmp == NULL) {
        pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      } else {
        pRes->pRsp = tmp;
        memcpy(pRes->pRsp, rpcMsg->pCont, pRes->rspLen);
      }
    } else {
      // release the buffer of the previous response instead of dropping the only pointer to it
      free(pRes->pRsp);
      pRes->pRsp = NULL;
    }

    /*
     * There is not response callback function for submit response.
     * The actual inserted number of points is the first number.
     */
    if (rpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP && pRes->pRsp != NULL) {
      SShellSubmitRspMsg *pMsg = (SShellSubmitRspMsg*)pRes->pRsp;
      pMsg->code = htonl(pMsg->code);
      pMsg->numOfRows = htonl(pMsg->numOfRows);
      pMsg->affectedRows = htonl(pMsg->affectedRows);
      pMsg->failedRows = htonl(pMsg->failedRows);
      pMsg->numOfFailedBlocks = htonl(pMsg->numOfFailedBlocks);

      pRes->numOfRows += pMsg->affectedRows;
      tscDebug("%p SQL cmd:%s, code:%s inserted rows:%d rspLen:%d", pSql, sqlCmd[pCmd->command],
          tstrerror(pRes->code), pMsg->affectedRows, pRes->rspLen);
    } else {
      tscDebug("%p SQL cmd:%s, code:%s rspLen:%d", pSql, sqlCmd[pCmd->command], tstrerror(pRes->code), pRes->rspLen);
    }
  }

  if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) {
    rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
  }

  if (rpcMsg->code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
    rpcMsg->code = (pRes->code == TSDB_CODE_SUCCESS) ? (int32_t)pRes->numOfRows : pRes->code;

    // decide before invoking the callback, pSql is only released afterwards
    bool shouldFree = tscShouldBeFreed(pSql);
    (*pSql->fp)(pSql->param, pSql, rpcMsg->code);

    if (shouldFree) {
      tscDebug("%p sqlObj is automatically freed", pSql);
      tscFreeSqlObj(pSql);
    }
  }

  rpcFreeCont(rpcMsg->pCont);
}

S
slguan 已提交
385 386 387
/*
 * Build the request message for the commands that are built here (select, fetch, retrieve,
 * insert, connect, heart beat, meta, stable vgroup) and send it to the server.
 *
 * If building or sending fails, the code is stored in pSql->res.code, the result is queued
 * through tscQueueAsyncRes, and the code is returned. Otherwise TSDB_CODE_SUCCESS is returned.
 */
int doProcessSql(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  bool buildHere = (pCmd->command == TSDB_SQL_SELECT) ||
                   (pCmd->command == TSDB_SQL_FETCH) ||
                   (pCmd->command == TSDB_SQL_RETRIEVE) ||
                   (pCmd->command == TSDB_SQL_INSERT) ||
                   (pCmd->command == TSDB_SQL_CONNECT) ||
                   (pCmd->command == TSDB_SQL_HB) ||
                   (pCmd->command == TSDB_SQL_META) ||
                   (pCmd->command == TSDB_SQL_STABLEVGROUP);
  if (buildHere) {
    pRes->code = tscBuildMsg[pCmd->command](pSql, NULL);
  }

  if (pRes->code != TSDB_CODE_SUCCESS) {
    tscQueueAsyncRes(pSql);
    return pRes->code;
  }

  int32_t sendCode = tscSendMsgToServer(pSql);

  // NOTE: if code is TSDB_CODE_SUCCESS, pSql may have been released here already by other threads.
  if (sendCode == TSDB_CODE_SUCCESS) {
    return TSDB_CODE_SUCCESS;
  }

  pRes->code = sendCode;
  tscQueueAsyncRes(pSql);
  return pRes->code;
}

/*
 * Entry point to process the command of a sql object.
 *
 *  - commands below TSDB_SQL_MGMT need a table meta info, otherwise TSDB_CODE_TSC_APP_ERROR
 *    is stored in pSql->res.code and returned;
 *  - commands at or above TSDB_SQL_LOCAL are handled directly by their tscProcessMsgRsp handler;
 *  - every other command is passed on to doProcessSql.
 */
int tscProcessSql(SSqlObj *pSql) {
  char    *name = NULL;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo     *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  STableMetaInfo *pTableMetaInfo = NULL;
  uint32_t        type = 0;

  if (pQueryInfo != NULL) {
    pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
    name = (pTableMetaInfo != NULL)? pTableMetaInfo->name:NULL;
    type = pQueryInfo->type;

    // while numOfTables equals to 0, it must be Heartbeat
    assert((pQueryInfo->numOfTables == 0 && pQueryInfo->command == TSDB_SQL_HB) || pQueryInfo->numOfTables > 0);
  }

  // name is NULL when there is no query info or no table meta info; never hand a NULL pointer to %s
  tscDebug("%p SQL cmd:%s will be processed, name:%s, type:%d", pSql, sqlCmd[pCmd->command],
      (name != NULL) ? name : "(null)", type);

  if (pCmd->command < TSDB_SQL_MGMT) { // the pTableMetaInfo cannot be NULL
    if (pTableMetaInfo == NULL) {
      pSql->res.code = TSDB_CODE_TSC_APP_ERROR;
      return pSql->res.code;
    }
  } else if (pCmd->command < TSDB_SQL_LOCAL) {
    // nothing to prepare here: the mgmt EP set is loaded into pSql->epSet by tscSendMsgToServer
  } else {  // local handler
    return (*tscProcessMsgRsp[pCmd->command])(pSql);
  }

  return doProcessSql(pSql);
}
H
hzcheng 已提交
448

H
hjxilinx 已提交
449
/*
 * Cancel all sub queries of a two-stage super table query.
 * Nothing is done for any other kind of query.
 */
void tscKillSTableQuery(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  if (!tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
    return;
  }

  int idx = 0;
  while (idx < pSql->numOfSubs) {
    SSqlObj *pSubSql = pSql->pSubs[idx++];
    if (pSubSql == NULL) {
      continue;
    }

    /*
     * here, we cannot set the command = TSDB_SQL_KILL_QUERY. Otherwise, it may cause
     * sub-queries not correctly released and master sql object of super table query reaches an abnormal state.
     */
    rpcCancelRequest(pSubSql->pRpcCtx);
    pSubSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
    tscQueueAsyncRes(pSubSql);
  }

  /*
   * 1. if the sub queries are not launched or only partially launched, wait for the launched
   *    queries to return, so that the allocated resources are released.
   * 2. if no sub query has been launched yet, the super table query is still in the parse stage.
   *
   * The wait polls the command of the master query every 100 ms and gives up after 10 seconds.
   */
  const int64_t MAX_WAITING_TIME = 10000;  // 10 Sec.
  int64_t       startMs = taosGetTimestampMs();

  for (;;) {
    if (pCmd->command == TSDB_SQL_RETRIEVE_LOCALMERGE || pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
      break;
    }

    taosMsleep(100);
    if (taosGetTimestampMs() - startMs > MAX_WAITING_TIME) {
      break;
    }
  }

  tscDebug("%p super table query cancelled", pSql);
}

J
jtao1735 已提交
491
/*
 * Build the fetch (retrieve) message in the payload of the command.
 *
 * The query handle, the query type and the vgroup id are written in network byte order.
 * For a super table the vgroup id is taken from the vgroup selected by vgroupIndex,
 * otherwise from the vgroup info of the table meta.
 *
 * @param pSql   sql object whose cmd.payload receives the message
 * @param pInfo  not used
 * @return TSDB_CODE_SUCCESS
 */
int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *) pSql->cmd.payload;
  pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
  pRetrieveMsg->free = htons(pQueryInfo->type);

  // todo valid the vgroupId at the client side
  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
    int32_t vgIndex = pTableMetaInfo->vgroupIndex;

    SVgroupsInfo* pVgroupInfo = pTableMetaInfo->vgroupList;

    // validate the index before it is used to access the vgroup array
    assert(vgIndex >= 0 && vgIndex < pVgroupInfo->numOfVgroups && pVgroupInfo->vgroups[vgIndex].vgId > 0);

    pRetrieveMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId);
  } else {
    STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
    pRetrieveMsg->header.vgId = htonl(pTableMeta->vgroupInfo.vgId);
  }

  pSql->cmd.payloadLen = sizeof(SRetrieveTableMsg);
  pSql->cmd.msgType = TSDB_MSG_TYPE_FETCH;

  pRetrieveMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg));

  return TSDB_CODE_SUCCESS;
}

521
/*
 * Complete the submit message that has already been assembled in the payload of the command:
 * fill in the SMsgDesc at the head of the payload and the header of the SSubmitMsg that
 * follows it, and load the EP set of the target vgroup into pSql->epSet.
 *
 * @param pSql   sql object holding the payload
 * @param pInfo  not used
 * @return TSDB_CODE_SUCCESS
 */
int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd    *pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMeta *pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;

  char *pCursor = pCmd->payload;

  // NOTE: shell message size should not include SMsgDesc
  int32_t size = pCmd->payloadLen - sizeof(SMsgDesc);
  int32_t vgId = pTableMeta->vgroupInfo.vgId;

  // the descriptor is located at the very beginning of the payload
  ((SMsgDesc *)pCursor)->numOfVnodes = htonl(1); // always one vnode
  pCursor += sizeof(SMsgDesc);

  SSubmitMsg *pSubmit = (SSubmitMsg *)pCursor;
  pSubmit->header.vgId = htonl(vgId);
  pSubmit->header.contLen = htonl(size);      // the length not includes the size of SMsgDesc
  pSubmit->length = pSubmit->header.contLen;
  pSubmit->numOfBlocks = htonl(pCmd->numOfTablesInSubmit);  // number of tables to be inserted

  // pSql->cmd.payloadLen is set during copying data into payload
  pCmd->msgType = TSDB_MSG_TYPE_SUBMIT;
  tscDumpEpSetFromVgroupInfo(&pTableMeta->corVgroupInfo, &pSql->epSet);

  tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfEP:%d", pSql, vgId, pSql->cmd.numOfTablesInSubmit,
      pSql->epSet.numOfEps);
  return TSDB_CODE_SUCCESS;
}

/*
553
 * for table query, simply return the size <= 1k
H
hzcheng 已提交
554
 */
555
static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
H
hzcheng 已提交
556
  const static int32_t MIN_QUERY_MSG_PKT_SIZE = TSDB_MAX_BYTES_PER_ROW * 5;
557
  SQueryInfo *         pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
H
hzcheng 已提交
558

S
TD-1057  
Shengliang Guan 已提交
559
  int32_t srcColListSize = (int32_t)(taosArrayGetSize(pQueryInfo->colList) * sizeof(SColumnInfo));
H
hjxilinx 已提交
560 561
  
  size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
S
TD-1057  
Shengliang Guan 已提交
562
  int32_t exprSize = (int32_t)(sizeof(SSqlFuncMsg) * numOfExprs);
H
hjxilinx 已提交
563
  
564
  return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + exprSize + 4096;
H
hzcheng 已提交
565 566
}

567
/*
 * Serialize the table id list of a query into the message buffer and set the target
 * endpoints of the request.
 *
 * Two cases are handled:
 *  - a sub query of a super table query that carries a per-vgroup table list (pVgroupTables):
 *    one STableIdInfo is written for every table of the vgroup selected by vgroupIndex;
 *  - everything else: exactly one STableIdInfo is written for the table of the table meta.
 *
 * In both cases pSql->epSet, pQueryMsg->head.vgId and pQueryMsg->numOfTables are set.
 *
 * @param pQueryMsg  head of the query message; window.skey must already be filled in
 * @param pSql       sql object of the query
 * @param pMsg       position in the message buffer where the table id list starts
 * @return position in the message buffer right after the last STableIdInfo written
 */
static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char *pMsg) {
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);

  // window.skey is converted back with htobe64 and used as the default subscription progress
  TSKEY defaultKey = htobe64(pQueryMsg->window.skey);

  STableMeta *pTableMeta = pMetaInfo->pTableMeta;

  if (!UTIL_TABLE_IS_NORMAL_TABLE(pMetaInfo) && pMetaInfo->pVgroupTables != NULL) {
    // it is a subquery of the super table query, this EP info is acquired from vgroupInfo
    int32_t vgIndex = pMetaInfo->vgroupIndex;
    int32_t numOfVgroups = (int32_t)taosArrayGetSize(pMetaInfo->pVgroupTables);
    assert(vgIndex >= 0 && vgIndex < numOfVgroups);

    tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, vgIndex, numOfVgroups);

    SVgroupTableInfo *pVgroupTables = taosArrayGet(pMetaInfo->pVgroupTables, vgIndex);

    // set the vgroup info
    tscSetDnodeEpSet(pSql, &pVgroupTables->vgInfo);
    pQueryMsg->head.vgId = htonl(pVgroupTables->vgInfo.vgId);

    int32_t numOfTables = (int32_t)taosArrayGetSize(pVgroupTables->itemList);
    pQueryMsg->numOfTables = htonl(numOfTables);  // set the number of tables

    // serialize each table id info
    int32_t k = 0;
    while (k < numOfTables) {
      STableIdInfo *pSrc = taosArrayGet(pVgroupTables->itemList, k);

      STableIdInfo *pDst = (STableIdInfo *)pMsg;
      pDst->tid = htonl(pSrc->tid);
      pDst->uid = htobe64(pSrc->uid);
      pDst->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pSrc->uid, defaultKey));

      pMsg += sizeof(STableIdInfo);
      ++k;
    }
  } else {
    SCMVgroupInfo *pTargetVgroup = NULL;

    if (!UTIL_TABLE_IS_SUPER_TABLE(pMetaInfo)) {
      pTargetVgroup = &pTableMeta->vgroupInfo;
    } else {
      int32_t vgIndex = pMetaInfo->vgroupIndex;
      assert(vgIndex >= 0);

      if (pMetaInfo->vgroupList->numOfVgroups > 0) {
        assert(vgIndex < pMetaInfo->vgroupList->numOfVgroups);
        pTargetVgroup = &pMetaInfo->vgroupList->vgroups[vgIndex];
      }

      tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, vgIndex, pMetaInfo->vgroupList->numOfVgroups);
    }

    assert(pTargetVgroup != NULL);

    tscSetDnodeEpSet(pSql, pTargetVgroup);
    pQueryMsg->head.vgId = htonl(pTargetVgroup->vgId);

    STableIdInfo *pDst = (STableIdInfo *)pMsg;
    pDst->tid = htonl(pTableMeta->id.tid);
    pDst->uid = htobe64(pTableMeta->id.uid);
    pDst->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->id.uid, defaultKey));

    pQueryMsg->numOfTables = htonl(1);  // set the number of tables
    pMsg += sizeof(STableIdInfo);
  }

  // head.vgId is already in network byte order, convert it back for the log
  tscDebug("%p vgId:%d, query on table:%s, tid:%d, uid:%" PRIu64, pSql, htonl(pQueryMsg->head.vgId), pMetaInfo->name,
      pTableMeta->id.tid, pTableMeta->id.uid);

  return pMsg;
}

634
/*
 * Build the query message (TSDB_MSG_TYPE_QUERY) for the current clause of pSql into pCmd->payload.
 *
 * The payload is written in this order: the fixed SQueryTableMsg header, the column list followed by
 * the per-column filters, the output expressions with their parameters, the table id list, the
 * group-by columns, the fill values, the tag column list, the tag query condition, the table name
 * condition, and finally the compressed timestamp block read from pQueryInfo->tsBuf (if any).
 *
 * pSql   sql object whose command payload is (re)allocated and filled
 * pInfo  not used by this function
 *
 * Returns TSDB_CODE_SUCCESS on success. On failure it returns -1 (allocation failure or an illegal
 * value in the query info), TSDB_CODE_TSC_INVALID_SQL (column/tag index or type out of range), or
 * the TAOS_SYSTEM_ERROR code produced when reading the timestamp buffer file fails.
 */
int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  int32_t size = tscEstimateQueryMsgSize(pCmd, pCmd->clauseIndex);

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for query msg", pSql);
    return -1;  // todo add test for this
  }

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  STableMeta *pTableMeta = pTableMetaInfo->pTableMeta;

  // sanity checks on the parsed query before anything is serialized
  if (taosArrayGetSize(pQueryInfo->colList) <= 0 && !tscQueryTags(pQueryInfo)) {
    tscError("%p illegal value of numOfCols in query msg: %d", pSql, tscGetNumOfColumns(pTableMeta));
    return -1;
  }

  if (pQueryInfo->intervalTime < 0) {
    // intervalTime is a 64-bit value (it is serialized with htobe64 below), so use PRId64 and not %ld
    tscError("%p illegal value of aggregation time interval in query msg: %" PRId64, pSql,
             (int64_t)pQueryInfo->intervalTime);
    return -1;
  }

  if (pQueryInfo->groupbyExpr.numOfGroupCols < 0) {
    tscError("%p illegal value of numOfGroupCols in query msg: %d", pSql, pQueryInfo->groupbyExpr.numOfGroupCols);
    return -1;
  }

  SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload;

  int32_t numOfTags = (int32_t)taosArrayGetSize(pTableMetaInfo->tagColList);

  // for a descending query the start/end keys of the time window are swapped in the message
  if (pQueryInfo->order.order == TSDB_ORDER_ASC) {
    pQueryMsg->window.skey = htobe64(pQueryInfo->window.skey);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->window.ekey);
  } else {
    pQueryMsg->window.skey = htobe64(pQueryInfo->window.ekey);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->window.skey);
  }

  pQueryMsg->order          = htons(pQueryInfo->order.order);
  pQueryMsg->orderColId     = htons(pQueryInfo->order.orderColId);
  pQueryMsg->fillType       = htons(pQueryInfo->fillType);
  pQueryMsg->limit          = htobe64(pQueryInfo->limit.limit);
  pQueryMsg->offset         = htobe64(pQueryInfo->limit.offset);
  pQueryMsg->numOfCols      = htons((int16_t)taosArrayGetSize(pQueryInfo->colList));
  pQueryMsg->intervalTime   = htobe64(pQueryInfo->intervalTime);
  pQueryMsg->slidingTime    = htobe64(pQueryInfo->slidingTime);
  pQueryMsg->slidingTimeUnit = pQueryInfo->slidingTimeUnit;
  pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols);
  pQueryMsg->numOfTags      = htonl(numOfTags);
  pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType);
  pQueryMsg->queryType      = htonl(pQueryInfo->type);

  size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo);
  pQueryMsg->numOfOutput = htons((int16_t)numOfOutput);

  // set column list ids; the variable-length part of the message starts right after the column array
  size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
  char *pMsg = (char *)(pQueryMsg->colList) + numOfCols * sizeof(SColumnInfo);
  SSchema *pSchema = tscGetTableSchema(pTableMeta);

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumn *pCol = taosArrayGetP(pQueryInfo->colList, i);
    SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];

    if (pCol->colIndex.columnIndex >= tscGetNumOfColumns(pTableMeta) || pColSchema->type < TSDB_DATA_TYPE_BOOL ||
        pColSchema->type > TSDB_DATA_TYPE_NCHAR) {
      // do not read the schema entry for the log when the index itself is out of range
      const char *colName =
          (pCol->colIndex.columnIndex >= tscGetNumOfColumns(pTableMeta)) ? "(out of range)" : pColSchema->name;

      tscError("%p tid:%d uid:%" PRIu64" id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s",
          pSql, pTableMeta->id.tid, pTableMeta->id.uid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta), pCol->colIndex.columnIndex,
               colName);

      return TSDB_CODE_TSC_INVALID_SQL;
    }

    pQueryMsg->colList[i].colId = htons(pColSchema->colId);
    pQueryMsg->colList[i].bytes = htons(pColSchema->bytes);
    pQueryMsg->colList[i].type  = htons(pColSchema->type);
    pQueryMsg->colList[i].numOfFilters = htons(pCol->numOfFilters);

    // append the filter information after the basic column information
    for (int32_t f = 0; f < pCol->numOfFilters; ++f) {
      SColumnFilterInfo *pColFilter = &pCol->filterInfo[f];

      SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)pMsg;
      pFilterMsg->filterstr = htons(pColFilter->filterstr);

      pMsg += sizeof(SColumnFilterInfo);

      if (pColFilter->filterstr) {
        pFilterMsg->len = htobe64(pColFilter->len);
        memcpy(pMsg, (void *)pColFilter->pz, (size_t)(pColFilter->len + 1));
        pMsg += (pColFilter->len + 1);  // append the additional filter binary info
      } else {
        pFilterMsg->lowerBndi = htobe64(pColFilter->lowerBndi);
        pFilterMsg->upperBndi = htobe64(pColFilter->upperBndi);
      }

      pFilterMsg->lowerRelOptr = htons(pColFilter->lowerRelOptr);
      pFilterMsg->upperRelOptr = htons(pColFilter->upperRelOptr);

      if (pColFilter->lowerRelOptr == TSDB_RELATION_INVALID && pColFilter->upperRelOptr == TSDB_RELATION_INVALID) {
        tscError("invalid filter info");
        return -1;
      }
    }
  }

  // serialize the output expressions; each one is followed by the binary data of its string parameters
  SSqlFuncMsg *pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
  for (int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) {
    SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);

    if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId)) {
      /* column id is not valid according to the cached table meta, the table meta is expired */
      tscError("%p table schema is not matched with parsed sql", pSql);
      return -1;
    }

    pSqlFuncExpr->colInfo.colId    = htons(pExpr->colInfo.colId);
    pSqlFuncExpr->colInfo.colIndex = htons(pExpr->colInfo.colIndex);
    pSqlFuncExpr->colInfo.flag     = htons(pExpr->colInfo.flag);

    pSqlFuncExpr->functionId  = htons(pExpr->functionId);
    pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams);
    pMsg += sizeof(SSqlFuncMsg);

    for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
      // todo add log
      pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType);
      pSqlFuncExpr->arg[j].argBytes = htons(pExpr->param[j].nLen);

      if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) {
        memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen);
        pMsg += pExpr->param[j].nLen;
      } else {
        pSqlFuncExpr->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64Key);
      }
    }

    pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
  }

  // serialize the table info (sid, uid, tags)
  pMsg = doSerializeTableInfo(pQueryMsg, pSql, pMsg);

  SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr;
  if (pGroupbyExpr->numOfGroupCols > 0) {
    pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex);
    pQueryMsg->orderType = htons(pGroupbyExpr->orderType);

    // NOTE(review): the three 16-bit fields below are written in host byte order, unlike the rest of
    // the message - confirm against the receiving side before changing that.
    for (int32_t j = 0; j < pGroupbyExpr->numOfGroupCols; ++j) {
      SColIndex* pCol = taosArrayGet(pGroupbyExpr->columnInfo, j);

      *((int16_t *)pMsg) = pCol->colId;
      pMsg += sizeof(pCol->colId);

      // plain assignment: the previous "+=" only produced the right value if the buffer held zeros
      *((int16_t *)pMsg) = pCol->colIndex;
      pMsg += sizeof(pCol->colIndex);

      *((int16_t *)pMsg) = pCol->flag;
      pMsg += sizeof(pCol->flag);

      memcpy(pMsg, pCol->name, tListLen(pCol->name));
      pMsg += tListLen(pCol->name);
    }
  }

  if (pQueryInfo->fillType != TSDB_FILL_NONE) {
    for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
      *((int64_t *)pMsg) = htobe64(pQueryInfo->fillVal[i]);
      pMsg += sizeof(pQueryInfo->fillVal[0]);
    }
  }

  if (numOfTags != 0) {
    int32_t numOfColumns = tscGetNumOfColumns(pTableMeta);
    int32_t numOfTagColumns = tscGetNumOfTags(pTableMeta);
    int32_t total = numOfTagColumns + numOfColumns;

    pSchema = tscGetTableTagSchema(pTableMeta);

    for (int32_t i = 0; i < numOfTags; ++i) {
      SColumn *pCol = taosArrayGetP(pTableMetaInfo->tagColList, i);
      SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];

      if ((pCol->colIndex.columnIndex >= numOfTagColumns || pCol->colIndex.columnIndex < -1) ||
          (pColSchema->type < TSDB_DATA_TYPE_BOOL || pColSchema->type > TSDB_DATA_TYPE_NCHAR)) {
        tscError("%p tid:%d uid:%" PRIu64 " id:%s, tag index out of range, totalCols:%d, numOfTags:%d, index:%d, column name:%s",
                 pSql, pTableMeta->id.tid, pTableMeta->id.uid, pTableMetaInfo->name, total, numOfTagColumns,
                 pCol->colIndex.columnIndex, pColSchema->name);

        return TSDB_CODE_TSC_INVALID_SQL;
      }

      SColumnInfo* pTagCol = (SColumnInfo*) pMsg;

      pTagCol->colId = htons(pColSchema->colId);
      pTagCol->bytes = htons(pColSchema->bytes);
      pTagCol->type  = htons(pColSchema->type);
      pTagCol->numOfFilters = 0;

      pMsg += sizeof(SColumnInfo);
    }
  }

  // serialize tag column query condition
  if (pQueryInfo->tagCond.pCond != NULL && taosArrayGetSize(pQueryInfo->tagCond.pCond) > 0) {
    STagCond* pTagCond = &pQueryInfo->tagCond;

    SCond *pCond = tsGetSTableQueryCond(pTagCond, pTableMeta->id.uid);
    if (pCond != NULL && pCond->cond != NULL) {
      pQueryMsg->tagCondLen = htons(pCond->len);
      memcpy(pMsg, pCond->cond, pCond->len);

      pMsg += pCond->len;
    }
  }

  // the table name condition is always present in the message, as an empty string when there is none
  if (pQueryInfo->tagCond.tbnameCond.cond == NULL) {
    *pMsg = 0;
    pMsg++;
  } else {
    strcpy(pMsg, pQueryInfo->tagCond.tbnameCond.cond);
    pMsg += strlen(pQueryInfo->tagCond.tbnameCond.cond) + 1;
  }

  // compressed ts block
  pQueryMsg->tsOffset = htonl((int32_t)(pMsg - pCmd->payload));
  int32_t tsLen = 0;
  int32_t numOfBlocks = 0;

  if (pQueryInfo->tsBuf != NULL) {
    STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pQueryInfo->tsBuf, pTableMetaInfo->vgroupIndex);
    assert(QUERY_IS_JOIN_QUERY(pQueryInfo->type) && pBlockInfo != NULL);  // this query should not be sent

    // todo refactor
    if (fseek(pQueryInfo->tsBuf->f, pBlockInfo->offset, SEEK_SET) != 0) {
      int code = TAOS_SYSTEM_ERROR(ferror(pQueryInfo->tsBuf->f));
      tscError("%p: fseek failed: %s", pSql, tstrerror(code));
      return code;
    }

    size_t s = fread(pMsg, 1, pBlockInfo->compLen, pQueryInfo->tsBuf->f);
    if (s != pBlockInfo->compLen) {
      int code = TAOS_SYSTEM_ERROR(ferror(pQueryInfo->tsBuf->f));
      tscError("%p: fread didn't return expected data: %s", pSql, tstrerror(code));
      return code;
    }

    pMsg += pBlockInfo->compLen;
    tsLen = pBlockInfo->compLen;
    numOfBlocks = pBlockInfo->numOfBlocks;
  }

  pQueryMsg->tsLen = htonl(tsLen);
  pQueryMsg->tsNumOfBlocks = htonl(numOfBlocks);
  if (pQueryInfo->tsBuf != NULL) {
    pQueryMsg->tsOrder = htonl(pQueryInfo->tsBuf->tsOrder);
  }

  int32_t msgLen = (int32_t)(pMsg - pCmd->payload);

  tscDebug("%p msg built success,len:%d bytes", pSql, msgLen);
  pCmd->payloadLen = msgLen;
  pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY;

  pQueryMsg->head.contLen = htonl(msgLen);
  assert(msgLen + minMsgSize() <= (int32_t)pCmd->allocSize);

  return TSDB_CODE_SUCCESS;
}

907 908
/*
 * Finish the create-database message (TSDB_MSG_TYPE_CM_CREATE_DB) held in the command payload.
 *
 * No payload is allocated here: the database name of the first table meta info of the current
 * clause is written into the buffer pCmd->payload already points to.
 *
 * pSql   sql object owning the command
 * pInfo  not used by this function
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;

  cmd->payloadLen = sizeof(SCMCreateDbMsg);
  cmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DB;

  SCMCreateDbMsg *createMsg = (SCMCreateDbMsg *)cmd->payload;

  // a create-database statement is expected to consist of exactly one clause
  assert(cmd->numOfClause == 1);

  STableMetaInfo *metaInfo = tscGetTableMetaInfoFromCmd(cmd, cmd->clauseIndex, 0);
  tstrncpy(createMsg->db, metaInfo->name, sizeof(createMsg->db));

  return TSDB_CODE_SUCCESS;
}

921 922
/*
 * Build the create-dnode message (TSDB_MSG_TYPE_CM_CREATE_DNODE).
 *
 * The end point is taken from the first token of pInfo->pDCLInfo and copied into the message.
 *
 * pSql   sql object whose command payload is allocated and filled
 * pInfo  parsed statement; pDCLInfo->a[0] holds the end point text
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_OUT_OF_MEMORY when the payload cannot be allocated, or
 * TSDB_CODE_TSC_INVALID_SQL when the end point does not fit into the message field.
 */
int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCMCreateDnodeMsg);
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMCreateDnodeMsg *pCreate = (SCMCreateDnodeMsg *)pCmd->payload;

  // The token length comes from the SQL text, so it must be checked against the destination before
  // copying. NOTE(review): assumes ep is a fixed-size char array inside the message - confirm.
  size_t epLen = pInfo->pDCLInfo->a[0].n;
  if (epLen > sizeof(pCreate->ep)) {
    tscError("%p dnode end point is too long, len:%d", pSql, (int32_t)epLen);
    return TSDB_CODE_TSC_INVALID_SQL;
  }

  strncpy(pCreate->ep, pInfo->pDCLInfo->a[0].z, epLen);
  if (epLen < sizeof(pCreate->ep)) {
    pCreate->ep[epLen] = 0;  // strncpy does not terminate when the source is longer than the count
  }

  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DNODE;

  return TSDB_CODE_SUCCESS;
}

937 938
/*
 * Build the account message (sent as TSDB_MSG_TYPE_CM_CREATE_ACCT) from the parsed account statement.
 *
 * The account name, password and resource limits are read from pInfo->pDCLInfo. The access state is
 * set to -1 when no state was given, and is left untouched when the given state text is none of
 * "r", "w", "all" or "no".
 *
 * pSql   sql object whose command payload is allocated and filled
 * pInfo  parsed statement (pDCLInfo->user and pDCLInfo->acctOpt are read)
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_OUT_OF_MEMORY when the payload cannot be allocated, or
 * TSDB_CODE_TSC_INVALID_SQL when the name or password does not fit into the message fields.
 */
int32_t tscBuildAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCMCreateAcctMsg);
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMCreateAcctMsg *pAlterMsg = (SCMCreateAcctMsg *)pCmd->payload;

  SSQLToken *pName = &pInfo->pDCLInfo->user.user;
  SSQLToken *pPwd = &pInfo->pDCLInfo->user.passwd;

  // Token lengths come from the SQL text; never copy more than the destination fields can hold.
  // NOTE(review): assumes user and pass are fixed-size char arrays inside the message - confirm.
  if (pName->n > sizeof(pAlterMsg->user) || pPwd->n > sizeof(pAlterMsg->pass)) {
    tscError("%p account name or password is too long", pSql);
    return TSDB_CODE_TSC_INVALID_SQL;
  }

  strncpy(pAlterMsg->user, pName->z, pName->n);
  if (pName->n < sizeof(pAlterMsg->user)) {
    pAlterMsg->user[pName->n] = 0;
  }

  strncpy(pAlterMsg->pass, pPwd->z, pPwd->n);
  if (pPwd->n < sizeof(pAlterMsg->pass)) {
    pAlterMsg->pass[pPwd->n] = 0;
  }

  SCreateAcctSQL *pAcctOpt = &pInfo->pDCLInfo->acctOpt;

  pAlterMsg->cfg.maxUsers = htonl(pAcctOpt->maxUsers);
  pAlterMsg->cfg.maxDbs = htonl(pAcctOpt->maxDbs);
  pAlterMsg->cfg.maxTimeSeries = htonl(pAcctOpt->maxTimeSeries);
  pAlterMsg->cfg.maxStreams = htonl(pAcctOpt->maxStreams);
  pAlterMsg->cfg.maxPointsPerSecond = htonl(pAcctOpt->maxPointsPerSecond);
  pAlterMsg->cfg.maxStorage = htobe64(pAcctOpt->maxStorage);
  pAlterMsg->cfg.maxQueryTime = htobe64(pAcctOpt->maxQueryTime);
  pAlterMsg->cfg.maxConnections = htonl(pAcctOpt->maxConnections);

  if (pAcctOpt->stat.n == 0) {
    pAlterMsg->cfg.accessState = -1;  // no access state given in the statement
  } else {
    if (pAcctOpt->stat.z[0] == 'r' && pAcctOpt->stat.n == 1) {
      pAlterMsg->cfg.accessState = TSDB_VN_READ_ACCCESS;
    } else if (pAcctOpt->stat.z[0] == 'w' && pAcctOpt->stat.n == 1) {
      pAlterMsg->cfg.accessState = TSDB_VN_WRITE_ACCCESS;
    } else if (strncmp(pAcctOpt->stat.z, "all", 3) == 0 && pAcctOpt->stat.n == 3) {
      pAlterMsg->cfg.accessState = TSDB_VN_ALL_ACCCESS;
    } else if (strncmp(pAcctOpt->stat.z, "no", 2) == 0 && pAcctOpt->stat.n == 2) {
      pAlterMsg->cfg.accessState = 0;
    }
  }

  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_ACCT;
  return TSDB_CODE_SUCCESS;
}

982 983
/*
 * Build the create-user / alter-user message from the parsed user statement.
 *
 * The message type is TSDB_MSG_TYPE_CM_ALTER_USER when the statement type is
 * TSDB_ALTER_USER_PASSWD or TSDB_ALTER_USER_PRIVILEGES, and TSDB_MSG_TYPE_CM_CREATE_USER otherwise.
 * For a privilege change the privilege is taken from pCmd->count; in every other case the password
 * from the statement is copied into the message.
 *
 * pSql   sql object whose command payload is allocated and filled
 * pInfo  parsed statement (pDCLInfo->user is read)
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_OUT_OF_MEMORY when the payload cannot be allocated, or
 * TSDB_CODE_TSC_INVALID_SQL when the name or password does not fit into the message fields.
 */
int32_t tscBuildUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCMCreateUserMsg);

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMCreateUserMsg *pAlterMsg = (SCMCreateUserMsg*)pCmd->payload;

  SUserInfo *pUser = &pInfo->pDCLInfo->user;

  // Token lengths come from the SQL text; never copy more than the destination fields can hold.
  // NOTE(review): assumes user and pass are fixed-size char arrays inside the message - confirm.
  if (pUser->user.n > sizeof(pAlterMsg->user)) {
    tscError("%p user name is too long", pSql);
    return TSDB_CODE_TSC_INVALID_SQL;
  }

  strncpy(pAlterMsg->user, pUser->user.z, pUser->user.n);
  if (pUser->user.n < sizeof(pAlterMsg->user)) {
    pAlterMsg->user[pUser->user.n] = 0;
  }

  pAlterMsg->flag = (int8_t)pUser->type;

  if (pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
    pAlterMsg->privilege = (char)pCmd->count;
  } else {  // change password or create user: both carry the password
    if (pUser->passwd.n > sizeof(pAlterMsg->pass)) {
      tscError("%p password is too long", pSql);
      return TSDB_CODE_TSC_INVALID_SQL;
    }

    strncpy(pAlterMsg->pass, pUser->passwd.z, pUser->passwd.n);
    if (pUser->passwd.n < sizeof(pAlterMsg->pass)) {
      pAlterMsg->pass[pUser->passwd.n] = 0;
    }
  }

  if (pUser->type == TSDB_ALTER_USER_PASSWD || pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
    pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_USER;
  } else {
    pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_USER;
  }

  return TSDB_CODE_SUCCESS;
}

1014 1015
/*
 * Set the message type and payload length for a dnode configuration command.
 *
 * Only the command header fields are touched; the payload itself is not written here.
 *
 * pSql   sql object owning the command
 * pInfo  not used by this function
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t tscBuildCfgDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;

  cmd->msgType = TSDB_MSG_TYPE_CM_CONFIG_DNODE;
  cmd->payloadLen = sizeof(SCMCfgDnodeMsg);

  return TSDB_CODE_SUCCESS;
}
S
slguan 已提交
1020

1021 1022
/*
 * Build the drop-database message (TSDB_MSG_TYPE_CM_DROP_DB).
 *
 * The database name comes from the first table meta info of the current clause; the
 * "ignore if not exists" flag mirrors pInfo->pDCLInfo->existsCheck.
 *
 * pSql   sql object whose command payload is allocated and filled
 * pInfo  parsed statement
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload cannot be allocated.
 */
int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;

  cmd->payloadLen = sizeof(SCMDropDbMsg);
  if (tscAllocPayload(cmd, cmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  STableMetaInfo *metaInfo = tscGetTableMetaInfoFromCmd(cmd, cmd->clauseIndex, 0);
  SCMDropDbMsg *dropMsg = (SCMDropDbMsg *)cmd->payload;

  tstrncpy(dropMsg->db, metaInfo->name, sizeof(dropMsg->db));

  if (pInfo->pDCLInfo->existsCheck) {
    dropMsg->ignoreNotExists = 1;
  } else {
    dropMsg->ignoreNotExists = 0;
  }

  cmd->msgType = TSDB_MSG_TYPE_CM_DROP_DB;
  return TSDB_CODE_SUCCESS;
}

1040 1041
/*
 * Build the drop-table message (TSDB_MSG_TYPE_CM_DROP_TABLE).
 *
 * The table id comes from the first table meta info of the current clause; the
 * "ignore if not exists" flag mirrors pInfo->pDCLInfo->existsCheck.
 *
 * pSql   sql object whose command payload is allocated and filled
 * pInfo  parsed statement
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload cannot be allocated.
 */
int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCMDropTableMsg);

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMDropTableMsg *pDropTableMsg = (SCMDropTableMsg*)pCmd->payload;
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);

  // bounded copy, like the other message builders in this file, so an over-long name cannot
  // overrun the message field (assumes tableId is a fixed-size char array - TODO confirm)
  tstrncpy(pDropTableMsg->tableId, pTableMetaInfo->name, sizeof(pDropTableMsg->tableId));
  pDropTableMsg->igNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;

  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_TABLE;
  return TSDB_CODE_SUCCESS;
}

1058
/*
 * Build the drop-dnode message (TSDB_MSG_TYPE_CM_DROP_DNODE).
 *
 * The end point to drop is read from the name of the first table meta info of the current clause.
 *
 * pSql   sql object whose command payload is allocated and filled
 * pInfo  not used by this function
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload cannot be allocated.
 */
int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;

  cmd->payloadLen = sizeof(SCMDropDnodeMsg);
  if (tscAllocPayload(cmd, cmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  STableMetaInfo *metaInfo = tscGetTableMetaInfoFromCmd(cmd, cmd->clauseIndex, 0);
  SCMDropDnodeMsg *dropMsg = (SCMDropDnodeMsg *)cmd->payload;

  tstrncpy(dropMsg->ep, metaInfo->name, sizeof(dropMsg->ep));
  cmd->msgType = TSDB_MSG_TYPE_CM_DROP_DNODE;

  return TSDB_CODE_SUCCESS;
}

S
[TD-16]  
slguan 已提交
1074
/*
 * Build the drop-user message (TSDB_MSG_TYPE_CM_DROP_USER).
 *
 * The user name is read from the name of the first table meta info of the current clause.
 * The message type and payload length are set before the payload is allocated, so they are
 * already in place when the allocation fails.
 *
 * pSql   sql object whose command payload is allocated and filled
 * pInfo  not used by this function
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload cannot be allocated.
 */
int32_t tscBuildDropUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;

  cmd->payloadLen = sizeof(SCMDropUserMsg);
  cmd->msgType = TSDB_MSG_TYPE_CM_DROP_USER;

  if (tscAllocPayload(cmd, cmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  STableMetaInfo *metaInfo = tscGetTableMetaInfoFromCmd(cmd, cmd->clauseIndex, 0);
  SCMDropUserMsg *dropMsg = (SCMDropUserMsg *)cmd->payload;

  tstrncpy(dropMsg->user, metaInfo->name, sizeof(dropMsg->user));

  return TSDB_CODE_SUCCESS;
}

S
[TD-16]  
slguan 已提交
1091 1092 1093 1094 1095 1096 1097
/*
 * Build the drop-account message (TSDB_MSG_TYPE_CM_DROP_ACCT).
 *
 * The message reuses the SCMDropUserMsg layout; the account name is read from the name of the
 * first table meta info of the current clause. The message type and payload length are set
 * before the payload is allocated.
 *
 * pSql   sql object whose command payload is allocated and filled
 * pInfo  not used by this function
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload cannot be allocated.
 */
int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;

  cmd->payloadLen = sizeof(SCMDropUserMsg);
  cmd->msgType = TSDB_MSG_TYPE_CM_DROP_ACCT;

  if (tscAllocPayload(cmd, cmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  STableMetaInfo *metaInfo = tscGetTableMetaInfoFromCmd(cmd, cmd->clauseIndex, 0);
  SCMDropUserMsg *dropMsg = (SCMDropUserMsg *)cmd->payload;

  tstrncpy(dropMsg->user, metaInfo->name, sizeof(dropMsg->user));

  return TSDB_CODE_SUCCESS;
}

1108 1109
/*
 * Build the use-database message (TSDB_MSG_TYPE_CM_USE_DB).
 *
 * The database name is read from the name of the first table meta info of the current clause.
 *
 * pSql   sql object whose command payload is allocated and filled
 * pInfo  not used by this function
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload cannot be allocated.
 */
int32_t tscBuildUseDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCMUseDbMsg);

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMUseDbMsg *pUseDbMsg = (SCMUseDbMsg*)pCmd->payload;
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);

  // bounded copy, like the other message builders in this file, so an over-long name cannot
  // overrun the message field (assumes db is a fixed-size char array - TODO confirm)
  tstrncpy(pUseDbMsg->db, pTableMetaInfo->name, sizeof(pUseDbMsg->db));
  pCmd->msgType = TSDB_MSG_TYPE_CM_USE_DB;

  return TSDB_CODE_SUCCESS;
}

1125
/*
 * Build the show message (TSDB_MSG_TYPE_CM_SHOW).
 *
 * The database is taken from the first table meta info of the current clause, or from the
 * connection's current database when that name is empty. The variable-length tail of the message
 * carries either the wildcard pattern (every show type except TSDB_MGMT_TABLE_VNODES, and only when
 * a pattern token is present) or the end point prefix (TSDB_MGMT_TABLE_VNODES).
 *
 * pSql   sql object whose command payload is allocated and filled
 * pInfo  parsed statement (pDCLInfo->showOpt is read)
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_OUT_OF_MEMORY when the payload cannot be allocated, or
 * TSDB_CODE_TSC_INVALID_SQL when the pattern/prefix is too long for the 16-bit length field.
 */
int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  const int32_t spareBytes = 100;  // head room kept after the fixed part of the message

  STscObj *pObj = pSql->pTscObj;
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->msgType = TSDB_MSG_TYPE_CM_SHOW;

  // pick the token that goes into the message tail before allocating, so the buffer can be sized for it
  SShowInfo *pShowInfo = &pInfo->pDCLInfo->showOpt;
  SSQLToken *pExtra = NULL;

  if (pShowInfo->showType != TSDB_MGMT_TABLE_VNODES) {
    if (pShowInfo->pattern.type > 0) {  // only show tables support wildcard query
      pExtra = &pShowInfo->pattern;
    }
  } else {
    pExtra = &pShowInfo->prefix;
    assert(pExtra->n > 0 && pExtra->type > 0);
  }

  uint32_t extraLen = (pExtra != NULL) ? (uint32_t)pExtra->n : 0;
  if (extraLen > UINT16_MAX) {  // the length is transmitted through htons()
    tscError("%p show pattern is too long, len:%u", pSql, extraLen);
    return TSDB_CODE_TSC_INVALID_SQL;
  }

  pCmd->payloadLen = sizeof(SCMShowMsg) + spareBytes + extraLen;
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMShowMsg *pShowMsg = (SCMShowMsg*)pCmd->payload;

  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  size_t nameLen = strlen(pTableMetaInfo->name);
  if (nameLen > 0) {
    tstrncpy(pShowMsg->db, pTableMetaInfo->name, sizeof(pShowMsg->db));  // prefix is set here
  } else {
    tstrncpy(pShowMsg->db, pObj->db, sizeof(pShowMsg->db));
  }

  pShowMsg->type = pShowInfo->showType;

  if (pExtra != NULL) {
    strncpy(pShowMsg->payload, pExtra->z, extraLen);
    pShowMsg->payloadLen = htons((uint16_t)extraLen);
  }

  // use the host-order length here: the field inside the message has already been byte-swapped
  pCmd->payloadLen = sizeof(SCMShowMsg) + extraLen;
  return TSDB_CODE_SUCCESS;
}

1167
/*
 * Set the message type and payload length for a kill query / connection / stream command.
 *
 * The payload length is always sizeof(SCMKillQueryMsg). The message type is chosen from
 * pCmd->command and is left unchanged for any other command value.
 *
 * pSql   sql object owning the command
 * pInfo  not used by this function
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;
  cmd->payloadLen = sizeof(SCMKillQueryMsg);

  if (cmd->command == TSDB_SQL_KILL_QUERY) {
    cmd->msgType = TSDB_MSG_TYPE_CM_KILL_QUERY;
  } else if (cmd->command == TSDB_SQL_KILL_CONNECTION) {
    cmd->msgType = TSDB_MSG_TYPE_CM_KILL_CONN;
  } else if (cmd->command == TSDB_SQL_KILL_STREAM) {
    cmd->msgType = TSDB_MSG_TYPE_CM_KILL_STREAM;
  }

  return TSDB_CODE_SUCCESS;
}

1185
/*
 * Estimate the number of payload bytes needed for a create-table message.
 *
 * The estimate is minMsgSize() + sizeof(SCMCreateTableMsg), plus either one STagData (table created
 * from a super table) or one SSchema per column and tag, plus the length of the select statement
 * and one extra byte when a select clause is present, plus TSDB_EXTRA_PAYLOAD_SIZE.
 *
 * pSql   sql object; cmd.numOfCols and cmd.count are read
 * pInfo  parsed statement; pCreateTableInfo is read
 *
 * Returns the estimated size in bytes.
 */
int tscEstimateCreateTableMsgLength(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &(pSql->cmd);
  SCreateTableSQL *createInfo = pInfo->pCreateTableInfo;

  int32_t total = minMsgSize() + sizeof(SCMCreateTableMsg);

  total += (createInfo->type == TSQL_CREATE_TABLE_FROM_STABLE)
               ? sizeof(STagData)
               : sizeof(SSchema) * (cmd->numOfCols + cmd->count);

  if (createInfo->pSelect != NULL) {
    total += (createInfo->pSelect->selectToken.n + 1);
  }

  return total + TSDB_EXTRA_PAYLOAD_SIZE;
}

1204
/*
 * Build the create-table message (TSDB_MSG_TYPE_CM_CREATE_TABLE).
 *
 * After the fixed SCMCreateTableMsg header the message carries either the tag data (table created
 * from a super table: data length, super table name, tag values) or one SSchema per column and
 * tag, followed by the NUL-terminated select statement when the statement creates a stream.
 * The field list of the query info is cleared once it has been serialized.
 *
 * pSql   sql object whose command payload is allocated and filled
 * pInfo  parsed statement (pCreateTableInfo is read)
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload cannot be allocated.
 */
int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  int              msgLen = 0;
  SSchema *        pSchema;
  int              size = 0;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  // Reallocate the payload size
  size = tscEstimateCreateTableMsgLength(pSql, pInfo);
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for create table msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMCreateTableMsg *pCreateTableMsg = (SCMCreateTableMsg *)pCmd->payload;

  // bounded copy so an over-long name cannot overrun the message field
  // (assumes tableId is a fixed-size char array - TODO confirm)
  tstrncpy(pCreateTableMsg->tableId, pTableMetaInfo->name, sizeof(pCreateTableMsg->tableId));

  // use dbinfo from table id without modifying current db info
  tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pCreateTableMsg->db);

  SCreateTableSQL *pCreateTable = pInfo->pCreateTableInfo;

  pCreateTableMsg->igExists = pCreateTable->existCheck ? 1 : 0;
  pCreateTableMsg->numOfColumns = htons(pCmd->numOfCols);
  pCreateTableMsg->numOfTags = htons(pCmd->count);

  pCreateTableMsg->sqlLen = 0;
  char *pMsg = (char *)pCreateTableMsg->schema;

  int8_t type = pInfo->pCreateTableInfo->type;
  if (type == TSQL_CREATE_TABLE_FROM_STABLE) {  // create by using super table, tags value
    STagData* pTag = &pInfo->pCreateTableInfo->usingInfo.tagdata;
    *(int32_t*)pMsg = htonl(pTag->dataLen);
    pMsg += sizeof(int32_t);
    memcpy(pMsg, pTag->name, sizeof(pTag->name));
    pMsg += sizeof(pTag->name);
    memcpy(pMsg, pTag->data, pTag->dataLen);
    pMsg += pTag->dataLen;
  } else {  // create (super) table
    pSchema = (SSchema *)pCreateTableMsg->schema;

    for (int i = 0; i < pCmd->numOfCols + pCmd->count; ++i) {
      TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);

      pSchema->type = pField->type;
      // assumes SSchema.name is a fixed-size char array - TODO confirm
      tstrncpy(pSchema->name, pField->name, sizeof(pSchema->name));
      pSchema->bytes = htons(pField->bytes);

      pSchema++;
    }

    pMsg = (char *)pSchema;
    if (type == TSQL_CREATE_STREAM) {  // check if it is a stream sql
      SQuerySQL *pQuerySql = pInfo->pCreateTableInfo->pSelect;

      // copy exactly the token and terminate it explicitly; copying n + 1 bytes would read one byte
      // past the token and would only yield a terminated string if that byte happened to be 0
      strncpy(pMsg, pQuerySql->selectToken.z, pQuerySql->selectToken.n);
      pMsg[pQuerySql->selectToken.n] = 0;

      pCreateTableMsg->sqlLen = htons(pQuerySql->selectToken.n + 1);
      pMsg += pQuerySql->selectToken.n + 1;
    }
  }

  tscFieldInfoClear(&pQueryInfo->fieldsInfo);

  msgLen = (int32_t)(pMsg - (char*)pCreateTableMsg);
  pCreateTableMsg->contLen = htonl(msgLen);
  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE;

  assert(msgLen + minMsgSize() <= size);
  return TSDB_CODE_SUCCESS;
}

/*
 * Estimate the number of payload bytes needed for an alter-table message:
 * minMsgSize() + sizeof(SCMAlterTableMsg) + one SSchema per field of the first query info
 * + TSDB_EXTRA_PAYLOAD_SIZE.
 *
 * pCmd   command whose first query info supplies the number of fields
 *
 * Returns the estimated size in bytes.
 */
int tscEstimateAlterTableMsgLength(SSqlCmd *pCmd) {
  SQueryInfo *queryInfo = tscGetQueryInfoDetail(pCmd, 0);

  size_t fixedBytes = minMsgSize() + sizeof(SCMAlterTableMsg);
  size_t schemaBytes = sizeof(SSchema) * tscNumOfFields(queryInfo);

  return fixedBytes + schemaBytes + TSDB_EXTRA_PAYLOAD_SIZE;
}

1285
/*
 * Build the alter-table message (TSDB_MSG_TYPE_CM_ALTER_TABLE).
 *
 * After the fixed SCMAlterTableMsg header the message carries one SSchema per field of the query
 * info, followed by the raw tag data of the statement (pAlterInfo->tagData).
 *
 * pSql   sql object whose command payload is allocated and filled
 * pInfo  parsed statement (pAlterInfo is read)
 *
 * Returns TSDB_CODE_SUCCESS, or -1 when the payload cannot be allocated.
 */
int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  char *pMsg;
  int   msgLen = 0;

  SSqlCmd    *pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  SAlterTableSQL *pAlterInfo = pInfo->pAlterInfo;

  // the estimate does not account for the tag data appended below, so reserve room for it here
  int size = tscEstimateAlterTableMsgLength(pCmd) + pAlterInfo->tagData.dataLen;
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for alter table msg", pSql);
    return -1;
  }

  SCMAlterTableMsg *pAlterTableMsg = (SCMAlterTableMsg *)pCmd->payload;
  tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pAlterTableMsg->db);

  // bounded copy so an over-long name cannot overrun the message field
  // (assumes tableId is a fixed-size char array - TODO confirm)
  tstrncpy(pAlterTableMsg->tableId, pTableMetaInfo->name, sizeof(pAlterTableMsg->tableId));
  pAlterTableMsg->type = htons(pAlterInfo->type);

  pAlterTableMsg->numOfCols = htons(tscNumOfFields(pQueryInfo));
  SSchema *pSchema = pAlterTableMsg->schema;
  for (int i = 0; i < tscNumOfFields(pQueryInfo); ++i) {
    TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);

    pSchema->type = pField->type;
    // assumes SSchema.name is a fixed-size char array - TODO confirm
    tstrncpy(pSchema->name, pField->name, sizeof(pSchema->name));
    pSchema->bytes = htons(pField->bytes);
    pSchema++;
  }

  // the tag data follows the last schema entry
  pMsg = (char *)pSchema;
  pAlterTableMsg->tagValLen = htonl(pAlterInfo->tagData.dataLen);
  memcpy(pMsg, pAlterInfo->tagData.data, pAlterInfo->tagData.dataLen);
  pMsg += pAlterInfo->tagData.dataLen;

  msgLen = (int32_t)(pMsg - (char*)pAlterTableMsg);

  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_TABLE;

  assert(msgLen + minMsgSize() <= size);

  return TSDB_CODE_SUCCESS;
}

1333 1334 1335 1336
/*
 * Prepare an UPDATE_TAG_VAL request.
 *
 * The payload is not written here; this function only reads head.contLen from
 * it (byte-swapped with htonl) to set the payload length, sets the message
 * type, and fills pSql->epSet from the vgroup info of the target table.
 */
int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  SUpdateTableTagValMsg *pTagMsg = (SUpdateTableTagValMsg *)pCmd->payload;

  pCmd->msgType = TSDB_MSG_TYPE_UPDATE_TAG_VAL;
  pCmd->payloadLen = htonl(pTagMsg->head.contLen);

  SQueryInfo     *pQuery = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo *pMetaInfo = tscGetMetaInfo(pQuery, 0);

  // route the request to the vgroup that owns the table
  tscDumpEpSetFromVgroupInfo(&pMetaInfo->pTableMeta->corVgroupInfo, &pSql->epSet);

  return TSDB_CODE_SUCCESS;
}

1348
/*
 * Finalize the CM_ALTER_DB request: set the message type and length and copy
 * the database name into the message. No other message field is written here.
 */
int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd       *pCmd = &pSql->cmd;
  SCMAlterDbMsg *pReq = NULL;

  pCmd->payloadLen = sizeof(SCMAlterDbMsg);
  pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_DB;

  pReq = (SCMAlterDbMsg *)pCmd->payload;

  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  tstrncpy(pReq->db, pMetaInfo->name, sizeof(pReq->db));

  return TSDB_CODE_SUCCESS;
}

1360
/*
 * Build the CM_RETRIEVE request: a single SRetrieveTableMsg carrying the
 * query handle of the current result and the query type, both converted with
 * htobe64/htons.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->msgType = TSDB_MSG_TYPE_CM_RETRIEVE;
  pCmd->payloadLen = sizeof(SRetrieveTableMsg);

  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SQueryInfo        *pQuery = tscGetQueryInfoDetail(pCmd, 0);
  SRetrieveTableMsg *pReq = (SRetrieveTableMsg *)pCmd->payload;

  pReq->qhandle = htobe64(pSql->res.qhandle);
  pReq->free = htons(pQuery->type);

  return TSDB_CODE_SUCCESS;
}

1378
/*
 * Point every output column in pRes->tsrow at its position inside pRes->data.
 * The position of column i is (offset of field i) * (number of rows).
 *
 * Returns 0 on success, or pRes->code when the pointer arrays cannot be
 * created.
 */
static int tscSetResultPointer(SQueryInfo *pQueryInfo, SSqlRes *pRes) {
  if (TSDB_CODE_SUCCESS != tscCreateResPointerInfo(pRes, pQueryInfo)) {
    return pRes->code;
  }

  int col = 0;
  while (col < pQueryInfo->fieldsInfo.numOfOutput) {
    int16_t fieldOffset = tscFieldInfoGetOffset(pQueryInfo, col);
    pRes->tsrow[col] = ((char*) pRes->data + fieldOffset * pRes->numOfRows);
    ++col;
  }

  return 0;
}

/*
 * Publish a locally generated result set of numOfRes rows.
 *
 * pRes->rspType tracks the state: on the first call (rspType == 0) the rows
 * are exposed and rspType is set to 1; on any later call the result is reset
 * for the next retrieve, i.e. there is no more data.
 *
 * If an async callback (pSql->fp) is installed, it is invoked directly on
 * success, otherwise the error is queued through tscQueueAsyncRes().
 *
 * Returns the result code stored in pSql->res.code.
 */
static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  pRes->code = TSDB_CODE_SUCCESS;
  if (pRes->rspType == 0) {
    pRes->numOfRows = numOfRes;
    pRes->row = 0;
    pRes->rspType = 1;

    tscSetResultPointer(pQueryInfo, pRes);
  } else {
    tscResetForNextRetrieve(pRes);
  }

  // keep the full width of the code: truncating it to 8 bits would turn any
  // error code whose low byte is zero into TSDB_CODE_SUCCESS
  int32_t code = pSql->res.code;
  if (pSql->fp) {
    if (code == TSDB_CODE_SUCCESS) {
      (*pSql->fp)(pSql->param, pSql, pSql->res.numOfRows);
    } else {
      tscQueueAsyncRes(pSql);
    }
  }

  return code;
}

/*
 * Handle the DESCRIBE TABLE response: the local result has one row per
 * column plus one row per tag of the described table.
 */
int tscProcessDescribeTableRsp(SSqlObj *pSql) {
  SSqlCmd        *pCmd = &pSql->cmd;
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);

  STableComInfo info = tscGetTableInfo(pMetaInfo->pTableMeta);
  int32_t       rows = info.numOfColumns + info.numOfTags;

  return tscLocalResultCommonBuilder(pSql, rows);
}

H
Haojun Liao 已提交
1436 1437 1438
/*
 * Handle a locally answered retrieve: mark the result as completed and
 * publish exactly one row.
 */
int tscProcessLocalRetrieveRsp(SSqlObj *pSql) {
  pSql->res.completed = true;
  return tscLocalResultCommonBuilder(pSql, 1);
}

H
hjxilinx 已提交
1442
/*
 * Run the client-side merge and hand its result to the async callback.
 *
 * The result is marked completed when the merge produced no rows. On success
 * pSql->fp is called with the row count; on failure the error is queued via
 * tscQueueAsyncRes(). Returns the result code captured before the callback
 * runs.
 */
int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  pRes->code = tscDoLocalMerge(pSql);
  SQueryInfo *pQuery = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  // row pointers are only needed when the merge actually produced rows
  if (pRes->numOfRows > 0 && pRes->code == TSDB_CODE_SUCCESS) {
    tscCreateResPointerInfo(pRes, pQuery);
  }

  pRes->row = 0;
  pRes->completed = (pRes->numOfRows == 0);

  int32_t code = pRes->code;
  if (code != TSDB_CODE_SUCCESS) {
    tscQueueAsyncRes(pSql);
  } else {
    (*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
  }

  return code;
}

S
slguan 已提交
1466
/* Publish an empty local result set (zero rows). */
int tscProcessEmptyResultRsp(SSqlObj *pSql) {
  const int32_t numOfRes = 0;
  return tscLocalResultCommonBuilder(pSql, numOfRes);
}
H
hzcheng 已提交
1467

1468
/*
 * Build the CM_CONNECT request: database name, client version and an empty
 * message version.
 *
 * If the stored database name contains TS_PATH_DELIMITER, only the part after
 * the first delimiter is sent.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;

  pCmd->msgType = TSDB_MSG_TYPE_CM_CONNECT;
  pCmd->payloadLen = sizeof(SCMConnectMsg);

  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMConnectMsg *pReq = (SCMConnectMsg*)pCmd->payload;

  // skip everything up to and including the first path delimiter, if any
  char *dbName = pObj->db;
  char *delim = strstr(pObj->db, TS_PATH_DELIMITER);
  if (delim != NULL) {
    dbName = delim + 1;
  }

  tstrncpy(pReq->db, dbName, sizeof(pReq->db));
  tstrncpy(pReq->clientVersion, version, sizeof(pReq->clientVersion));
  tstrncpy(pReq->msgVersion, "", sizeof(pReq->msgVersion));

  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
1491
/*
 * Build the CM_TABLE_META request in place inside pCmd->payload.
 *
 * On entry the payload may already hold pCmd->payloadLen bytes of tag data.
 * Because the message header is written over the start of that same buffer,
 * the tag data is first saved to a temporary copy and then, for auto-created
 * tables only, appended after the header.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the temporary
 * copy cannot be allocated.
 */
int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  char    *tmpData = NULL;
  uint32_t len = pCmd->payloadLen;
  if (len > 0) {
    tmpData = calloc(1, len);
    if (NULL == tmpData) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }

    // STagData is in binary format, strncpy is not available
    memcpy(tmpData, pCmd->payload, len);
  }

  SQueryInfo     *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  SCMTableInfoMsg *pInfoMsg = (SCMTableInfoMsg *)pCmd->payload;
  strcpy(pInfoMsg->tableId, pTableMetaInfo->name);
  pInfoMsg->createFlag = htons(pCmd->autoCreated ? 1 : 0);

  char *pMsg = (char*)pInfoMsg + sizeof(SCMTableInfoMsg);

  if (pCmd->autoCreated && len > 0) {
    memcpy(pInfoMsg->tags, tmpData, len);
    pMsg += len;
  }

  pCmd->payloadLen = (int32_t)(pMsg - (char*)pInfoMsg);
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLE_META;

  taosTFree(tmpData);

  // check the length that was actually written; the previous check used a
  // local that was never updated from 0 and therefore could never fail
  assert((int32_t)pCmd->payloadLen + minMsgSize() <= (int32_t)pCmd->allocSize);
  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
1533
/**
1534
 *  multi table meta req pkg format:
1535
 *  | SMgmtHead | SCMMultiTableInfoMsg | tableId0 | tableId1 | tableId2 | ......
S
slguan 已提交
1536 1537
 *      no used         4B
 **/
1538
int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
guanshengliang's avatar
guanshengliang 已提交
1539
#if 0
S
slguan 已提交
1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551
  SSqlCmd *pCmd = &pSql->cmd;

  // copy payload content to temp buff
  char *tmpData = 0;
  if (pCmd->payloadLen > 0) {
    tmpData = calloc(1, pCmd->payloadLen + 1);
    if (NULL == tmpData) return -1;
    memcpy(tmpData, pCmd->payload, pCmd->payloadLen);
  }

  // fill head info
  SMgmtHead *pMgmt = (SMgmtHead *)(pCmd->payload + tsRpcHeadSize);
H
Haojun Liao 已提交
1552
  memset(pMgmt->db, 0, TSDB_TABLE_FNAME_LEN);  // server don't need the db
S
slguan 已提交
1553

1554
  SCMMultiTableInfoMsg *pInfoMsg = (SCMMultiTableInfoMsg *)(pCmd->payload + tsRpcHeadSize + sizeof(SMgmtHead));
S
slguan 已提交
1555
  pInfoMsg->numOfTables = htonl((int32_t)pCmd->count);
S
slguan 已提交
1556 1557

  if (pCmd->payloadLen > 0) {
S
slguan 已提交
1558
    memcpy(pInfoMsg->tableIds, tmpData, pCmd->payloadLen);
S
slguan 已提交
1559 1560
  }

S
Shengliang Guan 已提交
1561
  taosTFree(tmpData);
S
slguan 已提交
1562

1563
  pCmd->payloadLen += sizeof(SMgmtHead) + sizeof(SCMMultiTableInfoMsg);
S
slguan 已提交
1564
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLES_META;
S
slguan 已提交
1565 1566 1567

  assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize);

1568
  tscDebug("%p build load multi-metermeta msg completed, numOfTables:%d, msg size:%d", pSql, pCmd->count,
S
slguan 已提交
1569 1570 1571
           pCmd->payloadLen);

  return pCmd->payloadLen;
guanshengliang's avatar
guanshengliang 已提交
1572 1573
#endif
  return 0;  
S
slguan 已提交
1574 1575
}

H
hjxilinx 已提交
1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591 1592
//static UNUSED_FUNC int32_t tscEstimateMetricMetaMsgSize(SSqlCmd *pCmd) {
////  const int32_t defaultSize =
////      minMsgSize() + sizeof(SSuperTableMetaMsg) + sizeof(SMgmtHead) + sizeof(int16_t) * TSDB_MAX_TAGS;
////  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
////
////  int32_t n = 0;
////  size_t size = taosArrayGetSize(pQueryInfo->tagCond.pCond);
////  for (int32_t i = 0; i < size; ++i) {
////    assert(0);
//////    n += strlen(pQueryInfo->tagCond.cond[i].cond);
////  }
////
////  int32_t tagLen = n * TSDB_NCHAR_SIZE;
////  if (pQueryInfo->tagCond.tbnameCond.cond != NULL) {
////    tagLen += strlen(pQueryInfo->tagCond.tbnameCond.cond) * TSDB_NCHAR_SIZE;
////  }
////
H
Haojun Liao 已提交
1593
////  int32_t joinCondLen = (TSDB_TABLE_FNAME_LEN + sizeof(int16_t)) * 2;
H
hjxilinx 已提交
1594 1595 1596 1597 1598 1599 1600 1601
////  int32_t elemSize = sizeof(SSuperTableMetaElemMsg) * pQueryInfo->numOfTables;
////
////  int32_t colSize = pQueryInfo->groupbyExpr.numOfGroupCols*sizeof(SColIndex);
////
////  int32_t len = tagLen + joinCondLen + elemSize + colSize + defaultSize;
////
////  return MAX(len, TSDB_DEFAULT_PAYLOAD_SIZE);
//}
H
hzcheng 已提交
1602

H
hjxilinx 已提交
1603 1604
/*
 * Build the CM_STABLE_VGROUP request: an SCMSTableVgroupMsg header holding
 * the table count, followed by one fixed-width name slot per table. Each slot
 * is sizeof(name) bytes wide regardless of the actual name length.
 */
int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd    *pCmd = &pSql->cmd;
  SQueryInfo *pQuery = tscGetQueryInfoDetail(pCmd, 0);

  SCMSTableVgroupMsg *pReq = (SCMSTableVgroupMsg *)pCmd->payload;
  pReq->numOfTables = htonl(pQuery->numOfTables);

  char *cursor = pCmd->payload + sizeof(SCMSTableVgroupMsg);

  int32_t idx = 0;
  while (idx < pQuery->numOfTables) {
    STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, idx);

    size_t slot = sizeof(pMetaInfo->name);
    tstrncpy(cursor, pMetaInfo->name, slot);
    cursor += slot;

    ++idx;
  }

  pCmd->msgType = TSDB_MSG_TYPE_CM_STABLE_VGROUP;
  pCmd->payloadLen = (int32_t)(cursor - pCmd->payload);

  return TSDB_CODE_SUCCESS;
}

1626 1627
/*
 * Build the CM_HEARTBEAT request describing the queries and streams that are
 * currently attached to the connection object.
 *
 * pObj->mutex is held while the sql/stream lists are walked and while the
 * descriptors are written, and is released on every exit path.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload
 * cannot be allocated (same error code as the other message builders in this
 * file).
 */
int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;

  pthread_mutex_lock(&pObj->mutex);

  // both counters start at 2 rather than 0, so the buffer is sized for two
  // more entries than the lists currently hold
  int32_t numOfQueries = 2;
  SSqlObj *tpSql = pObj->sqlList;
  while (tpSql) {
    tpSql = tpSql->next;
    numOfQueries++;
  }

  int32_t numOfStreams = 2;
  SSqlStream *pStream = pObj->streamList;
  while (pStream) {
    pStream = pStream->next;
    numOfStreams++;
  }

  int size = numOfQueries * sizeof(SQueryDesc) + numOfStreams * sizeof(SStreamDesc) + sizeof(SCMHeartBeatMsg) + 100;
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    pthread_mutex_unlock(&pObj->mutex);
    tscError("%p failed to malloc for heartbeat msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMHeartBeatMsg *pHeartbeat = (SCMHeartBeatMsg *)pCmd->payload;
  pHeartbeat->numOfQueries = numOfQueries;
  pHeartbeat->numOfStreams = numOfStreams;
  int msgLen = tscBuildQueryStreamDesc(pHeartbeat, pObj);

  pthread_mutex_unlock(&pObj->mutex);

  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_CM_HEARTBEAT;

  return TSDB_CODE_SUCCESS;
}

1666 1667
/*
 * Handle the table-meta response.
 *
 * The message is converted to host byte order in place and validated, a
 * STableMeta is created from it and stored in the client cache under the
 * table name. The cached copy is attached to the first table meta info of the
 * command; the temporary STableMeta is freed before returning.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_INVALID_VALUE for an invalid
 * message, or TSDB_CODE_TSC_OUT_OF_MEMORY when the meta cannot be created or
 * cached.
 */
int tscProcessTableMetaRsp(SSqlObj *pSql) {
  STableMetaMsg *pMetaMsg = (STableMetaMsg *)pSql->res.pRsp;

  pMetaMsg->sid = htonl(pMetaMsg->sid);
  pMetaMsg->sversion = htons(pMetaMsg->sversion);
  pMetaMsg->tversion = htons(pMetaMsg->tversion);
  pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId);

  pMetaMsg->uid = htobe64(pMetaMsg->uid);
  pMetaMsg->contLen = htons(pMetaMsg->contLen);
  pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns);

  if (pMetaMsg->sid < 0 || pMetaMsg->vgroup.numOfEps < 0) {
    // report the values that are actually checked (numOfEps, not vgId)
    tscError("invalid table meta, numOfEps:%d, sid:%d", pMetaMsg->vgroup.numOfEps, pMetaMsg->sid);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  if (pMetaMsg->numOfTags > TSDB_MAX_TAGS) {
    tscError("invalid numOfTags:%d", pMetaMsg->numOfTags);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  if (pMetaMsg->numOfColumns > TSDB_MAX_COLUMNS || pMetaMsg->numOfColumns <= 0) {
    tscError("invalid numOfColumns:%d", pMetaMsg->numOfColumns);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  for (int i = 0; i < pMetaMsg->vgroup.numOfEps; ++i) {
    pMetaMsg->vgroup.epAddr[i].port = htons(pMetaMsg->vgroup.epAddr[i].port);
  }

  SSchema* pSchema = pMetaMsg->schema;

  int32_t numOfTotalCols = pMetaMsg->numOfColumns + pMetaMsg->numOfTags;
  for (int i = 0; i < numOfTotalCols; ++i) {
    pSchema->bytes = htons(pSchema->bytes);
    pSchema->colId = htons(pSchema->colId);

    // the primary timestamp column must be the first schema entry
    if (pSchema->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
      assert(i == 0);
    }

    assert(pSchema->type >= TSDB_DATA_TYPE_BOOL && pSchema->type <= TSDB_DATA_TYPE_NCHAR);
    pSchema++;
  }

  size_t size = 0;
  STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
  if (pTableMeta == NULL) {
    // nothing to cache; do not hand a NULL object to the cache or the log
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  // todo add one more function: taosAddDataIfNotExists();
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
  assert(pTableMetaInfo->pTableMeta == NULL);

  pTableMetaInfo->pTableMeta = (STableMeta *) taosCachePut(tscCacheHandle, pTableMetaInfo->name,
      strlen(pTableMetaInfo->name), pTableMeta, size, tsTableMetaKeepTimer);

  if (pTableMetaInfo->pTableMeta == NULL) {
    free(pTableMeta);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  tscDebug("%p recv table meta, uid:%"PRId64 ", tid:%d, name:%s", pSql, pTableMeta->id.uid, pTableMeta->id.tid, pTableMetaInfo->name);
  free(pTableMeta);

  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
1734
/**
 *  multi table meta rsp pkg format:
 *  | STaosRsp | ieType | SCMMultiTableInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | SMeterMeta2 | SSchema2
 *  |...... 1B        1B            4B
 *
 *  NOTE: the response parser below is compiled out (#if 0). As it stands,
 *  this function ignores the response and always returns TSDB_CODE_SUCCESS.
 **/
int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
#if 0
  char *rsp = pSql->res.pRsp;

  ieType = *rsp;
  if (ieType != TSDB_IE_TYPE_META) {
    tscError("invalid ie type:%d", ieType);
    pSql->res.code = TSDB_CODE_TSC_INVALID_IE;
    pSql->res.numOfTotal = 0;
    return TSDB_CODE_TSC_APP_ERROR;
  }

  rsp++;

  SCMMultiTableInfoMsg *pInfo = (SCMMultiTableInfoMsg *)rsp;
  totalNum = htonl(pInfo->numOfTables);
  rsp += sizeof(SCMMultiTableInfoMsg);

  for (i = 0; i < totalNum; i++) {
    SMultiTableMeta *pMultiMeta = (SMultiTableMeta *)rsp;
    STableMeta *     pMeta = pMultiMeta->metas;

    pMeta->sid = htonl(pMeta->sid);
    pMeta->sversion = htons(pMeta->sversion);
    pMeta->vgId = htonl(pMeta->vgId);
    pMeta->uid = htobe64(pMeta->uid);

    if (pMeta->sid <= 0 || pMeta->vgId < 0) {
      tscError("invalid meter vgId:%d, sid%d", pMeta->vgId, pMeta->sid);
      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
      pSql->res.numOfTotal = i;
      return TSDB_CODE_TSC_APP_ERROR;
    }

    //    pMeta->numOfColumns = htons(pMeta->numOfColumns);
    //
    //    if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
    //      tscError("invalid tag value count:%d", pMeta->numOfTags);
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
    //      pSql->res.numOfTotal = i;
    //      return TSDB_CODE_TSC_APP_ERROR;
    //    }
    //
    //    if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
    //      tscError("invalid numOfTags:%d", pMeta->numOfTags);
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
    //      pSql->res.numOfTotal = i;
    //      return TSDB_CODE_TSC_APP_ERROR;
    //    }
    //
    //    if (pMeta->numOfColumns > TSDB_MAX_COLUMNS || pMeta->numOfColumns < 0) {
    //      tscError("invalid numOfColumns:%d", pMeta->numOfColumns);
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
    //      pSql->res.numOfTotal = i;
    //      return TSDB_CODE_TSC_APP_ERROR;
    //    }
    //
    //    for (int j = 0; j < TSDB_REPLICA_MAX_NUM; ++j) {
    //      pMeta->vpeerDesc[j].vnode = htonl(pMeta->vpeerDesc[j].vnode);
    //    }
    //
    //    pMeta->rowSize = 0;
    //    rsp += sizeof(SMultiTableMeta);
    //    pSchema = (SSchema *)rsp;
    //
    //    int32_t numOfTotalCols = pMeta->numOfColumns + pMeta->numOfTags;
    //    for (int j = 0; j < numOfTotalCols; ++j) {
    //      pSchema->bytes = htons(pSchema->bytes);
    //      pSchema->colId = htons(pSchema->colId);
    //
    //      // ignore the tags length
    //      if (j < pMeta->numOfColumns) {
    //        pMeta->rowSize += pSchema->bytes;
    //      }
    //      pSchema++;
    //    }
    //
    //    rsp += numOfTotalCols * sizeof(SSchema);
    //
    //    int32_t  tagLen = 0;
    //    SSchema *pTagsSchema = tscGetTableTagSchema(pMeta);
    //
    //    if (pMeta->tableType == TSDB_CHILD_TABLE) {
    //      for (int32_t j = 0; j < pMeta->numOfTags; ++j) {
    //        tagLen += pTagsSchema[j].bytes;
    //      }
    //    }
    //
    //    rsp += tagLen;
    //    int32_t size = (int32_t)(rsp - ((char *)pMeta));  // Consistent with STableMeta in cache
    //
    //    pMeta->index = 0;
    //    (void)taosCachePut(tscCacheHandle, pMeta->tableId, (char *)pMeta, size, tsTableMetaKeepTimer);
    //  }
  }

  pSql->res.code = TSDB_CODE_SUCCESS;
  pSql->res.numOfTotal = i;
  tscDebug("%p load multi-metermeta resp from complete num:%d", pSql, pSql->res.numOfTotal);
#endif

  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
1843
/*
 * Handle the super-table vgroup response.
 *
 * The response holds one SVgroupsInfo record per table, in the same order as
 * the tables of the parent command (the parent SSqlObj is passed in
 * pSql->param). Each record is copied into a freshly allocated vgroupList of
 * the matching table meta info and converted to host byte order.
 *
 * Returns pSql->res.code, or TSDB_CODE_TSC_OUT_OF_MEMORY when a vgroup list
 * cannot be allocated.
 */
int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
  SSqlRes* pRes = &pSql->res;

  // NOTE: the order of several table must be preserved.
  SCMSTableVgroupRspMsg *pStableVgroup = (SCMSTableVgroupRspMsg *)pRes->pRsp;
  pStableVgroup->numOfTables = htonl(pStableVgroup->numOfTables);
  char* pMsg = pRes->pRsp + sizeof(SCMSTableVgroupRspMsg);

  // master sqlObj locates in param
  SSqlObj* parent = pSql->param;
  assert(parent != NULL);

  SSqlCmd* pCmd = &parent->cmd;
  for(int32_t i = 0; i < pStableVgroup->numOfTables; ++i) {
    STableMetaInfo *pInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i);
    SVgroupsInfo *  pVgroupInfo = (SVgroupsInfo *)pMsg;
    pVgroupInfo->numOfVgroups = htonl(pVgroupInfo->numOfVgroups);

    size_t size = sizeof(SCMVgroupInfo) * pVgroupInfo->numOfVgroups + sizeof(SVgroupsInfo);
    pInfo->vgroupList = calloc(1, size);
    if (pInfo->vgroupList == NULL) {
      // an assert alone is compiled out in release builds and would leave a
      // NULL dereference in the memcpy below
      tscError("%p failed to malloc for vgroup list", pSql);
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }

    memcpy(pInfo->vgroupList, pVgroupInfo, size);
    for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) {
      //just init, no need to lock
      SCMVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j];
      pVgroups->vgId = htonl(pVgroups->vgId);
      assert(pVgroups->numOfEps >= 1);

      for (int32_t k = 0; k < pVgroups->numOfEps; ++k) {
        pVgroups->epAddr[k].port = htons(pVgroups->epAddr[k].port);
      }
    }

    pMsg += size;
  }

  return pSql->res.code;
}

/*
 * Handle the SHOW response.
 *
 * The response carries a query handle and a table-meta message describing the
 * columns of the result. A STableMeta is built from it, stored in the client
 * cache under a key derived from the message type ("<type char>showlist"),
 * and one output field plus one TSDB_FUNC_TS_DUMMY expression is registered
 * per column.
 *
 * current process do not use the cache at all
 *
 * Returns 0 on success, or TSDB_CODE_TSC_OUT_OF_MEMORY when the table meta
 * cannot be created or cached.
 */
int tscProcessShowRsp(SSqlObj *pSql) {
  STableMetaMsg * pMetaMsg;
  SCMShowRsp *pShow;
  SSchema *    pSchema;
  char         key[20];

  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  pShow = (SCMShowRsp *)pRes->pRsp;
  pShow->qhandle = htobe64(pShow->qhandle);
  pRes->qhandle = pShow->qhandle;

  tscResetForNextRetrieve(pRes);
  pMetaMsg = &(pShow->tableMeta);

  pMetaMsg->numOfColumns = ntohs(pMetaMsg->numOfColumns);

  pSchema = pMetaMsg->schema;
  // sid is converted as a 32-bit value, matching tscProcessTableMetaRsp
  pMetaMsg->sid = ntohl(pMetaMsg->sid);
  for (int i = 0; i < pMetaMsg->numOfColumns; ++i) {
    pSchema->bytes = htons(pSchema->bytes);
    pSchema++;
  }

  key[0] = pCmd->msgType + 'a';
  strcpy(key + 1, "showlist");

  taosCacheRelease(tscCacheHandle, (void *)&(pTableMetaInfo->pTableMeta), false);

  size_t size = 0;
  STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
  if (pTableMeta == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  pTableMetaInfo->pTableMeta = taosCachePut(tscCacheHandle, key, strlen(key), (char *)pTableMeta, size,
      tsTableMetaKeepTimer);
  if (pTableMetaInfo->pTableMeta == NULL) {
    // the schema below is read from the cached copy; bail out without it
    taosTFree(pTableMeta);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SSchema *pTableSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);

  if (pQueryInfo->colList == NULL) {
    pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
  }

  SFieldInfo* pFieldInfo = &pQueryInfo->fieldsInfo;

  SColumnIndex index = {0};
  pSchema = pMetaMsg->schema;

  for (int16_t i = 0; i < pMetaMsg->numOfColumns; ++i, ++pSchema) {
    index.columnIndex = i;
    tscColumnListInsert(pQueryInfo->colList, &index);

    TAOS_FIELD f = tscCreateField(pSchema->type, pSchema->name, pSchema->bytes);
    SFieldSupInfo* pInfo = tscFieldInfoAppend(pFieldInfo, &f);

    pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index,
                     pTableSchema[i].type, pTableSchema[i].bytes, pTableSchema[i].bytes, false);
  }

  pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
  tscFieldInfoUpdateOffset(pQueryInfo);

  taosTFree(pTableMeta);
  return 0;
}

1954 1955 1956 1957 1958 1959 1960 1961 1962 1963 1964 1965 1966 1967 1968 1969 1970 1971 1972 1973 1974 1975 1976 1977 1978 1979 1980 1981 1982
/*
 * Create the heartbeat SSqlObj of a connection, if it does not exist yet.
 *
 * The new object uses tscProcessHeartBeatRsp as its callback, is marked as a
 * TSDB_SQL_HB command and is stored in pObj->pHb. On any allocation failure
 * the function returns silently and pObj->pHb stays unset.
 */
static void createHBObj(STscObj* pObj) {
  if (pObj->pHb != NULL) {
    return;
  }

  SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
  if (NULL == pSql) return;

  pSql->fp = tscProcessHeartBeatRsp;

  SQueryInfo *pQueryInfo = NULL;
  tscGetQueryInfoDetailSafely(&pSql->cmd, 0, &pQueryInfo);
  if (pQueryInfo == NULL) {
    // no query info was produced; do not dereference it below
    taosTFree(pSql);
    return;
  }

  pQueryInfo->command = TSDB_SQL_HB;

  pSql->cmd.command = pQueryInfo->command;
  if (TSDB_CODE_SUCCESS != tscAllocPayload(&(pSql->cmd), TSDB_DEFAULT_PAYLOAD_SIZE)) {
    taosTFree(pSql);
    return;
  }

  pSql->param = pObj;
  pSql->pTscObj = pObj;
  pSql->signature = pSql;
  pObj->pHb = pSql;
  tscAddSubqueryInfo(&pObj->pHb->cmd);

  tscDebug("%p HB is allocated, pObj:%p", pObj->pHb, pObj);
}

H
hzcheng 已提交
1983
/*
 * Handle the connect response.
 *
 * Stores the account id, prefixes the database name with
 * "<acctId><TS_PATH_DELIMITER>", updates the management endpoint set when the
 * server supplied one, records server version, authorization flags and
 * connection id, creates the heartbeat object and (re)arms the activity
 * timer.
 *
 * All strings taken from the response are copied with an explicit bound.
 * Always returns 0.
 */
int tscProcessConnectRsp(SSqlObj *pSql) {
  char temp[TSDB_TABLE_FNAME_LEN * 2];
  STscObj *pObj = pSql->pTscObj;
  SSqlRes *pRes = &pSql->res;

  SCMConnectRsp *pConnect = (SCMConnectRsp *)pRes->pRsp;
  tstrncpy(pObj->acctId, pConnect->acctId, sizeof(pObj->acctId));  // copy acctId from response
  int32_t len = snprintf(temp, sizeof(temp), "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db);

  assert(len >= 0 && (size_t)len <= sizeof(pObj->db));
  tstrncpy(pObj->db, temp, sizeof(pObj->db));

  if (pConnect->epSet.numOfEps > 0) {
    tscEpSetHtons(&pConnect->epSet);
    tscUpdateMgmtEpSet(&pConnect->epSet);
  }

  // the version string comes from the server; never copy it unbounded
  tstrncpy(pObj->sversion, pConnect->serverVersion, sizeof(pObj->sversion));
  pObj->writeAuth = pConnect->writeAuth;
  pObj->superAuth = pConnect->superAuth;
  pObj->connId = htonl(pConnect->connId);

  createHBObj(pObj);

  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);

  return 0;
}

/*
 * Handle the USE DB response: the name held by the first table meta info of
 * the command becomes the current database of the connection.
 */
int tscProcessUseDbRsp(SSqlObj *pSql) {
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
  STscObj        *pConn = pSql->pTscObj;

  tstrncpy(pConn->db, pMetaInfo->name, sizeof(pConn->db));

  return 0;
}

S
Shengliang Guan 已提交
2020
/* Handle the DROP DB response: empty the whole client cache. */
int tscProcessDropDbRsp(SSqlObj *UNUSED_PARAM(pSql)) {
  // the statement object is not needed; the entire cache is emptied
  taosCacheEmpty(tscCacheHandle);

  return 0;
}

/*
 * Handle the DROP TABLE response: force the cached meta of the dropped table
 * out of the client cache, together with the reference held by the command.
 * Always returns 0.
 */
int tscProcessDropTableRsp(SSqlObj *pSql) {
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);

  STableMeta *pCached = taosCacheAcquireByKey(tscCacheHandle, pMetaInfo->name, strlen(pMetaInfo->name));
  if (NULL == pCached) {
    // not in cache, nothing to invalidate
    return 0;
  }

  /*
   * Scenario this guards against:
   * 1. A user drops a table that is the only table in its vnode, which causes
   *    the vnode itself to be removed.
   * 2. The user then creates a new metric and a table with the same name as
   *    the dropped one but a different schema; that table lives in a new vnode.
   * The cached information is then stale, and the reference to the original
   * meter may already be lost, so the entry is force-released.
   */
  tscDebug("%p force release table meta after drop table:%s", pSql, pMetaInfo->name);
  taosCacheRelease(tscCacheHandle, (void **)&pCached, true);

  if (pMetaInfo->pTableMeta != NULL) {
    taosCacheRelease(tscCacheHandle, (void **)&(pMetaInfo->pTableMeta), true);
  }

  return 0;
}

/*
 * Handle the ALTER TABLE response: force the cached meta of the altered table
 * out of the client cache. If the command still holds a meta reference and
 * the table is a super table, the whole cache is emptied as well.
 * Always returns 0.
 */
int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);

  STableMeta *pCached = taosCacheAcquireByKey(tscCacheHandle, pMetaInfo->name, strlen(pMetaInfo->name));
  if (NULL == pCached) {
    // not in cache, nothing to invalidate
    return 0;
  }

  tscDebug("%p force release metermeta in cache after alter-table: %s", pSql, pMetaInfo->name);
  taosCacheRelease(tscCacheHandle, (void **)&pCached, true);

  if (pMetaInfo->pTableMeta == NULL) {
    return 0;
  }

  // the table kind must be read before the meta reference is released
  bool superTable = UTIL_TABLE_IS_SUPER_TABLE(pMetaInfo);
  taosCacheRelease(tscCacheHandle, (void **)&(pMetaInfo->pTableMeta), true);

  if (superTable) {
    // a super table changed: reset the whole query cache
    tscDebug("%p reset query cache since table:%s is stable", pSql, pMetaInfo->name);
    taosCacheEmpty(tscCacheHandle);
  }

  return 0;
}

/* Handle the ALTER DB response: nothing to do on the client side. */
int tscProcessAlterDbMsgRsp(SSqlObj *pSql) {
  UNUSED(pSql);  // response content is not inspected

  return 0;
}

/*
 * Handle the query response: convert the query handle in place with htobe64,
 * store it in the result, drop any previous data pointer and reset the result
 * for the next retrieve. Always returns 0.
 */
int tscProcessQueryRsp(SSqlObj *pSql) {
  SSqlRes        *pRes = &pSql->res;
  SQueryTableRsp *pRsp = (SQueryTableRsp *)pRes->pRsp;

  pRsp->qhandle = htobe64(pRsp->qhandle);
  pRes->qhandle = pRsp->qhandle;
  pRes->data = NULL;

  tscResetForNextRetrieve(pRes);

  return 0;
}

H
hjxilinx 已提交
2091
/*
 * Response handler registered for TSDB_SQL_FETCH and TSDB_SQL_RETRIEVE.
 *
 * Converts the header fields of the retrieve response to host byte order,
 * points pRes->data at the payload and builds the per-column result pointers.
 * For a subscription query it additionally reads the progress records located
 * at data + (bytes + offset of the last output column) * numOfRows and feeds
 * each (uid, key) pair to tscUpdateSubscriptionProgress().
 *
 * Layout of the progress records, as consumed below:
 *   int32  numOfTables
 *   numOfTables x { int64 uid; int32 tid (skipped); TSKEY key }
 *
 * Returns pRes->code if tscCreateResPointerInfo() fails, otherwise 0.
 */
int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *)pRes->pRsp;

  pRes->numOfRows = htonl(pRetrieve->numOfRows);
  pRes->precision = htons(pRetrieve->precision);
  pRes->offset    = htobe64(pRetrieve->offset);
  pRes->useconds  = htobe64(pRetrieve->useconds);
  pRes->completed = (pRetrieve->completed == 1);
  pRes->data      = pRetrieve->data;

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
    return pRes->code;
  }

  if (pSql->pSubscription != NULL) {
    int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput;

    TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, numOfCols - 1);
    int16_t     offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1);

    char* p = pRes->data + (pField->bytes + offset) * pRes->numOfRows;

    // The records are packed (4 + n * 20 bytes), so the 64-bit fields cannot all
    // be naturally aligned. Copy each field out with memcpy instead of
    // dereferencing a casted pointer, which is undefined for misaligned addresses.
    int32_t numOfTables = 0;
    memcpy(&numOfTables, p, sizeof(numOfTables));
    numOfTables = htonl(numOfTables);
    p += sizeof(int32_t);

    for (int i = 0; i < numOfTables; i++) {
      int64_t uid = 0;
      memcpy(&uid, p, sizeof(uid));
      uid = htobe64(uid);
      p += sizeof(int64_t);

      p += sizeof(int32_t); // skip tid

      TSKEY key = 0;
      memcpy(&key, p, sizeof(key));
      key = htobe64(key);
      p += sizeof(TSKEY);

      tscUpdateSubscriptionProgress(pSql->pSubscription, uid, key);
    }
  }

  pRes->row = 0;
  tscDebug("%p numOfRows:%" PRId64 ", offset:%" PRId64 ", complete:%d", pSql, pRes->numOfRows, pRes->offset, pRes->completed);

  return 0;
}

H
hjxilinx 已提交
2135
// Forward declaration of the callback that getTableMetaFromMgmt() and
// tscGetSTableVgroupInfo() install as `fp` on the helper sql objects they create.
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code);
H
hzcheng 已提交
2136

2137
/*
 * Create a helper sql object (command TSDB_SQL_META) that asks for the meta of
 * pTableMetaInfo->name, and submit it through tscProcessSql().
 *
 * The helper copies autoCreated and the payload of pSql (tag information used
 * when the table does not exist), and is wired to tscTableMetaCallBack with
 * pSql as its callback parameter.
 *
 * @param pSql            the sql object that needs the table meta
 * @param pTableMetaInfo  meta info whose name identifies the table
 * @return TSDB_CODE_TSC_OUT_OF_MEMORY if an allocation fails, the error of
 *         tscGetQueryInfoDetailSafely()/tscProcessSql() if they fail, and
 *         TSDB_CODE_TSC_ACTION_IN_PROGRESS when the request was submitted.
 */
static int32_t getTableMetaFromMgmt(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
  if (NULL == pNew) {
    tscError("%p malloc failed for new sqlobj to get table meta", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;
  pNew->cmd.command = TSDB_SQL_META;

  tscAddSubqueryInfo(&pNew->cmd);

  SQueryInfo *pNewQueryInfo = NULL;
  int32_t     code = tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo);
  if (code != TSDB_CODE_SUCCESS) {
    // without a query info there is nothing to attach the table to
    tscFreeSqlObj(pNew);
    return code;
  }

  pNew->cmd.autoCreated = pSql->cmd.autoCreated;  // create table if not exists
  if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE + pSql->cmd.payloadLen)) {
    tscError("%p malloc failed for payload to get table meta", pSql);

    // pNew already owns a sub-query info at this point; a bare free() would leak it,
    // so tear it down the same way tscGetSTableVgroupInfo() does.
    tscFreeSqlObj(pNew);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  STableMetaInfo *pNewMeterMetaInfo = tscAddEmptyMetaInfo(pNewQueryInfo);
  assert(pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);

  tstrncpy(pNewMeterMetaInfo->name, pTableMetaInfo->name, sizeof(pNewMeterMetaInfo->name));
  memcpy(pNew->cmd.payload, pSql->cmd.payload, pSql->cmd.payloadLen);  // tag information if table does not exists.
  pNew->cmd.payloadLen = pSql->cmd.payloadLen;
  tscDebug("%p new pSqlObj:%p to get tableMeta, auto create:%d", pSql, pNew, pNew->cmd.autoCreated);

  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;

  code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
    code = TSDB_CODE_TSC_ACTION_IN_PROGRESS;  // notify upper application that current process need to be terminated
  }

  return code;
}

H
hjxilinx 已提交
2180
/*
 * Make pTableMetaInfo->pTableMeta point at the cached meta of the table named
 * by pTableMetaInfo->name.
 *
 * Any meta currently held by pTableMetaInfo is released first. On a cache hit
 * the function returns TSDB_CODE_SUCCESS; on a miss it returns whatever
 * getTableMetaFromMgmt() returns for the request it issues.
 */
int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
  size_t nameLen = strlen(pTableMetaInfo->name);
  assert(nameLen != 0);

  // If this STableMetaInfo owns a table meta, release it first
  if (pTableMetaInfo->pTableMeta != NULL) {
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), false);
  }

  pTableMetaInfo->pTableMeta = (STableMeta *)taosCacheAcquireByKey(tscCacheHandle, pTableMetaInfo->name, nameLen);
  if (pTableMetaInfo->pTableMeta == NULL) {
    // cache miss
    return getTableMetaFromMgmt(pSql, pTableMetaInfo);
  }

  STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
  tscDebug("%p retrieve table Meta from cache, the number of columns:%d, numOfTags:%d, %p", pSql, tinfo.numOfColumns,
           tinfo.numOfTags, pTableMetaInfo->pTableMeta);

  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
2200
/*
 * Variant of tscGetTableMeta() that first records createIfNotExists in
 * pSql->cmd.autoCreated, then delegates to tscGetTableMeta() and returns its
 * result unchanged.
 */
int tscGetMeterMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool createIfNotExists) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->autoCreated = createIfNotExists;

  return tscGetTableMeta(pSql, pTableMetaInfo);
}

/**
H
Haojun Liao 已提交
2206
 * retrieve table meta from mnode, and update the local table meta cache.
H
hzcheng 已提交
2207
 * @param pSql          sql object
H
Haojun Liao 已提交
2208
 * @param tableId       table full name
H
hzcheng 已提交
2209 2210
 * @return              status code
 */
H
Haojun Liao 已提交
2211
int tscRenewTableMeta(SSqlObj *pSql, char *tableId) {
H
hzcheng 已提交
2212
  SSqlCmd *pCmd = &pSql->cmd;
2213 2214

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
H
hjxilinx 已提交
2215
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
2216

H
Haojun Liao 已提交
2217 2218
  STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
  if (pTableMetaInfo->pTableMeta) {
2219
    tscDebug("%p update table meta, old meta numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
2220
             tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->id.uid, pTableMeta);
H
hzcheng 已提交
2221 2222
  }

H
Haojun Liao 已提交
2223 2224
  taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true);
  return getTableMetaFromMgmt(pSql, pTableMetaInfo);
H
hzcheng 已提交
2225 2226
}

H
hjxilinx 已提交
2227
/*
 * Report whether every table of the given clause already has a vgroup list.
 * Returns false as soon as one table with vgroupList == NULL is found, and
 * true otherwise (including when the clause has no tables).
 */
static bool allVgroupInfoRetrieved(SSqlCmd* pCmd, int32_t clauseIndex) {
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);

  int32_t idx = 0;
  while (idx < pQueryInfo->numOfTables) {
    STableMetaInfo *pInfo = tscGetMetaInfo(pQueryInfo, idx);
    if (pInfo->vgroupList == NULL) {
      return false;
    }

    ++idx;
  }

  // all super tables vgroupinfo are retrieved, no need to retrieve vgroup info anymore
  return true;
}
H
hzcheng 已提交
2239

H
hjxilinx 已提交
2240
/*
 * Fetch the vgroup information for the tables of one clause of pSql.
 *
 * Nothing is sent when every table of the clause already has a vgroup list.
 * Otherwise a helper sql object (command TSDB_SQL_STABLEVGROUP) is created,
 * populated with one table-meta info per table of the clause (each holding its
 * own cache reference obtained through taosCacheAcquireByData), wired to
 * tscTableMetaCallBack with pSql as parameter, and submitted via tscProcessSql().
 *
 * @param pSql         the sql object whose tables need vgroup info
 * @param clauseIndex  index of the clause to inspect
 * @return TSDB_CODE_SUCCESS if nothing had to be fetched,
 *         TSDB_CODE_TSC_OUT_OF_MEMORY if the helper object cannot be allocated,
 *         the error of a failing setup step or of tscProcessSql(), and
 *         TSDB_CODE_TSC_ACTION_IN_PROGRESS when the request was submitted.
 */
int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
  SSqlCmd *pCmd = &pSql->cmd;

  if (allVgroupInfoRetrieved(pCmd, clauseIndex)) {
    return TSDB_CODE_SUCCESS;
  }

  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
  if (pNew == NULL) {
    // previously dereferenced unchecked; report the failure like getTableMetaFromMgmt() does
    tscError("%p malloc failed for new sqlobj to get vgroup info", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;

  pNew->cmd.command = TSDB_SQL_STABLEVGROUP;

  int         code = TSDB_CODE_SUCCESS;
  SQueryInfo *pNewQueryInfo = NULL;
  if ((code = tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo)) != TSDB_CODE_SUCCESS) {
    tscFreeSqlObj(pNew);
    return code;
  }

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
    STableMetaInfo *pMInfo = tscGetMetaInfo(pQueryInfo, i);
    STableMeta *pTableMeta = taosCacheAcquireByData(tscCacheHandle, pMInfo->pTableMeta);
    tscAddTableMetaInfo(pNewQueryInfo, pMInfo->name, pTableMeta, NULL, pMInfo->tagColList);
  }

  if ((code = tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
    tscFreeSqlObj(pNew);
    return code;
  }

  pNewQueryInfo->numOfTables = pQueryInfo->numOfTables;
  tscDebug("%p new sqlObj:%p to get vgroupInfo, numOfTables:%d", pSql, pNew, pNewQueryInfo->numOfTables);

  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;
  code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
    code = TSDB_CODE_TSC_ACTION_IN_PROGRESS;
  }

  return code;
}

2285
void tscInitMsgsFp() {
S
slguan 已提交
2286 2287
  tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
  tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg;
J
jtao1735 已提交
2288
  tscBuildMsg[TSDB_SQL_FETCH] = tscBuildFetchMsg;
H
hzcheng 已提交
2289 2290

  tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg;
2291
  tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg;
H
hzcheng 已提交
2292

2293 2294
  tscBuildMsg[TSDB_SQL_CREATE_ACCT] = tscBuildAcctMsg;
  tscBuildMsg[TSDB_SQL_ALTER_ACCT] = tscBuildAcctMsg;
H
hzcheng 已提交
2295 2296

  tscBuildMsg[TSDB_SQL_CREATE_TABLE] = tscBuildCreateTableMsg;
S
[TD-16]  
slguan 已提交
2297
  tscBuildMsg[TSDB_SQL_DROP_USER] = tscBuildDropUserMsg;
H
hzcheng 已提交
2298 2299 2300
  tscBuildMsg[TSDB_SQL_DROP_ACCT] = tscBuildDropAcctMsg;
  tscBuildMsg[TSDB_SQL_DROP_DB] = tscBuildDropDbMsg;
  tscBuildMsg[TSDB_SQL_DROP_TABLE] = tscBuildDropTableMsg;
2301
  tscBuildMsg[TSDB_SQL_ALTER_USER] = tscBuildUserMsg;
S
slguan 已提交
2302 2303 2304
  tscBuildMsg[TSDB_SQL_CREATE_DNODE] = tscBuildCreateDnodeMsg;
  tscBuildMsg[TSDB_SQL_DROP_DNODE] = tscBuildDropDnodeMsg;
  tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg;
H
hzcheng 已提交
2305
  tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg;
2306
  tscBuildMsg[TSDB_SQL_UPDATE_TAGS_VAL] = tscBuildUpdateTagMsg;
H
hzcheng 已提交
2307 2308 2309 2310
  tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg;

  tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg;
  tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg;
H
hjxilinx 已提交
2311
  tscBuildMsg[TSDB_SQL_META] = tscBuildTableMetaMsg;
H
hjxilinx 已提交
2312
  tscBuildMsg[TSDB_SQL_STABLEVGROUP] = tscBuildSTableVgroupMsg;
S
slguan 已提交
2313
  tscBuildMsg[TSDB_SQL_MULTI_META] = tscBuildMultiMeterMetaMsg;
H
hzcheng 已提交
2314 2315 2316 2317

  tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg;
  tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg;
  tscBuildMsg[TSDB_SQL_RETRIEVE] = tscBuildRetrieveFromMgmtMsg;
2318 2319 2320
  tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg;
H
hzcheng 已提交
2321 2322

  tscProcessMsgRsp[TSDB_SQL_SELECT] = tscProcessQueryRsp;
H
hjxilinx 已提交
2323
  tscProcessMsgRsp[TSDB_SQL_FETCH] = tscProcessRetrieveRspFromNode;
H
hzcheng 已提交
2324 2325 2326 2327 2328

  tscProcessMsgRsp[TSDB_SQL_DROP_DB] = tscProcessDropDbRsp;
  tscProcessMsgRsp[TSDB_SQL_DROP_TABLE] = tscProcessDropTableRsp;
  tscProcessMsgRsp[TSDB_SQL_CONNECT] = tscProcessConnectRsp;
  tscProcessMsgRsp[TSDB_SQL_USE_DB] = tscProcessUseDbRsp;
2329
  tscProcessMsgRsp[TSDB_SQL_META] = tscProcessTableMetaRsp;
H
hjxilinx 已提交
2330
  tscProcessMsgRsp[TSDB_SQL_STABLEVGROUP] = tscProcessSTableVgroupRsp;
S
slguan 已提交
2331
  tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiMeterMetaRsp;
H
hzcheng 已提交
2332 2333

  tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp;
H
hjxilinx 已提交
2334
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromNode;  // rsp handled by same function.
H
hzcheng 已提交
2335
  tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp;
2336

H
Haojun Liao 已提交
2337 2338 2339 2340 2341
  tscProcessMsgRsp[TSDB_SQL_CURRENT_DB]   = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CURRENT_USER] = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_VERSION] = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CLI_VERSION]  = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_STATUS]  = tscProcessLocalRetrieveRsp;
2342

H
hzcheng 已提交
2343 2344
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp;

H
hjxilinx 已提交
2345
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_LOCALMERGE] = tscProcessRetrieveLocalMergeRsp;
H
hzcheng 已提交
2346 2347 2348 2349 2350 2351 2352 2353 2354 2355

  tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp;
  tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp;

  tscKeepConn[TSDB_SQL_SHOW] = 1;
  tscKeepConn[TSDB_SQL_RETRIEVE] = 1;
  tscKeepConn[TSDB_SQL_SELECT] = 1;
  tscKeepConn[TSDB_SQL_FETCH] = 1;
  tscKeepConn[TSDB_SQL_HB] = 1;
}