tscServer.c 87.8 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#include "os.h"
H
hzcheng 已提交
17 18 19 20
#include "tcache.h"
#include "trpc.h"
#include "tscProfile.h"
#include "tscSecondaryMerge.h"
H
hjxilinx 已提交
21
#include "tscSubquery.h"
H
hzcheng 已提交
22 23 24 25 26 27 28
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
#include "tsocket.h"
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"
S
slguan 已提交
29
#include "tscLog.h"
H
hzcheng 已提交
30 31 32

#define TSC_MGMT_VNODE 999

S
slguan 已提交
33
SRpcIpSet  tscMgmtIpList;
S
slguan 已提交
34 35
SRpcIpSet  tscDnodeIpSet;

36 37
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};

H
hzcheng 已提交
38 39 40
int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql);
void tscProcessActivityTimer(void *handle, void *tmrId);
int tscKeepConn[TSDB_SQL_MAX] = {0};
41

42 43 44
TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid);
void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts);
void tscSaveSubscriptionProgress(void* sub);
H
hzcheng 已提交
45

S
slguan 已提交
46
static int32_t minMsgSize() { return tsRpcHeadSize + 100; }
H
hzcheng 已提交
47

48 49 50
static void tscSetDnodeIpList(SSqlObj* pSql, STableMeta* pTableMeta) {
  SRpcIpSet* pIpList = &pSql->ipList;
  
H
hjxilinx 已提交
51
  pIpList->numOfIps = pTableMeta->vgroupInfo.numOfIps;
52 53 54
  pIpList->port     = tsDnodeShellPort;
  pIpList->inUse    = 0;
  
H
hjxilinx 已提交
55 56
  for(int32_t i = 0; i < pTableMeta->vgroupInfo.numOfIps; ++i) {
    pIpList->ip[i] = pTableMeta->vgroupInfo.ipAddr[i].ip;
57 58 59
  }
}

S
slguan 已提交
60 61
void tscPrintMgmtIp() {
  if (tscMgmtIpList.numOfIps <= 0) {
S
slguan 已提交
62
    tscError("invalid mgmt IP list:%d", tscMgmtIpList.numOfIps);
S
slguan 已提交
63
  } else {
S
slguan 已提交
64
    for (int i = 0; i < tscMgmtIpList.numOfIps; ++i) {
S
slguan 已提交
65
      tscTrace("mgmt index:%d ip:%d", i, tscMgmtIpList.ip[i]);
S
slguan 已提交
66
    }
S
slguan 已提交
67 68 69
  }
}

S
slguan 已提交
70 71
void tscSetMgmtIpListFromCluster(SRpcIpSet *pIpList) {
  tscMgmtIpList.numOfIps = htons(pIpList->numOfIps);
S
slguan 已提交
72
  tscMgmtIpList.inUse = htons(pIpList->inUse);
S
slguan 已提交
73 74 75
  tscMgmtIpList.port = htons(pIpList->port);
  for (int32_t i = 0; i <tscMgmtIpList.numOfIps; ++i) {
    tscMgmtIpList.ip[i] = pIpList->ip[i];
S
slguan 已提交
76 77 78 79
  }
}

void tscSetMgmtIpListFromEdge() {
S
slguan 已提交
80 81
  if (tscMgmtIpList.numOfIps != 1) {
    tscMgmtIpList.numOfIps = 1;
S
slguan 已提交
82
    tscMgmtIpList.inUse = 0;
S
slguan 已提交
83
    tscMgmtIpList.port = tsMnodeShellPort;
S
slguan 已提交
84 85 86 87 88 89
    tscMgmtIpList.ip[0] = inet_addr(tsMasterIp);
    tscTrace("edge mgmt IP list:");
    tscPrintMgmtIp();
  }
}

S
slguan 已提交
90
void tscSetMgmtIpList(SRpcIpSet *pIpList) {
S
slguan 已提交
91 92 93 94 95 96 97 98
  /*
    * The iplist returned by the cluster edition is the current management nodes
    * and the iplist returned by the edge edition is empty
    */
  if (pIpList->numOfIps != 0) {
    tscSetMgmtIpListFromCluster(pIpList);
  } else {
    tscSetMgmtIpListFromEdge();
S
slguan 已提交
99 100 101
  }
}

H
hjxilinx 已提交
102 103 104 105 106 107 108
/*
 * For each management node, try twice at least in case of poor network situation.
 * If the client start to connect to a non-management node from the client, and the first retry may fail due to
 * the poor network quality. And then, the second retry get the response with redirection command.
 * The retry will not be executed since only *two* retry is allowed in case of single management node in the cluster.
 * Therefore, we need to multiply the retry times by factor of 2 to fix this problem.
 */
109
UNUSED_FUNC
H
hjxilinx 已提交
110 111 112 113 114
static int32_t tscGetMgmtConnMaxRetryTimes() {
  int32_t factor = 2;
  return tscMgmtIpList.numOfIps * factor;
}

H
hzcheng 已提交
115 116 117 118 119 120 121 122 123 124 125 126
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
  STscObj *pObj = (STscObj *)param;
  if (pObj == NULL) return;
  if (pObj != pObj->signature) {
    tscError("heart beat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature);
    return;
  }

  SSqlObj *pSql = pObj->pHb;
  SSqlRes *pRes = &pSql->res;

  if (code == 0) {
127
    SCMHeartBeatRsp *pRsp = (SCMHeartBeatRsp *)pRes->pRsp;
S
slguan 已提交
128
    SRpcIpSet *      pIpList = &pRsp->ipList;
S
slguan 已提交
129
    tscSetMgmtIpList(pIpList);
S
slguan 已提交
130

H
hzcheng 已提交
131 132 133
    if (pRsp->killConnection) {
      tscKillConnection(pObj);
    } else {
S
slguan 已提交
134 135
      if (pRsp->queryId) tscKillQuery(pObj, htonl(pRsp->queryId));
      if (pRsp->streamId) tscKillStream(pObj, htonl(pRsp->streamId));
H
hzcheng 已提交
136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151
    }
  } else {
    tscTrace("heart beat failed, code:%d", code);
  }

  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
}

void tscProcessActivityTimer(void *handle, void *tmrId) {
  STscObj *pObj = (STscObj *)handle;

  if (pObj == NULL) return;
  if (pObj->signature != pObj) return;
  if (pObj->pTimer != tmrId) return;

  if (pObj->pHb == NULL) {
S
slguan 已提交
152 153 154
    SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
    if (NULL == pSql) return;

H
hzcheng 已提交
155
    pSql->fp = tscProcessHeartBeatRsp;
H
hjxilinx 已提交
156
    
157 158 159 160
    SQueryInfo *pQueryInfo = NULL;
    tscGetQueryInfoDetailSafely(&pSql->cmd, 0, &pQueryInfo);
    pQueryInfo->command = TSDB_SQL_HB;
    
S
slguan 已提交
161 162 163 164 165
    if (TSDB_CODE_SUCCESS != tscAllocPayload(&(pSql->cmd), TSDB_DEFAULT_PAYLOAD_SIZE)) {
      tfree(pSql);
      return;
    }

H
hzcheng 已提交
166 167 168 169
    pSql->param = pObj;
    pSql->pTscObj = pObj;
    pSql->signature = pSql;
    pObj->pHb = pSql;
170
    tscAddSubqueryInfo(&pObj->pHb->cmd);
171

S
slguan 已提交
172
    tscTrace("%p pHb is allocated, pObj:%p", pObj->pHb, pObj);
H
hzcheng 已提交
173 174 175
  }

  if (tscShouldFreeHeatBeat(pObj->pHb)) {
S
slguan 已提交
176
    tscTrace("%p free HB object and release connection", pObj);
H
hzcheng 已提交
177 178 179 180 181 182 183 184 185
    tscFreeSqlObj(pObj->pHb);
    tscCloseTscObj(pObj);
    return;
  }

  tscProcessSql(pObj->pHb);
}

int tscSendMsgToServer(SSqlObj *pSql) {
H
hjxilinx 已提交
186 187 188
  SSqlCmd* pCmd = &pSql->cmd;
  
  char *pMsg = rpcMallocCont(pCmd->payloadLen);
S
slguan 已提交
189
  if (NULL == pMsg) {
S
slguan 已提交
190 191
    tscError("%p msg:%s malloc fail", pSql, taosMsg[pSql->cmd.msgType]);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
H
hzcheng 已提交
192 193
  }

S
slguan 已提交
194
  if (pSql->cmd.command < TSDB_SQL_MGMT) {
195
    tscTrace("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList.port);
S
slguan 已提交
196
    memcpy(pMsg, pSql->cmd.payload + tsRpcHeadSize, pSql->cmd.payloadLen);
197 198 199 200 201 202

    SRpcMsg rpcMsg = {
      .msgType = pSql->cmd.msgType,
      .pCont   = pMsg,
      .contLen = pSql->cmd.payloadLen,
      .handle  = pSql,
H
hjxilinx 已提交
203
      .code    = 0
204
    };
H
hjxilinx 已提交
205
    rpcSendRequest(pVnodeConn, &pSql->ipList, &rpcMsg);
S
slguan 已提交
206
  } else {
H
hjxilinx 已提交
207 208 209 210
    pSql->ipList = tscMgmtIpList;
    pSql->ipList.port = tsMnodeShellPort;
    
    tscTrace("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList.port);
S
slguan 已提交
211
    memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
212 213 214 215 216 217 218
    SRpcMsg rpcMsg = {
        .msgType = pSql->cmd.msgType,
        .pCont   = pMsg,
        .contLen = pSql->cmd.payloadLen,
        .handle  = pSql,
        .code   = 0
    };
H
hjxilinx 已提交
219
    rpcSendRequest(pTscMgmtConn, &pSql->ipList, &rpcMsg);
H
hzcheng 已提交
220 221
  }

S
slguan 已提交
222
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
223 224
}

225 226
void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
  SSqlObj *pSql = (SSqlObj *)rpcMsg->handle;
S
slguan 已提交
227
  if (pSql == NULL || pSql->signature != pSql) {
H
hzcheng 已提交
228
    tscError("%p sql is already released, signature:%p", pSql, pSql->signature);
S
slguan 已提交
229
    return;
H
hzcheng 已提交
230 231
  }

S
slguan 已提交
232 233 234
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;
235
  tscTrace("%p msg:%p is received from server", pSql, rpcMsg->pCont);
H
hzcheng 已提交
236 237

  if (pSql->freed || pObj->signature != pObj) {
S
slguan 已提交
238 239
    tscTrace("%p sql is already released or DB connection is closed, freed:%d pObj:%p signature:%p", pSql, pSql->freed,
             pObj, pObj->signature);
H
hzcheng 已提交
240
    tscFreeSqlObj(pSql);
241
    rpcFreeCont(rpcMsg->pCont);
S
slguan 已提交
242
    return;
H
hzcheng 已提交
243 244
  }

245 246
  if (rpcMsg->pCont == NULL) {
    rpcMsg->code = TSDB_CODE_NETWORK_UNAVAIL;
S
slguan 已提交
247
  } else {
248
    STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
249 250
    if (rpcMsg->code == TSDB_CODE_NOT_ACTIVE_TABLE || rpcMsg->code == TSDB_CODE_INVALID_TABLE_ID ||
        rpcMsg->code == TSDB_CODE_INVALID_VNODE_ID || rpcMsg->code == TSDB_CODE_NOT_ACTIVE_VNODE ||
S
slguan 已提交
251
        rpcMsg->code == TSDB_CODE_NETWORK_UNAVAIL || rpcMsg->code == TSDB_CODE_NOT_ACTIVE_TABLE ||
252
        rpcMsg->code == TSDB_CODE_TABLE_ID_MISMATCH) {
S
slguan 已提交
253 254 255 256 257 258 259 260 261 262 263
      /*
       * not_active_table: 1. the virtual node may fail to create table, since the procedure of create table is asynchronized,
       *                   the virtual node may have not create table till now, so try again by using the new metermeta.
       *                   2. this requested table may have been removed by other client, so we need to renew the
       *                   metermeta here.
       *
       * not_active_vnode: current vnode is move to other node due to node balance procedure or virtual node have been
       *                   removed. So, renew metermeta and try again.
       * not_active_session: db has been move to other node, the vnode does not exist on this dnode anymore.
       */
      if (pCmd->command == TSDB_SQL_CONNECT) {
264 265
        rpcMsg->code = TSDB_CODE_NETWORK_UNAVAIL;
        rpcFreeCont(rpcMsg->pCont);
S
slguan 已提交
266 267
        return;
      } else if (pCmd->command == TSDB_SQL_HB) {
268 269
        rpcMsg->code = TSDB_CODE_NOT_READY;
        rpcFreeCont(rpcMsg->pCont);
S
slguan 已提交
270 271
        return;
      } else {
272
        tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), ++pSql->retry);
273
        
274
        pSql->res.code = rpcMsg->code;  // keep the previous error code
275 276 277 278 279 280 281 282
        if (pSql->retry > pSql->maxRetry) {
          tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry);
        } else {
          rpcMsg->code = tscRenewMeterMeta(pSql, pTableMetaInfo->name);
          if (pTableMetaInfo->pTableMeta) {
            tscSendMsgToServer(pSql);
          }
  
283
          rpcFreeCont(rpcMsg->pCont);
S
slguan 已提交
284 285
          return;
        }
H
hzcheng 已提交
286 287
      }
    }
S
slguan 已提交
288
  }
H
hzcheng 已提交
289

290 291 292 293 294
  if (pRes->code == TSDB_CODE_SUCCESS) {
    tscTrace("%p reset retry counter to be 0 due to success rsp, old:%d", pSql, pSql->retry);
    pSql->retry = 0;
  }
  
H
hzcheng 已提交
295
  pRes->rspLen = 0;
296
  
H
hzcheng 已提交
297
  if (pRes->code != TSDB_CODE_QUERY_CANCELLED) {
298
    pRes->code = (rpcMsg->code != TSDB_CODE_SUCCESS) ? rpcMsg->code : TSDB_CODE_NETWORK_UNAVAIL;
H
hzcheng 已提交
299
  } else {
H
hjxilinx 已提交
300
    tscTrace("%p query is cancelled, code:%d", pSql, tstrerror(pRes->code));
H
hzcheng 已提交
301 302
  }

S
slguan 已提交
303
  if (pRes->code != TSDB_CODE_QUERY_CANCELLED) {
304
    assert(rpcMsg->msgType == pCmd->msgType + 1);
305
    pRes->code    = rpcMsg->code;
306
    pRes->rspType = rpcMsg->msgType;
307
    pRes->rspLen  = rpcMsg->contLen;
H
hzcheng 已提交
308

309 310 311 312 313 314
    if (pRes->rspLen > 0) {
      char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
      if (tmp == NULL) {
        pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
      } else {
        pRes->pRsp = tmp;
315
        memcpy(pRes->pRsp, rpcMsg->pCont, pRes->rspLen);
S
slguan 已提交
316
      }
317 318
    } else {
      pRes->pRsp = NULL;
S
slguan 已提交
319 320 321
    }

    // ignore the error information returned from mnode when set ignore flag in sql
S
slguan 已提交
322
    if (pRes->code == TSDB_CODE_DB_ALREADY_EXIST && pCmd->existsCheck && pRes->rspType == TSDB_MSG_TYPE_CM_CREATE_DB_RSP) {
H
hzcheng 已提交
323 324 325 326 327 328 329
      pRes->code = TSDB_CODE_SUCCESS;
    }

    /*
     * There is not response callback function for submit response.
     * The actual inserted number of points is the first number.
     */
330
    if (rpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP && pRes->pRsp != NULL) {
331
      SShellSubmitRspMsg *pMsg = (SShellSubmitRspMsg*)pRes->pRsp;
S
slguan 已提交
332 333 334 335 336 337 338
      pMsg->code = htonl(pMsg->code);
      pMsg->numOfRows = htonl(pMsg->numOfRows);
      pMsg->affectedRows = htonl(pMsg->affectedRows);
      pMsg->failedRows = htonl(pMsg->failedRows);
      pMsg->numOfFailedBlocks = htonl(pMsg->numOfFailedBlocks);

      pRes->numOfRows += pMsg->affectedRows;
339
      tscTrace("%p cmd:%d code:%s, inserted rows:%d, rsp len:%d", pSql, pCmd->command, tstrerror(pRes->code),
H
hjxilinx 已提交
340
          pMsg->affectedRows, pRes->rspLen);
S
slguan 已提交
341
    } else {
H
hjxilinx 已提交
342
      tscTrace("%p cmd:%d code:%s rsp len:%d", pSql, pCmd->command, tstrerror(pRes->code), pRes->rspLen);
H
hzcheng 已提交
343 344 345
    }
  }

346 347
  if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command])
    rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
H
hjxilinx 已提交
348
  
349
  if (rpcMsg->code != TSDB_CODE_ACTION_IN_PROGRESS) {
350
    void *taosres = tscKeepConn[pCmd->command] ? pSql : NULL;
H
[td-32]  
hjxilinx 已提交
351
    rpcMsg->code = pRes->code ? pRes->code : pRes->numOfRows;
352
    
353
    tscTrace("%p SQL result:%s res:%p", pSql, tstrerror(pRes->code), pSql);
H
hzcheng 已提交
354

355 356 357 358 359 360 361 362 363
    /*
     * Whether to free sqlObj or not should be decided before call the user defined function, since this SqlObj
     * may be freed in UDF, and reused by other threads before tscShouldFreeAsyncSqlObj called, in which case
     * tscShouldFreeAsyncSqlObj checks an object which is actually allocated by other threads.
     *
     * If this block of memory is re-allocated for an insert thread, in which tscKeepConn[command] equals to 0,
     * the tscShouldFreeAsyncSqlObj will success and tscFreeSqlObj free it immediately.
     */
    bool shouldFree = tscShouldFreeAsyncSqlObj(pSql);
H
hjxilinx 已提交
364
    (*pSql->fp)(pSql->param, taosres, rpcMsg->code);
H
hzcheng 已提交
365

366
    if (shouldFree) {
367
      tscTrace("%p sqlObj is automatically freed", pSql);
sangshuduo's avatar
sangshuduo 已提交
368
      tscFreeSqlObj(pSql);
H
hzcheng 已提交
369 370 371
    }
  }

372
  rpcFreeCont(rpcMsg->pCont);
H
hzcheng 已提交
373 374
}

S
slguan 已提交
375 376 377 378
int doProcessSql(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

H
hjxilinx 已提交
379 380 381 382 383 384 385
  if (pCmd->command == TSDB_SQL_SELECT ||
      pCmd->command == TSDB_SQL_FETCH ||
      pCmd->command == TSDB_SQL_RETRIEVE ||
      pCmd->command == TSDB_SQL_INSERT ||
      pCmd->command == TSDB_SQL_CONNECT ||
      pCmd->command == TSDB_SQL_HB ||
      pCmd->command == TSDB_SQL_META ||
H
hjxilinx 已提交
386
      pCmd->command == TSDB_SQL_STABLEVGROUP) {
387
    tscBuildMsg[pCmd->command](pSql, NULL);
S
slguan 已提交
388
  }
389 390 391

  int32_t code = tscSendMsgToServer(pSql);
  if (code != TSDB_CODE_SUCCESS) {
S
slguan 已提交
392
    pRes->code = code;
H
hjxilinx 已提交
393
    tscQueueAsyncRes(pSql);
S
slguan 已提交
394
  }
H
hjxilinx 已提交
395 396
  
  return TSDB_CODE_SUCCESS;
S
slguan 已提交
397 398 399
}

int tscProcessSql(SSqlObj *pSql) {
400 401 402
  char *   name = NULL;
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
403 404
  
  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hjxilinx 已提交
405
  STableMetaInfo *pTableMetaInfo = NULL;
406
  uint16_t        type = 0;
407

408
  if (pQueryInfo != NULL) {
H
hjxilinx 已提交
409
    pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hjxilinx 已提交
410 411
    if (pTableMetaInfo != NULL) {
      name = pTableMetaInfo->name;
412
    }
413

414
    type = pQueryInfo->type;
415
  
416
    // for heartbeat, numOfTables == 0;
417
    assert((pQueryInfo->numOfTables == 0 && pQueryInfo->command == TSDB_SQL_HB) || pQueryInfo->numOfTables > 0);
S
slguan 已提交
418
  }
419

420
  tscTrace("%p SQL cmd:%d will be processed, name:%s, type:%d", pSql, pCmd->command, name, type);
H
hzcheng 已提交
421
  if (pSql->cmd.command < TSDB_SQL_MGMT) {
H
hjxilinx 已提交
422 423
    // the pTableMetaInfo cannot be NULL
    if (pTableMetaInfo == NULL) {
H
hjxilinx 已提交
424 425 426
      pSql->res.code = TSDB_CODE_OTHERS;
      return pSql->res.code;
    }
H
hzcheng 已提交
427
  } else if (pSql->cmd.command < TSDB_SQL_LOCAL) {
H
hjxilinx 已提交
428
    pSql->ipList = tscMgmtIpList;
H
hzcheng 已提交
429 430 431 432
  } else {  // local handler
    return (*tscProcessMsgRsp[pCmd->command])(pSql);
  }

S
slguan 已提交
433
  // todo handle async situation
434 435
  if (QUERY_IS_JOIN_QUERY(type)) {
    if ((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0) {
H
hjxilinx 已提交
436
      return tscHandleMasterJoinQuery(pSql);
S
slguan 已提交
437 438
    } else {
      // for first stage sub query, iterate all vnodes to get all timestamp
439
      if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) != TSDB_QUERY_TYPE_JOIN_SEC_STAGE) {
S
slguan 已提交
440 441 442 443
        return doProcessSql(pSql);
      }
    }
  }
444
  
H
hjxilinx 已提交
445 446 447 448 449
  if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { // super table query
    tscHandleMasterSTableQuery(pSql);
    return pRes->code;
  } else if (pSql->fp == (void(*)())tscHandleMultivnodeInsert) {  // multi-vnodes insertion
    tscHandleMultivnodeInsert(pSql);
H
hzcheng 已提交
450 451
    return pSql->res.code;
  }
452
  
S
slguan 已提交
453 454
  return doProcessSql(pSql);
}
H
hzcheng 已提交
455 456

void tscKillMetricQuery(SSqlObj *pSql) {
457 458 459
  SSqlCmd* pCmd = &pSql->cmd;
  
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hjxilinx 已提交
460
  if (!tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
H
hzcheng 已提交
461 462 463 464 465 466
    return;
  }

  for (int i = 0; i < pSql->numOfSubs; ++i) {
    SSqlObj *pSub = pSql->pSubs[i];

S
slguan 已提交
467
    if (pSub == NULL) {
H
hzcheng 已提交
468 469
      continue;
    }
S
slguan 已提交
470

H
hzcheng 已提交
471 472 473 474 475
    /*
     * here, we cannot set the command = TSDB_SQL_KILL_QUERY. Otherwise, it may cause
     * sub-queries not correctly released and master sql object of metric query reaches an abnormal state.
     */
    pSql->pSubs[i]->res.code = TSDB_CODE_QUERY_CANCELLED;
S
slguan 已提交
476
    //taosStopRpcConn(pSql->pSubs[i]->thandle);
H
hzcheng 已提交
477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497
  }

  /*
   * 1. if the subqueries are not launched or partially launched, we need to waiting the launched
   * query return to successfully free allocated resources.
   * 2. if no any subqueries are launched yet, which means the metric query only in parse sql stage,
   * set the res.code, and return.
   */
  const int64_t MAX_WAITING_TIME = 10000;  // 10 Sec.
  int64_t       stime = taosGetTimestampMs();

  while (pSql->cmd.command != TSDB_SQL_RETRIEVE_METRIC && pSql->cmd.command != TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
    taosMsleep(100);
    if (taosGetTimestampMs() - stime > MAX_WAITING_TIME) {
      break;
    }
  }

  tscTrace("%p metric query is cancelled", pSql);
}

498
int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
499 500 501 502 503
  char *pMsg, *pStart;

  pStart = pSql->cmd.payload + tsRpcHeadSize;
  pMsg = pStart;

504
  SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *)pMsg;
S
slguan 已提交
505
  pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
S
slguan 已提交
506 507
  pMsg += sizeof(pSql->res.qhandle);

508
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
S
slguan 已提交
509
  pRetrieveMsg->free = htons(pQueryInfo->type);
510
  pMsg += sizeof(pQueryInfo->type);
H
hzcheng 已提交
511

512 513 514
  // todo valid the vgroupId at the client side
  if (UTIL_TABLE_IS_SUPERTABLE(pQueryInfo->pTableMetaInfo[0])) {
    SVgroupsInfo* pVgroupInfo = pQueryInfo->pTableMetaInfo[0]->vgroupList;
H
hjxilinx 已提交
515
    assert(pVgroupInfo->numOfVgroups == 1); // todo fix me
516
    
H
hjxilinx 已提交
517
    pRetrieveMsg->header.vgId = htonl(pVgroupInfo->vgroups[0].vgId);
518 519
  } else {
    STableMeta* pTableMeta = pQueryInfo->pTableMetaInfo[0]->pTableMeta;
H
hjxilinx 已提交
520
    pRetrieveMsg->header.vgId = htonl(pTableMeta->vgroupInfo.vgId);
521 522
  }
  
523 524 525 526
  pMsg += sizeof(SRetrieveTableMsg);
  
  pRetrieveMsg->header.contLen = htonl(pSql->cmd.payloadLen);
  
S
slguan 已提交
527
  pSql->cmd.msgType = TSDB_MSG_TYPE_RETRIEVE;
528
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
529 530
}

531
int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hjxilinx 已提交
532
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
H
hjxilinx 已提交
533
  STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
H
hjxilinx 已提交
534
  
535 536 537 538
  char* pMsg = pSql->cmd.payload + tsRpcHeadSize;
  
  // NOTE: shell message size should not include SMsgDesc
  int32_t size = pSql->cmd.payloadLen - sizeof(SMsgDesc);
H
hjxilinx 已提交
539
  
540
  SMsgDesc* pMsgDesc = (SMsgDesc*) pMsg;
541 542
  
  pMsgDesc->numOfVnodes = htonl(1);       //todo set the right number of vnodes
543
  pMsg += sizeof(SMsgDesc);
H
hjxilinx 已提交
544
  
545
  SSubmitMsg *pShellMsg = (SSubmitMsg *)pMsg;
H
hjxilinx 已提交
546
  int32_t vgId = pTableMeta->vgroupInfo.vgId;
547
  
H
hjxilinx 已提交
548
  pShellMsg->header.vgId = htonl(vgId);
549
  pShellMsg->header.contLen = htonl(size);
550
  pShellMsg->length = pShellMsg->header.contLen;
H
hjxilinx 已提交
551
  
552
  pShellMsg->numOfBlocks = htonl(pSql->cmd.numOfTablesInSubmit);  // number of meters to be inserted
H
hzcheng 已提交
553

H
hjxilinx 已提交
554
  // pSql->cmd.payloadLen is set during copying data into payload
S
slguan 已提交
555
  pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
556
  tscSetDnodeIpList(pSql, pTableMeta);
557
  
H
hjxilinx 已提交
558
  tscTrace("%p build submit msg, vgId:%d numOfVgroup:%d", pSql, vgId, htonl(pMsgDesc->numOfVnodes));
559
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
560 561 562 563 564 565
}

/*
 * for meter query, simply return the size <= 1k
 * for metric query, estimate size according to meter tags
 */
566
static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
H
hzcheng 已提交
567
  const static int32_t MIN_QUERY_MSG_PKT_SIZE = TSDB_MAX_BYTES_PER_ROW * 5;
568
  SQueryInfo *         pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
H
hzcheng 已提交
569

570
  int32_t srcColListSize = pQueryInfo->colList.numOfCols * sizeof(SColumnInfo);
571

H
hjxilinx 已提交
572
  int32_t         exprSize = sizeof(SSqlFuncExprMsg) * pQueryInfo->exprsInfo.numOfExprs;
H
hjxilinx 已提交
573
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
574 575

  // meter query without tags values
H
hjxilinx 已提交
576
  if (!UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) {
S
slguan 已提交
577
    return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + exprSize;
H
hzcheng 已提交
578
  }
H
hjxilinx 已提交
579 580 581 582
  
  int32_t size = 4096;
  
#if 0
H
hjxilinx 已提交
583
  SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta;
H
hjxilinx 已提交
584
  SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->vgroupIndex);
H
hzcheng 已提交
585

586
  int32_t meterInfoSize = (pMetricMeta->tagLen + sizeof(STableIdInfo)) * pVnodeSidList->numOfSids;
H
hjxilinx 已提交
587
  int32_t outputColumnSize = pQueryInfo->exprsInfo.numOfExprs * sizeof(SSqlFuncExprMsg);
H
hzcheng 已提交
588

S
slguan 已提交
589
  int32_t size = meterInfoSize + outputColumnSize + srcColListSize + exprSize + MIN_QUERY_MSG_PKT_SIZE;
590 591
  if (pQueryInfo->tsBuf != NULL) {
    size += pQueryInfo->tsBuf->fileSize;
S
slguan 已提交
592
  }
H
hjxilinx 已提交
593 594
#endif
  
S
slguan 已提交
595
  return size;
H
hzcheng 已提交
596 597
}

H
hjxilinx 已提交
598
static char *doSerializeTableInfo(SSqlObj *pSql, int32_t vgId, char *pMsg) {
599
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
600

H
hjxilinx 已提交
601
  STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
H
hjxilinx 已提交
602 603 604 605 606 607 608 609
  tscTrace("%p vgId:%d, query on table:%s, uid:%" PRIu64, pSql, vgId, pTableMetaInfo->name, pTableMeta->uid);
  
  STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
  pTableIdInfo->sid = htonl(pTableMeta->sid);
  pTableIdInfo->uid = htobe64(pTableMeta->uid);
  pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid));
  
  pMsg += sizeof(STableIdInfo);
610 611 612
  return pMsg;
}

613
int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
614 615
  SSqlCmd *pCmd = &pSql->cmd;

616
  int32_t size = tscEstimateQueryMsgSize(pCmd, pCmd->clauseIndex);
H
hzcheng 已提交
617

S
slguan 已提交
618 619 620 621
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for query msg", pSql);
    return -1;
  }
622
  
623
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hjxilinx 已提交
624
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hjxilinx 已提交
625
  STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
626 627 628 629 630
  
  if (pQueryInfo->colList.numOfCols <= 0) {
    tscError("%p illegal value of numOfCols in query msg: %d", pSql, tscGetNumOfColumns(pTableMeta));
    return -1;
  }
631 632 633 634 635 636 637 638 639 640
  
  if (pQueryInfo->intervalTime < 0) {
    tscError("%p illegal value of aggregation time interval in query msg: %ld", pSql, pQueryInfo->intervalTime);
    return -1;
  }
  
  if (pQueryInfo->groupbyExpr.numOfGroupCols < 0) {
    tscError("%p illegal value of numOfGroupCols in query msg: %d", pSql, pQueryInfo->groupbyExpr.numOfGroupCols);
    return -1;
  }
641 642

  char *pStart = pCmd->payload + tsRpcHeadSize;
H
hzcheng 已提交
643

S
slguan 已提交
644
  SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pStart;
H
hzcheng 已提交
645 646

  int32_t msgLen = 0;
S
slguan 已提交
647
  int32_t numOfTables = 0;
H
hzcheng 已提交
648

H
hjxilinx 已提交
649
  if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) {
S
slguan 已提交
650
    numOfTables = 1;
651
    tscSetDnodeIpList(pSql, pTableMeta);
H
hjxilinx 已提交
652
    pQueryMsg->head.vgId = htonl(pTableMeta->vgroupInfo.vgId);
H
hjxilinx 已提交
653
    tscTrace("%p queried tables:%d, table id: %s", pSql, 1, pTableMetaInfo->name);
654
  } else {  // query super table
655
    int32_t index = pTableMetaInfo->vgroupIndex;
656
    
657 658
    if (index < 0) {
      tscError("%p error vgroupIndex:%d", pSql, index);
H
hzcheng 已提交
659 660
      return -1;
    }
H
hjxilinx 已提交
661
    
662 663 664
    SCMVgroupInfo* pVgroupInfo = &pTableMetaInfo->vgroupList->vgroups[index];
    
    pSql->ipList.numOfIps = pVgroupInfo->numOfIps; // todo fix me
665 666
    pSql->ipList.port     = tsDnodeShellPort;
    pSql->ipList.inUse    = 0;
667
  
668 669 670
    for(int32_t i = 0; i < pVgroupInfo->numOfIps; ++i) {
      pSql->ipList.ip[i] = pVgroupInfo->ipAddr[i].ip;
    }
H
hjxilinx 已提交
671 672
    
#if 0
H
hjxilinx 已提交
673
    SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->vgroupIndex);
H
hzcheng 已提交
674 675
    uint32_t       vnodeId = pVnodeSidList->vpeerDesc[pVnodeSidList->index].vnode;

S
slguan 已提交
676 677 678
    numOfTables = pVnodeSidList->numOfSids;
    if (numOfTables <= 0) {
      tscError("%p vid:%d,error numOfTables in query message:%d", pSql, vnodeId, numOfTables);
H
hzcheng 已提交
679 680
      return -1;  // error
    }
H
hjxilinx 已提交
681 682
#endif
    
683
    tscTrace("%p query on super table, numOfVgroup:%d, vgroupIndex:%d", pSql, pTableMetaInfo->vgroupList->numOfVgroups, index);
684
    
H
hjxilinx 已提交
685
    pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId);
H
hjxilinx 已提交
686
    numOfTables = 1;
H
hzcheng 已提交
687 688
  }

689
  if (pQueryInfo->order.order == TSDB_ORDER_ASC) {
H
hjxilinx 已提交
690 691
    pQueryMsg->window.skey = htobe64(pQueryInfo->stime);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->etime);
H
hzcheng 已提交
692
  } else {
H
hjxilinx 已提交
693 694
    pQueryMsg->window.skey = htobe64(pQueryInfo->etime);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->stime);
H
hzcheng 已提交
695 696
  }

697
  pQueryMsg->numOfTables    = htonl(numOfTables);
698 699 700 701 702 703 704 705
  pQueryMsg->order          = htons(pQueryInfo->order.order);
  pQueryMsg->orderColId     = htons(pQueryInfo->order.orderColId);
  pQueryMsg->interpoType    = htons(pQueryInfo->interpoType);
  pQueryMsg->limit          = htobe64(pQueryInfo->limit.limit);
  pQueryMsg->offset         = htobe64(pQueryInfo->limit.offset);
  pQueryMsg->numOfCols      = htons(pQueryInfo->colList.numOfCols);
  pQueryMsg->intervalTime   = htobe64(pQueryInfo->intervalTime);
  pQueryMsg->slidingTime    = htobe64(pQueryInfo->slidingTime);
H
hjxilinx 已提交
706
  pQueryMsg->slidingTimeUnit = pQueryInfo->slidingTimeUnit;
707
  pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols);
H
hzcheng 已提交
708

709 710
  pQueryMsg->queryType = htons(pQueryInfo->type);
  pQueryMsg->numOfOutputCols = htons(pQueryInfo->exprsInfo.numOfExprs);
H
hzcheng 已提交
711

H
hjxilinx 已提交
712 713 714
  int32_t numOfOutput = pQueryInfo->fieldsInfo.numOfOutputCols;
  if (numOfOutput < 0) {
    tscError("%p illegal value of number of output columns in query msg: %d", pSql, numOfOutput);
H
hzcheng 已提交
715 716 717 718
    return -1;
  }

  // set column list ids
H
hjxilinx 已提交
719
  char *pMsg = (char *)(pQueryMsg->colList) + pQueryInfo->colList.numOfCols * sizeof(SColumnInfo);
H
hjxilinx 已提交
720
  SSchema *pSchema = tscGetTableSchema(pTableMeta);
H
hzcheng 已提交
721

722 723
  for (int32_t i = 0; i < pQueryInfo->colList.numOfCols; ++i) {
    SColumnBase *pCol = tscColumnBaseInfoGet(&pQueryInfo->colList, i);
S
slguan 已提交
724
    SSchema *    pColSchema = &pSchema[pCol->colIndex.columnIndex];
H
hzcheng 已提交
725

H
hjxilinx 已提交
726 727 728 729 730 731 732 733
//    if (pCol->colIndex.columnIndex >= tscGetNumOfColumns(pTableMeta) || pColSchema->type < TSDB_DATA_TYPE_BOOL ||
//        pColSchema->type > TSDB_DATA_TYPE_NCHAR) {
//      tscError("%p vid:%d sid:%d id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s", pSql,
//               htons(pQueryMsg->vnode), pTableMeta->sid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta), pCol->colIndex,
//               pColSchema->name);
//
//      return -1;  // 0 means build msg failed
//    }
H
hzcheng 已提交
734 735 736

    pQueryMsg->colList[i].colId = htons(pColSchema->colId);
    pQueryMsg->colList[i].bytes = htons(pColSchema->bytes);
737
    pQueryMsg->colList[i].type  = htons(pColSchema->type);
S
slguan 已提交
738
    pQueryMsg->colList[i].numOfFilters = htons(pCol->numOfFilters);
H
hzcheng 已提交
739

S
slguan 已提交
740 741 742
    // append the filter information after the basic column information
    for (int32_t f = 0; f < pCol->numOfFilters; ++f) {
      SColumnFilterInfo *pColFilter = &pCol->filterInfo[f];
H
hzcheng 已提交
743

S
slguan 已提交
744 745 746 747
      SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)pMsg;
      pFilterMsg->filterOnBinary = htons(pColFilter->filterOnBinary);

      pMsg += sizeof(SColumnFilterInfo);
H
hzcheng 已提交
748

S
slguan 已提交
749 750 751 752 753 754 755 756 757 758 759
      if (pColFilter->filterOnBinary) {
        pFilterMsg->len = htobe64(pColFilter->len);
        memcpy(pMsg, (void *)pColFilter->pz, pColFilter->len + 1);
        pMsg += (pColFilter->len + 1);  // append the additional filter binary info
      } else {
        pFilterMsg->lowerBndi = htobe64(pColFilter->lowerBndi);
        pFilterMsg->upperBndi = htobe64(pColFilter->upperBndi);
      }

      pFilterMsg->lowerRelOptr = htons(pColFilter->lowerRelOptr);
      pFilterMsg->upperRelOptr = htons(pColFilter->upperRelOptr);
H
hzcheng 已提交
760

S
slguan 已提交
761 762 763 764 765
      if (pColFilter->lowerRelOptr == TSDB_RELATION_INVALID && pColFilter->upperRelOptr == TSDB_RELATION_INVALID) {
        tscError("invalid filter info");
        return -1;
      }
    }
H
hzcheng 已提交
766 767 768 769
  }

  bool hasArithmeticFunction = false;

S
slguan 已提交
770
  SSqlFuncExprMsg *pSqlFuncExpr = (SSqlFuncExprMsg *)pMsg;
H
hjxilinx 已提交
771
  for (int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) {
772
    SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
H
hzcheng 已提交
773

S
slguan 已提交
774
    if (pExpr->functionId == TSDB_FUNC_ARITHM) {
H
hzcheng 已提交
775 776 777
      hasArithmeticFunction = true;
    }

H
hjxilinx 已提交
778
    if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId)) {
779
      /* column id is not valid according to the cached metermeta, the table meta is expired */
H
hzcheng 已提交
780 781 782 783
      tscError("%p table schema is not matched with parsed sql", pSql);
      return -1;
    }

784 785 786
    pSqlFuncExpr->colInfo.colId    = htons(pExpr->colInfo.colId);
    pSqlFuncExpr->colInfo.colIndex = htons(pExpr->colInfo.colIndex);
    pSqlFuncExpr->colInfo.flag     = htons(pExpr->colInfo.flag);
H
hzcheng 已提交
787

788
    pSqlFuncExpr->functionId  = htons(pExpr->functionId);
H
hzcheng 已提交
789 790 791 792 793 794 795 796 797
    pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams);
    pMsg += sizeof(SSqlFuncExprMsg);

    for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
      pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType);
      pSqlFuncExpr->arg[j].argBytes = htons(pExpr->param[j].nLen);

      if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) {
        memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen);
S
slguan 已提交
798 799 800

        // by plus one char to make the string null-terminated
        pMsg += pExpr->param[j].nLen + 1;
H
hzcheng 已提交
801 802 803 804 805 806 807 808 809 810
      } else {
        pSqlFuncExpr->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64Key);
      }
    }

    pSqlFuncExpr = (SSqlFuncExprMsg *)pMsg;
  }

  int32_t len = 0;
  if (hasArithmeticFunction) {
811 812
    SColumnBase *pColBase = pQueryInfo->colList.pColList;
    for (int32_t i = 0; i < pQueryInfo->colList.numOfCols; ++i) {
S
slguan 已提交
813
      char *  name = pSchema[pColBase[i].colIndex.columnIndex].name;
H
hzcheng 已提交
814 815 816 817 818 819 820 821 822 823 824
      int32_t lenx = strlen(name);
      memcpy(pMsg, name, lenx);
      *(pMsg + lenx) = ',';

      len += (lenx + 1);  // one for comma
      pMsg += (lenx + 1);
    }
  }

  pQueryMsg->colNameLen = htonl(len);

825
  // serialize the table info (sid, uid, tags)
H
hjxilinx 已提交
826
  pMsg = doSerializeTableInfo(pSql, htons(pQueryMsg->head.vgId), pMsg);
H
hzcheng 已提交
827

828
  SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr;
829
  if (pGroupbyExpr->numOfGroupCols > 0) {
S
slguan 已提交
830
    pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex);
H
hzcheng 已提交
831 832
    pQueryMsg->orderType = htons(pGroupbyExpr->orderType);

S
slguan 已提交
833
    for (int32_t j = 0; j < pGroupbyExpr->numOfGroupCols; ++j) {
834
      SColIndex *pCol = &pGroupbyExpr->columnInfo[j];
S
slguan 已提交
835 836 837 838

      *((int16_t *)pMsg) = pCol->colId;
      pMsg += sizeof(pCol->colId);

839 840
      *((int16_t *)pMsg) += pCol->colIndex;
      pMsg += sizeof(pCol->colIndex);
S
slguan 已提交
841 842 843

      *((int16_t *)pMsg) += pCol->flag;
      pMsg += sizeof(pCol->flag);
H
hjxilinx 已提交
844 845 846
      
      memcpy(pMsg, pCol->name, tListLen(pCol->name));
      pMsg += tListLen(pCol->name);
S
slguan 已提交
847 848 849
    }
  }

850 851 852 853
  if (pQueryInfo->interpoType != TSDB_INTERPO_NONE) {
    for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
      *((int64_t *)pMsg) = htobe64(pQueryInfo->defaultVal[i]);
      pMsg += sizeof(pQueryInfo->defaultVal[0]);
S
slguan 已提交
854 855 856 857 858 859 860 861
    }
  }

  // compressed ts block
  pQueryMsg->tsOffset = htonl(pMsg - pStart);
  int32_t tsLen = 0;
  int32_t numOfBlocks = 0;

862
  if (pQueryInfo->tsBuf != NULL) {
H
hjxilinx 已提交
863
    STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pQueryInfo->tsBuf, pTableMetaInfo->vgroupIndex);
864
    assert(QUERY_IS_JOIN_QUERY(pQueryInfo->type) && pBlockInfo != NULL);  // this query should not be sent
S
slguan 已提交
865 866

    // todo refactor
867 868
    fseek(pQueryInfo->tsBuf->f, pBlockInfo->offset, SEEK_SET);
    fread(pMsg, pBlockInfo->compLen, 1, pQueryInfo->tsBuf->f);
S
slguan 已提交
869 870 871 872

    pMsg += pBlockInfo->compLen;
    tsLen = pBlockInfo->compLen;
    numOfBlocks = pBlockInfo->numOfBlocks;
H
hzcheng 已提交
873 874
  }

S
slguan 已提交
875 876
  pQueryMsg->tsLen = htonl(tsLen);
  pQueryMsg->tsNumOfBlocks = htonl(numOfBlocks);
877 878
  if (pQueryInfo->tsBuf != NULL) {
    pQueryMsg->tsOrder = htonl(pQueryInfo->tsBuf->tsOrder);
H
hzcheng 已提交
879 880
  }

H
hjxilinx 已提交
881
  // serialize tag column query condition
882
  if (pQueryInfo->tagCond.pCond != NULL && taosArrayGetSize(pQueryInfo->tagCond.pCond) > 0) {
883 884
    STagCond* pTagCond = &pQueryInfo->tagCond;
    
885
    SCond *pCond = tsGetSTableQueryCond(pTagCond, pTableMeta->uid);
886
    if (pCond != NULL && pCond->cond != NULL) {
887 888
      pQueryMsg->tagCondLen = htons(pCond->len);
      memcpy(pMsg, pCond->cond, pCond->len);
889
      
890
      pMsg += pCond->len;
891 892 893
    }
  }
  
H
hjxilinx 已提交
894
  // tbname in/like query expression should be sent to mgmt node
H
hzcheng 已提交
895 896 897 898
  msgLen = pMsg - pStart;

  tscTrace("%p msg built success,len:%d bytes", pSql, msgLen);
  pCmd->payloadLen = msgLen;
S
slguan 已提交
899
  pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY;
H
hjxilinx 已提交
900
  
901
  pQueryMsg->head.contLen = htonl(msgLen);
H
hzcheng 已提交
902
  assert(msgLen + minMsgSize() <= size);
903 904

  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
905 906
}

907 908
int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
909
  pCmd->payloadLen = sizeof(SCMCreateDbMsg);
S
slguan 已提交
910
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DB;
H
hzcheng 已提交
911

912
  SCMCreateDbMsg *pCreateDbMsg = (SCMCreateDbMsg*)pCmd->payload;
913

914
  assert(pCmd->numOfClause == 1);
915
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
916
  strncpy(pCreateDbMsg->db, pTableMetaInfo->name, tListLen(pCreateDbMsg->db));
H
hzcheng 已提交
917

918
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
919 920
}

921 922
int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
923
  pCmd->payloadLen = sizeof(SCMCreateDnodeMsg);
S
slguan 已提交
924 925 926 927
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
H
hzcheng 已提交
928

929
  SCMCreateDnodeMsg *pCreate = (SCMCreateDnodeMsg *)pCmd->payload;
930
  strncpy(pCreate->ip, pInfo->pDCLInfo->a[0].z, pInfo->pDCLInfo->a[0].n);
S
slguan 已提交
931
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DNODE;
H
hzcheng 已提交
932

933
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
934 935
}

936 937
int32_t tscBuildAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
938
  pCmd->payloadLen = sizeof(SCMCreateAcctMsg);
S
slguan 已提交
939 940 941 942
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
H
hzcheng 已提交
943

944
  SCMCreateAcctMsg *pAlterMsg = (SCMCreateAcctMsg *)pCmd->payload;
H
hzcheng 已提交
945

946 947
  SSQLToken *pName = &pInfo->pDCLInfo->user.user;
  SSQLToken *pPwd = &pInfo->pDCLInfo->user.passwd;
H
hzcheng 已提交
948

949 950
  strncpy(pAlterMsg->user, pName->z, pName->n);
  strncpy(pAlterMsg->pass, pPwd->z, pPwd->n);
S
slguan 已提交
951

952
  SCreateAcctSQL *pAcctOpt = &pInfo->pDCLInfo->acctOpt;
H
hzcheng 已提交
953

954 955 956 957 958 959 960 961
  pAlterMsg->cfg.maxUsers = htonl(pAcctOpt->maxUsers);
  pAlterMsg->cfg.maxDbs = htonl(pAcctOpt->maxDbs);
  pAlterMsg->cfg.maxTimeSeries = htonl(pAcctOpt->maxTimeSeries);
  pAlterMsg->cfg.maxStreams = htonl(pAcctOpt->maxStreams);
  pAlterMsg->cfg.maxPointsPerSecond = htonl(pAcctOpt->maxPointsPerSecond);
  pAlterMsg->cfg.maxStorage = htobe64(pAcctOpt->maxStorage);
  pAlterMsg->cfg.maxQueryTime = htobe64(pAcctOpt->maxQueryTime);
  pAlterMsg->cfg.maxConnections = htonl(pAcctOpt->maxConnections);
H
hzcheng 已提交
962

963 964 965 966 967 968 969 970 971 972 973 974 975
  if (pAcctOpt->stat.n == 0) {
    pAlterMsg->cfg.accessState = -1;
  } else {
    if (pAcctOpt->stat.z[0] == 'r' && pAcctOpt->stat.n == 1) {
      pAlterMsg->cfg.accessState = TSDB_VN_READ_ACCCESS;
    } else if (pAcctOpt->stat.z[0] == 'w' && pAcctOpt->stat.n == 1) {
      pAlterMsg->cfg.accessState = TSDB_VN_WRITE_ACCCESS;
    } else if (strncmp(pAcctOpt->stat.z, "all", 3) == 0 && pAcctOpt->stat.n == 3) {
      pAlterMsg->cfg.accessState = TSDB_VN_ALL_ACCCESS;
    } else if (strncmp(pAcctOpt->stat.z, "no", 2) == 0 && pAcctOpt->stat.n == 2) {
      pAlterMsg->cfg.accessState = 0;
    }
  }
H
hzcheng 已提交
976

S
slguan 已提交
977
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_ACCT;
978
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
979 980
}

981 982
int32_t tscBuildUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
983
  pCmd->payloadLen = sizeof(SCMCreateUserMsg);
S
slguan 已提交
984

S
slguan 已提交
985 986 987 988 989
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

990
  SCMCreateUserMsg *pAlterMsg = (SCMCreateUserMsg*)pCmd->payload;
H
hzcheng 已提交
991

992 993 994
  SUserInfo *pUser = &pInfo->pDCLInfo->user;
  strncpy(pAlterMsg->user, pUser->user.z, pUser->user.n);
  pAlterMsg->flag = pUser->type;
H
hzcheng 已提交
995

996 997 998 999
  if (pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
    pAlterMsg->privilege = (char)pCmd->count;
  } else if (pUser->type == TSDB_ALTER_USER_PASSWD) {
    strncpy(pAlterMsg->pass, pUser->passwd.z, pUser->passwd.n);
H
hjxilinx 已提交
1000 1001
  } else { // create user password info
    strncpy(pAlterMsg->pass, pUser->passwd.z, pUser->passwd.n);
1002
  }
H
hzcheng 已提交
1003

1004
  if (pUser->type == TSDB_ALTER_USER_PASSWD || pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
S
slguan 已提交
1005
    pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_USER;
1006
  } else {
S
slguan 已提交
1007
    pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_USER;
1008
  }
H
hzcheng 已提交
1009

1010
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1011 1012
}

1013 1014
int32_t tscBuildCfgDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
S
slguan 已提交
1015
  pCmd->payloadLen = sizeof(SCMCfgDnodeMsg);
H
hzcheng 已提交
1016

S
slguan 已提交
1017 1018 1019 1020
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
H
hzcheng 已提交
1021

S
slguan 已提交
1022
  pCmd->msgType = TSDB_MSG_TYPE_MD_CONFIG_DNODE;
1023 1024
  return TSDB_CODE_SUCCESS;
}
S
slguan 已提交
1025

1026 1027
int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
1028
  pCmd->payloadLen = sizeof(SCMDropDbMsg);
H
hzcheng 已提交
1029

S
slguan 已提交
1030 1031 1032 1033 1034
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

1035
  SCMDropDbMsg *pDropDbMsg = (SCMDropDbMsg*)pCmd->payload;
H
hzcheng 已提交
1036

1037
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1038
  strncpy(pDropDbMsg->db, pTableMetaInfo->name, tListLen(pDropDbMsg->db));
1039
  pDropDbMsg->ignoreNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;
H
hzcheng 已提交
1040

S
slguan 已提交
1041
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DB;
1042
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1043 1044
}

1045 1046
int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
1047
  pCmd->payloadLen = sizeof(SCMDropTableMsg);
H
hzcheng 已提交
1048

S
slguan 已提交
1049 1050 1051 1052
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
H
hzcheng 已提交
1053

1054
  SCMDropTableMsg *pDropTableMsg = (SCMDropTableMsg*)pCmd->payload;
1055
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1056
  strcpy(pDropTableMsg->tableId, pTableMetaInfo->name);
S
slguan 已提交
1057
  pDropTableMsg->igNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;
H
hzcheng 已提交
1058

S
slguan 已提交
1059
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_TABLE;
1060
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1061 1062
}

1063
int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1064
  SSqlCmd *pCmd = &pSql->cmd;
1065
  pCmd->payloadLen = sizeof(SCMDropDnodeMsg);
S
slguan 已提交
1066 1067 1068 1069
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
H
hzcheng 已提交
1070

1071
  SCMDropDnodeMsg *pDrop = (SCMDropDnodeMsg *)pCmd->payload;
1072
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1073
  strcpy(pDrop->ip, pTableMetaInfo->name);
S
slguan 已提交
1074
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DNODE;
H
hzcheng 已提交
1075

1076
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1077 1078
}

S
[TD-16]  
slguan 已提交
1079
int32_t tscBuildDropUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
1080
  SSqlCmd *pCmd = &pSql->cmd;
1081
  pCmd->payloadLen = sizeof(SCMDropUserMsg);
S
slguan 已提交
1082
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_USER;
H
hzcheng 已提交
1083

S
slguan 已提交
1084 1085 1086 1087
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
H
hzcheng 已提交
1088

1089
  SCMDropUserMsg *pDropMsg = (SCMDropUserMsg*)pCmd->payload;
1090
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1091
  strcpy(pDropMsg->user, pTableMetaInfo->name);
H
hzcheng 已提交
1092

1093
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1094 1095
}

S
[TD-16]  
slguan 已提交
1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112
int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCMDropUserMsg);
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_ACCT;

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SCMDropUserMsg *pDropMsg = (SCMDropUserMsg*)pCmd->payload;
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  strcpy(pDropMsg->user, pTableMetaInfo->name);

  return TSDB_CODE_SUCCESS;
}

1113 1114
int32_t tscBuildUseDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
1115
  pCmd->payloadLen = sizeof(SCMUseDbMsg);
H
hzcheng 已提交
1116

S
slguan 已提交
1117 1118 1119 1120
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
1121

1122
  SCMUseDbMsg *pUseDbMsg = (SCMUseDbMsg*)pCmd->payload;
1123
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1124
  strcpy(pUseDbMsg->db, pTableMetaInfo->name);
S
slguan 已提交
1125
  pCmd->msgType = TSDB_MSG_TYPE_CM_USE_DB;
H
hzcheng 已提交
1126

1127
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1128 1129
}

1130
int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1131
  STscObj *pObj = pSql->pTscObj;
S
slguan 已提交
1132
  SSqlCmd *pCmd = &pSql->cmd;
S
slguan 已提交
1133
  pCmd->msgType = TSDB_MSG_TYPE_CM_SHOW;
1134
  pCmd->payloadLen = sizeof(SCMShowMsg) + 100;
H
hzcheng 已提交
1135

S
slguan 已提交
1136 1137 1138
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
S
slguan 已提交
1139
  }
H
hzcheng 已提交
1140

1141
  SCMShowMsg *pShowMsg = (SCMShowMsg*)pCmd->payload;
S
slguan 已提交
1142

1143
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1144
  size_t nameLen = strlen(pTableMetaInfo->name);
S
slguan 已提交
1145
  if (nameLen > 0) {
H
hjxilinx 已提交
1146
    strcpy(pShowMsg->db, pTableMetaInfo->name);  // prefix is set here
H
hzcheng 已提交
1147
  } else {
S
slguan 已提交
1148
    strcpy(pShowMsg->db, pObj->db);
H
hzcheng 已提交
1149 1150
  }

1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162
  SShowInfo *pShowInfo = &pInfo->pDCLInfo->showOpt;
  pShowMsg->type = pShowInfo->showType;

  if (pShowInfo->showType != TSDB_MGMT_TABLE_VNODES) {
    SSQLToken *pPattern = &pShowInfo->pattern;
    if (pPattern->type > 0) {  // only show tables support wildcard query
      strncpy(pShowMsg->payload, pPattern->z, pPattern->n);
      pShowMsg->payloadLen = htons(pPattern->n);
    }
  } else {
    SSQLToken *pIpAddr = &pShowInfo->prefix;
    assert(pIpAddr->n > 0 && pIpAddr->type > 0);
H
hzcheng 已提交
1163

1164 1165 1166 1167
    strncpy(pShowMsg->payload, pIpAddr->z, pIpAddr->n);
    pShowMsg->payloadLen = htons(pIpAddr->n);
  }

1168
  pCmd->payloadLen = sizeof(SCMShowMsg) + pShowMsg->payloadLen;
1169
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1170 1171
}

1172
int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1173
  SSqlCmd *pCmd = &pSql->cmd;
1174
  pCmd->payloadLen = sizeof(SCMKillQueryMsg);
H
hzcheng 已提交
1175

S
slguan 已提交
1176 1177 1178 1179
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
H
hzcheng 已提交
1180

1181
  SCMKillQueryMsg *pKill = (SCMKillQueryMsg*)pCmd->payload;
1182 1183 1184
  strncpy(pKill->queryId, pInfo->pDCLInfo->ip.z, pInfo->pDCLInfo->ip.n);
  switch (pCmd->command) {
    case TSDB_SQL_KILL_QUERY:
S
slguan 已提交
1185
      pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_QUERY;
1186 1187
      break;
    case TSDB_SQL_KILL_CONNECTION:
S
slguan 已提交
1188
      pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_CONN;
1189 1190
      break;
    case TSDB_SQL_KILL_STREAM:
S
slguan 已提交
1191
      pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_STREAM;
1192 1193 1194
      break;
  }
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1195 1196
}

1197
int tscEstimateCreateTableMsgLength(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1198 1199
  SSqlCmd *pCmd = &(pSql->cmd);

1200
  int32_t size = minMsgSize() + sizeof(SMgmtHead) + sizeof(SCMCreateTableMsg);
H
hzcheng 已提交
1201

1202
  SCreateTableSQL *pCreateTableInfo = pInfo->pCreateTableInfo;
1203
  if (pCreateTableInfo->type == TSQL_CREATE_TABLE_FROM_STABLE) {
H
hzcheng 已提交
1204 1205
    size += sizeof(STagData);
  } else {
S
slguan 已提交
1206
    size += sizeof(SSchema) * (pCmd->numOfCols + pCmd->count);
H
hzcheng 已提交
1207
  }
1208

1209 1210 1211
  if (pCreateTableInfo->pSelect != NULL) {
    size += (pCreateTableInfo->pSelect->selectToken.n + 1);
  }
H
hzcheng 已提交
1212 1213 1214 1215

  return size + TSDB_EXTRA_PAYLOAD_SIZE;
}

1216
int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1217
  int              msgLen = 0;
S
slguan 已提交
1218
  SSchema *        pSchema;
H
hzcheng 已提交
1219
  int              size = 0;
1220 1221 1222
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
H
hjxilinx 已提交
1223
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
1224 1225

  // Reallocate the payload size
1226
  size = tscEstimateCreateTableMsgLength(pSql, pInfo);
S
slguan 已提交
1227 1228
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for create table msg", pSql);
1229
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
S
slguan 已提交
1230
  }
H
hzcheng 已提交
1231 1232


1233
  SCMCreateTableMsg *pCreateTableMsg = (SCMCreateTableMsg *)pCmd->payload;
H
hjxilinx 已提交
1234
  strcpy(pCreateTableMsg->tableId, pTableMetaInfo->name);
1235 1236

  // use dbinfo from table id without modifying current db info
H
hjxilinx 已提交
1237
  tscGetDBInfoFromMeterId(pTableMetaInfo->name, pCreateTableMsg->db);
H
hzcheng 已提交
1238

1239 1240 1241
  SCreateTableSQL *pCreateTable = pInfo->pCreateTableInfo;

  pCreateTableMsg->igExists = pCreateTable->existCheck ? 1 : 0;
H
hzcheng 已提交
1242 1243 1244 1245
  pCreateTableMsg->numOfColumns = htons(pCmd->numOfCols);
  pCreateTableMsg->numOfTags = htons(pCmd->count);

  pCreateTableMsg->sqlLen = 0;
S
slguan 已提交
1246
  char *pMsg = (char *)pCreateTableMsg->schema;
H
hzcheng 已提交
1247

1248 1249 1250
  int8_t type = pInfo->pCreateTableInfo->type;
  if (type == TSQL_CREATE_TABLE_FROM_STABLE) {  // create by using super table, tags value
    memcpy(pMsg, &pInfo->pCreateTableInfo->usingInfo.tagdata, sizeof(STagData));
H
hzcheng 已提交
1251
    pMsg += sizeof(STagData);
1252
  } else {  // create (super) table
1253
    pSchema = (SSchema *)pCreateTableMsg->schema;
1254

H
hzcheng 已提交
1255
    for (int i = 0; i < pCmd->numOfCols + pCmd->count; ++i) {
1256
      TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i);
H
hzcheng 已提交
1257 1258 1259 1260

      pSchema->type = pField->type;
      strcpy(pSchema->name, pField->name);
      pSchema->bytes = htons(pField->bytes);
1261

H
hzcheng 已提交
1262 1263 1264 1265
      pSchema++;
    }

    pMsg = (char *)pSchema;
1266 1267
    if (type == TSQL_CREATE_STREAM) {  // check if it is a stream sql
      SQuerySQL *pQuerySql = pInfo->pCreateTableInfo->pSelect;
H
hzcheng 已提交
1268

1269 1270 1271
      strncpy(pMsg, pQuerySql->selectToken.z, pQuerySql->selectToken.n + 1);
      pCreateTableMsg->sqlLen = htons(pQuerySql->selectToken.n + 1);
      pMsg += pQuerySql->selectToken.n + 1;
H
hzcheng 已提交
1272 1273 1274
    }
  }

1275
  tscClearFieldInfo(&pQueryInfo->fieldsInfo);
H
hzcheng 已提交
1276

S
slguan 已提交
1277
  msgLen = pMsg - (char*)pCreateTableMsg;
S
slguan 已提交
1278
  pCreateTableMsg->contLen = htonl(msgLen);
H
hzcheng 已提交
1279
  pCmd->payloadLen = msgLen;
S
slguan 已提交
1280
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE;
H
hzcheng 已提交
1281 1282

  assert(msgLen + minMsgSize() <= size);
1283
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1284 1285 1286
}

int tscEstimateAlterTableMsgLength(SSqlCmd *pCmd) {
1287
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
1288
  return minMsgSize() + sizeof(SMgmtHead) + sizeof(SCMAlterTableMsg) + sizeof(SSchema) * tscNumOfFields(pQueryInfo) +
H
hzcheng 已提交
1289 1290 1291
         TSDB_EXTRA_PAYLOAD_SIZE;
}

1292
int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
1293
  SCMAlterTableMsg *pAlterTableMsg;
1294
  char *          pMsg;
H
hzcheng 已提交
1295 1296 1297
  int             msgLen = 0;
  int             size = 0;

1298 1299 1300
  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

H
hjxilinx 已提交
1301
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
1302 1303

  size = tscEstimateAlterTableMsgLength(pCmd);
S
slguan 已提交
1304 1305 1306 1307
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for alter table msg", pSql);
    return -1;
  }
H
hzcheng 已提交
1308

1309
  pAlterTableMsg = (SCMAlterTableMsg *)pCmd->payload;
H
hzcheng 已提交
1310

H
hjxilinx 已提交
1311
  tscGetDBInfoFromMeterId(pTableMetaInfo->name, pAlterTableMsg->db);
H
hzcheng 已提交
1312

1313 1314
  SAlterTableSQL *pAlterInfo = pInfo->pAlterInfo;

H
hjxilinx 已提交
1315
  strcpy(pAlterTableMsg->tableId, pTableMetaInfo->name);
1316
  pAlterTableMsg->type = htons(pAlterInfo->type);
1317

1318
  pAlterTableMsg->numOfCols = htons(tscNumOfFields(pQueryInfo));
1319
  memcpy(pAlterTableMsg->tagVal, pAlterInfo->tagData.data, TSDB_MAX_TAGS_LEN);
H
hzcheng 已提交
1320

S
slguan 已提交
1321
  SSchema *pSchema = pAlterTableMsg->schema;
1322 1323
  for (int i = 0; i < tscNumOfFields(pQueryInfo); ++i) {
    TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i);
H
hzcheng 已提交
1324 1325 1326 1327 1328 1329 1330 1331 1332

    pSchema->type = pField->type;
    strcpy(pSchema->name, pField->name);
    pSchema->bytes = htons(pField->bytes);
    pSchema++;
  }

  pMsg = (char *)pSchema;

S
slguan 已提交
1333
  msgLen = pMsg - (char*)pAlterTableMsg;
H
hzcheng 已提交
1334
  pCmd->payloadLen = msgLen;
S
slguan 已提交
1335
  pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_TABLE;
H
hzcheng 已提交
1336 1337

  assert(msgLen + minMsgSize() <= size);
1338

1339
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1340 1341
}

1342
int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1343
  SSqlCmd *pCmd = &pSql->cmd;
1344
  pCmd->payloadLen = sizeof(SCMAlterDbMsg);
S
slguan 已提交
1345
  pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_DB;
H
hzcheng 已提交
1346

S
slguan 已提交
1347 1348 1349 1350
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
H
hzcheng 已提交
1351

1352
  SCMAlterDbMsg *pAlterDbMsg = (SCMAlterDbMsg*)pCmd->payload;
1353
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1354
  strcpy(pAlterDbMsg->db, pTableMetaInfo->name);
H
hzcheng 已提交
1355

1356
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1357 1358
}

1359
int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1360 1361 1362
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->msgType = TSDB_MSG_TYPE_RETRIEVE;
  pCmd->payloadLen = sizeof(SRetrieveTableMsg);
S
slguan 已提交
1363

S
slguan 已提交
1364 1365 1366
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
H
hzcheng 已提交
1367
  }
S
slguan 已提交
1368

S
slguan 已提交
1369 1370 1371 1372
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg*)pCmd->payload;
  pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
  pRetrieveMsg->free = htons(pQueryInfo->type);
H
hzcheng 已提交
1373

1374
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1375 1376
}

1377
static int tscSetResultPointer(SQueryInfo *pQueryInfo, SSqlRes *pRes) {
1378
  if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
H
hzcheng 已提交
1379 1380 1381
    return pRes->code;
  }

1382
  for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
H
hjxilinx 已提交
1383 1384
    int16_t offset = tscFieldInfoGetOffset(pQueryInfo, i);
    pRes->tsrow[i] = (pRes->data + offset * pRes->numOfRows);
H
hzcheng 已提交
1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398
  }

  return 0;
}

/*
 * this function can only be called once.
 * by using pRes->rspType to denote its status
 *
 * if pRes->rspType is 1, no more result
 */
static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
1399

1400
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
1401

H
hzcheng 已提交
1402 1403 1404 1405 1406 1407 1408
  pRes->code = TSDB_CODE_SUCCESS;

  if (pRes->rspType == 0) {
    pRes->numOfRows = numOfRes;
    pRes->row = 0;
    pRes->rspType = 1;

1409
    tscSetResultPointer(pQueryInfo, pRes);
H
hzcheng 已提交
1410
  } else {
S
slguan 已提交
1411
    tscResetForNextRetrieve(pRes);
H
hzcheng 已提交
1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426
  }

  uint8_t code = pSql->res.code;
  if (pSql->fp) {
    if (code == TSDB_CODE_SUCCESS) {
      (*pSql->fp)(pSql->param, pSql, pSql->res.numOfRows);
    } else {
      tscQueueAsyncRes(pSql);
    }
  }

  return code;
}

int tscProcessDescribeTableRsp(SSqlObj *pSql) {
S
slguan 已提交
1427
  SSqlCmd *       pCmd = &pSql->cmd;
1428
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hzcheng 已提交
1429

H
hjxilinx 已提交
1430
  STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
H
hjxilinx 已提交
1431 1432
  
  int32_t numOfRes = tinfo.numOfColumns + tinfo.numOfTags;
H
hzcheng 已提交
1433 1434 1435 1436
  return tscLocalResultCommonBuilder(pSql, numOfRes);
}

int tscProcessTagRetrieveRsp(SSqlObj *pSql) {
1437
//  SSqlCmd *pCmd = &pSql->cmd;
1438

1439 1440
//  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
//  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
S
slguan 已提交
1441 1442

  int32_t numOfRes = 0;
H
hjxilinx 已提交
1443
#if 0
1444
  if (tscSqlExprGet(pQueryInfo, 0)->functionId == TSDB_FUNC_TAGPRJ) {
H
hjxilinx 已提交
1445
    numOfRes = pTableMetaInfo->pMetricMeta->numOfTables;
H
hzcheng 已提交
1446 1447 1448
  } else {
    numOfRes = 1;  // for count function, there is only one output.
  }
H
hjxilinx 已提交
1449 1450 1451
  
#endif

H
hzcheng 已提交
1452 1453 1454 1455 1456 1457 1458
  return tscLocalResultCommonBuilder(pSql, numOfRes);
}

int tscProcessRetrieveMetricRsp(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

H
hjxilinx 已提交
1459 1460
  pRes->code = tscDoLocalreduce(pSql);
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hzcheng 已提交
1461 1462

  if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) {
1463
    tscSetResultPointer(pQueryInfo, pRes);
H
hzcheng 已提交
1464 1465 1466 1467
  }

  pRes->row = 0;

1468
  uint8_t code = pRes->code;
H
hzcheng 已提交
1469
  if (pSql->fp) {  // async retrieve metric data
1470 1471
    if (pRes->code == TSDB_CODE_SUCCESS) {
      (*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
H
hzcheng 已提交
1472 1473 1474 1475 1476 1477 1478 1479
    } else {
      tscQueueAsyncRes(pSql);
    }
  }

  return code;
}

S
slguan 已提交
1480
int tscProcessEmptyResultRsp(SSqlObj *pSql) { return tscLocalResultCommonBuilder(pSql, 0); }
H
hzcheng 已提交
1481

1482
int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1483
  STscObj *pObj = pSql->pTscObj;
S
slguan 已提交
1484
  SSqlCmd *pCmd = &pSql->cmd;
S
slguan 已提交
1485
  pCmd->msgType = TSDB_MSG_TYPE_CM_CONNECT;
1486
  pCmd->payloadLen = sizeof(SCMConnectMsg);
H
hzcheng 已提交
1487

S
slguan 已提交
1488 1489 1490 1491 1492
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

1493
  SCMConnectMsg *pConnect = (SCMConnectMsg*)pCmd->payload;
H
hzcheng 已提交
1494 1495 1496 1497 1498

  char *db;  // ugly code to move the space
  db = strstr(pObj->db, TS_PATH_DELIMITER);
  db = (db == NULL) ? pObj->db : db + 1;
  strcpy(pConnect->db, db);
S
slguan 已提交
1499
  strcpy(pConnect->clientVersion, version);
S
slguan 已提交
1500
  strcpy(pConnect->msgVersion, "");
H
hzcheng 已提交
1501

1502
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1503 1504
}

H
hjxilinx 已提交
1505
int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
1506
  SCMTableInfoMsg *pInfoMsg;
S
slguan 已提交
1507
  char *         pMsg;
H
hzcheng 已提交
1508 1509 1510 1511 1512
  int            msgLen = 0;

  char *tmpData = 0;
  if (pSql->cmd.allocSize > 0) {
    tmpData = calloc(1, pSql->cmd.allocSize);
1513 1514 1515 1516
    if (NULL == tmpData) {
      return TSDB_CODE_CLI_OUT_OF_MEMORY;
    }

H
hzcheng 已提交
1517 1518 1519 1520
    // STagData is in binary format, strncpy is not available
    memcpy(tmpData, pSql->cmd.payload, pSql->cmd.allocSize);
  }

1521 1522 1523
  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);

H
hjxilinx 已提交
1524
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
S
slguan 已提交
1525

1526
  pInfoMsg = (SCMTableInfoMsg *)pCmd->payload;
H
hjxilinx 已提交
1527
  strcpy(pInfoMsg->tableId, pTableMetaInfo->name);
H
hjxilinx 已提交
1528
  pInfoMsg->createFlag = htons(pSql->cmd.autoCreated ? 1 : 0);
S
slguan 已提交
1529

1530
  pMsg = (char*)pInfoMsg + sizeof(SCMTableInfoMsg);
H
hzcheng 已提交
1531

H
hjxilinx 已提交
1532
  if (pSql->cmd.autoCreated) {
H
hzcheng 已提交
1533 1534 1535 1536
    memcpy(pInfoMsg->tags, tmpData, sizeof(STagData));
    pMsg += sizeof(STagData);
  }

H
hjxilinx 已提交
1537
  pCmd->payloadLen = pMsg - (char*)pInfoMsg;;
S
slguan 已提交
1538
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLE_META;
H
hzcheng 已提交
1539 1540 1541 1542

  tfree(tmpData);

  assert(msgLen + minMsgSize() <= pCmd->allocSize);
1543
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1544 1545
}

S
slguan 已提交
1546
/**
1547
 *  multi table meta req pkg format:
1548
 *  | SMgmtHead | SCMMultiTableInfoMsg | tableId0 | tableId1 | tableId2 | ......
S
slguan 已提交
1549 1550
 *      no used         4B
 **/
1551
int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563
  SSqlCmd *pCmd = &pSql->cmd;

  // copy payload content to temp buff
  char *tmpData = 0;
  if (pCmd->payloadLen > 0) {
    tmpData = calloc(1, pCmd->payloadLen + 1);
    if (NULL == tmpData) return -1;
    memcpy(tmpData, pCmd->payload, pCmd->payloadLen);
  }

  // fill head info
  SMgmtHead *pMgmt = (SMgmtHead *)(pCmd->payload + tsRpcHeadSize);
S
slguan 已提交
1564
  memset(pMgmt->db, 0, TSDB_TABLE_ID_LEN);  // server don't need the db
S
slguan 已提交
1565

1566
  SCMMultiTableInfoMsg *pInfoMsg = (SCMMultiTableInfoMsg *)(pCmd->payload + tsRpcHeadSize + sizeof(SMgmtHead));
S
slguan 已提交
1567
  pInfoMsg->numOfTables = htonl((int32_t)pCmd->count);
S
slguan 已提交
1568 1569

  if (pCmd->payloadLen > 0) {
S
slguan 已提交
1570
    memcpy(pInfoMsg->tableIds, tmpData, pCmd->payloadLen);
S
slguan 已提交
1571 1572 1573 1574
  }

  tfree(tmpData);

1575
  pCmd->payloadLen += sizeof(SMgmtHead) + sizeof(SCMMultiTableInfoMsg);
S
slguan 已提交
1576
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLES_META;
S
slguan 已提交
1577 1578 1579

  assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize);

S
slguan 已提交
1580
  tscTrace("%p build load multi-metermeta msg completed, numOfTables:%d, msg size:%d", pSql, pCmd->count,
S
slguan 已提交
1581 1582 1583 1584 1585
           pCmd->payloadLen);

  return pCmd->payloadLen;
}

1586
static UNUSED_FUNC int32_t tscEstimateMetricMetaMsgSize(SSqlCmd *pCmd) {
H
hzcheng 已提交
1587
  const int32_t defaultSize =
S
slguan 已提交
1588
      minMsgSize() + sizeof(SSuperTableMetaMsg) + sizeof(SMgmtHead) + sizeof(int16_t) * TSDB_MAX_TAGS;
1589
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
H
hzcheng 已提交
1590

S
slguan 已提交
1591
  int32_t n = 0;
1592 1593 1594 1595
  size_t size = taosArrayGetSize(pQueryInfo->tagCond.pCond);
  for (int32_t i = 0; i < size; ++i) {
    assert(0);
//    n += strlen(pQueryInfo->tagCond.cond[i].cond);
H
hzcheng 已提交
1596
  }
S
slguan 已提交
1597

H
hjxilinx 已提交
1598
  int32_t tagLen = n * TSDB_NCHAR_SIZE;
1599 1600
  if (pQueryInfo->tagCond.tbnameCond.cond != NULL) {
    tagLen += strlen(pQueryInfo->tagCond.tbnameCond.cond) * TSDB_NCHAR_SIZE;
H
hjxilinx 已提交
1601
  }
1602

S
slguan 已提交
1603
  int32_t joinCondLen = (TSDB_TABLE_ID_LEN + sizeof(int16_t)) * 2;
S
slguan 已提交
1604
  int32_t elemSize = sizeof(SSuperTableMetaElemMsg) * pQueryInfo->numOfTables;
H
hjxilinx 已提交
1605
  
1606
  int32_t colSize = pQueryInfo->groupbyExpr.numOfGroupCols*sizeof(SColIndex);
S
slguan 已提交
1607

H
hjxilinx 已提交
1608
  int32_t len = tagLen + joinCondLen + elemSize + colSize + defaultSize;
S
slguan 已提交
1609 1610

  return MAX(len, TSDB_DEFAULT_PAYLOAD_SIZE);
H
hzcheng 已提交
1611 1612
}

H
hjxilinx 已提交
1613 1614 1615
int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) {

#if 0
S
slguan 已提交
1616
  SSuperTableMetaMsg *pMetaMsg;
H
hzcheng 已提交
1617 1618
  char *          pMsg, *pStart;
  int             msgLen = 0;
S
slguan 已提交
1619
  int             tableIndex = 0;
H
hzcheng 已提交
1620

1621 1622 1623
  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);

1624
  STagCond *pTagCond = &pQueryInfo->tagCond;
S
slguan 已提交
1625

H
hjxilinx 已提交
1626
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex);
H
hzcheng 已提交
1627 1628

  int32_t size = tscEstimateMetricMetaMsgSize(pCmd);
S
slguan 已提交
1629 1630 1631 1632
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for metric meter msg", pSql);
    return -1;
  }
H
hzcheng 已提交
1633 1634 1635 1636 1637

  pStart = pCmd->payload + tsRpcHeadSize;
  pMsg = pStart;

  SMgmtHead *pMgmt = (SMgmtHead *)pMsg;
H
hjxilinx 已提交
1638
  tscGetDBInfoFromMeterId(pTableMetaInfo->name, pMgmt->db);
H
hzcheng 已提交
1639 1640 1641

  pMsg += sizeof(SMgmtHead);

S
slguan 已提交
1642
  pMetaMsg = (SSuperTableMetaMsg *)pMsg;
S
slguan 已提交
1643
  pMetaMsg->numOfTables = htonl(pQueryInfo->numOfTables);
S
slguan 已提交
1644

S
slguan 已提交
1645
  pMsg += sizeof(SSuperTableMetaMsg);
S
slguan 已提交
1646 1647 1648 1649 1650

  int32_t offset = pMsg - (char *)pMetaMsg;
  pMetaMsg->join = htonl(offset);

  // todo refactor
S
slguan 已提交
1651
  pMetaMsg->joinCondLen = htonl((TSDB_TABLE_ID_LEN + sizeof(int16_t)) * 2);
H
hzcheng 已提交
1652

S
slguan 已提交
1653
  memcpy(pMsg, pTagCond->joinInfo.left.tableId, TSDB_TABLE_ID_LEN);
S
slguan 已提交
1654
  pMsg += TSDB_TABLE_ID_LEN;
H
hzcheng 已提交
1655

S
slguan 已提交
1656 1657 1658
  *(int16_t *)pMsg = pTagCond->joinInfo.left.tagCol;
  pMsg += sizeof(int16_t);

S
slguan 已提交
1659
  memcpy(pMsg, pTagCond->joinInfo.right.tableId, TSDB_TABLE_ID_LEN);
S
slguan 已提交
1660
  pMsg += TSDB_TABLE_ID_LEN;
S
slguan 已提交
1661 1662 1663 1664

  *(int16_t *)pMsg = pTagCond->joinInfo.right.tagCol;
  pMsg += sizeof(int16_t);

1665
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
1666
    pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i);
H
hjxilinx 已提交
1667
    uint64_t uid = pTableMetaInfo->pTableMeta->uid;
S
slguan 已提交
1668 1669 1670 1671

    offset = pMsg - (char *)pMetaMsg;
    pMetaMsg->metaElem[i] = htonl(offset);

S
slguan 已提交
1672 1673
    SSuperTableMetaElemMsg *pElem = (SSuperTableMetaElemMsg *)pMsg;
    pMsg += sizeof(SSuperTableMetaElemMsg);
S
slguan 已提交
1674 1675 1676 1677

    // convert to unicode before sending to mnode for metric query
    int32_t condLen = 0;
    if (pTagCond->numOfTagCond > 0) {
1678
      SCond *pCond = tsGetSTableQueryCond(pTagCond, uid);
H
hjxilinx 已提交
1679
      if (pCond != NULL && pCond->cond != NULL) {
H
hjxilinx 已提交
1680
        condLen = strlen(pCond->cond) + 1;
1681

H
hjxilinx 已提交
1682
        bool ret = taosMbsToUcs4(pCond->cond, condLen, pMsg, condLen * TSDB_NCHAR_SIZE);
S
slguan 已提交
1683
        if (!ret) {
1684
          tscError("%p mbs to ucs4 failed:%s", pSql, tsGetSTableQueryCond(pTagCond, uid));
S
slguan 已提交
1685 1686 1687
          return 0;
        }
      }
H
hzcheng 已提交
1688 1689
    }

S
slguan 已提交
1690
    pElem->condLen = htonl(condLen);
H
hzcheng 已提交
1691

S
slguan 已提交
1692 1693 1694
    offset = pMsg - (char *)pMetaMsg;
    pElem->cond = htonl(offset);
    pMsg += condLen * TSDB_NCHAR_SIZE;
H
hzcheng 已提交
1695

S
slguan 已提交
1696 1697 1698
    pElem->rel = htons(pTagCond->relType);
    if (pTagCond->tbnameCond.uid == uid) {
      offset = pMsg - (char *)pMetaMsg;
H
hzcheng 已提交
1699

S
slguan 已提交
1700
      pElem->tableCond = htonl(offset);
H
hjxilinx 已提交
1701 1702 1703 1704 1705 1706 1707
      
      uint32_t len = 0;
      if (pTagCond->tbnameCond.cond != NULL) {
        len = strlen(pTagCond->tbnameCond.cond);
        memcpy(pMsg, pTagCond->tbnameCond.cond, len);
      }
      
H
hjxilinx 已提交
1708 1709
      pElem->tableCondLen = htonl(len);
      pMsg += len;
S
slguan 已提交
1710 1711
    }

1712
    SSqlGroupbyExpr *pGroupby = &pQueryInfo->groupbyExpr;
S
slguan 已提交
1713

H
hjxilinx 已提交
1714
    if (pGroupby->tableIndex != i && pGroupby->numOfGroupCols > 0) {
S
slguan 已提交
1715 1716 1717 1718 1719
      pElem->orderType = 0;
      pElem->orderIndex = 0;
      pElem->numOfGroupCols = 0;
    } else {
      pElem->numOfGroupCols = htons(pGroupby->numOfGroupCols);
H
hjxilinx 已提交
1720 1721
      for (int32_t j = 0; j < pTableMetaInfo->numOfTags; ++j) {
        pElem->tagCols[j] = htons(pTableMetaInfo->tagColumnIndex[j]);
S
slguan 已提交
1722 1723 1724 1725 1726 1727 1728 1729
      }

      if (pGroupby->numOfGroupCols != 0) {
        pElem->orderIndex = htons(pGroupby->orderIndex);
        pElem->orderType = htons(pGroupby->orderType);
        offset = pMsg - (char *)pMetaMsg;

        pElem->groupbyTagColumnList = htonl(offset);
1730
        for (int32_t j = 0; j < pQueryInfo->groupbyExpr.numOfGroupCols; ++j) {
1731 1732
          SColIndex *pCol = &pQueryInfo->groupbyExpr.columnInfo[j];
          SColIndex *pDestCol = (SColIndex *)pMsg;
1733

H
hjxilinx 已提交
1734
          pDestCol->colIdxInBuf = 0;
1735
          pDestCol->colIndex = htons(pCol->colIndex);
H
hjxilinx 已提交
1736 1737
          pDestCol->colId = htons(pDestCol->colId);
          pDestCol->flag = htons(pDestCol->flag);
H
hjxilinx 已提交
1738
          strncpy(pDestCol->name, pCol->name, tListLen(pCol->name));
1739

1740
          pMsg += sizeof(SColIndex);
S
slguan 已提交
1741 1742
        }
      }
H
hzcheng 已提交
1743
    }
S
slguan 已提交
1744

H
hjxilinx 已提交
1745 1746
    strcpy(pElem->tableId, pTableMetaInfo->name);
    pElem->numOfTags = htons(pTableMetaInfo->numOfTags);
S
slguan 已提交
1747 1748 1749

    int16_t len = pMsg - (char *)pElem;
    pElem->elemLen = htons(len);  // redundant data for integrate check
H
hzcheng 已提交
1750 1751 1752 1753
  }

  msgLen = pMsg - pStart;
  pCmd->payloadLen = msgLen;
S
slguan 已提交
1754
  pCmd->msgType = TSDB_MSG_TYPE_CM_STABLE_VGROUP;
H
hzcheng 已提交
1755
  assert(msgLen + minMsgSize() <= size);
H
hjxilinx 已提交
1756
#endif
1757
  
H
hjxilinx 已提交
1758 1759 1760 1761 1762 1763 1764 1765 1766 1767
  SSqlCmd *pCmd = &pSql->cmd;

  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);

  SCMSTableVgroupMsg *pStableVgroupMsg = (SCMSTableVgroupMsg *) pCmd->payload;
  strncpy(pStableVgroupMsg->tableId, pTableMetaInfo->name, tListLen(pStableVgroupMsg->tableId));

  pCmd->msgType = TSDB_MSG_TYPE_CM_STABLE_VGROUP;
  pCmd->payloadLen = sizeof(SCMSTableVgroupMsg);

1768
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1769 1770
}

1771
int tscEstimateHeartBeatMsgLength(SSqlObj *pSql) {
H
hzcheng 已提交
1772 1773 1774 1775
  int      size = 0;
  STscObj *pObj = pSql->pTscObj;

  size += tsRpcHeadSize + sizeof(SMgmtHead);
S
slguan 已提交
1776
  size += sizeof(SQqueryList);
H
hzcheng 已提交
1777 1778 1779

  SSqlObj *tpSql = pObj->sqlList;
  while (tpSql) {
S
slguan 已提交
1780
    size += sizeof(SQueryDesc);
H
hzcheng 已提交
1781 1782 1783
    tpSql = tpSql->next;
  }

S
slguan 已提交
1784
  size += sizeof(SStreamList);
H
hzcheng 已提交
1785 1786
  SSqlStream *pStream = pObj->streamList;
  while (pStream) {
S
slguan 已提交
1787
    size += sizeof(SStreamDesc);
H
hzcheng 已提交
1788 1789 1790 1791 1792 1793
    pStream = pStream->next;
  }

  return size + TSDB_EXTRA_PAYLOAD_SIZE;
}

1794
int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1795 1796 1797 1798 1799 1800 1801 1802 1803
  char *pMsg, *pStart;
  int   msgLen = 0;
  int   size = 0;

  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;

  pthread_mutex_lock(&pObj->mutex);

1804
  size = tscEstimateHeartBeatMsgLength(pSql);
S
slguan 已提交
1805
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
1806
    pthread_mutex_unlock(&pObj->mutex);
S
slguan 已提交
1807 1808 1809
    tscError("%p failed to malloc for heartbeat msg", pSql);
    return -1;
  }
H
hzcheng 已提交
1810 1811 1812 1813 1814 1815 1816 1817 1818 1819 1820 1821 1822

  pMsg = pCmd->payload + tsRpcHeadSize;
  pStart = pMsg;

  SMgmtHead *pMgmt = (SMgmtHead *)pMsg;
  strcpy(pMgmt->db, pObj->db);
  pMsg += sizeof(SMgmtHead);

  pMsg = tscBuildQueryStreamDesc(pMsg, pObj);
  pthread_mutex_unlock(&pObj->mutex);

  msgLen = pMsg - pStart;
  pCmd->payloadLen = msgLen;
S
slguan 已提交
1823
  pCmd->msgType = TSDB_MSG_TYPE_CM_HEARTBEAT;
H
hzcheng 已提交
1824 1825 1826 1827 1828

  assert(msgLen + minMsgSize() <= size);
  return msgLen;
}

1829 1830
int tscProcessTableMetaRsp(SSqlObj *pSql) {
  STableMetaMsg *pMetaMsg = (STableMetaMsg *)pSql->res.pRsp;
H
hzcheng 已提交
1831

1832 1833
  pMetaMsg->sid = htonl(pMetaMsg->sid);
  pMetaMsg->sversion = htons(pMetaMsg->sversion);
H
hjxilinx 已提交
1834 1835 1836
  
  pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId);
  
1837 1838
  pMetaMsg->uid = htobe64(pMetaMsg->uid);
  pMetaMsg->contLen = htons(pMetaMsg->contLen);
1839
  pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns);
H
hzcheng 已提交
1840

H
hjxilinx 已提交
1841 1842
  if (pMetaMsg->sid < 0 || pMetaMsg->vgroup.numOfIps < 0) {
    tscError("invalid meter vgId:%d, sid%d", pMetaMsg->vgroup.numOfIps, pMetaMsg->sid);
H
hzcheng 已提交
1843 1844 1845
    return TSDB_CODE_INVALID_VALUE;
  }

1846 1847
  if (pMetaMsg->numOfTags > TSDB_MAX_TAGS || pMetaMsg->numOfTags < 0) {
    tscError("invalid numOfTags:%d", pMetaMsg->numOfTags);
H
hzcheng 已提交
1848 1849 1850
    return TSDB_CODE_INVALID_VALUE;
  }

1851 1852
  if (pMetaMsg->numOfColumns > TSDB_MAX_COLUMNS || pMetaMsg->numOfColumns <= 0) {
    tscError("invalid numOfColumns:%d", pMetaMsg->numOfColumns);
H
hzcheng 已提交
1853 1854 1855
    return TSDB_CODE_INVALID_VALUE;
  }

H
hjxilinx 已提交
1856 1857 1858 1859 1860
  for (int i = 0; i < pMetaMsg->vgroup.numOfIps; ++i) {
    pMetaMsg->vgroup.ipAddr[i].ip = htonl(pMetaMsg->vgroup.ipAddr[i].ip);
    pMetaMsg->vgroup.ipAddr[i].port = htons(pMetaMsg->vgroup.ipAddr[i].port);
    
    assert(pMetaMsg->vgroup.ipAddr[i].ip != 0);
H
hzcheng 已提交
1861 1862
  }

1863
  SSchema* pSchema = pMetaMsg->schema;
H
hzcheng 已提交
1864

1865
  int32_t numOfTotalCols = pMetaMsg->numOfColumns + pMetaMsg->numOfTags;
H
hzcheng 已提交
1866 1867 1868
  for (int i = 0; i < numOfTotalCols; ++i) {
    pSchema->bytes = htons(pSchema->bytes);
    pSchema->colId = htons(pSchema->colId);
H
hjxilinx 已提交
1869 1870
    
    assert(pSchema->type >= TSDB_DATA_TYPE_BOOL && pSchema->type <= TSDB_DATA_TYPE_NCHAR);
H
hzcheng 已提交
1871 1872 1873
    pSchema++;
  }

1874 1875
  size_t size = 0;
  STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
H
hzcheng 已提交
1876

1877 1878 1879 1880 1881 1882 1883 1884 1885 1886 1887
#if 0
  // if current table is created according to super table, get the table meta of super table
  if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
    char id[TSDB_TABLE_ID_LEN + 1] = {0};
    strncpy(id, pMetaMsg->stableId, TSDB_TABLE_ID_LEN);
  
    // NOTE: if the table meta of super table is not cached at client side yet, the pSTable is NULL
    pTableMeta->pSTable = taosCacheAcquireByName(tscCacheHandle, id);
  }
#endif
  
H
hzcheng 已提交
1888
  // todo add one more function: taosAddDataIfNotExists();
1889
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
H
hjxilinx 已提交
1890
  assert(pTableMetaInfo->pTableMeta == NULL);
H
hzcheng 已提交
1891

H
hjxilinx 已提交
1892 1893 1894
  pTableMetaInfo->pTableMeta =
      (STableMeta *) taosCachePut(tscCacheHandle, pTableMetaInfo->name, pTableMeta, size, tsMeterMetaKeepTimer);
  
1895
  // todo handle out of memory case
1896
  if (pTableMetaInfo->pTableMeta == NULL) {
H
hjxilinx 已提交
1897
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
1898
  }
H
hzcheng 已提交
1899

1900
  free(pTableMeta);
1901
  
H
hjxilinx 已提交
1902
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1903 1904
}

S
slguan 已提交
1905
/**
1906
 *  multi table meta rsp pkg format:
1907
 *  | STaosRsp | ieType | SCMMultiTableInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | SMeterMeta2 | SSchema2
S
slguan 已提交
1908 1909 1910
 *  |...... 1B        1B            4B
 **/
int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
1911
#if 0
S
slguan 已提交
1912 1913 1914 1915 1916 1917 1918 1919 1920 1921 1922 1923
  char *rsp = pSql->res.pRsp;

  ieType = *rsp;
  if (ieType != TSDB_IE_TYPE_META) {
    tscError("invalid ie type:%d", ieType);
    pSql->res.code = TSDB_CODE_INVALID_IE;
    pSql->res.numOfTotal = 0;
    return TSDB_CODE_OTHERS;
  }

  rsp++;

1924
  SCMMultiTableInfoMsg *pInfo = (SCMMultiTableInfoMsg *)rsp;
S
slguan 已提交
1925
  totalNum = htonl(pInfo->numOfTables);
1926
  rsp += sizeof(SCMMultiTableInfoMsg);
S
slguan 已提交
1927 1928

  for (i = 0; i < totalNum; i++) {
S
slguan 已提交
1929
    SMultiTableMeta *pMultiMeta = (SMultiTableMeta *)rsp;
1930
    STableMeta *     pMeta = pMultiMeta->metas;
S
slguan 已提交
1931 1932 1933

    pMeta->sid = htonl(pMeta->sid);
    pMeta->sversion = htons(pMeta->sversion);
H
hjxilinx 已提交
1934
    pMeta->vgId = htonl(pMeta->vgId);
S
slguan 已提交
1935 1936
    pMeta->uid = htobe64(pMeta->uid);

H
hjxilinx 已提交
1937 1938
    if (pMeta->sid <= 0 || pMeta->vgId < 0) {
      tscError("invalid meter vgId:%d, sid%d", pMeta->vgId, pMeta->sid);
S
slguan 已提交
1939 1940 1941 1942 1943
      pSql->res.code = TSDB_CODE_INVALID_VALUE;
      pSql->res.numOfTotal = i;
      return TSDB_CODE_OTHERS;
    }

H
hjxilinx 已提交
1944 1945 1946 1947 1948 1949 1950 1951 1952 1953 1954 1955 1956 1957 1958 1959 1960 1961 1962 1963 1964 1965 1966
    //    pMeta->numOfColumns = htons(pMeta->numOfColumns);
    //
    //    if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
    //      tscError("invalid tag value count:%d", pMeta->numOfTags);
    //      pSql->res.code = TSDB_CODE_INVALID_VALUE;
    //      pSql->res.numOfTotal = i;
    //      return TSDB_CODE_OTHERS;
    //    }
    //
    //    if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
    //      tscError("invalid numOfTags:%d", pMeta->numOfTags);
    //      pSql->res.code = TSDB_CODE_INVALID_VALUE;
    //      pSql->res.numOfTotal = i;
    //      return TSDB_CODE_OTHERS;
    //    }
    //
    //    if (pMeta->numOfColumns > TSDB_MAX_COLUMNS || pMeta->numOfColumns < 0) {
    //      tscError("invalid numOfColumns:%d", pMeta->numOfColumns);
    //      pSql->res.code = TSDB_CODE_INVALID_VALUE;
    //      pSql->res.numOfTotal = i;
    //      return TSDB_CODE_OTHERS;
    //    }
    //
H
hjxilinx 已提交
1967
    //    for (int j = 0; j < TSDB_REPLICA_MAX_NUM; ++j) {
H
hjxilinx 已提交
1968 1969 1970 1971 1972 1973 1974 1975 1976 1977 1978 1979 1980 1981 1982 1983 1984 1985 1986 1987 1988 1989 1990 1991 1992 1993 1994 1995 1996 1997 1998 1999 2000 2001 2002 2003
    //      pMeta->vpeerDesc[j].vnode = htonl(pMeta->vpeerDesc[j].vnode);
    //    }
    //
    //    pMeta->rowSize = 0;
    //    rsp += sizeof(SMultiTableMeta);
    //    pSchema = (SSchema *)rsp;
    //
    //    int32_t numOfTotalCols = pMeta->numOfColumns + pMeta->numOfTags;
    //    for (int j = 0; j < numOfTotalCols; ++j) {
    //      pSchema->bytes = htons(pSchema->bytes);
    //      pSchema->colId = htons(pSchema->colId);
    //
    //      // ignore the tags length
    //      if (j < pMeta->numOfColumns) {
    //        pMeta->rowSize += pSchema->bytes;
    //      }
    //      pSchema++;
    //    }
    //
    //    rsp += numOfTotalCols * sizeof(SSchema);
    //
    //    int32_t  tagLen = 0;
    //    SSchema *pTagsSchema = tscGetTableTagSchema(pMeta);
    //
    //    if (pMeta->tableType == TSDB_CHILD_TABLE) {
    //      for (int32_t j = 0; j < pMeta->numOfTags; ++j) {
    //        tagLen += pTagsSchema[j].bytes;
    //      }
    //    }
    //
    //    rsp += tagLen;
    //    int32_t size = (int32_t)(rsp - ((char *)pMeta));  // Consistent with STableMeta in cache
    //
    //    pMeta->index = 0;
    //    (void)taosCachePut(tscCacheHandle, pMeta->tableId, (char *)pMeta, size, tsMeterMetaKeepTimer);
    //  }
S
slguan 已提交
2004
  }
H
hjxilinx 已提交
2005
  
S
slguan 已提交
2006 2007 2008
  pSql->res.code = TSDB_CODE_SUCCESS;
  pSql->res.numOfTotal = i;
  tscTrace("%p load multi-metermeta resp complete num:%d", pSql, pSql->res.numOfTotal);
2009 2010
#endif
  
S
slguan 已提交
2011 2012 2013
  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
2014 2015
int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
#if 0
S
slguan 已提交
2016 2017
  void **      metricMetaList = NULL;
  int32_t *    sizes = NULL;
H
hjxilinx 已提交
2018
  
S
slguan 已提交
2019 2020 2021 2022 2023 2024 2025 2026 2027 2028 2029 2030 2031 2032 2033 2034
  int32_t num = htons(*(int16_t *)rsp);
  rsp += sizeof(int16_t);

  metricMetaList = calloc(1, POINTER_BYTES * num);
  sizes = calloc(1, sizeof(int32_t) * num);

  // return with error code
  if (metricMetaList == NULL || sizes == NULL) {
    tfree(metricMetaList);
    tfree(sizes);
    pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;

    return pSql->res.code;
  }

  for (int32_t k = 0; k < num; ++k) {
S
slguan 已提交
2035
    pMeta = (SSuperTableMeta *)rsp;
S
slguan 已提交
2036 2037

    size_t size = (size_t)pSql->res.rspLen - 1;
S
slguan 已提交
2038
    rsp = rsp + sizeof(SSuperTableMeta);
S
slguan 已提交
2039

S
slguan 已提交
2040
    pMeta->numOfTables = htonl(pMeta->numOfTables);
S
slguan 已提交
2041 2042 2043
    pMeta->numOfVnodes = htonl(pMeta->numOfVnodes);
    pMeta->tagLen = htons(pMeta->tagLen);

2044
    size += pMeta->numOfVnodes * sizeof(SVnodeSidList *) + pMeta->numOfTables * sizeof(STableIdInfo *);
H
hzcheng 已提交
2045

2046 2047
    char *pBuf = calloc(1, size);
    if (pBuf == NULL) {
S
slguan 已提交
2048 2049 2050
      pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;
      goto _error_clean;
    }
H
hzcheng 已提交
2051

S
slguan 已提交
2052
    SSuperTableMeta *pNewMetricMeta = (SSuperTableMeta *)pBuf;
S
slguan 已提交
2053
    metricMetaList[k] = pNewMetricMeta;
H
hzcheng 已提交
2054

S
slguan 已提交
2055
    pNewMetricMeta->numOfTables = pMeta->numOfTables;
S
slguan 已提交
2056 2057
    pNewMetricMeta->numOfVnodes = pMeta->numOfVnodes;
    pNewMetricMeta->tagLen = pMeta->tagLen;
H
hzcheng 已提交
2058

S
slguan 已提交
2059
    pBuf = pBuf + sizeof(SSuperTableMeta) + pNewMetricMeta->numOfVnodes * sizeof(SVnodeSidList *);
H
hzcheng 已提交
2060

S
slguan 已提交
2061 2062
    for (int32_t i = 0; i < pMeta->numOfVnodes; ++i) {
      SVnodeSidList *pSidLists = (SVnodeSidList *)rsp;
2063
      memcpy(pBuf, pSidLists, sizeof(SVnodeSidList));
H
hzcheng 已提交
2064

2065 2066
      pNewMetricMeta->list[i] = pBuf - (char *)pNewMetricMeta;  // offset value
      SVnodeSidList *pLists = (SVnodeSidList *)pBuf;
H
hzcheng 已提交
2067

S
slguan 已提交
2068
      tscTrace("%p metricmeta:vid:%d,numOfTables:%d", pSql, i, pLists->numOfSids);
H
hzcheng 已提交
2069

2070
      pBuf += sizeof(SVnodeSidList) + sizeof(STableIdInfo *) * pSidLists->numOfSids;
S
slguan 已提交
2071
      rsp += sizeof(SVnodeSidList);
H
hzcheng 已提交
2072

2073
      size_t elemSize = sizeof(STableIdInfo) + pNewMetricMeta->tagLen;
S
slguan 已提交
2074
      for (int32_t j = 0; j < pSidLists->numOfSids; ++j) {
2075 2076
        pLists->pSidExtInfoList[j] = pBuf - (char *)pLists;
        memcpy(pBuf, rsp, elemSize);
2077

2078 2079
        ((STableIdInfo *)pBuf)->uid = htobe64(((STableIdInfo *)pBuf)->uid);
        ((STableIdInfo *)pBuf)->sid = htonl(((STableIdInfo *)pBuf)->sid);
2080

2081 2082
        rsp += elemSize;
        pBuf += elemSize;
S
slguan 已提交
2083
      }
H
hzcheng 已提交
2084
    }
S
slguan 已提交
2085

2086
    sizes[k] = pBuf - (char *)pNewMetricMeta;
H
hzcheng 已提交
2087 2088
  }

2089
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
S
slguan 已提交
2090 2091 2092
  for (int32_t i = 0; i < num; ++i) {
    char name[TSDB_MAX_TAGS_LEN + 1] = {0};

H
hjxilinx 已提交
2093
    STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
H
hjxilinx 已提交
2094
    tscGetMetricMetaCacheKey(pQueryInfo, name, pTableMetaInfo->pTableMeta->uid);
H
hzcheng 已提交
2095

S
slguan 已提交
2096 2097 2098
#ifdef _DEBUG_VIEW
    printf("generate the metric key:%s, index:%d\n", name, i);
#endif
H
hzcheng 已提交
2099

S
slguan 已提交
2100
    // release the used metricmeta
H
hjxilinx 已提交
2101 2102
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pMetricMeta), false);
    pTableMetaInfo->pMetricMeta = (SSuperTableMeta *)taosCachePut(tscCacheHandle, name, (char *)metricMetaList[i],
S
slguan 已提交
2103 2104 2105 2106
                                                                      sizes[i], tsMetricMetaKeepTimer);
    tfree(metricMetaList[i]);

    // failed to put into cache
H
hjxilinx 已提交
2107
    if (pTableMetaInfo->pMetricMeta == NULL) {
S
slguan 已提交
2108 2109 2110
      pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;
      goto _error_clean;
    }
H
hzcheng 已提交
2111 2112
  }

S
slguan 已提交
2113 2114 2115 2116 2117 2118 2119 2120
_error_clean:
  // free allocated resource
  for (int32_t i = 0; i < num; ++i) {
    tfree(metricMetaList[i]);
  }

  free(sizes);
  free(metricMetaList);
H
hjxilinx 已提交
2121
#endif
2122
  SSqlRes* pRes = &pSql->res;
H
hjxilinx 已提交
2123
  
2124
  SCMSTableVgroupRspMsg *pStableVgroup = (SCMSTableVgroupRspMsg *)pRes->pRsp;
H
hjxilinx 已提交
2125
  pStableVgroup->numOfVgroups = htonl(pStableVgroup->numOfVgroups);
H
hjxilinx 已提交
2126
  
2127 2128 2129
  // master sqlObj locates in param
  SSqlObj* parent = pSql->param;
  assert(parent != NULL);
H
hjxilinx 已提交
2130
  
2131
  SSqlCmd* pCmd = &parent->cmd;
H
hjxilinx 已提交
2132 2133
  STableMetaInfo* pInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  
2134 2135 2136
  pInfo->vgroupList = malloc(pRes->rspLen);
  memcpy(pInfo->vgroupList, pStableVgroup, pRes->rspLen);
  
H
hjxilinx 已提交
2137 2138
  for(int32_t i = 0; i < pInfo->vgroupList->numOfVgroups; ++i) {
    SCMVgroupInfo* pVgroups = &pInfo->vgroupList->vgroups[i];
2139
    
H
hjxilinx 已提交
2140
    pVgroups->vgId = htonl(pVgroups->vgId);
2141
    assert(pVgroups->numOfIps >= 1);
2142
    
2143
    for(int32_t j = 0; j < pVgroups->numOfIps; ++j) {
H
hjxilinx 已提交
2144 2145
      pVgroups->ipAddr[j].ip = htonl(pVgroups->ipAddr[j].ip);
      pVgroups->ipAddr[j].port = htons(pVgroups->ipAddr[j].port);
2146
    }
H
hjxilinx 已提交
2147 2148
  }
  
S
slguan 已提交
2149
  return pSql->res.code;
H
hzcheng 已提交
2150 2151 2152 2153 2154 2155
}

/*
 * current process do not use the cache at all
 */
int tscProcessShowRsp(SSqlObj *pSql) {
H
hjxilinx 已提交
2156
  STableMetaMsg * pMetaMsg;
2157
  SCMShowRsp *pShow;
S
slguan 已提交
2158
  SSchema *    pSchema;
H
hzcheng 已提交
2159 2160
  char         key[20];

2161 2162 2163
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

H
hjxilinx 已提交
2164
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
2165

H
hjxilinx 已提交
2166
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
2167

2168
  pShow = (SCMShowRsp *)pRes->pRsp;
S
slguan 已提交
2169
  pShow->qhandle = htobe64(pShow->qhandle);
H
hzcheng 已提交
2170 2171
  pRes->qhandle = pShow->qhandle;

S
slguan 已提交
2172
  tscResetForNextRetrieve(pRes);
H
hjxilinx 已提交
2173
  pMetaMsg = &(pShow->tableMeta);
H
hzcheng 已提交
2174

H
hjxilinx 已提交
2175
  pMetaMsg->numOfColumns = ntohs(pMetaMsg->numOfColumns);
H
hzcheng 已提交
2176

H
hjxilinx 已提交
2177
  pSchema = pMetaMsg->schema;
H
hjxilinx 已提交
2178 2179
  pMetaMsg->sid = ntohs(pMetaMsg->sid);
  for (int i = 0; i < pMetaMsg->numOfColumns; ++i) {
H
hzcheng 已提交
2180 2181 2182 2183
    pSchema->bytes = htons(pSchema->bytes);
    pSchema++;
  }

2184
  key[0] = pCmd->msgType + 'a';
H
hzcheng 已提交
2185 2186
  strcpy(key + 1, "showlist");

H
hjxilinx 已提交
2187
  taosCacheRelease(tscCacheHandle, (void *)&(pTableMetaInfo->pTableMeta), false);
H
hjxilinx 已提交
2188 2189 2190
  size_t size = 0;
  STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
  
H
hjxilinx 已提交
2191
  pTableMetaInfo->pTableMeta =
H
hjxilinx 已提交
2192 2193
      (STableMeta *)taosCachePut(tscCacheHandle, key, (char *)pTableMeta, size, tsMeterMetaKeepTimer);
  
2194
  pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutputCols;
H
hjxilinx 已提交
2195
  SSchema *pTableSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
H
hzcheng 已提交
2196

H
hjxilinx 已提交
2197
  tscColumnBaseInfoReserve(&pQueryInfo->colList, pMetaMsg->numOfColumns);
S
slguan 已提交
2198 2199
  SColumnIndex index = {0};

H
hjxilinx 已提交
2200
  for (int16_t i = 0; i < pMetaMsg->numOfColumns; ++i) {
S
slguan 已提交
2201
    index.columnIndex = i;
2202
    tscColumnBaseInfoInsert(pQueryInfo, &index);
H
hjxilinx 已提交
2203
    tscFieldInfoSetValFromSchema(&pQueryInfo->fieldsInfo, i, &pTableSchema[i]);
H
hjxilinx 已提交
2204 2205
    
    pQueryInfo->fieldsInfo.pSqlExpr[i] = tscSqlExprInsert(pQueryInfo, i, TSDB_FUNC_TS_DUMMY, &index,
H
hjxilinx 已提交
2206
                     pTableSchema[i].type, pTableSchema[i].bytes, pTableSchema[i].bytes);
H
hzcheng 已提交
2207 2208
  }

2209
  tscFieldInfoCalOffset(pQueryInfo);
H
hjxilinx 已提交
2210 2211
  
  tfree(pTableMeta);
H
hzcheng 已提交
2212 2213 2214 2215
  return 0;
}

int tscProcessConnectRsp(SSqlObj *pSql) {
S
slguan 已提交
2216
  char temp[TSDB_TABLE_ID_LEN * 2];
H
hzcheng 已提交
2217 2218 2219
  STscObj *pObj = pSql->pTscObj;
  SSqlRes *pRes = &pSql->res;

2220
  SCMConnectRsp *pConnect = (SCMConnectRsp *)pRes->pRsp;
H
hzcheng 已提交
2221
  strcpy(pObj->acctId, pConnect->acctId);  // copy acctId from response
2222 2223
  int32_t len = sprintf(temp, "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db);

H
hjxilinx 已提交
2224 2225 2226
  assert(len <= tListLen(pObj->db));
  strncpy(pObj->db, temp, tListLen(pObj->db));
  
S
slguan 已提交
2227
//  SIpList *    pIpList;
2228
//  char *rsp = pRes->pRsp + sizeof(SCMConnectRsp);
S
slguan 已提交
2229 2230
//  pIpList = (SIpList *)rsp;
//  tscSetMgmtIpList(pIpList);
H
hzcheng 已提交
2231

S
slguan 已提交
2232
  strcpy(pObj->sversion, pConnect->serverVersion);
H
hzcheng 已提交
2233 2234 2235 2236 2237 2238 2239 2240
  pObj->writeAuth = pConnect->writeAuth;
  pObj->superAuth = pConnect->superAuth;
  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);

  return 0;
}

int tscProcessUseDbRsp(SSqlObj *pSql) {
S
slguan 已提交
2241
  STscObj *       pObj = pSql->pTscObj;
2242
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
S
slguan 已提交
2243

H
hjxilinx 已提交
2244
  strcpy(pObj->db, pTableMetaInfo->name);
H
hzcheng 已提交
2245 2246 2247 2248
  return 0;
}

int tscProcessDropDbRsp(SSqlObj *UNUSED_PARAM(pSql)) {
2249
  taosCacheEmpty(tscCacheHandle);
H
hzcheng 已提交
2250 2251 2252 2253
  return 0;
}

int tscProcessDropTableRsp(SSqlObj *pSql) {
2254
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
S
slguan 已提交
2255

H
hjxilinx 已提交
2256 2257
  STableMeta *pTableMeta = taosCacheAcquireByName(tscCacheHandle, pTableMetaInfo->name);
  if (pTableMeta == NULL) {
H
hzcheng 已提交
2258 2259 2260 2261 2262 2263 2264 2265
    /* not in cache, abort */
    return 0;
  }

  /*
   * 1. if a user drops one table, which is the only table in a vnode, remove operation will incur vnode to be removed.
   * 2. Then, a user creates a new metric followed by a table with identical name of removed table but different schema,
   * here the table will reside in a new vnode.
S
slguan 已提交
2266 2267
   * The cached information is expired, however, we may have lost the ref of original meter. So, clear whole cache
   * instead.
H
hzcheng 已提交
2268
   */
H
hjxilinx 已提交
2269 2270
  tscTrace("%p force release metermeta after drop table:%s", pSql, pTableMetaInfo->name);
  taosCacheRelease(tscCacheHandle, (void **)&pTableMeta, true);
H
hzcheng 已提交
2271

H
hjxilinx 已提交
2272 2273
  if (pTableMetaInfo->pTableMeta) {
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true);
H
hjxilinx 已提交
2274
//    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pMetricMeta), true);
H
hzcheng 已提交
2275 2276 2277 2278 2279 2280
  }

  return 0;
}

int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
2281
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
S
slguan 已提交
2282

H
hjxilinx 已提交
2283 2284
  STableMeta *pTableMeta = taosCacheAcquireByName(tscCacheHandle, pTableMetaInfo->name);
  if (pTableMeta == NULL) { /* not in cache, abort */
H
hzcheng 已提交
2285 2286 2287
    return 0;
  }

H
hjxilinx 已提交
2288 2289
  tscTrace("%p force release metermeta in cache after alter-table: %s", pSql, pTableMetaInfo->name);
  taosCacheRelease(tscCacheHandle, (void **)&pTableMeta, true);
H
hzcheng 已提交
2290

H
hjxilinx 已提交
2291
  if (pTableMetaInfo->pTableMeta) {
H
hjxilinx 已提交
2292
    bool isSuperTable = UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo);
H
hzcheng 已提交
2293

H
hjxilinx 已提交
2294
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true);
H
hjxilinx 已提交
2295
//    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pMetricMeta), true);
H
hzcheng 已提交
2296

2297
    if (isSuperTable) {  // if it is a super table, reset whole query cache
H
hjxilinx 已提交
2298
      tscTrace("%p reset query cache since table:%s is stable", pSql, pTableMetaInfo->name);
2299
      taosCacheEmpty(tscCacheHandle);
H
hzcheng 已提交
2300 2301 2302 2303 2304 2305 2306 2307 2308 2309 2310 2311 2312 2313
    }
  }

  return 0;
}

int tscProcessAlterDbMsgRsp(SSqlObj *pSql) {
  UNUSED(pSql);
  return 0;
}

int tscProcessQueryRsp(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;

2314
  SQueryTableRsp *pQuery = (SQueryTableRsp *)pRes->pRsp;
S
slguan 已提交
2315 2316 2317
  pQuery->qhandle = htobe64(pQuery->qhandle);
  pRes->qhandle = pQuery->qhandle;

H
hzcheng 已提交
2318
  pRes->data = NULL;
S
slguan 已提交
2319
  tscResetForNextRetrieve(pRes);
H
hzcheng 已提交
2320 2321 2322 2323
  return 0;
}

int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) {
S
slguan 已提交
2324 2325 2326
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

S
slguan 已提交
2327
  SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *)pRes->pRsp;
H
hzcheng 已提交
2328 2329 2330

  pRes->numOfRows = htonl(pRetrieve->numOfRows);
  pRes->precision = htons(pRetrieve->precision);
2331 2332
  pRes->offset    = htobe64(pRetrieve->offset);
  pRes->useconds  = htobe64(pRetrieve->useconds);
H
hjxilinx 已提交
2333
  pRes->completed = (pRetrieve->completed == 1);
2334
  pRes->data      = pRetrieve->data;
H
hjxilinx 已提交
2335
  
2336 2337
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  tscSetResultPointer(pQueryInfo, pRes);
2338

weixin_48148422's avatar
weixin_48148422 已提交
2339
  if (pSql->pSubscription != NULL) {
H
hjxilinx 已提交
2340 2341 2342 2343 2344
    int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutputCols;
    
    TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, numOfCols - 1);
    int16_t     offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1);
    
weixin_48148422's avatar
weixin_48148422 已提交
2345 2346
    char* p = pRes->data + (pField->bytes + offset) * pRes->numOfRows;

S
slguan 已提交
2347
    int32_t numOfTables = htonl(*(int32_t*)p);
weixin_48148422's avatar
weixin_48148422 已提交
2348
    p += sizeof(int32_t);
S
slguan 已提交
2349
    for (int i = 0; i < numOfTables; i++) {
weixin_48148422's avatar
weixin_48148422 已提交
2350 2351 2352 2353
      int64_t uid = htobe64(*(int64_t*)p);
      p += sizeof(int64_t);
      TSKEY key = htobe64(*(TSKEY*)p);
      p += sizeof(TSKEY);
2354
      tscUpdateSubscriptionProgress(pSql->pSubscription, uid, key);
weixin_48148422's avatar
weixin_48148422 已提交
2355
    }
2356 2357
  }

H
hzcheng 已提交
2358
  pRes->row = 0;
S
slguan 已提交
2359
  tscTrace("%p numOfRows:%d, offset:%d", pSql, pRes->numOfRows, pRes->offset);
H
hzcheng 已提交
2360 2361 2362 2363 2364

  return 0;
}

int tscProcessRetrieveRspFromLocal(SSqlObj *pSql) {
2365 2366
  SSqlRes *   pRes = &pSql->res;
  SSqlCmd *   pCmd = &pSql->cmd;
2367
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
2368

S
slguan 已提交
2369
  SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *)pRes->pRsp;
H
hzcheng 已提交
2370 2371 2372

  pRes->numOfRows = htonl(pRetrieve->numOfRows);
  pRes->data = pRetrieve->data;
2373

2374
  tscSetResultPointer(pQueryInfo, pRes);
H
hzcheng 已提交
2375 2376 2377 2378
  pRes->row = 0;
  return 0;
}

H
hjxilinx 已提交
2379
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code);
H
hzcheng 已提交
2380

2381
static int32_t getTableMetaFromMgmt(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
S
slguan 已提交
2382 2383
  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
  if (NULL == pNew) {
2384
    tscError("%p malloc failed for new sqlobj to get table meta", pSql);
S
slguan 已提交
2385 2386
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
2387

H
hzcheng 已提交
2388 2389 2390
  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;
  pNew->cmd.command = TSDB_SQL_META;
S
slguan 已提交
2391

2392
  tscAddSubqueryInfo(&pNew->cmd);
2393 2394 2395 2396

  SQueryInfo *pNewQueryInfo = NULL;
  tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo);

H
hjxilinx 已提交
2397
  pNew->cmd.autoCreated = pSql->cmd.autoCreated;  // create table if not exists
S
slguan 已提交
2398
  if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
2399
    tscError("%p malloc failed for payload to get table meta", pSql);
S
slguan 已提交
2400
    free(pNew);
2401

S
slguan 已提交
2402 2403 2404
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

H
hjxilinx 已提交
2405
  STableMetaInfo *pNewMeterMetaInfo = tscAddEmptyMetaInfo(pNewQueryInfo);
2406
  assert(pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);
H
hzcheng 已提交
2407

2408
  strncpy(pNewMeterMetaInfo->name, pTableMetaInfo->name, tListLen(pNewMeterMetaInfo->name));
2409
  memcpy(pNew->cmd.payload, pSql->cmd.payload, TSDB_DEFAULT_PAYLOAD_SIZE);  // tag information if table does not exists.
S
slguan 已提交
2410
  tscTrace("%p new pSqlObj:%p to get tableMeta", pSql, pNew);
H
hzcheng 已提交
2411

H
hjxilinx 已提交
2412 2413
  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;
H
hzcheng 已提交
2414

H
hjxilinx 已提交
2415 2416 2417
  int32_t code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
H
hzcheng 已提交
2418 2419 2420 2421 2422
  }

  return code;
}

H
hjxilinx 已提交
2423
int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
H
hjxilinx 已提交
2424
  assert(strlen(pTableMetaInfo->name) != 0);
S
slguan 已提交
2425

2426
  // If this STableMetaInfo owns a table meta, release it first
H
hjxilinx 已提交
2427 2428
  if (pTableMetaInfo->pTableMeta != NULL) {
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), false);
2429 2430
  }
  
H
hjxilinx 已提交
2431 2432
  pTableMetaInfo->pTableMeta = (STableMeta *)taosCacheAcquireByName(tscCacheHandle, pTableMetaInfo->name);
  if (pTableMetaInfo->pTableMeta != NULL) {
H
hjxilinx 已提交
2433
    STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
2434 2435
    tscTrace("%p retrieve table Meta from cache, the number of columns:%d, numOfTags:%d, %p", pSql, tinfo.numOfColumns,
             tinfo.numOfTags, pTableMetaInfo->pTableMeta);
H
hzcheng 已提交
2436 2437 2438

    return TSDB_CODE_SUCCESS;
  }
2439 2440
  
  return getTableMetaFromMgmt(pSql, pTableMetaInfo);
H
hzcheng 已提交
2441 2442
}

H
hjxilinx 已提交
2443
int tscGetMeterMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool createIfNotExists) {
H
hjxilinx 已提交
2444
  pSql->cmd.autoCreated = createIfNotExists;
H
hjxilinx 已提交
2445
  return tscGetTableMeta(pSql, pTableMetaInfo);
H
hzcheng 已提交
2446 2447 2448 2449
}

/*
 * in handling the renew metermeta problem during insertion,
S
slguan 已提交
2450
 *
H
hzcheng 已提交
2451 2452 2453 2454 2455
 * If the meter is created on demand during insertion, the routine usually waits for a short
 * period to re-issue the getMeterMeta msg, in which makes a greater change that vnode has
 * successfully created the corresponding table.
 */
static void tscWaitingForCreateTable(SSqlCmd *pCmd) {
S
slguan 已提交
2456
  if (pCmd->command == TSDB_SQL_INSERT) {
H
hzcheng 已提交
2457 2458 2459 2460 2461 2462 2463
    taosMsleep(50);  // todo: global config
  }
}

/**
 * in renew metermeta, do not retrieve metadata in cache.
 * @param pSql          sql object
S
slguan 已提交
2464
 * @param tableId       meter id
H
hzcheng 已提交
2465 2466
 * @return              status code
 */
S
slguan 已提交
2467
int tscRenewMeterMeta(SSqlObj *pSql, char *tableId) {
2468 2469
  int code = 0;

H
hjxilinx 已提交
2470
  // handle table meta renew process
H
hzcheng 已提交
2471
  SSqlCmd *pCmd = &pSql->cmd;
2472 2473

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
H
hjxilinx 已提交
2474
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
2475 2476

  /*
S
slguan 已提交
2477
   * 1. only update the metermeta in force model metricmeta is not updated
H
hzcheng 已提交
2478 2479
   * 2. if get metermeta failed, still get the metermeta
   */
H
hjxilinx 已提交
2480
  if (pTableMetaInfo->pTableMeta == NULL || !tscQueryOnMetric(pCmd)) {
2481
    STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
H
hjxilinx 已提交
2482
    if (pTableMetaInfo->pTableMeta) {
2483 2484
      tscTrace("%p update table meta, old: numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
               tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->uid, pTableMeta);
H
hzcheng 已提交
2485
    }
2486

2487
    tscWaitingForCreateTable(pCmd);
H
hjxilinx 已提交
2488
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true);
H
hzcheng 已提交
2489

2490
    code = getTableMetaFromMgmt(pSql, pTableMetaInfo);  // todo ??
H
hzcheng 已提交
2491
  } else {
H
hjxilinx 已提交
2492
    tscTrace("%p metric query not update metric meta, numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
H
hjxilinx 已提交
2493 2494
             tscGetNumOfTags(pTableMetaInfo->pTableMeta), pCmd->numOfCols, pTableMetaInfo->pTableMeta->uid,
             pTableMetaInfo->pTableMeta);
H
hzcheng 已提交
2495 2496 2497 2498 2499
  }

  return code;
}

H
hjxilinx 已提交
2500
int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
S
slguan 已提交
2501 2502
  int      code = TSDB_CODE_NETWORK_UNAVAIL;
  SSqlCmd *pCmd = &pSql->cmd;
H
hzcheng 已提交
2503

H
hjxilinx 已提交
2504
  //the query condition is serialized into pCmd->payload, we need to rebuild key for stable meta info in cache.
2505
//  bool required = false;
2506

2507
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
2508
  if (pQueryInfo->pTableMetaInfo[0]->vgroupList != NULL) {
H
hjxilinx 已提交
2509 2510 2511 2512
    return TSDB_CODE_SUCCESS;
  }
  
#if 0
2513
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
S
slguan 已提交
2514 2515
    char tagstr[TSDB_MAX_TAGS_LEN + 1] = {0};

H
hjxilinx 已提交
2516
    STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
H
hjxilinx 已提交
2517
    tscGetMetricMetaCacheKey(pQueryInfo, tagstr, pTableMetaInfo->pTableMeta->uid);
S
slguan 已提交
2518

H
hjxilinx 已提交
2519
//    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pMetricMeta), false);
S
slguan 已提交
2520

2521
    SSuperTableMeta *ppMeta = (SSuperTableMeta *)taosCacheAcquireByName(tscCacheHandle, tagstr);
S
slguan 已提交
2522
    if (ppMeta == NULL) {
2523
      required = true;
S
slguan 已提交
2524 2525
      break;
    } else {
H
hjxilinx 已提交
2526
//      pTableMetaInfo->pMetricMeta = ppMeta;
S
slguan 已提交
2527 2528
    }
  }
H
hzcheng 已提交
2529

2530 2531
  // all metricmeta for one clause are retrieved from cache, no need to retrieve metricmeta from management node
  if (!required) {
H
hzcheng 已提交
2532 2533
    return TSDB_CODE_SUCCESS;
  }
H
hjxilinx 已提交
2534 2535
#endif
  
S
slguan 已提交
2536
  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
H
hzcheng 已提交
2537 2538 2539
  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;

H
hjxilinx 已提交
2540
  pNew->cmd.command = TSDB_SQL_STABLEVGROUP;
2541 2542
  
  SQueryInfo *pNewQueryInfo = NULL;
2543 2544 2545
  if ((code = tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo)) != TSDB_CODE_SUCCESS) {
    return code;
  }
2546
  
2547
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
H
hjxilinx 已提交
2548
    STableMetaInfo *pMMInfo = tscGetMetaInfo(pQueryInfo, i);
2549

H
hjxilinx 已提交
2550
    STableMeta *pTableMeta = taosCacheAcquireByName(tscCacheHandle, pMMInfo->name);
2551
    tscAddTableMetaInfo(pNewQueryInfo, pMMInfo->name, pTableMeta, NULL, pMMInfo->numOfTags, pMMInfo->tagColumnIndex);
S
slguan 已提交
2552 2553 2554 2555 2556 2557
  }

  if ((code = tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
    tscFreeSqlObj(pNew);
    return code;
  }
H
hzcheng 已提交
2558

2559
  tscTagCondCopy(&pNewQueryInfo->tagCond, &pQueryInfo->tagCond);
H
hzcheng 已提交
2560

2561 2562
  pNewQueryInfo->groupbyExpr = pQueryInfo->groupbyExpr;
  pNewQueryInfo->numOfTables = pQueryInfo->numOfTables;
H
hzcheng 已提交
2563

2564 2565
  pNewQueryInfo->slimit = pQueryInfo->slimit;
  pNewQueryInfo->order = pQueryInfo->order;
H
hjxilinx 已提交
2566 2567 2568 2569 2570
  
  STagCond* pTagCond = &pNewQueryInfo->tagCond;
  tscTrace("%p new sqlobj:%p info, numOfTables:%d, slimit:%" PRId64 ", soffset:%" PRId64 ", order:%d, tbname cond:%s",
      pSql, pNew, pNewQueryInfo->numOfTables, pNewQueryInfo->slimit.limit, pNewQueryInfo->slimit.offset,
      pNewQueryInfo->order.order, pTagCond->tbnameCond.cond)
H
hzcheng 已提交
2571

2572 2573 2574 2575
//  if (pSql->fp != NULL && pSql->pStream == NULL) {
//    pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
//    tscFreeSubqueryInfo(pCmd);
//  }
H
hzcheng 已提交
2576

H
hjxilinx 已提交
2577
  tscTrace("%p allocate new pSqlObj:%p to get stable vgroupInfo", pSql, pNew);
2578 2579 2580 2581 2582
  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;
  code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
H
hzcheng 已提交
2583 2584 2585 2586 2587
  }

  return code;
}

2588
void tscInitMsgsFp() {
S
slguan 已提交
2589 2590 2591
  tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
  tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg;
  tscBuildMsg[TSDB_SQL_FETCH] = tscBuildRetrieveMsg;
H
hzcheng 已提交
2592 2593

  tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg;
2594
  tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg;
H
hzcheng 已提交
2595

2596 2597
  tscBuildMsg[TSDB_SQL_CREATE_ACCT] = tscBuildAcctMsg;
  tscBuildMsg[TSDB_SQL_ALTER_ACCT] = tscBuildAcctMsg;
H
hzcheng 已提交
2598 2599

  tscBuildMsg[TSDB_SQL_CREATE_TABLE] = tscBuildCreateTableMsg;
S
[TD-16]  
slguan 已提交
2600
  tscBuildMsg[TSDB_SQL_DROP_USER] = tscBuildDropUserMsg;
H
hzcheng 已提交
2601 2602 2603
  tscBuildMsg[TSDB_SQL_DROP_ACCT] = tscBuildDropAcctMsg;
  tscBuildMsg[TSDB_SQL_DROP_DB] = tscBuildDropDbMsg;
  tscBuildMsg[TSDB_SQL_DROP_TABLE] = tscBuildDropTableMsg;
2604
  tscBuildMsg[TSDB_SQL_ALTER_USER] = tscBuildUserMsg;
S
slguan 已提交
2605 2606 2607
  tscBuildMsg[TSDB_SQL_CREATE_DNODE] = tscBuildCreateDnodeMsg;
  tscBuildMsg[TSDB_SQL_DROP_DNODE] = tscBuildDropDnodeMsg;
  tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg;
H
hzcheng 已提交
2608 2609 2610 2611 2612
  tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg;
  tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg;

  tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg;
  tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg;
H
hjxilinx 已提交
2613
  tscBuildMsg[TSDB_SQL_META] = tscBuildTableMetaMsg;
H
hjxilinx 已提交
2614
  tscBuildMsg[TSDB_SQL_STABLEVGROUP] = tscBuildSTableVgroupMsg;
S
slguan 已提交
2615
  tscBuildMsg[TSDB_SQL_MULTI_META] = tscBuildMultiMeterMetaMsg;
H
hzcheng 已提交
2616 2617 2618 2619

  tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg;
  tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg;
  tscBuildMsg[TSDB_SQL_RETRIEVE] = tscBuildRetrieveFromMgmtMsg;
2620 2621 2622
  tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg;
H
hzcheng 已提交
2623 2624 2625 2626 2627 2628 2629 2630

  tscProcessMsgRsp[TSDB_SQL_SELECT] = tscProcessQueryRsp;
  tscProcessMsgRsp[TSDB_SQL_FETCH] = tscProcessRetrieveRspFromVnode;

  tscProcessMsgRsp[TSDB_SQL_DROP_DB] = tscProcessDropDbRsp;
  tscProcessMsgRsp[TSDB_SQL_DROP_TABLE] = tscProcessDropTableRsp;
  tscProcessMsgRsp[TSDB_SQL_CONNECT] = tscProcessConnectRsp;
  tscProcessMsgRsp[TSDB_SQL_USE_DB] = tscProcessUseDbRsp;
2631
  tscProcessMsgRsp[TSDB_SQL_META] = tscProcessTableMetaRsp;
H
hjxilinx 已提交
2632
  tscProcessMsgRsp[TSDB_SQL_STABLEVGROUP] = tscProcessSTableVgroupRsp;
S
slguan 已提交
2633
  tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiMeterMetaRsp;
H
hzcheng 已提交
2634 2635

  tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp;
2636
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromVnode;  // rsp handled by same function.
H
hzcheng 已提交
2637
  tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp;
2638

H
hzcheng 已提交
2639
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_TAGS] = tscProcessTagRetrieveRsp;
H
hjxilinx 已提交
2640 2641 2642 2643 2644
  tscProcessMsgRsp[TSDB_SQL_CURRENT_DB] = tscProcessTagRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CURRENT_USER] = tscProcessTagRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_VERSION] = tscProcessTagRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CLI_VERSION] = tscProcessTagRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_STATUS] = tscProcessTagRetrieveRsp;
2645

H
hzcheng 已提交
2646 2647 2648 2649 2650 2651 2652 2653 2654 2655 2656 2657 2658
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp;

  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_METRIC] = tscProcessRetrieveMetricRsp;

  tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp;
  tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp;

  tscKeepConn[TSDB_SQL_SHOW] = 1;
  tscKeepConn[TSDB_SQL_RETRIEVE] = 1;
  tscKeepConn[TSDB_SQL_SELECT] = 1;
  tscKeepConn[TSDB_SQL_FETCH] = 1;
  tscKeepConn[TSDB_SQL_HB] = 1;
}