tscServer.c 85.3 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#include "os.h"
H
hzcheng 已提交
17 18 19 20
#include "tcache.h"
#include "trpc.h"
#include "tscProfile.h"
#include "tscSecondaryMerge.h"
H
hjxilinx 已提交
21
#include "tscSubquery.h"
H
hzcheng 已提交
22 23 24 25 26 27 28 29 30 31
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
#include "tsocket.h"
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"

#define TSC_MGMT_VNODE 999

S
slguan 已提交
32
SRpcIpSet  tscMgmtIpList;
S
slguan 已提交
33 34
SRpcIpSet  tscDnodeIpSet;

35 36
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};

H
hzcheng 已提交
37 38 39
int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql);
void tscProcessActivityTimer(void *handle, void *tmrId);
int tscKeepConn[TSDB_SQL_MAX] = {0};
40 41 42
TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid);
void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts);
void tscSaveSubscriptionProgress(void* sub);
H
hzcheng 已提交
43

S
slguan 已提交
44
static int32_t minMsgSize() { return tsRpcHeadSize + 100; }
H
hzcheng 已提交
45

S
slguan 已提交
46 47
void tscPrintMgmtIp() {
  if (tscMgmtIpList.numOfIps <= 0) {
S
slguan 已提交
48
    tscError("invalid mgmt IP list:%d", tscMgmtIpList.numOfIps);
S
slguan 已提交
49
  } else {
S
slguan 已提交
50
    for (int i = 0; i < tscMgmtIpList.numOfIps; ++i) {
S
slguan 已提交
51
      tscTrace("mgmt index:%d ip:%d", i, tscMgmtIpList.ip[i]);
S
slguan 已提交
52
    }
S
slguan 已提交
53 54 55
  }
}

S
slguan 已提交
56 57
void tscSetMgmtIpListFromCluster(SRpcIpSet *pIpList) {
  tscMgmtIpList.numOfIps = htons(pIpList->numOfIps);
S
slguan 已提交
58
  tscMgmtIpList.inUse = htons(pIpList->inUse);
S
slguan 已提交
59 60 61
  tscMgmtIpList.port = htons(pIpList->port);
  for (int32_t i = 0; i <tscMgmtIpList.numOfIps; ++i) {
    tscMgmtIpList.ip[i] = pIpList->ip[i];
S
slguan 已提交
62 63 64 65
  }
}

void tscSetMgmtIpListFromEdge() {
S
slguan 已提交
66 67
  if (tscMgmtIpList.numOfIps != 1) {
    tscMgmtIpList.numOfIps = 1;
S
slguan 已提交
68
    tscMgmtIpList.inUse = 0;
S
slguan 已提交
69
    tscMgmtIpList.port = tsMnodeShellPort;
S
slguan 已提交
70 71 72 73 74 75
    tscMgmtIpList.ip[0] = inet_addr(tsMasterIp);
    tscTrace("edge mgmt IP list:");
    tscPrintMgmtIp();
  }
}

S
slguan 已提交
76
void tscSetMgmtIpList(SRpcIpSet *pIpList) {
S
slguan 已提交
77 78 79 80 81 82 83 84
  /*
    * The iplist returned by the cluster edition is the current management nodes
    * and the iplist returned by the edge edition is empty
    */
  if (pIpList->numOfIps != 0) {
    tscSetMgmtIpListFromCluster(pIpList);
  } else {
    tscSetMgmtIpListFromEdge();
S
slguan 已提交
85 86 87
  }
}

H
hjxilinx 已提交
88 89 90 91 92 93 94
/*
 * For each management node, try twice at least in case of poor network situation.
 * If the client start to connect to a non-management node from the client, and the first retry may fail due to
 * the poor network quality. And then, the second retry get the response with redirection command.
 * The retry will not be executed since only *two* retry is allowed in case of single management node in the cluster.
 * Therefore, we need to multiply the retry times by factor of 2 to fix this problem.
 */
95
UNUSED_FUNC
H
hjxilinx 已提交
96 97 98 99 100
static int32_t tscGetMgmtConnMaxRetryTimes() {
  int32_t factor = 2;
  return tscMgmtIpList.numOfIps * factor;
}

H
hzcheng 已提交
101 102 103 104 105 106 107 108 109 110 111 112
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
  STscObj *pObj = (STscObj *)param;
  if (pObj == NULL) return;
  if (pObj != pObj->signature) {
    tscError("heart beat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature);
    return;
  }

  SSqlObj *pSql = pObj->pHb;
  SSqlRes *pRes = &pSql->res;

  if (code == 0) {
113
    SCMHeartBeatRsp *pRsp = (SCMHeartBeatRsp *)pRes->pRsp;
S
slguan 已提交
114
    SRpcIpSet *      pIpList = &pRsp->ipList;
S
slguan 已提交
115
    tscSetMgmtIpList(pIpList);
S
slguan 已提交
116

H
hzcheng 已提交
117 118 119
    if (pRsp->killConnection) {
      tscKillConnection(pObj);
    } else {
S
slguan 已提交
120 121
      if (pRsp->queryId) tscKillQuery(pObj, htonl(pRsp->queryId));
      if (pRsp->streamId) tscKillStream(pObj, htonl(pRsp->streamId));
H
hzcheng 已提交
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
    }
  } else {
    tscTrace("heart beat failed, code:%d", code);
  }

  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
}

void tscProcessActivityTimer(void *handle, void *tmrId) {
  STscObj *pObj = (STscObj *)handle;

  if (pObj == NULL) return;
  if (pObj->signature != pObj) return;
  if (pObj->pTimer != tmrId) return;

  if (pObj->pHb == NULL) {
S
slguan 已提交
138 139 140
    SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
    if (NULL == pSql) return;

H
hzcheng 已提交
141
    pSql->fp = tscProcessHeartBeatRsp;
H
hjxilinx 已提交
142
    
143 144 145 146
    SQueryInfo *pQueryInfo = NULL;
    tscGetQueryInfoDetailSafely(&pSql->cmd, 0, &pQueryInfo);
    pQueryInfo->command = TSDB_SQL_HB;
    
S
slguan 已提交
147 148 149 150 151
    if (TSDB_CODE_SUCCESS != tscAllocPayload(&(pSql->cmd), TSDB_DEFAULT_PAYLOAD_SIZE)) {
      tfree(pSql);
      return;
    }

H
hzcheng 已提交
152 153 154 155
    pSql->param = pObj;
    pSql->pTscObj = pObj;
    pSql->signature = pSql;
    pObj->pHb = pSql;
156
    tscAddSubqueryInfo(&pObj->pHb->cmd);
157

S
slguan 已提交
158
    tscTrace("%p pHb is allocated, pObj:%p", pObj->pHb, pObj);
H
hzcheng 已提交
159 160 161
  }

  if (tscShouldFreeHeatBeat(pObj->pHb)) {
S
slguan 已提交
162
    tscTrace("%p free HB object and release connection", pObj);
H
hzcheng 已提交
163 164 165 166 167 168 169 170 171
    tscFreeSqlObj(pObj->pHb);
    tscCloseTscObj(pObj);
    return;
  }

  tscProcessSql(pObj->pHb);
}

int tscSendMsgToServer(SSqlObj *pSql) {
S
slguan 已提交
172 173
  char *pMsg = rpcMallocCont(pSql->cmd.payloadLen);
  if (NULL == pMsg) {
S
slguan 已提交
174 175
    tscError("%p msg:%s malloc fail", pSql, taosMsg[pSql->cmd.msgType]);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
H
hzcheng 已提交
176 177
  }

S
slguan 已提交
178
  pSql->ipList->ip[0] = inet_addr(tsPrivateIp);
179
  
S
slguan 已提交
180
  if (pSql->cmd.command < TSDB_SQL_MGMT) {
S
slguan 已提交
181
    pSql->ipList->port = tsDnodeShellPort;
S
slguan 已提交
182 183
    tscPrint("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port);
    memcpy(pMsg, pSql->cmd.payload + tsRpcHeadSize, pSql->cmd.payloadLen);
184 185 186 187 188 189

    SRpcMsg rpcMsg = {
      .msgType = pSql->cmd.msgType,
      .pCont   = pMsg,
      .contLen = pSql->cmd.payloadLen,
      .handle  = pSql,
H
hjxilinx 已提交
190
      .code    = 0
191 192
    };
    rpcSendRequest(pVnodeConn, pSql->ipList, &rpcMsg);
S
slguan 已提交
193
  } else {
S
slguan 已提交
194
    pSql->ipList->port = tsMnodeShellPort;
S
slguan 已提交
195
    tscTrace("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port);
S
slguan 已提交
196
    memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
197 198 199 200 201 202 203 204
    SRpcMsg rpcMsg = {
        .msgType = pSql->cmd.msgType,
        .pCont   = pMsg,
        .contLen = pSql->cmd.payloadLen,
        .handle  = pSql,
        .code   = 0
    };
    rpcSendRequest(pTscMgmtConn, pSql->ipList, &rpcMsg);
H
hzcheng 已提交
205 206
  }

S
slguan 已提交
207
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
208 209
}

210 211
void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
  SSqlObj *pSql = (SSqlObj *)rpcMsg->handle;
S
slguan 已提交
212
  if (pSql == NULL || pSql->signature != pSql) {
H
hzcheng 已提交
213
    tscError("%p sql is already released, signature:%p", pSql, pSql->signature);
S
slguan 已提交
214
    return;
H
hzcheng 已提交
215 216
  }

S
slguan 已提交
217 218 219
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;
220
  tscTrace("%p msg:%p is received from server", pSql, rpcMsg->pCont);
H
hzcheng 已提交
221 222

  if (pSql->freed || pObj->signature != pObj) {
S
slguan 已提交
223 224
    tscTrace("%p sql is already released or DB connection is closed, freed:%d pObj:%p signature:%p", pSql, pSql->freed,
             pObj, pObj->signature);
H
hzcheng 已提交
225
    tscFreeSqlObj(pSql);
226
    rpcFreeCont(rpcMsg->pCont);
S
slguan 已提交
227
    return;
H
hzcheng 已提交
228 229
  }

230 231
  if (rpcMsg->pCont == NULL) {
    rpcMsg->code = TSDB_CODE_NETWORK_UNAVAIL;
S
slguan 已提交
232
  } else {
233
    STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
234 235 236 237
    if (rpcMsg->code == TSDB_CODE_NOT_ACTIVE_TABLE || rpcMsg->code == TSDB_CODE_INVALID_TABLE_ID ||
        rpcMsg->code == TSDB_CODE_INVALID_VNODE_ID || rpcMsg->code == TSDB_CODE_NOT_ACTIVE_VNODE ||
        rpcMsg->code == TSDB_CODE_NETWORK_UNAVAIL || rpcMsg->code == TSDB_CODE_NOT_ACTIVE_SESSION ||
        rpcMsg->code == TSDB_CODE_TABLE_ID_MISMATCH) {
S
slguan 已提交
238 239 240 241 242 243 244 245 246 247 248
      /*
       * not_active_table: 1. the virtual node may fail to create table, since the procedure of create table is asynchronized,
       *                   the virtual node may have not create table till now, so try again by using the new metermeta.
       *                   2. this requested table may have been removed by other client, so we need to renew the
       *                   metermeta here.
       *
       * not_active_vnode: current vnode is move to other node due to node balance procedure or virtual node have been
       *                   removed. So, renew metermeta and try again.
       * not_active_session: db has been move to other node, the vnode does not exist on this dnode anymore.
       */
      if (pCmd->command == TSDB_SQL_CONNECT) {
249 250
        rpcMsg->code = TSDB_CODE_NETWORK_UNAVAIL;
        rpcFreeCont(rpcMsg->pCont);
S
slguan 已提交
251 252
        return;
      } else if (pCmd->command == TSDB_SQL_HB) {
253 254
        rpcMsg->code = TSDB_CODE_NOT_READY;
        rpcFreeCont(rpcMsg->pCont);
S
slguan 已提交
255 256
        return;
      } else {
H
hjxilinx 已提交
257
        tscTrace("%p it shall renew meter meta, code:%d", pSql, tstrerror(rpcMsg->code));
H
hzcheng 已提交
258

S
slguan 已提交
259
        pSql->maxRetry = TSDB_VNODES_SUPPORT * 2;
260
        pSql->res.code = rpcMsg->code;  // keep the previous error code
S
slguan 已提交
261

H
hjxilinx 已提交
262
        rpcMsg->code = tscRenewMeterMeta(pSql, pTableMetaInfo->name);
H
hzcheng 已提交
263

H
hjxilinx 已提交
264
        if (pTableMetaInfo->pTableMeta) {
S
slguan 已提交
265
          tscSendMsgToServer(pSql);
266
          rpcFreeCont(rpcMsg->pCont);
S
slguan 已提交
267 268
          return;
        }
H
hzcheng 已提交
269 270
      }
    }
S
slguan 已提交
271
  }
H
hzcheng 已提交
272 273 274

  pSql->retry = 0;
  pRes->rspLen = 0;
275
  
H
hzcheng 已提交
276
  if (pRes->code != TSDB_CODE_QUERY_CANCELLED) {
277
    pRes->code = (rpcMsg->code != TSDB_CODE_SUCCESS) ? rpcMsg->code : TSDB_CODE_NETWORK_UNAVAIL;
H
hzcheng 已提交
278
  } else {
H
hjxilinx 已提交
279
    tscTrace("%p query is cancelled, code:%d", pSql, tstrerror(pRes->code));
H
hzcheng 已提交
280 281
  }

S
slguan 已提交
282
  if (pRes->code != TSDB_CODE_QUERY_CANCELLED) {
283
    assert(rpcMsg->msgType == pCmd->msgType + 1);
284
    pRes->code    = rpcMsg->code;
285
    pRes->rspType = rpcMsg->msgType;
286
    pRes->rspLen  = rpcMsg->contLen;
H
hzcheng 已提交
287

S
slguan 已提交
288 289 290 291 292 293
    char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
    if (tmp == NULL) {
      pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
    } else {
      pRes->pRsp = tmp;
      if (pRes->rspLen) {
294
        memcpy(pRes->pRsp, rpcMsg->pCont, pRes->rspLen);
S
slguan 已提交
295 296 297 298
      }
    }

    // ignore the error information returned from mnode when set ignore flag in sql
S
slguan 已提交
299
    if (pRes->code == TSDB_CODE_DB_ALREADY_EXIST && pCmd->existsCheck && pRes->rspType == TSDB_MSG_TYPE_CM_CREATE_DB_RSP) {
H
hzcheng 已提交
300 301 302 303 304 305 306
      pRes->code = TSDB_CODE_SUCCESS;
    }

    /*
     * There is not response callback function for submit response.
     * The actual inserted number of points is the first number.
     */
307 308
    if (rpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP) {
      SShellSubmitRspMsg *pMsg = (SShellSubmitRspMsg*)pRes->pRsp;
S
slguan 已提交
309 310 311 312 313 314 315
      pMsg->code = htonl(pMsg->code);
      pMsg->numOfRows = htonl(pMsg->numOfRows);
      pMsg->affectedRows = htonl(pMsg->affectedRows);
      pMsg->failedRows = htonl(pMsg->failedRows);
      pMsg->numOfFailedBlocks = htonl(pMsg->numOfFailedBlocks);

      pRes->numOfRows += pMsg->affectedRows;
S
slguan 已提交
316
      tscTrace("%p cmd:%d code:%d, inserted rows:%d, rsp len:%d", pSql, pCmd->command, pRes->code,
H
hjxilinx 已提交
317
          pMsg->affectedRows, pRes->rspLen);
S
slguan 已提交
318
    } else {
H
hjxilinx 已提交
319
      tscTrace("%p cmd:%d code:%s rsp len:%d", pSql, pCmd->command, tstrerror(pRes->code), pRes->rspLen);
H
hzcheng 已提交
320 321 322
    }
  }

323 324
  if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command])
    rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
H
hjxilinx 已提交
325
  
326
  if (rpcMsg->code != TSDB_CODE_ACTION_IN_PROGRESS) {
327 328 329
    void *taosres = tscKeepConn[pCmd->command] ? pSql : NULL;
    rpcMsg->code = pRes->code ? -pRes->code : pRes->numOfRows;
    
H
hjxilinx 已提交
330
    tscTrace("%p Async SQL result:%s res:%p", pSql, tstrerror(pRes->code), taosres);
H
hzcheng 已提交
331

332 333 334 335 336 337 338 339 340
    /*
     * Whether to free sqlObj or not should be decided before call the user defined function, since this SqlObj
     * may be freed in UDF, and reused by other threads before tscShouldFreeAsyncSqlObj called, in which case
     * tscShouldFreeAsyncSqlObj checks an object which is actually allocated by other threads.
     *
     * If this block of memory is re-allocated for an insert thread, in which tscKeepConn[command] equals to 0,
     * the tscShouldFreeAsyncSqlObj will success and tscFreeSqlObj free it immediately.
     */
    bool shouldFree = tscShouldFreeAsyncSqlObj(pSql);
H
hjxilinx 已提交
341
    (*pSql->fp)(pSql->param, taosres, rpcMsg->code);
H
hzcheng 已提交
342

343
    if (shouldFree) {
344 345
      tscFreeSqlObj(pSql);
      tscTrace("%p Async sql is automatically freed", pSql);
H
hzcheng 已提交
346 347 348
    }
  }

349
  rpcFreeCont(rpcMsg->pCont);
H
hzcheng 已提交
350 351
}

S
slguan 已提交
352 353 354 355
int doProcessSql(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

H
hjxilinx 已提交
356 357 358 359 360 361 362
  if (pCmd->command == TSDB_SQL_SELECT ||
      pCmd->command == TSDB_SQL_FETCH ||
      pCmd->command == TSDB_SQL_RETRIEVE ||
      pCmd->command == TSDB_SQL_INSERT ||
      pCmd->command == TSDB_SQL_CONNECT ||
      pCmd->command == TSDB_SQL_HB ||
      pCmd->command == TSDB_SQL_META ||
H
hjxilinx 已提交
363
      pCmd->command == TSDB_SQL_STABLEVGROUP) {
364
    tscBuildMsg[pCmd->command](pSql, NULL);
S
slguan 已提交
365
  }
366 367 368

  int32_t code = tscSendMsgToServer(pSql);
  if (code != TSDB_CODE_SUCCESS) {
S
slguan 已提交
369
    pRes->code = code;
H
hjxilinx 已提交
370
    tscQueueAsyncRes(pSql);
S
slguan 已提交
371
  }
H
hjxilinx 已提交
372 373
  
  return TSDB_CODE_SUCCESS;
S
slguan 已提交
374 375 376
}

int tscProcessSql(SSqlObj *pSql) {
377 378 379
  char *   name = NULL;
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
380 381
  
  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hjxilinx 已提交
382
  STableMetaInfo *pTableMetaInfo = NULL;
383
  uint16_t        type = 0;
384

385
  if (pQueryInfo != NULL) {
H
hjxilinx 已提交
386
    pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hjxilinx 已提交
387 388
    if (pTableMetaInfo != NULL) {
      name = pTableMetaInfo->name;
389
    }
390

391
    type = pQueryInfo->type;
392 393 394
  
    // for hearbeat, numOfTables == 0;
    assert((pQueryInfo->numOfTables == 0 && pQueryInfo->command == TSDB_SQL_HB) || pQueryInfo->numOfTables > 0);
S
slguan 已提交
395
  }
396

397
  tscTrace("%p SQL cmd:%d will be processed, name:%s, type:%d", pSql, pCmd->command, name, type);
H
hzcheng 已提交
398
  if (pSql->cmd.command < TSDB_SQL_MGMT) {
H
hjxilinx 已提交
399 400
    // the pTableMetaInfo cannot be NULL
    if (pTableMetaInfo == NULL) {
H
hjxilinx 已提交
401 402 403
      pSql->res.code = TSDB_CODE_OTHERS;
      return pSql->res.code;
    }
404

S
slguan 已提交
405 406
    // temp
    pSql->ipList = &tscMgmtIpList;
H
hjxilinx 已提交
407
//    if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) {
H
hjxilinx 已提交
408
//      pSql->index = pTableMetaInfo->pTableMeta->index;
S
slguan 已提交
409 410
//    } else {  // it must be the parent SSqlObj for super table query
//      if ((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) != 0) {
H
hjxilinx 已提交
411
//        int32_t idx = pTableMetaInfo->vnodeIndex;
S
slguan 已提交
412
//
H
hjxilinx 已提交
413
//        SVnodeSidList *pSidList = tscGetVnodeSidList(pTableMetaInfo->pMetricMeta, idx);
S
slguan 已提交
414 415 416
//        pSql->index = pSidList->index;
//      }
//    }
H
hzcheng 已提交
417
  } else if (pSql->cmd.command < TSDB_SQL_LOCAL) {
S
slguan 已提交
418
    pSql->ipList = &tscMgmtIpList;
H
hzcheng 已提交
419 420 421 422
  } else {  // local handler
    return (*tscProcessMsgRsp[pCmd->command])(pSql);
  }

S
slguan 已提交
423
  // todo handle async situation
424 425
  if (QUERY_IS_JOIN_QUERY(type)) {
    if ((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0) {
H
hjxilinx 已提交
426
      return tscHandleMasterJoinQuery(pSql);
S
slguan 已提交
427 428
    } else {
      // for first stage sub query, iterate all vnodes to get all timestamp
429
      if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) != TSDB_QUERY_TYPE_JOIN_SEC_STAGE) {
S
slguan 已提交
430 431 432 433
        return doProcessSql(pSql);
      }
    }
  }
434
  
H
hjxilinx 已提交
435 436 437 438 439
  if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { // super table query
    tscHandleMasterSTableQuery(pSql);
    return pRes->code;
  } else if (pSql->fp == (void(*)())tscHandleMultivnodeInsert) {  // multi-vnodes insertion
    tscHandleMultivnodeInsert(pSql);
H
hzcheng 已提交
440 441
    return pSql->res.code;
  }
442
  
S
slguan 已提交
443 444
  return doProcessSql(pSql);
}
H
hzcheng 已提交
445 446

void tscKillMetricQuery(SSqlObj *pSql) {
447 448 449
  SSqlCmd* pCmd = &pSql->cmd;
  
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hjxilinx 已提交
450
  if (!tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
H
hzcheng 已提交
451 452 453 454 455 456
    return;
  }

  for (int i = 0; i < pSql->numOfSubs; ++i) {
    SSqlObj *pSub = pSql->pSubs[i];

S
slguan 已提交
457
    if (pSub == NULL) {
H
hzcheng 已提交
458 459
      continue;
    }
S
slguan 已提交
460

H
hzcheng 已提交
461 462 463 464 465
    /*
     * here, we cannot set the command = TSDB_SQL_KILL_QUERY. Otherwise, it may cause
     * sub-queries not correctly released and master sql object of metric query reaches an abnormal state.
     */
    pSql->pSubs[i]->res.code = TSDB_CODE_QUERY_CANCELLED;
S
slguan 已提交
466
    //taosStopRpcConn(pSql->pSubs[i]->thandle);
H
hzcheng 已提交
467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487
  }

  /*
   * 1. if the subqueries are not launched or partially launched, we need to waiting the launched
   * query return to successfully free allocated resources.
   * 2. if no any subqueries are launched yet, which means the metric query only in parse sql stage,
   * set the res.code, and return.
   */
  const int64_t MAX_WAITING_TIME = 10000;  // 10 Sec.
  int64_t       stime = taosGetTimestampMs();

  while (pSql->cmd.command != TSDB_SQL_RETRIEVE_METRIC && pSql->cmd.command != TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
    taosMsleep(100);
    if (taosGetTimestampMs() - stime > MAX_WAITING_TIME) {
      break;
    }
  }

  tscTrace("%p metric query is cancelled", pSql);
}

488
int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
489 490 491 492 493
  char *pMsg, *pStart;

  pStart = pSql->cmd.payload + tsRpcHeadSize;
  pMsg = pStart;

494
  SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *)pMsg;
S
slguan 已提交
495
  pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
S
slguan 已提交
496 497
  pMsg += sizeof(pSql->res.qhandle);

498
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
S
slguan 已提交
499
  pRetrieveMsg->free = htons(pQueryInfo->type);
500
  pMsg += sizeof(pQueryInfo->type);
H
hzcheng 已提交
501

502 503 504 505 506
  pRetrieveMsg->header.vgId = htonl(1);
  pMsg += sizeof(SRetrieveTableMsg);
  
  pRetrieveMsg->header.contLen = htonl(pSql->cmd.payloadLen);
  
S
slguan 已提交
507
  pSql->cmd.msgType = TSDB_MSG_TYPE_RETRIEVE;
508
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
509 510
}

511
int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
512
  SSubmitMsg *pShellMsg;
H
hzcheng 已提交
513
  char *           pMsg, *pStart;
S
slguan 已提交
514

H
hjxilinx 已提交
515
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
H
hjxilinx 已提交
516
  STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
H
hjxilinx 已提交
517
  
H
hzcheng 已提交
518 519
  pStart = pSql->cmd.payload + tsRpcHeadSize;
  pMsg = pStart;
H
hjxilinx 已提交
520
  
521 522 523
  SMsgDesc* pMsgDesc = (SMsgDesc*) pMsg;
  pMsgDesc->numOfVnodes = htonl(1);       //set the number of vnodes
  pMsg += sizeof(SMsgDesc);
H
hjxilinx 已提交
524
  
525
  pShellMsg = (SSubmitMsg *)pMsg;
H
hjxilinx 已提交
526
  pShellMsg->header.vgId = htonl(pTableMeta->vgId);
H
hjxilinx 已提交
527
  pShellMsg->header.contLen = htonl(pSql->cmd.payloadLen);
528
  pShellMsg->length = pShellMsg->header.contLen;
H
hjxilinx 已提交
529
  
530
  pShellMsg->numOfBlocks = htonl(pSql->cmd.numOfTablesInSubmit);  // number of meters to be inserted
H
hzcheng 已提交
531

S
slguan 已提交
532
  // pSql->cmd.payloadLen is set during parse sql routine, so we do not use it here
S
slguan 已提交
533
  pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
534
  
H
hjxilinx 已提交
535 536
//  tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pTableMeta->vpeerDesc[pTableMeta->index].ip),
//           htons(pShellMsg->vnode));
537
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
538 539 540 541 542 543
}

/*
 * for meter query, simply return the size <= 1k
 * for metric query, estimate size according to meter tags
 */
544
static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
H
hzcheng 已提交
545
  const static int32_t MIN_QUERY_MSG_PKT_SIZE = TSDB_MAX_BYTES_PER_ROW * 5;
546
  SQueryInfo *         pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
H
hzcheng 已提交
547

548
  int32_t srcColListSize = pQueryInfo->colList.numOfCols * sizeof(SColumnInfo);
549

H
hjxilinx 已提交
550
  int32_t         exprSize = sizeof(SSqlFuncExprMsg) * pQueryInfo->exprsInfo.numOfExprs;
H
hjxilinx 已提交
551
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
552 553

  // meter query without tags values
H
hjxilinx 已提交
554
  if (!UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) {
S
slguan 已提交
555
    return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + exprSize;
H
hzcheng 已提交
556
  }
H
hjxilinx 已提交
557 558 559 560
  
  int32_t size = 4096;
  
#if 0
H
hjxilinx 已提交
561 562
  SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta;
  SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->vnodeIndex);
H
hzcheng 已提交
563

564
  int32_t meterInfoSize = (pMetricMeta->tagLen + sizeof(STableIdInfo)) * pVnodeSidList->numOfSids;
H
hjxilinx 已提交
565
  int32_t outputColumnSize = pQueryInfo->exprsInfo.numOfExprs * sizeof(SSqlFuncExprMsg);
H
hzcheng 已提交
566

S
slguan 已提交
567
  int32_t size = meterInfoSize + outputColumnSize + srcColListSize + exprSize + MIN_QUERY_MSG_PKT_SIZE;
568 569
  if (pQueryInfo->tsBuf != NULL) {
    size += pQueryInfo->tsBuf->fileSize;
S
slguan 已提交
570
  }
H
hjxilinx 已提交
571 572
#endif
  
S
slguan 已提交
573
  return size;
H
hzcheng 已提交
574 575
}

H
hjxilinx 已提交
576
static char *doSerializeTableInfo(SSqlObj *pSql, int32_t vgId, char *pMsg) {
577
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
578

H
hjxilinx 已提交
579
  STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
H
hjxilinx 已提交
580 581 582 583 584 585 586 587
  tscTrace("%p vgId:%d, query on table:%s, uid:%" PRIu64, pSql, vgId, pTableMetaInfo->name, pTableMeta->uid);
  
  STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
  pTableIdInfo->sid = htonl(pTableMeta->sid);
  pTableIdInfo->uid = htobe64(pTableMeta->uid);
  pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid));
  
  pMsg += sizeof(STableIdInfo);
588 589 590
  return pMsg;
}

591
int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
592 593
  SSqlCmd *pCmd = &pSql->cmd;

594
  int32_t size = tscEstimateQueryMsgSize(pCmd, pCmd->clauseIndex);
H
hzcheng 已提交
595

S
slguan 已提交
596 597 598 599
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for query msg", pSql);
    return -1;
  }
600
  
601
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hjxilinx 已提交
602
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hjxilinx 已提交
603
  STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
H
hjxilinx 已提交
604
//  SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta;
605 606 607 608 609 610 611
  
  if (pQueryInfo->colList.numOfCols <= 0) {
    tscError("%p illegal value of numOfCols in query msg: %d", pSql, tscGetNumOfColumns(pTableMeta));
    return -1;
  }

  char *pStart = pCmd->payload + tsRpcHeadSize;
H
hzcheng 已提交
612

S
slguan 已提交
613
  SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pStart;
H
hzcheng 已提交
614 615

  int32_t msgLen = 0;
S
slguan 已提交
616
  int32_t numOfTables = 0;
H
hzcheng 已提交
617

H
hjxilinx 已提交
618
  if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) {
S
slguan 已提交
619
    numOfTables = 1;
620
    pQueryMsg->head.vgId = htonl(pTableMeta->vgId);
H
hjxilinx 已提交
621
    tscTrace("%p queried tables:%d, table id: %s", pSql, 1, pTableMetaInfo->name);
622
  } else {  // query on super table
H
hjxilinx 已提交
623 624
    if (pTableMetaInfo->vnodeIndex < 0) {
      tscError("%p error vnodeIdx:%d", pSql, pTableMetaInfo->vnodeIndex);
H
hzcheng 已提交
625 626
      return -1;
    }
H
hjxilinx 已提交
627 628 629 630
    
    uint32_t vnodeId = 1;
    
#if 0
H
hjxilinx 已提交
631
    SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->vnodeIndex);
H
hzcheng 已提交
632 633
    uint32_t       vnodeId = pVnodeSidList->vpeerDesc[pVnodeSidList->index].vnode;

S
slguan 已提交
634 635 636
    numOfTables = pVnodeSidList->numOfSids;
    if (numOfTables <= 0) {
      tscError("%p vid:%d,error numOfTables in query message:%d", pSql, vnodeId, numOfTables);
H
hzcheng 已提交
637 638
      return -1;  // error
    }
H
hjxilinx 已提交
639 640
#endif
    
H
hjxilinx 已提交
641
    tscTrace("%p query on vid:%d, number of tables:%d", pSql, vnodeId, numOfTables);
H
hjxilinx 已提交
642 643
    pQueryMsg->head.vgId = htonl(vnodeId);
    numOfTables = 1;
H
hzcheng 已提交
644 645
  }

H
hjxilinx 已提交
646
  pQueryMsg->numOfTables = htonl(numOfTables);
H
hzcheng 已提交
647

648
  if (pQueryInfo->order.order == TSQL_SO_ASC) {
H
hjxilinx 已提交
649 650
    pQueryMsg->window.skey = htobe64(pQueryInfo->stime);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->etime);
H
hzcheng 已提交
651
  } else {
H
hjxilinx 已提交
652 653
    pQueryMsg->window.skey = htobe64(pQueryInfo->etime);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->stime);
H
hzcheng 已提交
654 655
  }

656 657 658 659 660 661 662 663
  pQueryMsg->order          = htons(pQueryInfo->order.order);
  pQueryMsg->orderColId     = htons(pQueryInfo->order.orderColId);
  pQueryMsg->interpoType    = htons(pQueryInfo->interpoType);
  pQueryMsg->limit          = htobe64(pQueryInfo->limit.limit);
  pQueryMsg->offset         = htobe64(pQueryInfo->limit.offset);
  pQueryMsg->numOfCols      = htons(pQueryInfo->colList.numOfCols);
  pQueryMsg->intervalTime   = htobe64(pQueryInfo->intervalTime);
  pQueryMsg->slidingTime    = htobe64(pQueryInfo->slidingTime);
H
hjxilinx 已提交
664
  pQueryMsg->slidingTimeUnit = pQueryInfo->slidingTimeUnit;
665
  
666 667
  if (pQueryInfo->intervalTime < 0) {
    tscError("%p illegal value of aggregation time interval in query msg: %ld", pSql, pQueryInfo->intervalTime);
H
hzcheng 已提交
668 669 670
    return -1;
  }

671 672
  if (pQueryInfo->groupbyExpr.numOfGroupCols < 0) {
    tscError("%p illegal value of numOfGroupCols in query msg: %d", pSql, pQueryInfo->groupbyExpr.numOfGroupCols);
S
slguan 已提交
673 674 675
    return -1;
  }

676
  pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols);
H
hjxilinx 已提交
677
  if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) {  // query on meter
H
hzcheng 已提交
678
    pQueryMsg->tagLength = 0;
H
hjxilinx 已提交
679
  } else {  // query on super table
H
hjxilinx 已提交
680
    pQueryMsg->tagLength = htons(0);
H
hzcheng 已提交
681 682
  }

683 684
  pQueryMsg->queryType = htons(pQueryInfo->type);
  pQueryMsg->numOfOutputCols = htons(pQueryInfo->exprsInfo.numOfExprs);
H
hzcheng 已提交
685

H
hjxilinx 已提交
686 687 688
  int32_t numOfOutput = pQueryInfo->fieldsInfo.numOfOutputCols;
  if (numOfOutput < 0) {
    tscError("%p illegal value of number of output columns in query msg: %d", pSql, numOfOutput);
H
hzcheng 已提交
689 690 691 692
    return -1;
  }

  // set column list ids
H
hjxilinx 已提交
693
  char *pMsg = (char *)(pQueryMsg->colList) + pQueryInfo->colList.numOfCols * sizeof(SColumnInfo);
H
hjxilinx 已提交
694
  SSchema *pSchema = tscGetTableSchema(pTableMeta);
H
hzcheng 已提交
695

696 697
  for (int32_t i = 0; i < pQueryInfo->colList.numOfCols; ++i) {
    SColumnBase *pCol = tscColumnBaseInfoGet(&pQueryInfo->colList, i);
S
slguan 已提交
698
    SSchema *    pColSchema = &pSchema[pCol->colIndex.columnIndex];
H
hzcheng 已提交
699

H
hjxilinx 已提交
700 701 702 703 704 705 706 707
//    if (pCol->colIndex.columnIndex >= tscGetNumOfColumns(pTableMeta) || pColSchema->type < TSDB_DATA_TYPE_BOOL ||
//        pColSchema->type > TSDB_DATA_TYPE_NCHAR) {
//      tscError("%p vid:%d sid:%d id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s", pSql,
//               htons(pQueryMsg->vnode), pTableMeta->sid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta), pCol->colIndex,
//               pColSchema->name);
//
//      return -1;  // 0 means build msg failed
//    }
H
hzcheng 已提交
708 709 710

    pQueryMsg->colList[i].colId = htons(pColSchema->colId);
    pQueryMsg->colList[i].bytes = htons(pColSchema->bytes);
711
    pQueryMsg->colList[i].type  = htons(pColSchema->type);
S
slguan 已提交
712
    pQueryMsg->colList[i].numOfFilters = htons(pCol->numOfFilters);
H
hzcheng 已提交
713

S
slguan 已提交
714 715 716
    // append the filter information after the basic column information
    for (int32_t f = 0; f < pCol->numOfFilters; ++f) {
      SColumnFilterInfo *pColFilter = &pCol->filterInfo[f];
H
hzcheng 已提交
717

S
slguan 已提交
718 719 720 721
      SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)pMsg;
      pFilterMsg->filterOnBinary = htons(pColFilter->filterOnBinary);

      pMsg += sizeof(SColumnFilterInfo);
H
hzcheng 已提交
722

S
slguan 已提交
723 724 725 726 727 728 729 730 731 732 733
      if (pColFilter->filterOnBinary) {
        pFilterMsg->len = htobe64(pColFilter->len);
        memcpy(pMsg, (void *)pColFilter->pz, pColFilter->len + 1);
        pMsg += (pColFilter->len + 1);  // append the additional filter binary info
      } else {
        pFilterMsg->lowerBndi = htobe64(pColFilter->lowerBndi);
        pFilterMsg->upperBndi = htobe64(pColFilter->upperBndi);
      }

      pFilterMsg->lowerRelOptr = htons(pColFilter->lowerRelOptr);
      pFilterMsg->upperRelOptr = htons(pColFilter->upperRelOptr);
H
hzcheng 已提交
734

S
slguan 已提交
735 736 737 738 739
      if (pColFilter->lowerRelOptr == TSDB_RELATION_INVALID && pColFilter->upperRelOptr == TSDB_RELATION_INVALID) {
        tscError("invalid filter info");
        return -1;
      }
    }
H
hzcheng 已提交
740 741 742 743
  }

  bool hasArithmeticFunction = false;

S
slguan 已提交
744
  SSqlFuncExprMsg *pSqlFuncExpr = (SSqlFuncExprMsg *)pMsg;
H
hzcheng 已提交
745

H
hjxilinx 已提交
746
  for (int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) {
747
    SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
H
hzcheng 已提交
748

S
slguan 已提交
749
    if (pExpr->functionId == TSDB_FUNC_ARITHM) {
H
hzcheng 已提交
750 751 752
      hasArithmeticFunction = true;
    }

H
hjxilinx 已提交
753
    if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId)) {
H
hzcheng 已提交
754 755 756 757 758
      /* column id is not valid according to the cached metermeta, the meter meta is expired */
      tscError("%p table schema is not matched with parsed sql", pSql);
      return -1;
    }

759
    pSqlFuncExpr->colInfo.colId  = htons(pExpr->colInfo.colId);
H
hzcheng 已提交
760
    pSqlFuncExpr->colInfo.colIdx = htons(pExpr->colInfo.colIdx);
761
    pSqlFuncExpr->colInfo.flag   = htons(pExpr->colInfo.flag);
H
hzcheng 已提交
762

763
    pSqlFuncExpr->functionId  = htons(pExpr->functionId);
H
hzcheng 已提交
764 765 766 767 768 769 770 771 772
    pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams);
    pMsg += sizeof(SSqlFuncExprMsg);

    for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
      pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType);
      pSqlFuncExpr->arg[j].argBytes = htons(pExpr->param[j].nLen);

      if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) {
        memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen);
S
slguan 已提交
773 774 775

        // by plus one char to make the string null-terminated
        pMsg += pExpr->param[j].nLen + 1;
H
hzcheng 已提交
776 777 778 779 780 781 782 783 784 785
      } else {
        pSqlFuncExpr->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64Key);
      }
    }

    pSqlFuncExpr = (SSqlFuncExprMsg *)pMsg;
  }

  int32_t len = 0;
  if (hasArithmeticFunction) {
786 787
    SColumnBase *pColBase = pQueryInfo->colList.pColList;
    for (int32_t i = 0; i < pQueryInfo->colList.numOfCols; ++i) {
S
slguan 已提交
788
      char *  name = pSchema[pColBase[i].colIndex.columnIndex].name;
H
hzcheng 已提交
789 790 791 792 793 794 795 796 797 798 799
      int32_t lenx = strlen(name);
      memcpy(pMsg, name, lenx);
      *(pMsg + lenx) = ',';

      len += (lenx + 1);  // one for comma
      pMsg += (lenx + 1);
    }
  }

  pQueryMsg->colNameLen = htonl(len);

800
  // serialize the table info (sid, uid, tags)
H
hjxilinx 已提交
801
  pMsg = doSerializeTableInfo(pSql, htons(pQueryMsg->head.vgId), pMsg);
H
hzcheng 已提交
802

803
  SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr;
S
slguan 已提交
804 805
  if (pGroupbyExpr->numOfGroupCols != 0) {
    pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex);
H
hzcheng 已提交
806 807
    pQueryMsg->orderType = htons(pGroupbyExpr->orderType);

S
slguan 已提交
808 809 810 811 812 813 814 815 816 817 818 819 820 821
    for (int32_t j = 0; j < pGroupbyExpr->numOfGroupCols; ++j) {
      SColIndexEx *pCol = &pGroupbyExpr->columnInfo[j];

      *((int16_t *)pMsg) = pCol->colId;
      pMsg += sizeof(pCol->colId);

      *((int16_t *)pMsg) += pCol->colIdx;
      pMsg += sizeof(pCol->colIdx);

      *((int16_t *)pMsg) += pCol->colIdxInBuf;
      pMsg += sizeof(pCol->colIdxInBuf);

      *((int16_t *)pMsg) += pCol->flag;
      pMsg += sizeof(pCol->flag);
H
hjxilinx 已提交
822 823 824
      
      memcpy(pMsg, pCol->name, tListLen(pCol->name));
      pMsg += tListLen(pCol->name);
S
slguan 已提交
825 826 827
    }
  }

828 829 830 831
  if (pQueryInfo->interpoType != TSDB_INTERPO_NONE) {
    for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
      *((int64_t *)pMsg) = htobe64(pQueryInfo->defaultVal[i]);
      pMsg += sizeof(pQueryInfo->defaultVal[0]);
S
slguan 已提交
832 833 834 835 836 837 838 839
    }
  }

  // compressed ts block
  pQueryMsg->tsOffset = htonl(pMsg - pStart);
  int32_t tsLen = 0;
  int32_t numOfBlocks = 0;

840
  if (pQueryInfo->tsBuf != NULL) {
H
hjxilinx 已提交
841
    STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pQueryInfo->tsBuf, pTableMetaInfo->vnodeIndex);
842
    assert(QUERY_IS_JOIN_QUERY(pQueryInfo->type) && pBlockInfo != NULL);  // this query should not be sent
S
slguan 已提交
843 844

    // todo refactor
845 846
    fseek(pQueryInfo->tsBuf->f, pBlockInfo->offset, SEEK_SET);
    fread(pMsg, pBlockInfo->compLen, 1, pQueryInfo->tsBuf->f);
S
slguan 已提交
847 848 849 850

    pMsg += pBlockInfo->compLen;
    tsLen = pBlockInfo->compLen;
    numOfBlocks = pBlockInfo->numOfBlocks;
H
hzcheng 已提交
851 852
  }

S
slguan 已提交
853 854
  pQueryMsg->tsLen = htonl(tsLen);
  pQueryMsg->tsNumOfBlocks = htonl(numOfBlocks);
855 856
  if (pQueryInfo->tsBuf != NULL) {
    pQueryMsg->tsOrder = htonl(pQueryInfo->tsBuf->tsOrder);
H
hzcheng 已提交
857 858 859 860 861 862
  }

  msgLen = pMsg - pStart;

  tscTrace("%p msg built success,len:%d bytes", pSql, msgLen);
  pCmd->payloadLen = msgLen;
S
slguan 已提交
863
  pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY;
H
hjxilinx 已提交
864
  
865
  pQueryMsg->head.contLen = htonl(msgLen);
H
hzcheng 已提交
866
  assert(msgLen + minMsgSize() <= size);
867 868

  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
869 870
}

871 872
int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
873
  pCmd->payloadLen = sizeof(SCMCreateDbMsg);
S
slguan 已提交
874
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DB;
H
hzcheng 已提交
875

S
slguan 已提交
876 877 878 879 880
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

881
  SCMCreateDbMsg *pCreateDbMsg = (SCMCreateDbMsg*)pCmd->payload;
882

883
  assert(pCmd->numOfClause == 1);
884
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
885
  strncpy(pCreateDbMsg->db, pTableMetaInfo->name, tListLen(pCreateDbMsg->db));
H
hzcheng 已提交
886

887
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
888 889
}

890 891
int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
892
  pCmd->payloadLen = sizeof(SCMCreateDnodeMsg);
S
slguan 已提交
893 894 895 896
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
H
hzcheng 已提交
897

898
  SCMCreateDnodeMsg *pCreate = (SCMCreateDnodeMsg *)pCmd->payload;
899
  strncpy(pCreate->ip, pInfo->pDCLInfo->a[0].z, pInfo->pDCLInfo->a[0].n);
S
slguan 已提交
900
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DNODE;
H
hzcheng 已提交
901

902
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
903 904
}

905 906
int32_t tscBuildAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
907
  pCmd->payloadLen = sizeof(SCMCreateAcctMsg);
S
slguan 已提交
908 909 910 911
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
H
hzcheng 已提交
912

913
  SCMCreateAcctMsg *pAlterMsg = (SCMCreateAcctMsg *)pCmd->payload;
H
hzcheng 已提交
914

915 916
  SSQLToken *pName = &pInfo->pDCLInfo->user.user;
  SSQLToken *pPwd = &pInfo->pDCLInfo->user.passwd;
H
hzcheng 已提交
917

918 919
  strncpy(pAlterMsg->user, pName->z, pName->n);
  strncpy(pAlterMsg->pass, pPwd->z, pPwd->n);
S
slguan 已提交
920

921
  SCreateAcctSQL *pAcctOpt = &pInfo->pDCLInfo->acctOpt;
H
hzcheng 已提交
922

923 924 925 926 927 928 929 930
  pAlterMsg->cfg.maxUsers = htonl(pAcctOpt->maxUsers);
  pAlterMsg->cfg.maxDbs = htonl(pAcctOpt->maxDbs);
  pAlterMsg->cfg.maxTimeSeries = htonl(pAcctOpt->maxTimeSeries);
  pAlterMsg->cfg.maxStreams = htonl(pAcctOpt->maxStreams);
  pAlterMsg->cfg.maxPointsPerSecond = htonl(pAcctOpt->maxPointsPerSecond);
  pAlterMsg->cfg.maxStorage = htobe64(pAcctOpt->maxStorage);
  pAlterMsg->cfg.maxQueryTime = htobe64(pAcctOpt->maxQueryTime);
  pAlterMsg->cfg.maxConnections = htonl(pAcctOpt->maxConnections);
H
hzcheng 已提交
931

932 933 934 935 936 937 938 939 940 941 942 943 944
  if (pAcctOpt->stat.n == 0) {
    pAlterMsg->cfg.accessState = -1;
  } else {
    if (pAcctOpt->stat.z[0] == 'r' && pAcctOpt->stat.n == 1) {
      pAlterMsg->cfg.accessState = TSDB_VN_READ_ACCCESS;
    } else if (pAcctOpt->stat.z[0] == 'w' && pAcctOpt->stat.n == 1) {
      pAlterMsg->cfg.accessState = TSDB_VN_WRITE_ACCCESS;
    } else if (strncmp(pAcctOpt->stat.z, "all", 3) == 0 && pAcctOpt->stat.n == 3) {
      pAlterMsg->cfg.accessState = TSDB_VN_ALL_ACCCESS;
    } else if (strncmp(pAcctOpt->stat.z, "no", 2) == 0 && pAcctOpt->stat.n == 2) {
      pAlterMsg->cfg.accessState = 0;
    }
  }
H
hzcheng 已提交
945

S
slguan 已提交
946
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_ACCT;
947
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
948 949
}

950 951
int32_t tscBuildUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
952
  pCmd->payloadLen = sizeof(SCMCreateUserMsg);
S
slguan 已提交
953

S
slguan 已提交
954 955 956 957 958
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

959
  SCMCreateUserMsg *pAlterMsg = (SCMCreateUserMsg*)pCmd->payload;
H
hzcheng 已提交
960

961 962 963
  SUserInfo *pUser = &pInfo->pDCLInfo->user;
  strncpy(pAlterMsg->user, pUser->user.z, pUser->user.n);
  pAlterMsg->flag = pUser->type;
H
hzcheng 已提交
964

965 966 967 968
  if (pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
    pAlterMsg->privilege = (char)pCmd->count;
  } else if (pUser->type == TSDB_ALTER_USER_PASSWD) {
    strncpy(pAlterMsg->pass, pUser->passwd.z, pUser->passwd.n);
H
hjxilinx 已提交
969 970
  } else { // create user password info
    strncpy(pAlterMsg->pass, pUser->passwd.z, pUser->passwd.n);
971
  }
H
hzcheng 已提交
972

973
  if (pUser->type == TSDB_ALTER_USER_PASSWD || pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
S
slguan 已提交
974
    pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_USER;
975
  } else {
S
slguan 已提交
976
    pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_USER;
977
  }
H
hzcheng 已提交
978

979
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
980 981
}

982 983
int32_t tscBuildCfgDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
S
slguan 已提交
984
  pCmd->payloadLen = sizeof(SCMCfgDnodeMsg);
H
hzcheng 已提交
985

S
slguan 已提交
986 987 988 989
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
H
hzcheng 已提交
990

S
slguan 已提交
991
  pCmd->msgType = TSDB_MSG_TYPE_MD_CONFIG_DNODE;
992 993
  return TSDB_CODE_SUCCESS;
}
S
slguan 已提交
994

995 996
int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
997
  pCmd->payloadLen = sizeof(SCMDropDbMsg);
H
hzcheng 已提交
998

S
slguan 已提交
999 1000 1001 1002 1003
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

1004
  SCMDropDbMsg *pDropDbMsg = (SCMDropDbMsg*)pCmd->payload;
H
hzcheng 已提交
1005

1006
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1007
  strncpy(pDropDbMsg->db, pTableMetaInfo->name, tListLen(pDropDbMsg->db));
1008
  pDropDbMsg->ignoreNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;
H
hzcheng 已提交
1009

S
slguan 已提交
1010
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DB;
1011
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1012 1013
}

1014 1015
int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
1016
  pCmd->payloadLen = sizeof(SCMDropTableMsg);
H
hzcheng 已提交
1017

S
slguan 已提交
1018 1019 1020 1021
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
H
hzcheng 已提交
1022

1023
  SCMDropTableMsg *pDropTableMsg = (SCMDropTableMsg*)pCmd->payload;
1024
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1025
  strcpy(pDropTableMsg->tableId, pTableMetaInfo->name);
S
slguan 已提交
1026
  pDropTableMsg->igNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;
H
hzcheng 已提交
1027

S
slguan 已提交
1028
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_TABLE;
1029
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1030 1031
}

1032
int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1033
  SSqlCmd *pCmd = &pSql->cmd;
1034
  pCmd->payloadLen = sizeof(SCMDropDnodeMsg);
S
slguan 已提交
1035 1036 1037 1038
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
H
hzcheng 已提交
1039

1040
  SCMDropDnodeMsg *pDrop = (SCMDropDnodeMsg *)pCmd->payload;
1041
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1042
  strcpy(pDrop->ip, pTableMetaInfo->name);
S
slguan 已提交
1043
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DNODE;
H
hzcheng 已提交
1044

1045
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1046 1047
}

1048 1049
int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
1050
  pCmd->payloadLen = sizeof(SCMDropUserMsg);
S
slguan 已提交
1051
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_USER;
H
hzcheng 已提交
1052

S
slguan 已提交
1053 1054 1055 1056
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
H
hzcheng 已提交
1057

1058
  SCMDropUserMsg *pDropMsg = (SCMDropUserMsg*)pCmd->payload;
1059
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1060
  strcpy(pDropMsg->user, pTableMetaInfo->name);
H
hzcheng 已提交
1061

1062
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1063 1064
}

1065 1066
int32_t tscBuildUseDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
1067
  pCmd->payloadLen = sizeof(SCMUseDbMsg);
H
hzcheng 已提交
1068

S
slguan 已提交
1069 1070 1071 1072
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
1073

1074
  SCMUseDbMsg *pUseDbMsg = (SCMUseDbMsg*)pCmd->payload;
1075
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1076
  strcpy(pUseDbMsg->db, pTableMetaInfo->name);
S
slguan 已提交
1077
  pCmd->msgType = TSDB_MSG_TYPE_CM_USE_DB;
H
hzcheng 已提交
1078

1079
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1080 1081
}

1082
int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1083
  STscObj *pObj = pSql->pTscObj;
S
slguan 已提交
1084
  SSqlCmd *pCmd = &pSql->cmd;
S
slguan 已提交
1085
  pCmd->msgType = TSDB_MSG_TYPE_CM_SHOW;
1086
  pCmd->payloadLen = sizeof(SCMShowMsg) + 100;
H
hzcheng 已提交
1087

S
slguan 已提交
1088 1089 1090
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
S
slguan 已提交
1091
  }
H
hzcheng 已提交
1092

1093
  SCMShowMsg *pShowMsg = (SCMShowMsg*)pCmd->payload;
S
slguan 已提交
1094

1095
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1096
  size_t nameLen = strlen(pTableMetaInfo->name);
S
slguan 已提交
1097
  if (nameLen > 0) {
H
hjxilinx 已提交
1098
    strcpy(pShowMsg->db, pTableMetaInfo->name);  // prefix is set here
H
hzcheng 已提交
1099
  } else {
S
slguan 已提交
1100
    strcpy(pShowMsg->db, pObj->db);
H
hzcheng 已提交
1101 1102
  }

1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114
  SShowInfo *pShowInfo = &pInfo->pDCLInfo->showOpt;
  pShowMsg->type = pShowInfo->showType;

  if (pShowInfo->showType != TSDB_MGMT_TABLE_VNODES) {
    SSQLToken *pPattern = &pShowInfo->pattern;
    if (pPattern->type > 0) {  // only show tables support wildcard query
      strncpy(pShowMsg->payload, pPattern->z, pPattern->n);
      pShowMsg->payloadLen = htons(pPattern->n);
    }
  } else {
    SSQLToken *pIpAddr = &pShowInfo->prefix;
    assert(pIpAddr->n > 0 && pIpAddr->type > 0);
H
hzcheng 已提交
1115

1116 1117 1118 1119
    strncpy(pShowMsg->payload, pIpAddr->z, pIpAddr->n);
    pShowMsg->payloadLen = htons(pIpAddr->n);
  }

1120
  pCmd->payloadLen = sizeof(SCMShowMsg) + pShowMsg->payloadLen;
1121
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1122 1123
}

1124
int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1125
  SSqlCmd *pCmd = &pSql->cmd;
1126
  pCmd->payloadLen = sizeof(SCMKillQueryMsg);
H
hzcheng 已提交
1127

S
slguan 已提交
1128 1129 1130 1131
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
H
hzcheng 已提交
1132

1133
  SCMKillQueryMsg *pKill = (SCMKillQueryMsg*)pCmd->payload;
1134 1135 1136
  strncpy(pKill->queryId, pInfo->pDCLInfo->ip.z, pInfo->pDCLInfo->ip.n);
  switch (pCmd->command) {
    case TSDB_SQL_KILL_QUERY:
S
slguan 已提交
1137
      pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_QUERY;
1138 1139
      break;
    case TSDB_SQL_KILL_CONNECTION:
S
slguan 已提交
1140
      pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_CONN;
1141 1142
      break;
    case TSDB_SQL_KILL_STREAM:
S
slguan 已提交
1143
      pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_STREAM;
1144 1145 1146
      break;
  }
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1147 1148
}

1149
int tscEstimateCreateTableMsgLength(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1150 1151
  SSqlCmd *pCmd = &(pSql->cmd);

1152
  int32_t size = minMsgSize() + sizeof(SMgmtHead) + sizeof(SCMCreateTableMsg);
H
hzcheng 已提交
1153

1154
  SCreateTableSQL *pCreateTableInfo = pInfo->pCreateTableInfo;
1155
  if (pCreateTableInfo->type == TSQL_CREATE_TABLE_FROM_STABLE) {
H
hzcheng 已提交
1156 1157
    size += sizeof(STagData);
  } else {
S
slguan 已提交
1158
    size += sizeof(SSchema) * (pCmd->numOfCols + pCmd->count);
H
hzcheng 已提交
1159
  }
1160

1161 1162 1163
  if (pCreateTableInfo->pSelect != NULL) {
    size += (pCreateTableInfo->pSelect->selectToken.n + 1);
  }
H
hzcheng 已提交
1164 1165 1166 1167

  return size + TSDB_EXTRA_PAYLOAD_SIZE;
}

1168
int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1169
  int              msgLen = 0;
S
slguan 已提交
1170
  SSchema *        pSchema;
H
hzcheng 已提交
1171
  int              size = 0;
1172 1173 1174
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
H
hjxilinx 已提交
1175
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
1176 1177

  // Reallocate the payload size
1178
  size = tscEstimateCreateTableMsgLength(pSql, pInfo);
S
slguan 已提交
1179 1180
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for create table msg", pSql);
1181
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
S
slguan 已提交
1182
  }
H
hzcheng 已提交
1183 1184


1185
  SCMCreateTableMsg *pCreateTableMsg = (SCMCreateTableMsg *)pCmd->payload;
H
hjxilinx 已提交
1186
  strcpy(pCreateTableMsg->tableId, pTableMetaInfo->name);
1187 1188

  // use dbinfo from table id without modifying current db info
H
hjxilinx 已提交
1189
  tscGetDBInfoFromMeterId(pTableMetaInfo->name, pCreateTableMsg->db);
H
hzcheng 已提交
1190

1191 1192 1193 1194
  SCreateTableSQL *pCreateTable = pInfo->pCreateTableInfo;

  pCreateTableMsg->igExists = pCreateTable->existCheck ? 1 : 0;

H
hzcheng 已提交
1195 1196 1197 1198
  pCreateTableMsg->numOfColumns = htons(pCmd->numOfCols);
  pCreateTableMsg->numOfTags = htons(pCmd->count);

  pCreateTableMsg->sqlLen = 0;
S
slguan 已提交
1199
  char *pMsg = (char *)pCreateTableMsg->schema;
H
hzcheng 已提交
1200

1201 1202 1203
  int8_t type = pInfo->pCreateTableInfo->type;
  if (type == TSQL_CREATE_TABLE_FROM_STABLE) {  // create by using super table, tags value
    memcpy(pMsg, &pInfo->pCreateTableInfo->usingInfo.tagdata, sizeof(STagData));
H
hzcheng 已提交
1204
    pMsg += sizeof(STagData);
1205
  } else {  // create (super) table
1206
    pSchema = (SSchema *)pCreateTableMsg->schema;
1207

H
hzcheng 已提交
1208
    for (int i = 0; i < pCmd->numOfCols + pCmd->count; ++i) {
1209
      TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i);
H
hzcheng 已提交
1210 1211 1212 1213

      pSchema->type = pField->type;
      strcpy(pSchema->name, pField->name);
      pSchema->bytes = htons(pField->bytes);
1214

H
hzcheng 已提交
1215 1216 1217 1218
      pSchema++;
    }

    pMsg = (char *)pSchema;
1219 1220
    if (type == TSQL_CREATE_STREAM) {  // check if it is a stream sql
      SQuerySQL *pQuerySql = pInfo->pCreateTableInfo->pSelect;
H
hzcheng 已提交
1221

1222 1223 1224
      strncpy(pMsg, pQuerySql->selectToken.z, pQuerySql->selectToken.n + 1);
      pCreateTableMsg->sqlLen = htons(pQuerySql->selectToken.n + 1);
      pMsg += pQuerySql->selectToken.n + 1;
H
hzcheng 已提交
1225 1226 1227
    }
  }

1228
  tscClearFieldInfo(&pQueryInfo->fieldsInfo);
H
hzcheng 已提交
1229

S
slguan 已提交
1230
  msgLen = pMsg - (char*)pCreateTableMsg;
S
slguan 已提交
1231
  pCreateTableMsg->contLen = htonl(msgLen);
H
hzcheng 已提交
1232
  pCmd->payloadLen = msgLen;
S
slguan 已提交
1233
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE;
H
hzcheng 已提交
1234 1235

  assert(msgLen + minMsgSize() <= size);
1236
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1237 1238 1239
}

int tscEstimateAlterTableMsgLength(SSqlCmd *pCmd) {
1240
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
1241
  return minMsgSize() + sizeof(SMgmtHead) + sizeof(SCMAlterTableMsg) + sizeof(SSchema) * tscNumOfFields(pQueryInfo) +
H
hzcheng 已提交
1242 1243 1244
         TSDB_EXTRA_PAYLOAD_SIZE;
}

1245
int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
1246
  SCMAlterTableMsg *pAlterTableMsg;
1247
  char *          pMsg;
H
hzcheng 已提交
1248 1249 1250
  int             msgLen = 0;
  int             size = 0;

1251 1252 1253
  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

H
hjxilinx 已提交
1254
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
1255 1256

  size = tscEstimateAlterTableMsgLength(pCmd);
S
slguan 已提交
1257 1258 1259 1260
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for alter table msg", pSql);
    return -1;
  }
H
hzcheng 已提交
1261

1262
  pAlterTableMsg = (SCMAlterTableMsg *)pCmd->payload;
H
hzcheng 已提交
1263

H
hjxilinx 已提交
1264
  tscGetDBInfoFromMeterId(pTableMetaInfo->name, pAlterTableMsg->db);
H
hzcheng 已提交
1265

1266 1267
  SAlterTableSQL *pAlterInfo = pInfo->pAlterInfo;

H
hjxilinx 已提交
1268
  strcpy(pAlterTableMsg->tableId, pTableMetaInfo->name);
1269
  pAlterTableMsg->type = htons(pAlterInfo->type);
1270

1271
  pAlterTableMsg->numOfCols = htons(tscNumOfFields(pQueryInfo));
1272
  memcpy(pAlterTableMsg->tagVal, pAlterInfo->tagData.data, TSDB_MAX_TAGS_LEN);
H
hzcheng 已提交
1273

S
slguan 已提交
1274
  SSchema *pSchema = pAlterTableMsg->schema;
1275 1276
  for (int i = 0; i < tscNumOfFields(pQueryInfo); ++i) {
    TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i);
H
hzcheng 已提交
1277 1278 1279 1280 1281 1282 1283 1284 1285

    pSchema->type = pField->type;
    strcpy(pSchema->name, pField->name);
    pSchema->bytes = htons(pField->bytes);
    pSchema++;
  }

  pMsg = (char *)pSchema;

S
slguan 已提交
1286
  msgLen = pMsg - (char*)pAlterTableMsg;
H
hzcheng 已提交
1287
  pCmd->payloadLen = msgLen;
S
slguan 已提交
1288
  pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_TABLE;
H
hzcheng 已提交
1289 1290

  assert(msgLen + minMsgSize() <= size);
1291

1292
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1293 1294
}

1295
int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1296
  SSqlCmd *pCmd = &pSql->cmd;
1297
  pCmd->payloadLen = sizeof(SCMAlterDbMsg);
S
slguan 已提交
1298
  pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_DB;
H
hzcheng 已提交
1299

S
slguan 已提交
1300 1301 1302 1303
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
H
hzcheng 已提交
1304

1305
  SCMAlterDbMsg *pAlterDbMsg = (SCMAlterDbMsg*)pCmd->payload;
1306
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1307
  strcpy(pAlterDbMsg->db, pTableMetaInfo->name);
H
hzcheng 已提交
1308

1309
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1310 1311
}

1312
int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1313 1314 1315
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->msgType = TSDB_MSG_TYPE_RETRIEVE;
  pCmd->payloadLen = sizeof(SRetrieveTableMsg);
S
slguan 已提交
1316

S
slguan 已提交
1317 1318 1319
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
H
hzcheng 已提交
1320
  }
S
slguan 已提交
1321

S
slguan 已提交
1322 1323 1324 1325
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg*)pCmd->payload;
  pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
  pRetrieveMsg->free = htons(pQueryInfo->type);
H
hzcheng 已提交
1326

1327
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1328 1329
}

1330
static int tscSetResultPointer(SQueryInfo *pQueryInfo, SSqlRes *pRes) {
1331
  if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
H
hzcheng 已提交
1332 1333 1334
    return pRes->code;
  }

1335
  for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
H
hjxilinx 已提交
1336 1337
    int16_t offset = tscFieldInfoGetOffset(pQueryInfo, i);
    pRes->tsrow[i] = (pRes->data + offset * pRes->numOfRows);
H
hzcheng 已提交
1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351
  }

  return 0;
}

/*
 * this function can only be called once.
 * by using pRes->rspType to denote its status
 *
 * if pRes->rspType is 1, no more result
 */
static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
1352

1353
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
1354

H
hzcheng 已提交
1355 1356 1357 1358 1359 1360 1361
  pRes->code = TSDB_CODE_SUCCESS;

  if (pRes->rspType == 0) {
    pRes->numOfRows = numOfRes;
    pRes->row = 0;
    pRes->rspType = 1;

1362
    tscSetResultPointer(pQueryInfo, pRes);
H
hzcheng 已提交
1363
  } else {
S
slguan 已提交
1364
    tscResetForNextRetrieve(pRes);
H
hzcheng 已提交
1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379
  }

  uint8_t code = pSql->res.code;
  if (pSql->fp) {
    if (code == TSDB_CODE_SUCCESS) {
      (*pSql->fp)(pSql->param, pSql, pSql->res.numOfRows);
    } else {
      tscQueueAsyncRes(pSql);
    }
  }

  return code;
}

int tscProcessDescribeTableRsp(SSqlObj *pSql) {
S
slguan 已提交
1380
  SSqlCmd *       pCmd = &pSql->cmd;
1381
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hzcheng 已提交
1382

H
hjxilinx 已提交
1383
  STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
H
hjxilinx 已提交
1384 1385
  
  int32_t numOfRes = tinfo.numOfColumns + tinfo.numOfTags;
H
hzcheng 已提交
1386 1387 1388 1389
  return tscLocalResultCommonBuilder(pSql, numOfRes);
}

int tscProcessTagRetrieveRsp(SSqlObj *pSql) {
1390 1391
  SSqlCmd *pCmd = &pSql->cmd;

1392
  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hjxilinx 已提交
1393
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
S
slguan 已提交
1394 1395

  int32_t numOfRes = 0;
H
hjxilinx 已提交
1396
#if 0
1397
  if (tscSqlExprGet(pQueryInfo, 0)->functionId == TSDB_FUNC_TAGPRJ) {
H
hjxilinx 已提交
1398
    numOfRes = pTableMetaInfo->pMetricMeta->numOfTables;
H
hzcheng 已提交
1399 1400 1401
  } else {
    numOfRes = 1;  // for count function, there is only one output.
  }
H
hjxilinx 已提交
1402 1403 1404
  
#endif

H
hzcheng 已提交
1405 1406 1407 1408 1409 1410 1411
  return tscLocalResultCommonBuilder(pSql, numOfRes);
}

int tscProcessRetrieveMetricRsp(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

H
hjxilinx 已提交
1412 1413
  pRes->code = tscDoLocalreduce(pSql);
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hzcheng 已提交
1414 1415

  if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) {
1416
    tscSetResultPointer(pQueryInfo, pRes);
H
hzcheng 已提交
1417 1418 1419 1420
  }

  pRes->row = 0;

1421
  uint8_t code = pRes->code;
H
hzcheng 已提交
1422
  if (pSql->fp) {  // async retrieve metric data
1423 1424
    if (pRes->code == TSDB_CODE_SUCCESS) {
      (*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
H
hzcheng 已提交
1425 1426 1427 1428 1429 1430 1431 1432
    } else {
      tscQueueAsyncRes(pSql);
    }
  }

  return code;
}

S
slguan 已提交
1433
int tscProcessEmptyResultRsp(SSqlObj *pSql) { return tscLocalResultCommonBuilder(pSql, 0); }
H
hzcheng 已提交
1434

1435
int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1436
  STscObj *pObj = pSql->pTscObj;
S
slguan 已提交
1437
  SSqlCmd *pCmd = &pSql->cmd;
S
slguan 已提交
1438
  pCmd->msgType = TSDB_MSG_TYPE_CM_CONNECT;
1439
  pCmd->payloadLen = sizeof(SCMConnectMsg);
H
hzcheng 已提交
1440

S
slguan 已提交
1441 1442 1443 1444 1445
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

1446
  SCMConnectMsg *pConnect = (SCMConnectMsg*)pCmd->payload;
H
hzcheng 已提交
1447 1448 1449 1450 1451

  char *db;  // ugly code to move the space
  db = strstr(pObj->db, TS_PATH_DELIMITER);
  db = (db == NULL) ? pObj->db : db + 1;
  strcpy(pConnect->db, db);
S
slguan 已提交
1452
  strcpy(pConnect->clientVersion, version);
S
slguan 已提交
1453
  strcpy(pConnect->msgVersion, "");
H
hzcheng 已提交
1454

1455
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1456 1457
}

H
hjxilinx 已提交
1458
int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
1459
  SCMTableInfoMsg *pInfoMsg;
S
slguan 已提交
1460
  char *         pMsg;
H
hzcheng 已提交
1461 1462 1463 1464 1465
  int            msgLen = 0;

  char *tmpData = 0;
  if (pSql->cmd.allocSize > 0) {
    tmpData = calloc(1, pSql->cmd.allocSize);
1466 1467 1468 1469
    if (NULL == tmpData) {
      return TSDB_CODE_CLI_OUT_OF_MEMORY;
    }

H
hzcheng 已提交
1470 1471 1472 1473
    // STagData is in binary format, strncpy is not available
    memcpy(tmpData, pSql->cmd.payload, pSql->cmd.allocSize);
  }

1474 1475 1476
  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);

H
hjxilinx 已提交
1477
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
S
slguan 已提交
1478

1479
  pInfoMsg = (SCMTableInfoMsg *)pCmd->payload;
H
hjxilinx 已提交
1480
  strcpy(pInfoMsg->tableId, pTableMetaInfo->name);
H
hjxilinx 已提交
1481
  pInfoMsg->createFlag = htons(pSql->cmd.autoCreated ? 1 : 0);
S
slguan 已提交
1482

1483
  pMsg = (char*)pInfoMsg + sizeof(SCMTableInfoMsg);
H
hzcheng 已提交
1484

H
hjxilinx 已提交
1485
  if (pSql->cmd.autoCreated) {
H
hzcheng 已提交
1486 1487 1488 1489
    memcpy(pInfoMsg->tags, tmpData, sizeof(STagData));
    pMsg += sizeof(STagData);
  }

H
hjxilinx 已提交
1490
  pCmd->payloadLen = pMsg - (char*)pInfoMsg;;
S
slguan 已提交
1491
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLE_META;
H
hzcheng 已提交
1492 1493 1494 1495

  tfree(tmpData);

  assert(msgLen + minMsgSize() <= pCmd->allocSize);
1496
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1497 1498
}

S
slguan 已提交
1499 1500
/**
 *  multi meter meta req pkg format:
1501
 *  | SMgmtHead | SCMMultiTableInfoMsg | tableId0 | tableId1 | tableId2 | ......
S
slguan 已提交
1502 1503
 *      no used         4B
 **/
1504
int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516
  SSqlCmd *pCmd = &pSql->cmd;

  // copy payload content to temp buff
  char *tmpData = 0;
  if (pCmd->payloadLen > 0) {
    tmpData = calloc(1, pCmd->payloadLen + 1);
    if (NULL == tmpData) return -1;
    memcpy(tmpData, pCmd->payload, pCmd->payloadLen);
  }

  // fill head info
  SMgmtHead *pMgmt = (SMgmtHead *)(pCmd->payload + tsRpcHeadSize);
S
slguan 已提交
1517
  memset(pMgmt->db, 0, TSDB_TABLE_ID_LEN);  // server don't need the db
S
slguan 已提交
1518

1519
  SCMMultiTableInfoMsg *pInfoMsg = (SCMMultiTableInfoMsg *)(pCmd->payload + tsRpcHeadSize + sizeof(SMgmtHead));
S
slguan 已提交
1520
  pInfoMsg->numOfTables = htonl((int32_t)pCmd->count);
S
slguan 已提交
1521 1522

  if (pCmd->payloadLen > 0) {
S
slguan 已提交
1523
    memcpy(pInfoMsg->tableIds, tmpData, pCmd->payloadLen);
S
slguan 已提交
1524 1525 1526 1527
  }

  tfree(tmpData);

1528
  pCmd->payloadLen += sizeof(SMgmtHead) + sizeof(SCMMultiTableInfoMsg);
S
slguan 已提交
1529
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLES_META;
S
slguan 已提交
1530 1531 1532

  assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize);

S
slguan 已提交
1533
  tscTrace("%p build load multi-metermeta msg completed, numOfTables:%d, msg size:%d", pSql, pCmd->count,
S
slguan 已提交
1534 1535 1536 1537 1538
           pCmd->payloadLen);

  return pCmd->payloadLen;
}

H
hzcheng 已提交
1539 1540
static int32_t tscEstimateMetricMetaMsgSize(SSqlCmd *pCmd) {
  const int32_t defaultSize =
S
slguan 已提交
1541
      minMsgSize() + sizeof(SSuperTableMetaMsg) + sizeof(SMgmtHead) + sizeof(int16_t) * TSDB_MAX_TAGS;
1542
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
H
hzcheng 已提交
1543

S
slguan 已提交
1544
  int32_t n = 0;
1545 1546
  for (int32_t i = 0; i < pQueryInfo->tagCond.numOfTagCond; ++i) {
    n += strlen(pQueryInfo->tagCond.cond[i].cond);
H
hzcheng 已提交
1547
  }
S
slguan 已提交
1548

H
hjxilinx 已提交
1549
  int32_t tagLen = n * TSDB_NCHAR_SIZE;
1550 1551
  if (pQueryInfo->tagCond.tbnameCond.cond != NULL) {
    tagLen += strlen(pQueryInfo->tagCond.tbnameCond.cond) * TSDB_NCHAR_SIZE;
H
hjxilinx 已提交
1552
  }
1553

S
slguan 已提交
1554
  int32_t joinCondLen = (TSDB_TABLE_ID_LEN + sizeof(int16_t)) * 2;
S
slguan 已提交
1555
  int32_t elemSize = sizeof(SSuperTableMetaElemMsg) * pQueryInfo->numOfTables;
H
hjxilinx 已提交
1556 1557
  
  int32_t colSize = pQueryInfo->groupbyExpr.numOfGroupCols*sizeof(SColIndexEx);
S
slguan 已提交
1558

H
hjxilinx 已提交
1559
  int32_t len = tagLen + joinCondLen + elemSize + colSize + defaultSize;
S
slguan 已提交
1560 1561

  return MAX(len, TSDB_DEFAULT_PAYLOAD_SIZE);
H
hzcheng 已提交
1562 1563
}

H
hjxilinx 已提交
1564 1565 1566
int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) {

#if 0
S
slguan 已提交
1567
  SSuperTableMetaMsg *pMetaMsg;
H
hzcheng 已提交
1568 1569
  char *          pMsg, *pStart;
  int             msgLen = 0;
S
slguan 已提交
1570
  int             tableIndex = 0;
H
hzcheng 已提交
1571

1572 1573 1574
  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);

1575
  STagCond *pTagCond = &pQueryInfo->tagCond;
S
slguan 已提交
1576

H
hjxilinx 已提交
1577
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex);
H
hzcheng 已提交
1578 1579

  int32_t size = tscEstimateMetricMetaMsgSize(pCmd);
S
slguan 已提交
1580 1581 1582 1583
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for metric meter msg", pSql);
    return -1;
  }
H
hzcheng 已提交
1584 1585 1586 1587 1588

  pStart = pCmd->payload + tsRpcHeadSize;
  pMsg = pStart;

  SMgmtHead *pMgmt = (SMgmtHead *)pMsg;
H
hjxilinx 已提交
1589
  tscGetDBInfoFromMeterId(pTableMetaInfo->name, pMgmt->db);
H
hzcheng 已提交
1590 1591 1592

  pMsg += sizeof(SMgmtHead);

S
slguan 已提交
1593
  pMetaMsg = (SSuperTableMetaMsg *)pMsg;
S
slguan 已提交
1594
  pMetaMsg->numOfTables = htonl(pQueryInfo->numOfTables);
S
slguan 已提交
1595

S
slguan 已提交
1596
  pMsg += sizeof(SSuperTableMetaMsg);
S
slguan 已提交
1597 1598 1599 1600 1601

  int32_t offset = pMsg - (char *)pMetaMsg;
  pMetaMsg->join = htonl(offset);

  // todo refactor
S
slguan 已提交
1602
  pMetaMsg->joinCondLen = htonl((TSDB_TABLE_ID_LEN + sizeof(int16_t)) * 2);
H
hzcheng 已提交
1603

S
slguan 已提交
1604
  memcpy(pMsg, pTagCond->joinInfo.left.tableId, TSDB_TABLE_ID_LEN);
S
slguan 已提交
1605
  pMsg += TSDB_TABLE_ID_LEN;
H
hzcheng 已提交
1606

S
slguan 已提交
1607 1608 1609
  *(int16_t *)pMsg = pTagCond->joinInfo.left.tagCol;
  pMsg += sizeof(int16_t);

S
slguan 已提交
1610
  memcpy(pMsg, pTagCond->joinInfo.right.tableId, TSDB_TABLE_ID_LEN);
S
slguan 已提交
1611
  pMsg += TSDB_TABLE_ID_LEN;
S
slguan 已提交
1612 1613 1614 1615

  *(int16_t *)pMsg = pTagCond->joinInfo.right.tagCol;
  pMsg += sizeof(int16_t);

1616
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
1617
    pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i);
H
hjxilinx 已提交
1618
    uint64_t uid = pTableMetaInfo->pTableMeta->uid;
S
slguan 已提交
1619 1620 1621 1622

    offset = pMsg - (char *)pMetaMsg;
    pMetaMsg->metaElem[i] = htonl(offset);

S
slguan 已提交
1623 1624
    SSuperTableMetaElemMsg *pElem = (SSuperTableMetaElemMsg *)pMsg;
    pMsg += sizeof(SSuperTableMetaElemMsg);
S
slguan 已提交
1625 1626 1627 1628

    // convert to unicode before sending to mnode for metric query
    int32_t condLen = 0;
    if (pTagCond->numOfTagCond > 0) {
H
hjxilinx 已提交
1629
      SCond *pCond = tsGetSTableQueryCondPos(pTagCond, uid);
H
hjxilinx 已提交
1630
      if (pCond != NULL && pCond->cond != NULL) {
H
hjxilinx 已提交
1631
        condLen = strlen(pCond->cond) + 1;
1632

H
hjxilinx 已提交
1633
        bool ret = taosMbsToUcs4(pCond->cond, condLen, pMsg, condLen * TSDB_NCHAR_SIZE);
S
slguan 已提交
1634
        if (!ret) {
H
hjxilinx 已提交
1635
          tscError("%p mbs to ucs4 failed:%s", pSql, tsGetSTableQueryCondPos(pTagCond, uid));
S
slguan 已提交
1636 1637 1638
          return 0;
        }
      }
H
hzcheng 已提交
1639 1640
    }

S
slguan 已提交
1641
    pElem->condLen = htonl(condLen);
H
hzcheng 已提交
1642

S
slguan 已提交
1643 1644 1645
    offset = pMsg - (char *)pMetaMsg;
    pElem->cond = htonl(offset);
    pMsg += condLen * TSDB_NCHAR_SIZE;
H
hzcheng 已提交
1646

S
slguan 已提交
1647 1648 1649
    pElem->rel = htons(pTagCond->relType);
    if (pTagCond->tbnameCond.uid == uid) {
      offset = pMsg - (char *)pMetaMsg;
H
hzcheng 已提交
1650

S
slguan 已提交
1651
      pElem->tableCond = htonl(offset);
H
hjxilinx 已提交
1652 1653 1654 1655 1656 1657 1658
      
      uint32_t len = 0;
      if (pTagCond->tbnameCond.cond != NULL) {
        len = strlen(pTagCond->tbnameCond.cond);
        memcpy(pMsg, pTagCond->tbnameCond.cond, len);
      }
      
H
hjxilinx 已提交
1659 1660
      pElem->tableCondLen = htonl(len);
      pMsg += len;
S
slguan 已提交
1661 1662
    }

1663
    SSqlGroupbyExpr *pGroupby = &pQueryInfo->groupbyExpr;
S
slguan 已提交
1664

H
hjxilinx 已提交
1665
    if (pGroupby->tableIndex != i && pGroupby->numOfGroupCols > 0) {
S
slguan 已提交
1666 1667 1668 1669 1670
      pElem->orderType = 0;
      pElem->orderIndex = 0;
      pElem->numOfGroupCols = 0;
    } else {
      pElem->numOfGroupCols = htons(pGroupby->numOfGroupCols);
H
hjxilinx 已提交
1671 1672
      for (int32_t j = 0; j < pTableMetaInfo->numOfTags; ++j) {
        pElem->tagCols[j] = htons(pTableMetaInfo->tagColumnIndex[j]);
S
slguan 已提交
1673 1674 1675 1676 1677 1678 1679 1680
      }

      if (pGroupby->numOfGroupCols != 0) {
        pElem->orderIndex = htons(pGroupby->orderIndex);
        pElem->orderType = htons(pGroupby->orderType);
        offset = pMsg - (char *)pMetaMsg;

        pElem->groupbyTagColumnList = htonl(offset);
1681 1682
        for (int32_t j = 0; j < pQueryInfo->groupbyExpr.numOfGroupCols; ++j) {
          SColIndexEx *pCol = &pQueryInfo->groupbyExpr.columnInfo[j];
1683 1684
          SColIndexEx *pDestCol = (SColIndexEx *)pMsg;

H
hjxilinx 已提交
1685 1686 1687 1688
          pDestCol->colIdxInBuf = 0;
          pDestCol->colIdx = htons(pCol->colIdx);
          pDestCol->colId = htons(pDestCol->colId);
          pDestCol->flag = htons(pDestCol->flag);
H
hjxilinx 已提交
1689
          strncpy(pDestCol->name, pCol->name, tListLen(pCol->name));
1690

H
hjxilinx 已提交
1691
          pMsg += sizeof(SColIndexEx);
S
slguan 已提交
1692 1693
        }
      }
H
hzcheng 已提交
1694
    }
S
slguan 已提交
1695

H
hjxilinx 已提交
1696 1697
    strcpy(pElem->tableId, pTableMetaInfo->name);
    pElem->numOfTags = htons(pTableMetaInfo->numOfTags);
S
slguan 已提交
1698 1699 1700

    int16_t len = pMsg - (char *)pElem;
    pElem->elemLen = htons(len);  // redundant data for integrate check
H
hzcheng 已提交
1701 1702 1703 1704
  }

  msgLen = pMsg - pStart;
  pCmd->payloadLen = msgLen;
S
slguan 已提交
1705
  pCmd->msgType = TSDB_MSG_TYPE_CM_STABLE_VGROUP;
H
hzcheng 已提交
1706
  assert(msgLen + minMsgSize() <= size);
H
hjxilinx 已提交
1707
#endif
1708
  
H
hjxilinx 已提交
1709 1710 1711 1712 1713 1714 1715 1716 1717 1718
  SSqlCmd *pCmd = &pSql->cmd;

  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);

  SCMSTableVgroupMsg *pStableVgroupMsg = (SCMSTableVgroupMsg *) pCmd->payload;
  strncpy(pStableVgroupMsg->tableId, pTableMetaInfo->name, tListLen(pStableVgroupMsg->tableId));

  pCmd->msgType = TSDB_MSG_TYPE_CM_STABLE_VGROUP;
  pCmd->payloadLen = sizeof(SCMSTableVgroupMsg);

1719
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1720 1721
}

1722
int tscEstimateHeartBeatMsgLength(SSqlObj *pSql) {
H
hzcheng 已提交
1723 1724 1725 1726
  int      size = 0;
  STscObj *pObj = pSql->pTscObj;

  size += tsRpcHeadSize + sizeof(SMgmtHead);
S
slguan 已提交
1727
  size += sizeof(SQqueryList);
H
hzcheng 已提交
1728 1729 1730

  SSqlObj *tpSql = pObj->sqlList;
  while (tpSql) {
S
slguan 已提交
1731
    size += sizeof(SQueryDesc);
H
hzcheng 已提交
1732 1733 1734
    tpSql = tpSql->next;
  }

S
slguan 已提交
1735
  size += sizeof(SStreamList);
H
hzcheng 已提交
1736 1737
  SSqlStream *pStream = pObj->streamList;
  while (pStream) {
S
slguan 已提交
1738
    size += sizeof(SStreamDesc);
H
hzcheng 已提交
1739 1740 1741 1742 1743 1744
    pStream = pStream->next;
  }

  return size + TSDB_EXTRA_PAYLOAD_SIZE;
}

1745
int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1746 1747 1748 1749 1750 1751 1752 1753 1754
  char *pMsg, *pStart;
  int   msgLen = 0;
  int   size = 0;

  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;

  pthread_mutex_lock(&pObj->mutex);

1755
  size = tscEstimateHeartBeatMsgLength(pSql);
S
slguan 已提交
1756 1757 1758 1759
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for heartbeat msg", pSql);
    return -1;
  }
H
hzcheng 已提交
1760 1761 1762 1763 1764 1765 1766 1767 1768 1769 1770 1771 1772

  pMsg = pCmd->payload + tsRpcHeadSize;
  pStart = pMsg;

  SMgmtHead *pMgmt = (SMgmtHead *)pMsg;
  strcpy(pMgmt->db, pObj->db);
  pMsg += sizeof(SMgmtHead);

  pMsg = tscBuildQueryStreamDesc(pMsg, pObj);
  pthread_mutex_unlock(&pObj->mutex);

  msgLen = pMsg - pStart;
  pCmd->payloadLen = msgLen;
S
slguan 已提交
1773
  pCmd->msgType = TSDB_MSG_TYPE_CM_HEARTBEAT;
H
hzcheng 已提交
1774 1775 1776 1777 1778

  assert(msgLen + minMsgSize() <= size);
  return msgLen;
}

1779 1780
int tscProcessTableMetaRsp(SSqlObj *pSql) {
  STableMetaMsg *pMetaMsg = (STableMetaMsg *)pSql->res.pRsp;
H
hzcheng 已提交
1781

1782 1783
  pMetaMsg->sid = htonl(pMetaMsg->sid);
  pMetaMsg->sversion = htons(pMetaMsg->sversion);
H
hjxilinx 已提交
1784
  pMetaMsg->vgId = htonl(pMetaMsg->vgId);
1785 1786
  pMetaMsg->uid = htobe64(pMetaMsg->uid);
  pMetaMsg->contLen = htons(pMetaMsg->contLen);
1787
  pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns);
H
hzcheng 已提交
1788

H
hjxilinx 已提交
1789 1790
  if (pMetaMsg->sid < 0 || pMetaMsg->vgId < 0) {
    tscError("invalid meter vgId:%d, sid%d", pMetaMsg->vgId, pMetaMsg->sid);
H
hzcheng 已提交
1791 1792 1793
    return TSDB_CODE_INVALID_VALUE;
  }

1794 1795
  if (pMetaMsg->numOfTags > TSDB_MAX_TAGS || pMetaMsg->numOfTags < 0) {
    tscError("invalid numOfTags:%d", pMetaMsg->numOfTags);
H
hzcheng 已提交
1796 1797 1798
    return TSDB_CODE_INVALID_VALUE;
  }

1799 1800
  if (pMetaMsg->numOfColumns > TSDB_MAX_COLUMNS || pMetaMsg->numOfColumns <= 0) {
    tscError("invalid numOfColumns:%d", pMetaMsg->numOfColumns);
H
hzcheng 已提交
1801 1802 1803 1804
    return TSDB_CODE_INVALID_VALUE;
  }

  for (int i = 0; i < TSDB_VNODES_SUPPORT; ++i) {
1805
    pMetaMsg->vpeerDesc[i].vnode = htonl(pMetaMsg->vpeerDesc[i].vnode);
H
hzcheng 已提交
1806 1807
  }

1808
  SSchema* pSchema = pMetaMsg->schema;
H
hzcheng 已提交
1809

1810
  int32_t numOfTotalCols = pMetaMsg->numOfColumns + pMetaMsg->numOfTags;
H
hzcheng 已提交
1811 1812 1813 1814 1815 1816
  for (int i = 0; i < numOfTotalCols; ++i) {
    pSchema->bytes = htons(pSchema->bytes);
    pSchema->colId = htons(pSchema->colId);
    pSchema++;
  }

1817 1818
  size_t size = 0;
  STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
H
hzcheng 已提交
1819

1820 1821 1822 1823 1824 1825 1826 1827 1828 1829 1830
#if 0
  // if current table is created according to super table, get the table meta of super table
  if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
    char id[TSDB_TABLE_ID_LEN + 1] = {0};
    strncpy(id, pMetaMsg->stableId, TSDB_TABLE_ID_LEN);
  
    // NOTE: if the table meta of super table is not cached at client side yet, the pSTable is NULL
    pTableMeta->pSTable = taosCacheAcquireByName(tscCacheHandle, id);
  }
#endif
  
H
hzcheng 已提交
1831
  // todo add one more function: taosAddDataIfNotExists();
1832
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
H
hjxilinx 已提交
1833
  assert(pTableMetaInfo->pTableMeta == NULL);
H
hzcheng 已提交
1834

H
hjxilinx 已提交
1835 1836 1837
  pTableMetaInfo->pTableMeta =
      (STableMeta *) taosCachePut(tscCacheHandle, pTableMetaInfo->name, pTableMeta, size, tsMeterMetaKeepTimer);
  
1838
  // todo handle out of memory case
1839
  if (pTableMetaInfo->pTableMeta == NULL) {
H
hjxilinx 已提交
1840
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
1841
  }
H
hzcheng 已提交
1842

1843
  free(pTableMeta);
1844
  
H
hjxilinx 已提交
1845
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1846 1847
}

S
slguan 已提交
1848 1849
/**
 *  multi meter meta rsp pkg format:
1850
 *  | STaosRsp | ieType | SCMMultiTableInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | SMeterMeta2 | SSchema2
S
slguan 已提交
1851 1852 1853 1854 1855 1856 1857 1858 1859 1860 1861 1862 1863 1864 1865 1866 1867 1868 1869
 *  |...... 1B        1B            4B
 **/
int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
  uint8_t  ieType;
  int32_t  totalNum;
  int32_t  i;

  char *rsp = pSql->res.pRsp;

  ieType = *rsp;
  if (ieType != TSDB_IE_TYPE_META) {
    tscError("invalid ie type:%d", ieType);
    pSql->res.code = TSDB_CODE_INVALID_IE;
    pSql->res.numOfTotal = 0;
    return TSDB_CODE_OTHERS;
  }

  rsp++;

1870
  SCMMultiTableInfoMsg *pInfo = (SCMMultiTableInfoMsg *)rsp;
S
slguan 已提交
1871
  totalNum = htonl(pInfo->numOfTables);
1872
  rsp += sizeof(SCMMultiTableInfoMsg);
S
slguan 已提交
1873 1874

  for (i = 0; i < totalNum; i++) {
S
slguan 已提交
1875
    SMultiTableMeta *pMultiMeta = (SMultiTableMeta *)rsp;
1876
    STableMeta *     pMeta = pMultiMeta->metas;
S
slguan 已提交
1877 1878 1879

    pMeta->sid = htonl(pMeta->sid);
    pMeta->sversion = htons(pMeta->sversion);
H
hjxilinx 已提交
1880
    pMeta->vgId = htonl(pMeta->vgId);
S
slguan 已提交
1881 1882
    pMeta->uid = htobe64(pMeta->uid);

H
hjxilinx 已提交
1883 1884
    if (pMeta->sid <= 0 || pMeta->vgId < 0) {
      tscError("invalid meter vgId:%d, sid%d", pMeta->vgId, pMeta->sid);
S
slguan 已提交
1885 1886 1887 1888 1889
      pSql->res.code = TSDB_CODE_INVALID_VALUE;
      pSql->res.numOfTotal = i;
      return TSDB_CODE_OTHERS;
    }

H
hjxilinx 已提交
1890 1891 1892 1893 1894 1895 1896 1897 1898 1899 1900 1901 1902 1903 1904 1905 1906 1907 1908 1909 1910 1911 1912 1913 1914 1915 1916 1917 1918 1919 1920 1921 1922 1923 1924 1925 1926 1927 1928 1929 1930 1931 1932 1933 1934 1935 1936 1937 1938 1939 1940 1941 1942 1943 1944 1945 1946 1947 1948 1949
    //    pMeta->numOfColumns = htons(pMeta->numOfColumns);
    //
    //    if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
    //      tscError("invalid tag value count:%d", pMeta->numOfTags);
    //      pSql->res.code = TSDB_CODE_INVALID_VALUE;
    //      pSql->res.numOfTotal = i;
    //      return TSDB_CODE_OTHERS;
    //    }
    //
    //    if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
    //      tscError("invalid numOfTags:%d", pMeta->numOfTags);
    //      pSql->res.code = TSDB_CODE_INVALID_VALUE;
    //      pSql->res.numOfTotal = i;
    //      return TSDB_CODE_OTHERS;
    //    }
    //
    //    if (pMeta->numOfColumns > TSDB_MAX_COLUMNS || pMeta->numOfColumns < 0) {
    //      tscError("invalid numOfColumns:%d", pMeta->numOfColumns);
    //      pSql->res.code = TSDB_CODE_INVALID_VALUE;
    //      pSql->res.numOfTotal = i;
    //      return TSDB_CODE_OTHERS;
    //    }
    //
    //    for (int j = 0; j < TSDB_VNODES_SUPPORT; ++j) {
    //      pMeta->vpeerDesc[j].vnode = htonl(pMeta->vpeerDesc[j].vnode);
    //    }
    //
    //    pMeta->rowSize = 0;
    //    rsp += sizeof(SMultiTableMeta);
    //    pSchema = (SSchema *)rsp;
    //
    //    int32_t numOfTotalCols = pMeta->numOfColumns + pMeta->numOfTags;
    //    for (int j = 0; j < numOfTotalCols; ++j) {
    //      pSchema->bytes = htons(pSchema->bytes);
    //      pSchema->colId = htons(pSchema->colId);
    //
    //      // ignore the tags length
    //      if (j < pMeta->numOfColumns) {
    //        pMeta->rowSize += pSchema->bytes;
    //      }
    //      pSchema++;
    //    }
    //
    //    rsp += numOfTotalCols * sizeof(SSchema);
    //
    //    int32_t  tagLen = 0;
    //    SSchema *pTagsSchema = tscGetTableTagSchema(pMeta);
    //
    //    if (pMeta->tableType == TSDB_CHILD_TABLE) {
    //      for (int32_t j = 0; j < pMeta->numOfTags; ++j) {
    //        tagLen += pTagsSchema[j].bytes;
    //      }
    //    }
    //
    //    rsp += tagLen;
    //    int32_t size = (int32_t)(rsp - ((char *)pMeta));  // Consistent with STableMeta in cache
    //
    //    pMeta->index = 0;
    //    (void)taosCachePut(tscCacheHandle, pMeta->tableId, (char *)pMeta, size, tsMeterMetaKeepTimer);
    //  }
S
slguan 已提交
1950
  }
H
hjxilinx 已提交
1951
  
S
slguan 已提交
1952 1953 1954 1955 1956 1957
  pSql->res.code = TSDB_CODE_SUCCESS;
  pSql->res.numOfTotal = i;
  tscTrace("%p load multi-metermeta resp complete num:%d", pSql, pSql->res.numOfTotal);
  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
1958 1959
int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
#if 0
S
slguan 已提交
1960 1961
  void **      metricMetaList = NULL;
  int32_t *    sizes = NULL;
H
hjxilinx 已提交
1962
  
S
slguan 已提交
1963 1964 1965 1966 1967 1968 1969 1970 1971 1972 1973 1974 1975 1976 1977 1978
  int32_t num = htons(*(int16_t *)rsp);
  rsp += sizeof(int16_t);

  metricMetaList = calloc(1, POINTER_BYTES * num);
  sizes = calloc(1, sizeof(int32_t) * num);

  // return with error code
  if (metricMetaList == NULL || sizes == NULL) {
    tfree(metricMetaList);
    tfree(sizes);
    pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;

    return pSql->res.code;
  }

  for (int32_t k = 0; k < num; ++k) {
S
slguan 已提交
1979
    pMeta = (SSuperTableMeta *)rsp;
S
slguan 已提交
1980 1981

    size_t size = (size_t)pSql->res.rspLen - 1;
S
slguan 已提交
1982
    rsp = rsp + sizeof(SSuperTableMeta);
S
slguan 已提交
1983

S
slguan 已提交
1984
    pMeta->numOfTables = htonl(pMeta->numOfTables);
S
slguan 已提交
1985 1986 1987
    pMeta->numOfVnodes = htonl(pMeta->numOfVnodes);
    pMeta->tagLen = htons(pMeta->tagLen);

1988
    size += pMeta->numOfVnodes * sizeof(SVnodeSidList *) + pMeta->numOfTables * sizeof(STableIdInfo *);
H
hzcheng 已提交
1989

1990 1991
    char *pBuf = calloc(1, size);
    if (pBuf == NULL) {
S
slguan 已提交
1992 1993 1994
      pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;
      goto _error_clean;
    }
H
hzcheng 已提交
1995

S
slguan 已提交
1996
    SSuperTableMeta *pNewMetricMeta = (SSuperTableMeta *)pBuf;
S
slguan 已提交
1997
    metricMetaList[k] = pNewMetricMeta;
H
hzcheng 已提交
1998

S
slguan 已提交
1999
    pNewMetricMeta->numOfTables = pMeta->numOfTables;
S
slguan 已提交
2000 2001
    pNewMetricMeta->numOfVnodes = pMeta->numOfVnodes;
    pNewMetricMeta->tagLen = pMeta->tagLen;
H
hzcheng 已提交
2002

S
slguan 已提交
2003
    pBuf = pBuf + sizeof(SSuperTableMeta) + pNewMetricMeta->numOfVnodes * sizeof(SVnodeSidList *);
H
hzcheng 已提交
2004

S
slguan 已提交
2005 2006
    for (int32_t i = 0; i < pMeta->numOfVnodes; ++i) {
      SVnodeSidList *pSidLists = (SVnodeSidList *)rsp;
2007
      memcpy(pBuf, pSidLists, sizeof(SVnodeSidList));
H
hzcheng 已提交
2008

2009 2010
      pNewMetricMeta->list[i] = pBuf - (char *)pNewMetricMeta;  // offset value
      SVnodeSidList *pLists = (SVnodeSidList *)pBuf;
H
hzcheng 已提交
2011

S
slguan 已提交
2012
      tscTrace("%p metricmeta:vid:%d,numOfTables:%d", pSql, i, pLists->numOfSids);
H
hzcheng 已提交
2013

2014
      pBuf += sizeof(SVnodeSidList) + sizeof(STableIdInfo *) * pSidLists->numOfSids;
S
slguan 已提交
2015
      rsp += sizeof(SVnodeSidList);
H
hzcheng 已提交
2016

2017
      size_t elemSize = sizeof(STableIdInfo) + pNewMetricMeta->tagLen;
S
slguan 已提交
2018
      for (int32_t j = 0; j < pSidLists->numOfSids; ++j) {
2019 2020
        pLists->pSidExtInfoList[j] = pBuf - (char *)pLists;
        memcpy(pBuf, rsp, elemSize);
2021

2022 2023
        ((STableIdInfo *)pBuf)->uid = htobe64(((STableIdInfo *)pBuf)->uid);
        ((STableIdInfo *)pBuf)->sid = htonl(((STableIdInfo *)pBuf)->sid);
2024

2025 2026
        rsp += elemSize;
        pBuf += elemSize;
S
slguan 已提交
2027
      }
H
hzcheng 已提交
2028
    }
S
slguan 已提交
2029

2030
    sizes[k] = pBuf - (char *)pNewMetricMeta;
H
hzcheng 已提交
2031 2032
  }

2033
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
S
slguan 已提交
2034 2035 2036
  for (int32_t i = 0; i < num; ++i) {
    char name[TSDB_MAX_TAGS_LEN + 1] = {0};

H
hjxilinx 已提交
2037
    STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
H
hjxilinx 已提交
2038
    tscGetMetricMetaCacheKey(pQueryInfo, name, pTableMetaInfo->pTableMeta->uid);
H
hzcheng 已提交
2039

S
slguan 已提交
2040 2041 2042
#ifdef _DEBUG_VIEW
    printf("generate the metric key:%s, index:%d\n", name, i);
#endif
H
hzcheng 已提交
2043

S
slguan 已提交
2044
    // release the used metricmeta
H
hjxilinx 已提交
2045 2046
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pMetricMeta), false);
    pTableMetaInfo->pMetricMeta = (SSuperTableMeta *)taosCachePut(tscCacheHandle, name, (char *)metricMetaList[i],
S
slguan 已提交
2047 2048 2049 2050
                                                                      sizes[i], tsMetricMetaKeepTimer);
    tfree(metricMetaList[i]);

    // failed to put into cache
H
hjxilinx 已提交
2051
    if (pTableMetaInfo->pMetricMeta == NULL) {
S
slguan 已提交
2052 2053 2054
      pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;
      goto _error_clean;
    }
H
hzcheng 已提交
2055 2056
  }

S
slguan 已提交
2057 2058 2059 2060 2061 2062 2063 2064
_error_clean:
  // free allocated resource
  for (int32_t i = 0; i < num; ++i) {
    tfree(metricMetaList[i]);
  }

  free(sizes);
  free(metricMetaList);
H
hjxilinx 已提交
2065 2066 2067 2068 2069 2070 2071 2072 2073 2074 2075 2076 2077 2078 2079 2080 2081
#endif
  
  SCMSTableVgroupRspMsg *pStableVgroup = (SCMSTableVgroupRspMsg *)pSql->res.pRsp;
  pStableVgroup->numOfDnodes = htonl(pStableVgroup->numOfDnodes);
  
  SSqlObj* pparent = pSql->param;
  assert(pparent != NULL);
  
  SSqlCmd* pCmd = &pparent->cmd;
  STableMetaInfo* pInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  pInfo->vgroupIdList = taosArrayInit(pStableVgroup->numOfDnodes, sizeof(int32_t));
  
  // todo opt performance
  for(int32_t i = 0; i < pStableVgroup->numOfDnodes; ++i) {
    taosArrayPush(pInfo->vgroupIdList, &pStableVgroup->dnodeIps[i]);
  }
  
S
slguan 已提交
2082
  return pSql->res.code;
H
hzcheng 已提交
2083 2084 2085 2086 2087 2088
}

/*
 * current process do not use the cache at all
 */
int tscProcessShowRsp(SSqlObj *pSql) {
H
hjxilinx 已提交
2089
  STableMetaMsg * pMetaMsg;
2090
  SCMShowRsp *pShow;
S
slguan 已提交
2091
  SSchema *    pSchema;
H
hzcheng 已提交
2092 2093
  char         key[20];

2094 2095 2096
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

H
hjxilinx 已提交
2097
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
2098

H
hjxilinx 已提交
2099
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
2100

2101
  pShow = (SCMShowRsp *)pRes->pRsp;
S
slguan 已提交
2102
  pShow->qhandle = htobe64(pShow->qhandle);
H
hzcheng 已提交
2103 2104
  pRes->qhandle = pShow->qhandle;

S
slguan 已提交
2105
  tscResetForNextRetrieve(pRes);
H
hjxilinx 已提交
2106
  pMetaMsg = &(pShow->tableMeta);
H
hzcheng 已提交
2107

H
hjxilinx 已提交
2108
  pMetaMsg->numOfColumns = ntohs(pMetaMsg->numOfColumns);
H
hzcheng 已提交
2109

H
hjxilinx 已提交
2110
  pSchema = pMetaMsg->schema;
H
hjxilinx 已提交
2111 2112
  pMetaMsg->sid = ntohs(pMetaMsg->sid);
  for (int i = 0; i < pMetaMsg->numOfColumns; ++i) {
H
hzcheng 已提交
2113 2114 2115 2116
    pSchema->bytes = htons(pSchema->bytes);
    pSchema++;
  }

2117
  key[0] = pCmd->msgType + 'a';
H
hzcheng 已提交
2118 2119
  strcpy(key + 1, "showlist");

H
hjxilinx 已提交
2120
  taosCacheRelease(tscCacheHandle, (void *)&(pTableMetaInfo->pTableMeta), false);
H
hjxilinx 已提交
2121 2122 2123
  size_t size = 0;
  STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
  
H
hjxilinx 已提交
2124
  pTableMetaInfo->pTableMeta =
H
hjxilinx 已提交
2125 2126
      (STableMeta *)taosCachePut(tscCacheHandle, key, (char *)pTableMeta, size, tsMeterMetaKeepTimer);
  
2127
  pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutputCols;
H
hjxilinx 已提交
2128
  SSchema *pTableSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
H
hzcheng 已提交
2129

H
hjxilinx 已提交
2130
  tscColumnBaseInfoReserve(&pQueryInfo->colList, pMetaMsg->numOfColumns);
S
slguan 已提交
2131 2132
  SColumnIndex index = {0};

H
hjxilinx 已提交
2133
  for (int16_t i = 0; i < pMetaMsg->numOfColumns; ++i) {
S
slguan 已提交
2134
    index.columnIndex = i;
2135
    tscColumnBaseInfoInsert(pQueryInfo, &index);
H
hjxilinx 已提交
2136
    tscFieldInfoSetValFromSchema(&pQueryInfo->fieldsInfo, i, &pTableSchema[i]);
H
hjxilinx 已提交
2137 2138
    
    pQueryInfo->fieldsInfo.pSqlExpr[i] = tscSqlExprInsert(pQueryInfo, i, TSDB_FUNC_TS_DUMMY, &index,
H
hjxilinx 已提交
2139
                     pTableSchema[i].type, pTableSchema[i].bytes, pTableSchema[i].bytes);
H
hzcheng 已提交
2140 2141
  }

2142
  tscFieldInfoCalOffset(pQueryInfo);
H
hjxilinx 已提交
2143 2144
  
  tfree(pTableMeta);
H
hzcheng 已提交
2145 2146 2147 2148
  return 0;
}

int tscProcessConnectRsp(SSqlObj *pSql) {
S
slguan 已提交
2149
  char temp[TSDB_TABLE_ID_LEN * 2];
H
hzcheng 已提交
2150 2151 2152
  STscObj *pObj = pSql->pTscObj;
  SSqlRes *pRes = &pSql->res;

2153
  SCMConnectRsp *pConnect = (SCMConnectRsp *)pRes->pRsp;
H
hzcheng 已提交
2154
  strcpy(pObj->acctId, pConnect->acctId);  // copy acctId from response
2155 2156
  int32_t len = sprintf(temp, "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db);

H
hjxilinx 已提交
2157 2158 2159
  assert(len <= tListLen(pObj->db));
  strncpy(pObj->db, temp, tListLen(pObj->db));
  
S
slguan 已提交
2160
//  SIpList *    pIpList;
2161
//  char *rsp = pRes->pRsp + sizeof(SCMConnectRsp);
S
slguan 已提交
2162 2163
//  pIpList = (SIpList *)rsp;
//  tscSetMgmtIpList(pIpList);
H
hzcheng 已提交
2164

S
slguan 已提交
2165
  strcpy(pObj->sversion, pConnect->serverVersion);
H
hzcheng 已提交
2166 2167 2168 2169 2170 2171 2172 2173
  pObj->writeAuth = pConnect->writeAuth;
  pObj->superAuth = pConnect->superAuth;
  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);

  return 0;
}

int tscProcessUseDbRsp(SSqlObj *pSql) {
S
slguan 已提交
2174
  STscObj *       pObj = pSql->pTscObj;
2175
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
S
slguan 已提交
2176

H
hjxilinx 已提交
2177
  strcpy(pObj->db, pTableMetaInfo->name);
H
hzcheng 已提交
2178 2179 2180 2181
  return 0;
}

int tscProcessDropDbRsp(SSqlObj *UNUSED_PARAM(pSql)) {
2182
  taosCacheEmpty(tscCacheHandle);
H
hzcheng 已提交
2183 2184 2185 2186
  return 0;
}

int tscProcessDropTableRsp(SSqlObj *pSql) {
2187
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
S
slguan 已提交
2188

H
hjxilinx 已提交
2189 2190
  STableMeta *pTableMeta = taosCacheAcquireByName(tscCacheHandle, pTableMetaInfo->name);
  if (pTableMeta == NULL) {
H
hzcheng 已提交
2191 2192 2193 2194 2195 2196 2197 2198
    /* not in cache, abort */
    return 0;
  }

  /*
   * 1. if a user drops one table, which is the only table in a vnode, remove operation will incur vnode to be removed.
   * 2. Then, a user creates a new metric followed by a table with identical name of removed table but different schema,
   * here the table will reside in a new vnode.
S
slguan 已提交
2199 2200
   * The cached information is expired, however, we may have lost the ref of original meter. So, clear whole cache
   * instead.
H
hzcheng 已提交
2201
   */
H
hjxilinx 已提交
2202 2203
  tscTrace("%p force release metermeta after drop table:%s", pSql, pTableMetaInfo->name);
  taosCacheRelease(tscCacheHandle, (void **)&pTableMeta, true);
H
hzcheng 已提交
2204

H
hjxilinx 已提交
2205 2206
  if (pTableMetaInfo->pTableMeta) {
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true);
H
hjxilinx 已提交
2207
//    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pMetricMeta), true);
H
hzcheng 已提交
2208 2209 2210 2211 2212 2213
  }

  return 0;
}

int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
2214
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
S
slguan 已提交
2215

H
hjxilinx 已提交
2216 2217
  STableMeta *pTableMeta = taosCacheAcquireByName(tscCacheHandle, pTableMetaInfo->name);
  if (pTableMeta == NULL) { /* not in cache, abort */
H
hzcheng 已提交
2218 2219 2220
    return 0;
  }

H
hjxilinx 已提交
2221 2222
  tscTrace("%p force release metermeta in cache after alter-table: %s", pSql, pTableMetaInfo->name);
  taosCacheRelease(tscCacheHandle, (void **)&pTableMeta, true);
H
hzcheng 已提交
2223

H
hjxilinx 已提交
2224
  if (pTableMetaInfo->pTableMeta) {
H
hjxilinx 已提交
2225
    bool isSuperTable = UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo);
H
hzcheng 已提交
2226

H
hjxilinx 已提交
2227
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true);
H
hjxilinx 已提交
2228
//    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pMetricMeta), true);
H
hzcheng 已提交
2229

2230
    if (isSuperTable) {  // if it is a super table, reset whole query cache
H
hjxilinx 已提交
2231
      tscTrace("%p reset query cache since table:%s is stable", pSql, pTableMetaInfo->name);
2232
      taosCacheEmpty(tscCacheHandle);
H
hzcheng 已提交
2233 2234 2235 2236 2237 2238 2239 2240 2241 2242 2243 2244 2245 2246
    }
  }

  return 0;
}

int tscProcessAlterDbMsgRsp(SSqlObj *pSql) {
  UNUSED(pSql);
  return 0;
}

int tscProcessQueryRsp(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;

2247
  SQueryTableRsp *pQuery = (SQueryTableRsp *)pRes->pRsp;
S
slguan 已提交
2248 2249 2250
  pQuery->qhandle = htobe64(pQuery->qhandle);
  pRes->qhandle = pQuery->qhandle;

H
hzcheng 已提交
2251
  pRes->data = NULL;
S
slguan 已提交
2252
  tscResetForNextRetrieve(pRes);
H
hzcheng 已提交
2253 2254 2255 2256
  return 0;
}

int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) {
S
slguan 已提交
2257 2258 2259
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

S
slguan 已提交
2260
  SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *)pRes->pRsp;
H
hzcheng 已提交
2261 2262 2263

  pRes->numOfRows = htonl(pRetrieve->numOfRows);
  pRes->precision = htons(pRetrieve->precision);
2264 2265
  pRes->offset    = htobe64(pRetrieve->offset);
  pRes->useconds  = htobe64(pRetrieve->useconds);
H
hjxilinx 已提交
2266
  pRes->completed = (pRetrieve->completed == 1);
2267
  pRes->data      = pRetrieve->data;
H
hjxilinx 已提交
2268
  
2269 2270
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  tscSetResultPointer(pQueryInfo, pRes);
2271

weixin_48148422's avatar
weixin_48148422 已提交
2272
  if (pSql->pSubscription != NULL) {
H
hjxilinx 已提交
2273 2274 2275 2276 2277
    int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutputCols;
    
    TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, numOfCols - 1);
    int16_t     offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1);
    
weixin_48148422's avatar
weixin_48148422 已提交
2278 2279
    char* p = pRes->data + (pField->bytes + offset) * pRes->numOfRows;

S
slguan 已提交
2280
    int32_t numOfTables = htonl(*(int32_t*)p);
weixin_48148422's avatar
weixin_48148422 已提交
2281
    p += sizeof(int32_t);
S
slguan 已提交
2282
    for (int i = 0; i < numOfTables; i++) {
weixin_48148422's avatar
weixin_48148422 已提交
2283 2284 2285 2286
      int64_t uid = htobe64(*(int64_t*)p);
      p += sizeof(int64_t);
      TSKEY key = htobe64(*(TSKEY*)p);
      p += sizeof(TSKEY);
2287
      tscUpdateSubscriptionProgress(pSql->pSubscription, uid, key);
weixin_48148422's avatar
weixin_48148422 已提交
2288
    }
2289 2290
  }

H
hzcheng 已提交
2291
  pRes->row = 0;
S
slguan 已提交
2292
  tscTrace("%p numOfRows:%d, offset:%d", pSql, pRes->numOfRows, pRes->offset);
H
hzcheng 已提交
2293 2294 2295 2296 2297

  return 0;
}

int tscProcessRetrieveRspFromLocal(SSqlObj *pSql) {
2298 2299
  SSqlRes *   pRes = &pSql->res;
  SSqlCmd *   pCmd = &pSql->cmd;
2300
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
2301

S
slguan 已提交
2302
  SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *)pRes->pRsp;
H
hzcheng 已提交
2303 2304 2305

  pRes->numOfRows = htonl(pRetrieve->numOfRows);
  pRes->data = pRetrieve->data;
2306

2307
  tscSetResultPointer(pQueryInfo, pRes);
H
hzcheng 已提交
2308 2309 2310 2311
  pRes->row = 0;
  return 0;
}

H
hjxilinx 已提交
2312
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code);
H
hzcheng 已提交
2313

2314
static int32_t getTableMetaFromMgmt(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
S
slguan 已提交
2315 2316 2317 2318 2319
  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
  if (NULL == pNew) {
    tscError("%p malloc failed for new sqlobj to get meter meta", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
2320

H
hzcheng 已提交
2321 2322 2323
  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;
  pNew->cmd.command = TSDB_SQL_META;
S
slguan 已提交
2324

2325
  tscAddSubqueryInfo(&pNew->cmd);
2326 2327 2328 2329

  SQueryInfo *pNewQueryInfo = NULL;
  tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo);

H
hjxilinx 已提交
2330
  pNew->cmd.autoCreated = pSql->cmd.autoCreated;  // create table if not exists
S
slguan 已提交
2331 2332 2333
  if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
    tscError("%p malloc failed for payload to get meter meta", pSql);
    free(pNew);
2334

S
slguan 已提交
2335 2336 2337
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

H
hjxilinx 已提交
2338
  STableMetaInfo *pNewMeterMetaInfo = tscAddEmptyMetaInfo(pNewQueryInfo);
2339
  assert(pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);
H
hzcheng 已提交
2340

2341
  strncpy(pNewMeterMetaInfo->name, pTableMetaInfo->name, tListLen(pNewMeterMetaInfo->name));
2342
  memcpy(pNew->cmd.payload, pSql->cmd.payload, TSDB_DEFAULT_PAYLOAD_SIZE);  // tag information if table does not exists.
S
slguan 已提交
2343
  tscTrace("%p new pSqlObj:%p to get tableMeta", pSql, pNew);
H
hzcheng 已提交
2344

H
hjxilinx 已提交
2345 2346
  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;
H
hzcheng 已提交
2347

H
hjxilinx 已提交
2348 2349 2350
  int32_t code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
H
hzcheng 已提交
2351 2352 2353 2354 2355
  }

  return code;
}

H
hjxilinx 已提交
2356
int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
H
hjxilinx 已提交
2357
  assert(strlen(pTableMetaInfo->name) != 0);
S
slguan 已提交
2358

2359
  // If this STableMetaInfo owns a table meta, release it first
H
hjxilinx 已提交
2360 2361
  if (pTableMetaInfo->pTableMeta != NULL) {
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), false);
2362 2363
  }
  
H
hjxilinx 已提交
2364 2365
  pTableMetaInfo->pTableMeta = (STableMeta *)taosCacheAcquireByName(tscCacheHandle, pTableMetaInfo->name);
  if (pTableMetaInfo->pTableMeta != NULL) {
H
hjxilinx 已提交
2366
    STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
H
hjxilinx 已提交
2367 2368
    tscTrace("%p retrieve tableMeta from cache, the number of columns:%d, numOfTags:%d", pSql, tinfo.numOfColumns,
             tinfo.numOfTags);
H
hzcheng 已提交
2369 2370 2371

    return TSDB_CODE_SUCCESS;
  }
2372 2373
  
  return getTableMetaFromMgmt(pSql, pTableMetaInfo);
H
hzcheng 已提交
2374 2375
}

H
hjxilinx 已提交
2376
int tscGetMeterMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool createIfNotExists) {
H
hjxilinx 已提交
2377
  pSql->cmd.autoCreated = createIfNotExists;
H
hjxilinx 已提交
2378
  return tscGetTableMeta(pSql, pTableMetaInfo);
H
hzcheng 已提交
2379 2380 2381 2382
}

/*
 * in handling the renew metermeta problem during insertion,
S
slguan 已提交
2383
 *
H
hzcheng 已提交
2384 2385 2386 2387 2388
 * If the meter is created on demand during insertion, the routine usually waits for a short
 * period to re-issue the getMeterMeta msg, in which makes a greater change that vnode has
 * successfully created the corresponding table.
 */
static void tscWaitingForCreateTable(SSqlCmd *pCmd) {
S
slguan 已提交
2389
  if (pCmd->command == TSDB_SQL_INSERT) {
H
hzcheng 已提交
2390 2391 2392 2393 2394 2395 2396
    taosMsleep(50);  // todo: global config
  }
}

/**
 * in renew metermeta, do not retrieve metadata in cache.
 * @param pSql          sql object
S
slguan 已提交
2397
 * @param tableId       meter id
H
hzcheng 已提交
2398 2399
 * @return              status code
 */
S
slguan 已提交
2400
int tscRenewMeterMeta(SSqlObj *pSql, char *tableId) {
2401 2402
  int code = 0;

H
hzcheng 已提交
2403 2404
  // handle metric meta renew process
  SSqlCmd *pCmd = &pSql->cmd;
2405 2406

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
H
hjxilinx 已提交
2407
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
2408 2409

  /*
S
slguan 已提交
2410
   * 1. only update the metermeta in force model metricmeta is not updated
H
hzcheng 已提交
2411 2412
   * 2. if get metermeta failed, still get the metermeta
   */
H
hjxilinx 已提交
2413 2414
  if (pTableMetaInfo->pTableMeta == NULL || !tscQueryOnMetric(pCmd)) {
    if (pTableMetaInfo->pTableMeta) {
H
hjxilinx 已提交
2415
      tscTrace("%p update meter meta, old: numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
H
hjxilinx 已提交
2416
               pTableMetaInfo->numOfTags, pCmd->numOfCols, pTableMetaInfo->pTableMeta->uid, pTableMetaInfo->pTableMeta);
H
hzcheng 已提交
2417
    }
2418

2419
    tscWaitingForCreateTable(pCmd);
H
hjxilinx 已提交
2420
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true);
H
hzcheng 已提交
2421

2422
    code = getTableMetaFromMgmt(pSql, pTableMetaInfo);  // todo ??
H
hzcheng 已提交
2423
  } else {
H
hjxilinx 已提交
2424
    tscTrace("%p metric query not update metric meta, numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
H
hjxilinx 已提交
2425 2426
             tscGetNumOfTags(pTableMetaInfo->pTableMeta), pCmd->numOfCols, pTableMetaInfo->pTableMeta->uid,
             pTableMetaInfo->pTableMeta);
H
hzcheng 已提交
2427 2428 2429 2430 2431
  }

  return code;
}

H
hjxilinx 已提交
2432
int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
S
slguan 已提交
2433 2434
  int      code = TSDB_CODE_NETWORK_UNAVAIL;
  SSqlCmd *pCmd = &pSql->cmd;
H
hzcheng 已提交
2435

H
hjxilinx 已提交
2436 2437
  //the query condition is serialized into pCmd->payload, we need to rebuild key for stable meta info in cache.
  bool required = false;
2438

2439
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
H
hjxilinx 已提交
2440 2441 2442 2443 2444 2445
  if (pQueryInfo->pTableMetaInfo[0]->vgroupIdList != NULL) {
    return TSDB_CODE_SUCCESS;
  }
  
#if 0
  
2446
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
S
slguan 已提交
2447 2448
    char tagstr[TSDB_MAX_TAGS_LEN + 1] = {0};

H
hjxilinx 已提交
2449
    STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
H
hjxilinx 已提交
2450
    tscGetMetricMetaCacheKey(pQueryInfo, tagstr, pTableMetaInfo->pTableMeta->uid);
S
slguan 已提交
2451

H
hjxilinx 已提交
2452
//    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pMetricMeta), false);
S
slguan 已提交
2453

2454
    SSuperTableMeta *ppMeta = (SSuperTableMeta *)taosCacheAcquireByName(tscCacheHandle, tagstr);
S
slguan 已提交
2455
    if (ppMeta == NULL) {
2456
      required = true;
S
slguan 已提交
2457 2458
      break;
    } else {
H
hjxilinx 已提交
2459
//      pTableMetaInfo->pMetricMeta = ppMeta;
S
slguan 已提交
2460 2461
    }
  }
H
hzcheng 已提交
2462

2463 2464
  // all metricmeta for one clause are retrieved from cache, no need to retrieve metricmeta from management node
  if (!required) {
H
hzcheng 已提交
2465 2466
    return TSDB_CODE_SUCCESS;
  }
H
hjxilinx 已提交
2467 2468
#endif
  
S
slguan 已提交
2469
  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
H
hzcheng 已提交
2470 2471 2472
  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;

H
hjxilinx 已提交
2473
  pNew->cmd.command = TSDB_SQL_STABLEVGROUP;
2474 2475
  
  SQueryInfo *pNewQueryInfo = NULL;
2476 2477 2478
  if ((code = tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo)) != TSDB_CODE_SUCCESS) {
    return code;
  }
2479
  
2480
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
H
hjxilinx 已提交
2481
    STableMetaInfo *pMMInfo = tscGetMetaInfo(pQueryInfo, i);
2482

H
hjxilinx 已提交
2483 2484
    STableMeta *pTableMeta = taosCacheAcquireByName(tscCacheHandle, pMMInfo->name);
    tscAddMeterMetaInfo(pNewQueryInfo, pMMInfo->name, pTableMeta, NULL, pMMInfo->numOfTags, pMMInfo->tagColumnIndex);
S
slguan 已提交
2485 2486 2487 2488 2489 2490
  }

  if ((code = tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
    tscFreeSqlObj(pNew);
    return code;
  }
H
hzcheng 已提交
2491

2492
  tscTagCondCopy(&pNewQueryInfo->tagCond, &pQueryInfo->tagCond);
H
hzcheng 已提交
2493

2494 2495
  pNewQueryInfo->groupbyExpr = pQueryInfo->groupbyExpr;
  pNewQueryInfo->numOfTables = pQueryInfo->numOfTables;
H
hzcheng 已提交
2496

2497 2498
  pNewQueryInfo->slimit = pQueryInfo->slimit;
  pNewQueryInfo->order = pQueryInfo->order;
H
hjxilinx 已提交
2499 2500 2501 2502 2503
  
  STagCond* pTagCond = &pNewQueryInfo->tagCond;
  tscTrace("%p new sqlobj:%p info, numOfTables:%d, slimit:%" PRId64 ", soffset:%" PRId64 ", order:%d, tbname cond:%s",
      pSql, pNew, pNewQueryInfo->numOfTables, pNewQueryInfo->slimit.limit, pNewQueryInfo->slimit.offset,
      pNewQueryInfo->order.order, pTagCond->tbnameCond.cond)
H
hzcheng 已提交
2504

2505 2506 2507 2508
//  if (pSql->fp != NULL && pSql->pStream == NULL) {
//    pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
//    tscFreeSubqueryInfo(pCmd);
//  }
H
hzcheng 已提交
2509

H
hjxilinx 已提交
2510
  tscTrace("%p allocate new pSqlObj:%p to get stable vgroupInfo", pSql, pNew);
2511 2512 2513 2514 2515
  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;
  code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
H
hzcheng 已提交
2516 2517 2518 2519 2520
  }

  return code;
}

2521
void tscInitMsgsFp() {
S
slguan 已提交
2522 2523 2524
  tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
  tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg;
  tscBuildMsg[TSDB_SQL_FETCH] = tscBuildRetrieveMsg;
H
hzcheng 已提交
2525 2526

  tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg;
2527
  tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg;
H
hzcheng 已提交
2528

2529 2530
  tscBuildMsg[TSDB_SQL_CREATE_ACCT] = tscBuildAcctMsg;
  tscBuildMsg[TSDB_SQL_ALTER_ACCT] = tscBuildAcctMsg;
H
hzcheng 已提交
2531 2532

  tscBuildMsg[TSDB_SQL_CREATE_TABLE] = tscBuildCreateTableMsg;
2533
  tscBuildMsg[TSDB_SQL_DROP_USER] = tscBuildDropAcctMsg;
H
hzcheng 已提交
2534 2535 2536
  tscBuildMsg[TSDB_SQL_DROP_ACCT] = tscBuildDropAcctMsg;
  tscBuildMsg[TSDB_SQL_DROP_DB] = tscBuildDropDbMsg;
  tscBuildMsg[TSDB_SQL_DROP_TABLE] = tscBuildDropTableMsg;
2537
  tscBuildMsg[TSDB_SQL_ALTER_USER] = tscBuildUserMsg;
S
slguan 已提交
2538 2539 2540
  tscBuildMsg[TSDB_SQL_CREATE_DNODE] = tscBuildCreateDnodeMsg;
  tscBuildMsg[TSDB_SQL_DROP_DNODE] = tscBuildDropDnodeMsg;
  tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg;
H
hzcheng 已提交
2541 2542 2543 2544 2545
  tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg;
  tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg;

  tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg;
  tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg;
H
hjxilinx 已提交
2546
  tscBuildMsg[TSDB_SQL_META] = tscBuildTableMetaMsg;
H
hjxilinx 已提交
2547
  tscBuildMsg[TSDB_SQL_STABLEVGROUP] = tscBuildSTableVgroupMsg;
S
slguan 已提交
2548
  tscBuildMsg[TSDB_SQL_MULTI_META] = tscBuildMultiMeterMetaMsg;
H
hzcheng 已提交
2549 2550 2551 2552

  tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg;
  tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg;
  tscBuildMsg[TSDB_SQL_RETRIEVE] = tscBuildRetrieveFromMgmtMsg;
2553 2554 2555
  tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg;
H
hzcheng 已提交
2556 2557 2558 2559 2560 2561 2562 2563

  tscProcessMsgRsp[TSDB_SQL_SELECT] = tscProcessQueryRsp;
  tscProcessMsgRsp[TSDB_SQL_FETCH] = tscProcessRetrieveRspFromVnode;

  tscProcessMsgRsp[TSDB_SQL_DROP_DB] = tscProcessDropDbRsp;
  tscProcessMsgRsp[TSDB_SQL_DROP_TABLE] = tscProcessDropTableRsp;
  tscProcessMsgRsp[TSDB_SQL_CONNECT] = tscProcessConnectRsp;
  tscProcessMsgRsp[TSDB_SQL_USE_DB] = tscProcessUseDbRsp;
2564
  tscProcessMsgRsp[TSDB_SQL_META] = tscProcessTableMetaRsp;
H
hjxilinx 已提交
2565
  tscProcessMsgRsp[TSDB_SQL_STABLEVGROUP] = tscProcessSTableVgroupRsp;
S
slguan 已提交
2566
  tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiMeterMetaRsp;
H
hzcheng 已提交
2567 2568

  tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp;
2569
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromVnode;  // rsp handled by same function.
H
hzcheng 已提交
2570
  tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp;
2571

H
hzcheng 已提交
2572
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_TAGS] = tscProcessTagRetrieveRsp;
H
hjxilinx 已提交
2573 2574 2575 2576 2577
  tscProcessMsgRsp[TSDB_SQL_CURRENT_DB] = tscProcessTagRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CURRENT_USER] = tscProcessTagRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_VERSION] = tscProcessTagRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CLI_VERSION] = tscProcessTagRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_STATUS] = tscProcessTagRetrieveRsp;
2578

H
hzcheng 已提交
2579 2580 2581 2582 2583 2584 2585 2586 2587 2588 2589 2590 2591
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp;

  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_METRIC] = tscProcessRetrieveMetricRsp;

  tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp;
  tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp;

  tscKeepConn[TSDB_SQL_SHOW] = 1;
  tscKeepConn[TSDB_SQL_RETRIEVE] = 1;
  tscKeepConn[TSDB_SQL_SELECT] = 1;
  tscKeepConn[TSDB_SQL_FETCH] = 1;
  tscKeepConn[TSDB_SQL_HB] = 1;
}