tscServer.c 79.1 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#include "os.h"
H
hzcheng 已提交
17
#include "tcache.h"
H
Haojun Liao 已提交
18
#include "tcmdtype.h"
H
hzcheng 已提交
19
#include "trpc.h"
20 21
#include "tscLocalMerge.h"
#include "tscLog.h"
H
hzcheng 已提交
22 23 24 25 26 27 28 29 30 31
#include "tscProfile.h"
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"

#define TSC_MGMT_VNODE 999

32
SRpcCorIpSet  tscMgmtIpSet;
S
slguan 已提交
33 34
SRpcIpSet  tscDnodeIpSet;

35 36
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};

H
hzcheng 已提交
37 38 39
int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql);
void tscProcessActivityTimer(void *handle, void *tmrId);
int tscKeepConn[TSDB_SQL_MAX] = {0};
40

B
Bomin Zhang 已提交
41
TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid, TSKEY dflt);
42 43
void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts);
void tscSaveSubscriptionProgress(void* sub);
H
hzcheng 已提交
44

S
slguan 已提交
45
/* Lower bound used when sizing message buffers: the RPC head size plus 100 bytes. */
static int32_t minMsgSize() {
  return tsRpcHeadSize + 100;
}
H
hzcheng 已提交
46

47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
/*
 * Copy the shared management-node IP set into *ipSet.
 * The copy is bracketed by taosCorBeginRead/taosCorEndRead on the set's version field.
 */
static void tscDumpMgmtIpSet(SRpcIpSet *ipSet) {
  const SRpcIpSet *shared = &tscMgmtIpSet.ipSet;

  taosCorBeginRead(&tscMgmtIpSet.version);
  *ipSet = *shared;
  taosCorEndRead(&tscMgmtIpSet.version);
}

/*
 * Compare two IP sets.
 *
 * Returns true only when both sets have the same number of entries, the same
 * in-use index, and every entry matches in port and FQDN (the FQDN comparison
 * looks at no more than TSDB_FQDN_LEN characters).
 */
bool tscIpSetIsEqual(SRpcIpSet *s1, SRpcIpSet *s2) {
  // the in-use index must be compared across the two sets, not against itself
  if (s1->numOfIps != s2->numOfIps || s1->inUse != s2->inUse) {
    return false;
  }

  for (int32_t i = 0; i < s1->numOfIps; i++) {
    if (s1->port[i] != s2->port[i] || strncmp(s1->fqdn[i], s2->fqdn[i], TSDB_FQDN_LEN) != 0) {
      return false;
    }
  }

  return true;
}
/*
 * Replace the shared management-node IP set with *pIpSet.
 *
 * The current set is snapshotted first; if tscIpSetIsEqual() reports it equal to the
 * new one nothing is written. Otherwise the fields are copied inside a
 * taosCorBeginWrite/taosCorEndWrite section. Each port goes through htons() and each
 * FQDN is copied with strncpy() bounded by TSDB_FQDN_LEN.
 */
void tscSetMgmtIpList(SRpcIpSet *pIpSet) {
  SRpcIpSet current;
  tscDumpMgmtIpSet(&current);

  // skip the write section entirely when the set did not change
  if (tscIpSetIsEqual(&current, pIpSet)) {
    return;
  }

  taosCorBeginWrite(&tscMgmtIpSet.version);

  SRpcIpSet *dst = &tscMgmtIpSet.ipSet;
  dst->numOfIps = pIpSet->numOfIps;
  dst->inUse = pIpSet->inUse;

  for (int32_t idx = 0; idx < dst->numOfIps; ++idx) {
    dst->port[idx] = htons(pIpSet->port[idx]);
    strncpy(dst->fqdn[idx], pIpSet->fqdn[idx], TSDB_FQDN_LEN);
  }

  taosCorEndWrite(&tscMgmtIpSet.version);
}
83
/*
 * Fill pSql->ipList from a vgroup description.
 *
 * The in-use index is always reset to 0. With a NULL pVgroupInfo the list is left
 * empty (numOfIps = 0); otherwise every FQDN/port pair of the vgroup is copied over.
 */
static void tscSetDnodeIpList(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) {
  SRpcIpSet* pTarget = &pSql->ipList;
  pTarget->inUse = 0;

  if (pVgroupInfo != NULL) {
    pTarget->numOfIps = pVgroupInfo->numOfIps;

    for (int32_t idx = 0; idx < pVgroupInfo->numOfIps; ++idx) {
      strcpy(pTarget->fqdn[idx], pVgroupInfo->ipAddr[idx].fqdn);
      pTarget->port[idx] = pVgroupInfo->ipAddr[idx].port;
    }
  } else {
    pTarget->numOfIps = 0;
  }
}

S
slguan 已提交
98
void tscPrintMgmtIp() {
99 100 101 102
  SRpcIpSet dump;
  tscDumpMgmtIpSet(&dump);
  if (dump.numOfIps <= 0) {
    tscError("invalid mnode IP list:%d", dump.numOfIps);
S
slguan 已提交
103
  } else {
104 105
    for (int i = 0; i < dump.numOfIps; ++i) {
      tscDebug("mnode index:%d %s:%d", i, dump.fqdn[i], dump.port[i]);
S
slguan 已提交
106
    }
S
slguan 已提交
107 108 109
  }
}

H
hjxilinx 已提交
110 111 112 113 114 115 116
/*
 * For each management node, try twice at least in case of poor network situation.
 * If the client start to connect to a non-management node from the client, and the first retry may fail due to
 * the poor network quality. And then, the second retry get the response with redirection command.
 * The retry will not be executed since only *two* retry is allowed in case of single management node in the cluster.
 * Therefore, we need to multiply the retry times by factor of 2 to fix this problem.
 */
117
UNUSED_FUNC
H
hjxilinx 已提交
118 119
static int32_t tscGetMgmtConnMaxRetryTimes() {
  int32_t factor = 2;
120 121 122
  SRpcIpSet dump;
  tscDumpMgmtIpSet(&dump);
  return dump.numOfIps * factor;
H
hjxilinx 已提交
123 124
}

H
hzcheng 已提交
125 126 127 128 129 130 131 132 133 134 135 136
/*
 * Completion callback of the heartbeat request.
 *
 * param is the STscObj the heartbeat belongs to; tres is unused; code is the result
 * of the heartbeat (0 on success).
 *
 * On success the response stored in the heartbeat SQL object is applied: a non-empty
 * IP list replaces the management IP set, the connection id is recorded, and the
 * kill-connection / kill-query / kill-stream requests carried by the response are
 * carried out. On failure only a debug line is written. Unless the connection object
 * fails validation, the activity timer is re-armed at the end.
 */
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
  STscObj *pObj = (STscObj *)param;
  if (pObj == NULL) {
    return;
  }

  if (pObj->signature != pObj) {
    tscError("heart beat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature);
    return;
  }

  SSqlObj *pHbSql = pObj->pHb;

  if (code != 0) {
    tscDebug("heart beat failed, code:%s", tstrerror(code));
  } else {
    SCMHeartBeatRsp *pHbRsp = (SCMHeartBeatRsp *)pHbSql->res.pRsp;

    if (pHbRsp->ipList.numOfIps > 0) {
      tscSetMgmtIpList(&pHbRsp->ipList);
    }

    pHbSql->pTscObj->connId = htonl(pHbRsp->connId);

    if (pHbRsp->killConnection) {
      tscKillConnection(pObj);
    } else {
      if (pHbRsp->queryId) {
        tscKillQuery(pObj, htonl(pHbRsp->queryId));
      }

      if (pHbRsp->streamId) {
        tscKillStream(pObj, htonl(pHbRsp->streamId));
      }
    }
  }

  // schedule the next heartbeat
  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
}

void tscProcessActivityTimer(void *handle, void *tmrId) {
  STscObj *pObj = (STscObj *)handle;

  if (pObj == NULL) return;
  if (pObj->signature != pObj) return;
  if (pObj->pTimer != tmrId) return;

  if (pObj->pHb == NULL) {
S
slguan 已提交
165 166 167
    SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
    if (NULL == pSql) return;

H
hzcheng 已提交
168
    pSql->fp = tscProcessHeartBeatRsp;
H
hjxilinx 已提交
169
    
170 171 172 173
    SQueryInfo *pQueryInfo = NULL;
    tscGetQueryInfoDetailSafely(&pSql->cmd, 0, &pQueryInfo);
    pQueryInfo->command = TSDB_SQL_HB;
    
174
    pSql->cmd.command = TSDB_SQL_HB;
S
slguan 已提交
175 176 177 178 179
    if (TSDB_CODE_SUCCESS != tscAllocPayload(&(pSql->cmd), TSDB_DEFAULT_PAYLOAD_SIZE)) {
      tfree(pSql);
      return;
    }

180
    pSql->cmd.command = TSDB_SQL_HB;
H
hzcheng 已提交
181 182 183 184
    pSql->param = pObj;
    pSql->pTscObj = pObj;
    pSql->signature = pSql;
    pObj->pHb = pSql;
185
    tscAddSubqueryInfo(&pObj->pHb->cmd);
186

187
    tscDebug("%p pHb is allocated, pObj:%p", pObj->pHb, pObj);
H
hzcheng 已提交
188 189 190
  }

  if (tscShouldFreeHeatBeat(pObj->pHb)) {
191
    tscDebug("%p free HB object and release connection", pObj);
H
hzcheng 已提交
192 193 194 195 196 197 198 199 200
    tscFreeSqlObj(pObj->pHb);
    tscCloseTscObj(pObj);
    return;
  }

  tscProcessSql(pObj->pHb);
}

/*
 * Copy the command payload of pSql into an RPC buffer and send it.
 *
 * For commands at or above TSDB_SQL_MGMT the destination IP list of the SQL object
 * is first refreshed from the management IP set.
 *
 * Returns TSDB_CODE_TSC_OUT_OF_MEMORY when the RPC buffer cannot be allocated,
 * TSDB_CODE_SUCCESS once the request has been handed to rpcSendRequest().
 */
int tscSendMsgToServer(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pTscObj = pSql->pTscObj;

  char *pCont = rpcMallocCont(pCmd->payloadLen);
  if (pCont == NULL) {
    tscError("%p msg:%s malloc failed", pSql, taosMsg[pCmd->msgType]);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  // management commands always go to the current management IP set
  if (pCmd->command >= TSDB_SQL_MGMT) {
    SRpcIpSet mgmtSet;
    tscDumpMgmtIpSet(&mgmtSet);
    pSql->ipList = mgmtSet;
  }

  memcpy(pCont, pCmd->payload, pCmd->payloadLen);

  /*
   * NOTE: the request carries the address of pSql->pRpcCtx as its handle. Per the
   * original author's note, the rpc context has to be in place before the data is
   * sent, because the response handler may already have released pSql by the time
   * the send call returns; assigning the context afterwards would crash.
   */
  SRpcMsg request = {0};
  request.msgType = pCmd->msgType;
  request.pCont   = pCont;
  request.contLen = pCmd->payloadLen;
  request.ahandle = pSql;
  request.handle  = &pSql->pRpcCtx;
  request.code    = 0;

  rpcSendRequest(pTscObj->pDnodeConn, &pSql->ipList, &request);
  return TSDB_CODE_SUCCESS;
}

J
jtao1735 已提交
236
/*
 * RPC response callback for every request sent by tscSendMsgToServer().
 *
 * rpcMsg is the response (its ahandle is the SSqlObj that issued the request);
 * pIpSet, when not NULL, is an IP set reported together with the response.
 *
 * Steps:
 *  1. Drop the response if the SQL object or its connection is no longer valid, or if
 *     the query is flagged TSDB_QUERY_TYPE_FREE_RESOURCE (the SQL object is freed in
 *     the latter two cases).
 *  2. Record the reported IP set: on the SQL object for commands below TSDB_SQL_MGMT,
 *     in the shared management set otherwise.
 *  3. For select/fetch/insert/update-tag commands that failed with an "invalid table /
 *     vgroup / network / reconfigure" code, renew the table meta and return early if
 *     that renewal is in progress, as long as the retry budget is not exhausted.
 *  4. Copy the response body into pRes->pRsp, post-process submit responses, run the
 *     per-command response handler and finally invoke the user callback.
 *
 * rpcMsg->pCont is released with rpcFreeCont() on every path except the very first
 * validity check.
 */
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) {
  SSqlObj *pSql = (SSqlObj *)rpcMsg->ahandle;
  if (pSql == NULL || pSql->signature != pSql) {
    tscError("%p sql is already released", pSql);
    return;
  }

  STscObj *pObj = pSql->pTscObj;
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  if (pObj->signature != pObj) {
    tscDebug("%p DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, pObj, pObj->signature);

    tscFreeSqlObj(pSql);
    rpcFreeCont(rpcMsg->pCont);
    return;
  }

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) {
    tscDebug("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p",
        pSql, pCmd->command, pQueryInfo->type, pObj, pObj->signature);

    tscFreeSqlObj(pSql);
    rpcFreeCont(rpcMsg->pCont);
    return;
  }

  if (pIpSet != NULL) {
    if (pCmd->command < TSDB_SQL_MGMT) {
      pSql->ipList = *pIpSet;
    } else {
      tscSetMgmtIpList(pIpSet);
    }
  }

  if (rpcMsg->pCont == NULL) {
    rpcMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
  } else {
    STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);

    if ((pCmd->command == TSDB_SQL_SELECT || pCmd->command == TSDB_SQL_FETCH || pCmd->command == TSDB_SQL_INSERT ||
         pCmd->command == TSDB_SQL_UPDATE_TAGS_VAL) &&
        (rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID ||
         rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE)) {
      // bump the counter outside the log call so that it advances even if the logging
      // macro does not evaluate its arguments when warnings are disabled
      pSql->retry++;
      tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), pSql->retry);

      // set the flag to denote that sql string needs to be re-parsed and build submit block with table schema
      if (rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
        pSql->cmd.submitSchema = 1;
      }

      pSql->res.code = rpcMsg->code;  // keep the previous error code
      if (pSql->retry > pSql->maxRetry) {
        tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry);
      } else {
        rpcMsg->code = tscRenewTableMeta(pSql, pTableMetaInfo->name);

        // if there is an error occurring, proceed to the following error handling procedure.
        // todo add test cases
        if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
          rpcFreeCont(rpcMsg->pCont);
          return;
        }
      }
    }
  }

  pRes->rspLen = 0;

  if (pRes->code != TSDB_CODE_TSC_QUERY_CANCELLED) {
    pRes->code = (rpcMsg->code != TSDB_CODE_SUCCESS) ? rpcMsg->code : TSDB_CODE_RPC_NETWORK_UNAVAIL;
  } else {
    tscDebug("%p query is cancelled, code:%s", pSql, tstrerror(pRes->code));
  }

  // NOTE(review): after the assignment above pRes->code does not look like it can equal
  // TSDB_CODE_SUCCESS here, so this reset appears unreachable -- confirm the intent.
  if (pRes->code == TSDB_CODE_SUCCESS) {
    tscDebug("%p reset retry counter to be 0 due to success rsp, old:%d", pSql, pSql->retry);
    pSql->retry = 0;
  }

  if (pRes->code != TSDB_CODE_TSC_QUERY_CANCELLED) {
    assert(rpcMsg->msgType == pCmd->msgType + 1);
    pRes->code    = rpcMsg->code;
    pRes->rspType = rpcMsg->msgType;
    pRes->rspLen  = rpcMsg->contLen;

    if (pRes->rspLen > 0 && rpcMsg->pCont) {
      char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
      if (tmp == NULL) {
        pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      } else {
        pRes->pRsp = tmp;
        memcpy(pRes->pRsp, rpcMsg->pCont, pRes->rspLen);
      }
    } else {
      // release the buffer kept from a previous response instead of dropping the pointer
      free(pRes->pRsp);
      pRes->pRsp = NULL;
    }

    /*
     * There is not response callback function for submit response.
     * The actual inserted number of points is the first number.
     */
    if (rpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP && pRes->pRsp != NULL) {
      SShellSubmitRspMsg *pMsg = (SShellSubmitRspMsg*)pRes->pRsp;
      pMsg->code = htonl(pMsg->code);
      pMsg->numOfRows = htonl(pMsg->numOfRows);
      pMsg->affectedRows = htonl(pMsg->affectedRows);
      pMsg->failedRows = htonl(pMsg->failedRows);
      pMsg->numOfFailedBlocks = htonl(pMsg->numOfFailedBlocks);

      pRes->numOfRows += pMsg->affectedRows;
      tscDebug("%p SQL cmd:%s, code:%s inserted rows:%d rspLen:%d", pSql, sqlCmd[pCmd->command], 
          tstrerror(pRes->code), pMsg->affectedRows, pRes->rspLen);
    } else {
      tscDebug("%p SQL cmd:%s, code:%s rspLen:%d", pSql, sqlCmd[pCmd->command], tstrerror(pRes->code), pRes->rspLen);
    }
  }

  if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) {
    rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
  }

  if (rpcMsg->code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
    rpcMsg->code = (pRes->code == TSDB_CODE_SUCCESS)? pRes->numOfRows: pRes->code;

    // decide before invoking the callback whether the object is to be freed afterwards
    bool shouldFree = tscShouldBeFreed(pSql);
    (*pSql->fp)(pSql->param, pSql, rpcMsg->code);

    if (shouldFree) {
      tscDebug("%p sqlObj is automatically freed", pSql);
      tscFreeSqlObj(pSql);
    }
  }

  rpcFreeCont(rpcMsg->pCont);
}

S
slguan 已提交
391 392 393
/*
 * Build (when the command needs it) and send the message of pSql.
 *
 * The message builder registered in tscBuildMsg is invoked only for select, fetch,
 * retrieve, insert, connect, heartbeat, meta and stable-vgroup commands. If
 * pRes->code is not TSDB_CODE_SUCCESS afterwards, or sending fails, the result is
 * queued with tscQueueAsyncRes() and the error code is returned.
 *
 * Returns TSDB_CODE_SUCCESS once the message has been sent.
 */
int doProcessSql(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  bool needsBuild = (pCmd->command == TSDB_SQL_SELECT) ||
                    (pCmd->command == TSDB_SQL_FETCH) ||
                    (pCmd->command == TSDB_SQL_RETRIEVE) ||
                    (pCmd->command == TSDB_SQL_INSERT) ||
                    (pCmd->command == TSDB_SQL_CONNECT) ||
                    (pCmd->command == TSDB_SQL_HB) ||
                    (pCmd->command == TSDB_SQL_META) ||
                    (pCmd->command == TSDB_SQL_STABLEVGROUP);

  if (needsBuild) {
    pRes->code = tscBuildMsg[pCmd->command](pSql, NULL);
  }

  if (pRes->code == TSDB_CODE_SUCCESS) {
    int32_t sendCode = tscSendMsgToServer(pSql);

    // NOTE: on success pSql may already have been released by another thread,
    // so it must not be touched on this path.
    if (sendCode == TSDB_CODE_SUCCESS) {
      return TSDB_CODE_SUCCESS;
    }

    pRes->code = sendCode;
  }

  // either building or sending failed: report the error asynchronously
  tscQueueAsyncRes(pSql);
  return pRes->code;
}

/*
 * Entry point for executing the command held by pSql.
 *
 * Dispatch depends on the command id:
 *  - below TSDB_SQL_MGMT: table meta info is mandatory; without it the call fails
 *    with TSDB_CODE_TSC_APP_ERROR;
 *  - below TSDB_SQL_LOCAL: the request is addressed to the management IP set;
 *  - anything else: handled by the per-command handler in tscProcessMsgRsp, whose
 *    result is returned directly.
 * The first two cases continue in doProcessSql().
 */
int tscProcessSql(SSqlObj *pSql) {
  SSqlCmd    *pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  STableMetaInfo *pMetaInfo = NULL;
  char           *tableName = NULL;
  uint32_t        queryType = 0;

  if (pQueryInfo != NULL) {
    pMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
    if (pMetaInfo != NULL) {
      tableName = pMetaInfo->name;
    }

    queryType = pQueryInfo->type;

    // while numOfTables equals to 0, it must be Heartbeat
    assert((pQueryInfo->numOfTables == 0 && pQueryInfo->command == TSDB_SQL_HB) || pQueryInfo->numOfTables > 0);
  }

  tscDebug("%p SQL cmd:%s will be processed, name:%s, type:%d", pSql, sqlCmd[pCmd->command], tableName, queryType);

  if (pCmd->command < TSDB_SQL_MGMT) {
    // a command of this class cannot run without table meta info
    if (pMetaInfo == NULL) {
      pSql->res.code = TSDB_CODE_TSC_APP_ERROR;
      return pSql->res.code;
    }
  } else if (pCmd->command < TSDB_SQL_LOCAL) {
    SRpcIpSet mgmtSet;
    tscDumpMgmtIpSet(&mgmtSet);
    pSql->ipList = mgmtSet;
  } else {
    // local handler
    return (*tscProcessMsgRsp[pCmd->command])(pSql);
  }

  return doProcessSql(pSql);
}
H
hzcheng 已提交
459

H
hjxilinx 已提交
460
/*
 * Cancel a two-stage super table query.
 *
 * Nothing happens unless tscIsTwoStageSTableQuery() accepts the query. Every existing
 * sub-query gets its RPC request cancelled, its result code set to
 * TSDB_CODE_TSC_QUERY_CANCELLED and its result queued. The function then polls the
 * command of the master object every 100 ms, for a little over 10 seconds at most,
 * until it becomes TSDB_SQL_RETRIEVE_LOCALMERGE or TSDB_SQL_RETRIEVE_EMPTY_RESULT.
 */
void tscKillSTableQuery(SSqlObj *pSql) {
  SSqlCmd    *pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  if (!tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
    return;
  }

  for (int i = 0; i < pSql->numOfSubs; ++i) {
    SSqlObj *pSubQuery = pSql->pSubs[i];

    if (pSubQuery != NULL) {
      /*
       * here, we cannot set the command = TSDB_SQL_KILL_QUERY. Otherwise, it may cause
       * sub-queries not correctly released and master sql object of super table query reaches an abnormal state.
       */
      rpcCancelRequest(pSubQuery->pRpcCtx);
      pSubQuery->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
      tscQueueAsyncRes(pSubQuery);
    }
  }

  /*
   * 1. If the sub-queries are not launched, or only partially launched, we have to wait
   *    for the launched ones to return so that allocated resources are freed properly.
   * 2. If no sub-query has been launched yet, the super table query is still in the
   *    SQL parsing stage: set the res.code and return.
   */
  const int64_t MAX_WAITING_TIME = 10000;  // 10 Sec.
  int64_t       startMs = taosGetTimestampMs();

  while (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command != TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
    taosMsleep(100);

    int64_t elapsedMs = taosGetTimestampMs() - startMs;
    if (elapsedMs > MAX_WAITING_TIME) {
      break;
    }
  }

  tscDebug("%p super table query cancelled", pSql);
}

J
jtao1735 已提交
502
/*
 * Build the fetch (retrieve) message for pSql in its command payload.
 *
 * The message carries the query handle, the query type (in the "free" field) and the
 * id of the target vgroup: for a super table the vgroup selected by vgroupIndex, for
 * any other table the vgroup stored in its table meta. pInfo is not used.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *) pSql->cmd.payload;
  pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
  pRetrieveMsg->free = htons(pQueryInfo->type);

  // todo valid the vgroupId at the client side
  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
    int32_t vgIndex = pTableMetaInfo->vgroupIndex;

    SVgroupsInfo* pVgroupInfo = pTableMetaInfo->vgroupList;

    // validate the index before it is used to read the vgroup entry
    assert(vgIndex >= 0 && vgIndex < pVgroupInfo->numOfVgroups);
    assert(pVgroupInfo->vgroups[vgIndex].vgId > 0);

    pRetrieveMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId);
  } else {
    STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
    pRetrieveMsg->header.vgId = htonl(pTableMeta->vgroupInfo.vgId);
  }

  pSql->cmd.payloadLen = sizeof(SRetrieveTableMsg);
  pSql->cmd.msgType = TSDB_MSG_TYPE_FETCH;

  pRetrieveMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg));

  return TSDB_CODE_SUCCESS;
}

532
/*
 * Fill in the headers of a submit (insert) message whose data is already in the
 * command payload.
 *
 * The payload starts with an SMsgDesc followed by the SSubmitMsg. The lengths written
 * into the submit header exclude the SMsgDesc. The destination IP list of pSql is set
 * from the vgroup of the target table. pInfo is not used.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
  STableMeta *pMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;

  char *cursor = pSql->cmd.payload;

  // NOTE: shell message size should not include SMsgDesc
  int32_t bodyLen = pSql->cmd.payloadLen - sizeof(SMsgDesc);
  int32_t vgId = pMeta->vgroupInfo.vgId;

  SMsgDesc *pDesc = (SMsgDesc *)cursor;
  pDesc->numOfVnodes = htonl(1);  // always one vnode

  cursor += sizeof(SMsgDesc);
  SSubmitMsg *pSubmit = (SSubmitMsg *)cursor;

  pSubmit->header.vgId = htonl(vgId);
  pSubmit->header.contLen = htonl(bodyLen);  // the length not includes the size of SMsgDesc
  pSubmit->length = pSubmit->header.contLen;

  pSubmit->numOfBlocks = htonl(pSql->cmd.numOfTablesInSubmit);  // number of tables to be inserted

  // pSql->cmd.payloadLen is set during copying data into payload
  pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
  tscSetDnodeIpList(pSql, &pMeta->vgroupInfo);

  tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfIP:%d", pSql, vgId, pSql->cmd.numOfTablesInSubmit,
      pSql->ipList.numOfIps);
  return TSDB_CODE_SUCCESS;
}

/*
564
 * for table query, simply return the size <= 1k
H
hzcheng 已提交
565
 */
566
static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
H
hzcheng 已提交
567
  const static int32_t MIN_QUERY_MSG_PKT_SIZE = TSDB_MAX_BYTES_PER_ROW * 5;
568
  SQueryInfo *         pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
H
hzcheng 已提交
569

570
  int32_t srcColListSize = taosArrayGetSize(pQueryInfo->colList) * sizeof(SColumnInfo);
H
hjxilinx 已提交
571 572 573 574
  
  size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
  int32_t exprSize = sizeof(SSqlFuncMsg) * numOfExprs;
  
575
  return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + exprSize + 4096;
H
hzcheng 已提交
576 577
}

578
/*
 * Append the table id list of the query to the message buffer and fill in the
 * vgroup-related header fields.
 *
 * Two layouts exist:
 *  - normal table, or no per-vgroup table list available: exactly one STableIdInfo
 *    (taken from the table meta) is written; the vgroup is the one selected by
 *    vgroupIndex for a super table, the table's own vgroup otherwise;
 *  - per-vgroup table list available: one STableIdInfo is written for every table of
 *    the vgroup selected by vgroupIndex.
 * In both layouts the destination IP list of pSql is set from the chosen vgroup and
 * the key of each entry comes from tscGetSubscriptionProgress(), with the start key of
 * the query window (passed through htobe64) as default.
 *
 * Returns the position in the buffer right after the last entry written.
 */
static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char *pMsg) {
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
  TSKEY           defaultKey = htobe64(pQueryMsg->window.skey);
  STableMeta     *pTableMeta = pMetaInfo->pTableMeta;

  if (UTIL_TABLE_IS_NORMAL_TABLE(pMetaInfo) || pMetaInfo->pVgroupTables == NULL) {
    SCMVgroupInfo *pVgroup = NULL;

    if (UTIL_TABLE_IS_SUPER_TABLE(pMetaInfo)) {
      int32_t vgIdx = pMetaInfo->vgroupIndex;
      assert(vgIdx >= 0);

      // with an empty vgroup list there is no vgroup to address; pVgroup stays NULL
      if (pMetaInfo->vgroupList->numOfVgroups > 0) {
        assert(vgIdx < pMetaInfo->vgroupList->numOfVgroups);
        pVgroup = &pMetaInfo->vgroupList->vgroups[vgIdx];
      }

      tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, vgIdx, pMetaInfo->vgroupList->numOfVgroups);
    } else {
      pVgroup = &pTableMeta->vgroupInfo;
    }

    tscSetDnodeIpList(pSql, pVgroup);
    if (pVgroup != NULL) {
      pQueryMsg->head.vgId = htonl(pVgroup->vgId);
    }

    STableIdInfo *pEntry = (STableIdInfo *)pMsg;
    pEntry->tid = htonl(pTableMeta->sid);
    pEntry->uid = htobe64(pTableMeta->uid);
    pEntry->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid, defaultKey));

    pQueryMsg->numOfTables = htonl(1);  // set the number of tables
    pMsg += sizeof(STableIdInfo);
  } else {
    // it is a subquery of the super table query, this IP info is acquired from vgroupInfo
    int32_t vgIdx = pMetaInfo->vgroupIndex;
    int32_t vgroupCount = taosArrayGetSize(pMetaInfo->pVgroupTables);
    assert(vgIdx >= 0 && vgIdx < vgroupCount);

    tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, vgIdx, vgroupCount);

    SVgroupTableInfo *pVgroupTables = taosArrayGet(pMetaInfo->pVgroupTables, vgIdx);

    // set the vgroup info
    tscSetDnodeIpList(pSql, &pVgroupTables->vgInfo);
    pQueryMsg->head.vgId = htonl(pVgroupTables->vgInfo.vgId);

    int32_t tableCount = taosArrayGetSize(pVgroupTables->itemList);
    pQueryMsg->numOfTables = htonl(tableCount);  // set the number of tables

    // serialize each table id info
    for (int32_t t = 0; t < tableCount; ++t) {
      STableIdInfo *pSrc = taosArrayGet(pVgroupTables->itemList, t);
      STableIdInfo *pDst = (STableIdInfo *)pMsg;

      pDst->tid = htonl(pSrc->tid);
      pDst->uid = htobe64(pSrc->uid);
      pDst->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pSrc->uid, defaultKey));

      pMsg += sizeof(STableIdInfo);
    }
  }

  tscDebug("%p vgId:%d, query on table:%s, tid:%d, uid:%" PRIu64, pSql, htonl(pQueryMsg->head.vgId), pMetaInfo->name,
      pTableMeta->sid, pTableMeta->uid);

  return pMsg;
}

645
/*
 * Serialize the query described by the current clause of pSql into
 * pCmd->payload as a SQueryTableMsg followed by its variable-length sections,
 * in this order: column filters, output expressions (with binary params),
 * table id list, group-by columns, fill values, tag column descriptors,
 * tag query condition, table-name condition string, and the compressed
 * timestamp block (only when pQueryInfo->tsBuf is set).
 *
 * pInfo is not used by this function.
 *
 * Returns TSDB_CODE_SUCCESS on success. Returns -1 when the payload cannot be
 * allocated or when the query info fails validation, TSDB_CODE_TSC_INVALID_SQL
 * when a column/tag index or type is out of range, and a TAOS_SYSTEM_ERROR
 * code when reading the timestamp buffer file fails.
 */
int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  int32_t size = tscEstimateQueryMsgSize(pCmd, pCmd->clauseIndex);

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for query msg", pSql);
    return -1;  // todo add test for this
  }

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;

  if (taosArrayGetSize(pQueryInfo->colList) <= 0 && !tscQueryTags(pQueryInfo)) {
    tscError("%p illegal value of numOfCols in query msg: %d", pSql, tscGetNumOfColumns(pTableMeta));
    return -1;
  }

  if (pQueryInfo->intervalTime < 0) {
    // intervalTime is serialized below as a 64-bit value, so log it with a 64-bit format
    tscError("%p illegal value of aggregation time interval in query msg: %" PRId64, pSql,
             (int64_t)pQueryInfo->intervalTime);
    return -1;
  }

  if (pQueryInfo->groupbyExpr.numOfGroupCols < 0) {
    tscError("%p illegal value of numOfGroupCols in query msg: %d", pSql, pQueryInfo->groupbyExpr.numOfGroupCols);
    return -1;
  }

  SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload;

  int32_t numOfTags = taosArrayGetSize(pTableMetaInfo->tagColList);

  // for descending order the window boundaries are swapped in the message
  if (pQueryInfo->order.order == TSDB_ORDER_ASC) {
    pQueryMsg->window.skey = htobe64(pQueryInfo->window.skey);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->window.ekey);
  } else {
    pQueryMsg->window.skey = htobe64(pQueryInfo->window.ekey);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->window.skey);
  }

  pQueryMsg->order          = htons(pQueryInfo->order.order);
  pQueryMsg->orderColId     = htons(pQueryInfo->order.orderColId);
  pQueryMsg->fillType       = htons(pQueryInfo->fillType);
  pQueryMsg->limit          = htobe64(pQueryInfo->limit.limit);
  pQueryMsg->offset         = htobe64(pQueryInfo->limit.offset);
  pQueryMsg->numOfCols      = htons(taosArrayGetSize(pQueryInfo->colList));
  pQueryMsg->intervalTime   = htobe64(pQueryInfo->intervalTime);
  pQueryMsg->slidingTime    = htobe64(pQueryInfo->slidingTime);
  pQueryMsg->slidingTimeUnit = pQueryInfo->slidingTimeUnit;
  pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols);
  pQueryMsg->numOfTags      = htonl(numOfTags);
  pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType);
  pQueryMsg->queryType      = htonl(pQueryInfo->type);

  size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo);
  pQueryMsg->numOfOutput = htons(numOfOutput);

  // set column list ids; the variable-length part starts right after the column array
  size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
  char *pMsg = (char *)(pQueryMsg->colList) + numOfCols * sizeof(SColumnInfo);
  SSchema *pSchema = tscGetTableSchema(pTableMeta);

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumn *pCol = taosArrayGetP(pQueryInfo->colList, i);
    SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];

    if (pCol->colIndex.columnIndex >= tscGetNumOfColumns(pTableMeta) || pColSchema->type < TSDB_DATA_TYPE_BOOL ||
        pColSchema->type > TSDB_DATA_TYPE_NCHAR) {
      tscError("%p sid:%d uid:%" PRIu64" id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s",
          pSql, pTableMeta->sid, pTableMeta->uid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta), pCol->colIndex.columnIndex,
               pColSchema->name);

      return TSDB_CODE_TSC_INVALID_SQL;
    }

    pQueryMsg->colList[i].colId = htons(pColSchema->colId);
    pQueryMsg->colList[i].bytes = htons(pColSchema->bytes);
    pQueryMsg->colList[i].type  = htons(pColSchema->type);
    pQueryMsg->colList[i].numOfFilters = htons(pCol->numOfFilters);

    // append the filter information after the basic column information
    for (int32_t f = 0; f < pCol->numOfFilters; ++f) {
      SColumnFilterInfo *pColFilter = &pCol->filterInfo[f];

      SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)pMsg;
      pFilterMsg->filterstr = htons(pColFilter->filterstr);

      pMsg += sizeof(SColumnFilterInfo);

      if (pColFilter->filterstr) {
        pFilterMsg->len = htobe64(pColFilter->len);
        memcpy(pMsg, (void *)pColFilter->pz, pColFilter->len + 1);
        pMsg += (pColFilter->len + 1);  // append the additional filter binary info
      } else {
        pFilterMsg->lowerBndi = htobe64(pColFilter->lowerBndi);
        pFilterMsg->upperBndi = htobe64(pColFilter->upperBndi);
      }

      pFilterMsg->lowerRelOptr = htons(pColFilter->lowerRelOptr);
      pFilterMsg->upperRelOptr = htons(pColFilter->upperRelOptr);

      if (pColFilter->lowerRelOptr == TSDB_RELATION_INVALID && pColFilter->upperRelOptr == TSDB_RELATION_INVALID) {
        tscError("invalid filter info");
        return -1;
      }
    }
  }

  // serialize the output expressions; each one is followed by its binary parameters
  SSqlFuncMsg *pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
  for (int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) {
    SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);

    if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId)) {
      /* column id is not valid according to the cached table meta, the table meta is expired */
      tscError("%p table schema is not matched with parsed sql", pSql);
      return -1;
    }

    pSqlFuncExpr->colInfo.colId    = htons(pExpr->colInfo.colId);
    pSqlFuncExpr->colInfo.colIndex = htons(pExpr->colInfo.colIndex);
    pSqlFuncExpr->colInfo.flag     = htons(pExpr->colInfo.flag);

    pSqlFuncExpr->functionId  = htons(pExpr->functionId);
    pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams);
    pMsg += sizeof(SSqlFuncMsg);

    for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
      // todo add log
      pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType);
      pSqlFuncExpr->arg[j].argBytes = htons(pExpr->param[j].nLen);

      if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) {
        memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen);
        pMsg += pExpr->param[j].nLen;
      } else {
        pSqlFuncExpr->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64Key);
      }
    }

    pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
  }

  // serialize the table info (sid, uid, tags)
  pMsg = doSerializeTableInfo(pQueryMsg, pSql, pMsg);

  SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr;
  if (pGroupbyExpr->numOfGroupCols > 0) {
    pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex);
    pQueryMsg->orderType = htons(pGroupbyExpr->orderType);

    for (int32_t j = 0; j < pGroupbyExpr->numOfGroupCols; ++j) {
      SColIndex* pCol = taosArrayGet(pGroupbyExpr->columnInfo, j);

      *((int16_t *)pMsg) = pCol->colId;
      pMsg += sizeof(pCol->colId);

      // plain assignment: the value must not depend on what the buffer held before
      *((int16_t *)pMsg) = pCol->colIndex;
      pMsg += sizeof(pCol->colIndex);

      *((int16_t *)pMsg) = pCol->flag;
      pMsg += sizeof(pCol->flag);

      memcpy(pMsg, pCol->name, tListLen(pCol->name));
      pMsg += tListLen(pCol->name);
    }
  }

  if (pQueryInfo->fillType != TSDB_FILL_NONE) {
    for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
      *((int64_t *)pMsg) = htobe64(pQueryInfo->fillVal[i]);
      pMsg += sizeof(pQueryInfo->fillVal[0]);
    }
  }

  if (numOfTags != 0) {
    int32_t numOfColumns = tscGetNumOfColumns(pTableMeta);
    int32_t numOfTagColumns = tscGetNumOfTags(pTableMeta);
    int32_t total = numOfTagColumns + numOfColumns;

    pSchema = tscGetTableTagSchema(pTableMeta);

    for (int32_t i = 0; i < numOfTags; ++i) {
      SColumn *pCol = taosArrayGetP(pTableMetaInfo->tagColList, i);
      SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];

      if ((pCol->colIndex.columnIndex >= numOfTagColumns || pCol->colIndex.columnIndex < -1) ||
          (pColSchema->type < TSDB_DATA_TYPE_BOOL || pColSchema->type > TSDB_DATA_TYPE_NCHAR)) {
        tscError("%p sid:%d uid:%" PRIu64 " id:%s, tag index out of range, totalCols:%d, numOfTags:%d, index:%d, column name:%s",
                 pSql, pTableMeta->sid, pTableMeta->uid, pTableMetaInfo->name, total, numOfTagColumns,
                 pCol->colIndex.columnIndex, pColSchema->name);

        return TSDB_CODE_TSC_INVALID_SQL;
      }

      SColumnInfo* pTagCol = (SColumnInfo*) pMsg;

      pTagCol->colId = htons(pColSchema->colId);
      pTagCol->bytes = htons(pColSchema->bytes);
      pTagCol->type  = htons(pColSchema->type);
      pTagCol->numOfFilters = 0;

      pMsg += sizeof(SColumnInfo);
    }
  }

  // serialize tag column query condition
  if (pQueryInfo->tagCond.pCond != NULL && taosArrayGetSize(pQueryInfo->tagCond.pCond) > 0) {
    STagCond* pTagCond = &pQueryInfo->tagCond;

    SCond *pCond = tsGetSTableQueryCond(pTagCond, pTableMeta->uid);
    if (pCond != NULL && pCond->cond != NULL) {
      pQueryMsg->tagCondLen = htons(pCond->len);
      memcpy(pMsg, pCond->cond, pCond->len);

      pMsg += pCond->len;
    }
  }

  // table-name condition: always emitted as a NUL-terminated string (empty when absent)
  if (pQueryInfo->tagCond.tbnameCond.cond == NULL) {
    *pMsg = 0;
    pMsg++;
  } else {
    strcpy(pMsg, pQueryInfo->tagCond.tbnameCond.cond);
    pMsg += strlen(pQueryInfo->tagCond.tbnameCond.cond) + 1;
  }

  // compressed ts block
  pQueryMsg->tsOffset = htonl(pMsg - pCmd->payload);
  int32_t tsLen = 0;
  int32_t numOfBlocks = 0;

  if (pQueryInfo->tsBuf != NULL) {
    STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pQueryInfo->tsBuf, pTableMetaInfo->vgroupIndex);
    assert(QUERY_IS_JOIN_QUERY(pQueryInfo->type) && pBlockInfo != NULL);  // this query should not be sent

    // todo refactor
    if (fseek(pQueryInfo->tsBuf->f, pBlockInfo->offset, SEEK_SET) != 0) {
      int code = TAOS_SYSTEM_ERROR(ferror(pQueryInfo->tsBuf->f));
      tscError("%p: fseek failed: %s", pSql, tstrerror(code));
      return code;
    }

    size_t s = fread(pMsg, 1, pBlockInfo->compLen, pQueryInfo->tsBuf->f);
    if (s != pBlockInfo->compLen) {
      int code = TAOS_SYSTEM_ERROR(ferror(pQueryInfo->tsBuf->f));
      tscError("%p: fread didn't return expected data: %s", pSql, tstrerror(code));
      return code;
    }

    pMsg += pBlockInfo->compLen;
    tsLen = pBlockInfo->compLen;
    numOfBlocks = pBlockInfo->numOfBlocks;
  }

  pQueryMsg->tsLen = htonl(tsLen);
  pQueryMsg->tsNumOfBlocks = htonl(numOfBlocks);
  if (pQueryInfo->tsBuf != NULL) {
    pQueryMsg->tsOrder = htonl(pQueryInfo->tsBuf->tsOrder);
  }

  int32_t msgLen = pMsg - pCmd->payload;

  tscDebug("%p msg built success,len:%d bytes", pSql, msgLen);
  pCmd->payloadLen = msgLen;
  pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY;

  pQueryMsg->head.contLen = htonl(msgLen);
  assert(msgLen + minMsgSize() <= size);

  return TSDB_CODE_SUCCESS;
}

918 919
/*
 * Finish the CREATE DATABASE request that lives in the command payload:
 * record the message type and length and copy the database name (taken from
 * the first table meta info of the current clause) into the message.
 * No payload allocation is performed here. pInfo is not used.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;

  cmd->payloadLen = sizeof(SCMCreateDbMsg);
  cmd->msgType    = TSDB_MSG_TYPE_CM_CREATE_DB;

  assert(cmd->numOfClause == 1);

  STableMetaInfo *metaInfo = tscGetTableMetaInfoFromCmd(cmd, cmd->clauseIndex, 0);
  SCMCreateDbMsg *createMsg = (SCMCreateDbMsg *)cmd->payload;
  tstrncpy(createMsg->db, metaInfo->name, sizeof(createMsg->db));

  return TSDB_CODE_SUCCESS;
}

932 933
/*
 * Build the CREATE DNODE request: allocate the payload and copy the end point
 * text from the first DCL token (pInfo->pDCLInfo->a[0]) into the message.
 *
 * The token is described by a pointer plus a length, so the copy is clamped to
 * the size of the destination field and explicitly NUL-terminated.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCMCreateDnodeMsg);
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMCreateDnodeMsg *pCreate = (SCMCreateDnodeMsg *)pCmd->payload;

  // never write past pCreate->ep, and always leave it terminated
  size_t epLen = pInfo->pDCLInfo->a[0].n;
  if (epLen >= sizeof(pCreate->ep)) {
    epLen = sizeof(pCreate->ep) - 1;
  }
  if (epLen > 0) {
    strncpy(pCreate->ep, pInfo->pDCLInfo->a[0].z, epLen);
  }
  pCreate->ep[epLen] = 0;

  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DNODE;

  return TSDB_CODE_SUCCESS;
}

948 949
/*
 * Build the TSDB_MSG_TYPE_CM_CREATE_ACCT request from the parsed DCL info:
 * account name, password, the numeric account limits (converted to network
 * byte order) and the access state derived from the "stat" token.
 *
 * Name and password tokens are pointer+length pairs; each copy is clamped to
 * the destination field size and explicitly NUL-terminated.
 *
 * accessState is -1 when no stat token was given; for "r", "w", "all" and
 * "no" it is set to the matching constant (0 for "no"); any other token text
 * leaves the field untouched.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int32_t tscBuildAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCMCreateAcctMsg);
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMCreateAcctMsg *pAlterMsg = (SCMCreateAcctMsg *)pCmd->payload;

  SSQLToken *pName = &pInfo->pDCLInfo->user.user;
  SSQLToken *pPwd = &pInfo->pDCLInfo->user.passwd;

  // bounded, terminated copy of the account name
  size_t nameLen = pName->n;
  if (nameLen >= sizeof(pAlterMsg->user)) {
    nameLen = sizeof(pAlterMsg->user) - 1;
  }
  if (nameLen > 0) {
    strncpy(pAlterMsg->user, pName->z, nameLen);
  }
  pAlterMsg->user[nameLen] = 0;

  // bounded, terminated copy of the password
  size_t pwdLen = pPwd->n;
  if (pwdLen >= sizeof(pAlterMsg->pass)) {
    pwdLen = sizeof(pAlterMsg->pass) - 1;
  }
  if (pwdLen > 0) {
    strncpy(pAlterMsg->pass, pPwd->z, pwdLen);
  }
  pAlterMsg->pass[pwdLen] = 0;

  SCreateAcctSQL *pAcctOpt = &pInfo->pDCLInfo->acctOpt;

  pAlterMsg->cfg.maxUsers = htonl(pAcctOpt->maxUsers);
  pAlterMsg->cfg.maxDbs = htonl(pAcctOpt->maxDbs);
  pAlterMsg->cfg.maxTimeSeries = htonl(pAcctOpt->maxTimeSeries);
  pAlterMsg->cfg.maxStreams = htonl(pAcctOpt->maxStreams);
  pAlterMsg->cfg.maxPointsPerSecond = htonl(pAcctOpt->maxPointsPerSecond);
  pAlterMsg->cfg.maxStorage = htobe64(pAcctOpt->maxStorage);
  pAlterMsg->cfg.maxQueryTime = htobe64(pAcctOpt->maxQueryTime);
  pAlterMsg->cfg.maxConnections = htonl(pAcctOpt->maxConnections);

  if (pAcctOpt->stat.n == 0) {
    pAlterMsg->cfg.accessState = -1;
  } else {
    if (pAcctOpt->stat.z[0] == 'r' && pAcctOpt->stat.n == 1) {
      pAlterMsg->cfg.accessState = TSDB_VN_READ_ACCCESS;
    } else if (pAcctOpt->stat.z[0] == 'w' && pAcctOpt->stat.n == 1) {
      pAlterMsg->cfg.accessState = TSDB_VN_WRITE_ACCCESS;
    } else if (strncmp(pAcctOpt->stat.z, "all", 3) == 0 && pAcctOpt->stat.n == 3) {
      pAlterMsg->cfg.accessState = TSDB_VN_ALL_ACCCESS;
    } else if (strncmp(pAcctOpt->stat.z, "no", 2) == 0 && pAcctOpt->stat.n == 2) {
      pAlterMsg->cfg.accessState = 0;
    }
  }

  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_ACCT;
  return TSDB_CODE_SUCCESS;
}

993 994
/*
 * Build a CREATE USER or ALTER USER request from the parsed DCL user info.
 *
 * The user name is always copied. For TSDB_ALTER_USER_PRIVILEGES the privilege
 * byte is taken from pCmd->count; for every other type (password change or
 * user creation) the password token is copied.
 *
 * Name and password tokens are pointer+length pairs; each copy is clamped to
 * the destination field size and explicitly NUL-terminated, so an over-long
 * token cannot overwrite the neighbouring message fields.
 *
 * The message type is TSDB_MSG_TYPE_CM_ALTER_USER for the two ALTER types and
 * TSDB_MSG_TYPE_CM_CREATE_USER otherwise.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int32_t tscBuildUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCMCreateUserMsg);

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMCreateUserMsg *pAlterMsg = (SCMCreateUserMsg*)pCmd->payload;

  SUserInfo *pUser = &pInfo->pDCLInfo->user;

  // bounded, terminated copy of the user name
  size_t nameLen = pUser->user.n;
  if (nameLen >= sizeof(pAlterMsg->user)) {
    nameLen = sizeof(pAlterMsg->user) - 1;
  }
  if (nameLen > 0) {
    strncpy(pAlterMsg->user, pUser->user.z, nameLen);
  }
  pAlterMsg->user[nameLen] = 0;

  pAlterMsg->flag = pUser->type;

  if (pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
    pAlterMsg->privilege = (char)pCmd->count;
  } else {
    // password change and user creation both carry the password
    size_t pwdLen = pUser->passwd.n;
    if (pwdLen >= sizeof(pAlterMsg->pass)) {
      pwdLen = sizeof(pAlterMsg->pass) - 1;
    }
    if (pwdLen > 0) {
      strncpy(pAlterMsg->pass, pUser->passwd.z, pwdLen);
    }
    pAlterMsg->pass[pwdLen] = 0;
  }

  if (pUser->type == TSDB_ALTER_USER_PASSWD || pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
    pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_USER;
  } else {
    pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_USER;
  }

  return TSDB_CODE_SUCCESS;
}

1025 1026
/*
 * Mark the command as a dnode configuration request: only the message type
 * and the payload length are set here; the payload itself is not touched.
 * pInfo is not used. Always returns TSDB_CODE_SUCCESS.
 */
int32_t tscBuildCfgDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;

  cmd->msgType    = TSDB_MSG_TYPE_CM_CONFIG_DNODE;
  cmd->payloadLen = sizeof(SCMCfgDnodeMsg);

  return TSDB_CODE_SUCCESS;
}
S
slguan 已提交
1031

1032 1033
/*
 * Build the DROP DATABASE request: allocate the payload, copy the database
 * name from the first table meta info of the current clause, and record
 * whether a missing database should be ignored (existsCheck in the DCL info).
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;

  cmd->payloadLen = sizeof(SCMDropDbMsg);
  if (tscAllocPayload(cmd, cmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMDropDbMsg   *dropMsg = (SCMDropDbMsg *)cmd->payload;
  STableMetaInfo *metaInfo = tscGetTableMetaInfoFromCmd(cmd, cmd->clauseIndex, 0);

  tstrncpy(dropMsg->db, metaInfo->name, sizeof(dropMsg->db));

  if (pInfo->pDCLInfo->existsCheck) {
    dropMsg->ignoreNotExists = 1;
  } else {
    dropMsg->ignoreNotExists = 0;
  }

  cmd->msgType = TSDB_MSG_TYPE_CM_DROP_DB;
  return TSDB_CODE_SUCCESS;
}

1051 1052
/*
 * Build the DROP TABLE request: allocate the payload, copy the table id from
 * the first table meta info of the current clause, and record whether a
 * missing table should be ignored (existsCheck in the DCL info).
 *
 * The table id is copied with a bound of the destination field size, matching
 * how the other builders in this file copy names.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCMDropTableMsg);

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMDropTableMsg *pDropTableMsg = (SCMDropTableMsg*)pCmd->payload;
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  tstrncpy(pDropTableMsg->tableId, pTableMetaInfo->name, sizeof(pDropTableMsg->tableId));
  pDropTableMsg->igNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;

  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_TABLE;
  return TSDB_CODE_SUCCESS;
}

1069
/*
 * Build the DROP DNODE request: allocate the payload and copy the end point,
 * which is read from the name of the first table meta info of the current
 * clause. pInfo is not used.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;

  cmd->payloadLen = sizeof(SCMDropDnodeMsg);
  if (tscAllocPayload(cmd, cmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  STableMetaInfo  *metaInfo = tscGetTableMetaInfoFromCmd(cmd, cmd->clauseIndex, 0);
  SCMDropDnodeMsg *dropMsg = (SCMDropDnodeMsg *)cmd->payload;

  tstrncpy(dropMsg->ep, metaInfo->name, sizeof(dropMsg->ep));
  cmd->msgType = TSDB_MSG_TYPE_CM_DROP_DNODE;

  return TSDB_CODE_SUCCESS;
}

S
[TD-16]  
slguan 已提交
1085
/*
 * Build the DROP USER request. The message type and length are recorded
 * before the payload is allocated; the user name is then copied from the name
 * of the first table meta info of the current clause. pInfo is not used.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int32_t tscBuildDropUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;

  cmd->msgType    = TSDB_MSG_TYPE_CM_DROP_USER;
  cmd->payloadLen = sizeof(SCMDropUserMsg);

  if (tscAllocPayload(cmd, cmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  STableMetaInfo *metaInfo = tscGetTableMetaInfoFromCmd(cmd, cmd->clauseIndex, 0);
  SCMDropUserMsg *dropMsg = (SCMDropUserMsg *)cmd->payload;
  tstrncpy(dropMsg->user, metaInfo->name, sizeof(dropMsg->user));

  return TSDB_CODE_SUCCESS;
}

S
[TD-16]  
slguan 已提交
1102 1103 1104 1105 1106 1107 1108
/*
 * Build the DROP ACCOUNT request. It reuses the SCMDropUserMsg layout: the
 * account name is copied into the "user" field from the name of the first
 * table meta info of the current clause. pInfo is not used.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;

  cmd->msgType    = TSDB_MSG_TYPE_CM_DROP_ACCT;
  cmd->payloadLen = sizeof(SCMDropUserMsg);

  if (tscAllocPayload(cmd, cmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  STableMetaInfo *metaInfo = tscGetTableMetaInfoFromCmd(cmd, cmd->clauseIndex, 0);
  SCMDropUserMsg *dropMsg = (SCMDropUserMsg *)cmd->payload;
  tstrncpy(dropMsg->user, metaInfo->name, sizeof(dropMsg->user));

  return TSDB_CODE_SUCCESS;
}

1119 1120
/*
 * Build the USE DATABASE request: allocate the payload and copy the database
 * name from the first table meta info of the current clause. pInfo is not
 * used.
 *
 * The name is copied with a bound of the destination field size, matching how
 * the other builders in this file copy names.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int32_t tscBuildUseDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCMUseDbMsg);

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMUseDbMsg *pUseDbMsg = (SCMUseDbMsg*)pCmd->payload;
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  tstrncpy(pUseDbMsg->db, pTableMetaInfo->name, sizeof(pUseDbMsg->db));
  pCmd->msgType = TSDB_MSG_TYPE_CM_USE_DB;

  return TSDB_CODE_SUCCESS;
}

1136
/*
 * Build the SHOW request.
 *
 * The database name comes from the first table meta info of the current
 * clause when it is non-empty, otherwise from the connection object. The
 * optional trailing text is the wildcard pattern (for every show type except
 * TSDB_MGMT_TABLE_VNODES, and only when a pattern token is present) or the
 * prefix token (for TSDB_MGMT_TABLE_VNODES, where it is required).
 *
 * The length of that trailing text is kept in host byte order for local
 * arithmetic and only converted with htons() when stored in the message, so
 * pCmd->payloadLen is correct regardless of host endianness. The allocation
 * is sized from the actual text length instead of relying on fixed slack.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  STscObj *pObj = pSql->pTscObj;
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->msgType = TSDB_MSG_TYPE_CM_SHOW;

  // pick the token (if any) whose text is appended after the fixed header
  SShowInfo *pShowInfo = &pInfo->pDCLInfo->showOpt;
  SSQLToken *pExtra = NULL;

  if (pShowInfo->showType != TSDB_MGMT_TABLE_VNODES) {
    if (pShowInfo->pattern.type > 0) {  // only show tables support wildcard query
      pExtra = &pShowInfo->pattern;
    }
  } else {
    pExtra = &pShowInfo->prefix;
    assert(pExtra->n > 0 && pExtra->type > 0);
  }

  size_t extraLen = (pExtra != NULL) ? pExtra->n : 0;

  // keep the historical 100 bytes of slack on top of the real text length
  pCmd->payloadLen = sizeof(SCMShowMsg) + 100 + extraLen;

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMShowMsg *pShowMsg = (SCMShowMsg*)pCmd->payload;

  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  size_t nameLen = strlen(pTableMetaInfo->name);
  if (nameLen > 0) {
    tstrncpy(pShowMsg->db, pTableMetaInfo->name, sizeof(pShowMsg->db));  // prefix is set here
  } else {
    tstrncpy(pShowMsg->db, pObj->db, sizeof(pShowMsg->db));
  }

  pShowMsg->type = pShowInfo->showType;

  if (extraLen > 0) {
    strncpy(pShowMsg->payload, pExtra->z, extraLen);
  }
  // wire field is network order; always written so it never holds stale bytes
  pShowMsg->payloadLen = htons((uint16_t)extraLen);

  // local length uses the host-order value, not the byte-swapped wire field
  pCmd->payloadLen = sizeof(SCMShowMsg) + extraLen;
  return TSDB_CODE_SUCCESS;
}

1178
/*
 * Prepare a KILL request: set the payload length and map the SQL command to
 * the matching message type (query, connection or stream). For any other
 * command the message type is left unchanged. pInfo is not used.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;
  cmd->payloadLen = sizeof(SCMKillQueryMsg);

  if (cmd->command == TSDB_SQL_KILL_QUERY) {
    cmd->msgType = TSDB_MSG_TYPE_CM_KILL_QUERY;
  } else if (cmd->command == TSDB_SQL_KILL_CONNECTION) {
    cmd->msgType = TSDB_MSG_TYPE_CM_KILL_CONN;
  } else if (cmd->command == TSDB_SQL_KILL_STREAM) {
    cmd->msgType = TSDB_MSG_TYPE_CM_KILL_STREAM;
  }

  return TSDB_CODE_SUCCESS;
}

1196
/*
 * Estimate the payload size needed for a CREATE TABLE request.
 *
 * The estimate is the minimum message size plus the fixed message header,
 * plus either one STagData (table created from a super table) or one SSchema
 * per column and tag, plus the length of the select statement (and one extra
 * byte) when one is attached, plus TSDB_EXTRA_PAYLOAD_SIZE.
 */
int tscEstimateCreateTableMsgLength(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd         *cmd = &(pSql->cmd);
  SCreateTableSQL *createInfo = pInfo->pCreateTableInfo;

  int32_t total = minMsgSize() + sizeof(SCMCreateTableMsg);

  if (createInfo->type != TSQL_CREATE_TABLE_FROM_STABLE) {
    total += sizeof(SSchema) * (cmd->numOfCols + cmd->count);
  } else {
    total += sizeof(STagData);
  }

  if (createInfo->pSelect != NULL) {
    total += (createInfo->pSelect->selectToken.n + 1);
  }

  return total + TSDB_EXTRA_PAYLOAD_SIZE;
}

1215
/*
 * Build the CREATE TABLE request.
 *
 * The fixed header carries the table id, the database name derived from the
 * full table name, the "ignore if exists" flag and the column/tag counts.
 * The variable part is either:
 *   - the tag data (length, name, values) when the table is created from a
 *     super table, or
 *   - one SSchema per column and tag, followed by the select statement text
 *     when the statement creates a stream.
 *
 * Table id and column names are copied with a bound of the destination field
 * size, matching how the other builders in this file copy names.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  int              msgLen = 0;
  SSchema *        pSchema;
  int              size = 0;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  // Reallocate the payload size
  size = tscEstimateCreateTableMsgLength(pSql, pInfo);
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for create table msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMCreateTableMsg *pCreateTableMsg = (SCMCreateTableMsg *)pCmd->payload;
  tstrncpy(pCreateTableMsg->tableId, pTableMetaInfo->name, sizeof(pCreateTableMsg->tableId));

  // use dbinfo from table id without modifying current db info
  tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pCreateTableMsg->db);

  SCreateTableSQL *pCreateTable = pInfo->pCreateTableInfo;

  pCreateTableMsg->igExists = pCreateTable->existCheck ? 1 : 0;
  pCreateTableMsg->numOfColumns = htons(pCmd->numOfCols);
  pCreateTableMsg->numOfTags = htons(pCmd->count);

  pCreateTableMsg->sqlLen = 0;
  char *pMsg = (char *)pCreateTableMsg->schema;

  int8_t type = pInfo->pCreateTableInfo->type;
  if (type == TSQL_CREATE_TABLE_FROM_STABLE) {  // create by using super table, tags value
    STagData* pTag = &pInfo->pCreateTableInfo->usingInfo.tagdata;
    *(int32_t*)pMsg = htonl(pTag->dataLen);
    pMsg += sizeof(int32_t);
    memcpy(pMsg, pTag->name, sizeof(pTag->name));
    pMsg += sizeof(pTag->name);
    memcpy(pMsg, pTag->data, pTag->dataLen);
    pMsg += pTag->dataLen;
  } else {  // create (super) table
    pSchema = (SSchema *)pCreateTableMsg->schema;

    for (int i = 0; i < pCmd->numOfCols + pCmd->count; ++i) {
      TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);

      pSchema->type = pField->type;
      tstrncpy(pSchema->name, pField->name, sizeof(pSchema->name));
      pSchema->bytes = htons(pField->bytes);

      pSchema++;
    }

    pMsg = (char *)pSchema;
    if (type == TSQL_CREATE_STREAM) {  // check if it is a stream sql
      SQuerySQL *pQuerySql = pInfo->pCreateTableInfo->pSelect;

      // copies the token plus the one character that follows it in the sql text
      // NOTE(review): presumably that character is the terminator - confirm
      strncpy(pMsg, pQuerySql->selectToken.z, pQuerySql->selectToken.n + 1);
      pCreateTableMsg->sqlLen = htons(pQuerySql->selectToken.n + 1);
      pMsg += pQuerySql->selectToken.n + 1;
    }
  }

  tscFieldInfoClear(&pQueryInfo->fieldsInfo);

  msgLen = pMsg - (char*)pCreateTableMsg;
  pCreateTableMsg->contLen = htonl(msgLen);
  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE;

  assert(msgLen + minMsgSize() <= size);
  return TSDB_CODE_SUCCESS;
}

/*
 * Estimate the payload size needed for an ALTER TABLE request: the minimum
 * message size, the fixed message header, one SSchema per field of the first
 * query info, and TSDB_EXTRA_PAYLOAD_SIZE.
 */
int tscEstimateAlterTableMsgLength(SSqlCmd *pCmd) {
  SQueryInfo *queryInfo = tscGetQueryInfoDetail(pCmd, 0);
  size_t      schemaBytes = sizeof(SSchema) * tscNumOfFields(queryInfo);

  return minMsgSize() + sizeof(SCMAlterTableMsg) + schemaBytes + TSDB_EXTRA_PAYLOAD_SIZE;
}

1296
/*
 * Build the ALTER TABLE request.
 *
 * The fixed header carries the database name derived from the full table
 * name, the table id, the alter type and the number of columns. It is followed
 * by one SSchema per field of the first query info and then by the tag value
 * bytes from pInfo->pAlterInfo->tagData.
 *
 * Table id and column names are copied with a bound of the destination field
 * size, matching how the other builders in this file copy names.
 *
 * Returns TSDB_CODE_SUCCESS, or -1 when the payload cannot be allocated.
 */
int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  char *pMsg;
  int   msgLen = 0;

  SSqlCmd    *pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  SAlterTableSQL *pAlterInfo = pInfo->pAlterInfo;
  int size = tscEstimateAlterTableMsgLength(pCmd);
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for alter table msg", pSql);
    return -1;
  }

  SCMAlterTableMsg *pAlterTableMsg = (SCMAlterTableMsg *)pCmd->payload;
  tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pAlterTableMsg->db);

  tstrncpy(pAlterTableMsg->tableId, pTableMetaInfo->name, sizeof(pAlterTableMsg->tableId));
  pAlterTableMsg->type = htons(pAlterInfo->type);

  pAlterTableMsg->numOfCols = htons(tscNumOfFields(pQueryInfo));
  SSchema *pSchema = pAlterTableMsg->schema;
  for (int i = 0; i < tscNumOfFields(pQueryInfo); ++i) {
    TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);

    pSchema->type = pField->type;
    tstrncpy(pSchema->name, pField->name, sizeof(pSchema->name));
    pSchema->bytes = htons(pField->bytes);
    pSchema++;
  }

  // the tag value bytes follow the schema array
  pMsg = (char *)pSchema;
  pAlterTableMsg->tagValLen = htonl(pAlterInfo->tagData.dataLen);
  memcpy(pMsg, pAlterInfo->tagData.data, pAlterInfo->tagData.dataLen);
  pMsg += pAlterInfo->tagData.dataLen;

  msgLen = pMsg - (char*)pAlterTableMsg;

  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_TABLE;

  assert(msgLen + minMsgSize() <= size);

  return TSDB_CODE_SUCCESS;
}

1344 1345 1346 1347
/*
 * Finalize an update-tag-value request whose body is already in pCmd->payload.
 *
 * Only the command bookkeeping is filled in here: the message type, the
 * payload length (taken from the message head's contLen after a byte swap)
 * and the dnode ip list, which is set from the table's vgroup info.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
  SSqlCmd               *pCmd = &pSql->cmd;
  SUpdateTableTagValMsg *pRequest = (SUpdateTableTagValMsg *)pCmd->payload;

  pCmd->msgType = TSDB_MSG_TYPE_UPDATE_TAG_VAL;
  pCmd->payloadLen = htonl(pRequest->head.contLen);

  SQueryInfo     *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo *pMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  tscSetDnodeIpList(pSql, &pMetaInfo->pTableMeta->vgroupInfo);

  return TSDB_CODE_SUCCESS;
}

1359
/*
 * Fill in the TSDB_MSG_TYPE_CM_ALTER_DB command: message type, payload length
 * and the db name, which is copied from the name of table meta info 0 of the
 * current clause.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SCMAlterDbMsg);
  pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_DB;

  SCMAlterDbMsg  *pRequest = (SCMAlterDbMsg *)pCmd->payload;
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);

  tstrncpy(pRequest->db, pMetaInfo->name, sizeof(pRequest->db));
  return TSDB_CODE_SUCCESS;
}

1371
/*
 * Build the TSDB_MSG_TYPE_CM_RETRIEVE request: a SRetrieveTableMsg carrying
 * the query handle stored in pSql->res and the type of query info 0.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->msgType = TSDB_MSG_TYPE_CM_RETRIEVE;
  pCmd->payloadLen = sizeof(SRetrieveTableMsg);

  int32_t allocCode = tscAllocPayload(pCmd, pCmd->payloadLen);
  if (allocCode != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SQueryInfo        *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  SRetrieveTableMsg *pRequest = (SRetrieveTableMsg *)pCmd->payload;

  // both fields are sent in network byte order
  pRequest->qhandle = htobe64(pSql->res.qhandle);
  pRequest->free = htons(pQueryInfo->type);

  return TSDB_CODE_SUCCESS;
}

1389
/*
 * Point every pRes->tsrow[i] at the start of output column i inside
 * pRes->data. Each column start is computed as the field offset multiplied by
 * the number of rows in the result.
 *
 * Returns 0 on success, or pRes->code when tscCreateResPointerInfo fails.
 */
static int tscSetResultPointer(SQueryInfo *pQueryInfo, SSqlRes *pRes) {
  int32_t prepared = tscCreateResPointerInfo(pRes, pQueryInfo);
  if (prepared != TSDB_CODE_SUCCESS) {
    return pRes->code;
  }

  int col = 0;
  while (col < pQueryInfo->fieldsInfo.numOfOutput) {
    int16_t offset = tscFieldInfoGetOffset(pQueryInfo, col);
    pRes->tsrow[col] = ((char*) pRes->data + offset * pRes->numOfRows);
    ++col;
  }

  return 0;
}

/*
 * Publish a locally produced result set.
 *
 * pRes->rspType tracks whether the result has already been handed out:
 *  - rspType == 0: first call; numOfRes rows are exposed, the row cursor is
 *    reset, rspType is set to 1 and the per-column result pointers are set up.
 *  - otherwise: the result was already consumed, so the result object is reset
 *    via tscResetForNextRetrieve.
 *
 * When an async callback (pSql->fp) is registered it is invoked directly on
 * success; on failure the result is queued through tscQueueAsyncRes.
 *
 * Returns pSql->res.code. The code is kept in a full-width integer: narrowing
 * it to 8 bits would map any error code whose low byte is zero onto
 * TSDB_CODE_SUCCESS.
 */
static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  pRes->code = TSDB_CODE_SUCCESS;
  if (pRes->rspType == 0) {
    pRes->numOfRows = numOfRes;
    pRes->row = 0;
    pRes->rspType = 1;

    tscSetResultPointer(pQueryInfo, pRes);
  } else {
    tscResetForNextRetrieve(pRes);
  }

  // read the code before the callback runs
  int32_t code = pSql->res.code;
  if (pSql->fp) {
    if (code == TSDB_CODE_SUCCESS) {
      (*pSql->fp)(pSql->param, pSql, pSql->res.numOfRows);
    } else {
      tscQueueAsyncRes(pSql);
    }
  }

  return code;
}

/*
 * Produce the local result for a describe-table command: one row per column
 * plus one row per tag of table meta info 0 of the current clause.
 *
 * Returns the result of tscLocalResultCommonBuilder.
 */
int tscProcessDescribeTableRsp(SSqlObj *pSql) {
  SSqlCmd        *pCmd = &pSql->cmd;
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  STableComInfo   tableInfo = tscGetTableInfo(pMetaInfo->pTableMeta);

  int32_t rows = tableInfo.numOfColumns + tableInfo.numOfTags;
  return tscLocalResultCommonBuilder(pSql, rows);
}

H
Haojun Liao 已提交
1447 1448 1449
/*
 * Produce a single-row local result and mark the result set as completed.
 *
 * Returns the result of tscLocalResultCommonBuilder.
 */
int tscProcessLocalRetrieveRsp(SSqlObj *pSql) {
  pSql->res.completed = true;
  return tscLocalResultCommonBuilder(pSql, 1);
}

H
hjxilinx 已提交
1453
/*
 * Run the client-side merge and hand its result to the caller.
 *
 * The merge code is stored in pRes->code. When rows were produced the
 * per-column result pointers are set up; the result is flagged as completed
 * once the merge yields no rows. On success pSql->fp is invoked directly,
 * otherwise the result is queued through tscQueueAsyncRes.
 *
 * Returns the merge result code.
 */
int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  pRes->code = tscDoLocalMerge(pSql);
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  bool hasRows = (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0);
  if (hasRows) {
    tscSetResultPointer(pQueryInfo, pRes);
  }

  pRes->row = 0;
  pRes->completed = (pRes->numOfRows == 0);

  // capture the code before the callback runs
  int32_t code = pRes->code;
  if (code != TSDB_CODE_SUCCESS) {
    tscQueueAsyncRes(pSql);
  } else {
    (*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
  }

  return code;
}

S
slguan 已提交
1477
/* Produce a local result that contains no rows. */
int tscProcessEmptyResultRsp(SSqlObj *pSql) {
  const int32_t numOfRes = 0;
  return tscLocalResultCommonBuilder(pSql, numOfRes);
}
H
hzcheng 已提交
1478

1479
/*
 * Build the TSDB_MSG_TYPE_CM_CONNECT request.
 *
 * The db name sent is the part of pObj->db that follows the first
 * TS_PATH_DELIMITER, or all of pObj->db when no delimiter is present. The
 * client version is copied from the global `version`; msgVersion is left as
 * an empty string.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;

  pCmd->msgType = TSDB_MSG_TYPE_CM_CONNECT;
  pCmd->payloadLen = sizeof(SCMConnectMsg);

  int32_t allocCode = tscAllocPayload(pCmd, pCmd->payloadLen);
  if (allocCode != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMConnectMsg *pRequest = (SCMConnectMsg *)pCmd->payload;

  // skip everything up to and including the first path delimiter, if any
  char *dbName = pObj->db;
  char *delimiter = strstr(pObj->db, TS_PATH_DELIMITER);
  if (delimiter != NULL) {
    dbName = delimiter + 1;
  }

  tstrncpy(pRequest->db, dbName, sizeof(pRequest->db));
  tstrncpy(pRequest->clientVersion, version, sizeof(pRequest->clientVersion));
  tstrncpy(pRequest->msgVersion, "", sizeof(pRequest->msgVersion));

  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
1502
/*
 * Build the TSDB_MSG_TYPE_CM_TABLE_META request in pCmd->payload.
 *
 * On entry pCmd->payload may already hold pCmd->payloadLen bytes of binary
 * tag data. Because the request header is written over the start of the same
 * buffer, that data is first saved into a temporary copy and then, when the
 * command has autoCreated set, appended after the header.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the
 * temporary copy cannot be allocated.
 */
int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  char    *tmpData = NULL;
  uint32_t len = pCmd->payloadLen;
  if (len > 0) {
    tmpData = calloc(1, len);
    if (NULL == tmpData) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }

    // STagData is in binary format, strncpy is not available
    memcpy(tmpData, pCmd->payload, len);
  }

  SQueryInfo     *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  SCMTableInfoMsg *pInfoMsg = (SCMTableInfoMsg *)pCmd->payload;
  strcpy(pInfoMsg->tableId, pTableMetaInfo->name);
  pInfoMsg->createFlag = htons(pCmd->autoCreated ? 1 : 0);

  char *pMsg = (char *)pInfoMsg + sizeof(SCMTableInfoMsg);

  if (pCmd->autoCreated && len > 0) {
    memcpy(pInfoMsg->tags, tmpData, len);
    pMsg += len;
  }

  pCmd->payloadLen = pMsg - (char *)pInfoMsg;
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLE_META;

  tfree(tmpData);

  // check the length that was actually built; a length counter that is never
  // updated would make this check pass regardless of the message size
  assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize);
  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
1544
/**
1545
 *  multi table meta req pkg format:
1546
 *  | SMgmtHead | SCMMultiTableInfoMsg | tableId0 | tableId1 | tableId2 | ......
S
slguan 已提交
1547 1548
 *      no used         4B
 **/
1549
int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
guanshengliang's avatar
guanshengliang 已提交
1550
#if 0
S
slguan 已提交
1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562
  SSqlCmd *pCmd = &pSql->cmd;

  // copy payload content to temp buff
  char *tmpData = 0;
  if (pCmd->payloadLen > 0) {
    tmpData = calloc(1, pCmd->payloadLen + 1);
    if (NULL == tmpData) return -1;
    memcpy(tmpData, pCmd->payload, pCmd->payloadLen);
  }

  // fill head info
  SMgmtHead *pMgmt = (SMgmtHead *)(pCmd->payload + tsRpcHeadSize);
S
slguan 已提交
1563
  memset(pMgmt->db, 0, TSDB_TABLE_ID_LEN);  // server don't need the db
S
slguan 已提交
1564

1565
  SCMMultiTableInfoMsg *pInfoMsg = (SCMMultiTableInfoMsg *)(pCmd->payload + tsRpcHeadSize + sizeof(SMgmtHead));
S
slguan 已提交
1566
  pInfoMsg->numOfTables = htonl((int32_t)pCmd->count);
S
slguan 已提交
1567 1568

  if (pCmd->payloadLen > 0) {
S
slguan 已提交
1569
    memcpy(pInfoMsg->tableIds, tmpData, pCmd->payloadLen);
S
slguan 已提交
1570 1571 1572 1573
  }

  tfree(tmpData);

1574
  pCmd->payloadLen += sizeof(SMgmtHead) + sizeof(SCMMultiTableInfoMsg);
S
slguan 已提交
1575
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLES_META;
S
slguan 已提交
1576 1577 1578

  assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize);

1579
  tscDebug("%p build load multi-metermeta msg completed, numOfTables:%d, msg size:%d", pSql, pCmd->count,
S
slguan 已提交
1580 1581 1582
           pCmd->payloadLen);

  return pCmd->payloadLen;
guanshengliang's avatar
guanshengliang 已提交
1583 1584
#endif
  return 0;  
S
slguan 已提交
1585 1586
}

H
hjxilinx 已提交
1587 1588 1589 1590 1591 1592 1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612
//static UNUSED_FUNC int32_t tscEstimateMetricMetaMsgSize(SSqlCmd *pCmd) {
////  const int32_t defaultSize =
////      minMsgSize() + sizeof(SSuperTableMetaMsg) + sizeof(SMgmtHead) + sizeof(int16_t) * TSDB_MAX_TAGS;
////  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
////
////  int32_t n = 0;
////  size_t size = taosArrayGetSize(pQueryInfo->tagCond.pCond);
////  for (int32_t i = 0; i < size; ++i) {
////    assert(0);
//////    n += strlen(pQueryInfo->tagCond.cond[i].cond);
////  }
////
////  int32_t tagLen = n * TSDB_NCHAR_SIZE;
////  if (pQueryInfo->tagCond.tbnameCond.cond != NULL) {
////    tagLen += strlen(pQueryInfo->tagCond.tbnameCond.cond) * TSDB_NCHAR_SIZE;
////  }
////
////  int32_t joinCondLen = (TSDB_TABLE_ID_LEN + sizeof(int16_t)) * 2;
////  int32_t elemSize = sizeof(SSuperTableMetaElemMsg) * pQueryInfo->numOfTables;
////
////  int32_t colSize = pQueryInfo->groupbyExpr.numOfGroupCols*sizeof(SColIndex);
////
////  int32_t len = tagLen + joinCondLen + elemSize + colSize + defaultSize;
////
////  return MAX(len, TSDB_DEFAULT_PAYLOAD_SIZE);
//}
H
hzcheng 已提交
1613

H
hjxilinx 已提交
1614 1615
/*
 * Build the TSDB_MSG_TYPE_CM_STABLE_VGROUP request in pCmd->payload.
 *
 * The message is a SCMSTableVgroupMsg header holding the table count of query
 * info 0, followed by one fixed-width slot per table (the size of the name
 * field of STableMetaInfo) that holds the table name.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd    *pCmd = &pSql->cmd;
  char       *cursor = pCmd->payload;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

  SCMSTableVgroupMsg *pHead = (SCMSTableVgroupMsg *)cursor;
  pHead->numOfTables = htonl(pQueryInfo->numOfTables);
  cursor += sizeof(SCMSTableVgroupMsg);

  int32_t tableIdx = 0;
  while (tableIdx < pQueryInfo->numOfTables) {
    STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, tableIdx);

    // every name occupies a full slot, regardless of its actual length
    size_t slotSize = sizeof(pMetaInfo->name);
    tstrncpy(cursor, pMetaInfo->name, slotSize);
    cursor += slotSize;

    ++tableIdx;
  }

  pCmd->msgType = TSDB_MSG_TYPE_CM_STABLE_VGROUP;
  pCmd->payloadLen = (cursor - pCmd->payload);

  return TSDB_CODE_SUCCESS;
}

1637 1638
/*
 * Build the TSDB_MSG_TYPE_CM_HEARTBEAT request.
 *
 * With pObj->mutex held, the query list and the stream list of the connection
 * are walked to size the payload, the payload is allocated, and
 * tscBuildQueryStreamDesc fills it in and reports the message length.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload
 * cannot be allocated (the mutex is released before returning in both cases).
 */
int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;

  pthread_mutex_lock(&pObj->mutex);

  // NOTE(review): both counters start at 2 rather than 0 -- presumably slack
  // for the size estimate below; confirm against tscBuildQueryStreamDesc.
  int32_t numOfQueries = 2;
  SSqlObj *tpSql = pObj->sqlList;
  while (tpSql) {
    tpSql = tpSql->next;
    numOfQueries++;
  }

  int32_t numOfStreams = 2;
  SSqlStream *pStream = pObj->streamList;
  while (pStream) {
    pStream = pStream->next;
    numOfStreams++;
  }

  int size = numOfQueries * sizeof(SQueryDesc) + numOfStreams * sizeof(SStreamDesc) + sizeof(SCMHeartBeatMsg) + 100;
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    pthread_mutex_unlock(&pObj->mutex);
    tscError("%p failed to malloc for heartbeat msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMHeartBeatMsg *pHeartbeat = (SCMHeartBeatMsg *)pCmd->payload;
  pHeartbeat->numOfQueries = numOfQueries;
  pHeartbeat->numOfStreams = numOfStreams;
  int msgLen = tscBuildQueryStreamDesc(pHeartbeat, pObj);

  pthread_mutex_unlock(&pObj->mutex);

  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_CM_HEARTBEAT;

  return TSDB_CODE_SUCCESS;
}

1677 1678
/*
 * Process a table-meta response.
 *
 * The message in pSql->res.pRsp is converted to host byte order in place and
 * validated, a STableMeta is created from it, and that meta is stored in the
 * table-meta cache under the table name. The cached entry is attached to
 * table meta info 0 of the command; the temporary STableMeta is freed.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_INVALID_VALUE when the message
 * fails validation, or TSDB_CODE_TSC_OUT_OF_MEMORY when the meta cannot be
 * created or cached.
 */
int tscProcessTableMetaRsp(SSqlObj *pSql) {
  STableMetaMsg *pMetaMsg = (STableMetaMsg *)pSql->res.pRsp;

  pMetaMsg->sid = htonl(pMetaMsg->sid);
  pMetaMsg->sversion = htons(pMetaMsg->sversion);
  pMetaMsg->tversion = htons(pMetaMsg->tversion);
  pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId);

  pMetaMsg->uid = htobe64(pMetaMsg->uid);
  pMetaMsg->contLen = htons(pMetaMsg->contLen);
  pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns);

  if (pMetaMsg->sid < 0 || pMetaMsg->vgroup.numOfIps < 0) {
    // report the value that was actually checked (numOfIps, not vgId)
    tscError("invalid table meta, numOfIps:%d, sid:%d", pMetaMsg->vgroup.numOfIps, pMetaMsg->sid);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  if (pMetaMsg->numOfTags > TSDB_MAX_TAGS) {
    tscError("invalid numOfTags:%d", pMetaMsg->numOfTags);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  if (pMetaMsg->numOfColumns > TSDB_MAX_COLUMNS || pMetaMsg->numOfColumns <= 0) {
    tscError("invalid numOfColumns:%d", pMetaMsg->numOfColumns);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  for (int i = 0; i < pMetaMsg->vgroup.numOfIps; ++i) {
    pMetaMsg->vgroup.ipAddr[i].port = htons(pMetaMsg->vgroup.ipAddr[i].port);
  }

  SSchema* pSchema = pMetaMsg->schema;

  // the schema array holds the columns first, then the tags
  int32_t numOfTotalCols = pMetaMsg->numOfColumns + pMetaMsg->numOfTags;
  for (int i = 0; i < numOfTotalCols; ++i) {
    pSchema->bytes = htons(pSchema->bytes);
    pSchema->colId = htons(pSchema->colId);

    // the primary timestamp column may only appear as the first entry
    if (pSchema->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
      assert(i == 0);
    }

    assert(pSchema->type >= TSDB_DATA_TYPE_BOOL && pSchema->type <= TSDB_DATA_TYPE_NCHAR);
    pSchema++;
  }

  size_t size = 0;
  STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
  if (pTableMeta == NULL) {
    // pTableMeta is dereferenced below, so a failed creation must stop here
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  // todo add one more function: taosAddDataIfNotExists();
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
  assert(pTableMetaInfo->pTableMeta == NULL);

  pTableMetaInfo->pTableMeta = (STableMeta *) taosCachePut(tscCacheHandle, pTableMetaInfo->name,
      strlen(pTableMetaInfo->name), pTableMeta, size, tsTableMetaKeepTimer);

  // todo handle out of memory case
  if (pTableMetaInfo->pTableMeta == NULL) {
    free(pTableMeta);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  tscDebug("%p recv table meta, uid:%"PRId64 ", tid:%d, name:%s", pSql, pTableMeta->uid, pTableMeta->sid, pTableMetaInfo->name);
  free(pTableMeta);

  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
1745
/**
 *  multi table meta rsp pkg format:
 *  | STaosRsp | ieType | SCMMultiTableInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | SMeterMeta2 | SSchema2
 *  |...... 1B        1B            4B
 *
 *  The parser below is compiled out with "#if 0"; as built, this function
 *  only returns TSDB_CODE_SUCCESS. The disabled code is kept for reference.
 **/
int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
#if 0
  char *rsp = pSql->res.pRsp;

  ieType = *rsp;
  if (ieType != TSDB_IE_TYPE_META) {
    tscError("invalid ie type:%d", ieType);
    pSql->res.code = TSDB_CODE_TSC_INVALID_IE;
    pSql->res.numOfTotal = 0;
    return TSDB_CODE_TSC_APP_ERROR;
  }

  rsp++;

  SCMMultiTableInfoMsg *pInfo = (SCMMultiTableInfoMsg *)rsp;
  totalNum = htonl(pInfo->numOfTables);
  rsp += sizeof(SCMMultiTableInfoMsg);

  for (i = 0; i < totalNum; i++) {
    SMultiTableMeta *pMultiMeta = (SMultiTableMeta *)rsp;
    STableMeta *     pMeta = pMultiMeta->metas;

    pMeta->sid = htonl(pMeta->sid);
    pMeta->sversion = htons(pMeta->sversion);
    pMeta->vgId = htonl(pMeta->vgId);
    pMeta->uid = htobe64(pMeta->uid);

    if (pMeta->sid <= 0 || pMeta->vgId < 0) {
      tscError("invalid meter vgId:%d, sid%d", pMeta->vgId, pMeta->sid);
      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
      pSql->res.numOfTotal = i;
      return TSDB_CODE_TSC_APP_ERROR;
    }

    //    pMeta->numOfColumns = htons(pMeta->numOfColumns);
    //
    //    if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
    //      tscError("invalid tag value count:%d", pMeta->numOfTags);
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
    //      pSql->res.numOfTotal = i;
    //      return TSDB_CODE_TSC_APP_ERROR;
    //    }
    //
    //    if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
    //      tscError("invalid numOfTags:%d", pMeta->numOfTags);
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
    //      pSql->res.numOfTotal = i;
    //      return TSDB_CODE_TSC_APP_ERROR;
    //    }
    //
    //    if (pMeta->numOfColumns > TSDB_MAX_COLUMNS || pMeta->numOfColumns < 0) {
    //      tscError("invalid numOfColumns:%d", pMeta->numOfColumns);
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
    //      pSql->res.numOfTotal = i;
    //      return TSDB_CODE_TSC_APP_ERROR;
    //    }
    //
    //    for (int j = 0; j < TSDB_REPLICA_MAX_NUM; ++j) {
    //      pMeta->vpeerDesc[j].vnode = htonl(pMeta->vpeerDesc[j].vnode);
    //    }
    //
    //    pMeta->rowSize = 0;
    //    rsp += sizeof(SMultiTableMeta);
    //    pSchema = (SSchema *)rsp;
    //
    //    int32_t numOfTotalCols = pMeta->numOfColumns + pMeta->numOfTags;
    //    for (int j = 0; j < numOfTotalCols; ++j) {
    //      pSchema->bytes = htons(pSchema->bytes);
    //      pSchema->colId = htons(pSchema->colId);
    //
    //      // ignore the tags length
    //      if (j < pMeta->numOfColumns) {
    //        pMeta->rowSize += pSchema->bytes;
    //      }
    //      pSchema++;
    //    }
    //
    //    rsp += numOfTotalCols * sizeof(SSchema);
    //
    //    int32_t  tagLen = 0;
    //    SSchema *pTagsSchema = tscGetTableTagSchema(pMeta);
    //
    //    if (pMeta->tableType == TSDB_CHILD_TABLE) {
    //      for (int32_t j = 0; j < pMeta->numOfTags; ++j) {
    //        tagLen += pTagsSchema[j].bytes;
    //      }
    //    }
    //
    //    rsp += tagLen;
    //    int32_t size = (int32_t)(rsp - ((char *)pMeta));  // Consistent with STableMeta in cache
    //
    //    pMeta->index = 0;
    //    (void)taosCachePut(tscCacheHandle, pMeta->tableId, (char *)pMeta, size, tsTableMetaKeepTimer);
    //  }
  }

  pSql->res.code = TSDB_CODE_SUCCESS;
  pSql->res.numOfTotal = i;
  tscDebug("%p load multi-metermeta resp from complete num:%d", pSql, pSql->res.numOfTotal);
#endif

  /* live path: the response is not inspected */
  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
1854
/*
 * Process a super-table vgroup response.
 *
 * The response holds one SVgroupsInfo record per table, in the same order as
 * the tables of the parent command (the parent SSqlObj is carried in
 * pSql->param). Each record is copied into the matching table meta info of
 * the parent and the copy is converted to host byte order.
 *
 * Returns pSql->res.code.
 */
int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
  SSqlRes* pRes = &pSql->res;

  // NOTE: the order of several table must be preserved.
  SCMSTableVgroupRspMsg *pStableVgroup = (SCMSTableVgroupRspMsg *)pRes->pRsp;
  pStableVgroup->numOfTables = htonl(pStableVgroup->numOfTables);
  char* pMsg = pRes->pRsp + sizeof(SCMSTableVgroupRspMsg);

  // master sqlObj locates in param
  SSqlObj* parent = pSql->param;
  assert(parent != NULL);

  SSqlCmd* pCmd = &parent->cmd;
  for(int32_t i = 0; i < pStableVgroup->numOfTables; ++i) {
    STableMetaInfo *pInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i);
    SVgroupsInfo *  pVgroupInfo = (SVgroupsInfo *)pMsg;
    pVgroupInfo->numOfVgroups = htonl(pVgroupInfo->numOfVgroups);

    // size of this table's whole record: the header plus all of its vgroups
    size_t size = sizeof(SCMVgroupInfo) * pVgroupInfo->numOfVgroups + sizeof(SVgroupsInfo);
    pInfo->vgroupList = calloc(1, size);
    // NOTE(review): allocation failure is only caught by this assert, which is
    // compiled out when NDEBUG is defined.
    assert(pInfo->vgroupList != NULL);

    memcpy(pInfo->vgroupList, pVgroupInfo, size);
    for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) {
      SCMVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j];

      pVgroups->vgId = htonl(pVgroups->vgId);
      assert(pVgroups->numOfIps >= 1);

      for (int32_t k = 0; k < pVgroups->numOfIps; ++k) {
        pVgroups->ipAddr[k].port = htons(pVgroups->ipAddr[k].port);
      }
    }

    // advance exactly one record per table; stepping inside the vgroup loop
    // would skip numOfVgroups records and misplace every following table
    pMsg += size;
  }

  return pSql->res.code;
}

/*
 * Process the response of a "show" command.
 *
 * The query handle and the embedded table-meta message are converted to host
 * byte order, a STableMeta is created from the message and stored in the
 * cache under a key derived from the message type ("<type>showlist"), and one
 * output field plus one TSDB_FUNC_TS_DUMMY expression is registered in the
 * query info for every column of the result.
 *
 * Returns 0 on success, or TSDB_CODE_TSC_OUT_OF_MEMORY when the table meta
 * cannot be created or cached.
 */
int tscProcessShowRsp(SSqlObj *pSql) {
  STableMetaMsg * pMetaMsg;
  SCMShowRsp *pShow;
  SSchema *    pSchema;
  char         key[20];

  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  pShow = (SCMShowRsp *)pRes->pRsp;
  pShow->qhandle = htobe64(pShow->qhandle);
  pRes->qhandle = pShow->qhandle;

  tscResetForNextRetrieve(pRes);
  pMetaMsg = &(pShow->tableMeta);

  pMetaMsg->numOfColumns = ntohs(pMetaMsg->numOfColumns);

  pSchema = pMetaMsg->schema;
  // NOTE(review): sid is swapped as a 16-bit value here, while
  // tscProcessTableMetaRsp swaps the same field with htonl -- confirm.
  pMetaMsg->sid = ntohs(pMetaMsg->sid);
  for (int i = 0; i < pMetaMsg->numOfColumns; ++i) {
    pSchema->bytes = htons(pSchema->bytes);
    pSchema++;
  }

  // cache key: one character derived from the message type, then "showlist"
  key[0] = pCmd->msgType + 'a';
  strcpy(key + 1, "showlist");

  taosCacheRelease(tscCacheHandle, (void *)&(pTableMetaInfo->pTableMeta), false);

  size_t size = 0;
  STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
  if (pTableMeta == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  pTableMetaInfo->pTableMeta = taosCachePut(tscCacheHandle, key, strlen(key), (char *)pTableMeta, size,
      tsTableMetaKeepTimer);
  if (pTableMetaInfo->pTableMeta == NULL) {
    // the cached meta is read below; without it there is nothing to describe
    tfree(pTableMeta);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SSchema *pTableSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);

  if (pQueryInfo->colList == NULL) {
    pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
  }

  SFieldInfo* pFieldInfo = &pQueryInfo->fieldsInfo;

  SColumnIndex index = {0};
  pSchema = pMetaMsg->schema;

  for (int16_t i = 0; i < pMetaMsg->numOfColumns; ++i, ++pSchema) {
    index.columnIndex = i;
    tscColumnListInsert(pQueryInfo->colList, &index);

    TAOS_FIELD f = tscCreateField(pSchema->type, pSchema->name, pSchema->bytes);
    SFieldSupInfo* pInfo = tscFieldInfoAppend(pFieldInfo, &f);

    pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index,
                     pTableSchema[i].type, pTableSchema[i].bytes, pTableSchema[i].bytes, false);
  }

  pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
  tscFieldInfoUpdateOffset(pQueryInfo);

  tfree(pTableMeta);
  return 0;
}

/*
 * Process the connect response.
 *
 * The account id returned by the server is stored in the connection object
 * and prefixed (with TS_PATH_DELIMITER) to the current db name. The server
 * version, the authorization flags and the connection id are copied from the
 * response, the management ip list is updated when the response carries one,
 * and the activity timer is (re)started.
 *
 * Always returns 0.
 */
int tscProcessConnectRsp(SSqlObj *pSql) {
  char temp[TSDB_TABLE_ID_LEN * 2];
  STscObj *pObj = pSql->pTscObj;
  SSqlRes *pRes = &pSql->res;

  SCMConnectRsp *pConnect = (SCMConnectRsp *)pRes->pRsp;
  tstrncpy(pObj->acctId, pConnect->acctId, sizeof(pObj->acctId));  // copy acctId from response

  // bounded formatting: the write can never run past the end of temp; len is
  // still the length the full string needs, so the check below is unchanged
  int32_t len = snprintf(temp, sizeof(temp), "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db);

  assert(len <= sizeof(pObj->db));
  tstrncpy(pObj->db, temp, sizeof(pObj->db));

  if (pConnect->ipList.numOfIps > 0) {
    tscSetMgmtIpList(&pConnect->ipList);
  }

  // NOTE(review): unbounded copy of a server-supplied string; assumes
  // serverVersion is NUL-terminated and fits into sversion -- confirm.
  strcpy(pObj->sversion, pConnect->serverVersion);
  pObj->writeAuth = pConnect->writeAuth;
  pObj->superAuth = pConnect->superAuth;
  pObj->connId = htonl(pConnect->connId);
  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);

  return 0;
}

/*
 * Process the response of a "use db" command: the name held by table meta
 * info 0 of the command becomes the current db of the connection.
 *
 * Always returns 0.
 */
int tscProcessUseDbRsp(SSqlObj *pSql) {
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
  STscObj        *pConn = pSql->pTscObj;

  tstrncpy(pConn->db, pMetaInfo->name, sizeof(pConn->db));
  return 0;
}

/* Process the response of a "drop db" command by emptying the whole cache. Always returns 0. */
int tscProcessDropDbRsp(SSqlObj *UNUSED_PARAM(pSql)) {
  int result = 0;
  taosCacheEmpty(tscCacheHandle);
  return result;
}

/*
 * Process the response of a "drop table" command.
 *
 * When the table meta is found in the cache, it is force-released, and so is
 * the reference held by table meta info 0 of the command. When it is not in
 * the cache nothing is done.
 *
 * Always returns 0.
 */
int tscProcessDropTableRsp(SSqlObj *pSql) {
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);

  STableMeta *pCachedMeta = taosCacheAcquireByKey(tscCacheHandle, pTableMetaInfo->name, strlen(pTableMetaInfo->name));
  if (pCachedMeta != NULL) {
    /*
     * 1. if a user drops one table, which is the only table in a vnode, remove operation will incur vnode to be removed.
     * 2. Then, a user creates a new metric followed by a table with identical name of removed table but different schema,
     * here the table will reside in a new vnode.
     * The cached information is expired, however, we may have lost the ref of original meter. So, clear whole cache
     * instead.
     */
    tscDebug("%p force release table meta after drop table:%s", pSql, pTableMetaInfo->name);
    taosCacheRelease(tscCacheHandle, (void **)&pCachedMeta, true);

    if (pTableMetaInfo->pTableMeta) {
      taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true);
    }
  }

  /* also reached when the meta is not in the cache */
  return 0;
}

/*
 * Process the response of an "alter table" command.
 *
 * When the table meta is found in the cache it is force-released, together
 * with the reference held by table meta info 0 of the command. If that meta
 * info describes a super table, the whole cache is emptied as well. When the
 * meta is not in the cache nothing is done.
 *
 * Always returns 0.
 */
int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);

  STableMeta *pCachedMeta = taosCacheAcquireByKey(tscCacheHandle, pTableMetaInfo->name, strlen(pTableMetaInfo->name));
  if (pCachedMeta == NULL) { /* not in cache, abort */
    return 0;
  }

  tscDebug("%p force release metermeta in cache after alter-table: %s", pSql, pTableMetaInfo->name);
  taosCacheRelease(tscCacheHandle, (void **)&pCachedMeta, true);

  if (!pTableMetaInfo->pTableMeta) {
    return 0;
  }

  // the table kind has to be read before the meta reference is released
  bool isSuperTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo);
  taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true);

  if (isSuperTable) {  // if it is a super table, reset whole query cache
    tscDebug("%p reset query cache since table:%s is stable", pSql, pTableMetaInfo->name);
    taosCacheEmpty(tscCacheHandle);
  }

  return 0;
}

/* Process the response of an "alter db" command; nothing has to be done. Always returns 0. */
int tscProcessAlterDbMsgRsp(SSqlObj *pSql) {
  int result = 0;
  UNUSED(pSql);
  return result;
}

/*
 * Process a query response: the query handle is converted to host byte order
 * in place and stored in the result object, the data pointer is cleared and
 * the result object is reset for the next retrieve.
 *
 * Always returns 0.
 */
int tscProcessQueryRsp(SSqlObj *pSql) {
  SSqlRes        *pRes = &pSql->res;
  SQueryTableRsp *pResponse = (SQueryTableRsp *)pRes->pRsp;

  pResponse->qhandle = htobe64(pResponse->qhandle);

  pRes->qhandle = pResponse->qhandle;
  pRes->data = NULL;

  tscResetForNextRetrieve(pRes);
  return 0;
}

H
hjxilinx 已提交
2068
/*
 * Process a retrieve response received from a data node.
 *
 * Row count, precision, offset, elapsed time and completion flag are taken
 * from the response (converted to host byte order) and the per-column result
 * pointers are created. For a subscription, a progress trailer is read from
 * the position right after the last output column: a 32-bit table count
 * followed, per table, by a 64-bit uid, a 32-bit tid (skipped) and a 64-bit
 * timestamp; each (uid, timestamp) pair is passed to
 * tscUpdateSubscriptionProgress.
 *
 * Returns 0 on success, or pRes->code when the result pointers cannot be
 * created.
 */
int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *)pRes->pRsp;

  pRes->numOfRows = htonl(pRetrieve->numOfRows);
  pRes->precision = htons(pRetrieve->precision);
  pRes->offset    = htobe64(pRetrieve->offset);
  pRes->useconds  = htobe64(pRetrieve->useconds);
  pRes->completed = (pRetrieve->completed == 1);
  pRes->data      = pRetrieve->data;

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
    return pRes->code;
  }

  if (pSql->pSubscription != NULL) {
    int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput;

    TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, numOfCols - 1);
    int16_t     offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1);

    // the trailer starts right after the data of the last output column
    char* p = pRes->data + (pField->bytes + offset) * pRes->numOfRows;

    // p has no alignment guarantee, so every value is loaded with memcpy
    // instead of being read through a casted pointer
    int32_t numOfTables = 0;
    memcpy(&numOfTables, p, sizeof(numOfTables));
    numOfTables = htonl(numOfTables);
    p += sizeof(int32_t);

    for (int i = 0; i < numOfTables; i++) {
      int64_t uid = 0;
      memcpy(&uid, p, sizeof(uid));
      uid = htobe64(uid);
      p += sizeof(int64_t);

      p += sizeof(int32_t); // skip tid

      TSKEY key = 0;
      memcpy(&key, p, sizeof(key));
      key = htobe64(key);
      p += sizeof(TSKEY);

      tscUpdateSubscriptionProgress(pSql->pSubscription, uid, key);
    }
  }

  pRes->row = 0;
  tscDebug("%p numOfRows:%" PRId64 ", offset:%" PRId64 ", complete:%d", pSql, pRes->numOfRows, pRes->offset, pRes->completed);

  return 0;
}

/*
 * Unpack a retrieve response that is held in pSql->res.pRsp.
 *
 * The row count is taken from the response header (byte-swapped with htonl),
 * the data pointer is aimed at the response body, the per-column result
 * pointers are rebuilt from the first query clause, and the row cursor is
 * rewound to the first row.
 *
 * Always returns 0.
 */
int tscProcessRetrieveRspFromLocal(SSqlObj *pSql) {
  SSqlRes    *pResult = &pSql->res;
  SQueryInfo *pInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);

  SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)pResult->pRsp;
  pResult->data = pRsp->data;
  pResult->numOfRows = htonl(pRsp->numOfRows);

  // column pointers are derived from pResult->data, so set them up after the data pointer
  tscSetResultPointer(pInfo, pResult);

  pResult->row = 0;
  return 0;
}

H
hjxilinx 已提交
2127
// Forward declaration (defined outside this section). It is installed as the `fp` completion
// callback on the helper sql objects created by getTableMetaFromMgmt() and tscGetSTableVgroupInfo().
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code);
H
hzcheng 已提交
2128

2129
/**
 * Launch a TSDB_SQL_META request for the table named in pTableMetaInfo.
 *
 * A helper SSqlObj is allocated, given one query clause holding one empty
 * table-meta slot with the requested name, and handed a copy of the parent's
 * payload (tag information used when the table has to be auto-created).
 * The helper's completion callback is tscTableMetaCallBack with the parent
 * pSql as its parameter.
 *
 * @param pSql            parent sql object; provides pTscObj, autoCreated and the payload
 * @param pTableMetaInfo  supplies the table name to look up
 * @return TSDB_CODE_TSC_ACTION_IN_PROGRESS when tscProcessSql() accepted the helper,
 *         TSDB_CODE_TSC_OUT_OF_MEMORY when an allocation failed,
 *         otherwise the error code reported by tscGetQueryInfoDetailSafely() or tscProcessSql()
 */
static int32_t getTableMetaFromMgmt(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
  if (NULL == pNew) {
    tscError("%p malloc failed for new sqlobj to get table meta", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;
  pNew->cmd.command = TSDB_SQL_META;

  tscAddSubqueryInfo(&pNew->cmd);

  SQueryInfo *pNewQueryInfo = NULL;
  int32_t     code = tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo);
  if (code != TSDB_CODE_SUCCESS) {
    // same handling as tscGetSTableVgroupInfo(): never continue with a missing query info
    tscFreeSqlObj(pNew);
    return code;
  }

  pNew->cmd.autoCreated = pSql->cmd.autoCreated;  // create table if not exists
  if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE + pSql->cmd.payloadLen)) {
    tscError("%p malloc failed for payload to get table meta", pSql);

    // a plain free() would leak the query info attached to pNew->cmd above
    tscFreeSqlObj(pNew);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  STableMetaInfo *pNewMeterMetaInfo = tscAddEmptyMetaInfo(pNewQueryInfo);
  assert(pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);

  tstrncpy(pNewMeterMetaInfo->name, pTableMetaInfo->name, sizeof(pNewMeterMetaInfo->name));
  memcpy(pNew->cmd.payload, pSql->cmd.payload, pSql->cmd.payloadLen);  // tag information if table does not exists.
  pNew->cmd.payloadLen = pSql->cmd.payloadLen;
  tscDebug("%p new pSqlObj:%p to get tableMeta, auto create:%d", pSql, pNew, pNew->cmd.autoCreated);

  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;

  code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
    code = TSDB_CODE_TSC_ACTION_IN_PROGRESS;  // notify upper application that current process need to be terminated
  }

  return code;
}

H
hjxilinx 已提交
2172
/*
 * Resolve the table meta for pTableMetaInfo->name.
 *
 * Any meta currently referenced by pTableMetaInfo is handed back to the cache
 * first. The cache is then probed by table name: on a hit the entry is stored
 * in pTableMetaInfo and TSDB_CODE_SUCCESS is returned; on a miss the lookup is
 * delegated to getTableMetaFromMgmt() and its result is returned.
 */
int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
  const char *tableName = pTableMetaInfo->name;
  assert(strlen(tableName) != 0);

  // give back the previously held entry before acquiring a new one
  if (NULL != pTableMetaInfo->pTableMeta) {
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), false);
  }

  STableMeta *pCached = (STableMeta *)taosCacheAcquireByKey(tscCacheHandle, tableName, strlen(tableName));
  pTableMetaInfo->pTableMeta = pCached;

  if (NULL == pCached) {
    // cache miss
    return getTableMetaFromMgmt(pSql, pTableMetaInfo);
  }

  STableComInfo tinfo = tscGetTableInfo(pCached);
  tscDebug("%p retrieve table Meta from cache, the number of columns:%d, numOfTags:%d, %p", pSql, tinfo.numOfColumns,
           tinfo.numOfTags, pTableMetaInfo->pTableMeta);

  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
2192
/*
 * Same as tscGetTableMeta(), but first records createIfNotExists in
 * pSql->cmd.autoCreated so that the value is in place for the lookup.
 */
int tscGetMeterMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool createIfNotExists) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->autoCreated = createIfNotExists;

  int32_t code = tscGetTableMeta(pSql, pTableMetaInfo);
  return code;
}

/**
H
Haojun Liao 已提交
2198
 * retrieve table meta from mnode, and update the local table meta cache.
H
hzcheng 已提交
2199
 * @param pSql          sql object
H
Haojun Liao 已提交
2200
 * @param tableId       table full name
H
hzcheng 已提交
2201 2202
 * @return              status code
 */
H
Haojun Liao 已提交
2203
int tscRenewTableMeta(SSqlObj *pSql, char *tableId) {
H
hzcheng 已提交
2204
  SSqlCmd *pCmd = &pSql->cmd;
2205 2206

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
H
hjxilinx 已提交
2207
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
2208

H
Haojun Liao 已提交
2209 2210
  STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
  if (pTableMetaInfo->pTableMeta) {
2211
    tscDebug("%p update table meta, old meta numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
H
Haojun Liao 已提交
2212
             tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->uid, pTableMeta);
H
hzcheng 已提交
2213 2214
  }

H
Haojun Liao 已提交
2215 2216
  taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true);
  return getTableMetaFromMgmt(pSql, pTableMetaInfo);
H
hzcheng 已提交
2217 2218
}

H
hjxilinx 已提交
2219
/*
 * Tell whether every table of the given query clause already has a vgroup
 * list attached. Returns false at the first table whose vgroupList is NULL,
 * true when none is missing (including when the clause has no tables).
 */
static bool allVgroupInfoRetrieved(SSqlCmd* pCmd, int32_t clauseIndex) {
  SQueryInfo *pInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);

  int32_t idx = 0;
  while (idx < pInfo->numOfTables) {
    STableMetaInfo *pMeta = tscGetMetaInfo(pInfo, idx);
    if (NULL == pMeta->vgroupList) {
      return false;
    }

    idx += 1;
  }

  // nothing is missing, so no vgroup request is needed
  return true;
}
H
hzcheng 已提交
2231

H
hjxilinx 已提交
2232
/**
 * Request the vgroup information for all tables of one query clause.
 *
 * Nothing is sent when every table of the clause already has a vgroup list.
 * Otherwise a helper SSqlObj with command TSDB_SQL_STABLEVGROUP is created,
 * each table of the clause is added to it (taking an extra cache reference on
 * its table meta), and the helper is submitted with tscTableMetaCallBack as
 * completion callback and the parent pSql as its parameter.
 *
 * @param pSql         parent sql object
 * @param clauseIndex  index of the query clause whose tables are inspected
 * @return TSDB_CODE_SUCCESS when no request was necessary,
 *         TSDB_CODE_TSC_ACTION_IN_PROGRESS when the helper was accepted by tscProcessSql(),
 *         TSDB_CODE_TSC_OUT_OF_MEMORY when the helper object could not be allocated,
 *         otherwise the error code of the failing step
 */
int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
  SSqlCmd *pCmd = &pSql->cmd;

  if (allVgroupInfoRetrieved(pCmd, clauseIndex)) {
    return TSDB_CODE_SUCCESS;
  }

  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
  if (NULL == pNew) {
    // the allocation result used to be dereferenced unchecked
    tscError("%p malloc failed for new sqlobj to get vgroup info", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;

  pNew->cmd.command = TSDB_SQL_STABLEVGROUP;

  SQueryInfo *pNewQueryInfo = NULL;
  int         code = tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo);
  if (code != TSDB_CODE_SUCCESS) {
    tscFreeSqlObj(pNew);
    return code;
  }

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
    STableMetaInfo *pMInfo = tscGetMetaInfo(pQueryInfo, i);

    // the helper gets its own cache reference to the table meta
    STableMeta *pTableMeta = taosCacheAcquireByData(tscCacheHandle, pMInfo->pTableMeta);
    tscAddTableMetaInfo(pNewQueryInfo, pMInfo->name, pTableMeta, NULL, pMInfo->tagColList);
  }

  if ((code = tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
    tscFreeSqlObj(pNew);
    return code;
  }

  pNewQueryInfo->numOfTables = pQueryInfo->numOfTables;
  tscDebug("%p new sqlObj:%p to get vgroupInfo, numOfTables:%d", pSql, pNew, pNewQueryInfo->numOfTables);

  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;
  code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
    code = TSDB_CODE_TSC_ACTION_IN_PROGRESS;
  }

  return code;
}

2277
void tscInitMsgsFp() {
S
slguan 已提交
2278 2279
  tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
  tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg;
J
jtao1735 已提交
2280
  tscBuildMsg[TSDB_SQL_FETCH] = tscBuildFetchMsg;
H
hzcheng 已提交
2281 2282

  tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg;
2283
  tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg;
H
hzcheng 已提交
2284

2285 2286
  tscBuildMsg[TSDB_SQL_CREATE_ACCT] = tscBuildAcctMsg;
  tscBuildMsg[TSDB_SQL_ALTER_ACCT] = tscBuildAcctMsg;
H
hzcheng 已提交
2287 2288

  tscBuildMsg[TSDB_SQL_CREATE_TABLE] = tscBuildCreateTableMsg;
S
[TD-16]  
slguan 已提交
2289
  tscBuildMsg[TSDB_SQL_DROP_USER] = tscBuildDropUserMsg;
H
hzcheng 已提交
2290 2291 2292
  tscBuildMsg[TSDB_SQL_DROP_ACCT] = tscBuildDropAcctMsg;
  tscBuildMsg[TSDB_SQL_DROP_DB] = tscBuildDropDbMsg;
  tscBuildMsg[TSDB_SQL_DROP_TABLE] = tscBuildDropTableMsg;
2293
  tscBuildMsg[TSDB_SQL_ALTER_USER] = tscBuildUserMsg;
S
slguan 已提交
2294 2295 2296
  tscBuildMsg[TSDB_SQL_CREATE_DNODE] = tscBuildCreateDnodeMsg;
  tscBuildMsg[TSDB_SQL_DROP_DNODE] = tscBuildDropDnodeMsg;
  tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg;
H
hzcheng 已提交
2297
  tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg;
2298
  tscBuildMsg[TSDB_SQL_UPDATE_TAGS_VAL] = tscBuildUpdateTagMsg;
H
hzcheng 已提交
2299 2300 2301 2302
  tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg;

  tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg;
  tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg;
H
hjxilinx 已提交
2303
  tscBuildMsg[TSDB_SQL_META] = tscBuildTableMetaMsg;
H
hjxilinx 已提交
2304
  tscBuildMsg[TSDB_SQL_STABLEVGROUP] = tscBuildSTableVgroupMsg;
S
slguan 已提交
2305
  tscBuildMsg[TSDB_SQL_MULTI_META] = tscBuildMultiMeterMetaMsg;
H
hzcheng 已提交
2306 2307 2308 2309

  tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg;
  tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg;
  tscBuildMsg[TSDB_SQL_RETRIEVE] = tscBuildRetrieveFromMgmtMsg;
2310 2311 2312
  tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg;
H
hzcheng 已提交
2313 2314

  tscProcessMsgRsp[TSDB_SQL_SELECT] = tscProcessQueryRsp;
H
hjxilinx 已提交
2315
  tscProcessMsgRsp[TSDB_SQL_FETCH] = tscProcessRetrieveRspFromNode;
H
hzcheng 已提交
2316 2317 2318 2319 2320

  tscProcessMsgRsp[TSDB_SQL_DROP_DB] = tscProcessDropDbRsp;
  tscProcessMsgRsp[TSDB_SQL_DROP_TABLE] = tscProcessDropTableRsp;
  tscProcessMsgRsp[TSDB_SQL_CONNECT] = tscProcessConnectRsp;
  tscProcessMsgRsp[TSDB_SQL_USE_DB] = tscProcessUseDbRsp;
2321
  tscProcessMsgRsp[TSDB_SQL_META] = tscProcessTableMetaRsp;
H
hjxilinx 已提交
2322
  tscProcessMsgRsp[TSDB_SQL_STABLEVGROUP] = tscProcessSTableVgroupRsp;
S
slguan 已提交
2323
  tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiMeterMetaRsp;
H
hzcheng 已提交
2324 2325

  tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp;
H
hjxilinx 已提交
2326
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromNode;  // rsp handled by same function.
H
hzcheng 已提交
2327
  tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp;
2328

H
Haojun Liao 已提交
2329 2330 2331 2332 2333
  tscProcessMsgRsp[TSDB_SQL_CURRENT_DB]   = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CURRENT_USER] = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_VERSION] = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CLI_VERSION]  = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_STATUS]  = tscProcessLocalRetrieveRsp;
2334

H
hzcheng 已提交
2335 2336
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp;

H
hjxilinx 已提交
2337
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_LOCALMERGE] = tscProcessRetrieveLocalMergeRsp;
H
hzcheng 已提交
2338 2339 2340 2341 2342 2343 2344 2345 2346 2347

  tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp;
  tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp;

  tscKeepConn[TSDB_SQL_SHOW] = 1;
  tscKeepConn[TSDB_SQL_RETRIEVE] = 1;
  tscKeepConn[TSDB_SQL_SELECT] = 1;
  tscKeepConn[TSDB_SQL_FETCH] = 1;
  tscKeepConn[TSDB_SQL_HB] = 1;
}