tscServer.c 77.4 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#include "os.h"
17
#include "qsqltype.h"
H
hzcheng 已提交
18 19
#include "tcache.h"
#include "trpc.h"
20 21
#include "tscLocalMerge.h"
#include "tscLog.h"
H
hzcheng 已提交
22 23 24 25 26 27 28 29 30 31
#include "tscProfile.h"
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"

#define TSC_MGMT_VNODE 999

S
slguan 已提交
32
SRpcIpSet  tscMgmtIpSet;
S
slguan 已提交
33 34
SRpcIpSet  tscDnodeIpSet;

35 36
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};

H
hzcheng 已提交
37 38 39
int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql);
void tscProcessActivityTimer(void *handle, void *tmrId);
int tscKeepConn[TSDB_SQL_MAX] = {0};
40

B
Bomin Zhang 已提交
41
TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid, TSKEY dflt);
42 43
void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts);
void tscSaveSubscriptionProgress(void* sub);
H
hzcheng 已提交
44

S
slguan 已提交
45
// Minimum message size: the rpc head size plus 100 extra bytes.
static int32_t minMsgSize() {
  const int32_t extraBytes = 100;
  return tsRpcHeadSize + extraBytes;
}
H
hzcheng 已提交
46

47
/*
 * Fill the ip set of the sql object from the endpoint list of a vgroup.
 * Every endpoint (fqdn and port) is copied and the in-use index is reset to the first entry.
 *
 * NOTE(review): the fqdn is copied with strcpy; this relies on the source string being
 * NUL-terminated and not longer than the destination slot -- confirm against the struct definitions.
 */
static void tscSetDnodeIpList(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) {
  SRpcIpSet* pTarget = &pSql->ipList;

  pTarget->numOfIps = pVgroupInfo->numOfIps;
  pTarget->inUse    = 0;

  int32_t idx = 0;
  while (idx < pVgroupInfo->numOfIps) {
    strcpy(pTarget->fqdn[idx], pVgroupInfo->ipAddr[idx].fqdn);
    pTarget->port[idx] = pVgroupInfo->ipAddr[idx].port;
    ++idx;
  }
}

S
slguan 已提交
59
/*
 * Log the current mnode endpoint list: one debug line per endpoint,
 * or a single error line if the list is empty/invalid.
 */
void tscPrintMgmtIp() {
  if (tscMgmtIpSet.numOfIps <= 0) {
    tscError("invalid mnode IP list:%d", tscMgmtIpSet.numOfIps);
    return;
  }

  int idx = 0;
  while (idx < tscMgmtIpSet.numOfIps) {
    tscDebug("mnode index:%d %s:%d", idx, tscMgmtIpSet.fqdn[idx], tscMgmtIpSet.port[idx]);
    ++idx;
  }
}

69
/*
 * Replace the global mnode ip set with the list carried by a server message.
 * The ports are passed through htons() while being copied.
 *
 * NOTE(review): only numOfIps, inUse and the ports are copied here; the fqdn entries of
 * tscMgmtIpSet are left untouched -- confirm this is intended.
 */
void tscSetMgmtIpList(SRpcIpSet *pIpList) {
  tscMgmtIpSet.inUse    = pIpList->inUse;
  tscMgmtIpSet.numOfIps = pIpList->numOfIps;

  int32_t idx = 0;
  while (idx < tscMgmtIpSet.numOfIps) {
    tscMgmtIpSet.port[idx] = htons(pIpList->port[idx]);
    ++idx;
  }
}

S
slguan 已提交
77
/*
 * Overwrite the global mnode ip set with *pIpSet and log the new content.
 * The ahandle argument is not used.
 */
void tscUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet) {
  tscMgmtIpSet = *pIpSet;

  tscDebug("mnode IP list is changed for ufp is called, numOfIps:%d inUse:%d", tscMgmtIpSet.numOfIps, tscMgmtIpSet.inUse);

  int32_t idx = 0;
  while (idx < tscMgmtIpSet.numOfIps) {
    tscDebug("index:%d fqdn:%s port:%d", idx, tscMgmtIpSet.fqdn[idx], tscMgmtIpSet.port[idx]);
    ++idx;
  }
}

H
hjxilinx 已提交
85 86 87 88 89 90 91
/*
 * For each management node, try twice at least in case of poor network situation.
 * If the client start to connect to a non-management node from the client, and the first retry may fail due to
 * the poor network quality. And then, the second retry get the response with redirection command.
 * The retry will not be executed since only *two* retry is allowed in case of single management node in the cluster.
 * Therefore, we need to multiply the retry times by factor of 2 to fix this problem.
 */
92
UNUSED_FUNC
H
hjxilinx 已提交
93 94
static int32_t tscGetMgmtConnMaxRetryTimes() {
  int32_t factor = 2;
S
slguan 已提交
95
  return tscMgmtIpSet.numOfIps * factor;
H
hjxilinx 已提交
96 97
}

H
hzcheng 已提交
98 99 100 101 102 103 104 105 106 107 108 109
/*
 * Completion callback of the heart beat request.
 *
 * param: the STscObj (connection object) that owns the heart beat sql object
 * tres:  not used here
 * code:  0 on success, otherwise an error code understood by tstrerror()
 *
 * On success the response is applied: the mnode ip list is refreshed, the connection id is stored,
 * and any kill request (connection / query / stream) issued by the server is executed.
 * In every case the activity timer is re-armed so that the next heart beat gets scheduled.
 */
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
  STscObj *pObj = (STscObj *)param;
  if (pObj == NULL) return;
  if (pObj != pObj->signature) {
    tscError("heart beat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature);
    return;
  }

  SSqlObj *pSql = pObj->pHb;

  // The response buffer is NULL when the server answered with an empty body (the rpc response handler
  // resets pRsp in that case while still reporting code 0), so it must be checked before use.
  SCMHeartBeatRsp *pRsp = (pSql != NULL) ? (SCMHeartBeatRsp *)pSql->res.pRsp : NULL;

  if (code != 0) {
    tscDebug("heart beat failed, code:%s", tstrerror(code));
  } else if (pRsp == NULL) {
    tscDebug("heart beat rsp is empty, pObj:%p", pObj);
  } else {
    SRpcIpSet *pIpList = &pRsp->ipList;
    if (pIpList->numOfIps > 0) {
      tscSetMgmtIpList(pIpList);
    }

    // htonl() is used for the network-to-host conversion; it performs the same byte swap as ntohl()
    pSql->pTscObj->connId = htonl(pRsp->connId);

    if (pRsp->killConnection) {
      tscKillConnection(pObj);
    } else {
      if (pRsp->queryId) tscKillQuery(pObj, htonl(pRsp->queryId));
      if (pRsp->streamId) tscKillStream(pObj, htonl(pRsp->streamId));
    }
  }

  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
}

void tscProcessActivityTimer(void *handle, void *tmrId) {
  STscObj *pObj = (STscObj *)handle;

  if (pObj == NULL) return;
  if (pObj->signature != pObj) return;
  if (pObj->pTimer != tmrId) return;

  if (pObj->pHb == NULL) {
S
slguan 已提交
138 139 140
    SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
    if (NULL == pSql) return;

H
hzcheng 已提交
141
    pSql->fp = tscProcessHeartBeatRsp;
H
hjxilinx 已提交
142
    
143 144 145 146
    SQueryInfo *pQueryInfo = NULL;
    tscGetQueryInfoDetailSafely(&pSql->cmd, 0, &pQueryInfo);
    pQueryInfo->command = TSDB_SQL_HB;
    
147
    pSql->cmd.command = TSDB_SQL_HB;
S
slguan 已提交
148 149 150 151 152
    if (TSDB_CODE_SUCCESS != tscAllocPayload(&(pSql->cmd), TSDB_DEFAULT_PAYLOAD_SIZE)) {
      tfree(pSql);
      return;
    }

153
    pSql->cmd.command = TSDB_SQL_HB;
H
hzcheng 已提交
154 155 156 157
    pSql->param = pObj;
    pSql->pTscObj = pObj;
    pSql->signature = pSql;
    pObj->pHb = pSql;
158
    tscAddSubqueryInfo(&pObj->pHb->cmd);
159

160
    tscDebug("%p pHb is allocated, pObj:%p", pObj->pHb, pObj);
H
hzcheng 已提交
161 162 163
  }

  if (tscShouldFreeHeatBeat(pObj->pHb)) {
164
    tscDebug("%p free HB object and release connection", pObj);
H
hzcheng 已提交
165 166 167 168 169 170 171 172 173
    tscFreeSqlObj(pObj->pHb);
    tscCloseTscObj(pObj);
    return;
  }

  tscProcessSql(pObj->pHb);
}

/*
 * Copy the payload of the sql command into an rpc buffer and hand it to the rpc layer.
 *
 * Returns TSDB_CODE_TSC_OUT_OF_MEMORY if the rpc buffer cannot be allocated,
 * TSDB_CODE_SUCCESS once the request has been passed to rpcSendRequest().
 */
int tscSendMsgToServer(SSqlObj *pSql) {
  SSqlCmd* pCmd = &pSql->cmd;
  STscObj* pObj = pSql->pTscObj;

  char *pCont = rpcMallocCont(pCmd->payloadLen);
  if (pCont == NULL) {
    tscError("%p msg:%s malloc failed", pSql, taosMsg[pSql->cmd.msgType]);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  // management commands are always sent to the mnode ip set
  if (pCmd->command >= TSDB_SQL_MGMT) {
    pSql->ipList = tscMgmtIpSet;
  }

  memcpy(pCont, pCmd->payload, pCmd->payloadLen);

  SRpcMsg request = {0};
  request.msgType = pCmd->msgType;
  request.pCont   = pCont;
  request.contLen = pCmd->payloadLen;
  request.handle  = pSql;
  request.code    = 0;

  // NOTE: the rpc context should be acquired before sending data to server.
  // Otherwise, the pSql object may already have been released by the response function
  // (tscProcessMsgFromServer), and assigning the rpc context to the sql object afterwards would crash.
  /*pSql->pRpcCtx = */rpcSendRequest(pObj->pDnodeConn, &pSql->ipList, &request);
  return TSDB_CODE_SUCCESS;
}

J
jtao1735 已提交
206
/*
 * Handle a response delivered by the rpc layer for a request sent by tscSendMsgToServer().
 *
 * rpcMsg: the response; rpcMsg->handle is the SSqlObj that issued the request and
 *         rpcMsg->pCont is the response body, which is released by this function on every path
 * pIpSet: optional updated ip set; if given it replaces the ip list of the sql object
 *         (non-management commands) or the global mnode ip set (management commands)
 *
 * Steps:
 *  1. drop the response if the sql object or its connection is gone, or the sql object is
 *     marked with TSDB_QUERY_TYPE_FREE_RESOURCE;
 *  2. for invalid table id / invalid vgroup id / network unavailable errors, renew the table meta
 *     and retry (CONNECT and HB return immediately, META is not retried);
 *  3. copy the response body into pRes->pRsp, convert the submit response to host byte order;
 *  4. run the command specific response handler and finally the user callback (pSql->fp).
 */
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) {
  SSqlObj *pSql = (SSqlObj *)rpcMsg->handle;
  if (pSql == NULL || pSql->signature != pSql) {
    tscError("%p sql is already released", pSql);
    rpcFreeCont(rpcMsg->pCont);  // the response body is owned by this function, do not leak it
    return;
  }

  STscObj *pObj = pSql->pTscObj;
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  if (pObj->signature != pObj) {
    tscDebug("%p DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, pObj, pObj->signature);

    tscFreeSqlObj(pSql);
    rpcFreeCont(rpcMsg->pCont);
    return;
  }

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) {
    tscDebug("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p",
        pSql, pCmd->command, pQueryInfo->type, pObj, pObj->signature);

    tscFreeSqlObj(pSql);
    rpcFreeCont(rpcMsg->pCont);
    return;
  }

  if (pCmd->command < TSDB_SQL_MGMT) {
    if (pIpSet) pSql->ipList = *pIpSet;
  } else {
    if (pIpSet) tscMgmtIpSet = *pIpSet;
  }

  if (rpcMsg->pCont == NULL) {
    rpcMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
  } else {
    STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
    if (rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID ||
        rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
      if (pCmd->command == TSDB_SQL_CONNECT) {
        rpcMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
        rpcFreeCont(rpcMsg->pCont);
        return;
      } else if (pCmd->command == TSDB_SQL_HB) {
        rpcMsg->code = TSDB_CODE_RPC_NOT_READY;
        rpcFreeCont(rpcMsg->pCont);
        return;
      } else if (pCmd->command == TSDB_SQL_META) {
        // get table meta query will not retry, do nothing
      } else {
        tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), ++pSql->retry);

        pSql->res.code = rpcMsg->code;  // keep the previous error code
        if (pSql->retry > pSql->maxRetry) {
          tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry);
        } else {
          rpcMsg->code = tscRenewTableMeta(pSql, pTableMetaInfo->name);

          // if there is an error occurring, proceed to the following error handling procedure.
          // todo add test cases
          if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
            rpcFreeCont(rpcMsg->pCont);
            return;
          }
        }
      }
    }
  }

  pRes->rspLen = 0;

  if (pRes->code != TSDB_CODE_TSC_QUERY_CANCELLED) {
    // take over the rpc result as it is, so that a successful response can be recognized below
    pRes->code = rpcMsg->code;
  } else {
    tscDebug("%p query is cancelled, code:%s", pSql, tstrerror(pRes->code));
  }

  if (pRes->code == TSDB_CODE_SUCCESS) {
    tscDebug("%p reset retry counter to be 0 due to success rsp, old:%d", pSql, pSql->retry);
    pSql->retry = 0;
  }

  if (pRes->code != TSDB_CODE_TSC_QUERY_CANCELLED) {
    assert(rpcMsg->msgType == pCmd->msgType + 1);
    pRes->code    = rpcMsg->code;
    pRes->rspType = rpcMsg->msgType;
    pRes->rspLen  = rpcMsg->contLen;

    if (pRes->rspLen > 0 && rpcMsg->pCont) {
      char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
      if (tmp == NULL) {
        pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      } else {
        pRes->pRsp = tmp;
        memcpy(pRes->pRsp, rpcMsg->pCont, pRes->rspLen);
      }
    } else {
      // release the buffer of the previous response instead of dropping the only pointer to it
      tfree(pRes->pRsp);
    }

    /*
     * There is not response callback function for submit response.
     * The actual inserted number of points is the first number.
     */
    if (rpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP && pRes->pRsp != NULL) {
      SShellSubmitRspMsg *pMsg = (SShellSubmitRspMsg*)pRes->pRsp;
      pMsg->code = htonl(pMsg->code);
      pMsg->numOfRows = htonl(pMsg->numOfRows);
      pMsg->affectedRows = htonl(pMsg->affectedRows);
      pMsg->failedRows = htonl(pMsg->failedRows);
      pMsg->numOfFailedBlocks = htonl(pMsg->numOfFailedBlocks);

      pRes->numOfRows += pMsg->affectedRows;
      tscDebug("%p SQL cmd:%s, code:%s inserted rows:%d rspLen:%d", pSql, sqlCmd[pCmd->command],
          tstrerror(pRes->code), pMsg->affectedRows, pRes->rspLen);
    } else {
      tscDebug("%p SQL cmd:%s, code:%s rspLen:%d", pSql, sqlCmd[pCmd->command], tstrerror(pRes->code), pRes->rspLen);
    }
  }

  if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) {
    rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
  }

  if (rpcMsg->code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
    rpcMsg->code = (pRes->code == TSDB_CODE_SUCCESS) ? pRes->numOfRows: pRes->code;

    // decide before invoking the callback, since the callback may change the state of the sql object
    bool shouldFree = tscShouldBeFreed(pSql);
    (*pSql->fp)(pSql->param, pSql, rpcMsg->code);

    if (shouldFree) {
      tscDebug("%p sqlObj is automatically freed", pSql);
      tscFreeSqlObj(pSql);
    }
  }

  rpcFreeCont(rpcMsg->pCont);
}

S
slguan 已提交
347 348 349
/*
 * Build the request message (for the commands that have a message builder invoked here) and
 * send it to the server. Any failure is stored in pRes->code, reported asynchronously through
 * tscQueueAsyncRes() and also returned to the caller.
 */
int doProcessSql(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  switch (pCmd->command) {
    case TSDB_SQL_SELECT:
    case TSDB_SQL_FETCH:
    case TSDB_SQL_RETRIEVE:
    case TSDB_SQL_INSERT:
    case TSDB_SQL_CONNECT:
    case TSDB_SQL_HB:
    case TSDB_SQL_META:
    case TSDB_SQL_STABLEVGROUP:
      pRes->code = tscBuildMsg[pCmd->command](pSql, NULL);
      break;
    default:
      break;
  }

  if (pRes->code != TSDB_CODE_SUCCESS) {
    tscQueueAsyncRes(pSql);
    return pRes->code;
  }

  int32_t sendCode = tscSendMsgToServer(pSql);
  if (sendCode == TSDB_CODE_SUCCESS) {
    // NOTE: pSql may have been released here already by other threads, do not touch it any more.
    return TSDB_CODE_SUCCESS;
  }

  pRes->code = sendCode;
  tscQueueAsyncRes(pSql);
  return pRes->code;
}

/*
 * Entry point to execute a parsed sql command.
 *
 *  - commands below TSDB_SQL_MGMT need table meta info; TSDB_CODE_TSC_APP_ERROR is returned if it is missing
 *  - commands below TSDB_SQL_LOCAL are management commands and are sent to the mnode ip set
 *  - all remaining commands are handled locally by the registered response handler
 *
 * Returns the result of the local handler, or of doProcessSql() for commands sent to the server.
 */
int tscProcessSql(SSqlObj *pSql) {
  char *   name = NULL;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  STableMetaInfo *pTableMetaInfo = NULL;
  uint32_t        type = 0;

  if (pQueryInfo != NULL) {
    pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
    if (pTableMetaInfo != NULL) {
      name = pTableMetaInfo->name;
    }

    type = pQueryInfo->type;

    // while numOfTables equals to 0, it must be Heartbeat
    assert((pQueryInfo->numOfTables == 0 && pQueryInfo->command == TSDB_SQL_HB) || pQueryInfo->numOfTables > 0);
  }

  // name stays NULL when there is no query info or no table meta info; passing NULL to %s is undefined behavior
  tscDebug("%p SQL cmd:%s will be processed, name:%s, type:%d", pSql, sqlCmd[pCmd->command],
      (name != NULL) ? name : "(null)", type);
  if (pCmd->command < TSDB_SQL_MGMT) { // the pTableMetaInfo cannot be NULL
    if (pTableMetaInfo == NULL) {
      pSql->res.code = TSDB_CODE_TSC_APP_ERROR;
      return pSql->res.code;
    }
  } else if (pCmd->command < TSDB_SQL_LOCAL) {
    pSql->ipList = tscMgmtIpSet; //?
  } else {  // local handler
    return (*tscProcessMsgRsp[pCmd->command])(pSql);
  }

  return doProcessSql(pSql);
}
H
hzcheng 已提交
413

H
hjxilinx 已提交
414
/*
 * Cancel a two-stage super table query: every launched sub query is marked as cancelled and its
 * rpc request is cancelled. Nothing is done for queries that are not two-stage super table queries.
 */
void tscKillSTableQuery(SSqlObj *pSql) {
  SSqlCmd*    pCmd = &pSql->cmd;
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  if (!tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
    return;
  }

  for (int i = 0; i < pSql->numOfSubs; ++i) {
    SSqlObj *pSub = pSql->pSubs[i];
    if (pSub != NULL) {
      /*
       * here, we cannot set the command = TSDB_SQL_KILL_QUERY. Otherwise, it may cause
       * sub-queries not correctly released and master sql object of super table query reaches an abnormal state.
       */
      pSub->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
      rpcCancelRequest(pSub->pRpcCtx);
    }
  }

  /*
   * 1. if the sub queries are not launched or only partially launched, we need to wait for the
   *    launched queries to return in order to free the allocated resources.
   * 2. if no sub query is launched yet, which means the super table query is still in the sql
   *    parse stage, set the res.code, and return.
   */
  const int64_t MAX_WAITING_TIME = 10000;  // 10 Sec.
  int64_t       startMs = taosGetTimestampMs();

  for (;;) {
    if (pCmd->command == TSDB_SQL_RETRIEVE_LOCALMERGE || pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
      break;
    }

    taosMsleep(100);
    if (taosGetTimestampMs() - startMs > MAX_WAITING_TIME) {
      break;
    }
  }

  tscDebug("%p super table query cancelled", pSql);
}

J
jtao1735 已提交
455
/*
 * Build the fetch (retrieve) message in the payload of the sql command.
 * The message carries the query handle, the query type and the id of the target vgroup.
 * The pInfo argument is not used. Always returns TSDB_CODE_SUCCESS.
 */
int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd           *pCmd = &pSql->cmd;
  SRetrieveTableMsg *pFetchMsg = (SRetrieveTableMsg *) pCmd->payload;
  SQueryInfo        *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

  pFetchMsg->qhandle = htobe64(pSql->res.qhandle);
  pFetchMsg->free    = htons(pQueryInfo->type);

  // todo valid the vgroupId at the client side
  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  int32_t vgId = 0;
  if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
    // super table: the vgroup currently being queried is selected by vgroupIndex
    SVgroupsInfo* pVgroups = pTableMetaInfo->vgroupList;
    vgId = pVgroups->vgroups[pTableMetaInfo->vgroupIndex].vgId;
  } else {
    vgId = pTableMetaInfo->pTableMeta->vgroupInfo.vgId;
  }

  pFetchMsg->header.vgId    = htonl(vgId);
  pFetchMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg));

  pCmd->payloadLen = sizeof(SRetrieveTableMsg);
  pCmd->msgType    = TSDB_MSG_TYPE_FETCH;

  return TSDB_CODE_SUCCESS;
}

483
/*
 * Fill in the headers of a submit (insert) message. The payload already holds the data blocks;
 * its layout is SMsgDesc followed by SSubmitMsg. The pInfo argument is not used.
 * Always returns TSDB_CODE_SUCCESS.
 */
int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd    *pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMeta *pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;

  char *pPayload = pCmd->payload;

  // NOTE: shell message size should not include SMsgDesc
  int32_t shellMsgLen = pCmd->payloadLen - sizeof(SMsgDesc);
  int32_t vgId = pTableMeta->vgroupInfo.vgId;

  SMsgDesc *pDesc = (SMsgDesc *)pPayload;
  pDesc->numOfVnodes = htonl(1);  // always one vnode

  SSubmitMsg *pSubmit = (SSubmitMsg *)(pPayload + sizeof(SMsgDesc));
  pSubmit->header.vgId    = htonl(vgId);
  pSubmit->header.contLen = htonl(shellMsgLen);  // the length not includes the size of SMsgDesc
  pSubmit->length         = pSubmit->header.contLen;
  pSubmit->numOfBlocks    = htonl(pCmd->numOfTablesInSubmit);  // number of tables to be inserted

  // pSql->cmd.payloadLen is set during copying data into payload
  pCmd->msgType = TSDB_MSG_TYPE_SUBMIT;
  tscSetDnodeIpList(pSql, &pTableMeta->vgroupInfo);

  tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfIP:%d", pSql, vgId, pSql->cmd.numOfTablesInSubmit,
      pSql->ipList.numOfIps);
  return TSDB_CODE_SUCCESS;
}

/*
515
 * for table query, simply return the size <= 1k
H
hzcheng 已提交
516
 */
517
static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
H
hzcheng 已提交
518
  const static int32_t MIN_QUERY_MSG_PKT_SIZE = TSDB_MAX_BYTES_PER_ROW * 5;
519
  SQueryInfo *         pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
H
hzcheng 已提交
520

521
  int32_t srcColListSize = taosArrayGetSize(pQueryInfo->colList) * sizeof(SColumnInfo);
H
hjxilinx 已提交
522 523 524 525
  
  size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
  int32_t exprSize = sizeof(SSqlFuncMsg) * numOfExprs;
  
526
  return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + exprSize + 4096;
H
hzcheng 已提交
527 528
}

529
/*
 * Serialize the list of tables to be queried into the message buffer and select the target vgroup.
 *
 * pQueryMsg: query message header; head.vgId and numOfTables are filled in here
 * pSql:      sql object; its ip list is set to the endpoints of the selected vgroup
 * pMsg:      write position in the message buffer
 *
 * Returns the write position right after the last serialized STableIdInfo.
 */
static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char *pMsg) {
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
  TSKEY           dfltKey = htobe64(pQueryMsg->window.skey);
  STableMeta     *pTableMeta = pTableMetaInfo->pTableMeta;

  if (!UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) && pTableMetaInfo->pVgroupTables != NULL) {
    // it is a subquery of the super table query, this IP info is acquired from vgroupInfo
    int32_t vgIndex = pTableMetaInfo->vgroupIndex;
    int32_t numOfVgroups = taosArrayGetSize(pTableMetaInfo->pVgroupTables);
    assert(vgIndex >= 0 && vgIndex < numOfVgroups);

    tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, vgIndex, numOfVgroups);

    SVgroupTableInfo* pVgroupTables = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex);

    // set the vgroup info
    tscSetDnodeIpList(pSql, &pVgroupTables->vgInfo);
    pQueryMsg->head.vgId = htonl(pVgroupTables->vgInfo.vgId);

    int32_t numOfTables = taosArrayGetSize(pVgroupTables->itemList);
    pQueryMsg->numOfTables = htonl(numOfTables);  // set the number of tables

    // serialize each table id info
    int32_t i = 0;
    while (i < numOfTables) {
      STableIdInfo *pSrc = taosArrayGet(pVgroupTables->itemList, i);
      STableIdInfo *pDst = (STableIdInfo *)pMsg;

      pDst->tid = htonl(pSrc->tid);
      pDst->uid = htobe64(pSrc->uid);
      pDst->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pSrc->uid, dfltKey));

      pMsg += sizeof(STableIdInfo);
      ++i;
    }
  } else {
    // a normal table, or a super table without a per-vgroup table list: exactly one table id is written
    SCMVgroupInfo* pVgroupInfo = NULL;

    if (!UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
      pVgroupInfo = &pTableMeta->vgroupInfo;
    } else {
      int32_t vgIndex = pTableMetaInfo->vgroupIndex;
      assert(vgIndex >= 0);

      pVgroupInfo = &pTableMetaInfo->vgroupList->vgroups[vgIndex];
      tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, vgIndex, pTableMetaInfo->vgroupList->numOfVgroups);
    }

    tscSetDnodeIpList(pSql, pVgroupInfo);
    pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId);

    STableIdInfo *pDst = (STableIdInfo *)pMsg;
    pDst->tid = htonl(pTableMeta->sid);
    pDst->uid = htobe64(pTableMeta->uid);
    pDst->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid, dfltKey));

    pQueryMsg->numOfTables = htonl(1);  // set the number of tables
    pMsg += sizeof(STableIdInfo);
  }

  tscDebug("%p vgId:%d, query on table:%s, tid:%d, uid:%" PRIu64, pSql, htonl(pQueryMsg->head.vgId), pTableMetaInfo->name,
      pTableMeta->sid, pTableMeta->uid);

  return pMsg;
}

591
/*
 * Serialize the query described by pSql->cmd into pCmd->payload as a SQueryTableMsg.
 *
 * Sections are appended in this order after the fixed header:
 *   column list (+ per-column filters), output expressions (+ binary params),
 *   table id info, group-by columns, fill values, tag column info,
 *   tag condition, table-name condition (always at least a terminating 0),
 *   compressed timestamp block (only when pQueryInfo->tsBuf is set).
 *
 * @param pSql   sql object; its cmd payload is (re)allocated and filled here
 * @param pInfo  not used by this function
 * @return TSDB_CODE_SUCCESS on success; -1 on allocation failure or invalid query
 *         parameters; TSDB_CODE_TSC_INVALID_SQL for an out-of-range column/tag index;
 *         a TAOS_SYSTEM_ERROR code when reading the ts buffer file fails.
 */
int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  int32_t size = tscEstimateQueryMsgSize(pCmd, pCmd->clauseIndex);

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for query msg", pSql);
    return -1;  // todo add test for this
  }

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;

  // a query must reference at least one column unless it only queries tags
  if (taosArrayGetSize(pQueryInfo->colList) <= 0 && !tscQueryTags(pQueryInfo)) {
    tscError("%p illegal value of numOfCols in query msg: %d", pSql, tscGetNumOfColumns(pTableMeta));
    return -1;
  }

  if (pQueryInfo->intervalTime < 0) {
    // the interval is a 64-bit value (it is serialized with htobe64 below), so use PRId64
    tscError("%p illegal value of aggregation time interval in query msg: %" PRId64, pSql, pQueryInfo->intervalTime);
    return -1;
  }

  if (pQueryInfo->groupbyExpr.numOfGroupCols < 0) {
    tscError("%p illegal value of numOfGroupCols in query msg: %d", pSql, pQueryInfo->groupbyExpr.numOfGroupCols);
    return -1;
  }

  SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload;

  int32_t numOfTags = taosArrayGetSize(pTableMetaInfo->tagColList);

  // for a descending scan the window boundaries are swapped in the message
  if (pQueryInfo->order.order == TSDB_ORDER_ASC) {
    pQueryMsg->window.skey = htobe64(pQueryInfo->window.skey);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->window.ekey);
  } else {
    pQueryMsg->window.skey = htobe64(pQueryInfo->window.ekey);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->window.skey);
  }

  pQueryMsg->order          = htons(pQueryInfo->order.order);
  pQueryMsg->orderColId     = htons(pQueryInfo->order.orderColId);
  pQueryMsg->fillType       = htons(pQueryInfo->fillType);
  pQueryMsg->limit          = htobe64(pQueryInfo->limit.limit);
  pQueryMsg->offset         = htobe64(pQueryInfo->limit.offset);
  pQueryMsg->numOfCols      = htons(taosArrayGetSize(pQueryInfo->colList));
  pQueryMsg->intervalTime   = htobe64(pQueryInfo->intervalTime);
  pQueryMsg->slidingTime    = htobe64(pQueryInfo->slidingTime);
  pQueryMsg->slidingTimeUnit = pQueryInfo->slidingTimeUnit;
  pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols);
  pQueryMsg->numOfTags      = htonl(numOfTags);
  pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType);
  pQueryMsg->queryType      = htonl(pQueryInfo->type);

  size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo);
  pQueryMsg->numOfOutput = htons(numOfOutput);

  // set column list ids; variable-length data starts right after the fixed-size column array
  size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
  char *pMsg = (char *)(pQueryMsg->colList) + numOfCols * sizeof(SColumnInfo);
  SSchema *pSchema = tscGetTableSchema(pTableMeta);

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumn *pCol = taosArrayGetP(pQueryInfo->colList, i);
    SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];

    if (pCol->colIndex.columnIndex >= tscGetNumOfColumns(pTableMeta) || pColSchema->type < TSDB_DATA_TYPE_BOOL ||
        pColSchema->type > TSDB_DATA_TYPE_NCHAR) {
      tscError("%p sid:%d uid:%" PRIu64" id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s",
          pSql, pTableMeta->sid, pTableMeta->uid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta), pCol->colIndex.columnIndex,
               pColSchema->name);

      return TSDB_CODE_TSC_INVALID_SQL;
    }

    pQueryMsg->colList[i].colId = htons(pColSchema->colId);
    pQueryMsg->colList[i].bytes = htons(pColSchema->bytes);
    pQueryMsg->colList[i].type  = htons(pColSchema->type);
    pQueryMsg->colList[i].numOfFilters = htons(pCol->numOfFilters);

    // append the filter information after the basic column information
    for (int32_t f = 0; f < pCol->numOfFilters; ++f) {
      SColumnFilterInfo *pColFilter = &pCol->filterInfo[f];

      SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)pMsg;
      pFilterMsg->filterstr = htons(pColFilter->filterstr);

      pMsg += sizeof(SColumnFilterInfo);

      if (pColFilter->filterstr) {
        pFilterMsg->len = htobe64(pColFilter->len);
        memcpy(pMsg, (void *)pColFilter->pz, pColFilter->len + 1);
        pMsg += (pColFilter->len + 1);  // append the additional filter binary info
      } else {
        pFilterMsg->lowerBndi = htobe64(pColFilter->lowerBndi);
        pFilterMsg->upperBndi = htobe64(pColFilter->upperBndi);
      }

      pFilterMsg->lowerRelOptr = htons(pColFilter->lowerRelOptr);
      pFilterMsg->upperRelOptr = htons(pColFilter->upperRelOptr);

      if (pColFilter->lowerRelOptr == TSDB_RELATION_INVALID && pColFilter->upperRelOptr == TSDB_RELATION_INVALID) {
        tscError("invalid filter info");
        return -1;
      }
    }
  }

  // output expressions: each SSqlFuncMsg is followed by the raw bytes of its binary params
  SSqlFuncMsg *pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
  for (int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) {
    SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);

    if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId)) {
      /* column id is not valid according to the cached table meta, the table meta is expired */
      tscError("%p table schema is not matched with parsed sql", pSql);
      return -1;
    }

    pSqlFuncExpr->colInfo.colId    = htons(pExpr->colInfo.colId);
    pSqlFuncExpr->colInfo.colIndex = htons(pExpr->colInfo.colIndex);
    pSqlFuncExpr->colInfo.flag     = htons(pExpr->colInfo.flag);

    pSqlFuncExpr->functionId  = htons(pExpr->functionId);
    pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams);
    pMsg += sizeof(SSqlFuncMsg);

    for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
      // todo add log
      pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType);
      pSqlFuncExpr->arg[j].argBytes = htons(pExpr->param[j].nLen);

      if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) {
        memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen);
        pMsg += pExpr->param[j].nLen;
      } else {
        pSqlFuncExpr->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64Key);
      }
    }

    pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
  }

  // serialize the table info (sid, uid, tags)
  pMsg = doSerializeTableInfo(pQueryMsg, pSql, pMsg);

  SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr;
  if (pGroupbyExpr->numOfGroupCols > 0) {
    pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex);
    pQueryMsg->orderType = htons(pGroupbyExpr->orderType);

    for (int32_t j = 0; j < pGroupbyExpr->numOfGroupCols; ++j) {
      SColIndex* pCol = taosArrayGet(pGroupbyExpr->columnInfo, j);

      *((int16_t *)pMsg) = pCol->colId;
      pMsg += sizeof(pCol->colId);

      // plain assignment: accumulating with += would mix in whatever the buffer held before
      *((int16_t *)pMsg) = pCol->colIndex;
      pMsg += sizeof(pCol->colIndex);

      *((int16_t *)pMsg) = pCol->flag;
      pMsg += sizeof(pCol->flag);

      memcpy(pMsg, pCol->name, tListLen(pCol->name));
      pMsg += tListLen(pCol->name);
    }
  }

  if (pQueryInfo->fillType != TSDB_FILL_NONE) {
    for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
      *((int64_t *)pMsg) = htobe64(pQueryInfo->fillVal[i]);
      pMsg += sizeof(pQueryInfo->fillVal[0]);
    }
  }

  if (numOfTags != 0) {
    int32_t numOfColumns = tscGetNumOfColumns(pTableMeta);
    int32_t numOfTagColumns = tscGetNumOfTags(pTableMeta);
    int32_t total = numOfTagColumns + numOfColumns;

    pSchema = tscGetTableTagSchema(pTableMeta);

    for (int32_t i = 0; i < numOfTags; ++i) {
      SColumn *pCol = taosArrayGetP(pTableMetaInfo->tagColList, i);
      SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];

      // index -1 is accepted here; anything below it or beyond the tag count is rejected
      if ((pCol->colIndex.columnIndex >= numOfTagColumns || pCol->colIndex.columnIndex < -1) ||
          (pColSchema->type < TSDB_DATA_TYPE_BOOL || pColSchema->type > TSDB_DATA_TYPE_NCHAR)) {
        tscError("%p sid:%d uid:%" PRIu64 " id:%s, tag index out of range, totalCols:%d, numOfTags:%d, index:%d, column name:%s",
                 pSql, pTableMeta->sid, pTableMeta->uid, pTableMetaInfo->name, total, numOfTagColumns,
                 pCol->colIndex.columnIndex, pColSchema->name);

        return TSDB_CODE_TSC_INVALID_SQL;
      }

      SColumnInfo* pTagCol = (SColumnInfo*) pMsg;

      pTagCol->colId = htons(pColSchema->colId);
      pTagCol->bytes = htons(pColSchema->bytes);
      pTagCol->type  = htons(pColSchema->type);
      pTagCol->numOfFilters = 0;

      pMsg += sizeof(SColumnInfo);
    }
  }

  // serialize tag column query condition
  if (pQueryInfo->tagCond.pCond != NULL && taosArrayGetSize(pQueryInfo->tagCond.pCond) > 0) {
    STagCond* pTagCond = &pQueryInfo->tagCond;

    SCond *pCond = tsGetSTableQueryCond(pTagCond, pTableMeta->uid);
    if (pCond != NULL && pCond->cond != NULL) {
      pQueryMsg->tagCondLen = htons(pCond->len);
      memcpy(pMsg, pCond->cond, pCond->len);

      pMsg += pCond->len;
    }
  }

  // table-name condition: an empty string (single 0 byte) when there is none
  if (pQueryInfo->tagCond.tbnameCond.cond == NULL) {
    *pMsg = 0;
    pMsg++;
  } else {
    strcpy(pMsg, pQueryInfo->tagCond.tbnameCond.cond);
    pMsg += strlen(pQueryInfo->tagCond.tbnameCond.cond) + 1;
  }

  // compressed ts block
  pQueryMsg->tsOffset = htonl(pMsg - pCmd->payload);
  int32_t tsLen = 0;
  int32_t numOfBlocks = 0;

  if (pQueryInfo->tsBuf != NULL) {
    STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pQueryInfo->tsBuf, pTableMetaInfo->vgroupIndex);
    assert(QUERY_IS_JOIN_QUERY(pQueryInfo->type) && pBlockInfo != NULL);  // this query should not be sent

    // todo refactor
    if (fseek(pQueryInfo->tsBuf->f, pBlockInfo->offset, SEEK_SET) != 0) {
      int code = TAOS_SYSTEM_ERROR(ferror(pQueryInfo->tsBuf->f));
      tscError("%p: fseek failed: %s", pSql, tstrerror(code));
      return code;
    }

    size_t s = fread(pMsg, 1, pBlockInfo->compLen, pQueryInfo->tsBuf->f);
    if (s != pBlockInfo->compLen) {
      int code = TAOS_SYSTEM_ERROR(ferror(pQueryInfo->tsBuf->f));
      tscError("%p: fread didn't return expected data: %s", pSql, tstrerror(code));
      return code;
    }

    pMsg += pBlockInfo->compLen;
    tsLen = pBlockInfo->compLen;
    numOfBlocks = pBlockInfo->numOfBlocks;
  }

  pQueryMsg->tsLen = htonl(tsLen);
  pQueryMsg->tsNumOfBlocks = htonl(numOfBlocks);
  if (pQueryInfo->tsBuf != NULL) {
    pQueryMsg->tsOrder = htonl(pQueryInfo->tsBuf->tsOrder);
  }

  int32_t msgLen = pMsg - pCmd->payload;

  tscDebug("%p msg built success,len:%d bytes", pSql, msgLen);
  pCmd->payloadLen = msgLen;
  pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY;

  pQueryMsg->head.contLen = htonl(msgLen);
  assert(msgLen + minMsgSize() <= size);  // the estimate must have covered what was written

  return TSDB_CODE_SUCCESS;
}

864 865
/*
 * Finish the CREATE DATABASE message: set its type and length and copy the
 * database name into the message.
 *
 * No payload allocation happens here; the message is written into the buffer
 * pCmd->payload already points to (presumably prepared earlier - confirm with caller).
 *
 * @param pSql   sql object owning the command
 * @param pInfo  not used by this function
 * @return TSDB_CODE_SUCCESS
 */
int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;

  cmd->payloadLen = sizeof(SCMCreateDbMsg);
  cmd->msgType    = TSDB_MSG_TYPE_CM_CREATE_DB;

  SCMCreateDbMsg *msg = (SCMCreateDbMsg *)cmd->payload;

  assert(cmd->numOfClause == 1);

  // the database name is carried in the name field of the first meta info entry
  STableMetaInfo *metaInfo = tscGetTableMetaInfoFromCmd(cmd, cmd->clauseIndex, 0);
  tstrncpy(msg->db, metaInfo->name, sizeof(msg->db));

  return TSDB_CODE_SUCCESS;
}

878 879
/*
 * Build the CREATE DNODE message from the first token of the parsed DCL statement.
 *
 * @param pSql   sql object whose command payload is allocated and filled
 * @param pInfo  parsed statement; pDCLInfo->a[0] holds the end point text
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if the payload cannot be allocated
 */
int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCMCreateDnodeMsg);
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMCreateDnodeMsg *pCreate = (SCMCreateDnodeMsg *)pCmd->payload;

  // The token is not NUL terminated at .n, so bound the copy by the destination
  // field (not only by the token length) and terminate explicitly.
  size_t epLen = pInfo->pDCLInfo->a[0].n;
  if (epLen >= sizeof(pCreate->ep)) {
    epLen = sizeof(pCreate->ep) - 1;
  }

  if (epLen > 0) {
    strncpy(pCreate->ep, pInfo->pDCLInfo->a[0].z, epLen);
  }
  pCreate->ep[epLen] = 0;

  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DNODE;

  return TSDB_CODE_SUCCESS;
}

894 895
/*
 * Build the account message (sent as TSDB_MSG_TYPE_CM_CREATE_ACCT) from the
 * parsed DCL statement: user name, password and the account limits.
 *
 * @param pSql   sql object whose command payload is allocated and filled
 * @param pInfo  parsed statement; reads pDCLInfo->user and pDCLInfo->acctOpt
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if the payload cannot be allocated
 */
int32_t tscBuildAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCMCreateAcctMsg);
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMCreateAcctMsg *pAlterMsg = (SCMCreateAcctMsg *)pCmd->payload;

  SSQLToken *pName = &pInfo->pDCLInfo->user.user;
  SSQLToken *pPwd = &pInfo->pDCLInfo->user.passwd;

  // Tokens are bounded by .n only; never copy more than the destination field holds.
  size_t userLen = pName->n;
  if (userLen >= sizeof(pAlterMsg->user)) {
    userLen = sizeof(pAlterMsg->user) - 1;
  }
  if (userLen > 0) {
    strncpy(pAlterMsg->user, pName->z, userLen);
  }
  pAlterMsg->user[userLen] = 0;

  // The password may use the full width of the field, so it is only clamped to the
  // field size and no terminator is forced (NOTE(review): confirm what the receiver expects).
  size_t passLen = pPwd->n;
  if (passLen > sizeof(pAlterMsg->pass)) {
    passLen = sizeof(pAlterMsg->pass);
  }
  if (passLen > 0) {
    strncpy(pAlterMsg->pass, pPwd->z, passLen);
  }

  SCreateAcctSQL *pAcctOpt = &pInfo->pDCLInfo->acctOpt;

  pAlterMsg->cfg.maxUsers = htonl(pAcctOpt->maxUsers);
  pAlterMsg->cfg.maxDbs = htonl(pAcctOpt->maxDbs);
  pAlterMsg->cfg.maxTimeSeries = htonl(pAcctOpt->maxTimeSeries);
  pAlterMsg->cfg.maxStreams = htonl(pAcctOpt->maxStreams);
  pAlterMsg->cfg.maxPointsPerSecond = htonl(pAcctOpt->maxPointsPerSecond);
  pAlterMsg->cfg.maxStorage = htobe64(pAcctOpt->maxStorage);
  pAlterMsg->cfg.maxQueryTime = htobe64(pAcctOpt->maxQueryTime);
  pAlterMsg->cfg.maxConnections = htonl(pAcctOpt->maxConnections);

  // access state: -1 when no state was given, otherwise mapped from "r", "w", "all" or "no";
  // any other text leaves the field as it is in the freshly prepared payload
  if (pAcctOpt->stat.n == 0) {
    pAlterMsg->cfg.accessState = -1;
  } else {
    if (pAcctOpt->stat.z[0] == 'r' && pAcctOpt->stat.n == 1) {
      pAlterMsg->cfg.accessState = TSDB_VN_READ_ACCCESS;
    } else if (pAcctOpt->stat.z[0] == 'w' && pAcctOpt->stat.n == 1) {
      pAlterMsg->cfg.accessState = TSDB_VN_WRITE_ACCCESS;
    } else if (strncmp(pAcctOpt->stat.z, "all", 3) == 0 && pAcctOpt->stat.n == 3) {
      pAlterMsg->cfg.accessState = TSDB_VN_ALL_ACCCESS;
    } else if (strncmp(pAcctOpt->stat.z, "no", 2) == 0 && pAcctOpt->stat.n == 2) {
      pAlterMsg->cfg.accessState = 0;
    }
  }

  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_ACCT;
  return TSDB_CODE_SUCCESS;
}

939 940
/*
 * Build a user message. Depending on pUser->type it is sent as an ALTER USER
 * message (password or privilege change) or as a CREATE USER message.
 *
 * @param pSql   sql object whose command payload is allocated and filled
 * @param pInfo  parsed statement; reads pDCLInfo->user
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if the payload cannot be allocated
 */
int32_t tscBuildUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCMCreateUserMsg);

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMCreateUserMsg *pAlterMsg = (SCMCreateUserMsg*)pCmd->payload;

  SUserInfo *pUser = &pInfo->pDCLInfo->user;

  // Tokens are bounded by .n only; never copy more than the destination field holds.
  size_t userLen = pUser->user.n;
  if (userLen >= sizeof(pAlterMsg->user)) {
    userLen = sizeof(pAlterMsg->user) - 1;
  }
  if (userLen > 0) {
    strncpy(pAlterMsg->user, pUser->user.z, userLen);
  }
  pAlterMsg->user[userLen] = 0;

  pAlterMsg->flag = pUser->type;

  if (pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
    pAlterMsg->privilege = (char)pCmd->count;
  } else {
    // both "alter user ... pass" and "create user" carry the password.
    // The password may use the full width of the field, so it is only clamped to the
    // field size and no terminator is forced (NOTE(review): confirm what the receiver expects).
    size_t passLen = pUser->passwd.n;
    if (passLen > sizeof(pAlterMsg->pass)) {
      passLen = sizeof(pAlterMsg->pass);
    }
    if (passLen > 0) {
      strncpy(pAlterMsg->pass, pUser->passwd.z, passLen);
    }
  }

  if (pUser->type == TSDB_ALTER_USER_PASSWD || pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
    pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_USER;
  } else {
    pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_USER;
  }

  return TSDB_CODE_SUCCESS;
}

971 972
/*
 * Set the message type and length for a CONFIG DNODE request.
 * The payload content itself is not touched here.
 *
 * @param pSql   sql object owning the command
 * @param pInfo  not used by this function
 * @return TSDB_CODE_SUCCESS
 */
int32_t tscBuildCfgDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  pSql->cmd.payloadLen = sizeof(SCMCfgDnodeMsg);
  pSql->cmd.msgType    = TSDB_MSG_TYPE_CM_CONFIG_DNODE;

  return TSDB_CODE_SUCCESS;
}
S
slguan 已提交
977

978 979
/*
 * Build the DROP DATABASE message: database name plus the "ignore if it does
 * not exist" flag taken from the parsed statement.
 *
 * @param pSql   sql object whose command payload is allocated and filled
 * @param pInfo  parsed statement; reads pDCLInfo->existsCheck
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if the payload cannot be allocated
 */
int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;

  cmd->payloadLen = sizeof(SCMDropDbMsg);
  if (tscAllocPayload(cmd, cmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMDropDbMsg   *msg = (SCMDropDbMsg *)cmd->payload;
  STableMetaInfo *metaInfo = tscGetTableMetaInfoFromCmd(cmd, cmd->clauseIndex, 0);

  tstrncpy(msg->db, metaInfo->name, sizeof(msg->db));

  if (pInfo->pDCLInfo->existsCheck) {
    msg->ignoreNotExists = 1;
  } else {
    msg->ignoreNotExists = 0;
  }

  cmd->msgType = TSDB_MSG_TYPE_CM_DROP_DB;
  return TSDB_CODE_SUCCESS;
}

997 998
/*
 * Build the DROP TABLE message: table id plus the "ignore if it does not
 * exist" flag taken from the parsed statement.
 *
 * @param pSql   sql object whose command payload is allocated and filled
 * @param pInfo  parsed statement; reads pDCLInfo->existsCheck
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if the payload cannot be allocated
 */
int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCMDropTableMsg);

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMDropTableMsg *pDropTableMsg = (SCMDropTableMsg*)pCmd->payload;
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);

  // bounded copy, as the other drop-message builders in this file do
  tstrncpy(pDropTableMsg->tableId, pTableMetaInfo->name, sizeof(pDropTableMsg->tableId));
  pDropTableMsg->igNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;

  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_TABLE;
  return TSDB_CODE_SUCCESS;
}

1015
/*
 * Build the DROP DNODE message; the end point is taken from the name field of
 * the first meta info entry of the command.
 *
 * @param pSql   sql object whose command payload is allocated and filled
 * @param pInfo  not used by this function
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if the payload cannot be allocated
 */
int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;

  cmd->payloadLen = sizeof(SCMDropDnodeMsg);
  if (tscAllocPayload(cmd, cmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  STableMetaInfo  *metaInfo = tscGetTableMetaInfoFromCmd(cmd, cmd->clauseIndex, 0);
  SCMDropDnodeMsg *msg = (SCMDropDnodeMsg *)cmd->payload;

  tstrncpy(msg->ep, metaInfo->name, sizeof(msg->ep));
  cmd->msgType = TSDB_MSG_TYPE_CM_DROP_DNODE;

  return TSDB_CODE_SUCCESS;
}

S
[TD-16]  
slguan 已提交
1031
/*
 * Build the DROP USER message; the user name is taken from the name field of
 * the first meta info entry of the command.
 *
 * The message type is set before the allocation, so it is already in place
 * when the allocation fails.
 *
 * @param pSql   sql object whose command payload is allocated and filled
 * @param pInfo  not used by this function
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if the payload cannot be allocated
 */
int32_t tscBuildDropUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;

  cmd->payloadLen = sizeof(SCMDropUserMsg);
  cmd->msgType    = TSDB_MSG_TYPE_CM_DROP_USER;

  if (tscAllocPayload(cmd, cmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  STableMetaInfo *metaInfo = tscGetTableMetaInfoFromCmd(cmd, cmd->clauseIndex, 0);
  SCMDropUserMsg *msg = (SCMDropUserMsg *)cmd->payload;
  tstrncpy(msg->user, metaInfo->name, sizeof(msg->user));

  return TSDB_CODE_SUCCESS;
}

S
[TD-16]  
slguan 已提交
1048 1049 1050 1051 1052 1053 1054
/*
 * Build the DROP ACCOUNT message. It uses the SCMDropUserMsg layout; the
 * account name is taken from the name field of the first meta info entry.
 *
 * The message type is set before the allocation, so it is already in place
 * when the allocation fails.
 *
 * @param pSql   sql object whose command payload is allocated and filled
 * @param pInfo  not used by this function
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if the payload cannot be allocated
 */
int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;

  cmd->payloadLen = sizeof(SCMDropUserMsg);
  cmd->msgType    = TSDB_MSG_TYPE_CM_DROP_ACCT;

  if (tscAllocPayload(cmd, cmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  STableMetaInfo *metaInfo = tscGetTableMetaInfoFromCmd(cmd, cmd->clauseIndex, 0);
  SCMDropUserMsg *msg = (SCMDropUserMsg *)cmd->payload;
  tstrncpy(msg->user, metaInfo->name, sizeof(msg->user));

  return TSDB_CODE_SUCCESS;
}

1065 1066
/*
 * Build the USE DATABASE message; the database name is taken from the name
 * field of the first meta info entry of the command.
 *
 * @param pSql   sql object whose command payload is allocated and filled
 * @param pInfo  not used by this function
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if the payload cannot be allocated
 */
int32_t tscBuildUseDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCMUseDbMsg);

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMUseDbMsg *pUseDbMsg = (SCMUseDbMsg*)pCmd->payload;
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);

  // bounded copy, as the other message builders in this file do
  tstrncpy(pUseDbMsg->db, pTableMetaInfo->name, sizeof(pUseDbMsg->db));
  pCmd->msgType = TSDB_MSG_TYPE_CM_USE_DB;

  return TSDB_CODE_SUCCESS;
}

1082
/*
 * Build the SHOW message.
 *
 * The fixed SCMShowMsg header is followed by an optional text: the wildcard
 * pattern for ordinary SHOW statements, or the prefix token when showing vnodes.
 * The length of that text is stored in the message in network byte order, while
 * pCmd->payloadLen is kept in host byte order.
 *
 * @param pSql   sql object whose command payload is allocated and filled
 * @param pInfo  parsed statement; reads pDCLInfo->showOpt
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if the payload cannot be allocated
 */
int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  STscObj *pObj = pSql->pTscObj;
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->msgType = TSDB_MSG_TYPE_CM_SHOW;

  // decide which token (if any) is appended after the fixed header
  SShowInfo *pShowInfo = &pInfo->pDCLInfo->showOpt;
  SSQLToken *pExtra = NULL;

  if (pShowInfo->showType != TSDB_MGMT_TABLE_VNODES) {
    if (pShowInfo->pattern.type > 0) {  // only show tables support wildcard query
      pExtra = &pShowInfo->pattern;
    }
  } else {
    pExtra = &pShowInfo->prefix;
    assert(pExtra->n > 0 && pExtra->type > 0);
  }

  size_t extraLen = (pExtra != NULL) ? pExtra->n : 0;

  // keep the historical 100 byte reserve, but grow it when the token would not fit
  size_t reserve = 100;
  if (extraLen + 1 > reserve) {
    reserve = extraLen + 1;
  }

  pCmd->payloadLen = sizeof(SCMShowMsg) + reserve;
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMShowMsg *pShowMsg = (SCMShowMsg*)pCmd->payload;

  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  size_t nameLen = strlen(pTableMetaInfo->name);
  if (nameLen > 0) {
    tstrncpy(pShowMsg->db, pTableMetaInfo->name, sizeof(pShowMsg->db));  // prefix is set here
  } else {
    tstrncpy(pShowMsg->db, pObj->db, sizeof(pShowMsg->db));
  }

  pShowMsg->type = pShowInfo->showType;

  if (pExtra != NULL) {
    strncpy(pShowMsg->payload, pExtra->z, extraLen);
    pShowMsg->payloadLen = htons((uint16_t)extraLen);
  }

  // use the host-order length here; pShowMsg->payloadLen is already in network order
  pCmd->payloadLen = sizeof(SCMShowMsg) + extraLen;
  return TSDB_CODE_SUCCESS;
}

1124
/*
 * Set the message length and pick the message type for a KILL statement
 * according to pCmd->command. For any other command the message type is
 * left unchanged.
 *
 * @param pSql   sql object owning the command
 * @param pInfo  not used by this function
 * @return TSDB_CODE_SUCCESS
 */
int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;
  cmd->payloadLen = sizeof(SCMKillQueryMsg);

  if (cmd->command == TSDB_SQL_KILL_QUERY) {
    cmd->msgType = TSDB_MSG_TYPE_CM_KILL_QUERY;
  } else if (cmd->command == TSDB_SQL_KILL_CONNECTION) {
    cmd->msgType = TSDB_MSG_TYPE_CM_KILL_CONN;
  } else if (cmd->command == TSDB_SQL_KILL_STREAM) {
    cmd->msgType = TSDB_MSG_TYPE_CM_KILL_STREAM;
  }

  return TSDB_CODE_SUCCESS;
}

1142
/*
 * Estimate the buffer size needed for a CREATE TABLE message.
 *
 * The estimate is: minimum message size + fixed header + either one STagData
 * (table created from a super table) or one SSchema per column and tag,
 * + the select text and one extra byte when a select clause is attached,
 * + TSDB_EXTRA_PAYLOAD_SIZE.
 *
 * @param pSql   sql object; reads cmd.numOfCols and cmd.count
 * @param pInfo  parsed statement; reads pCreateTableInfo
 * @return estimated size in bytes
 */
int tscEstimateCreateTableMsgLength(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd         *cmd = &(pSql->cmd);
  SCreateTableSQL *createInfo = pInfo->pCreateTableInfo;

  int32_t total = minMsgSize() + sizeof(SCMCreateTableMsg);

  if (createInfo->type != TSQL_CREATE_TABLE_FROM_STABLE) {
    total += sizeof(SSchema) * (cmd->numOfCols + cmd->count);
  } else {
    total += sizeof(STagData);
  }

  if (createInfo->pSelect != NULL) {
    total += (createInfo->pSelect->selectToken.n + 1);
  }

  return total + TSDB_EXTRA_PAYLOAD_SIZE;
}

1161
/*
 * Build the CREATE TABLE message.
 *
 * After the fixed header the message carries either
 *   - the tag data (length, name, values) when the table is created from a super table, or
 *   - one SSchema per column and tag, followed by the select text when the
 *     statement creates a stream.
 *
 * The field info of the query is cleared once the message has been built.
 *
 * @param pSql   sql object whose command payload is allocated and filled
 * @param pInfo  parsed statement; reads pCreateTableInfo
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if the payload cannot be allocated
 */
int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  int              msgLen = 0;
  SSchema *        pSchema;
  int              size = 0;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  // Reallocate the payload size
  size = tscEstimateCreateTableMsgLength(pSql, pInfo);
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for create table msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMCreateTableMsg *pCreateTableMsg = (SCMCreateTableMsg *)pCmd->payload;

  // bounded copy into the fixed-size wire field, as the other builders in this file do
  tstrncpy(pCreateTableMsg->tableId, pTableMetaInfo->name, sizeof(pCreateTableMsg->tableId));

  // use dbinfo from table id without modifying current db info
  tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pCreateTableMsg->db);

  SCreateTableSQL *pCreateTable = pInfo->pCreateTableInfo;

  pCreateTableMsg->igExists = pCreateTable->existCheck ? 1 : 0;
  pCreateTableMsg->numOfColumns = htons(pCmd->numOfCols);
  pCreateTableMsg->numOfTags = htons(pCmd->count);

  pCreateTableMsg->sqlLen = 0;
  char *pMsg = (char *)pCreateTableMsg->schema;

  int8_t type = pInfo->pCreateTableInfo->type;
  if (type == TSQL_CREATE_TABLE_FROM_STABLE) {  // create by using super table, tags value
    STagData* pTag = &pInfo->pCreateTableInfo->usingInfo.tagdata;
    *(int32_t*)pMsg = htonl(pTag->dataLen);
    pMsg += sizeof(int32_t);
    memcpy(pMsg, pTag->name, sizeof(pTag->name));
    pMsg += sizeof(pTag->name);
    memcpy(pMsg, pTag->data, pTag->dataLen);
    pMsg += pTag->dataLen;
  } else {  // create (super) table
    pSchema = (SSchema *)pCreateTableMsg->schema;

    // columns first, then tags: both come from the same field list
    int numOfFields = pCmd->numOfCols + pCmd->count;
    for (int i = 0; i < numOfFields; ++i) {
      TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);

      pSchema->type = pField->type;
      tstrncpy(pSchema->name, pField->name, sizeof(pSchema->name));
      pSchema->bytes = htons(pField->bytes);

      pSchema++;
    }

    pMsg = (char *)pSchema;
    if (type == TSQL_CREATE_STREAM) {  // check if it is a stream sql
      SQuerySQL *pQuerySql = pInfo->pCreateTableInfo->pSelect;

      // the select text is sent with one extra byte; sqlLen includes that byte
      strncpy(pMsg, pQuerySql->selectToken.z, pQuerySql->selectToken.n + 1);
      pCreateTableMsg->sqlLen = htons(pQuerySql->selectToken.n + 1);
      pMsg += pQuerySql->selectToken.n + 1;
    }
  }

  tscFieldInfoClear(&pQueryInfo->fieldsInfo);

  msgLen = pMsg - (char*)pCreateTableMsg;
  pCreateTableMsg->contLen = htonl(msgLen);
  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE;

  assert(msgLen + minMsgSize() <= size);  // the estimate must have covered what was written
  return TSDB_CODE_SUCCESS;
}

/*
 * Estimate the buffer size needed for an ALTER TABLE message: minimum message
 * size + fixed header + one SSchema per field of the query +
 * TSDB_EXTRA_PAYLOAD_SIZE.
 *
 * @param pCmd  command whose first query info supplies the field count
 * @return estimated size in bytes
 */
int tscEstimateAlterTableMsgLength(SSqlCmd *pCmd) {
  SQueryInfo *queryInfo = tscGetQueryInfoDetail(pCmd, 0);

  size_t schemaBytes = sizeof(SSchema) * tscNumOfFields(queryInfo);
  size_t headerBytes = minMsgSize() + sizeof(SCMAlterTableMsg);

  return headerBytes + schemaBytes + TSDB_EXTRA_PAYLOAD_SIZE;
}

1242
/*
 * Build the ALTER TABLE message: fixed header, one SSchema per field of the
 * query, followed by the raw tag value bytes from the parsed statement.
 *
 * @param pSql   sql object whose command payload is allocated and filled
 * @param pInfo  parsed statement; reads pAlterInfo (type and tagData)
 * @return TSDB_CODE_SUCCESS, or -1 if the payload cannot be allocated
 */
int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  char *pMsg;
  int   msgLen = 0;

  SSqlCmd    *pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  SAlterTableSQL *pAlterInfo = pInfo->pAlterInfo;

  // The estimate only covers header and schema; the tag value appended below
  // must be accounted for as well so the copy cannot run past the buffer.
  int size = tscEstimateAlterTableMsgLength(pCmd);
  if (pAlterInfo->tagData.dataLen > 0) {
    size += pAlterInfo->tagData.dataLen;
  }

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for alter table msg", pSql);
    return -1;
  }

  SCMAlterTableMsg *pAlterTableMsg = (SCMAlterTableMsg *)pCmd->payload;
  tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pAlterTableMsg->db);

  // bounded copy into the fixed-size wire field, as the other builders in this file do
  tstrncpy(pAlterTableMsg->tableId, pTableMetaInfo->name, sizeof(pAlterTableMsg->tableId));
  pAlterTableMsg->type = htons(pAlterInfo->type);

  int numOfFields = tscNumOfFields(pQueryInfo);
  pAlterTableMsg->numOfCols = htons(numOfFields);

  SSchema *pSchema = pAlterTableMsg->schema;
  for (int i = 0; i < numOfFields; ++i) {
    TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);

    pSchema->type = pField->type;
    tstrncpy(pSchema->name, pField->name, sizeof(pSchema->name));
    pSchema->bytes = htons(pField->bytes);
    pSchema++;
  }

  // the tag value follows the schema array
  pMsg = (char *)pSchema;
  pAlterTableMsg->tagValLen = htonl(pAlterInfo->tagData.dataLen);
  memcpy(pMsg, pAlterInfo->tagData.data, pAlterInfo->tagData.dataLen);
  pMsg += pAlterInfo->tagData.dataLen;

  msgLen = pMsg - (char*)pAlterTableMsg;

  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_TABLE;

  assert(msgLen + minMsgSize() <= size);

  return TSDB_CODE_SUCCESS;
}

1290 1291 1292 1293
/*
 * Finalize an update-tag-value request.
 *
 * The message body is not built here: the length is read from the head of the
 * payload that is already in pCmd->payload. This function only sets the
 * message type and payload length, then points the request at the vgroup
 * recorded in the table meta.
 */
int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;
  cmd->msgType = TSDB_MSG_TYPE_UPDATE_TAG_VAL;

  SUpdateTableTagValMsg *updateMsg = (SUpdateTableTagValMsg *)cmd->payload;
  cmd->payloadLen = htonl(updateMsg->head.contLen);

  STableMetaInfo *metaInfo = tscGetMetaInfo(tscGetQueryInfoDetail(cmd, 0), 0);
  tscSetDnodeIpList(pSql, &metaInfo->pTableMeta->vgroupInfo);

  return TSDB_CODE_SUCCESS;
}

1305
/*
 * Fill in the TSDB_MSG_TYPE_CM_ALTER_DB request header: the payload length is
 * fixed to sizeof(SCMAlterDbMsg) and the db field receives the name held by
 * the first table meta info of the current clause.
 */
int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;

  cmd->payloadLen = sizeof(SCMAlterDbMsg);
  cmd->msgType = TSDB_MSG_TYPE_CM_ALTER_DB;

  STableMetaInfo *metaInfo = tscGetTableMetaInfoFromCmd(cmd, cmd->clauseIndex, 0);
  SCMAlterDbMsg  *alterMsg = (SCMAlterDbMsg *)cmd->payload;
  tstrncpy(alterMsg->db, metaInfo->name, sizeof(alterMsg->db));

  return TSDB_CODE_SUCCESS;
}

1317
/*
 * Build a TSDB_MSG_TYPE_CM_RETRIEVE request carrying the query handle of the
 * current result and the query type (both converted to network byte order).
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if the payload
 * cannot be allocated.
 */
int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;

  cmd->msgType = TSDB_MSG_TYPE_CM_RETRIEVE;
  cmd->payloadLen = sizeof(SRetrieveTableMsg);

  if (tscAllocPayload(cmd, cmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SQueryInfo        *queryInfo = tscGetQueryInfoDetail(cmd, 0);
  SRetrieveTableMsg *request = (SRetrieveTableMsg *)cmd->payload;

  request->qhandle = htobe64(pSql->res.qhandle);
  request->free = htons(queryInfo->type);

  return TSDB_CODE_SUCCESS;
}

1335
/*
 * Point every pRes->tsrow[i] at the start of output column i inside pRes->data.
 * The start of a column is (column offset * number of rows) bytes into the
 * data buffer.
 *
 * Returns 0 on success, or pRes->code when tscCreateResPointerInfo fails.
 */
static int tscSetResultPointer(SQueryInfo *pQueryInfo, SSqlRes *pRes) {
  if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
    return pRes->code;
  }

  int col = 0;
  while (col < pQueryInfo->fieldsInfo.numOfOutput) {
    int16_t colOffset = tscFieldInfoGetOffset(pQueryInfo, col);
    pRes->tsrow[col] = ((char*) pRes->data + colOffset * pRes->numOfRows);
    ++col;
  }

  return 0;
}

/*
 * Common tail for responses whose result set is produced locally.
 *
 * pRes->rspType records whether the result has already been handed out:
 *   - 0: first call; publish numOfRes rows, set up the row pointers and
 *        switch rspType to 1.
 *   - otherwise: no more results; the result is reset for the next retrieve.
 *
 * If an async callback (pSql->fp) is registered it is invoked directly on
 * success, or the error is queued through tscQueueAsyncRes on failure.
 *
 * Returns the result code. The code is kept in a full-width integer: the
 * previous uint8_t local truncated it to its low byte, so the value returned
 * and compared against TSDB_CODE_SUCCESS was not the real code.
 */
static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  pRes->code = TSDB_CODE_SUCCESS;

  if (pRes->rspType == 0) {
    pRes->numOfRows = numOfRes;
    pRes->row = 0;
    pRes->rspType = 1;

    tscSetResultPointer(pQueryInfo, pRes);
  } else {
    tscResetForNextRetrieve(pRes);
  }

  // capture the code before the callback runs; the callback may touch pSql
  int32_t code = pSql->res.code;
  if (pSql->fp) {
    if (code == TSDB_CODE_SUCCESS) {
      (*pSql->fp)(pSql->param, pSql, pSql->res.numOfRows);
    } else {
      tscQueueAsyncRes(pSql);
    }
  }

  return code;
}

/*
 * Local "describe table" result: the number of result rows is the number of
 * columns plus the number of tags of the table.
 */
int tscProcessDescribeTableRsp(SSqlObj *pSql) {
  SSqlCmd        *cmd = &pSql->cmd;
  STableMetaInfo *metaInfo = tscGetTableMetaInfoFromCmd(cmd, cmd->clauseIndex, 0);
  STableComInfo   tableInfo = tscGetTableInfo(metaInfo->pTableMeta);

  int32_t rowCount = tableInfo.numOfColumns + tableInfo.numOfTags;
  return tscLocalResultCommonBuilder(pSql, rowCount);
}

H
Haojun Liao 已提交
1394 1395 1396
/*
 * Local retrieve: marks the result as completed and publishes exactly one row.
 */
int tscProcessLocalRetrieveRsp(SSqlObj *pSql) {
  pSql->res.completed = true;
  return tscLocalResultCommonBuilder(pSql, 1);
}

H
hjxilinx 已提交
1400
/*
 * Run the local merge and publish its result.
 *
 * Row pointers are set up only when the merge succeeded and produced rows.
 * The result is flagged completed when the merge returned zero rows. On
 * success the callback pSql->fp is invoked with the row count; on failure the
 * error is queued through tscQueueAsyncRes.
 *
 * Returns the result code as it was before the callback was dispatched.
 */
int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
  SSqlRes *res = &pSql->res;
  SSqlCmd *cmd = &pSql->cmd;

  res->code = tscDoLocalMerge(pSql);
  SQueryInfo *queryInfo = tscGetQueryInfoDetail(cmd, cmd->clauseIndex);

  bool hasRows = (res->code == TSDB_CODE_SUCCESS) && (res->numOfRows > 0);
  if (hasRows) {
    tscSetResultPointer(queryInfo, res);
  }

  res->row = 0;
  res->completed = (res->numOfRows == 0);

  // remember the code first: the callback below may modify the result
  int32_t savedCode = res->code;
  if (savedCode != TSDB_CODE_SUCCESS) {
    tscQueueAsyncRes(pSql);
  } else {
    (*pSql->fp)(pSql->param, pSql, res->numOfRows);
  }

  return savedCode;
}

S
slguan 已提交
1424
/* Publish an empty (zero-row) local result. */
int tscProcessEmptyResultRsp(SSqlObj *pSql) {
  return tscLocalResultCommonBuilder(pSql, 0);
}
H
hzcheng 已提交
1425

1426
/*
 * Build the TSDB_MSG_TYPE_CM_CONNECT request.
 *
 * The db name sent is the part of pObj->db after the first TS_PATH_DELIMITER,
 * or the whole string when no delimiter is present. The client version is
 * copied from the global `version`; the message version is sent empty.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if the payload
 * cannot be allocated.
 */
int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;
  STscObj *conn = pSql->pTscObj;

  cmd->msgType = TSDB_MSG_TYPE_CM_CONNECT;
  cmd->payloadLen = sizeof(SCMConnectMsg);

  if (tscAllocPayload(cmd, cmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMConnectMsg *request = (SCMConnectMsg *)cmd->payload;

  // strip the leading "<prefix><delimiter>" from the stored db name, if any
  char *dbName = conn->db;
  char *delimiter = strstr(conn->db, TS_PATH_DELIMITER);
  if (delimiter != NULL) {
    dbName = delimiter + 1;
  }

  tstrncpy(request->db, dbName, sizeof(request->db));
  tstrncpy(request->clientVersion, version, sizeof(request->clientVersion));
  tstrncpy(request->msgVersion, "", sizeof(request->msgVersion));

  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
1449
/*
 * Build the TSDB_MSG_TYPE_CM_TABLE_META request in pCmd->payload.
 *
 * On entry the payload may already contain binary tag data (payloadLen bytes).
 * The message header is written over the same buffer, so the tag data is
 * copied aside first and appended after the header when the command has
 * autoCreated set.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if the temporary
 * copy cannot be allocated.
 */
int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  char    *tmpData = NULL;
  uint32_t len = pCmd->payloadLen;
  if (len > 0) {
    tmpData = calloc(1, len);
    if (NULL == tmpData) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }

    // STagData is in binary format, strncpy is not available
    memcpy(tmpData, pCmd->payload, len);
  }

  SQueryInfo     *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  SCMTableInfoMsg *pInfoMsg = (SCMTableInfoMsg *)pCmd->payload;
  // bounded copy: never write past the fixed-size tableId field of the message
  tstrncpy(pInfoMsg->tableId, pTableMetaInfo->name, sizeof(pInfoMsg->tableId));
  pInfoMsg->createFlag = htons(pCmd->autoCreated ? 1 : 0);

  char *pMsg = (char *)pInfoMsg + sizeof(SCMTableInfoMsg);

  if (pCmd->autoCreated && len > 0) {
    memcpy(pInfoMsg->tags, tmpData, len);
    pMsg += len;
  }

  pCmd->payloadLen = pMsg - (char *)pInfoMsg;
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLE_META;

  tfree(tmpData);

  // check the length actually written; the previous assert tested a local
  // that was never updated from 0 and therefore could not fail
  assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize);
  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
1491
/**
1492
 *  multi table meta req pkg format:
1493
 *  | SMgmtHead | SCMMultiTableInfoMsg | tableId0 | tableId1 | tableId2 | ......
S
slguan 已提交
1494 1495
 *      no used         4B
 **/
1496
int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
guanshengliang's avatar
guanshengliang 已提交
1497
#if 0
S
slguan 已提交
1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509
  SSqlCmd *pCmd = &pSql->cmd;

  // copy payload content to temp buff
  char *tmpData = 0;
  if (pCmd->payloadLen > 0) {
    tmpData = calloc(1, pCmd->payloadLen + 1);
    if (NULL == tmpData) return -1;
    memcpy(tmpData, pCmd->payload, pCmd->payloadLen);
  }

  // fill head info
  SMgmtHead *pMgmt = (SMgmtHead *)(pCmd->payload + tsRpcHeadSize);
S
slguan 已提交
1510
  memset(pMgmt->db, 0, TSDB_TABLE_ID_LEN);  // server don't need the db
S
slguan 已提交
1511

1512
  SCMMultiTableInfoMsg *pInfoMsg = (SCMMultiTableInfoMsg *)(pCmd->payload + tsRpcHeadSize + sizeof(SMgmtHead));
S
slguan 已提交
1513
  pInfoMsg->numOfTables = htonl((int32_t)pCmd->count);
S
slguan 已提交
1514 1515

  if (pCmd->payloadLen > 0) {
S
slguan 已提交
1516
    memcpy(pInfoMsg->tableIds, tmpData, pCmd->payloadLen);
S
slguan 已提交
1517 1518 1519 1520
  }

  tfree(tmpData);

1521
  pCmd->payloadLen += sizeof(SMgmtHead) + sizeof(SCMMultiTableInfoMsg);
S
slguan 已提交
1522
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLES_META;
S
slguan 已提交
1523 1524 1525

  assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize);

1526
  tscDebug("%p build load multi-metermeta msg completed, numOfTables:%d, msg size:%d", pSql, pCmd->count,
S
slguan 已提交
1527 1528 1529
           pCmd->payloadLen);

  return pCmd->payloadLen;
guanshengliang's avatar
guanshengliang 已提交
1530 1531
#endif
  return 0;  
S
slguan 已提交
1532 1533
}

H
hjxilinx 已提交
1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559
//static UNUSED_FUNC int32_t tscEstimateMetricMetaMsgSize(SSqlCmd *pCmd) {
////  const int32_t defaultSize =
////      minMsgSize() + sizeof(SSuperTableMetaMsg) + sizeof(SMgmtHead) + sizeof(int16_t) * TSDB_MAX_TAGS;
////  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
////
////  int32_t n = 0;
////  size_t size = taosArrayGetSize(pQueryInfo->tagCond.pCond);
////  for (int32_t i = 0; i < size; ++i) {
////    assert(0);
//////    n += strlen(pQueryInfo->tagCond.cond[i].cond);
////  }
////
////  int32_t tagLen = n * TSDB_NCHAR_SIZE;
////  if (pQueryInfo->tagCond.tbnameCond.cond != NULL) {
////    tagLen += strlen(pQueryInfo->tagCond.tbnameCond.cond) * TSDB_NCHAR_SIZE;
////  }
////
////  int32_t joinCondLen = (TSDB_TABLE_ID_LEN + sizeof(int16_t)) * 2;
////  int32_t elemSize = sizeof(SSuperTableMetaElemMsg) * pQueryInfo->numOfTables;
////
////  int32_t colSize = pQueryInfo->groupbyExpr.numOfGroupCols*sizeof(SColIndex);
////
////  int32_t len = tagLen + joinCondLen + elemSize + colSize + defaultSize;
////
////  return MAX(len, TSDB_DEFAULT_PAYLOAD_SIZE);
//}
H
hzcheng 已提交
1560

H
hjxilinx 已提交
1561 1562
/*
 * Build the TSDB_MSG_TYPE_CM_STABLE_VGROUP request.
 *
 * Layout written: the SCMSTableVgroupMsg header with the table count in
 * network order, followed by one fixed-width name slot per table. Each slot is
 * sizeof(STableMetaInfo.name) bytes wide regardless of the name's length.
 */
int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd    *cmd = &pSql->cmd;
  char       *cursor = cmd->payload;
  SQueryInfo *queryInfo = tscGetQueryInfoDetail(cmd, 0);

  SCMSTableVgroupMsg *request = (SCMSTableVgroupMsg *)cursor;
  request->numOfTables = htonl(queryInfo->numOfTables);
  cursor += sizeof(SCMSTableVgroupMsg);

  int32_t tableIdx = 0;
  while (tableIdx < queryInfo->numOfTables) {
    STableMetaInfo *metaInfo = tscGetTableMetaInfoFromCmd(cmd, cmd->clauseIndex, tableIdx);

    size_t slotSize = sizeof(metaInfo->name);
    tstrncpy(cursor, metaInfo->name, slotSize);
    cursor += slotSize;

    ++tableIdx;
  }

  cmd->msgType = TSDB_MSG_TYPE_CM_STABLE_VGROUP;
  cmd->payloadLen = (cursor - cmd->payload);

  return TSDB_CODE_SUCCESS;
}

1584 1585
/*
 * Build the TSDB_MSG_TYPE_CM_HEARTBEAT message for the connection that owns
 * pSql.
 *
 * The connection's query list and stream list are counted and serialized
 * while pObj->mutex is held. Both counters start at 2 rather than 0, so the
 * buffer is sized for two more descriptors of each kind than the lists hold
 * at counting time, plus 100 spare bytes.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload
 * cannot be allocated (same code the other message builders return).
 */
int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;

  pthread_mutex_lock(&pObj->mutex);

  int32_t numOfQueries = 2;
  for (SSqlObj *tpSql = pObj->sqlList; tpSql != NULL; tpSql = tpSql->next) {
    numOfQueries++;
  }

  int32_t numOfStreams = 2;
  for (SSqlStream *pStream = pObj->streamList; pStream != NULL; pStream = pStream->next) {
    numOfStreams++;
  }

  int size = numOfQueries * sizeof(SQueryDesc) + numOfStreams * sizeof(SStreamDesc) + sizeof(SCMHeartBeatMsg) + 100;
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    // release the lock before logging/returning so the error path cannot leave it held
    pthread_mutex_unlock(&pObj->mutex);
    tscError("%p failed to malloc for heartbeat msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMHeartBeatMsg *pHeartbeat = (SCMHeartBeatMsg *)pCmd->payload;
  pHeartbeat->numOfQueries = numOfQueries;
  pHeartbeat->numOfStreams = numOfStreams;
  int msgLen = tscBuildQueryStreamDesc(pHeartbeat, pObj);

  pthread_mutex_unlock(&pObj->mutex);

  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_CM_HEARTBEAT;

  return TSDB_CODE_SUCCESS;
}

1624 1625
/*
 * Handle a table-meta response.
 *
 * The response (pSql->res.pRsp) is converted to host byte order in place,
 * validated, turned into an STableMeta and stored in the table-meta cache
 * under the table name. The cached copy is attached to the first table meta
 * info of the command; the temporary STableMeta is freed before returning.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_INVALID_VALUE for an out-of-range
 * sid / numOfIps / numOfTags / numOfColumns, or TSDB_CODE_TSC_OUT_OF_MEMORY
 * when the meta object or its cache entry cannot be created.
 */
int tscProcessTableMetaRsp(SSqlObj *pSql) {
  STableMetaMsg *pMetaMsg = (STableMetaMsg *)pSql->res.pRsp;

  pMetaMsg->sid = htonl(pMetaMsg->sid);
  pMetaMsg->sversion = htons(pMetaMsg->sversion);
  pMetaMsg->tversion = htons(pMetaMsg->tversion);
  pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId);

  pMetaMsg->uid = htobe64(pMetaMsg->uid);
  pMetaMsg->contLen = htons(pMetaMsg->contLen);
  pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns);

  if (pMetaMsg->sid < 0 || pMetaMsg->vgroup.numOfIps < 0) {
    // log the values that were actually checked (numOfIps, not vgId)
    tscError("invalid table meta, numOfIps:%d, sid:%d", pMetaMsg->vgroup.numOfIps, pMetaMsg->sid);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  if (pMetaMsg->numOfTags > TSDB_MAX_TAGS) {
    tscError("invalid numOfTags:%d", pMetaMsg->numOfTags);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  if (pMetaMsg->numOfColumns > TSDB_MAX_COLUMNS || pMetaMsg->numOfColumns <= 0) {
    tscError("invalid numOfColumns:%d", pMetaMsg->numOfColumns);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  for (int i = 0; i < pMetaMsg->vgroup.numOfIps; ++i) {
    pMetaMsg->vgroup.ipAddr[i].port = htons(pMetaMsg->vgroup.ipAddr[i].port);
  }

  // columns come first in the schema array, tags follow
  SSchema *pSchema = pMetaMsg->schema;

  int32_t numOfTotalCols = pMetaMsg->numOfColumns + pMetaMsg->numOfTags;
  for (int i = 0; i < numOfTotalCols; ++i) {
    pSchema->bytes = htons(pSchema->bytes);
    pSchema->colId = htons(pSchema->colId);

    // the primary timestamp column must be the first schema entry
    if (pSchema->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
      assert(i == 0);
    }

    assert(pSchema->type >= TSDB_DATA_TYPE_BOOL && pSchema->type <= TSDB_DATA_TYPE_NCHAR);
    pSchema++;
  }

  size_t      size = 0;
  STableMeta *pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
  // NOTE(review): assumes tscCreateTableMetaFromMsg reports allocation failure
  // by returning NULL (the object is released with free() below) - confirm.
  if (pTableMeta == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  // todo add one more function: taosAddDataIfNotExists();
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
  assert(pTableMetaInfo->pTableMeta == NULL);

  pTableMetaInfo->pTableMeta =
      (STableMeta *) taosCachePut(tscCacheHandle, pTableMetaInfo->name, pTableMeta, size, tsTableMetaKeepTimer);

  if (pTableMetaInfo->pTableMeta == NULL) {
    free(pTableMeta);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  tscDebug("%p recv table meta, uid:%"PRId64 ", tid:%d, name:%s", pSql, pTableMeta->uid, pTableMeta->sid, pTableMetaInfo->name);
  free(pTableMeta);

  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
1692
/**
 *  Multi-table meta response package format:
 *  | STaosRsp | ieType | SCMMultiTableInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | SMeterMeta2 | SSchema2
 *  |...... 1B        1B            4B
 *
 *  The response parser is currently compiled out (#if 0), so this function is
 *  a stub that always returns TSDB_CODE_SUCCESS. The disabled code is kept for
 *  reference only; it uses identifiers (ieType, totalNum, i) that are not
 *  declared anywhere in the function.
 **/
int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
#if 0
  char *rsp = pSql->res.pRsp;

  ieType = *rsp;
  if (ieType != TSDB_IE_TYPE_META) {
    tscError("invalid ie type:%d", ieType);
    pSql->res.code = TSDB_CODE_TSC_INVALID_IE;
    pSql->res.numOfTotal = 0;
    return TSDB_CODE_TSC_APP_ERROR;
  }

  rsp++;

  SCMMultiTableInfoMsg *pInfo = (SCMMultiTableInfoMsg *)rsp;
  totalNum = htonl(pInfo->numOfTables);
  rsp += sizeof(SCMMultiTableInfoMsg);

  for (i = 0; i < totalNum; i++) {
    SMultiTableMeta *pMultiMeta = (SMultiTableMeta *)rsp;
    STableMeta *     pMeta = pMultiMeta->metas;

    pMeta->sid = htonl(pMeta->sid);
    pMeta->sversion = htons(pMeta->sversion);
    pMeta->vgId = htonl(pMeta->vgId);
    pMeta->uid = htobe64(pMeta->uid);

    if (pMeta->sid <= 0 || pMeta->vgId < 0) {
      tscError("invalid meter vgId:%d, sid%d", pMeta->vgId, pMeta->sid);
      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
      pSql->res.numOfTotal = i;
      return TSDB_CODE_TSC_APP_ERROR;
    }

    //    pMeta->numOfColumns = htons(pMeta->numOfColumns);
    //
    //    if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
    //      tscError("invalid tag value count:%d", pMeta->numOfTags);
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
    //      pSql->res.numOfTotal = i;
    //      return TSDB_CODE_TSC_APP_ERROR;
    //    }
    //
    //    if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
    //      tscError("invalid numOfTags:%d", pMeta->numOfTags);
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
    //      pSql->res.numOfTotal = i;
    //      return TSDB_CODE_TSC_APP_ERROR;
    //    }
    //
    //    if (pMeta->numOfColumns > TSDB_MAX_COLUMNS || pMeta->numOfColumns < 0) {
    //      tscError("invalid numOfColumns:%d", pMeta->numOfColumns);
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
    //      pSql->res.numOfTotal = i;
    //      return TSDB_CODE_TSC_APP_ERROR;
    //    }
    //
    //    for (int j = 0; j < TSDB_REPLICA_MAX_NUM; ++j) {
    //      pMeta->vpeerDesc[j].vnode = htonl(pMeta->vpeerDesc[j].vnode);
    //    }
    //
    //    pMeta->rowSize = 0;
    //    rsp += sizeof(SMultiTableMeta);
    //    pSchema = (SSchema *)rsp;
    //
    //    int32_t numOfTotalCols = pMeta->numOfColumns + pMeta->numOfTags;
    //    for (int j = 0; j < numOfTotalCols; ++j) {
    //      pSchema->bytes = htons(pSchema->bytes);
    //      pSchema->colId = htons(pSchema->colId);
    //
    //      // ignore the tags length
    //      if (j < pMeta->numOfColumns) {
    //        pMeta->rowSize += pSchema->bytes;
    //      }
    //      pSchema++;
    //    }
    //
    //    rsp += numOfTotalCols * sizeof(SSchema);
    //
    //    int32_t  tagLen = 0;
    //    SSchema *pTagsSchema = tscGetTableTagSchema(pMeta);
    //
    //    if (pMeta->tableType == TSDB_CHILD_TABLE) {
    //      for (int32_t j = 0; j < pMeta->numOfTags; ++j) {
    //        tagLen += pTagsSchema[j].bytes;
    //      }
    //    }
    //
    //    rsp += tagLen;
    //    int32_t size = (int32_t)(rsp - ((char *)pMeta));  // Consistent with STableMeta in cache
    //
    //    pMeta->index = 0;
    //    (void)taosCachePut(tscCacheHandle, pMeta->tableId, (char *)pMeta, size, tsTableMetaKeepTimer);
    //  }
  }

  pSql->res.code = TSDB_CODE_SUCCESS;
  pSql->res.numOfTotal = i;
  tscDebug("%p load multi-metermeta resp from complete num:%d", pSql, pSql->res.numOfTotal);
#endif

  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
1801
/*
 * Handle a super-table vgroup response.
 *
 * The response holds, for each requested table and in request order, one
 * SVgroupsInfo record: a header followed by numOfVgroups SCMVgroupInfo
 * entries. Each record is copied into a freshly allocated vgroupList on the
 * matching table meta info of the parent SQL object (pSql->param), and the
 * copy is converted to host byte order.
 *
 * Returns pSql->res.code, or TSDB_CODE_TSC_OUT_OF_MEMORY if a vgroup list
 * cannot be allocated.
 */
int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
  SSqlRes* pRes = &pSql->res;

  // NOTE: the order of several table must be preserved.
  SCMSTableVgroupRspMsg *pStableVgroup = (SCMSTableVgroupRspMsg *)pRes->pRsp;
  pStableVgroup->numOfTables = htonl(pStableVgroup->numOfTables);
  char* pMsg = pRes->pRsp + sizeof(SCMSTableVgroupRspMsg);

  // master sqlObj locates in param
  SSqlObj* parent = pSql->param;
  assert(parent != NULL);

  SSqlCmd* pCmd = &parent->cmd;
  for(int32_t i = 0; i < pStableVgroup->numOfTables; ++i) {
    STableMetaInfo *pInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i);
    SVgroupsInfo *  pVgroupInfo = (SVgroupsInfo *)pMsg;
    pVgroupInfo->numOfVgroups = htonl(pVgroupInfo->numOfVgroups);

    // size of this table's whole record: header plus all vgroup entries
    size_t size = sizeof(SCMVgroupInfo) * pVgroupInfo->numOfVgroups + sizeof(SVgroupsInfo);
    pInfo->vgroupList = calloc(1, size);
    if (pInfo->vgroupList == NULL) {
      tscError("%p failed to malloc for vgroup list", pSql);
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }

    memcpy(pInfo->vgroupList, pVgroupInfo, size);
    for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) {
      SCMVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j];

      pVgroups->vgId = htonl(pVgroups->vgId);
      assert(pVgroups->numOfIps >= 1);

      for (int32_t k = 0; k < pVgroups->numOfIps; ++k) {
        pVgroups->ipAddr[k].port = htons(pVgroups->ipAddr[k].port);
      }
    }

    // advance exactly once per table. This used to sit inside the vgroup loop,
    // which skipped numOfVgroups records' worth of bytes (or none at all for an
    // empty list) and mis-positioned the cursor for the next table.
    pMsg += size;
  }

  return pSql->res.code;
}

/*
 * Handle a "show" response.
 *
 * The response embeds a table meta message describing the result columns. It
 * is converted to host byte order in place, turned into an STableMeta and
 * stored in the cache under the key "<msgType + 'a'>showlist"; any table meta
 * previously attached to the query is released first. One output field and
 * one TSDB_FUNC_TS_DUMMY expression are then appended per column.
 *
 * Returns 0 on success, or TSDB_CODE_TSC_OUT_OF_MEMORY when the meta object
 * or its cache entry cannot be created.
 *
 * NOTE(review): sid is converted with ntohs here while tscProcessTableMetaRsp
 * converts the same field with htonl - confirm which width the server sends
 * for show responses.
 */
int tscProcessShowRsp(SSqlObj *pSql) {
  STableMetaMsg * pMetaMsg;
  SCMShowRsp *pShow;
  SSchema *    pSchema;
  char         key[20];

  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  pShow = (SCMShowRsp *)pRes->pRsp;
  pShow->qhandle = htobe64(pShow->qhandle);
  pRes->qhandle = pShow->qhandle;

  tscResetForNextRetrieve(pRes);
  pMetaMsg = &(pShow->tableMeta);

  pMetaMsg->numOfColumns = ntohs(pMetaMsg->numOfColumns);

  pSchema = pMetaMsg->schema;
  pMetaMsg->sid = ntohs(pMetaMsg->sid);
  for (int i = 0; i < pMetaMsg->numOfColumns; ++i) {
    pSchema->bytes = htons(pSchema->bytes);
    pSchema++;
  }

  // cache key: one character derived from the message type, then "showlist"
  key[0] = pCmd->msgType + 'a';
  strcpy(key + 1, "showlist");

  taosCacheRelease(tscCacheHandle, (void *)&(pTableMetaInfo->pTableMeta), false);

  size_t size = 0;
  STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
  // NOTE(review): assumes tscCreateTableMetaFromMsg returns NULL on allocation
  // failure - confirm.
  if (pTableMeta == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  pTableMetaInfo->pTableMeta = taosCachePut(tscCacheHandle, key, (char *)pTableMeta, size, tsTableMetaKeepTimer);
  if (pTableMetaInfo->pTableMeta == NULL) {
    // same handling as tscProcessTableMetaRsp: drop the temporary and report OOM
    tfree(pTableMeta);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SSchema *pTableSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);

  if (pQueryInfo->colList == NULL) {
    pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
  }

  SFieldInfo* pFieldInfo = &pQueryInfo->fieldsInfo;

  SColumnIndex index = {0};
  pSchema = pMetaMsg->schema;

  for (int16_t i = 0; i < pMetaMsg->numOfColumns; ++i, ++pSchema) {
    index.columnIndex = i;
    tscColumnListInsert(pQueryInfo->colList, &index);

    TAOS_FIELD f = tscCreateField(pSchema->type, pSchema->name, pSchema->bytes);
    SFieldSupInfo* pInfo = tscFieldInfoAppend(pFieldInfo, &f);

    pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index,
                     pTableSchema[i].type, pTableSchema[i].bytes, pTableSchema[i].bytes, false);
  }

  pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
  tscFieldInfoUpdateOffset(pQueryInfo);

  tfree(pTableMeta);
  return 0;
}

/*
 * Handle the connect response.
 *
 * Copies the account id from the response, rewrites pObj->db as
 * "<acctId><TS_PATH_DELIMITER><db>", installs the management IP list when the
 * response carries one, records the server version, the auth flags and the
 * connection id, and (re)arms the activity timer.
 *
 * All strings copied from the response are bounded by the size of their
 * destination field.
 *
 * Always returns 0.
 */
int tscProcessConnectRsp(SSqlObj *pSql) {
  char temp[TSDB_TABLE_ID_LEN * 2];
  STscObj *pObj = pSql->pTscObj;
  SSqlRes *pRes = &pSql->res;

  SCMConnectRsp *pConnect = (SCMConnectRsp *)pRes->pRsp;
  tstrncpy(pObj->acctId, pConnect->acctId, sizeof(pObj->acctId));  // copy acctId from response

  // snprintf cannot write past temp; len is still the full would-be length,
  // so the assertion below keeps its original meaning
  int32_t len = snprintf(temp, sizeof(temp), "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db);

  assert(len >= 0 && (size_t)len <= sizeof(pObj->db));
  tstrncpy(pObj->db, temp, sizeof(pObj->db));

  if (pConnect->ipList.numOfIps > 0) {
    tscSetMgmtIpList(&pConnect->ipList);
  }

  // the version string comes from the server; bound it by the local field size
  tstrncpy(pObj->sversion, pConnect->serverVersion, sizeof(pObj->sversion));
  pObj->writeAuth = pConnect->writeAuth;
  pObj->superAuth = pConnect->superAuth;
  pObj->connId = htonl(pConnect->connId);
  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);

  return 0;
}

/*
 * "use db" succeeded: remember the name held by the command's first table
 * meta info as the connection's current db.
 */
int tscProcessUseDbRsp(SSqlObj *pSql) {
  STableMetaInfo *metaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
  STscObj        *conn = pSql->pTscObj;

  tstrncpy(conn->db, metaInfo->name, sizeof(conn->db));
  return 0;
}

/* A database was dropped: empty the whole client-side cache. */
int tscProcessDropDbRsp(SSqlObj *UNUSED_PARAM(pSql)) {
  taosCacheEmpty(tscCacheHandle);

  return 0;
}

/*
 * A table was dropped: force its meta out of the cache.
 *
 * Rationale kept from the original author:
 * 1. if a user drops one table, which is the only table in a vnode, the remove
 *    operation will cause the vnode to be removed.
 * 2. then, a user creates a new metric followed by a table with the identical
 *    name of the removed table but a different schema; the table will reside
 *    in a new vnode.
 * The cached information is expired, and the reference to the original meter
 * may already be lost, so the entry is force-released.
 *
 * Always returns 0.
 */
int tscProcessDropTableRsp(SSqlObj *pSql) {
  STableMetaInfo *metaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);

  STableMeta *cachedMeta = taosCacheAcquireByName(tscCacheHandle, metaInfo->name);
  if (cachedMeta != NULL) {
    tscDebug("%p force release table meta after drop table:%s", pSql, metaInfo->name);
    taosCacheRelease(tscCacheHandle, (void **)&cachedMeta, true);

    // also drop the reference held by the command itself, if there is one
    if (metaInfo->pTableMeta) {
      taosCacheRelease(tscCacheHandle, (void **)&(metaInfo->pTableMeta), true);
    }
  }

  /* when the entry was not in the cache there is nothing to do */
  return 0;
}

/*
 * A table was altered: force its meta out of the cache. When the command's
 * table meta info refers to a super table, the whole cache is emptied as well.
 *
 * Always returns 0.
 */
int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
  STableMetaInfo *metaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);

  STableMeta *cachedMeta = taosCacheAcquireByName(tscCacheHandle, metaInfo->name);
  if (cachedMeta != NULL) {
    tscDebug("%p force release metermeta in cache after alter-table: %s", pSql, metaInfo->name);
    taosCacheRelease(tscCacheHandle, (void **)&cachedMeta, true);

    if (metaInfo->pTableMeta) {
      // evaluate the table kind before the release below gives the meta up
      bool superTable = UTIL_TABLE_IS_SUPER_TABLE(metaInfo);
      taosCacheRelease(tscCacheHandle, (void **)&(metaInfo->pTableMeta), true);

      if (superTable) {  // if it is a super table, reset whole query cache
        tscDebug("%p reset query cache since table:%s is stable", pSql, metaInfo->name);
        taosCacheEmpty(tscCacheHandle);
      }
    }
  }

  /* when the entry was not in the cache there is nothing to do */
  return 0;
}

/* Nothing to do on the client after an "alter database" response. */
int tscProcessAlterDbMsgRsp(SSqlObj *pSql) {
  UNUSED(pSql);

  return 0;
}

/*
 * Handle a query response: convert the query handle to host byte order (in
 * place in the response as well), store it in the result, clear the data
 * pointer and reset the result for the next retrieve.
 */
int tscProcessQueryRsp(SSqlObj *pSql) {
  SSqlRes        *res = &pSql->res;
  SQueryTableRsp *rsp = (SQueryTableRsp *)res->pRsp;

  rsp->qhandle = htobe64(rsp->qhandle);
  res->qhandle = rsp->qhandle;
  res->data = NULL;

  tscResetForNextRetrieve(res);
  return 0;
}

H
hjxilinx 已提交
2015
/*
 * Handle a retrieve response coming from a node.
 *
 * The header fields are converted to host byte order and stored in the
 * result. For a subscription, a progress trailer is read from just past the
 * last output column's data: a 32-bit table count, then per table a 64-bit
 * uid, a 32-bit tid (skipped) and a TSKEY. Each (uid, key) pair is forwarded
 * to tscUpdateSubscriptionProgress.
 *
 * Returns 0, or pRes->code when tscCreateResPointerInfo fails.
 */
int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
  SSqlRes *res = &pSql->res;
  SSqlCmd *cmd = &pSql->cmd;

  SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)res->pRsp;

  res->numOfRows = htonl(rsp->numOfRows);
  res->precision = htons(rsp->precision);
  res->offset    = htobe64(rsp->offset);
  res->useconds  = htobe64(rsp->useconds);
  res->completed = (rsp->completed == 1);
  res->data      = rsp->data;

  SQueryInfo *queryInfo = tscGetQueryInfoDetail(cmd, cmd->clauseIndex);
  if (tscCreateResPointerInfo(res, queryInfo) != TSDB_CODE_SUCCESS) {
    return res->code;
  }

  if (pSql->pSubscription != NULL) {
    int32_t lastCol = queryInfo->fieldsInfo.numOfOutput - 1;

    TAOS_FIELD *lastField = tscFieldInfoGetField(&queryInfo->fieldsInfo, lastCol);
    int16_t     lastOffset = tscFieldInfoGetOffset(queryInfo, lastCol);

    // the trailer starts right after the data of the last output column
    char *cursor = res->data + (lastField->bytes + lastOffset) * res->numOfRows;

    int32_t tableCount = htonl(*(int32_t*)cursor);
    cursor += sizeof(int32_t);

    int idx = 0;
    while (idx < tableCount) {
      int64_t uid = htobe64(*(int64_t*)cursor);
      cursor += sizeof(int64_t);

      cursor += sizeof(int32_t); // skip tid

      TSKEY key = htobe64(*(TSKEY*)cursor);
      cursor += sizeof(TSKEY);

      tscUpdateSubscriptionProgress(pSql->pSubscription, uid, key);
      ++idx;
    }
  }

  res->row = 0;
  tscDebug("%p numOfRows:%" PRId64 ", offset:%" PRId64 ", complete:%d", pSql, res->numOfRows, res->offset, res->completed);

  return 0;
}

/**
 * Unpack a retrieve response that is already stored in pSql->res.pRsp.
 *
 * The row count is converted with htonl(), pRes->data is pointed at the
 * response payload, the per-column result pointers are set up through
 * tscSetResultPointer() for the first query clause, and the row cursor is
 * reset to 0.
 *
 * @param pSql  sql object holding the response
 * @return      always 0
 */
int tscProcessRetrieveRspFromLocal(SSqlObj *pSql) {
  SQueryInfo *pInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
  SSqlRes    *pResult = &pSql->res;

  SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)pResult->pRsp;
  pResult->numOfRows = htonl(pRsp->numOfRows);
  pResult->data = pRsp->data;

  tscSetResultPointer(pInfo, pResult);

  // start reading from the first row of the freshly unpacked block
  pResult->row = 0;
  return 0;
}

H
hjxilinx 已提交
2074
// Forward declaration: installed below as the `fp` callback of the helper SSqlObj
// created to fetch table meta / super table vgroup info.
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code);
H
hzcheng 已提交
2075

2076
/**
 * Issue a helper request (TSDB_SQL_META) that fetches the meta of the table
 * named in pTableMetaInfo on behalf of pSql.
 *
 * A new SSqlObj is allocated, bound to the same pTscObj as pSql, and given a
 * copy of pSql's payload (tag information used when the table does not exist
 * and auto-create is requested). tscTableMetaCallBack is installed as its
 * callback with pSql as the callback parameter.
 *
 * @param pSql            statement that needs the table meta
 * @param pTableMetaInfo  supplies the full table name to look up
 * @return TSDB_CODE_TSC_ACTION_IN_PROGRESS when tscProcessSql() accepted the
 *         helper request, TSDB_CODE_TSC_OUT_OF_MEMORY when an allocation
 *         failed, otherwise the error code produced while preparing or
 *         issuing the request
 */
static int32_t getTableMetaFromMgmt(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
  if (NULL == pNew) {
    tscError("%p malloc failed for new sqlobj to get table meta", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;
  pNew->cmd.command = TSDB_SQL_META;

  tscAddSubqueryInfo(&pNew->cmd);

  // Do not continue with a missing query info: it is dereferenced below.
  SQueryInfo *pNewQueryInfo = NULL;
  int32_t     code = tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo);
  if (code != TSDB_CODE_SUCCESS) {
    tscError("%p failed to get query info for new sqlobj to get table meta", pSql);
    tscFreeSqlObj(pNew);
    return code;
  }

  pNew->cmd.autoCreated = pSql->cmd.autoCreated;  // create table if not exists
  if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE + pSql->cmd.payloadLen)) {
    tscError("%p malloc failed for payload to get table meta", pSql);

    // tscFreeSqlObj (not a bare free) so that the query info attached to pNew->cmd above is released as well
    tscFreeSqlObj(pNew);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  STableMetaInfo *pNewMeterMetaInfo = tscAddEmptyMetaInfo(pNewQueryInfo);
  assert(pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);

  tstrncpy(pNewMeterMetaInfo->name, pTableMetaInfo->name, sizeof(pNewMeterMetaInfo->name));
  memcpy(pNew->cmd.payload, pSql->cmd.payload, pSql->cmd.payloadLen);  // tag information if table does not exists.
  pNew->cmd.payloadLen = pSql->cmd.payloadLen;
  tscDebug("%p new pSqlObj:%p to get tableMeta, auto create:%d", pSql, pNew, pNew->cmd.autoCreated);

  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;

  code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
    code = TSDB_CODE_TSC_ACTION_IN_PROGRESS;  // notify upper application that current process need to be terminated
  }

  return code;
}

H
hjxilinx 已提交
2119
/**
 * Attach the meta of the table named in pTableMetaInfo to pTableMetaInfo.
 *
 * Any meta currently held by pTableMetaInfo is handed back to the cache first.
 * The cache is then looked up by table name; on a miss the meta is requested
 * through getTableMetaFromMgmt().
 *
 * @param pSql            sql object on whose behalf the meta is needed
 * @param pTableMetaInfo  carries the (non-empty) table name and receives the meta
 * @return TSDB_CODE_SUCCESS on a cache hit, otherwise the result of
 *         getTableMetaFromMgmt()
 */
int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
  assert(strlen(pTableMetaInfo->name) != 0);

  // give back the reference we may still be holding before acquiring a new one
  if (pTableMetaInfo->pTableMeta != NULL) {
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), false);
  }

  STableMeta *pCached = (STableMeta *)taosCacheAcquireByName(tscCacheHandle, pTableMetaInfo->name);
  pTableMetaInfo->pTableMeta = pCached;

  if (pCached == NULL) {
    // cache miss
    return getTableMetaFromMgmt(pSql, pTableMetaInfo);
  }

  STableComInfo info = tscGetTableInfo(pCached);
  tscDebug("%p retrieve table Meta from cache, the number of columns:%d, numOfTags:%d, %p", pSql, info.numOfColumns,
           info.numOfTags, pCached);

  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
2139
/**
 * Record the auto-create flag on the command of pSql, then obtain the table
 * meta through tscGetTableMeta().
 *
 * @param pSql               sql object
 * @param pTableMetaInfo     table whose meta is requested
 * @param createIfNotExists  value stored in pSql->cmd.autoCreated
 * @return                   result of tscGetTableMeta()
 */
int tscGetMeterMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool createIfNotExists) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->autoCreated = createIfNotExists;

  return tscGetTableMeta(pSql, pTableMetaInfo);
}

/**
H
Haojun Liao 已提交
2145
 * retrieve table meta from mnode, and update the local table meta cache.
H
hzcheng 已提交
2146
 * @param pSql          sql object
H
Haojun Liao 已提交
2147
 * @param tableId       table full name
H
hzcheng 已提交
2148 2149
 * @return              status code
 */
H
Haojun Liao 已提交
2150
int tscRenewTableMeta(SSqlObj *pSql, char *tableId) {
H
hzcheng 已提交
2151
  SSqlCmd *pCmd = &pSql->cmd;
2152 2153

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
H
hjxilinx 已提交
2154
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
2155

H
Haojun Liao 已提交
2156 2157
  STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
  if (pTableMetaInfo->pTableMeta) {
2158
    tscDebug("%p update table meta, old meta numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
H
Haojun Liao 已提交
2159
             tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->uid, pTableMeta);
H
hzcheng 已提交
2160 2161
  }

H
Haojun Liao 已提交
2162 2163
  taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true);
  return getTableMetaFromMgmt(pSql, pTableMetaInfo);
H
hzcheng 已提交
2164 2165
}

H
hjxilinx 已提交
2166
/**
 * Check whether every table of the given query clause already has a vgroup list.
 *
 * @param pCmd         command holding the query clauses
 * @param clauseIndex  index of the clause to inspect
 * @return true when no table of the clause has a NULL vgroupList, false otherwise
 */
static bool allVgroupInfoRetrieved(SSqlCmd* pCmd, int32_t clauseIndex) {
  SQueryInfo *pInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);

  int32_t idx = 0;
  while (idx < pInfo->numOfTables) {
    if (tscGetMetaInfo(pInfo, idx)->vgroupList == NULL) {
      return false;  // at least one table still lacks its vgroup info
    }

    idx++;
  }

  // every table has its vgroup info, nothing left to retrieve
  return true;
}
H
hzcheng 已提交
2178

H
hjxilinx 已提交
2179
/**
 * Issue a helper request (TSDB_SQL_STABLEVGROUP) that fetches the vgroup info
 * for the tables of one query clause of pSql.
 *
 * Nothing is sent when every table of the clause already has a vgroup list.
 * Otherwise a new SSqlObj is created, the tables of the clause are added to
 * its query info (each with a table meta acquired from the cache), and
 * tscTableMetaCallBack is installed as its callback with pSql as parameter.
 *
 * @param pSql         statement that needs the vgroup info
 * @param clauseIndex  index of the query clause to process
 * @return TSDB_CODE_SUCCESS when nothing has to be fetched,
 *         TSDB_CODE_TSC_ACTION_IN_PROGRESS when tscProcessSql() accepted the
 *         helper request, TSDB_CODE_TSC_OUT_OF_MEMORY when the helper object
 *         could not be allocated, otherwise the error code produced while
 *         preparing or issuing the request
 */
int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
  SSqlCmd *pCmd = &pSql->cmd;

  if (allVgroupInfoRetrieved(pCmd, clauseIndex)) {
    return TSDB_CODE_SUCCESS;
  }

  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
  if (NULL == pNew) {  // must be checked: pNew is dereferenced right below
    tscError("%p malloc failed for new sqlobj to get vgroup info", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;

  pNew->cmd.command = TSDB_SQL_STABLEVGROUP;

  SQueryInfo *pNewQueryInfo = NULL;
  int         code = tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo);
  if (code != TSDB_CODE_SUCCESS) {
    tscFreeSqlObj(pNew);
    return code;
  }

  // mirror the tables of the requested clause into the helper object
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
    STableMetaInfo *pMInfo = tscGetMetaInfo(pQueryInfo, i);
    STableMeta *pTableMeta = taosCacheAcquireByData(tscCacheHandle, pMInfo->pTableMeta);
    tscAddTableMetaInfo(pNewQueryInfo, pMInfo->name, pTableMeta, NULL, pMInfo->tagColList);
  }

  if ((code = tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
    tscFreeSqlObj(pNew);
    return code;
  }

  pNewQueryInfo->numOfTables = pQueryInfo->numOfTables;
  tscDebug("%p new sqlObj:%p to get vgroupInfo, numOfTables:%d", pSql, pNew, pNewQueryInfo->numOfTables);

  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;
  code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
    code = TSDB_CODE_TSC_ACTION_IN_PROGRESS;
  }

  return code;
}

2224
void tscInitMsgsFp() {
S
slguan 已提交
2225 2226
  tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
  tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg;
J
jtao1735 已提交
2227
  tscBuildMsg[TSDB_SQL_FETCH] = tscBuildFetchMsg;
H
hzcheng 已提交
2228 2229

  tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg;
2230
  tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg;
H
hzcheng 已提交
2231

2232 2233
  tscBuildMsg[TSDB_SQL_CREATE_ACCT] = tscBuildAcctMsg;
  tscBuildMsg[TSDB_SQL_ALTER_ACCT] = tscBuildAcctMsg;
H
hzcheng 已提交
2234 2235

  tscBuildMsg[TSDB_SQL_CREATE_TABLE] = tscBuildCreateTableMsg;
S
[TD-16]  
slguan 已提交
2236
  tscBuildMsg[TSDB_SQL_DROP_USER] = tscBuildDropUserMsg;
H
hzcheng 已提交
2237 2238 2239
  tscBuildMsg[TSDB_SQL_DROP_ACCT] = tscBuildDropAcctMsg;
  tscBuildMsg[TSDB_SQL_DROP_DB] = tscBuildDropDbMsg;
  tscBuildMsg[TSDB_SQL_DROP_TABLE] = tscBuildDropTableMsg;
2240
  tscBuildMsg[TSDB_SQL_ALTER_USER] = tscBuildUserMsg;
S
slguan 已提交
2241 2242 2243
  tscBuildMsg[TSDB_SQL_CREATE_DNODE] = tscBuildCreateDnodeMsg;
  tscBuildMsg[TSDB_SQL_DROP_DNODE] = tscBuildDropDnodeMsg;
  tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg;
H
hzcheng 已提交
2244
  tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg;
2245
  tscBuildMsg[TSDB_SQL_UPDATE_TAGS_VAL] = tscBuildUpdateTagMsg;
H
hzcheng 已提交
2246 2247 2248 2249
  tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg;

  tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg;
  tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg;
H
hjxilinx 已提交
2250
  tscBuildMsg[TSDB_SQL_META] = tscBuildTableMetaMsg;
H
hjxilinx 已提交
2251
  tscBuildMsg[TSDB_SQL_STABLEVGROUP] = tscBuildSTableVgroupMsg;
S
slguan 已提交
2252
  tscBuildMsg[TSDB_SQL_MULTI_META] = tscBuildMultiMeterMetaMsg;
H
hzcheng 已提交
2253 2254 2255 2256

  tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg;
  tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg;
  tscBuildMsg[TSDB_SQL_RETRIEVE] = tscBuildRetrieveFromMgmtMsg;
2257 2258 2259
  tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg;
H
hzcheng 已提交
2260 2261

  tscProcessMsgRsp[TSDB_SQL_SELECT] = tscProcessQueryRsp;
H
hjxilinx 已提交
2262
  tscProcessMsgRsp[TSDB_SQL_FETCH] = tscProcessRetrieveRspFromNode;
H
hzcheng 已提交
2263 2264 2265 2266 2267

  tscProcessMsgRsp[TSDB_SQL_DROP_DB] = tscProcessDropDbRsp;
  tscProcessMsgRsp[TSDB_SQL_DROP_TABLE] = tscProcessDropTableRsp;
  tscProcessMsgRsp[TSDB_SQL_CONNECT] = tscProcessConnectRsp;
  tscProcessMsgRsp[TSDB_SQL_USE_DB] = tscProcessUseDbRsp;
2268
  tscProcessMsgRsp[TSDB_SQL_META] = tscProcessTableMetaRsp;
H
hjxilinx 已提交
2269
  tscProcessMsgRsp[TSDB_SQL_STABLEVGROUP] = tscProcessSTableVgroupRsp;
S
slguan 已提交
2270
  tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiMeterMetaRsp;
H
hzcheng 已提交
2271 2272

  tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp;
H
hjxilinx 已提交
2273
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromNode;  // rsp handled by same function.
H
hzcheng 已提交
2274
  tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp;
2275

H
Haojun Liao 已提交
2276 2277 2278 2279 2280
  tscProcessMsgRsp[TSDB_SQL_CURRENT_DB]   = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CURRENT_USER] = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_VERSION] = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CLI_VERSION]  = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_STATUS]  = tscProcessLocalRetrieveRsp;
2281

H
hzcheng 已提交
2282 2283
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp;

H
hjxilinx 已提交
2284
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_LOCALMERGE] = tscProcessRetrieveLocalMergeRsp;
H
hzcheng 已提交
2285 2286 2287 2288 2289 2290 2291 2292 2293 2294

  tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp;
  tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp;

  tscKeepConn[TSDB_SQL_SHOW] = 1;
  tscKeepConn[TSDB_SQL_RETRIEVE] = 1;
  tscKeepConn[TSDB_SQL_SELECT] = 1;
  tscKeepConn[TSDB_SQL_FETCH] = 1;
  tscKeepConn[TSDB_SQL_HB] = 1;
}