tscServer.c 78.7 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#include "os.h"
H
hzcheng 已提交
17
#include "tcache.h"
H
Haojun Liao 已提交
18
#include "tcmdtype.h"
H
hzcheng 已提交
19
#include "trpc.h"
20 21
#include "tscLocalMerge.h"
#include "tscLog.h"
H
hzcheng 已提交
22 23 24 25 26 27
#include "tscProfile.h"
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
#include "ttimer.h"
#include "tutil.h"
dengyihao's avatar
dengyihao 已提交
28
#include "tlockfree.h"
H
hzcheng 已提交
29 30 31

#define TSC_MGMT_VNODE 999

32
SRpcCorEpSet  tscMgmtEpSet;
33
SRpcEpSet  tscDnodeEpSet;
S
slguan 已提交
34

35 36
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};

H
hzcheng 已提交
37 38 39
int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql);
void tscProcessActivityTimer(void *handle, void *tmrId);
int tscKeepConn[TSDB_SQL_MAX] = {0};
40

B
Bomin Zhang 已提交
41
TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid, TSKEY dflt);
42 43
void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts);
void tscSaveSubscriptionProgress(void* sub);
H
hzcheng 已提交
44

S
slguan 已提交
45
/* Smallest message size used when estimating payloads: the RPC header size plus a fixed 100 bytes. */
static int32_t minMsgSize() {
  return 100 + tsRpcHeadSize;
}
H
hzcheng 已提交
46

47
/*
 * Copy the endpoint list of a vgroup into the endpoint set of the SQL object.
 *
 * pSql        - SQL object whose epSet is overwritten; must not be NULL
 * pVgroupInfo - vgroup description providing numOfEps endpoints; must not be
 *               NULL and must contain at least one endpoint
 *
 * The endpoint in use is reset to index 0. At least one copied endpoint must
 * carry a non-empty FQDN, otherwise the trailing assert fires.
 */
static void tscSetDnodeEpSet(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) {
  assert(pSql != NULL && pVgroupInfo != NULL && pVgroupInfo->numOfEps > 0);

  SRpcEpSet* pEpSet = &pSql->epSet;
  pEpSet->inUse = 0;
  pEpSet->numOfEps = pVgroupInfo->numOfEps;

  bool hasFqdn = false;

  for (int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) {
    // Bounded copy: each FQDN slot holds TSDB_FQDN_LEN bytes, so never write past it
    // and always leave the slot NUL-terminated, even if the source is over-long.
    strncpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn, TSDB_FQDN_LEN);
    pEpSet->fqdn[i][TSDB_FQDN_LEN - 1] = '\0';

    pEpSet->port[i] = pVgroupInfo->epAddr[i].port;

    if (!hasFqdn) {
      hasFqdn = (strlen(pEpSet->fqdn[i]) > 0);
    }
  }

  assert(hasFqdn);
}
68

dengyihao's avatar
dengyihao 已提交
69 70 71 72
/*
 * Copy the global management endpoint set into *epSet. The copy is bracketed
 * by taosCorBeginRead/taosCorEndRead on the set's version field.
 */
static void tscDumpMgmtEpSet(SRpcEpSet *epSet) {
  SRpcCorEpSet *pGlobal = &tscMgmtEpSet;

  taosCorBeginRead(&pGlobal->version);
  *epSet = pGlobal->epSet;
  taosCorEndRead(&pGlobal->version);
}
dengyihao's avatar
dengyihao 已提交
74 75
/* Apply htons() in place to every port of the endpoint set. */
static void tscEpSetHtons(SRpcEpSet *s) {
  int32_t idx = 0;
  while (idx < s->numOfEps) {
    s->port[idx] = htons(s->port[idx]);
    ++idx;
  }
}
dengyihao's avatar
dengyihao 已提交
79 80
/*
 * Compare two endpoint sets.
 *
 * Returns true only when both sets have the same number of endpoints, the same
 * in-use index, and every endpoint pair matches in port and FQDN (FQDNs are
 * compared over at most TSDB_FQDN_LEN bytes).
 */
bool tscEpSetIsEqual(SRpcEpSet *s1, SRpcEpSet *s2) {
  if (s1->numOfEps != s2->numOfEps) {
    return false;
  }

  if (s1->inUse != s2->inUse) {
    return false;
  }

  for (int32_t idx = 0; idx < s1->numOfEps; ++idx) {
    if (s1->port[idx] != s2->port[idx]) {
      return false;
    }

    if (strncmp(s1->fqdn[idx], s2->fqdn[idx], TSDB_FQDN_LEN) != 0) {
      return false;
    }
  }

  return true;
}
dengyihao's avatar
dengyihao 已提交
90
/*
 * Replace the global management endpoint set with *pEpSet.
 *
 * The store is unconditional (no equality check is made here) and is bracketed
 * by taosCorBeginWrite/taosCorEndWrite on the set's version field.
 */
void tscUpdateMgmtEpSet(SRpcEpSet *pEpSet) {
  SRpcCorEpSet *pGlobal = &tscMgmtEpSet;

  taosCorBeginWrite(&pGlobal->version);
  pGlobal->epSet = *pEpSet;
  taosCorEndWrite(&pGlobal->version);
}
dengyihao's avatar
dengyihao 已提交
96
/*
 * Copy the endpoints of a vgroup into *pEpSet.
 *
 * pVgroupInfo - source vgroup info; when NULL the function does nothing
 * pEpSet      - destination endpoint set
 *
 * The read is bracketed by taosCorBeginRead/taosCorEndRead on the vgroup's
 * version field. An in-use index outside [0, TSDB_MAX_REPLICA) is replaced by 0.
 */
static void tscDumpEpSetFromVgroupInfo(SCMCorVgroupInfo *pVgroupInfo, SRpcEpSet *pEpSet) {
  if (pVgroupInfo == NULL) { return; }

  taosCorBeginRead(&pVgroupInfo->version);

  int8_t inUse = pVgroupInfo->inUse;
  pEpSet->inUse = (inUse >= 0 && inUse < TSDB_MAX_REPLICA) ? inUse : 0;
  pEpSet->numOfEps = pVgroupInfo->numOfEps;

  for (int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) {
    strncpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn, TSDB_FQDN_LEN);
    // strncpy does not terminate the destination when the source fills it completely
    pEpSet->fqdn[i][TSDB_FQDN_LEN - 1] = '\0';

    pEpSet->port[i] = pVgroupInfo->epAddr[i].port;
  }

  taosCorEndRead(&pVgroupInfo->version);
}

dengyihao's avatar
dengyihao 已提交
109
/*
 * Store *pEpSet into the vgroup info attached to the table meta of the first
 * table of the current clause of pObj.
 *
 * Nothing is done when the table meta info or the table meta is not available.
 * The update is bracketed by taosCorBeginWrite/taosCorEndWrite on the vgroup's
 * version field.
 */
static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) {
  SSqlCmd *pCmd = &pObj->cmd;
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  if (pTableMetaInfo == NULL || pTableMetaInfo->pTableMeta == NULL) { return; }

  SCMCorVgroupInfo *pVgroupInfo = &pTableMetaInfo->pTableMeta->corVgroupInfo;

  taosCorBeginWrite(&pVgroupInfo->version);
  tscDebug("before: Endpoint in use: %d", pVgroupInfo->inUse);

  pVgroupInfo->inUse = pEpSet->inUse;
  pVgroupInfo->numOfEps = pEpSet->numOfEps;
  for (int32_t i = 0; i < pVgroupInfo->numOfEps; i++) {
    strncpy(pVgroupInfo->epAddr[i].fqdn, pEpSet->fqdn[i], TSDB_FQDN_LEN);
    // strncpy does not terminate the destination when the source fills it completely
    pVgroupInfo->epAddr[i].fqdn[TSDB_FQDN_LEN - 1] = '\0';

    pVgroupInfo->epAddr[i].port = pEpSet->port[i];
  }

  tscDebug("after: EndPoint in use: %d", pVgroupInfo->inUse);
  taosCorEndWrite(&pVgroupInfo->version);
}
126 127 128 129
void tscPrintMgmtEp() {
  SRpcEpSet dump;
  tscDumpMgmtEpSet(&dump);
  if (dump.numOfEps <= 0) {
dengyihao's avatar
dengyihao 已提交
130
    tscError("invalid mnode EP list:%d", dump.numOfEps);
S
slguan 已提交
131
  } else {
132
    for (int i = 0; i < dump.numOfEps; ++i) {
133
      tscDebug("mnode index:%d %s:%d", i, dump.fqdn[i], dump.port[i]);
S
slguan 已提交
134
    }
S
slguan 已提交
135
  }
S
slguan 已提交
136 137
}

H
hzcheng 已提交
138 139 140 141 142 143 144 145 146 147 148 149
/*
 * Callback invoked with the result of a heartbeat request.
 *
 * param - the STscObj connection object that owns the heartbeat
 * tres  - unused
 * code  - 0 on success, otherwise an error code
 *
 * On success the response may carry a new management endpoint set, the
 * connection id, and requests to kill the connection, a query or a stream.
 * In every case where the connection object is valid, the activity timer is
 * re-armed at the end.
 */
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
  STscObj *pObj = (STscObj *)param;
  if (pObj == NULL) return;
  if (pObj != pObj->signature) {
    tscError("heart beat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature);
    return;
  }

  SSqlObj *pSql = pObj->pHb;

  // The response buffer is NULL when the server answered with an empty body, and the
  // heartbeat object itself may be missing; neither case may be dereferenced.
  SCMHeartBeatRsp *pRsp = (pSql != NULL) ? (SCMHeartBeatRsp *)pSql->res.pRsp : NULL;

  if (code != 0) {
    tscDebug("heart beat failed, code:%s", tstrerror(code));
  } else if (pRsp == NULL) {
    tscError("heart beat rsp is empty, pObj:%p pHb:%p", pObj, pSql);
  } else {
    SRpcEpSet *epSet = &pRsp->epSet;
    if (epSet->numOfEps > 0) {
      tscEpSetHtons(epSet);
      tscUpdateMgmtEpSet(epSet);
    }

    pSql->pTscObj->connId = htonl(pRsp->connId);

    if (pRsp->killConnection) {
      tscKillConnection(pObj);
    } else {
      if (pRsp->queryId) tscKillQuery(pObj, htonl(pRsp->queryId));
      if (pRsp->streamId) tscKillStream(pObj, htonl(pRsp->streamId));
    }
  }

  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
}

void tscProcessActivityTimer(void *handle, void *tmrId) {
  STscObj *pObj = (STscObj *)handle;

  if (pObj == NULL) return;
  if (pObj->signature != pObj) return;
  if (pObj->pTimer != tmrId) return;

  if (pObj->pHb == NULL) {
S
slguan 已提交
180 181 182
    SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
    if (NULL == pSql) return;

H
hzcheng 已提交
183
    pSql->fp = tscProcessHeartBeatRsp;
H
hjxilinx 已提交
184
    
185 186 187 188
    SQueryInfo *pQueryInfo = NULL;
    tscGetQueryInfoDetailSafely(&pSql->cmd, 0, &pQueryInfo);
    pQueryInfo->command = TSDB_SQL_HB;
    
189
    pSql->cmd.command = TSDB_SQL_HB;
S
slguan 已提交
190
    if (TSDB_CODE_SUCCESS != tscAllocPayload(&(pSql->cmd), TSDB_DEFAULT_PAYLOAD_SIZE)) {
S
Shengliang Guan 已提交
191
      taosTFree(pSql);
S
slguan 已提交
192 193 194
      return;
    }

195
    pSql->cmd.command = TSDB_SQL_HB;
H
hzcheng 已提交
196 197 198 199
    pSql->param = pObj;
    pSql->pTscObj = pObj;
    pSql->signature = pSql;
    pObj->pHb = pSql;
200
    tscAddSubqueryInfo(&pObj->pHb->cmd);
201

202
    tscDebug("%p pHb is allocated, pObj:%p", pObj->pHb, pObj);
H
hzcheng 已提交
203 204 205
  }

  if (tscShouldFreeHeatBeat(pObj->pHb)) {
206
    tscDebug("%p free HB object and release connection", pObj);
H
hzcheng 已提交
207 208 209 210 211 212 213 214 215
    tscFreeSqlObj(pObj->pHb);
    tscCloseTscObj(pObj);
    return;
  }

  tscProcessSql(pObj->pHb);
}

/*
 * Copy the command payload into an RPC buffer and send it.
 *
 * Returns TSDB_CODE_TSC_OUT_OF_MEMORY when the RPC buffer cannot be allocated,
 * TSDB_CODE_SUCCESS otherwise. For commands at or above TSDB_SQL_MGMT the
 * endpoint set of the SQL object is first refreshed from the global management
 * endpoint set.
 */
int tscSendMsgToServer(SSqlObj *pSql) {
  SSqlCmd* pCmd = &pSql->cmd;
  STscObj* pObj = pSql->pTscObj;

  char *pCont = rpcMallocCont(pCmd->payloadLen);
  if (pCont == NULL) {
    tscError("%p msg:%s malloc failed", pSql, taosMsg[pCmd->msgType]);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  // management commands always go to the current mnode endpoint list
  if (pCmd->command >= TSDB_SQL_MGMT) {
    tscDumpMgmtEpSet(&pSql->epSet);
  }

  memcpy(pCont, pCmd->payload, pCmd->payloadLen);

  SRpcMsg rpcMsg = {0};
  rpcMsg.msgType = pCmd->msgType;
  rpcMsg.pCont   = pCont;
  rpcMsg.contLen = pCmd->payloadLen;
  rpcMsg.ahandle = pSql;
  rpcMsg.handle  = &pSql->pRpcCtx;
  rpcMsg.code    = 0;

  // NOTE: the rpc context should be acquired before sending data to server.
  // Otherwise, the pSql object may have been released already during the response function, which is
  // processMsgFromServer function. In the meanwhile, the assignment of the rpc context to sql object will absolutely
  // cause crash.
  rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg);
  return TSDB_CODE_SUCCESS;
}

249
/*
 * RPC callback that handles a response (or transport error) for a request
 * previously sent with tscSendMsgToServer.
 *
 * rpcMsg - the response; rpcMsg->ahandle is the SSqlObj that issued the request
 *          and rpcMsg->pCont is released by this function on every exit path
 * pEpSet - optional endpoint set reported with the response; when it differs
 *          from the one stored in the SQL object, the cached vgroup or
 *          management endpoint set is updated
 *
 * For select/fetch/insert/update-tag commands that fail with a "stale meta"
 * style error, the table meta is renewed and the request retried up to
 * pSql->maxRetry times. Otherwise the response body is copied into pRes->pRsp,
 * the per-command response processor is run, and the user callback pSql->fp is
 * invoked.
 */
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
  SSqlObj *pSql = (SSqlObj *)rpcMsg->ahandle;
  if (pSql == NULL || pSql->signature != pSql) {
    tscError("%p sql is already released", pSql);
    rpcFreeCont(rpcMsg->pCont);  // release the response here as on every other exit path
    return;
  }

  STscObj *pObj = pSql->pTscObj;
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  if (pObj->signature != pObj) {
    tscDebug("%p DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, pObj, pObj->signature);

    tscFreeSqlObj(pSql);
    rpcFreeCont(rpcMsg->pCont);
    return;
  }

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) {
    tscDebug("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p",
        pSql, pCmd->command, pQueryInfo->type, pObj, pObj->signature);

    tscFreeSqlObj(pSql);
    rpcFreeCont(rpcMsg->pCont);
    return;
  }

  if (pEpSet) {
    if (!tscEpSetIsEqual(&pSql->epSet, pEpSet)) {
      if (pCmd->command < TSDB_SQL_MGMT) {
        tscUpdateVgroupInfo(pSql, pEpSet);
      } else {
        tscUpdateMgmtEpSet(pEpSet);
      }
    }
  }

  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);

  int32_t cmd = pCmd->command;
  if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_FETCH || cmd == TSDB_SQL_INSERT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) &&
      (rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID ||
       rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID ||
       rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL ||
       rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE)) {
    tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), ++pSql->retry);

    // set the flag to denote that sql string needs to be re-parsed and build submit block with table schema
    if (rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
      pSql->cmd.submitSchema = 1;
    }

    pSql->res.code = rpcMsg->code;  // keep the previous error code
    if (pSql->retry > pSql->maxRetry) {
      tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry);
    } else {
      rpcMsg->code = tscRenewTableMeta(pSql, pTableMetaInfo->name);

      // if there is an error occurring, proceed to the following error handling procedure.
      if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
        rpcFreeCont(rpcMsg->pCont);
        return;
      }
    }
  }

  pRes->rspLen = 0;

  if (pRes->code != TSDB_CODE_TSC_QUERY_CANCELLED) {
    // take the server's code as is, so that a successful response is seen as
    // success by the retry-counter reset below
    pRes->code = rpcMsg->code;
  } else {
    tscDebug("%p query is cancelled, code:%s", pSql, tstrerror(pRes->code));
  }

  if (pRes->code == TSDB_CODE_SUCCESS) {
    tscDebug("%p reset retry counter to be 0 due to success rsp, old:%d", pSql, pSql->retry);
    pSql->retry = 0;
  }

  if (pRes->code != TSDB_CODE_TSC_QUERY_CANCELLED) {
    assert(rpcMsg->msgType == pCmd->msgType + 1);
    pRes->code    = rpcMsg->code;
    pRes->rspType = rpcMsg->msgType;
    pRes->rspLen  = rpcMsg->contLen;

    if (pRes->rspLen > 0 && rpcMsg->pCont) {
      char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
      if (tmp == NULL) {
        pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      } else {
        pRes->pRsp = tmp;
        memcpy(pRes->pRsp, rpcMsg->pCont, pRes->rspLen);
      }
    } else {
      // release the buffer kept from a previous response instead of dropping the pointer
      free(pRes->pRsp);
      pRes->pRsp = NULL;
    }

    /*
     * There is not response callback function for submit response.
     * The actual inserted number of points is the first number.
     */
    if (rpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP && pRes->pRsp != NULL) {
      SShellSubmitRspMsg *pMsg = (SShellSubmitRspMsg*)pRes->pRsp;
      pMsg->code = htonl(pMsg->code);
      pMsg->numOfRows = htonl(pMsg->numOfRows);
      pMsg->affectedRows = htonl(pMsg->affectedRows);
      pMsg->failedRows = htonl(pMsg->failedRows);
      pMsg->numOfFailedBlocks = htonl(pMsg->numOfFailedBlocks);

      pRes->numOfRows += pMsg->affectedRows;
      tscDebug("%p SQL cmd:%s, code:%s inserted rows:%d rspLen:%d", pSql, sqlCmd[pCmd->command], 
          tstrerror(pRes->code), pMsg->affectedRows, pRes->rspLen);
    } else {
      tscDebug("%p SQL cmd:%s, code:%s rspLen:%d", pSql, sqlCmd[pCmd->command], tstrerror(pRes->code), pRes->rspLen);
    }
  }

  if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) {
    rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
  }

  if (rpcMsg->code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
    rpcMsg->code = (pRes->code == TSDB_CODE_SUCCESS)? pRes->numOfRows: pRes->code;

    // decide before the callback runs: the callback may already release pSql's resources
    bool shouldFree = tscShouldBeFreed(pSql);
    (*pSql->fp)(pSql->param, pSql, rpcMsg->code);

    if (shouldFree) {
      tscDebug("%p sqlObj is automatically freed", pSql);
      tscFreeSqlObj(pSql);
    }
  }

  rpcFreeCont(rpcMsg->pCont);
}

S
slguan 已提交
387 388 389
/*
 * Build the request message for commands that need one, then send it.
 *
 * Returns TSDB_CODE_SUCCESS when the request was handed to the RPC layer;
 * otherwise the error code, which is also stored in pSql->res.code and
 * reported through tscQueueAsyncRes.
 */
int doProcessSql(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  switch (pCmd->command) {
    case TSDB_SQL_SELECT:
    case TSDB_SQL_FETCH:
    case TSDB_SQL_RETRIEVE:
    case TSDB_SQL_INSERT:
    case TSDB_SQL_CONNECT:
    case TSDB_SQL_HB:
    case TSDB_SQL_META:
    case TSDB_SQL_STABLEVGROUP:
      pRes->code = tscBuildMsg[pCmd->command](pSql, NULL);
      break;
    default:
      break;
  }

  if (pRes->code != TSDB_CODE_SUCCESS) {
    tscQueueAsyncRes(pSql);
    return pRes->code;
  }

  int32_t sendCode = tscSendMsgToServer(pSql);

  // NOTE: if the send succeeded, pSql may have been released here already by other threads.
  if (sendCode == TSDB_CODE_SUCCESS) {
    return TSDB_CODE_SUCCESS;
  }

  pRes->code = sendCode;
  tscQueueAsyncRes(pSql);
  return pRes->code;
}

/*
 * Dispatch a SQL object according to its command.
 *
 * - commands below TSDB_SQL_MGMT need table meta info; without it
 *   TSDB_CODE_TSC_APP_ERROR is stored in pSql->res.code and returned
 * - commands below TSDB_SQL_LOCAL are sent to the server as well
 * - all remaining commands are handled locally through tscProcessMsgRsp
 *
 * Returns the result of doProcessSql or of the local handler.
 */
int tscProcessSql(SSqlObj *pSql) {
  char    *name = NULL;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo     *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  STableMetaInfo *pTableMetaInfo = NULL;
  uint32_t        type = 0;

  if (pQueryInfo != NULL) {
    pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
    name = (pTableMetaInfo != NULL)? pTableMetaInfo->name:NULL;
    type = pQueryInfo->type;

    // while numOfTables equals to 0, it must be Heartbeat
    assert((pQueryInfo->numOfTables == 0 && pQueryInfo->command == TSDB_SQL_HB) || pQueryInfo->numOfTables > 0);
  }

  // name stays NULL when there is no query info or no table meta info;
  // passing NULL for %s is undefined behavior, so log a placeholder instead
  tscDebug("%p SQL cmd:%s will be processed, name:%s, type:%d", pSql, sqlCmd[pCmd->command],
      (name != NULL) ? name : "(null)", type);

  if (pCmd->command < TSDB_SQL_MGMT) { // the pTableMetaInfo cannot be NULL
    if (pTableMetaInfo == NULL) {
      pSql->res.code = TSDB_CODE_TSC_APP_ERROR;
      return pSql->res.code;
    }
  } else if (pCmd->command < TSDB_SQL_LOCAL) {
    // management command: the endpoint set is filled in when the message is sent
  } else {  // local handler
    return (*tscProcessMsgRsp[pCmd->command])(pSql);
  }

  return doProcessSql(pSql);
}
H
hzcheng 已提交
450

H
hjxilinx 已提交
451
/*
 * Cancel all sub-queries of a two-stage super table query.
 *
 * Does nothing unless the current clause is a two-stage super table query.
 * Every existing sub-query gets its RPC request cancelled, its result code set
 * to TSDB_CODE_TSC_QUERY_CANCELLED and is queued for asynchronous completion.
 * Afterwards the function polls every 100 ms, for at most about 10 seconds,
 * until the master command becomes TSDB_SQL_RETRIEVE_LOCALMERGE or
 * TSDB_SQL_RETRIEVE_EMPTY_RESULT.
 */
void tscKillSTableQuery(SSqlObj *pSql) {
  SSqlCmd    *pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  if (!tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
    return;
  }

  for (int idx = 0; idx < pSql->numOfSubs; ++idx) {
    SSqlObj *pSub = pSql->pSubs[idx];
    if (pSub != NULL) {
      /*
       * here, we cannot set the command = TSDB_SQL_KILL_QUERY. Otherwise, it may cause
       * sub-queries not correctly released and master sql object of super table query reaches an abnormal state.
       */
      rpcCancelRequest(pSub->pRpcCtx);
      pSub->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
      tscQueueAsyncRes(pSub);
    }
  }

  /*
   * 1. if the subqueries are not launched or only partially launched, wait for the launched
   *    queries to return so that allocated resources are freed.
   * 2. if no subquery is launched yet, the super table query is still in the parse stage;
   *    the wait below simply times out.
   */
  const int64_t maxWaitMs = 10000;  // 10 Sec.
  const int64_t startMs = taosGetTimestampMs();

  for (;;) {
    if (pCmd->command == TSDB_SQL_RETRIEVE_LOCALMERGE || pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
      break;
    }

    taosMsleep(100);

    if (taosGetTimestampMs() - startMs > maxWaitMs) {
      break;
    }
  }

  tscDebug("%p super table query cancelled", pSql);
}

J
jtao1735 已提交
493
/*
 * Fill the command payload with a fetch (retrieve) request for the query
 * handle stored in pSql->res.qhandle.
 *
 * pSql  - SQL object whose payload, payloadLen and msgType are written
 * pInfo - unused
 *
 * The target vgroup id is taken from the vgroup list for super tables and from
 * the table meta otherwise. Always returns TSDB_CODE_SUCCESS.
 */
int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *) pSql->cmd.payload;
  pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
  pRetrieveMsg->free = htons(pQueryInfo->type);

  // todo valid the vgroupId at the client side
  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
    int32_t vgIndex = pTableMetaInfo->vgroupIndex;

    SVgroupsInfo* pVgroupInfo = pTableMetaInfo->vgroupList;

    // check the index range first, only then look at the indexed element
    assert(pVgroupInfo != NULL && vgIndex >= 0 && vgIndex < pVgroupInfo->numOfVgroups &&
           pVgroupInfo->vgroups[vgIndex].vgId > 0);

    pRetrieveMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId);
  } else {
    STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
    pRetrieveMsg->header.vgId = htonl(pTableMeta->vgroupInfo.vgId);
  }

  pSql->cmd.payloadLen = sizeof(SRetrieveTableMsg);
  pSql->cmd.msgType = TSDB_MSG_TYPE_FETCH;

  pRetrieveMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg));

  return TSDB_CODE_SUCCESS;
}

523
/*
 * Finish the submit (insert) message that already sits in the command payload.
 *
 * The payload starts with an SMsgDesc followed by the SSubmitMsg. This function
 * fills the descriptor (always one vnode), the submit header (vgroup id,
 * content length without the descriptor, number of blocks), sets the message
 * type and loads the endpoint set from the table's vgroup info.
 *
 * pInfo is unused. Always returns TSDB_CODE_SUCCESS.
 */
int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd    *pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMeta *pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;

  // NOTE: shell message size should not include SMsgDesc
  int32_t size = pCmd->payloadLen - sizeof(SMsgDesc);
  int32_t vgId = pTableMeta->vgroupInfo.vgId;

  char *pCursor = pCmd->payload;

  SMsgDesc *pMsgDesc = (SMsgDesc *)pCursor;
  pMsgDesc->numOfVnodes = htonl(1);  // always one vnode
  pCursor += sizeof(SMsgDesc);

  SSubmitMsg *pShellMsg = (SSubmitMsg *)pCursor;
  pShellMsg->header.vgId = htonl(vgId);
  pShellMsg->header.contLen = htonl(size);  // the length not includes the size of SMsgDesc
  pShellMsg->length = pShellMsg->header.contLen;
  pShellMsg->numOfBlocks = htonl(pCmd->numOfTablesInSubmit);  // number of tables to be inserted

  // pCmd->payloadLen is set during copying data into payload
  pCmd->msgType = TSDB_MSG_TYPE_SUBMIT;
  tscDumpEpSetFromVgroupInfo(&pTableMeta->corVgroupInfo, &pSql->epSet);

  tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfEP:%d", pSql, vgId, pCmd->numOfTablesInSubmit,
      pSql->epSet.numOfEps);
  return TSDB_CODE_SUCCESS;
}

/*
555
 * for table query, simply return the size <= 1k
H
hzcheng 已提交
556
 */
557
static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
H
hzcheng 已提交
558
  const static int32_t MIN_QUERY_MSG_PKT_SIZE = TSDB_MAX_BYTES_PER_ROW * 5;
559
  SQueryInfo *         pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
H
hzcheng 已提交
560

561
  int32_t srcColListSize = taosArrayGetSize(pQueryInfo->colList) * sizeof(SColumnInfo);
H
hjxilinx 已提交
562 563 564 565
  
  size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
  int32_t exprSize = sizeof(SSqlFuncMsg) * numOfExprs;
  
566
  return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + exprSize + 4096;
H
hzcheng 已提交
567 568
}

569
/*
 * Serialize the table id list of a query into the message buffer and select
 * the dnode endpoints for the request.
 *
 * pQueryMsg - query message header; head.vgId and numOfTables are set here and
 *             window.skey is used as the default subscription key
 * pSql      - SQL object; its endpoint set is set from the chosen vgroup
 * pMsg      - write position for the STableIdInfo entries
 *
 * Returns the position right after the last entry written.
 *
 * A single entry is written for a normal table, or when no per-vgroup table
 * list exists; otherwise one entry is written per table of the vgroup selected
 * by pTableMetaInfo->vgroupIndex.
 */
static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char *pMsg) {
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
  STableMeta     *pTableMeta = pTableMetaInfo->pTableMeta;
  TSKEY           dfltKey = htobe64(pQueryMsg->window.skey);

  if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) || pTableMetaInfo->pVgroupTables == NULL) {
    SCMVgroupInfo* pTargetVgroup = NULL;

    if (!UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
      pTargetVgroup = &pTableMeta->vgroupInfo;
    } else {
      int32_t vgIdx = pTableMetaInfo->vgroupIndex;
      assert(vgIdx >= 0);

      if (pTableMetaInfo->vgroupList->numOfVgroups > 0) {
        assert(vgIdx < pTableMetaInfo->vgroupList->numOfVgroups);
        pTargetVgroup = &pTableMetaInfo->vgroupList->vgroups[vgIdx];
      }

      tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, vgIdx, pTableMetaInfo->vgroupList->numOfVgroups);
    }

    assert(pTargetVgroup != NULL);

    tscSetDnodeEpSet(pSql, pTargetVgroup);
    pQueryMsg->head.vgId = htonl(pTargetVgroup->vgId);

    // exactly one table id entry
    STableIdInfo *pEntry = (STableIdInfo *)pMsg;
    pEntry->tid = htonl(pTableMeta->id.tid);
    pEntry->uid = htobe64(pTableMeta->id.uid);
    pEntry->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->id.uid, dfltKey));

    pQueryMsg->numOfTables = htonl(1);
    pMsg += sizeof(STableIdInfo);
  } else {
    // subquery of a super table query: the EP info comes from the per-vgroup table list
    int32_t vgIdx = pTableMetaInfo->vgroupIndex;
    int32_t vgCount = taosArrayGetSize(pTableMetaInfo->pVgroupTables);
    assert(vgIdx >= 0 && vgIdx < vgCount);

    tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, vgIdx, vgCount);

    SVgroupTableInfo* pVgTables = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIdx);

    tscSetDnodeEpSet(pSql, &pVgTables->vgInfo);
    pQueryMsg->head.vgId = htonl(pVgTables->vgInfo.vgId);

    int32_t tableCount = taosArrayGetSize(pVgTables->itemList);
    pQueryMsg->numOfTables = htonl(tableCount);

    // one entry per table of the selected vgroup
    int32_t t = 0;
    while (t < tableCount) {
      STableIdInfo* pSrc = taosArrayGet(pVgTables->itemList, t);

      STableIdInfo *pEntry = (STableIdInfo *)pMsg;
      pEntry->tid = htonl(pSrc->tid);
      pEntry->uid = htobe64(pSrc->uid);
      pEntry->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pSrc->uid, dfltKey));

      pMsg += sizeof(STableIdInfo);
      ++t;
    }
  }

  tscDebug("%p vgId:%d, query on table:%s, tid:%d, uid:%" PRIu64, pSql, htonl(pQueryMsg->head.vgId), pTableMetaInfo->name,
      pTableMeta->id.tid, pTableMeta->id.uid);

  return pMsg;
}

636
/*
 * Serialize the query described by the current clause of pSql into pCmd->payload
 * as a SQueryTableMsg followed by its variable-length sections, in this order:
 *   1. column filter data (placed after the fixed colList[] array)
 *   2. one SSqlFuncMsg per output expression, each followed by its binary params
 *   3. table id info (written by doSerializeTableInfo)
 *   4. group-by column descriptors (colId, colIndex, flag, name)
 *   5. fill values, one int64 per output column, when a fill type is set
 *   6. tag column descriptors
 *   7. tag query condition bytes
 *   8. tbname condition string (always at least a terminating NUL)
 *   9. the compressed timestamp block read from pQueryInfo->tsBuf, if present
 *
 * pInfo is not used by this function.
 *
 * Returns TSDB_CODE_SUCCESS on success. On failure returns -1,
 * TSDB_CODE_TSC_INVALID_SQL, or a TAOS_SYSTEM_ERROR() code, exactly as before.
 */
int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  int32_t size = tscEstimateQueryMsgSize(pCmd, pCmd->clauseIndex);

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for query msg", pSql);
    return -1;  // todo add test for this
  }

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  STableMeta *    pTableMeta = pTableMetaInfo->pTableMeta;

  // sanity checks on the parsed query before anything is written
  if (taosArrayGetSize(pQueryInfo->colList) <= 0 && !tscQueryTags(pQueryInfo)) {
    tscError("%p illegal value of numOfCols in query msg: %d", pSql, tscGetNumOfColumns(pTableMeta));
    return -1;
  }

  if (pQueryInfo->intervalTime < 0) {
    // the interval is a 64-bit quantity (it is passed to htobe64 below); "%ld" is wrong where long is 32 bits
    tscError("%p illegal value of aggregation time interval in query msg: %" PRId64, pSql,
             (int64_t)pQueryInfo->intervalTime);
    return -1;
  }

  if (pQueryInfo->groupbyExpr.numOfGroupCols < 0) {
    tscError("%p illegal value of numOfGroupCols in query msg: %d", pSql, pQueryInfo->groupbyExpr.numOfGroupCols);
    return -1;
  }

  SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload;

  int32_t numOfTags = taosArrayGetSize(pTableMetaInfo->tagColList);

  // for descending order the window boundaries are swapped on the wire
  if (pQueryInfo->order.order == TSDB_ORDER_ASC) {
    pQueryMsg->window.skey = htobe64(pQueryInfo->window.skey);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->window.ekey);
  } else {
    pQueryMsg->window.skey = htobe64(pQueryInfo->window.ekey);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->window.skey);
  }

  pQueryMsg->order          = htons(pQueryInfo->order.order);
  pQueryMsg->orderColId     = htons(pQueryInfo->order.orderColId);
  pQueryMsg->fillType       = htons(pQueryInfo->fillType);
  pQueryMsg->limit          = htobe64(pQueryInfo->limit.limit);
  pQueryMsg->offset         = htobe64(pQueryInfo->limit.offset);
  pQueryMsg->numOfCols      = htons(taosArrayGetSize(pQueryInfo->colList));
  pQueryMsg->intervalTime   = htobe64(pQueryInfo->intervalTime);
  pQueryMsg->slidingTime    = htobe64(pQueryInfo->slidingTime);
  pQueryMsg->slidingTimeUnit = pQueryInfo->slidingTimeUnit;
  pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols);
  pQueryMsg->numOfTags      = htonl(numOfTags);
  pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType);
  pQueryMsg->queryType      = htonl(pQueryInfo->type);

  size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo);
  pQueryMsg->numOfOutput = htons(numOfOutput);

  // set column list ids; variable-length data starts right after the fixed colList[] array
  size_t   numOfCols = taosArrayGetSize(pQueryInfo->colList);
  char *   pMsg = (char *)(pQueryMsg->colList) + numOfCols * sizeof(SColumnInfo);
  SSchema *pSchema = tscGetTableSchema(pTableMeta);

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumn *pCol = taosArrayGetP(pQueryInfo->colList, i);
    SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];

    if (pCol->colIndex.columnIndex >= tscGetNumOfColumns(pTableMeta) || pColSchema->type < TSDB_DATA_TYPE_BOOL ||
        pColSchema->type > TSDB_DATA_TYPE_NCHAR) {
      tscError("%p tid:%d uid:%" PRIu64" id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s",
               pSql, pTableMeta->id.tid, pTableMeta->id.uid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta),
               pCol->colIndex.columnIndex, pColSchema->name);

      return TSDB_CODE_TSC_INVALID_SQL;
    }

    pQueryMsg->colList[i].colId = htons(pColSchema->colId);
    pQueryMsg->colList[i].bytes = htons(pColSchema->bytes);
    pQueryMsg->colList[i].type  = htons(pColSchema->type);
    pQueryMsg->colList[i].numOfFilters = htons(pCol->numOfFilters);

    // append the filter information after the basic column information
    for (int32_t f = 0; f < pCol->numOfFilters; ++f) {
      SColumnFilterInfo *pColFilter = &pCol->filterInfo[f];

      SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)pMsg;
      pFilterMsg->filterstr = htons(pColFilter->filterstr);

      pMsg += sizeof(SColumnFilterInfo);

      if (pColFilter->filterstr) {
        pFilterMsg->len = htobe64(pColFilter->len);
        memcpy(pMsg, (void *)pColFilter->pz, pColFilter->len + 1);
        pMsg += (pColFilter->len + 1);  // append the additional filter binary info
      } else {
        pFilterMsg->lowerBndi = htobe64(pColFilter->lowerBndi);
        pFilterMsg->upperBndi = htobe64(pColFilter->upperBndi);
      }

      pFilterMsg->lowerRelOptr = htons(pColFilter->lowerRelOptr);
      pFilterMsg->upperRelOptr = htons(pColFilter->upperRelOptr);

      if (pColFilter->lowerRelOptr == TSDB_RELATION_INVALID && pColFilter->upperRelOptr == TSDB_RELATION_INVALID) {
        tscError("invalid filter info");
        return -1;
      }
    }
  }

  // output expressions: fixed SSqlFuncMsg header followed by any binary parameters
  SSqlFuncMsg *pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
  for (int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) {
    SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);

    if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId)) {
      /* column id is not valid according to the cached table meta, the table meta is expired */
      tscError("%p table schema is not matched with parsed sql", pSql);
      return -1;
    }

    pSqlFuncExpr->colInfo.colId    = htons(pExpr->colInfo.colId);
    pSqlFuncExpr->colInfo.colIndex = htons(pExpr->colInfo.colIndex);
    pSqlFuncExpr->colInfo.flag     = htons(pExpr->colInfo.flag);

    pSqlFuncExpr->functionId  = htons(pExpr->functionId);
    pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams);
    pMsg += sizeof(SSqlFuncMsg);

    for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
      // todo add log
      pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType);
      pSqlFuncExpr->arg[j].argBytes = htons(pExpr->param[j].nLen);

      if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) {
        memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen);
        pMsg += pExpr->param[j].nLen;
      } else {
        pSqlFuncExpr->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64Key);
      }
    }

    pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
  }

  // serialize the table info (sid, uid, tags)
  pMsg = doSerializeTableInfo(pQueryMsg, pSql, pMsg);

  SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr;
  if (pGroupbyExpr->numOfGroupCols > 0) {
    pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex);
    pQueryMsg->orderType = htons(pGroupbyExpr->orderType);

    for (int32_t j = 0; j < pGroupbyExpr->numOfGroupCols; ++j) {
      SColIndex *pCol = taosArrayGet(pGroupbyExpr->columnInfo, j);

      // Plain assignment for all three fields: the previous "+=" on colIndex/flag only
      // produced the right value when the target bytes happened to be zero.
      *((int16_t *)pMsg) = pCol->colId;
      pMsg += sizeof(pCol->colId);

      *((int16_t *)pMsg) = pCol->colIndex;
      pMsg += sizeof(pCol->colIndex);

      *((int16_t *)pMsg) = pCol->flag;
      pMsg += sizeof(pCol->flag);

      memcpy(pMsg, pCol->name, tListLen(pCol->name));
      pMsg += tListLen(pCol->name);
    }
  }

  if (pQueryInfo->fillType != TSDB_FILL_NONE) {
    for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
      *((int64_t *)pMsg) = htobe64(pQueryInfo->fillVal[i]);
      pMsg += sizeof(pQueryInfo->fillVal[0]);
    }
  }

  if (numOfTags != 0) {
    int32_t numOfColumns = tscGetNumOfColumns(pTableMeta);
    int32_t numOfTagColumns = tscGetNumOfTags(pTableMeta);
    int32_t total = numOfTagColumns + numOfColumns;

    pSchema = tscGetTableTagSchema(pTableMeta);

    for (int32_t i = 0; i < numOfTags; ++i) {
      SColumn *pCol = taosArrayGetP(pTableMetaInfo->tagColList, i);
      SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];

      if ((pCol->colIndex.columnIndex >= numOfTagColumns || pCol->colIndex.columnIndex < -1) ||
          (pColSchema->type < TSDB_DATA_TYPE_BOOL || pColSchema->type > TSDB_DATA_TYPE_NCHAR)) {
        tscError("%p tid:%d uid:%" PRIu64 " id:%s, tag index out of range, totalCols:%d, numOfTags:%d, index:%d, column name:%s",
                 pSql, pTableMeta->id.tid, pTableMeta->id.uid, pTableMetaInfo->name, total, numOfTagColumns,
                 pCol->colIndex.columnIndex, pColSchema->name);

        return TSDB_CODE_TSC_INVALID_SQL;
      }

      SColumnInfo *pTagCol = (SColumnInfo *)pMsg;

      pTagCol->colId = htons(pColSchema->colId);
      pTagCol->bytes = htons(pColSchema->bytes);
      pTagCol->type  = htons(pColSchema->type);
      pTagCol->numOfFilters = 0;

      pMsg += sizeof(SColumnInfo);
    }
  }

  // serialize tag column query condition
  if (pQueryInfo->tagCond.pCond != NULL && taosArrayGetSize(pQueryInfo->tagCond.pCond) > 0) {
    STagCond *pTagCond = &pQueryInfo->tagCond;

    SCond *pCond = tsGetSTableQueryCond(pTagCond, pTableMeta->id.uid);
    if (pCond != NULL && pCond->cond != NULL) {
      pQueryMsg->tagCondLen = htons(pCond->len);
      memcpy(pMsg, pCond->cond, pCond->len);

      pMsg += pCond->len;
    }
  }

  // tbname condition: an empty string is sent when there is none
  if (pQueryInfo->tagCond.tbnameCond.cond == NULL) {
    *pMsg = 0;
    pMsg++;
  } else {
    strcpy(pMsg, pQueryInfo->tagCond.tbnameCond.cond);
    pMsg += strlen(pQueryInfo->tagCond.tbnameCond.cond) + 1;
  }

  // compressed ts block
  pQueryMsg->tsOffset = htonl(pMsg - pCmd->payload);
  int32_t tsLen = 0;
  int32_t numOfBlocks = 0;

  if (pQueryInfo->tsBuf != NULL) {
    STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pQueryInfo->tsBuf, pTableMetaInfo->vgroupIndex);
    assert(QUERY_IS_JOIN_QUERY(pQueryInfo->type) && pBlockInfo != NULL);  // this query should not be sent

    // todo refactor
    // NOTE(review): ferror() yields a stream error flag rather than an errno value; confirm
    // that this is what TAOS_SYSTEM_ERROR() is meant to receive.
    if (fseek(pQueryInfo->tsBuf->f, pBlockInfo->offset, SEEK_SET) != 0) {
      int code = TAOS_SYSTEM_ERROR(ferror(pQueryInfo->tsBuf->f));
      tscError("%p: fseek failed: %s", pSql, tstrerror(code));
      return code;
    }

    size_t s = fread(pMsg, 1, pBlockInfo->compLen, pQueryInfo->tsBuf->f);
    if (s != pBlockInfo->compLen) {
      int code = TAOS_SYSTEM_ERROR(ferror(pQueryInfo->tsBuf->f));
      tscError("%p: fread didn't return expected data: %s", pSql, tstrerror(code));
      return code;
    }

    pMsg += pBlockInfo->compLen;
    tsLen = pBlockInfo->compLen;
    numOfBlocks = pBlockInfo->numOfBlocks;
  }

  pQueryMsg->tsLen = htonl(tsLen);
  pQueryMsg->tsNumOfBlocks = htonl(numOfBlocks);
  if (pQueryInfo->tsBuf != NULL) {
    pQueryMsg->tsOrder = htonl(pQueryInfo->tsBuf->tsOrder);
  }

  int32_t msgLen = pMsg - pCmd->payload;

  tscDebug("%p msg built success,len:%d bytes", pSql, msgLen);
  pCmd->payloadLen = msgLen;
  pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY;

  pQueryMsg->head.contLen = htonl(msgLen);
  assert(msgLen + minMsgSize() <= size);

  return TSDB_CODE_SUCCESS;
}

909 910
/*
 * Finalize a CREATE DATABASE request: set the message type and length and copy the
 * database name (taken from the first table-meta entry of the current clause) into
 * the message that already lives in the command payload.
 *
 * No payload allocation happens here; the existing buffer is used as-is.
 * NOTE(review): presumably the payload was filled in by an earlier stage - confirm.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;

  cmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DB;
  cmd->payloadLen = sizeof(SCMCreateDbMsg);

  assert(cmd->numOfClause == 1);

  STableMetaInfo *metaInfo = tscGetTableMetaInfoFromCmd(cmd, cmd->clauseIndex, 0);
  SCMCreateDbMsg *createMsg = (SCMCreateDbMsg *)cmd->payload;
  tstrncpy(createMsg->db, metaInfo->name, sizeof(createMsg->db));

  return TSDB_CODE_SUCCESS;
}

923 924
/*
 * Build a CREATE DNODE request: allocate the payload and copy the end-point text
 * from the first DCL token into SCMCreateDnodeMsg.ep.
 *
 * The token is a (pointer, length) pair, so the copy length is clamped to the size of
 * the destination field; previously an over-long token would write past the field.
 * A terminating NUL is written whenever there is room for one.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if the payload cannot
 * be allocated.
 */
int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCMCreateDnodeMsg);
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMCreateDnodeMsg *pCreate = (SCMCreateDnodeMsg *)pCmd->payload;

  size_t epLen = pInfo->pDCLInfo->a[0].n;
  if (epLen > sizeof(pCreate->ep)) {
    epLen = sizeof(pCreate->ep);  // never write beyond the destination field
  }

  strncpy(pCreate->ep, pInfo->pDCLInfo->a[0].z, epLen);
  if (epLen < sizeof(pCreate->ep)) {
    pCreate->ep[epLen] = '\0';
  }

  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DNODE;

  return TSDB_CODE_SUCCESS;
}

939 940
/*
 * Build a CREATE ACCOUNT request (TSDB_MSG_TYPE_CM_CREATE_ACCT) from the parsed DCL info:
 * account name, password, the numeric limits in cfg (converted to network byte order)
 * and the access state derived from the "state" option text.
 *
 * Name and password come from SQL tokens given as (pointer, length); both copies are
 * clamped to the size of their destination field so an over-long token cannot write
 * past it. A terminating NUL is written whenever there is room for one.
 *
 * accessState is -1 when no state option was given; it is left untouched when the
 * option text is not one of "r", "w", "all" or "no".
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if the payload cannot
 * be allocated.
 */
int32_t tscBuildAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCMCreateAcctMsg);
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMCreateAcctMsg *pAlterMsg = (SCMCreateAcctMsg *)pCmd->payload;

  SSQLToken *pName = &pInfo->pDCLInfo->user.user;
  SSQLToken *pPwd = &pInfo->pDCLInfo->user.passwd;

  size_t nameLen = pName->n;
  if (nameLen > sizeof(pAlterMsg->user)) {
    nameLen = sizeof(pAlterMsg->user);
  }
  strncpy(pAlterMsg->user, pName->z, nameLen);
  if (nameLen < sizeof(pAlterMsg->user)) {
    pAlterMsg->user[nameLen] = '\0';
  }

  size_t pwdLen = pPwd->n;
  if (pwdLen > sizeof(pAlterMsg->pass)) {
    pwdLen = sizeof(pAlterMsg->pass);
  }
  strncpy(pAlterMsg->pass, pPwd->z, pwdLen);
  if (pwdLen < sizeof(pAlterMsg->pass)) {
    pAlterMsg->pass[pwdLen] = '\0';
  }

  SCreateAcctSQL *pAcctOpt = &pInfo->pDCLInfo->acctOpt;

  pAlterMsg->cfg.maxUsers = htonl(pAcctOpt->maxUsers);
  pAlterMsg->cfg.maxDbs = htonl(pAcctOpt->maxDbs);
  pAlterMsg->cfg.maxTimeSeries = htonl(pAcctOpt->maxTimeSeries);
  pAlterMsg->cfg.maxStreams = htonl(pAcctOpt->maxStreams);
  pAlterMsg->cfg.maxPointsPerSecond = htonl(pAcctOpt->maxPointsPerSecond);
  pAlterMsg->cfg.maxStorage = htobe64(pAcctOpt->maxStorage);
  pAlterMsg->cfg.maxQueryTime = htobe64(pAcctOpt->maxQueryTime);
  pAlterMsg->cfg.maxConnections = htonl(pAcctOpt->maxConnections);

  if (pAcctOpt->stat.n == 0) {
    pAlterMsg->cfg.accessState = -1;
  } else {
    if (pAcctOpt->stat.z[0] == 'r' && pAcctOpt->stat.n == 1) {
      pAlterMsg->cfg.accessState = TSDB_VN_READ_ACCCESS;
    } else if (pAcctOpt->stat.z[0] == 'w' && pAcctOpt->stat.n == 1) {
      pAlterMsg->cfg.accessState = TSDB_VN_WRITE_ACCCESS;
    } else if (strncmp(pAcctOpt->stat.z, "all", 3) == 0 && pAcctOpt->stat.n == 3) {
      pAlterMsg->cfg.accessState = TSDB_VN_ALL_ACCCESS;
    } else if (strncmp(pAcctOpt->stat.z, "no", 2) == 0 && pAcctOpt->stat.n == 2) {
      pAlterMsg->cfg.accessState = 0;
    }
  }

  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_ACCT;
  return TSDB_CODE_SUCCESS;
}

984 985
/*
 * Build a CREATE USER or ALTER USER request.
 *
 * The user name is always copied. For TSDB_ALTER_USER_PRIVILEGES the privilege byte is
 * taken from pCmd->count; for every other type (password change or user creation) the
 * password token is copied. The message type is ALTER_USER for the two alter kinds and
 * CREATE_USER otherwise.
 *
 * Name and password come from SQL tokens given as (pointer, length); both copies are
 * clamped to the size of their destination field so an over-long token cannot write
 * past it. A terminating NUL is written whenever there is room for one.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if the payload cannot
 * be allocated.
 */
int32_t tscBuildUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCMCreateUserMsg);

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMCreateUserMsg *pAlterMsg = (SCMCreateUserMsg *)pCmd->payload;

  SUserInfo *pUser = &pInfo->pDCLInfo->user;

  size_t nameLen = pUser->user.n;
  if (nameLen > sizeof(pAlterMsg->user)) {
    nameLen = sizeof(pAlterMsg->user);
  }
  strncpy(pAlterMsg->user, pUser->user.z, nameLen);
  if (nameLen < sizeof(pAlterMsg->user)) {
    pAlterMsg->user[nameLen] = '\0';
  }

  pAlterMsg->flag = pUser->type;

  if (pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
    pAlterMsg->privilege = (char)pCmd->count;
  } else {
    // both "alter password" and "create user" carry the password
    size_t pwdLen = pUser->passwd.n;
    if (pwdLen > sizeof(pAlterMsg->pass)) {
      pwdLen = sizeof(pAlterMsg->pass);
    }
    strncpy(pAlterMsg->pass, pUser->passwd.z, pwdLen);
    if (pwdLen < sizeof(pAlterMsg->pass)) {
      pAlterMsg->pass[pwdLen] = '\0';
    }
  }

  if (pUser->type == TSDB_ALTER_USER_PASSWD || pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
    pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_USER;
  } else {
    pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_USER;
  }

  return TSDB_CODE_SUCCESS;
}

1016 1017
/*
 * Finalize a dnode configuration request: only the message type and the payload
 * length are set here; the payload contents are not touched.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t tscBuildCfgDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;

  cmd->msgType = TSDB_MSG_TYPE_CM_CONFIG_DNODE;
  cmd->payloadLen = sizeof(SCMCfgDnodeMsg);

  return TSDB_CODE_SUCCESS;
}
S
slguan 已提交
1022

1023 1024
/*
 * Build a DROP DATABASE request: allocate the payload, copy the database name from
 * the first table-meta entry of the current clause and record whether a missing
 * database should be ignored (the parsed existsCheck flag).
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if the payload cannot
 * be allocated.
 */
int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;

  cmd->payloadLen = sizeof(SCMDropDbMsg);
  if (tscAllocPayload(cmd, cmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMDropDbMsg *  dropMsg = (SCMDropDbMsg *)cmd->payload;
  STableMetaInfo *metaInfo = tscGetTableMetaInfoFromCmd(cmd, cmd->clauseIndex, 0);

  tstrncpy(dropMsg->db, metaInfo->name, sizeof(dropMsg->db));

  if (pInfo->pDCLInfo->existsCheck) {
    dropMsg->ignoreNotExists = 1;
  } else {
    dropMsg->ignoreNotExists = 0;
  }

  cmd->msgType = TSDB_MSG_TYPE_CM_DROP_DB;
  return TSDB_CODE_SUCCESS;
}

1042 1043
/*
 * Build a DROP TABLE request: allocate the payload, copy the table id from the first
 * table-meta entry of the current clause and record whether a missing table should be
 * ignored (the parsed existsCheck flag).
 *
 * The table id is copied with tstrncpy bounded by the destination field, matching the
 * other drop-message builders in this file; the previous strcpy was unbounded.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if the payload cannot
 * be allocated.
 */
int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCMDropTableMsg);

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMDropTableMsg *pDropTableMsg = (SCMDropTableMsg *)pCmd->payload;
  STableMetaInfo * pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  tstrncpy(pDropTableMsg->tableId, pTableMetaInfo->name, sizeof(pDropTableMsg->tableId));
  pDropTableMsg->igNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;

  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_TABLE;
  return TSDB_CODE_SUCCESS;
}

1060
/*
 * Build a DROP DNODE request: allocate the payload and copy the end-point string,
 * which is read from the name of the first table-meta entry of the current clause.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if the payload cannot
 * be allocated.
 */
int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;

  cmd->payloadLen = sizeof(SCMDropDnodeMsg);
  if (tscAllocPayload(cmd, cmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  STableMetaInfo * metaInfo = tscGetTableMetaInfoFromCmd(cmd, cmd->clauseIndex, 0);
  SCMDropDnodeMsg *dropMsg = (SCMDropDnodeMsg *)cmd->payload;

  tstrncpy(dropMsg->ep, metaInfo->name, sizeof(dropMsg->ep));
  cmd->msgType = TSDB_MSG_TYPE_CM_DROP_DNODE;

  return TSDB_CODE_SUCCESS;
}

S
[TD-16]  
slguan 已提交
1076
/*
 * Build a DROP USER request. The message type and length are set before the payload
 * is allocated; the user name is then copied from the name of the first table-meta
 * entry of the current clause.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if the payload cannot
 * be allocated.
 */
int32_t tscBuildDropUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;

  cmd->msgType = TSDB_MSG_TYPE_CM_DROP_USER;
  cmd->payloadLen = sizeof(SCMDropUserMsg);

  if (tscAllocPayload(cmd, cmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  STableMetaInfo *metaInfo = tscGetTableMetaInfoFromCmd(cmd, cmd->clauseIndex, 0);
  SCMDropUserMsg *dropMsg = (SCMDropUserMsg *)cmd->payload;
  tstrncpy(dropMsg->user, metaInfo->name, sizeof(dropMsg->user));

  return TSDB_CODE_SUCCESS;
}

S
[TD-16]  
slguan 已提交
1093 1094 1095 1096 1097 1098 1099
/*
 * Build a DROP ACCOUNT request. It uses the SCMDropUserMsg layout with message type
 * TSDB_MSG_TYPE_CM_DROP_ACCT; the account name is copied from the name of the first
 * table-meta entry of the current clause.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if the payload cannot
 * be allocated.
 */
int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;

  cmd->msgType = TSDB_MSG_TYPE_CM_DROP_ACCT;
  cmd->payloadLen = sizeof(SCMDropUserMsg);

  if (tscAllocPayload(cmd, cmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  STableMetaInfo *metaInfo = tscGetTableMetaInfoFromCmd(cmd, cmd->clauseIndex, 0);
  SCMDropUserMsg *dropMsg = (SCMDropUserMsg *)cmd->payload;
  tstrncpy(dropMsg->user, metaInfo->name, sizeof(dropMsg->user));

  return TSDB_CODE_SUCCESS;
}

1110 1111
/*
 * Build a USE DATABASE request: allocate the payload and copy the database name from
 * the first table-meta entry of the current clause.
 *
 * The name is copied with tstrncpy bounded by the destination field, matching the
 * other message builders in this file; the previous strcpy was unbounded.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if the payload cannot
 * be allocated.
 */
int32_t tscBuildUseDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCMUseDbMsg);

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMUseDbMsg *   pUseDbMsg = (SCMUseDbMsg *)pCmd->payload;
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  tstrncpy(pUseDbMsg->db, pTableMetaInfo->name, sizeof(pUseDbMsg->db));
  pCmd->msgType = TSDB_MSG_TYPE_CM_USE_DB;

  return TSDB_CODE_SUCCESS;
}

1127
/*
 * Build a SHOW request (TSDB_MSG_TYPE_CM_SHOW).
 *
 * The db field is the name of the first table-meta entry of the current clause, or the
 * connection's current db when that name is empty. The optional trailing payload is
 *   - the wildcard pattern, for every show type except TSDB_MGMT_TABLE_VNODES (only
 *     when a pattern token is present), or
 *   - the end-point prefix token, for TSDB_MGMT_TABLE_VNODES (asserted non-empty).
 *
 * Two fixes relative to the previous version:
 *   - pCmd->payloadLen is computed from the host-order token length. It used to be
 *     computed from pShowMsg->payloadLen, which already holds the htons() value, so on
 *     little-endian hosts the reported length was far larger than the data written.
 *   - The buffer is sized from the token length (never less than the historical
 *     100-byte slack), so a long pattern can no longer overrun the allocation.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if the payload cannot
 * be allocated.
 */
int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  STscObj *pObj = pSql->pTscObj;
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->msgType = TSDB_MSG_TYPE_CM_SHOW;

  // work out which token (if any) is appended, so the buffer can be sized for it
  SShowInfo *pShowInfo = &pInfo->pDCLInfo->showOpt;
  SSQLToken *pExtra = NULL;

  if (pShowInfo->showType != TSDB_MGMT_TABLE_VNODES) {
    if (pShowInfo->pattern.type > 0) {  // only show tables support wildcard query
      pExtra = &pShowInfo->pattern;
    }
  } else {
    pExtra = &pShowInfo->prefix;
    assert(pExtra->n > 0 && pExtra->type > 0);
  }

  size_t extraLen = (pExtra != NULL) ? (size_t)pExtra->n : 0;

  size_t allocLen = sizeof(SCMShowMsg) + 100;
  if (allocLen < sizeof(SCMShowMsg) + extraLen + 1) {
    allocLen = sizeof(SCMShowMsg) + extraLen + 1;
  }

  pCmd->payloadLen = allocLen;
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMShowMsg *pShowMsg = (SCMShowMsg *)pCmd->payload;

  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  size_t nameLen = strlen(pTableMetaInfo->name);
  if (nameLen > 0) {
    tstrncpy(pShowMsg->db, pTableMetaInfo->name, sizeof(pShowMsg->db));  // prefix is set here
  } else {
    tstrncpy(pShowMsg->db, pObj->db, sizeof(pShowMsg->db));
  }

  pShowMsg->type = pShowInfo->showType;

  if (pExtra != NULL) {
    strncpy(pShowMsg->payload, pExtra->z, extraLen);
    pShowMsg->payloadLen = htons(extraLen);  // wire field is in network byte order
  }

  // local message length stays in host byte order
  pCmd->payloadLen = sizeof(SCMShowMsg) + extraLen;
  return TSDB_CODE_SUCCESS;
}

1169
/*
 * Finalize a KILL request: set the payload length to sizeof(SCMKillQueryMsg) and map
 * the SQL command (kill query / connection / stream) to its message type. For any
 * other command value the message type is left unchanged.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;

  cmd->payloadLen = sizeof(SCMKillQueryMsg);

  if (cmd->command == TSDB_SQL_KILL_QUERY) {
    cmd->msgType = TSDB_MSG_TYPE_CM_KILL_QUERY;
  } else if (cmd->command == TSDB_SQL_KILL_CONNECTION) {
    cmd->msgType = TSDB_MSG_TYPE_CM_KILL_CONN;
  } else if (cmd->command == TSDB_SQL_KILL_STREAM) {
    cmd->msgType = TSDB_MSG_TYPE_CM_KILL_STREAM;
  }

  return TSDB_CODE_SUCCESS;
}

1187
/*
 * Estimate the buffer size needed for a CREATE TABLE message:
 *   minMsgSize() + sizeof(SCMCreateTableMsg)
 *   + sizeof(STagData) when creating from a super table, otherwise one SSchema per
 *     column and tag (pCmd->numOfCols + pCmd->count)
 *   + the select statement length plus one, when a select clause is attached
 *   + TSDB_EXTRA_PAYLOAD_SIZE.
 */
int tscEstimateCreateTableMsgLength(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *        cmd = &(pSql->cmd);
  SCreateTableSQL *createInfo = pInfo->pCreateTableInfo;

  int32_t total = minMsgSize() + sizeof(SCMCreateTableMsg);

  if (createInfo->type != TSQL_CREATE_TABLE_FROM_STABLE) {
    total += sizeof(SSchema) * (cmd->numOfCols + cmd->count);
  } else {
    total += sizeof(STagData);
  }

  if (createInfo->pSelect != NULL) {
    total += (createInfo->pSelect->selectToken.n + 1);
  }

  return total + TSDB_EXTRA_PAYLOAD_SIZE;
}

1206
/*
 * Build a CREATE TABLE request (TSDB_MSG_TYPE_CM_CREATE_TABLE).
 *
 * After the fixed SCMCreateTableMsg header the message carries either
 *   - for TSQL_CREATE_TABLE_FROM_STABLE: the tag data length (network order), the tag
 *     name block and the raw tag data, or
 *   - otherwise: one SSchema per column and tag, followed, for TSQL_CREATE_STREAM, by
 *     the select statement text (selectToken.n + 1 bytes).
 *
 * The table id and each schema name are copied with tstrncpy bounded by the
 * destination field, matching the other builders in this file; the previous strcpy
 * calls were unbounded.
 *
 * The parsed field info in pQueryInfo is cleared before returning.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if the payload cannot
 * be allocated.
 */
int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  int      msgLen = 0;
  SSchema *pSchema;
  int      size = 0;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  // Reallocate the payload size
  size = tscEstimateCreateTableMsgLength(pSql, pInfo);
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for create table msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMCreateTableMsg *pCreateTableMsg = (SCMCreateTableMsg *)pCmd->payload;
  tstrncpy(pCreateTableMsg->tableId, pTableMetaInfo->name, sizeof(pCreateTableMsg->tableId));

  // use dbinfo from table id without modifying current db info
  tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pCreateTableMsg->db);

  SCreateTableSQL *pCreateTable = pInfo->pCreateTableInfo;

  pCreateTableMsg->igExists = pCreateTable->existCheck ? 1 : 0;
  pCreateTableMsg->numOfColumns = htons(pCmd->numOfCols);
  pCreateTableMsg->numOfTags = htons(pCmd->count);

  pCreateTableMsg->sqlLen = 0;
  char *pMsg = (char *)pCreateTableMsg->schema;

  int8_t type = pInfo->pCreateTableInfo->type;
  if (type == TSQL_CREATE_TABLE_FROM_STABLE) {  // create by using super table, tags value
    STagData *pTag = &pInfo->pCreateTableInfo->usingInfo.tagdata;
    *(int32_t *)pMsg = htonl(pTag->dataLen);
    pMsg += sizeof(int32_t);
    memcpy(pMsg, pTag->name, sizeof(pTag->name));
    pMsg += sizeof(pTag->name);
    memcpy(pMsg, pTag->data, pTag->dataLen);
    pMsg += pTag->dataLen;
  } else {  // create (super) table
    pSchema = (SSchema *)pCreateTableMsg->schema;

    for (int i = 0; i < pCmd->numOfCols + pCmd->count; ++i) {
      TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);

      pSchema->type = pField->type;
      tstrncpy(pSchema->name, pField->name, sizeof(pSchema->name));
      pSchema->bytes = htons(pField->bytes);

      pSchema++;
    }

    pMsg = (char *)pSchema;
    if (type == TSQL_CREATE_STREAM) {  // check if it is a stream sql
      SQuerySQL *pQuerySql = pInfo->pCreateTableInfo->pSelect;

      strncpy(pMsg, pQuerySql->selectToken.z, pQuerySql->selectToken.n + 1);
      pCreateTableMsg->sqlLen = htons(pQuerySql->selectToken.n + 1);
      pMsg += pQuerySql->selectToken.n + 1;
    }
  }

  tscFieldInfoClear(&pQueryInfo->fieldsInfo);

  msgLen = pMsg - (char *)pCreateTableMsg;
  pCreateTableMsg->contLen = htonl(msgLen);
  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE;

  assert(msgLen + minMsgSize() <= size);
  return TSDB_CODE_SUCCESS;
}

/*
 * Estimate the buffer size needed for an ALTER TABLE message: the minimum message
 * size, the fixed SCMAlterTableMsg header, one SSchema per parsed field and
 * TSDB_EXTRA_PAYLOAD_SIZE of slack.
 */
int tscEstimateAlterTableMsgLength(SSqlCmd *pCmd) {
  SQueryInfo *queryInfo = tscGetQueryInfoDetail(pCmd, 0);
  size_t      schemaBytes = sizeof(SSchema) * tscNumOfFields(queryInfo);

  return minMsgSize() + sizeof(SCMAlterTableMsg) + schemaBytes + TSDB_EXTRA_PAYLOAD_SIZE;
}

1287
/*
 * Build an ALTER TABLE request (TSDB_MSG_TYPE_CM_ALTER_TABLE).
 *
 * The fixed SCMAlterTableMsg header is followed by one SSchema per parsed field and
 * then by the raw tag data from pInfo->pAlterInfo->tagData.
 *
 * The table id and each schema name are copied with tstrncpy bounded by the
 * destination field, matching the other builders in this file; the previous strcpy
 * calls were unbounded.
 *
 * Returns TSDB_CODE_SUCCESS, or -1 if the payload cannot be allocated.
 */
int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  char *pMsg;
  int   msgLen = 0;

  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  SAlterTableSQL *pAlterInfo = pInfo->pAlterInfo;
  int             size = tscEstimateAlterTableMsgLength(pCmd);
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for alter table msg", pSql);
    return -1;
  }

  SCMAlterTableMsg *pAlterTableMsg = (SCMAlterTableMsg *)pCmd->payload;
  tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pAlterTableMsg->db);

  tstrncpy(pAlterTableMsg->tableId, pTableMetaInfo->name, sizeof(pAlterTableMsg->tableId));
  pAlterTableMsg->type = htons(pAlterInfo->type);

  pAlterTableMsg->numOfCols = htons(tscNumOfFields(pQueryInfo));
  SSchema *pSchema = pAlterTableMsg->schema;
  for (int i = 0; i < tscNumOfFields(pQueryInfo); ++i) {
    TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);

    pSchema->type = pField->type;
    tstrncpy(pSchema->name, pField->name, sizeof(pSchema->name));
    pSchema->bytes = htons(pField->bytes);
    pSchema++;
  }

  // the tag value bytes follow the schema array
  pMsg = (char *)pSchema;
  pAlterTableMsg->tagValLen = htonl(pAlterInfo->tagData.dataLen);
  memcpy(pMsg, pAlterInfo->tagData.data, pAlterInfo->tagData.dataLen);
  pMsg += pAlterInfo->tagData.dataLen;

  msgLen = pMsg - (char *)pAlterTableMsg;

  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_TABLE;

  assert(msgLen + minMsgSize() <= size);

  return TSDB_CODE_SUCCESS;
}

1335 1336 1337 1338
/*
 * Finish an UPDATE_TAG_VAL request whose body is already present in
 * pCmd->payload: set the message type, take the payload length from the
 * message header and copy the endpoint set of the table's vgroup into
 * pSql->epSet.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
  SSqlCmd               *cmd = &pSql->cmd;
  SUpdateTableTagValMsg *tagMsg = (SUpdateTableTagValMsg *)cmd->payload;

  cmd->msgType = TSDB_MSG_TYPE_UPDATE_TAG_VAL;

  // head.contLen is byte-swapped on read; presumably it was stored in network order - confirm
  cmd->payloadLen = htonl(tagMsg->head.contLen);

  STableMetaInfo *metaInfo = tscGetMetaInfo(tscGetQueryInfoDetail(cmd, 0), 0);
  tscDumpEpSetFromVgroupInfo(&metaInfo->pTableMeta->corVgroupInfo, &pSql->epSet);

  return TSDB_CODE_SUCCESS;
}

1350
/*
 * Finish a CM_ALTER_DB request: set the message type and length and write the
 * database name (taken from the first table meta info of the current clause)
 * into the SCMAlterDbMsg located at the start of pCmd->payload.
 *
 * No payload allocation happens here; the buffer is used as it is.
 * Always returns TSDB_CODE_SUCCESS.
 */
int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;

  cmd->msgType = TSDB_MSG_TYPE_CM_ALTER_DB;
  cmd->payloadLen = sizeof(SCMAlterDbMsg);

  STableMetaInfo *metaInfo = tscGetTableMetaInfoFromCmd(cmd, cmd->clauseIndex, 0);
  SCMAlterDbMsg  *alterMsg = (SCMAlterDbMsg *)cmd->payload;
  tstrncpy(alterMsg->db, metaInfo->name, sizeof(alterMsg->db));

  return TSDB_CODE_SUCCESS;
}

1362
/*
 * Build a CM_RETRIEVE request: an SRetrieveTableMsg carrying the query handle
 * stored in pSql->res and the type of query info 0 (sent in the `free` field).
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;

  cmd->payloadLen = sizeof(SRetrieveTableMsg);
  cmd->msgType = TSDB_MSG_TYPE_CM_RETRIEVE;

  if (tscAllocPayload(cmd, cmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SRetrieveTableMsg *request = (SRetrieveTableMsg *)cmd->payload;
  request->qhandle = htobe64(pSql->res.qhandle);
  request->free = htons(tscGetQueryInfoDetail(cmd, 0)->type);

  return TSDB_CODE_SUCCESS;
}

1380
/*
 * Point every entry of pRes->tsrow at the start of its column inside
 * pRes->data. Column i starts at (offset of field i) * numOfRows bytes from
 * the beginning of the data buffer.
 *
 * Returns pRes->code when tscCreateResPointerInfo fails, 0 otherwise.
 */
static int tscSetResultPointer(SQueryInfo *pQueryInfo, SSqlRes *pRes) {
  if (TSDB_CODE_SUCCESS != tscCreateResPointerInfo(pRes, pQueryInfo)) {
    return pRes->code;
  }

  char *base = (char *)pRes->data;

  int col = 0;
  while (col < pQueryInfo->fieldsInfo.numOfOutput) {
    int16_t colOffset = tscFieldInfoGetOffset(pQueryInfo, col);
    pRes->tsrow[col] = base + colOffset * pRes->numOfRows;
    ++col;
  }

  return 0;
}

/*
 * Common tail for commands whose result set is produced locally.
 *
 * pRes->rspType records whether the result has been handed out already:
 *  - rspType == 0: first call; publish numOfRes rows, set up the row pointers
 *    and flip rspType to 1.
 *  - otherwise: no more result; the retrieve state is reset.
 *
 * If an asynchronous callback (pSql->fp) is installed it is invoked directly
 * on success; on failure the error is queued through tscQueueAsyncRes.
 *
 * Returns the result code stored in pSql->res.code.
 */
static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  pRes->code = TSDB_CODE_SUCCESS;
  if (pRes->rspType == 0) {
    pRes->numOfRows = numOfRes;
    pRes->row = 0;
    pRes->rspType = 1;

    tscSetResultPointer(pQueryInfo, pRes);
  } else {
    tscResetForNextRetrieve(pRes);
  }

  // keep the full width of the code: narrowing it to 8 bits would turn any
  // error whose low byte is zero into "success"
  int32_t code = pSql->res.code;
  if (pSql->fp) {
    if (code == TSDB_CODE_SUCCESS) {
      (*pSql->fp)(pSql->param, pSql, pSql->res.numOfRows);
    } else {
      tscQueueAsyncRes(pSql);
    }
  }

  return code;
}

/*
 * Produce the local result for a describe-table command: one row for every
 * column plus one row for every tag of the table.
 */
int tscProcessDescribeTableRsp(SSqlObj *pSql) {
  SSqlCmd        *cmd = &pSql->cmd;
  STableMetaInfo *metaInfo = tscGetTableMetaInfoFromCmd(cmd, cmd->clauseIndex, 0);
  STableComInfo   tableInfo = tscGetTableInfo(metaInfo->pTableMeta);

  return tscLocalResultCommonBuilder(pSql, tableInfo.numOfColumns + tableInfo.numOfTags);
}

H
Haojun Liao 已提交
1438 1439 1440
/*
 * Local retrieve: mark the result as completed and publish a single row.
 */
int tscProcessLocalRetrieveRsp(SSqlObj *pSql) {
  pSql->res.completed = true;
  return tscLocalResultCommonBuilder(pSql, 1);
}

H
hjxilinx 已提交
1444
/*
 * Run the client-side merge and hand the outcome to the callback.
 *
 * When the merge succeeds and yields rows, the row pointers are rebuilt. The
 * result is flagged completed once a merge produces zero rows. On success
 * pSql->fp is called with the row count; otherwise the error is queued via
 * tscQueueAsyncRes.
 *
 * Returns the code produced by the merge step.
 */
int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
  SSqlRes *res = &pSql->res;
  SSqlCmd *cmd = &pSql->cmd;

  res->code = tscDoLocalMerge(pSql);
  SQueryInfo *queryInfo = tscGetQueryInfoDetail(cmd, cmd->clauseIndex);

  bool hasRows = (res->code == TSDB_CODE_SUCCESS) && (res->numOfRows > 0);
  if (hasRows) {
    tscCreateResPointerInfo(res, queryInfo);
  }

  res->row = 0;
  res->completed = (res->numOfRows == 0);

  // capture the code before the callback runs; it is what gets returned
  int32_t code = res->code;
  if (code != TSDB_CODE_SUCCESS) {
    tscQueueAsyncRes(pSql);
  } else {
    (*pSql->fp)(pSql->param, pSql, res->numOfRows);
  }

  return code;
}

S
slguan 已提交
1468
/*
 * Publish an empty (zero-row) local result.
 */
int tscProcessEmptyResultRsp(SSqlObj *pSql) {
  const int32_t numOfRes = 0;
  return tscLocalResultCommonBuilder(pSql, numOfRes);
}
H
hzcheng 已提交
1469

1470
/*
 * Build the CM_CONNECT request: database name, client version and an empty
 * message version string.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  STscObj *tscObj = pSql->pTscObj;
  SSqlCmd *cmd = &pSql->cmd;

  cmd->msgType = TSDB_MSG_TYPE_CM_CONNECT;
  cmd->payloadLen = sizeof(SCMConnectMsg);

  if (tscAllocPayload(cmd, cmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMConnectMsg *connectMsg = (SCMConnectMsg *)cmd->payload;

  // send only the part of pObj->db that follows the first path delimiter;
  // when there is no delimiter the whole string is sent
  char *dbName = tscObj->db;
  char *delimiter = strstr(tscObj->db, TS_PATH_DELIMITER);
  if (delimiter != NULL) {
    dbName = delimiter + 1;
  }

  tstrncpy(connectMsg->db, dbName, sizeof(connectMsg->db));
  tstrncpy(connectMsg->clientVersion, version, sizeof(connectMsg->clientVersion));
  tstrncpy(connectMsg->msgVersion, "", sizeof(connectMsg->msgVersion));

  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
1493
/*
 * Build the CM_TABLE_META request in pCmd->payload.
 *
 * On entry the payload may already contain payloadLen bytes of binary tag
 * data. Those bytes are saved to a temporary buffer first, because the
 * SCMTableInfoMsg header is written over the beginning of the same payload.
 * The saved tags are appended after the header only when the table is to be
 * auto-created.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the temporary
 * buffer cannot be allocated.
 */
int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  char    *tmpData = NULL;
  uint32_t len = pCmd->payloadLen;
  if (len > 0) {
    tmpData = calloc(1, len);
    if (NULL == tmpData) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }

    // STagData is in binary format, strncpy is not available
    memcpy(tmpData, pCmd->payload, len);
  }

  SQueryInfo     *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  SCMTableInfoMsg *pInfoMsg = (SCMTableInfoMsg *)pCmd->payload;
  strcpy(pInfoMsg->tableId, pTableMetaInfo->name);
  pInfoMsg->createFlag = htons(pCmd->autoCreated ? 1 : 0);

  char *pMsg = (char *)pInfoMsg + sizeof(SCMTableInfoMsg);

  if (pCmd->autoCreated && len > 0) {
    memcpy(pInfoMsg->tags, tmpData, len);
    pMsg += len;
  }

  pCmd->payloadLen = pMsg - (char *)pInfoMsg;
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLE_META;

  taosTFree(tmpData);

  // check the length that was actually produced; the previous form tested a
  // local that was never updated from 0 and therefore could never fire
  assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize);
  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
1535
/**
1536
 *  multi table meta req pkg format:
1537
 *  | SMgmtHead | SCMMultiTableInfoMsg | tableId0 | tableId1 | tableId2 | ......
S
slguan 已提交
1538 1539
 *      no used         4B
 **/
1540
int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
guanshengliang's avatar
guanshengliang 已提交
1541
#if 0
S
slguan 已提交
1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553
  SSqlCmd *pCmd = &pSql->cmd;

  // copy payload content to temp buff
  char *tmpData = 0;
  if (pCmd->payloadLen > 0) {
    tmpData = calloc(1, pCmd->payloadLen + 1);
    if (NULL == tmpData) return -1;
    memcpy(tmpData, pCmd->payload, pCmd->payloadLen);
  }

  // fill head info
  SMgmtHead *pMgmt = (SMgmtHead *)(pCmd->payload + tsRpcHeadSize);
S
slguan 已提交
1554
  memset(pMgmt->db, 0, TSDB_TABLE_ID_LEN);  // server don't need the db
S
slguan 已提交
1555

1556
  SCMMultiTableInfoMsg *pInfoMsg = (SCMMultiTableInfoMsg *)(pCmd->payload + tsRpcHeadSize + sizeof(SMgmtHead));
S
slguan 已提交
1557
  pInfoMsg->numOfTables = htonl((int32_t)pCmd->count);
S
slguan 已提交
1558 1559

  if (pCmd->payloadLen > 0) {
S
slguan 已提交
1560
    memcpy(pInfoMsg->tableIds, tmpData, pCmd->payloadLen);
S
slguan 已提交
1561 1562
  }

S
Shengliang Guan 已提交
1563
  taosTFree(tmpData);
S
slguan 已提交
1564

1565
  pCmd->payloadLen += sizeof(SMgmtHead) + sizeof(SCMMultiTableInfoMsg);
S
slguan 已提交
1566
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLES_META;
S
slguan 已提交
1567 1568 1569

  assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize);

1570
  tscDebug("%p build load multi-metermeta msg completed, numOfTables:%d, msg size:%d", pSql, pCmd->count,
S
slguan 已提交
1571 1572 1573
           pCmd->payloadLen);

  return pCmd->payloadLen;
guanshengliang's avatar
guanshengliang 已提交
1574 1575
#endif
  return 0;  
S
slguan 已提交
1576 1577
}

H
hjxilinx 已提交
1578 1579 1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603
//static UNUSED_FUNC int32_t tscEstimateMetricMetaMsgSize(SSqlCmd *pCmd) {
////  const int32_t defaultSize =
////      minMsgSize() + sizeof(SSuperTableMetaMsg) + sizeof(SMgmtHead) + sizeof(int16_t) * TSDB_MAX_TAGS;
////  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
////
////  int32_t n = 0;
////  size_t size = taosArrayGetSize(pQueryInfo->tagCond.pCond);
////  for (int32_t i = 0; i < size; ++i) {
////    assert(0);
//////    n += strlen(pQueryInfo->tagCond.cond[i].cond);
////  }
////
////  int32_t tagLen = n * TSDB_NCHAR_SIZE;
////  if (pQueryInfo->tagCond.tbnameCond.cond != NULL) {
////    tagLen += strlen(pQueryInfo->tagCond.tbnameCond.cond) * TSDB_NCHAR_SIZE;
////  }
////
////  int32_t joinCondLen = (TSDB_TABLE_ID_LEN + sizeof(int16_t)) * 2;
////  int32_t elemSize = sizeof(SSuperTableMetaElemMsg) * pQueryInfo->numOfTables;
////
////  int32_t colSize = pQueryInfo->groupbyExpr.numOfGroupCols*sizeof(SColIndex);
////
////  int32_t len = tagLen + joinCondLen + elemSize + colSize + defaultSize;
////
////  return MAX(len, TSDB_DEFAULT_PAYLOAD_SIZE);
//}
H
hzcheng 已提交
1604

H
hjxilinx 已提交
1605 1606
/*
 * Build the CM_STABLE_VGROUP request in pCmd->payload: an SCMSTableVgroupMsg
 * header holding the table count, followed by one fixed-width name slot
 * (sizeof(STableMetaInfo.name) bytes) per table of query info 0.
 *
 * No payload allocation happens here; the buffer is used as it is.
 * Always returns TSDB_CODE_SUCCESS.
 */
int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd    *cmd = &pSql->cmd;
  SQueryInfo *queryInfo = tscGetQueryInfoDetail(cmd, 0);

  SCMSTableVgroupMsg *vgroupMsg = (SCMSTableVgroupMsg *)cmd->payload;
  vgroupMsg->numOfTables = htonl(queryInfo->numOfTables);

  // write cursor, positioned right after the header
  char *cursor = cmd->payload + sizeof(SCMSTableVgroupMsg);

  int32_t tableIdx = 0;
  while (tableIdx < queryInfo->numOfTables) {
    STableMetaInfo *metaInfo = tscGetTableMetaInfoFromCmd(cmd, cmd->clauseIndex, tableIdx);

    // every name occupies a full slot regardless of its actual length
    size_t slot = sizeof(metaInfo->name);
    tstrncpy(cursor, metaInfo->name, slot);
    cursor += slot;

    ++tableIdx;
  }

  cmd->payloadLen = (cursor - cmd->payload);
  cmd->msgType = TSDB_MSG_TYPE_CM_STABLE_VGROUP;

  return TSDB_CODE_SUCCESS;
}

1628 1629
/*
 * Build the CM_HEARTBEAT request describing the queries and streams that are
 * linked to the connection object.
 *
 * pObj->mutex is held while the two lists are counted and while
 * tscBuildQueryStreamDesc fills the message, and is released on every path.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload
 * cannot be allocated (same code the other message builders in this file
 * return for that failure).
 */
int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;

  pthread_mutex_lock(&pObj->mutex);

  // both counters start at 2, so the size estimate below leaves room for two
  // descriptors more than the lists currently hold
  int32_t numOfQueries = 2;
  for (SSqlObj *tpSql = pObj->sqlList; tpSql != NULL; tpSql = tpSql->next) {
    numOfQueries++;
  }

  int32_t numOfStreams = 2;
  for (SSqlStream *pStream = pObj->streamList; pStream != NULL; pStream = pStream->next) {
    numOfStreams++;
  }

  int size = numOfQueries * sizeof(SQueryDesc) + numOfStreams * sizeof(SStreamDesc) + sizeof(SCMHeartBeatMsg) + 100;
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    pthread_mutex_unlock(&pObj->mutex);
    tscError("%p failed to malloc for heartbeat msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMHeartBeatMsg *pHeartbeat = (SCMHeartBeatMsg *)pCmd->payload;
  pHeartbeat->numOfQueries = numOfQueries;
  pHeartbeat->numOfStreams = numOfStreams;
  int msgLen = tscBuildQueryStreamDesc(pHeartbeat, pObj);

  pthread_mutex_unlock(&pObj->mutex);

  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_CM_HEARTBEAT;

  return TSDB_CODE_SUCCESS;
}

1668 1669
/*
 * Handle the response to a table-meta request.
 *
 * The message in pSql->res.pRsp is byte-swapped in place, sanity-checked,
 * turned into an STableMeta and stored in the client cache under the table
 * name. The cached entry is attached to table meta info 0 of the command.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_INVALID_VALUE for a message that
 * fails validation, or TSDB_CODE_TSC_OUT_OF_MEMORY when the meta object cannot
 * be created or cached.
 */
int tscProcessTableMetaRsp(SSqlObj *pSql) {
  STableMetaMsg *pMetaMsg = (STableMetaMsg *)pSql->res.pRsp;

  // byte-swap the fixed header fields in place
  pMetaMsg->sid = htonl(pMetaMsg->sid);
  pMetaMsg->sversion = htons(pMetaMsg->sversion);
  pMetaMsg->tversion = htons(pMetaMsg->tversion);
  pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId);

  pMetaMsg->uid = htobe64(pMetaMsg->uid);
  pMetaMsg->contLen = htons(pMetaMsg->contLen);
  pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns);

  if (pMetaMsg->sid < 0 || pMetaMsg->vgroup.numOfEps < 0) {
    // the first value logged is the endpoint count, so label it as such
    tscError("invalid table meta, numOfEps:%d, sid:%d", pMetaMsg->vgroup.numOfEps, pMetaMsg->sid);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  if (pMetaMsg->numOfTags > TSDB_MAX_TAGS) {
    tscError("invalid numOfTags:%d", pMetaMsg->numOfTags);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  if (pMetaMsg->numOfColumns > TSDB_MAX_COLUMNS || pMetaMsg->numOfColumns <= 0) {
    tscError("invalid numOfColumns:%d", pMetaMsg->numOfColumns);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  for (int i = 0; i < pMetaMsg->vgroup.numOfEps; ++i) {
    pMetaMsg->vgroup.epAddr[i].port = htons(pMetaMsg->vgroup.epAddr[i].port);
  }

  // columns first, then tags: all share one schema array
  SSchema *pSchema = pMetaMsg->schema;

  int32_t numOfTotalCols = pMetaMsg->numOfColumns + pMetaMsg->numOfTags;
  for (int i = 0; i < numOfTotalCols; ++i) {
    pSchema->bytes = htons(pSchema->bytes);
    pSchema->colId = htons(pSchema->colId);

    if (pSchema->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
      assert(i == 0);
    }

    assert(pSchema->type >= TSDB_DATA_TYPE_BOOL && pSchema->type <= TSDB_DATA_TYPE_NCHAR);
    pSchema++;
  }

  size_t      size = 0;
  STableMeta *pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
  if (pTableMeta == NULL) {
    // pTableMeta is dereferenced by the debug log below; a NULL result is
    // treated as an allocation failure
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  // todo add one more function: taosAddDataIfNotExists();
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
  assert(pTableMetaInfo->pTableMeta == NULL);

  pTableMetaInfo->pTableMeta = (STableMeta *) taosCachePut(tscCacheHandle, pTableMetaInfo->name,
      strlen(pTableMetaInfo->name), pTableMeta, size, tsTableMetaKeepTimer);

  // todo handle out of memory case
  if (pTableMetaInfo->pTableMeta == NULL) {
    free(pTableMeta);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  tscDebug("%p recv table meta, uid:%"PRId64 ", tid:%d, name:%s", pSql, pTableMeta->id.uid, pTableMeta->id.tid, pTableMetaInfo->name);

  // the local object is released here in both outcomes; only the pointer
  // returned by taosCachePut is kept
  free(pTableMeta);

  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
1736
/**
 *  multi table meta rsp pkg format:
 *  | STaosRsp | ieType | SCMMultiTableInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | SMeterMeta2 | SSchema2
 *  |...... 1B        1B            4B
 *
 *  The parser body is currently compiled out (#if 0), so this function is a
 *  stub that always returns TSDB_CODE_SUCCESS. The disabled code uses
 *  identifiers (ieType, totalNum, i) that are not declared inside it.
 **/
int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
#if 0
  char *rsp = pSql->res.pRsp;

  ieType = *rsp;
  if (ieType != TSDB_IE_TYPE_META) {
    tscError("invalid ie type:%d", ieType);
    pSql->res.code = TSDB_CODE_TSC_INVALID_IE;
    pSql->res.numOfTotal = 0;
    return TSDB_CODE_TSC_APP_ERROR;
  }

  rsp++;

  SCMMultiTableInfoMsg *pInfo = (SCMMultiTableInfoMsg *)rsp;
  totalNum = htonl(pInfo->numOfTables);
  rsp += sizeof(SCMMultiTableInfoMsg);

  for (i = 0; i < totalNum; i++) {
    SMultiTableMeta *pMultiMeta = (SMultiTableMeta *)rsp;
    STableMeta *     pMeta = pMultiMeta->metas;

    pMeta->sid = htonl(pMeta->sid);
    pMeta->sversion = htons(pMeta->sversion);
    pMeta->vgId = htonl(pMeta->vgId);
    pMeta->uid = htobe64(pMeta->uid);

    if (pMeta->sid <= 0 || pMeta->vgId < 0) {
      tscError("invalid meter vgId:%d, sid%d", pMeta->vgId, pMeta->sid);
      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
      pSql->res.numOfTotal = i;
      return TSDB_CODE_TSC_APP_ERROR;
    }

    //    pMeta->numOfColumns = htons(pMeta->numOfColumns);
    //
    //    if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
    //      tscError("invalid tag value count:%d", pMeta->numOfTags);
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
    //      pSql->res.numOfTotal = i;
    //      return TSDB_CODE_TSC_APP_ERROR;
    //    }
    //
    //    if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
    //      tscError("invalid numOfTags:%d", pMeta->numOfTags);
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
    //      pSql->res.numOfTotal = i;
    //      return TSDB_CODE_TSC_APP_ERROR;
    //    }
    //
    //    if (pMeta->numOfColumns > TSDB_MAX_COLUMNS || pMeta->numOfColumns < 0) {
    //      tscError("invalid numOfColumns:%d", pMeta->numOfColumns);
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
    //      pSql->res.numOfTotal = i;
    //      return TSDB_CODE_TSC_APP_ERROR;
    //    }
    //
    //    for (int j = 0; j < TSDB_REPLICA_MAX_NUM; ++j) {
    //      pMeta->vpeerDesc[j].vnode = htonl(pMeta->vpeerDesc[j].vnode);
    //    }
    //
    //    pMeta->rowSize = 0;
    //    rsp += sizeof(SMultiTableMeta);
    //    pSchema = (SSchema *)rsp;
    //
    //    int32_t numOfTotalCols = pMeta->numOfColumns + pMeta->numOfTags;
    //    for (int j = 0; j < numOfTotalCols; ++j) {
    //      pSchema->bytes = htons(pSchema->bytes);
    //      pSchema->colId = htons(pSchema->colId);
    //
    //      // ignore the tags length
    //      if (j < pMeta->numOfColumns) {
    //        pMeta->rowSize += pSchema->bytes;
    //      }
    //      pSchema++;
    //    }
    //
    //    rsp += numOfTotalCols * sizeof(SSchema);
    //
    //    int32_t  tagLen = 0;
    //    SSchema *pTagsSchema = tscGetTableTagSchema(pMeta);
    //
    //    if (pMeta->tableType == TSDB_CHILD_TABLE) {
    //      for (int32_t j = 0; j < pMeta->numOfTags; ++j) {
    //        tagLen += pTagsSchema[j].bytes;
    //      }
    //    }
    //
    //    rsp += tagLen;
    //    int32_t size = (int32_t)(rsp - ((char *)pMeta));  // Consistent with STableMeta in cache
    //
    //    pMeta->index = 0;
    //    (void)taosCachePut(tscCacheHandle, pMeta->tableId, (char *)pMeta, size, tsTableMetaKeepTimer);
    //  }
  }

  pSql->res.code = TSDB_CODE_SUCCESS;
  pSql->res.numOfTotal = i;
  tscDebug("%p load multi-metermeta resp from complete num:%d", pSql, pSql->res.numOfTotal);
#endif

  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
1845
/*
 * Handle the response to a super-table vgroup request.
 *
 * The response holds one SVgroupsInfo record per table, in the same order as
 * the tables of the parent command (the parent SSqlObj is found in
 * pSql->param). Each record is copied into a freshly allocated vgroupList of
 * the matching table meta info, and the copy is byte-swapped.
 *
 * Returns pSql->res.code, or TSDB_CODE_TSC_OUT_OF_MEMORY when a vgroup list
 * cannot be allocated.
 */
int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
  SSqlRes* pRes = &pSql->res;

  // NOTE: the order of several table must be preserved.
  SCMSTableVgroupRspMsg *pStableVgroup = (SCMSTableVgroupRspMsg *)pRes->pRsp;
  pStableVgroup->numOfTables = htonl(pStableVgroup->numOfTables);
  char* pMsg = pRes->pRsp + sizeof(SCMSTableVgroupRspMsg);

  // master sqlObj locates in param
  SSqlObj* parent = pSql->param;
  assert(parent != NULL);

  SSqlCmd* pCmd = &parent->cmd;
  for(int32_t i = 0; i < pStableVgroup->numOfTables; ++i) {
    STableMetaInfo *pInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i);
    SVgroupsInfo *  pVgroupInfo = (SVgroupsInfo *)pMsg;
    pVgroupInfo->numOfVgroups = htonl(pVgroupInfo->numOfVgroups);

    size_t size = sizeof(SCMVgroupInfo) * pVgroupInfo->numOfVgroups + sizeof(SVgroupsInfo);
    pInfo->vgroupList = calloc(1, size);
    if (pInfo->vgroupList == NULL) {
      // an assert alone disappears in release builds and the memcpy below
      // would then write through NULL
      tscError("%p failed to malloc for vgroup list", pSql);
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }

    memcpy(pInfo->vgroupList, pVgroupInfo, size);
    for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) {
      //just init, no need to lock
      SCMVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j];
      pVgroups->vgId = htonl(pVgroups->vgId);
      assert(pVgroups->numOfEps >= 1);

      for (int32_t k = 0; k < pVgroups->numOfEps; ++k) {
        pVgroups->epAddr[k].port = htons(pVgroups->epAddr[k].port);
      }
    }

    // advance to the record of the next table
    pMsg += size;
  }

  return pSql->res.code;
}

/*
 * Handle the response to a show command.
 *
 * current process do not use the cache at all
 *
 * The table meta embedded in the response is byte-swapped, converted to an
 * STableMeta and stored in the client cache under a key derived from the
 * message type. One output field and one TSDB_FUNC_TS_DUMMY expression are
 * then appended to query info 0 for every column of that meta.
 *
 * Returns 0 on success, or TSDB_CODE_TSC_OUT_OF_MEMORY when the meta object
 * cannot be created or cached.
 */
int tscProcessShowRsp(SSqlObj *pSql) {
  STableMetaMsg * pMetaMsg;
  SCMShowRsp *pShow;
  SSchema *    pSchema;
  char         key[20];

  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  pShow = (SCMShowRsp *)pRes->pRsp;
  pShow->qhandle = htobe64(pShow->qhandle);
  pRes->qhandle = pShow->qhandle;

  tscResetForNextRetrieve(pRes);
  pMetaMsg = &(pShow->tableMeta);

  pMetaMsg->numOfColumns = ntohs(pMetaMsg->numOfColumns);

  pSchema = pMetaMsg->schema;
  // NOTE(review): sid is swapped with htonl in tscProcessTableMetaRsp but with
  // the 16-bit ntohs here - confirm which width the server uses for show
  pMetaMsg->sid = ntohs(pMetaMsg->sid);
  for (int i = 0; i < pMetaMsg->numOfColumns; ++i) {
    pSchema->bytes = htons(pSchema->bytes);
    pSchema++;
  }

  // cache key: one character derived from the message type + "showlist"
  key[0] = pCmd->msgType + 'a';
  strcpy(key + 1, "showlist");

  taosCacheRelease(tscCacheHandle, (void *)&(pTableMetaInfo->pTableMeta), false);

  size_t size = 0;
  STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
  if (pTableMeta == NULL) {
    // a NULL result is treated as an allocation failure
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  pTableMetaInfo->pTableMeta = taosCachePut(tscCacheHandle, key, strlen(key), (char *)pTableMeta, size,
      tsTableMetaKeepTimer);
  if (pTableMetaInfo->pTableMeta == NULL) {
    // same handling as tscProcessTableMetaRsp: drop the local object and
    // report out of memory instead of reading the schema of a NULL meta
    taosTFree(pTableMeta);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SSchema *pTableSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);

  if (pQueryInfo->colList == NULL) {
    pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
  }

  SFieldInfo* pFieldInfo = &pQueryInfo->fieldsInfo;

  SColumnIndex index = {0};
  pSchema = pMetaMsg->schema;

  for (int16_t i = 0; i < pMetaMsg->numOfColumns; ++i, ++pSchema) {
    index.columnIndex = i;
    tscColumnListInsert(pQueryInfo->colList, &index);

    TAOS_FIELD f = tscCreateField(pSchema->type, pSchema->name, pSchema->bytes);
    SFieldSupInfo* pInfo = tscFieldInfoAppend(pFieldInfo, &f);

    pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index,
                     pTableSchema[i].type, pTableSchema[i].bytes, pTableSchema[i].bytes, false);
  }

  pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
  tscFieldInfoUpdateOffset(pQueryInfo);

  taosTFree(pTableMeta);
  return 0;
}

/*
 * Handle the response to a connect request.
 *
 * Copies the account id from the response, rewrites pObj->db as
 * "<acctId><TS_PATH_DELIMITER><db>", updates the management endpoint set when
 * the response carries one, stores the server version, the authorization
 * flags and the connection id, and (re)arms the activity timer.
 *
 * Always returns 0.
 */
int tscProcessConnectRsp(SSqlObj *pSql) {
  char temp[TSDB_TABLE_ID_LEN * 2];
  STscObj *pObj = pSql->pTscObj;
  SSqlRes *pRes = &pSql->res;

  SCMConnectRsp *pConnect = (SCMConnectRsp *)pRes->pRsp;
  tstrncpy(pObj->acctId, pConnect->acctId, sizeof(pObj->acctId));  // copy acctId from response

  // bounded formatting: the pieces come from the server response and from the
  // connection object, so never write past the stack buffer
  int32_t len = snprintf(temp, sizeof(temp), "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db);

  assert(len <= sizeof(pObj->db));
  tstrncpy(pObj->db, temp, sizeof(pObj->db));

  if (pConnect->epSet.numOfEps > 0) {
    tscEpSetHtons(&pConnect->epSet);
    tscUpdateMgmtEpSet(&pConnect->epSet);
  }

  // NOTE(review): unbounded copy of a server-supplied string - confirm sversion is large enough
  strcpy(pObj->sversion, pConnect->serverVersion);
  pObj->writeAuth = pConnect->writeAuth;
  pObj->superAuth = pConnect->superAuth;
  pObj->connId = htonl(pConnect->connId);
  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);

  return 0;
}

/*
 * Handle the response to a use-db command: remember the database name (held
 * in table meta info 0 of the command) as the current database of the
 * connection.
 *
 * Always returns 0.
 */
int tscProcessUseDbRsp(SSqlObj *pSql) {
  STableMetaInfo *metaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
  STscObj        *conn = pSql->pTscObj;

  tstrncpy(conn->db, metaInfo->name, sizeof(conn->db));
  return 0;
}

/*
 * Handle the response to a drop-db command: empty the whole client cache.
 * The SSqlObj itself is not used.
 *
 * Always returns 0.
 */
int tscProcessDropDbRsp(SSqlObj *UNUSED_PARAM(pSql)) {
  taosCacheEmpty(tscCacheHandle);

  return 0;
}

/*
 * Handle the response to a drop-table command by force-releasing the cached
 * meta of the dropped table.
 *
 * Why the entry has to go:
 * 1. if a user drops one table, which is the only table in a vnode, the remove
 *    operation will cause the vnode to be removed.
 * 2. Then, a user creates a new metric followed by a table with the identical
 *    name of the removed table but a different schema; the table will reside
 *    in a new vnode.
 * The cached information is expired, however, we may have lost the ref of the
 * original meter.
 *
 * Always returns 0.
 */
int tscProcessDropTableRsp(SSqlObj *pSql) {
  STableMetaInfo *metaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);

  STableMeta *cached = taosCacheAcquireByKey(tscCacheHandle, metaInfo->name, strlen(metaInfo->name));
  if (cached != NULL) {
    tscDebug("%p force release table meta after drop table:%s", pSql, metaInfo->name);

    // first the reference acquired just above ...
    taosCacheRelease(tscCacheHandle, (void **)&cached, true);

    // ... then the one held by the command, if any
    if (metaInfo->pTableMeta) {
      taosCacheRelease(tscCacheHandle, (void **)&(metaInfo->pTableMeta), true);
    }
  }

  // nothing to do when the table is not in the cache
  return 0;
}

/*
 * Handle the response to an alter-table command by force-releasing the cached
 * meta of the altered table. When the table is a super table the whole cache
 * is emptied afterwards.
 *
 * Always returns 0.
 */
int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
  STableMetaInfo *metaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);

  STableMeta *cached = taosCacheAcquireByKey(tscCacheHandle, metaInfo->name, strlen(metaInfo->name));
  if (cached != NULL) {
    tscDebug("%p force release metermeta in cache after alter-table: %s", pSql, metaInfo->name);
    taosCacheRelease(tscCacheHandle, (void **)&cached, true);

    if (metaInfo->pTableMeta) {
      // evaluate the table kind before the meta reference is released
      bool superTable = UTIL_TABLE_IS_SUPER_TABLE(metaInfo);
      taosCacheRelease(tscCacheHandle, (void **)&(metaInfo->pTableMeta), true);

      if (superTable) {  // if it is a super table, reset whole query cache
        tscDebug("%p reset query cache since table:%s is stable", pSql, metaInfo->name);
        taosCacheEmpty(tscCacheHandle);
      }
    }
  }

  // nothing to do when the table is not in the cache
  return 0;
}

/*
 * Handle the response to an alter-db command. Nothing needs to be done on the
 * client side; the argument is only marked as unused.
 *
 * Always returns 0.
 */
int tscProcessAlterDbMsgRsp(SSqlObj *pSql) {
  UNUSED(pSql);

  return 0;
}

/*
 * Handle the response to a query request: byte-swap the query handle in place,
 * store it in the result object, clear the data pointer and reset the
 * retrieve state.
 *
 * Always returns 0.
 */
int tscProcessQueryRsp(SSqlObj *pSql) {
  SSqlRes        *res = &pSql->res;
  SQueryTableRsp *rsp = (SQueryTableRsp *)res->pRsp;

  rsp->qhandle = htobe64(rsp->qhandle);
  res->qhandle = rsp->qhandle;

  res->data = NULL;
  tscResetForNextRetrieve(res);

  return 0;
}

H
hjxilinx 已提交
2061
/*
 * Handle a retrieve response.
 *
 * The row count, precision, offset, elapsed time and completed flag are copied
 * (byte-swapped) from the response into the result object, pRes->data is
 * pointed at the row data inside the response, and the row pointers are
 * rebuilt. For a subscription query the per-table progress records are read
 * and passed to tscUpdateSubscriptionProgress.
 *
 * Returns 0 on success, or pRes->code when the row pointers cannot be built.
 */
int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *)pRes->pRsp;

  pRes->numOfRows = htonl(pRetrieve->numOfRows);
  pRes->precision = htons(pRetrieve->precision);
  pRes->offset    = htobe64(pRetrieve->offset);
  pRes->useconds  = htobe64(pRetrieve->useconds);
  pRes->completed = (pRetrieve->completed == 1);
  pRes->data      = pRetrieve->data;

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
    return pRes->code;
  }

  if (pSql->pSubscription != NULL) {
    int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput;

    TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, numOfCols - 1);
    int16_t     offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1);

    // (offset + width of the last column) * numOfRows: the first byte after the row data
    char* p = pRes->data + (pField->bytes + offset) * pRes->numOfRows;

    // p sits at an arbitrary byte offset, so load the values with memcpy
    // rather than through cast pointers (misaligned access / aliasing)
    int32_t numOfTables = 0;
    memcpy(&numOfTables, p, sizeof(numOfTables));
    numOfTables = htonl(numOfTables);
    p += sizeof(int32_t);

    for (int i = 0; i < numOfTables; i++) {
      int64_t uid = 0;
      memcpy(&uid, p, sizeof(uid));
      uid = htobe64(uid);
      p += sizeof(int64_t);

      p += sizeof(int32_t); // skip tid

      TSKEY key = 0;
      memcpy(&key, p, sizeof(key));
      key = htobe64(key);
      p += sizeof(TSKEY);

      tscUpdateSubscriptionProgress(pSql->pSubscription, uid, key);
    }
  }

  pRes->row = 0;
  tscDebug("%p numOfRows:%" PRId64 ", offset:%" PRId64 ", complete:%d", pSql, pRes->numOfRows, pRes->offset, pRes->completed);

  return 0;
}

H
hjxilinx 已提交
2105
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code);
H
hzcheng 已提交
2106

2107
static int32_t getTableMetaFromMgmt(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
S
slguan 已提交
2108 2109
  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
  if (NULL == pNew) {
2110
    tscError("%p malloc failed for new sqlobj to get table meta", pSql);
2111
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
2112
  }
2113

H
hzcheng 已提交
2114 2115 2116
  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;
  pNew->cmd.command = TSDB_SQL_META;
S
slguan 已提交
2117

2118
  tscAddSubqueryInfo(&pNew->cmd);
2119 2120 2121 2122

  SQueryInfo *pNewQueryInfo = NULL;
  tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo);

H
hjxilinx 已提交
2123
  pNew->cmd.autoCreated = pSql->cmd.autoCreated;  // create table if not exists
B
Bomin Zhang 已提交
2124
  if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE + pSql->cmd.payloadLen)) {
2125
    tscError("%p malloc failed for payload to get table meta", pSql);
S
slguan 已提交
2126
    free(pNew);
2127

2128
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
2129 2130
  }

H
hjxilinx 已提交
2131
  STableMetaInfo *pNewMeterMetaInfo = tscAddEmptyMetaInfo(pNewQueryInfo);
2132
  assert(pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);
H
hzcheng 已提交
2133

B
Bomin Zhang 已提交
2134
  tstrncpy(pNewMeterMetaInfo->name, pTableMetaInfo->name, sizeof(pNewMeterMetaInfo->name));
B
Bomin Zhang 已提交
2135 2136
  memcpy(pNew->cmd.payload, pSql->cmd.payload, pSql->cmd.payloadLen);  // tag information if table does not exists.
  pNew->cmd.payloadLen = pSql->cmd.payloadLen;
2137
  tscDebug("%p new pSqlObj:%p to get tableMeta, auto create:%d", pSql, pNew, pNew->cmd.autoCreated);
H
hzcheng 已提交
2138

H
hjxilinx 已提交
2139 2140
  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;
H
hzcheng 已提交
2141

H
hjxilinx 已提交
2142 2143
  int32_t code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
2144
    code = TSDB_CODE_TSC_ACTION_IN_PROGRESS;  // notify upper application that current process need to be terminated
H
hzcheng 已提交
2145 2146 2147 2148 2149
  }

  return code;
}

H
hjxilinx 已提交
2150
/**
 * Make pTableMetaInfo reference the meta of the table it names.
 *
 * Any meta already referenced is released first, then the cache is queried
 * by table name. On a cache miss the request is forwarded to
 * getTableMetaFromMgmt().
 *
 * @param pSql            sql object on whose behalf the meta is fetched
 * @param pTableMetaInfo  holds the table name (must be non-empty) and receives the meta
 * @return TSDB_CODE_SUCCESS on a cache hit, otherwise the result of getTableMetaFromMgmt()
 */
int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
  assert(strlen(pTableMetaInfo->name) != 0);

  // give back the reference this STableMetaInfo currently owns, if any
  if (pTableMetaInfo->pTableMeta != NULL) {
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), false);
  }

  char *tableName = pTableMetaInfo->name;
  pTableMetaInfo->pTableMeta = (STableMeta *)taosCacheAcquireByKey(tscCacheHandle, tableName, strlen(tableName));

  // cache miss: the meta has to be requested
  if (pTableMetaInfo->pTableMeta == NULL) {
    return getTableMetaFromMgmt(pSql, pTableMetaInfo);
  }

  STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
  tscDebug("%p retrieve table Meta from cache, the number of columns:%d, numOfTags:%d, %p", pSql, tinfo.numOfColumns,
           tinfo.numOfTags, pTableMetaInfo->pTableMeta);

  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
2170
/**
 * Record the auto-create flag on the command, then fetch the table meta.
 *
 * @param pSql               sql object; its cmd.autoCreated field is overwritten
 * @param pTableMetaInfo     table whose meta is wanted
 * @param createIfNotExists  value stored into pSql->cmd.autoCreated
 * @return whatever tscGetTableMeta() returns
 */
int tscGetMeterMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool createIfNotExists) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->autoCreated = createIfNotExists;

  return tscGetTableMeta(pSql, pTableMetaInfo);
}

/**
H
Haojun Liao 已提交
2176
 * retrieve table meta from mnode, and update the local table meta cache.
H
hzcheng 已提交
2177
 * @param pSql          sql object
H
Haojun Liao 已提交
2178
 * @param tableId       table full name
H
hzcheng 已提交
2179 2180
 * @return              status code
 */
H
Haojun Liao 已提交
2181
int tscRenewTableMeta(SSqlObj *pSql, char *tableId) {
H
hzcheng 已提交
2182
  SSqlCmd *pCmd = &pSql->cmd;
2183 2184

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
H
hjxilinx 已提交
2185
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
2186

H
Haojun Liao 已提交
2187 2188
  STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
  if (pTableMetaInfo->pTableMeta) {
2189
    tscDebug("%p update table meta, old meta numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
2190
             tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->id.uid, pTableMeta);
H
hzcheng 已提交
2191 2192
  }

H
Haojun Liao 已提交
2193 2194
  taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true);
  return getTableMetaFromMgmt(pSql, pTableMetaInfo);
H
hzcheng 已提交
2195 2196
}

H
hjxilinx 已提交
2197
/**
 * Tell whether every table of the given clause already has a vgroup list.
 *
 * @param pCmd         command that owns the query clauses
 * @param clauseIndex  index of the clause to inspect
 * @return false as soon as one table has vgroupList == NULL, true otherwise
 *         (including when the clause has no tables)
 */
static bool allVgroupInfoRetrieved(SSqlCmd* pCmd, int32_t clauseIndex) {
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);

  int32_t tableIdx = 0;
  while (tableIdx < pQueryInfo->numOfTables) {
    if (tscGetMetaInfo(pQueryInfo, tableIdx)->vgroupList == NULL) {
      return false;  // at least one table still lacks its vgroup info
    }

    ++tableIdx;
  }

  // nothing is missing, so no further vgroup request is needed
  return true;
}
H
hzcheng 已提交
2209

H
hjxilinx 已提交
2210
/**
 * Request the vgroup information for all tables of one query clause.
 *
 * Nothing is sent when every table of the clause already has a vgroup list.
 * Otherwise a helper SSqlObj with command TSDB_SQL_STABLEVGROUP is built,
 * every table of the clause is added to it, and it is passed to
 * tscProcessSql() with tscTableMetaCallBack installed as pNew->fp and pSql
 * installed as pNew->param.
 *
 * @param pSql         sql object that needs the vgroup information
 * @param clauseIndex  index of the query clause to inspect
 * @return TSDB_CODE_SUCCESS when nothing had to be requested,
 *         TSDB_CODE_TSC_ACTION_IN_PROGRESS when tscProcessSql() accepted the
 *         request, TSDB_CODE_TSC_OUT_OF_MEMORY when the helper object could
 *         not be allocated, otherwise the error code of the failing step
 */
int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
  SSqlCmd *pCmd = &pSql->cmd;

  if (allVgroupInfoRetrieved(pCmd, clauseIndex)) {
    return TSDB_CODE_SUCCESS;
  }

  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
  if (pNew == NULL) {  // must not be dereferenced below
    tscError("%p malloc failed for new sqlobj to get vgroup info", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;

  pNew->cmd.command = TSDB_SQL_STABLEVGROUP;

  SQueryInfo *pNewQueryInfo = NULL;
  int         code = tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo);
  if (code != TSDB_CODE_SUCCESS) {
    tscFreeSqlObj(pNew);
    return code;
  }

  // each table gets its own reference to the table meta, acquired from the cache
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
    STableMetaInfo *pMInfo = tscGetMetaInfo(pQueryInfo, i);
    STableMeta *pTableMeta = taosCacheAcquireByData(tscCacheHandle, pMInfo->pTableMeta);
    tscAddTableMetaInfo(pNewQueryInfo, pMInfo->name, pTableMeta, NULL, pMInfo->tagColList);
  }

  if ((code = tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
    tscFreeSqlObj(pNew);
    return code;
  }

  pNewQueryInfo->numOfTables = pQueryInfo->numOfTables;
  tscDebug("%p new sqlObj:%p to get vgroupInfo, numOfTables:%d", pSql, pNew, pNewQueryInfo->numOfTables);

  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;
  code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
    code = TSDB_CODE_TSC_ACTION_IN_PROGRESS;
  }

  return code;
}

2255
void tscInitMsgsFp() {
S
slguan 已提交
2256 2257
  tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
  tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg;
J
jtao1735 已提交
2258
  tscBuildMsg[TSDB_SQL_FETCH] = tscBuildFetchMsg;
H
hzcheng 已提交
2259 2260

  tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg;
2261
  tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg;
H
hzcheng 已提交
2262

2263 2264
  tscBuildMsg[TSDB_SQL_CREATE_ACCT] = tscBuildAcctMsg;
  tscBuildMsg[TSDB_SQL_ALTER_ACCT] = tscBuildAcctMsg;
H
hzcheng 已提交
2265 2266

  tscBuildMsg[TSDB_SQL_CREATE_TABLE] = tscBuildCreateTableMsg;
S
[TD-16]  
slguan 已提交
2267
  tscBuildMsg[TSDB_SQL_DROP_USER] = tscBuildDropUserMsg;
H
hzcheng 已提交
2268 2269 2270
  tscBuildMsg[TSDB_SQL_DROP_ACCT] = tscBuildDropAcctMsg;
  tscBuildMsg[TSDB_SQL_DROP_DB] = tscBuildDropDbMsg;
  tscBuildMsg[TSDB_SQL_DROP_TABLE] = tscBuildDropTableMsg;
2271
  tscBuildMsg[TSDB_SQL_ALTER_USER] = tscBuildUserMsg;
S
slguan 已提交
2272 2273 2274
  tscBuildMsg[TSDB_SQL_CREATE_DNODE] = tscBuildCreateDnodeMsg;
  tscBuildMsg[TSDB_SQL_DROP_DNODE] = tscBuildDropDnodeMsg;
  tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg;
H
hzcheng 已提交
2275
  tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg;
2276
  tscBuildMsg[TSDB_SQL_UPDATE_TAGS_VAL] = tscBuildUpdateTagMsg;
H
hzcheng 已提交
2277 2278 2279 2280
  tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg;

  tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg;
  tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg;
H
hjxilinx 已提交
2281
  tscBuildMsg[TSDB_SQL_META] = tscBuildTableMetaMsg;
H
hjxilinx 已提交
2282
  tscBuildMsg[TSDB_SQL_STABLEVGROUP] = tscBuildSTableVgroupMsg;
S
slguan 已提交
2283
  tscBuildMsg[TSDB_SQL_MULTI_META] = tscBuildMultiMeterMetaMsg;
H
hzcheng 已提交
2284 2285 2286 2287

  tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg;
  tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg;
  tscBuildMsg[TSDB_SQL_RETRIEVE] = tscBuildRetrieveFromMgmtMsg;
2288 2289 2290
  tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg;
H
hzcheng 已提交
2291 2292

  tscProcessMsgRsp[TSDB_SQL_SELECT] = tscProcessQueryRsp;
H
hjxilinx 已提交
2293
  tscProcessMsgRsp[TSDB_SQL_FETCH] = tscProcessRetrieveRspFromNode;
H
hzcheng 已提交
2294 2295 2296 2297 2298

  tscProcessMsgRsp[TSDB_SQL_DROP_DB] = tscProcessDropDbRsp;
  tscProcessMsgRsp[TSDB_SQL_DROP_TABLE] = tscProcessDropTableRsp;
  tscProcessMsgRsp[TSDB_SQL_CONNECT] = tscProcessConnectRsp;
  tscProcessMsgRsp[TSDB_SQL_USE_DB] = tscProcessUseDbRsp;
2299
  tscProcessMsgRsp[TSDB_SQL_META] = tscProcessTableMetaRsp;
H
hjxilinx 已提交
2300
  tscProcessMsgRsp[TSDB_SQL_STABLEVGROUP] = tscProcessSTableVgroupRsp;
S
slguan 已提交
2301
  tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiMeterMetaRsp;
H
hzcheng 已提交
2302 2303

  tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp;
H
hjxilinx 已提交
2304
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromNode;  // rsp handled by same function.
H
hzcheng 已提交
2305
  tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp;
2306

H
Haojun Liao 已提交
2307 2308 2309 2310 2311
  tscProcessMsgRsp[TSDB_SQL_CURRENT_DB]   = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CURRENT_USER] = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_VERSION] = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CLI_VERSION]  = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_STATUS]  = tscProcessLocalRetrieveRsp;
2312

H
hzcheng 已提交
2313 2314
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp;

H
hjxilinx 已提交
2315
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_LOCALMERGE] = tscProcessRetrieveLocalMergeRsp;
H
hzcheng 已提交
2316 2317 2318 2319 2320 2321 2322 2323 2324 2325

  tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp;
  tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp;

  tscKeepConn[TSDB_SQL_SHOW] = 1;
  tscKeepConn[TSDB_SQL_RETRIEVE] = 1;
  tscKeepConn[TSDB_SQL_SELECT] = 1;
  tscKeepConn[TSDB_SQL_FETCH] = 1;
  tscKeepConn[TSDB_SQL_HB] = 1;
}