/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#include "os.h"
H
hzcheng 已提交
17 18
#include "tcache.h"
#include "trpc.h"
S
slguan 已提交
19
#include "tscJoinProcess.h"
H
hzcheng 已提交
20
#include "tscProfile.h"
21
#include "tscSQLParser.h"
H
hzcheng 已提交
22 23 24 25
#include "tscSecondaryMerge.h"
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
S
slguan 已提交
26
#include "tscompression.h"
H
hzcheng 已提交
27 28 29 30 31 32 33
#include "tsocket.h"
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"

#define TSC_MGMT_VNODE 999

S
slguan 已提交
34
#ifdef CLUSTER
35 36 37
SIpStrList tscMgmtIpList;
int        tsMasterIndex = 0;
int        tsSlaveIndex = 1;
S
slguan 已提交
38
#else
39 40 41
int      tsMasterIndex = 0;
int      tsSlaveIndex = 0;  // slave == master for single node edition
uint32_t tsServerIp;
S
slguan 已提交
42
#endif
H
hzcheng 已提交
43

44 45
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};

H
hzcheng 已提交
46
int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql);
S
slguan 已提交
47
void (*tscUpdateVnodeMsg[TSDB_SQL_MAX])(SSqlObj *pSql, char *buf);
H
hzcheng 已提交
48
void tscProcessActivityTimer(void *handle, void *tmrId);
49
int  tscKeepConn[TSDB_SQL_MAX] = {0};
H
hzcheng 已提交
50 51 52

/* Size of a message that carries no body: the RPC header plus the digest. */
static int32_t minMsgSize() {
  int32_t size = tsRpcHeadSize + sizeof(STaosDigest);
  return size;
}

53 54
static char *doBuildMsgHeader(SSqlObj *pSql, char **pStart);

S
slguan 已提交
55 56 57 58 59 60 61 62 63 64
#ifdef CLUSTER
/* Dump every entry of the management node IP list to the trace log. */
void tscPrintMgmtIp() {
  // a list without entries is reported as an error and nothing else is printed
  if (tscMgmtIpList.numOfIps <= 0) {
    tscError("invalid IP list:%d", tscMgmtIpList.numOfIps);
    return;
  }

  int slot = 0;
  while (slot < tscMgmtIpList.numOfIps) {
    tscTrace("mgmt index:%d ip:%s", slot, tscMgmtIpList.ipstr[slot]);
    ++slot;
  }
}
#endif

H
hjxilinx 已提交
65 66 67 68 69 70 71 72 73 74 75 76
/*
 * Maximum number of attempts to obtain a connection to a management node.
 *
 * Each management node is tried at least twice in case of a poor network situation.
 * If the client starts by connecting to a non-management node, the first attempt may fail due to poor
 * network quality, and the second attempt then gets a response carrying a redirection command.
 * The redirected request would not be executed if only *two* attempts were allowed in the case of a single
 * management node in the cluster. Therefore the number of attempts is multiplied by a factor of 2.
 *
 * Returns (number of management nodes) * 2; the single node edition counts as one node.
 */
static int32_t tscGetMgmtConnMaxRetryTimes() {
  int32_t factor = 2;
#ifdef CLUSTER
  return tscMgmtIpList.numOfIps * factor;
#else
  return 1 * factor;
#endif
}

H
hzcheng 已提交
81 82 83 84 85 86 87 88 89 90 91 92 93
/*
 * Callback invoked with the result of a heartbeat request.
 *
 * param: the STscObj that owns the heartbeat; ignored when NULL or when its signature does not match
 * tres:  unused
 * code:  0 when the heartbeat succeeded, any other value is only logged
 *
 * On success the response may ask the client to kill the connection, a query or a stream; in the CLUSTER
 * build it also carries the current management node list. The activity timer is reset at the end in
 * both the success and the failure case.
 */
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
  STscObj *pObj = (STscObj *)param;
  if (pObj == NULL) return;
  if (pObj != pObj->signature) {
    tscError("heart beat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature);
    return;
  }

  SSqlObj *pSql = pObj->pHb;
  SSqlRes *pRes = &pSql->res;

  if (code == 0) {
    SHeartBeatRsp *pRsp = (SHeartBeatRsp *)pRes->pRsp;
#ifdef CLUSTER
    SIpList *pIpList = &pRsp->ipList;
    tscMgmtIpList.numOfIps = pIpList->numOfIps;

    // refresh the cached list (and its string form) only when the addresses differ
    // NOTE(review): the factor 4 is presumably the size of one IP entry - confirm against SIpList
    if (memcmp(tscMgmtIpList.ip, pIpList->ip, pIpList->numOfIps * 4) != 0) {
      for (int i = 0; i < pIpList->numOfIps; ++i) {
        tinet_ntoa(tscMgmtIpList.ipstr[i], pIpList->ip[i]);
        tscMgmtIpList.ip[i] = pIpList->ip[i];
      }
      tscTrace("new mgmt IP list:");
      tscPrintMgmtIp();
    }
#endif
    // killing the whole connection takes precedence over killing a single query/stream
    if (pRsp->killConnection) {
      tscKillConnection(pObj);
    } else {
      if (pRsp->queryId) tscKillQuery(pObj, pRsp->queryId);
      if (pRsp->streamId) tscKillStream(pObj, pRsp->streamId);
    }
  } else {
    tscTrace("heart beat failed, code:%d", code);
  }

  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
}

void tscProcessActivityTimer(void *handle, void *tmrId) {
  STscObj *pObj = (STscObj *)handle;

  if (pObj == NULL) return;
  if (pObj->signature != pObj) return;
  if (pObj->pTimer != tmrId) return;

  if (pObj->pHb == NULL) {
S
slguan 已提交
127 128 129
    SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
    if (NULL == pSql) return;

H
hzcheng 已提交
130 131
    pSql->fp = tscProcessHeartBeatRsp;
    pSql->cmd.command = TSDB_SQL_HB;
S
slguan 已提交
132 133 134 135 136
    if (TSDB_CODE_SUCCESS != tscAllocPayload(&(pSql->cmd), TSDB_DEFAULT_PAYLOAD_SIZE)) {
      tfree(pSql);
      return;
    }

H
hzcheng 已提交
137 138 139 140
    pSql->param = pObj;
    pSql->pTscObj = pObj;
    pSql->signature = pSql;
    pObj->pHb = pSql;
141
    tscAddSubqueryInfo(&pObj->pHb->cmd);
142

S
slguan 已提交
143
    tscTrace("%p pHb is allocated, pObj:%p", pObj->pHb, pObj);
H
hzcheng 已提交
144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159
  }

  if (tscShouldFreeHeatBeat(pObj->pHb)) {
    tscTrace("%p free HB object and release connection, pConn:%p", pObj, pObj->pHb->thandle);
    taosCloseRpcConn(pObj->pHb->thandle);

    tscFreeSqlObj(pObj->pHb);
    tscCloseTscObj(pObj);
    return;
  }

  tscProcessSql(pObj->pHb);
}

/*
 * Attach a connection to a management node to pSql.
 *
 * pSql:  request that needs the connection; on return pSql->thandle holds it (or NULL on failure),
 *        pSql->ip / pSql->vnode describe the peer, and pSql->retry has been incremented
 * pCode: set to 0 when an attempt is made and then passed to taosOpenRpcConn(); when no connection
 *        could be obtained and the retry limit is reached it is replaced by the previous error stored
 *        in pSql->res.code (unless that is success / in-progress)
 *
 * Nothing is attempted once pSql->retry has reached tscGetMgmtConnMaxRetryTimes().
 */
void tscGetConnToMgmt(SSqlObj *pSql, uint8_t *pCode) {
  STscObj *pTscObj = pSql->pTscObj;

  if (pSql->retry < tscGetMgmtConnMaxRetryTimes()) {
    *pCode = 0;
    pSql->retry++;

    // reuse a cached connection if there is one
#ifdef CLUSTER
    pSql->index = pSql->index % tscMgmtIpList.numOfIps;
    // NOTE(review): index 1 is forced even when the list holds a single entry - confirm that is intended
    if (pSql->cmd.command > TSDB_SQL_READ && pSql->index == 0) pSql->index = 1;
    void *thandle = taosGetConnFromCache(tscConnCache, tscMgmtIpList.ip[pSql->index], TSC_MGMT_VNODE, pTscObj->user);
#else
    void *thandle = taosGetConnFromCache(tscConnCache, tsServerIp, TSC_MGMT_VNODE, pTscObj->user);
#endif

    if (thandle == NULL) {
      SRpcConnInit connInit;
      memset(&connInit, 0, sizeof(connInit));
      connInit.cid = 0;
      connInit.sid = 0;
      connInit.meterId = pSql->pTscObj->user;
      connInit.peerId = 0;
      connInit.shandle = pTscMgmtConn;
      connInit.ahandle = pSql;
      connInit.peerPort = tsMgmtShellPort;
      connInit.spi = 1;
      connInit.encrypt = 0;
      connInit.secret = pSql->pTscObj->pass;

#ifdef CLUSTER
      connInit.peerIp = tscMgmtIpList.ipstr[pSql->index];
#else
      connInit.peerIp = tsMasterIp;
#endif
      thandle = taosOpenRpcConn(&connInit, pCode);
    }

    pSql->thandle = thandle;
#ifdef CLUSTER
    pSql->ip = tscMgmtIpList.ip[pSql->index];
    pSql->vnode = TSC_MGMT_VNODE;
    tscTrace("%p mgmt index:%d ip:0x%x is picked up, pConn:%p", pSql, pSql->index, tscMgmtIpList.ip[pSql->index],
             pSql->thandle);
#else
    pSql->ip = tsServerIp;
    pSql->vnode = TSC_MGMT_VNODE;
#endif
  }

  // the pSql->res.code is the previous error(status) code.
  if (pSql->thandle == NULL && pSql->retry >= pSql->maxRetry) {
    if (pSql->res.code != TSDB_CODE_SUCCESS && pSql->res.code != TSDB_CODE_ACTION_IN_PROGRESS) {
      *pCode = pSql->res.code;
    }

    tscError("%p reach the max retry:%d, code:%d", pSql, pSql->retry, *pCode);
  }
}

/*
 * Attach a connection to the vnode that serves the first table of pSql.
 *
 * pSql:  request that needs the connection; pSql->thandle is cleared first and then holds the
 *        connection (or stays NULL), pSql->ip / pSql->vnode describe the selected peer
 * pCode: set to success before a connection is looked up and then passed to taosOpenRpcConn(); when
 *        no connection could be obtained and the retry limit is reached it is replaced by the
 *        previous error stored in pSql->res.code (unless that is success / in-progress)
 *
 * The peer list is taken from the vnode list of the super table, or from the meter meta of a normal
 * table. When no peer list is available pSql->retry is set to pSql->maxRetry so nothing is attempted.
 */
void tscGetConnToVnode(SSqlObj *pSql, uint8_t *pCode) {
  SVPeerDesc *pVPeersDesc = NULL;
  static int  vidIndex = 0;
  STscObj *   pTscObj = pSql->pTscObj;

  pSql->thandle = NULL;

  SSqlCmd *       pCmd = &pSql->cmd;
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0);

  if (UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)) {  // multiple vnode query
    SVnodeSidList *vnodeList = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, pMeterMetaInfo->vnodeIndex);
    if (vnodeList != NULL) {
      pVPeersDesc = vnodeList->vpeerDesc;
    }
  } else {
    SMeterMeta *pMeta = pMeterMetaInfo->pMeterMeta;
    if (pMeta == NULL) {
      tscError("%p pMeterMeta is NULL", pSql);
      pSql->retry = pSql->maxRetry;
      return;
    }
    pVPeersDesc = pMeta->vpeerDesc;
  }

  if (pVPeersDesc == NULL) {
    // exhausting the retry counter makes the loop below a no-op
    pSql->retry = pSql->maxRetry;
    tscError("%p pVPeerDesc is NULL", pSql);
  }

  while (pSql->retry < pSql->maxRetry) {
    (pSql->retry)++;
#ifdef CLUSTER
    char ipstr[40] = {0};

    // skip peer slots without an address
    // NOTE(review): pSql->index is only wrapped here; callers increment it without a bound - confirm
    // that it cannot run past the end of the peer array
    if (pVPeersDesc[pSql->index].ip == 0) {
      (pSql->index) = (pSql->index + 1) % TSDB_VNODES_SUPPORT;
      continue;
    }
    *pCode = TSDB_CODE_SUCCESS;

    void *thandle =
        taosGetConnFromCache(tscConnCache, pVPeersDesc[pSql->index].ip, pVPeersDesc[pSql->index].vnode, pTscObj->user);

    if (thandle == NULL) {
      SRpcConnInit connInit;
      tinet_ntoa(ipstr, pVPeersDesc[pSql->index].ip);
      memset(&connInit, 0, sizeof(connInit));
      connInit.cid = vidIndex;
      connInit.sid = 0;
      connInit.spi = 0;
      connInit.encrypt = 0;
      connInit.meterId = pSql->pTscObj->user;
      connInit.peerId = htonl((pVPeersDesc[pSql->index].vnode << TSDB_SHELL_VNODE_BITS));
      connInit.shandle = pVnodeConn;
      connInit.ahandle = pSql;
      connInit.peerIp = ipstr;
      connInit.peerPort = tsVnodeShellPort;
      thandle = taosOpenRpcConn(&connInit, pCode);
      vidIndex = (vidIndex + 1) % tscNumOfThreads;
    }

    pSql->thandle = thandle;
    pSql->ip = pVPeersDesc[pSql->index].ip;
    pSql->vnode = pVPeersDesc[pSql->index].vnode;
    // the ip is an integer, so it is printed with 0x%x like the other ip traces in this file
    tscTrace("%p vnode:%d ip:0x%x index:%d is picked up, pConn:%p", pSql, pVPeersDesc[pSql->index].vnode,
             pVPeersDesc[pSql->index].ip, pSql->index, pSql->thandle);
#else
    *pCode = 0;
    void *thandle = taosGetConnFromCache(tscConnCache, tsServerIp, pVPeersDesc[0].vnode, pTscObj->user);

    if (thandle == NULL) {
      SRpcConnInit connInit;
      memset(&connInit, 0, sizeof(connInit));
      connInit.cid = vidIndex;
      connInit.sid = 0;
      connInit.spi = 0;
      connInit.encrypt = 0;
      connInit.meterId = pSql->pTscObj->user;
      connInit.peerId = htonl((pVPeersDesc[0].vnode << TSDB_SHELL_VNODE_BITS));
      connInit.shandle = pVnodeConn;
      connInit.ahandle = pSql;
      connInit.peerIp = tsMasterIp;
      connInit.peerPort = tsVnodeShellPort;
      thandle = taosOpenRpcConn(&connInit, pCode);
      vidIndex = (vidIndex + 1) % tscNumOfThreads;
    }

    pSql->thandle = thandle;
    pSql->ip = tsServerIp;
    pSql->vnode = pVPeersDesc[0].vnode;
#endif

    // one attempt per call; the caller decides whether to come back
    break;
  }

  // the pSql->res.code is the previous error(status) code.
  if (pSql->thandle == NULL && pSql->retry >= pSql->maxRetry) {
    if (pSql->res.code != TSDB_CODE_SUCCESS && pSql->res.code != TSDB_CODE_ACTION_IN_PROGRESS) {
      *pCode = pSql->res.code;
    }

    tscError("%p reach the max retry:%d, code:%d", pSql, pSql->retry, *pCode);
  }
}

/*
 * Send the message prepared in pSql->cmd.payload to the server.
 *
 * A connection is obtained first when pSql has none: commands below TSDB_SQL_MGMT go to a vnode, all
 * others to a management node.
 *
 * Returns 0 when the message was handed to the RPC layer, TSDB_CODE_CLI_OUT_OF_MEMORY when the send
 * buffer could not be allocated, otherwise the code reported while obtaining the connection
 * (TSDB_CODE_NETWORK_UNAVAIL when nothing more specific was set).
 */
int tscSendMsgToServer(SSqlObj *pSql) {
  uint8_t code = TSDB_CODE_NETWORK_UNAVAIL;

  if (pSql->thandle == NULL) {
    if (pSql->cmd.command < TSDB_SQL_MGMT) {
      tscGetConnToVnode(pSql, &code);
    } else {
      tscGetConnToMgmt(pSql, &code);
    }
  }

  if (pSql->thandle) {
    /*
     * the total length of message
     * rpc header + actual message body + digest
     *
     * the pSql object may be released automatically during insert procedure, in which the access of
     * message body by using "if (pHeader->msgType & 1)" may cause the segment fault.
     *
     */
    size_t totalLen = pSql->cmd.payloadLen + tsRpcHeadSize + sizeof(STaosDigest);

    // the memory will be released by taosProcessResponse, so no memory leak here
    char *buf = malloc(totalLen);
    if (NULL == buf) {
      tscError("%p msg:%s malloc fail", pSql, taosMsg[pSql->cmd.msgType]);
      return TSDB_CODE_CLI_OUT_OF_MEMORY;
    }
    memcpy(buf, pSql->cmd.payload, totalLen);

    tscTrace("%p msg:%s is sent to server", pSql, taosMsg[pSql->cmd.msgType]);

    // NOTE(review): when no header can be built the buffer is never handed to the RPC layer - confirm
    // whether it has to be freed here
    char *pStart = taosBuildReqHeader(pSql->thandle, pSql->cmd.msgType, buf);
    if (pStart) {
      /*
       * this SQL object may be released by other thread due to the completion of this query even before the log
       * is dumped to log file. So the signature needs to be kept in a local variable.
       * It is kept as a pointer because it is printed with %p below.
       */
      void *signature = pSql->signature;
      if (tscUpdateVnodeMsg[pSql->cmd.command]) (*tscUpdateVnodeMsg[pSql->cmd.command])(pSql, buf);

      int ret = taosSendMsgToPeerH(pSql->thandle, pStart, pSql->cmd.payloadLen, pSql);
      if (ret >= 0) {
        code = 0;
      }

      tscTrace("%p send msg ret:%d code:%d sig:%p", pSql, ret, code, signature);
    }
  }

  return code;
}

S
slguan 已提交
375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395
#ifdef CLUSTER
/*
 * Apply a redirect message from a management node.
 *
 * pSql: the request that was redirected; its index is reset to 0 for commands below TSDB_SQL_READ
 *       (tsMasterIndex is reset as well) and advanced by one for all other commands
 * cont: message content, interpreted as an SIpList holding the new management node list
 */
void tscProcessMgmtRedirect(SSqlObj *pSql, uint8_t *cont) {
  SIpList *pNewList = (SIpList *)cont;

  // replace the cached list, keeping the numeric and the string form in step
  tscMgmtIpList.numOfIps = pNewList->numOfIps;

  int slot = 0;
  while (slot < pNewList->numOfIps) {
    tinet_ntoa(tscMgmtIpList.ipstr[slot], pNewList->ip[slot]);
    tscMgmtIpList.ip[slot] = pNewList->ip[slot];
    tscTrace("Update mgmt Ip, index:%d ip:%s", slot, tscMgmtIpList.ipstr[slot]);
    ++slot;
  }

  if (pSql->cmd.command >= TSDB_SQL_READ) {
    pSql->index++;
  } else {
    tsMasterIndex = 0;
    pSql->index = 0;
  }

  tscPrintMgmtIp();
}
#endif

H
hzcheng 已提交
396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418
void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
  if (ahandle == NULL) return NULL;

  SIntMsg *pMsg = (SIntMsg *)msg;
  SSqlObj *pSql = (SSqlObj *)ahandle;
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;
  int      code = TSDB_CODE_NETWORK_UNAVAIL;

  if (pSql->signature != pSql) {
    tscError("%p sql is already released, signature:%p", pSql, pSql->signature);
    return NULL;
  }

  if (pSql->thandle != thandle) {
    tscError("%p thandle:%p is different from received:%p", pSql, pSql->thandle, thandle);
    return NULL;
  }

  tscTrace("%p msg:%p is received from server, pConn:%p", pSql, msg, thandle);

  if (pSql->freed || pObj->signature != pObj) {
S
slguan 已提交
419 420
    tscTrace("%p sql is already released or DB connection is closed, freed:%d pObj:%p signature:%p", pSql, pSql->freed,
             pObj, pObj->signature);
H
hzcheng 已提交
421 422 423 424 425
    taosAddConnIntoCache(tscConnCache, pSql->thandle, pSql->ip, pSql->vnode, pObj->user);
    tscFreeSqlObj(pSql);
    return ahandle;
  }

426
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0);
H
hzcheng 已提交
427 428
  if (msg == NULL) {
    tscTrace("%p no response from ip:0x%x", pSql, pSql->ip);
429

S
slguan 已提交
430 431 432
#ifdef CLUSTER
    pSql->index++;
#else
S
slguan 已提交
433
    // for single node situation, do NOT try next index
S
slguan 已提交
434
#endif
H
hzcheng 已提交
435 436 437 438
    pSql->thandle = NULL;
    // todo taos_stop_query() in async model
    /*
     * in case of
H
hjxilinx 已提交
439 440
     * 1. query cancelled(pRes->code != TSDB_CODE_QUERY_CANCELLED), do NOT re-issue the request to server.
     * 2. retrieve, do NOT re-issue the retrieve request since the qhandle may have been released by server
H
hzcheng 已提交
441 442 443 444 445 446 447 448 449
     */
    if (pCmd->command != TSDB_SQL_FETCH && pCmd->command != TSDB_SQL_RETRIEVE && pCmd->command != TSDB_SQL_KILL_QUERY &&
        pRes->code != TSDB_CODE_QUERY_CANCELLED) {
      code = tscSendMsgToServer(pSql);
      if (code == 0) return NULL;
    }

    // renew meter meta in case it is changed
    if (pCmd->command < TSDB_SQL_FETCH && pRes->code != TSDB_CODE_QUERY_CANCELLED) {
S
slguan 已提交
450 451 452
#ifdef CLUSTER
      pSql->maxRetry = TSDB_VNODES_SUPPORT * 2;
#else
H
hzcheng 已提交
453 454
      // for fetch, it shall not renew meter meta
      pSql->maxRetry = 2;
S
slguan 已提交
455 456
#endif
      code = tscRenewMeterMeta(pSql, pMeterMetaInfo->name);
H
hzcheng 已提交
457 458 459
      pRes->code = code;
      if (code == TSDB_CODE_ACTION_IN_PROGRESS) return pSql;

S
slguan 已提交
460
      if (pMeterMetaInfo->pMeterMeta) {
H
hzcheng 已提交
461 462 463 464 465
        code = tscSendMsgToServer(pSql);
        if (code == 0) return pSql;
      }
    }
  } else {
H
hjxilinx 已提交
466
    uint16_t rspCode = pMsg->content[0];
467

H
hjxilinx 已提交
468
#ifdef CLUSTER
469

H
hjxilinx 已提交
470
    if (rspCode == TSDB_CODE_REDIRECT) {
S
slguan 已提交
471 472 473 474 475 476
      tscTrace("%p it shall be redirected!", pSql);
      taosAddConnIntoCache(tscConnCache, thandle, pSql->ip, pSql->vnode, pObj->user);
      pSql->thandle = NULL;

      if (pCmd->command > TSDB_SQL_MGMT) {
        tscProcessMgmtRedirect(pSql, pMsg->content + 1);
477
      } else if (pCmd->command == TSDB_SQL_INSERT) {
S
slguan 已提交
478 479
        pSql->index++;
        pSql->maxRetry = TSDB_VNODES_SUPPORT * 2;
S
slguan 已提交
480 481 482 483 484 485 486
      } else {
        pSql->index++;
      }

      code = tscSendMsgToServer(pSql);
      if (code == 0) return pSql;
      msg = NULL;
H
hjxilinx 已提交
487 488
    } else if (rspCode == TSDB_CODE_NOT_ACTIVE_TABLE || rspCode == TSDB_CODE_INVALID_TABLE_ID ||
        rspCode == TSDB_CODE_INVALID_VNODE_ID || rspCode == TSDB_CODE_NOT_ACTIVE_VNODE ||
S
slguan 已提交
489 490
        rspCode == TSDB_CODE_NETWORK_UNAVAIL || rspCode == TSDB_CODE_NOT_ACTIVE_SESSION ||
        rspCode == TSDB_CODE_TABLE_ID_MISMATCH) {
S
slguan 已提交
491 492 493 494 495 496 497 498 499 500
      /*
       * not_active_table: 1. the virtual node may fail to create table, since the procedure of create table is asynchronized,
       *                   the virtual node may have not create table till now, so try again by using the new metermeta.
       *                   2. this requested table may have been removed by other client, so we need to renew the
       *                   metermeta here.
       *
       * not_active_vnode: current vnode is move to other node due to node balance procedure or virtual node have been
       *                   removed. So, renew metermeta and try again.
       * not_active_session: db has been move to other node, the vnode does not exist on this dnode anymore.
       */
S
slguan 已提交
501
#else
502
    if (rspCode == TSDB_CODE_NOT_ACTIVE_TABLE || rspCode == TSDB_CODE_INVALID_TABLE_ID ||
503 504
        rspCode == TSDB_CODE_NOT_ACTIVE_VNODE || rspCode == TSDB_CODE_INVALID_VNODE_ID ||
        rspCode == TSDB_CODE_TABLE_ID_MISMATCH || rspCode == TSDB_CODE_NETWORK_UNAVAIL) {
S
slguan 已提交
505
#endif
H
hzcheng 已提交
506 507
      pSql->thandle = NULL;
      taosAddConnIntoCache(tscConnCache, thandle, pSql->ip, pSql->vnode, pObj->user);
508

509
      if (pCmd->command == TSDB_SQL_CONNECT) {
H
hzcheng 已提交
510 511 512 513
        code = TSDB_CODE_NETWORK_UNAVAIL;
      } else if (pCmd->command == TSDB_SQL_HB) {
        code = TSDB_CODE_NOT_READY;
      } else {
H
hjxilinx 已提交
514
        tscTrace("%p it shall renew meter meta, code:%d", pSql, rspCode);
515

H
hzcheng 已提交
516
        pSql->maxRetry = TSDB_VNODES_SUPPORT * 2;
517 518
        pSql->res.code = (uint8_t)rspCode;  // keep the previous error code

S
slguan 已提交
519
        code = tscRenewMeterMeta(pSql, pMeterMetaInfo->name);
H
hzcheng 已提交
520 521
        if (code == TSDB_CODE_ACTION_IN_PROGRESS) return pSql;

S
slguan 已提交
522
        if (pMeterMetaInfo->pMeterMeta) {
H
hzcheng 已提交
523 524 525 526 527 528
          code = tscSendMsgToServer(pSql);
          if (code == 0) return pSql;
        }
      }

      msg = NULL;
S
slguan 已提交
529
    } else {  // for other error set and return to invoker
H
hjxilinx 已提交
530
      code = rspCode;
H
hzcheng 已提交
531 532 533 534 535 536 537
    }
  }

  pSql->retry = 0;

  if (msg) {
    if (pCmd->command < TSDB_SQL_MGMT) {
S
slguan 已提交
538 539 540
      if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) {
        if (pMeterMetaInfo->pMeterMeta)  // it may be deleted
          pMeterMetaInfo->pMeterMeta->index = pSql->index;
H
hzcheng 已提交
541
      } else {
H
hjxilinx 已提交
542
        SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, pMeterMetaInfo->vnodeIndex);
H
hzcheng 已提交
543 544 545 546 547 548 549 550 551 552
        pVnodeSidList->index = pSql->index;
      }
    } else {
      if (pCmd->command > TSDB_SQL_READ)
        tsSlaveIndex = pSql->index;
      else
        tsMasterIndex = pSql->index;
    }
  }

S
slguan 已提交
553
  if (pSql->fp == NULL) tsem_wait(&pSql->emptyRspSem);
H
hzcheng 已提交
554 555 556 557 558 559 560 561 562 563 564 565 566 567

  pRes->rspLen = 0;
  if (pRes->code != TSDB_CODE_QUERY_CANCELLED) {
    pRes->code = (code != TSDB_CODE_SUCCESS) ? code : TSDB_CODE_NETWORK_UNAVAIL;
  } else {
    tscTrace("%p query is cancelled, code:%d", pSql, pRes->code);
  }

  if (msg && pRes->code != TSDB_CODE_QUERY_CANCELLED) {
    assert(pMsg->msgType == pCmd->msgType + 1);
    pRes->code = pMsg->content[0];
    pRes->rspType = pMsg->msgType;
    pRes->rspLen = pMsg->msgLen - sizeof(SIntMsg);

S
slguan 已提交
568 569 570 571 572 573 574 575 576 577 578
    char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
    if (tmp == NULL) {
      pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
    } else {
      pRes->pRsp = tmp;
      if (pRes->rspLen) {
        memcpy(pRes->pRsp, pMsg->content + 1, pRes->rspLen - 1);
      }
    }

    // ignore the error information returned from mnode when set ignore flag in sql
H
hzcheng 已提交
579 580 581 582 583 584 585 586 587 588
    if (pRes->code == TSDB_CODE_DB_ALREADY_EXIST && pCmd->existsCheck && pRes->rspType == TSDB_MSG_TYPE_CREATE_DB_RSP) {
      pRes->code = TSDB_CODE_SUCCESS;
    }

    /*
     * There is not response callback function for submit response.
     * The actual inserted number of points is the first number.
     */
    if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP) {
      pRes->numOfRows += *(int32_t *)pRes->pRsp;
S
slguan 已提交
589 590 591 592 593

      tscTrace("%p cmd:%d code:%d, inserted rows:%d, rsp len:%d", pSql, pCmd->command, pRes->code,
               *(int32_t *)pRes->pRsp, pRes->rspLen);
    } else {
      tscTrace("%p cmd:%d code:%d rsp len:%d", pSql, pCmd->command, pRes->code, pRes->rspLen);
H
hzcheng 已提交
594 595 596 597 598 599 600 601 602 603 604 605
    }
  }

  if (tscKeepConn[pCmd->command] == 0 ||
      (pRes->code != TSDB_CODE_SUCCESS && pRes->code != TSDB_CODE_ACTION_IN_PROGRESS)) {
    if (pSql->thandle != NULL) {
      taosAddConnIntoCache(tscConnCache, pSql->thandle, pSql->ip, pSql->vnode, pObj->user);
      pSql->thandle = NULL;
    }
  }

  if (pSql->fp == NULL) {
S
slguan 已提交
606
    tsem_post(&pSql->rspSem);
H
hzcheng 已提交
607 608 609 610 611 612 613 614 615
  } else {
    if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command])
      code = (*tscProcessMsgRsp[pCmd->command])(pSql);

    if (code != TSDB_CODE_ACTION_IN_PROGRESS) {
      int   command = pCmd->command;
      void *taosres = tscKeepConn[command] ? pSql : NULL;
      code = pRes->code ? -pRes->code : pRes->numOfRows;

S
slguan 已提交
616
      tscTrace("%p Async SQL result:%d res:%p", pSql, code, taosres);
H
hzcheng 已提交
617 618

      /*
S
slguan 已提交
619 620 621
       * Whether to free sqlObj or not should be decided before call the user defined function, since this SqlObj
       * may be freed in UDF, and reused by other threads before tscShouldFreeAsyncSqlObj called, in which case
       * tscShouldFreeAsyncSqlObj checks an object which is actually allocated by other threads.
H
hzcheng 已提交
622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648
       *
       * If this block of memory is re-allocated for an insert thread, in which tscKeepConn[command] equals to 0,
       * the tscShouldFreeAsyncSqlObj will success and tscFreeSqlObj free it immediately.
       */
      bool shouldFree = tscShouldFreeAsyncSqlObj(pSql);
      if (command == TSDB_SQL_INSERT) {  // handle multi-vnode insertion situation
        (*pSql->fp)(pSql, taosres, code);
      } else {
        (*pSql->fp)(pSql->param, taosres, code);
      }

      if (shouldFree) {
        // If it is failed, all objects allocated during execution taos_connect_a should be released
        if (command == TSDB_SQL_CONNECT) {
          taos_close(pObj);
          tscTrace("%p Async sql close failed connection", pSql);
        } else {
          tscFreeSqlObj(pSql);
          tscTrace("%p Async sql is automatically freed", pSql);
        }
      }
    }
  }

  return ahandle;
}

S
slguan 已提交
649
static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj);
650
static int      tscLaunchMetricSubQueries(SSqlObj *pSql);
H
hzcheng 已提交
651

S
slguan 已提交
652
// todo merge with callback
H
hjxilinx 已提交
653
/*
 * Create and launch the sub query of a join for one of the joined tables.
 *
 * pSql:       the parent query; the new sub query object is appended to pSql->pSubs
 * tableIndex: index of the table (within the parent query) this sub query is created for
 * pSupporter: per-sub-query state; for a join query it receives a backup of the column list,
 *             expressions, fields, tag condition and group-by expression of the new sub query
 *
 * Returns TSDB_CODE_CLI_OUT_OF_MEMORY when the sub query array or the sub query object cannot be
 * allocated, otherwise the result of tscProcessSql() on the new sub query.
 */
int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySupporter *pSupporter) {
  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

  pSql->res.qhandle = 0x1;
  pSql->res.numOfRows = 0;

  if (pSql->pSubs == NULL) {
    // zero-initialised so that slots which are not filled yet hold NULL
    pSql->pSubs = calloc(pSupporter->pState->numOfTotal, POINTER_BYTES);
    if (pSql->pSubs == NULL) {
      return TSDB_CODE_CLI_OUT_OF_MEMORY;
    }
  }

  SSqlObj *pNew = createSubqueryObj(pSql, tableIndex, tscJoinQueryCallback, pSupporter, NULL);
  if (pNew == NULL) {
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  pSql->pSubs[pSql->numOfSubs++] = pNew;
  assert(pSql->numOfSubs <= pSupporter->pState->numOfTotal);

  if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) {
    addGroupInfoForSubquery(pSql, pNew, 0, tableIndex);

    // refactor as one method
    SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
    assert(pNewQueryInfo != NULL);

    tscColumnBaseInfoUpdateTableIndex(&pNewQueryInfo->colList, 0);
    tscColumnBaseInfoCopy(&pSupporter->colList, &pNewQueryInfo->colList, 0);

    tscSqlExprCopy(&pSupporter->exprsInfo, &pNewQueryInfo->exprsInfo, pSupporter->uid);

    tscFieldInfoCopyAll(&pSupporter->fieldsInfo, &pNewQueryInfo->fieldsInfo);
    tscTagCondCopy(&pSupporter->tagCond, &pNewQueryInfo->tagCond);

    pNew->cmd.numOfCols = 0;
    pNewQueryInfo->nAggTimeInterval = 0;
    memset(&pNewQueryInfo->limit, 0, sizeof(SLimitVal));

    // backup the data and clear it in the sqlcmd object
    pSupporter->groupbyExpr = pNewQueryInfo->groupbyExpr;
    memset(&pNewQueryInfo->groupbyExpr, 0, sizeof(SSqlGroupbyExpr));

    // set the ts,tags that involved in join, as the output column of intermediate result
    tscClearSubqueryInfo(&pNew->cmd);

    SSchema      colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1};
    SColumnIndex index = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX};

    tscAddSpecialColumnForSelect(pNewQueryInfo, 0, TSDB_FUNC_TS_COMP, &index, &colSchema, TSDB_COL_NORMAL);

    // set the tags value for ts_comp function
    SSqlExpr *pExpr = tscSqlExprGet(pNewQueryInfo, 0);

    SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pNewQueryInfo, 0);
    int16_t         tagColIndex = tscGetJoinTagColIndexByUid(&pSupporter->tagCond, pMeterMetaInfo->pMeterMeta->uid);

    pExpr->param->i64Key = tagColIndex;
    pExpr->numOfParams = 1;

    // add the filter tag column
    for (int32_t i = 0; i < pSupporter->colList.numOfCols; ++i) {
      SColumnBase *pColBase = &pSupporter->colList.pColList[i];
      if (pColBase->numOfFilters > 0) {  // copy to the pNew->cmd.colList if it is filtered.
        tscColumnBaseCopy(&pNewQueryInfo->colList.pColList[pNewQueryInfo->colList.numOfCols], pColBase);
        pNewQueryInfo->colList.numOfCols++;
      }
    }
  } else {
    SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
    pNewQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY;
  }

#ifdef _DEBUG_VIEW
  tscPrintSelectClause(&pNew->cmd, 0);
#endif

  return tscProcessSql(pNew);
}

/*
 * Build (for the commands listed below) and send the request of pSql.
 *
 * Asynchronous request (pSql->fp set): a send failure is stored in pSql->res.code and queued through
 * tscQueueAsyncRes(); the function returns 0 in either case.
 *
 * Synchronous request: a send failure is stored in pSql->res.code and returned. Otherwise the function
 * blocks on pSql->rspSem until the response has been stored, runs the per-command response handler if
 * the response was successful, posts pSql->emptyRspSem and returns pSql->res.code.
 */
int doProcessSql(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  // sampled before the message is sent; only the NULL test is needed afterwards
  bool isAsync = (pSql->fp != NULL);

  if (pCmd->command == TSDB_SQL_SELECT || pCmd->command == TSDB_SQL_FETCH || pCmd->command == TSDB_SQL_RETRIEVE ||
      pCmd->command == TSDB_SQL_INSERT || pCmd->command == TSDB_SQL_CONNECT || pCmd->command == TSDB_SQL_HB ||
      pCmd->command == TSDB_SQL_META || pCmd->command == TSDB_SQL_METRIC) {
    tscBuildMsg[pCmd->command](pSql, NULL);
  }

  int32_t code = tscSendMsgToServer(pSql);

  if (isAsync) {
    if (code != TSDB_CODE_SUCCESS) {
      pRes->code = code;
      tscQueueAsyncRes(pSql);
    }
    return 0;
  }

  if (code != TSDB_CODE_SUCCESS) {
    pRes->code = code;
    return code;
  }

  tsem_wait(&pSql->rspSem);

  if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) (*tscProcessMsgRsp[pCmd->command])(pSql);

  tsem_post(&pSql->emptyRspSem);

  return pRes->code;
}

/*
 * Dispatch the command stored in pSql->cmd.
 *
 *  - commands below TSDB_SQL_MGMT need meter meta information; pSql->index is
 *    taken from the meter meta (normal meter) or from the vnode sid list
 *    (subquery of a super table query);
 *  - commands below TSDB_SQL_LOCAL use tsMasterIndex/tsSlaveIndex;
 *  - any other command is handled locally by its registered response handler.
 *
 * A join query that is not itself a subquery launches one join subquery per
 * table and waits for them; a two-stage merge metric query launches its
 * subqueries through tscLaunchMetricSubQueries(); everything else goes through
 * doProcessSql().
 *
 * Returns a TSDB_CODE_* value.
 */
int tscProcessSql(SSqlObj *pSql) {
  char *   name = NULL;
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  SMeterMetaInfo *pMeterMetaInfo = NULL;
  int16_t         type = 0;

  if (pQueryInfo != NULL) {
    pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
    if (pMeterMetaInfo != NULL) {
      name = pMeterMetaInfo->name;
    }

    type = pQueryInfo->type;
  }

  // name stays NULL when no query info/meter meta info exists; passing NULL for "%s" is undefined behavior
  tscTrace("%p SQL cmd:%d will be processed, name:%s, type:%d", pSql, pCmd->command, (name != NULL) ? name : "(null)",
           type);
  pSql->retry = 0;
  if (pSql->cmd.command < TSDB_SQL_MGMT) {
#ifdef CLUSTER
    pSql->maxRetry = TSDB_VNODES_SUPPORT;
#else
    pSql->maxRetry = 2;
#endif

    if (pMeterMetaInfo == NULL) {  // the pMeterMetaInfo cannot be NULL
      pSql->res.code = TSDB_CODE_OTHERS;
      return pSql->res.code;
    }

    if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) {
      pSql->index = pMeterMetaInfo->pMeterMeta->index;
    } else {  // it must be the parent SSqlObj for super table query
      if ((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) != 0) {
        int32_t idx = pMeterMetaInfo->vnodeIndex;

        SVnodeSidList *pSidList = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, idx);
        pSql->index = pSidList->index;
      }
    }
  } else if (pSql->cmd.command < TSDB_SQL_LOCAL) {
    pSql->index = pSql->cmd.command < TSDB_SQL_READ ? tsMasterIndex : tsSlaveIndex;
  } else {  // local handler
    return (*tscProcessMsgRsp[pCmd->command])(pSql);
  }

  // todo handle async situation
  if (QUERY_IS_JOIN_QUERY(type)) {
    if ((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0) {
      SSubqueryState *pState = calloc(1, sizeof(SSubqueryState));
      if (pState == NULL) {  // cannot track the subqueries without the state object, abort current query
        tscError("%p failed to allocate subquery state object, abort further query", pSql);
        pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;

        return pSql->res.code;
      }

      pState->numOfTotal = pQueryInfo->numOfTables;

      for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
        SJoinSubquerySupporter *pSupporter = tscCreateJoinSupporter(pSql, pState, i);

        if (pSupporter == NULL) {  // failed to create support struct, abort current query
          tscError("%p tableIndex:%d, failed to allocate join support object, abort further query", pSql, i);
          pState->numOfCompleted = pQueryInfo->numOfTables - i - 1;
          pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;

          return pSql->res.code;
        }

        int32_t code = tscLaunchJoinSubquery(pSql, i, pSupporter);
        if (code != TSDB_CODE_SUCCESS) {  // failed to create subquery object, quit query
          tscDestroyJoinSupporter(pSupporter);
          pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;

          break;
        }
      }

      // semaphore handshake with the subquery completion path: post, wait for the response, post again
      sem_post(&pSql->emptyRspSem);
      sem_wait(&pSql->rspSem);

      sem_post(&pSql->emptyRspSem);

      if (pSql->numOfSubs <= 0) {
        pSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
      } else {
        pSql->cmd.command = TSDB_SQL_METRIC_JOIN_RETRIEVE;
      }

      return TSDB_CODE_SUCCESS;
    } else {
      // for first stage sub query, iterate all vnodes to get all timestamp
      if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) != TSDB_QUERY_TYPE_JOIN_SEC_STAGE) {
        return doProcessSql(pSql);
      }
    }
  }

  if (tscIsTwoStageMergeMetricQuery(pCmd)) {
    /*
     * Before this function returns from tscLaunchMetricSubQueries and continues, pSql may have been released at user
     * program context after retrieving all data from vnodes. User function is called at tscRetrieveFromVnodeCallBack.
     *
     * when pSql being released, pSql->fp == NULL, it may pass the check of pSql->fp == NULL,
     * which causes deadlock. So we keep it as local variable.
     */
    void *fp = pSql->fp;

    if (tscLaunchMetricSubQueries(pSql) != TSDB_CODE_SUCCESS) {
      return pRes->code;
    }

    if (fp == NULL) {
      sem_post(&pSql->emptyRspSem);
      sem_wait(&pSql->rspSem);
      sem_post(&pSql->emptyRspSem);

      // set the command flag must be after the semaphore been correctly set.
      pSql->cmd.command = TSDB_SQL_RETRIEVE_METRIC;
    }

    return pSql->res.code;
  }

  return doProcessSql(pSql);
}
H
hzcheng 已提交
894

895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911
/*
 * Release the first numOfSubs subquery objects registered in pSql->pSubs,
 * together with their SRetrieveSupport objects (local buffer and mutex), and
 * finally free the shared pState object.
 *
 * Each released slot in pSql->pSubs is reset to NULL so that later readers of
 * the array (which skip NULL entries) never see a pointer to a freed object.
 */
static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs, SSubqueryState* pState) {
  assert(numOfSubs <= pSql->numOfSubs && numOfSubs >= 0 && pState != NULL);

  for(int32_t i = 0; i < numOfSubs; ++i) {
    SSqlObj* pSub = pSql->pSubs[i];
    assert(pSub != NULL);

    SRetrieveSupport* pSupport = pSub->param;

    tfree(pSupport->localBuffer);

    pthread_mutex_unlock(&pSupport->queryMutex);
    pthread_mutex_destroy(&pSupport->queryMutex);

    tfree(pSupport);

    tscFreeSqlObj(pSub);
    pSql->pSubs[i] = NULL;  // do not leave a dangling pointer in the parent object
  }

  free(pState);
}

/*
 * Create one subquery object per vnode of the super table referenced by pSql
 * and launch all of them with tscProcessSql().
 *
 * All subqueries share the external memory buffers, the order descriptor, the
 * final column model and one SSubqueryState object; each subquery owns its
 * SRetrieveSupport object (local buffer + recursive mutex).
 *
 * Returns TSDB_CODE_SUCCESS when all subqueries have been launched. On failure
 * pSql->res.code is set, already created resources are released and the code
 * is returned.
 */
int tscLaunchMetricSubQueries(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  // pRes->code check only serves in launching metric sub-queries
  if (pRes->code == TSDB_CODE_QUERY_CANCELLED) {
    pCmd->command = TSDB_SQL_RETRIEVE_METRIC;  // enable the abort of kill metric function.
    return pRes->code;
  }

  tExtMemBuffer **  pMemoryBuf = NULL;
  tOrderDescriptor *pDesc = NULL;
  tColModel *       pModel = NULL;

  pRes->qhandle = 1;  // hack the qhandle check

  const uint32_t nBufferSize = (1 << 16);  // 64KB

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
  int32_t         numOfSubQueries = pMeterMetaInfo->pMetricMeta->numOfVnodes;
  assert(numOfSubQueries > 0);

  int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, nBufferSize);
  if (ret != 0) {
    pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
    if (pSql->fp) {
      tscQueueAsyncRes(pSql);
    }
    return pRes->code;
  }

  pSql->pSubs = calloc(numOfSubQueries, POINTER_BYTES);
  SSubqueryState *pState = calloc(1, sizeof(SSubqueryState));

  // both bookkeeping objects are dereferenced below, so neither allocation may fail silently
  if (pSql->pSubs == NULL || pState == NULL) {
    tscError("%p failed to malloc buffer for subquery array or state, reason:%s", pSql, strerror(errno));

    free(pSql->pSubs);
    pSql->pSubs = NULL;
    pSql->numOfSubs = 0;
    free(pState);

    pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
    tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, numOfSubQueries);
    return pRes->code;
  }

  pSql->numOfSubs = numOfSubQueries;

  tscTrace("%p retrieved query data from %d vnode(s)", pSql, numOfSubQueries);
  pState->numOfTotal = numOfSubQueries;
  pRes->code = TSDB_CODE_SUCCESS;

  // first stage: only build the subquery objects, nothing is sent yet
  int32_t i = 0;
  for (; i < numOfSubQueries; ++i) {
    SRetrieveSupport *trs = (SRetrieveSupport *)calloc(1, sizeof(SRetrieveSupport));
    if (trs == NULL) {
      tscError("%p failed to malloc buffer for SRetrieveSupport, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
      break;
    }

    trs->pExtMemBuffer = pMemoryBuf;
    trs->pOrderDescriptor = pDesc;
    trs->pState = pState;

    trs->localBuffer = (tFilePage *)calloc(1, nBufferSize + sizeof(tFilePage));
    if (trs->localBuffer == NULL) {
      tscError("%p failed to malloc buffer for local buffer, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
      tfree(trs);
      break;
    }

    trs->subqueryIndex = i;
    trs->pParentSqlObj = pSql;
    trs->pFinalColModel = pModel;

    // a mutex attribute object must be initialized with pthread_mutexattr_init() before its type is set
    pthread_mutexattr_t mutexattr;
    pthread_mutexattr_init(&mutexattr);
    pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_RECURSIVE_NP);
    pthread_mutex_init(&trs->queryMutex, &mutexattr);
    pthread_mutexattr_destroy(&mutexattr);

    SSqlObj *pNew = tscCreateSqlObjForSubquery(pSql, trs, NULL);
    if (pNew == NULL) {
      tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
      pthread_mutex_destroy(&trs->queryMutex);  // the mutex was initialized above, release it with its owner
      tfree(trs->localBuffer);
      tfree(trs);
      break;
    }

    // todo handle multi-vnode situation
    if (pQueryInfo->tsBuf) {
      SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
      pNewQueryInfo->tsBuf = tsBufClone(pQueryInfo->tsBuf);
    }

    tscTrace("%p sub:%p create subquery success. orderOfSub:%d", pSql, pNew, trs->subqueryIndex);
  }

  if (i < numOfSubQueries) {
    tscError("%p failed to prepare subquery structure and launch subqueries", pSql);
    pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;

    tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, numOfSubQueries);
    doCleanupSubqueries(pSql, i, pState);
    return pRes->code;   // free all allocated resource
  }

  if (pRes->code == TSDB_CODE_QUERY_CANCELLED) {
    tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, numOfSubQueries);
    doCleanupSubqueries(pSql, i, pState);
    return pRes->code;
  }

  // second stage: every subquery object exists, launch them one by one
  for(int32_t j = 0; j < numOfSubQueries; ++j) {
    SSqlObj* pSub = pSql->pSubs[j];
    SRetrieveSupport* pSupport = pSub->param;

    tscTrace("%p sub:%p launch subquery, orderOfSub:%d.", pSql, pSub, pSupport->subqueryIndex);
    tscProcessSql(pSub);
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Release one finished subquery: free its result set (only when the subquery
 * result code is TSDB_CODE_SUCCESS), then free the local buffer, unlock and
 * destroy the query mutex, and free the support object itself.
 */
static void tscFreeSubSqlObj(SRetrieveSupport *trsupport, SSqlObj *pSql) {
  tscTrace("%p start to free subquery result", pSql);

  if (TSDB_CODE_SUCCESS == pSql->res.code) {
    taos_free_result(pSql);
  }

  tfree(trsupport->localBuffer);

  // the mutex lives inside trsupport, so it has to be torn down before trsupport is freed
  pthread_mutex_t *pQueryMutex = &trsupport->queryMutex;
  pthread_mutex_unlock(pQueryMutex);
  pthread_mutex_destroy(pQueryMutex);

  tfree(trsupport);
}

S
slguan 已提交
1043 1044
static void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows);

H
hzcheng 已提交
1045
/*
 * Abort the retrieval of one subquery: log the last system error, store the
 * negated errCode in the shared subquery state, exhaust the retry budget,
 * release the query mutex and re-enter tscRetrieveFromVnodeCallBack() with the
 * stored code.
 */
static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES *tres, int32_t errCode) {
// set no disk space error info
#ifdef WINDOWS
  LPVOID lpMsgBuf;
  FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL,
                GetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),  // Default language
                (LPTSTR)&lpMsgBuf, 0, NULL);
  tscError("sub:%p failed to flush data to disk:reason:%s", tres, lpMsgBuf);
  LocalFree(lpMsgBuf);
#else
  char buf[256] = {0};
  strerror_r(errno, buf, sizeof(buf));
  tscError("sub:%p failed to flush data to disk:reason:%s", tres, buf);
#endif

  SSubqueryState *pSubState = trsupport->pState;

  // the shared state keeps the error code negated; no further retry is allowed for this subquery
  pSubState->code = -errCode;
  trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;

  pthread_mutex_unlock(&trsupport->queryMutex);

  // the code is read back from the shared state after the mutex has been released
  tscRetrieveFromVnodeCallBack(trsupport, tres, pSubState->code);
}

/*
 * Error path of the per-vnode retrieval callback.
 *
 * numOfRows >= 0 means the current subquery itself succeeded but the whole
 * query is being aborted; numOfRows < 0 carries the error code of the current
 * subquery. A failed subquery is retried while the retry budget is not used up
 * and no global error code has been recorded. Otherwise the subquery is counted
 * as completed and released; the last completed subquery additionally releases
 * the shared resources and notifies the parent object pPObj.
 *
 * pState->code holds negated TSDB_CODE_* values: the parent result code is
 * derived from it with a unary minus at the end of this function.
 */
static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows) {
  SSqlObj *pPObj = trsupport->pParentSqlObj;
  int32_t  subqueryIndex = trsupport->subqueryIndex;

  assert(pSql != NULL);
  SSubqueryState* pState = trsupport->pState;
  assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 &&
         pPObj->numOfSubs == pState->numOfTotal);

  /* retrieved in subquery failed. OR query cancelled in retrieve phase. */
  if (pState->code == TSDB_CODE_SUCCESS && pPObj->res.code != TSDB_CODE_SUCCESS) {
    pState->code = -(int)pPObj->res.code;

    /*
     * kill current sub-query connection, which may retrieve data from vnodes;
     * Here we get: pPObj->res.code == TSDB_CODE_QUERY_CANCELLED
     */
    pSql->res.numOfRows = 0;
    trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;  // disable retry efforts
    tscTrace("%p query is cancelled, sub:%p, orderOfSub:%d abort retrieve, code:%d", trsupport->pParentSqlObj, pSql,
             subqueryIndex, pState->code);
  }

  if (numOfRows >= 0) {  // current query is successful, but other sub query failed, still abort current query.
    tscTrace("%p sub:%p retrieve numOfRows:%d,orderOfSub:%d", pPObj, pSql, numOfRows, subqueryIndex);
    tscError("%p sub:%p abort further retrieval due to other queries failure,orderOfSub:%d,code:%d", pPObj, pSql,
        subqueryIndex, pState->code);
  } else {
    if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && pState->code == TSDB_CODE_SUCCESS) {
      /*
       * current query failed, and the retry count is less than the available
       * count, retry query clear previous retrieved data, then launch a new sub query
       */
      tExtMemBufferClear(trsupport->pExtMemBuffer[subqueryIndex]);

      // clear local saved number of results
      trsupport->localBuffer->numOfElems = 0;
      pthread_mutex_unlock(&trsupport->queryMutex);

      tscTrace("%p sub:%p retrieve failed, code:%d, orderOfSub:%d, retry:%d", trsupport->pParentSqlObj, pSql, numOfRows,
               subqueryIndex, trsupport->numOfRetry);

      SSqlObj *pNew = tscCreateSqlObjForSubquery(trsupport->pParentSqlObj, trsupport, pSql);
      if (pNew == NULL) {
        tscError("%p sub:%p failed to create new subquery sqlobj due to out of memory, abort retry",
                 trsupport->pParentSqlObj, pSql);

        // stored negated like every other write to pState->code, so that the "pState->code < 0" checks
        // and the final "-(pState->code)" conversion see a consistent value
        pState->code = -TSDB_CODE_CLI_OUT_OF_MEMORY;
        trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;

        // NOTE(review): this path returns without counting the subquery as completed and without releasing
        // trsupport/pSql - verify that the parent object cannot wait forever in this situation.
        return;
      }

      tscProcessSql(pNew);
      return;
    } else {  // reach the maximum retry count, abort
      atomic_val_compare_exchange_32(&pState->code, TSDB_CODE_SUCCESS, numOfRows);
      tscError("%p sub:%p retrieve failed,code:%d,orderOfSub:%d failed.no more retry,set global code:%d", pPObj, pSql,
               numOfRows, subqueryIndex, pState->code);
    }
  }

  int32_t finished = atomic_add_fetch_32(&pState->numOfCompleted, 1);
  if (finished < pState->numOfTotal) {
    tscTrace("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pPObj, pSql, trsupport->subqueryIndex, finished);
    tscFreeSubSqlObj(trsupport, pSql);
    return;
  }

  // all subqueries are failed
  tscError("%p retrieve from %d vnode(s) completed,code:%d.FAILED.", pPObj, pState->numOfTotal,
           pState->code);
  pPObj->res.code = -(pState->code);

  // release allocated resource
  tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, trsupport->pFinalColModel,
                            pState->numOfTotal);

  tfree(trsupport->pState);
  tscFreeSubSqlObj(trsupport, pSql);

  if (pPObj->fp == NULL) {
    // sync query, wait for the master SSqlObj to proceed
    tsem_wait(&pPObj->emptyRspSem);
    tsem_wait(&pPObj->emptyRspSem);

    tsem_post(&pPObj->rspSem);

    pPObj->cmd.command = TSDB_SQL_RETRIEVE_METRIC;
  } else {
    // in case of second stage join subquery, invoke its callback function instead of regular QueueAsyncRes
    SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0);

    if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) == TSDB_QUERY_TYPE_JOIN_SEC_STAGE) {
      (*pPObj->fp)(pPObj->param, pPObj, pPObj->res.code);
    } else {  // regular super table query
      if (pPObj->res.code != TSDB_CODE_SUCCESS) {
        tscQueueAsyncRes(pPObj);
      }
    }
  }
}

/*
 * Callback invoked with the rows fetched from one vnode by a subquery.
 *
 *  param      the SRetrieveSupport object of the subquery
 *  tres       the subquery SSqlObj (NULL when it has already been released)
 *  numOfRows  > 0: rows available in the result buffer of the subquery
 *             = 0: all data of this vnode has been retrieved
 *             < 0: error code
 *
 * Rows are appended to the external memory buffer of the subquery and the next
 * batch is requested. When a vnode is exhausted its buffer is flushed; the last
 * finished subquery builds the local reducer on the parent object and notifies
 * the parent (semaphores when the parent has no callback, the parent callback
 * otherwise).
 */
void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
  SRetrieveSupport *trsupport = (SRetrieveSupport *)param;
  int32_t           idx = trsupport->subqueryIndex;
  SSqlObj *         pPObj = trsupport->pParentSqlObj;
  tOrderDescriptor *pDesc = trsupport->pOrderDescriptor;

  SSqlObj *pSql = (SSqlObj *)tres;
  if (pSql == NULL) {  // sql object has been released in error process, return immediately
    tscTrace("%p subquery has been released, idx:%d, abort", pPObj, idx);
    return;
  }

  SSubqueryState* pState = trsupport->pState;
  assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 &&
      pPObj->numOfSubs == pState->numOfTotal);

  // query process and cancel query process may execute at the same time
  pthread_mutex_lock(&trsupport->queryMutex);

  if (numOfRows < 0 || pState->code < 0 || pPObj->res.code != TSDB_CODE_SUCCESS) {
    tscHandleSubRetrievalError(trsupport, pSql, numOfRows);
    return;
  }

  SSqlRes *   pRes = &pSql->res;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);

  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);

  SVnodeSidList *vnodeInfo = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, idx);
  SVPeerDesc *   pSvd = &vnodeInfo->vpeerDesc[vnodeInfo->index];

  if (numOfRows > 0) {
    assert(pRes->numOfRows == numOfRows);
    atomic_add_fetch_64(&pState->numOfRetrievedRows, numOfRows);

    // numOfRetrievedRows is updated as a 64-bit counter, so it must not be printed with "%d"
    tscTrace("%p sub:%p retrieve numOfRows:%d totalNumOfRows:%lld from ip:%u,vid:%d,orderOfSub:%d", pPObj, pSql,
             pRes->numOfRows, (long long)pState->numOfRetrievedRows, pSvd->ip, pSvd->vnode, idx);

#ifdef _DEBUG_VIEW
    printf("received data from vnode: %d rows\n", pRes->numOfRows);
    SSrcColumnInfo colInfo[256] = {0};
    // separate name: must not shadow pQueryInfo, which is used for the buffer operations below
    SQueryInfo *   pDebugQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0);

    tscGetSrcColumnInfo(colInfo, pDebugQueryInfo);
    tColModelDisplayEx(pDesc->pSchema, pRes->data, pRes->numOfRows, pRes->numOfRows, colInfo);
#endif
    if (tsTotalTmpDirGB != 0 && tsAvailTmpDirGB < tsMinimalTmpDirGB) {
      tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pPObj, pSql,
               tsAvailTmpDirGB, tsMinimalTmpDirGB);
      tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE);
      return;
    }
    int32_t ret = saveToBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer, pRes->data,
                               pRes->numOfRows, pQueryInfo->groupbyExpr.orderType);
    if (ret < 0) {
      // set no disk space error info, and abort retry
      tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE);
    } else {
      // release the mutex before asking for the next batch; this callback is invoked again with the new rows
      pthread_mutex_unlock(&trsupport->queryMutex);
      taos_fetch_rows_a(tres, tscRetrieveFromVnodeCallBack, param);
    }

  } else {  // all data has been retrieved to client
    /* data in from current vnode is stored in cache and disk */
    uint32_t numOfRowsFromVnode = trsupport->pExtMemBuffer[idx]->numOfAllElems + trsupport->localBuffer->numOfElems;
    tscTrace("%p sub:%p all data retrieved from ip:%u,vid:%d, numOfRows:%d, orderOfSub:%d", pPObj, pSql, pSvd->ip,
             pSvd->vnode, numOfRowsFromVnode, idx);

    tColModelCompact(pDesc->pSchema, trsupport->localBuffer, pDesc->pSchema->maxCapacity);

#ifdef _DEBUG_VIEW
    printf("%ld rows data flushed to disk:\n", trsupport->localBuffer->numOfElems);
    SSrcColumnInfo colInfo[256] = {0};
    // separate name: must not shadow pQueryInfo, which is passed to tscFlushTmpBuffer below
    SQueryInfo *   pDebugQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0);

    tscGetSrcColumnInfo(colInfo, pDebugQueryInfo);
    tColModelDisplayEx(pDesc->pSchema, trsupport->localBuffer->data, trsupport->localBuffer->numOfElems,
                       trsupport->localBuffer->numOfElems, colInfo);
#endif
    if (tsTotalTmpDirGB != 0 && tsAvailTmpDirGB < tsMinimalTmpDirGB) {
      tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pPObj, pSql,
               tsAvailTmpDirGB, tsMinimalTmpDirGB);
      tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE);
      return;
    }

    // each result for a vnode is ordered as an independant list,
    // then used as an input of loser tree for disk-based merge routine
    int32_t ret = tscFlushTmpBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer,
                                    pQueryInfo->groupbyExpr.orderType);
    if (ret != 0) {
      /* set no disk space error info, and abort retry */
      tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE);
      return;
    }

    int32_t finished = atomic_add_fetch_32(&pState->numOfCompleted, 1);
    if (finished < pState->numOfTotal) {
      tscTrace("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pPObj, pSql, trsupport->subqueryIndex, finished);
      tscFreeSubSqlObj(trsupport, pSql);
      return;
    }

    // all sub-queries are returned, start to local merge process
    pDesc->pSchema->maxCapacity = trsupport->pExtMemBuffer[idx]->numOfElemsPerPage;

    tscTrace("%p retrieve from %d vnodes completed.final NumOfRows:%d,start to build loser tree", pPObj,
             pState->numOfTotal, pState->numOfCompleted);

    SQueryInfo *pPQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0);
    tscClearInterpInfo(pPQueryInfo);

    tscCreateLocalReducer(trsupport->pExtMemBuffer, pState->numOfTotal, pDesc, trsupport->pFinalColModel,
                          &pPObj->cmd, &pPObj->res);
    tscTrace("%p build loser tree completed", pPObj);

    pPObj->res.precision = pSql->res.precision;
    pPObj->res.numOfRows = 0;
    pPObj->res.row = 0;

    // only free once
    tfree(trsupport->pState);

    tscFreeSubSqlObj(trsupport, pSql);

    if (pPObj->fp == NULL) {
      tsem_wait(&pPObj->emptyRspSem);
      tsem_wait(&pPObj->emptyRspSem);

      tsem_post(&pPObj->rspSem);
    } else {
      // set the command flag must be after the semaphore been correctly set.
      pPObj->cmd.command = TSDB_SQL_RETRIEVE_METRIC;
      if (pPObj->res.code == TSDB_CODE_SUCCESS) {
        (*pPObj->fp)(pPObj->param, pPObj, 0);
      } else {
        tscQueueAsyncRes(pPObj);
      }
    }
  }
}

/*
 * Cancel a two-stage merge metric query: mark every subquery that owns a
 * connection handle as cancelled and stop its rpc connection, then poll (for at
 * most 10 seconds) until the parent command switches to
 * TSDB_SQL_RETRIEVE_METRIC or TSDB_SQL_RETRIEVE_EMPTY_RESULT.
 * Does nothing for any other kind of query.
 */
void tscKillMetricQuery(SSqlObj *pSql) {
  if (!tscIsTwoStageMergeMetricQuery(&pSql->cmd)) {
    return;
  }

  for (int i = 0; i < pSql->numOfSubs; ++i) {
    SSqlObj *pSub = pSql->pSubs[i];

    if (pSub != NULL && pSub->thandle != NULL) {
      /*
       * here, we cannot set the command = TSDB_SQL_KILL_QUERY. Otherwise, it may cause
       * sub-queries not correctly released and master sql object of metric query reaches an abnormal state.
       */
      pSql->pSubs[i]->res.code = TSDB_CODE_QUERY_CANCELLED;
      taosStopRpcConn(pSql->pSubs[i]->thandle);
    }
  }

  pSql->numOfSubs = 0;

  /*
   * 1. if the subqueries are not launched or partially launched, we need to waiting the launched
   * query return to successfully free allocated resources.
   * 2. if no any subqueries are launched yet, which means the metric query only in parse sql stage,
   * set the res.code, and return.
   */
  const int64_t MAX_WAITING_TIME = 10000;  // 10 Sec.
  const int64_t startTs = taosGetTimestampMs();

  // poll in 100 ms steps until the parent reaches one of the two final commands or the time budget is used up
  while (!(pSql->cmd.command == TSDB_SQL_RETRIEVE_METRIC || pSql->cmd.command == TSDB_SQL_RETRIEVE_EMPTY_RESULT)) {
    taosMsleep(100);

    int64_t elapsed = taosGetTimestampMs() - startTs;
    if (elapsed > MAX_WAITING_TIME) {
      break;
    }
  }

  tscTrace("%p metric query is cancelled", pSql);
}

S
slguan 已提交
1351
static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int retCode);
H
hzcheng 已提交
1352

S
slguan 已提交
1353
/*
 * Create the subquery object for one vnode of a two-stage super table query.
 *
 * The new object is created through createSubqueryObj() with
 * tscRetrieveDataRes as its callback and trsupport as its parameter, flagged
 * as TSDB_QUERY_TYPE_STABLE_SUBQUERY, bound to the vnode whose index equals
 * trsupport->subqueryIndex and stored in the matching slot of pSql->pSubs.
 *
 * Returns the new object, or NULL when createSubqueryObj() fails.
 */
static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj) {
  SSqlObj *pSubObj = createSubqueryObj(pSql, 0, tscRetrieveDataRes, trsupport, prevSqlObj);
  if (pSubObj == NULL) {
    return NULL;
  }

  // the sub query of two-stage super table query
  SQueryInfo *pSubQueryInfo = tscGetQueryInfoDetail(&pSubObj->cmd, 0);
  pSubQueryInfo->type |= TSDB_QUERY_TYPE_STABLE_SUBQUERY;
  assert(pSubQueryInfo->numOfTables == 1);

  // launch subquery for each vnode, so the subquery index equals to the vnodeIndex.
  const int32_t   subIndex = trsupport->subqueryIndex;
  SMeterMetaInfo *pSubMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pSubQueryInfo, 0);
  pSubMeterMetaInfo->vnodeIndex = subIndex;

  // register the new object in the slot of the parent that belongs to this subquery
  pSql->pSubs[subIndex] = pSubObj;

  return pSubObj;
}

S
slguan 已提交
1370
/*
 * Callback invoked when the query request of one subquery has been answered.
 *
 *  param  the SRetrieveSupport object of the subquery
 *  tres   the subquery SSqlObj
 *  code   result code of the request; overridden with the (negated) parent
 *         code or the global subquery code when the query was cancelled or
 *         another subquery already failed
 *
 * On failure the subquery is retried until MAX_NUM_OF_SUBQUERY_RETRY is
 * reached; afterwards the global code is recorded and the retrieve callback is
 * invoked to abort. On success the rows are requested asynchronously.
 */
void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
  SRetrieveSupport *trsupport = (SRetrieveSupport *)param;

  SSqlObj *       pSql = (SSqlObj *)tres;
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
  int32_t         idx = pMeterMetaInfo->vnodeIndex;

  // both stay NULL when the metric meta is not available; every use below has to check for that
  SVnodeSidList *vnodeInfo = NULL;
  SVPeerDesc *   pSvd = NULL;
  if (pMeterMetaInfo->pMetricMeta != NULL) {
    vnodeInfo = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, idx);
    pSvd = &vnodeInfo->vpeerDesc[vnodeInfo->index];
  }

  SSubqueryState* pState = trsupport->pState;
  assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 &&
         trsupport->pParentSqlObj->numOfSubs == pState->numOfTotal);

  if (trsupport->pParentSqlObj->res.code != TSDB_CODE_SUCCESS || pState->code != TSDB_CODE_SUCCESS) {
    // metric query is killed, Note: code must be less than 0
    trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
    if (trsupport->pParentSqlObj->res.code != TSDB_CODE_SUCCESS) {
      code = -(int)(trsupport->pParentSqlObj->res.code);
    } else {
      code = pState->code;
    }
    tscTrace("%p query cancelled or failed, sub:%p, orderOfSub:%d abort, code:%d", trsupport->pParentSqlObj, pSql,
             trsupport->subqueryIndex, code);
  }

  /*
   * if a query on a vnode is failed, all retrieve operations from vnode that occurs later
   * than this one are actually not necessary, we simply call the tscRetrieveFromVnodeCallBack
   * function to abort current and remain retrieve process.
   *
   * NOTE: threadsafe is required.
   */
  if (code != TSDB_CODE_SUCCESS) {
    if (trsupport->numOfRetry++ >= MAX_NUM_OF_SUBQUERY_RETRY) {
      tscTrace("%p sub:%p reach the max retry count,set global code:%d", trsupport->pParentSqlObj, pSql, code);
      atomic_val_compare_exchange_32(&pState->code, 0, code);
    } else {  // does not reach the maximum retry count, go on
      tscTrace("%p sub:%p failed code:%d, retry:%d", trsupport->pParentSqlObj, pSql, code, trsupport->numOfRetry);

      SSqlObj *pNew = tscCreateSqlObjForSubquery(trsupport->pParentSqlObj, trsupport, pSql);
      if (pNew == NULL) {
        // pSvd is NULL without metric meta; report -1 as vnode id instead of dereferencing it
        tscError("%p sub:%p failed to create new subquery due to out of memory, abort retry, vid:%d, orderOfSub:%d",
                 trsupport->pParentSqlObj, pSql, (pSvd != NULL) ? pSvd->vnode : -1, trsupport->subqueryIndex);

        pState->code = -TSDB_CODE_CLI_OUT_OF_MEMORY;
        trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
      } else {
        SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
        assert(pNewQueryInfo->pMeterInfo[0]->pMeterMeta != NULL && pNewQueryInfo->pMeterInfo[0]->pMetricMeta != NULL);
        tscProcessSql(pNew);
        return;
      }
    }
  }

  if (pState->code != TSDB_CODE_SUCCESS) {  // failed, abort
    if (vnodeInfo != NULL) {
      tscTrace("%p sub:%p query failed,ip:%u,vid:%d,orderOfSub:%d,global code:%d", trsupport->pParentSqlObj, pSql,
               vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vnode,
               trsupport->subqueryIndex, pState->code);
    } else {
      tscTrace("%p sub:%p query failed,orderOfSub:%d,global code:%d", trsupport->pParentSqlObj, pSql,
               trsupport->subqueryIndex, pState->code);
    }

    tscRetrieveFromVnodeCallBack(param, tres, pState->code);
  } else {  // success, proceed to retrieve data from dnode
    if (vnodeInfo != NULL) {
      tscTrace("%p sub:%p query complete,ip:%u,vid:%d,orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql,
               vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vnode,
               trsupport->subqueryIndex);
    } else {
      tscTrace("%p sub:%p query complete,orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql,
               trsupport->subqueryIndex);
    }

    taos_fetch_rows_a(tres, tscRetrieveFromVnodeCallBack, param);
  }
}

1450
/*
 * Build the retrieve request body: the query handle followed by the query type.
 * Returns the number of body bytes written, which is also stored in payloadLen.
 */
int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *   pCmd = &pSql->cmd;
  char *const pBody = pCmd->payload + tsRpcHeadSize;
  char *      pCursor = pBody;

  // the query handle is copied as-is, without byte-order conversion
  *((uint64_t *)pCursor) = pSql->res.qhandle;
  pCursor += sizeof(pSql->res.qhandle);

  // the query type follows, in network byte order
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  *((uint16_t *)pCursor) = htons(pQueryInfo->type);
  pCursor += sizeof(pQueryInfo->type);

  int written = pCursor - pBody;
  pCmd->payloadLen = written;
  pCmd->msgType = TSDB_MSG_TYPE_RETRIEVE;

  return written;
}

S
slguan 已提交
1471
/*
 * Rewrite the vnode id of the submit message stored in buf, using the peer
 * entry selected by pSql->index.
 */
void tscUpdateVnodeInSubmitMsg(SSqlObj *pSql, char *buf) {
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
  SMeterMeta *    pMeta = pMeterMetaInfo->pMeterMeta;

  // the submit header sits right behind the rpc head
  SShellSubmitMsg *pSubmit = (SShellSubmitMsg *)(buf + tsRpcHeadSize);
  pSubmit->vnode = htons(pMeta->vpeerDesc[pSql->index].vnode);

  // htons() is applied a second time only to print the id in host order
  tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pMeta->vpeerDesc[pSql->index].ip),
           htons(pSubmit->vnode));
}

1486
/*
 * Fill in the header of a submit message whose body is already in the payload.
 * Only the import flag, vnode id and table count are written here; payloadLen
 * is left untouched because it was set while the sql was parsed.
 * Always returns 0.
 */
int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *       pCmd = &pSql->cmd;
  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
  SMeterMeta *    pMeta = pMeterMetaInfo->pMeterMeta;

  SShellSubmitMsg *pSubmit = (SShellSubmitMsg *)(pCmd->payload + tsRpcHeadSize);

  // anything that is not a plain insert is flagged as an import
  pSubmit->import = htons(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT) ? 0 : 1);
  pSubmit->vnode = htons(pMeta->vpeerDesc[pMeta->index].vnode);
  pSubmit->numOfSid = htonl(pCmd->numOfTablesInSubmit);  // number of meters to be inserted

  pCmd->msgType = TSDB_MSG_TYPE_SUBMIT;
  tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pMeta->vpeerDesc[pMeta->index].ip),
           htons(pSubmit->vnode));

  return 0;  // no message length is computed in this builder
}

S
slguan 已提交
1513 1514
/*
 * Rewrite the vnode id of the query message stored in buf, using the peer entry
 * selected by pSql->index. The peer list comes from the meter meta for a normal
 * meter and from the current vnode sid list otherwise.
 */
void tscUpdateVnodeInQueryMsg(SSqlObj *pSql, char *buf) {
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
  SQueryMeterMsg *pQueryMsg = (SQueryMeterMsg *)(buf + tsRpcHeadSize);

  if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) {
    // query on meter
    pQueryMsg->vnode = htons(pMeterMetaInfo->pMeterMeta->vpeerDesc[pSql->index].vnode);
    return;
  }

  // query on metric
  SVnodeSidList *pSidList = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, pMeterMetaInfo->vnodeIndex);
  pQueryMsg->vnode = htons(pSidList->vpeerDesc[pSql->index].vnode);
}

/*
 * for meter query, simply return the size <= 1k
 * for metric query, estimate size according to meter tags
 */
1534
static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
H
hzcheng 已提交
1535
  const static int32_t MIN_QUERY_MSG_PKT_SIZE = TSDB_MAX_BYTES_PER_ROW * 5;
1536
  SQueryInfo *         pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
H
hzcheng 已提交
1537

1538
  int32_t srcColListSize = pQueryInfo->colList.numOfCols * sizeof(SColumnInfo);
1539

1540
  int32_t         exprSize = sizeof(SSqlFuncExprMsg) * pQueryInfo->fieldsInfo.numOfOutputCols;
1541
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
H
hzcheng 已提交
1542 1543

  // meter query without tags values
1544
  if (!UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)) {
H
hzcheng 已提交
1545 1546 1547
    return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryMeterMsg) + srcColListSize + exprSize;
  }

S
slguan 已提交
1548
  SMetricMeta *pMetricMeta = pMeterMetaInfo->pMetricMeta;
H
hzcheng 已提交
1549

H
hjxilinx 已提交
1550
  SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex);
H
hzcheng 已提交
1551 1552

  int32_t meterInfoSize = (pMetricMeta->tagLen + sizeof(SMeterSidExtInfo)) * pVnodeSidList->numOfSids;
1553
  int32_t outputColumnSize = pQueryInfo->fieldsInfo.numOfOutputCols * sizeof(SSqlFuncExprMsg);
H
hzcheng 已提交
1554

S
slguan 已提交
1555
  int32_t size = meterInfoSize + outputColumnSize + srcColListSize + exprSize + MIN_QUERY_MSG_PKT_SIZE;
1556 1557
  if (pQueryInfo->tsBuf != NULL) {
    size += pQueryInfo->tsBuf->fileSize;
S
slguan 已提交
1558 1559 1560
  }

  return size;
H
hzcheng 已提交
1561 1562
}

1563
/*
 * Serialize the table info of the query into pMsg.
 *
 * For a normal meter a single SMeterSidExtInfo (sid, uid) is written. Otherwise
 * numOfMeters entries are taken from the current vnode sid list, each one written
 * as SMeterSidExtInfo followed by tagLen bytes of tag values.
 *
 * vnodeId is used for tracing only; the visible caller passes it in host byte
 * order, so it is printed without further conversion.
 *
 * Returns the position right behind the last byte written.
 */
static char *doSerializeTableInfo(SSqlObj *pSql, int32_t numOfMeters, int32_t vnodeId, char *pMsg) {
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);

  SMeterMeta * pMeterMeta = pMeterMetaInfo->pMeterMeta;
  SMetricMeta *pMetricMeta = pMeterMetaInfo->pMetricMeta;

  // vnodeId is already in host order; converting it again would log a byte-swapped id
  tscTrace("%p vid:%d, query on %d meters", pSql, vnodeId, numOfMeters);
  if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) {
#ifdef _DEBUG_VIEW
    tscTrace("%p sid:%d, uid:%lld", pSql, pMeterMetaInfo->pMeterMeta->sid, pMeterMetaInfo->pMeterMeta->uid);
#endif
    SMeterSidExtInfo *pMeterInfo = (SMeterSidExtInfo *)pMsg;
    pMeterInfo->sid = htonl(pMeterMeta->sid);
    pMeterInfo->uid = htobe64(pMeterMeta->uid);

    pMsg += sizeof(SMeterSidExtInfo);
  } else {
    SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex);

    for (int32_t i = 0; i < numOfMeters; ++i) {
      SMeterSidExtInfo *pMeterInfo = (SMeterSidExtInfo *)pMsg;
      SMeterSidExtInfo *pQueryMeterInfo = tscGetMeterSidInfo(pVnodeSidList, i);

      pMeterInfo->sid = htonl(pQueryMeterInfo->sid);
      pMeterInfo->uid = htobe64(pQueryMeterInfo->uid);

      pMsg += sizeof(SMeterSidExtInfo);

      // the tag values follow each (sid, uid) header
      memcpy(pMsg, pQueryMeterInfo->tags, pMetricMeta->tagLen);
      pMsg += pMetricMeta->tagLen;

#ifdef _DEBUG_VIEW
      tscTrace("%p sid:%d, uid:%lld", pSql, pQueryMeterInfo->sid, pQueryMeterInfo->uid);
#endif
    }
  }

  return pMsg;
}

1603
/*
 * Build the query message for the sub-clause selected by pCmd->clauseIndex.
 *
 * The following parts are written behind the rpc head, in this order:
 *   1. SQueryMeterMsg, including the basic info of every queried column
 *   2. the filters of every column (binary filter values appended inline)
 *   3. the output expressions (binary parameters appended inline)
 *   4. comma separated column names, only if an arithmetic expression exists
 *   5. the table info (sid, uid, tags)
 *   6. the schema of the required tag columns
 *   7. the group-by columns
 *   8. the interpolation default values
 *   9. the compressed ts block of the current vnode, if a ts buffer exists
 *
 * Returns TSDB_CODE_SUCCESS, or -1 if the payload cannot be allocated, a value
 * fails validation, or the ts block cannot be read.
 */
int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  int32_t size = tscEstimateQueryMsgSize(pCmd, pCmd->clauseIndex);

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for query msg", pSql);
    return -1;
  }

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
  char *          pStart = pCmd->payload + tsRpcHeadSize;

  SMeterMeta * pMeterMeta = pMeterMetaInfo->pMeterMeta;
  SMetricMeta *pMetricMeta = pMeterMetaInfo->pMetricMeta;

  SQueryMeterMsg *pQueryMsg = (SQueryMeterMsg *)pStart;

  int32_t msgLen = 0;
  int32_t numOfMeters = 0;

  if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) {
    numOfMeters = 1;

    tscTrace("%p query on vnode: %d, number of sid:%d, meter id: %s", pSql,
             pMeterMeta->vpeerDesc[pMeterMeta->index].vnode, 1, pMeterMetaInfo->name);

    pQueryMsg->vnode = htons(pMeterMeta->vpeerDesc[pMeterMeta->index].vnode);
    // NOTE(review): uid is stored without byte-order conversion here - confirm the receiver expects that
    pQueryMsg->uid = pMeterMeta->uid;
    pQueryMsg->numOfTagsCols = 0;
  } else {  // query on super table
    if (pMeterMetaInfo->vnodeIndex < 0) {
      tscError("%p error vnodeIdx:%d", pSql, pMeterMetaInfo->vnodeIndex);
      return -1;
    }

    SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex);
    uint32_t       vnodeId = pVnodeSidList->vpeerDesc[pVnodeSidList->index].vnode;

    numOfMeters = pVnodeSidList->numOfSids;
    if (numOfMeters <= 0) {
      tscError("%p vid:%d,error numOfMeters in query message:%d", pSql, vnodeId, numOfMeters);
      return -1;  // error
    }

    tscTrace("%p query on vid:%d, number of sid:%d", pSql, vnodeId, numOfMeters);
    pQueryMsg->vnode = htons(vnodeId);
  }

  pQueryMsg->numOfSids = htonl(numOfMeters);
  pQueryMsg->numOfTagsCols = htons(pMeterMetaInfo->numOfTags);

  // skey/ekey follow the scan direction, so they are swapped for a descending query
  if (pQueryInfo->order.order == TSQL_SO_ASC) {
    pQueryMsg->skey = htobe64(pQueryInfo->stime);
    pQueryMsg->ekey = htobe64(pQueryInfo->etime);
  } else {
    pQueryMsg->skey = htobe64(pQueryInfo->etime);
    pQueryMsg->ekey = htobe64(pQueryInfo->stime);
  }

  pQueryMsg->order = htons(pQueryInfo->order.order);
  pQueryMsg->orderColId = htons(pQueryInfo->order.orderColId);

  pQueryMsg->interpoType = htons(pQueryInfo->interpoType);

  pQueryMsg->limit = htobe64(pQueryInfo->limit.limit);
  pQueryMsg->offset = htobe64(pQueryInfo->limit.offset);

  pQueryMsg->numOfCols = htons(pQueryInfo->colList.numOfCols);

  if (pQueryInfo->colList.numOfCols <= 0) {
    // report the value that actually failed the check
    tscError("%p illegal value of numOfCols in query msg: %d", pSql, pQueryInfo->colList.numOfCols);
    return -1;
  }

  if (pMeterMeta->numOfTags < 0) {
    tscError("%p illegal value of numOfTagsCols in query msg: %d", pSql, pMeterMeta->numOfTags);
    return -1;
  }

  pQueryMsg->nAggTimeInterval = htobe64(pQueryInfo->nAggTimeInterval);
  pQueryMsg->intervalTimeUnit = pQueryInfo->intervalTimeUnit;
  if (pQueryInfo->nAggTimeInterval < 0) {
    tscError("%p illegal value of aggregation time interval in query msg: %lld", pSql,
             (long long)pQueryInfo->nAggTimeInterval);
    return -1;
  }

  if (pQueryInfo->groupbyExpr.numOfGroupCols < 0) {
    tscError("%p illegal value of numOfGroupCols in query msg: %d", pSql, pQueryInfo->groupbyExpr.numOfGroupCols);
    return -1;
  }

  pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols);

  if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) {  // query on meter
    pQueryMsg->tagLength = 0;
  } else {  // query on metric
    pQueryMsg->tagLength = htons(pMetricMeta->tagLen);
  }

  pQueryMsg->queryType = htons(pQueryInfo->type);
  pQueryMsg->numOfOutputCols = htons(pQueryInfo->exprsInfo.numOfExprs);

  if (pQueryInfo->fieldsInfo.numOfOutputCols < 0) {
    tscError("%p illegal value of number of output columns in query msg: %d", pSql,
             pQueryInfo->fieldsInfo.numOfOutputCols);
    return -1;
  }

  // set column list ids; the variable-length part starts right behind the column list
  char *   pMsg = (char *)(pQueryMsg->colList) + pQueryInfo->colList.numOfCols * sizeof(SColumnInfo);
  SSchema *pSchema = tsGetSchema(pMeterMeta);

  for (int32_t i = 0; i < pQueryInfo->colList.numOfCols; ++i) {
    SColumnBase *pCol = tscColumnBaseInfoGet(&pQueryInfo->colList, i);
    int32_t      colIdx = pCol->colIndex.columnIndex;

    // only form a schema pointer once the index is known to be in range
    SSchema *pColSchema = (colIdx >= pMeterMeta->numOfColumns) ? NULL : &pSchema[colIdx];

    if (pColSchema == NULL || pColSchema->type < TSDB_DATA_TYPE_BOOL || pColSchema->type > TSDB_DATA_TYPE_NCHAR) {
      tscError("%p vid:%d sid:%d id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s", pSql,
               htons(pQueryMsg->vnode), pMeterMeta->sid, pMeterMetaInfo->name, pMeterMeta->numOfColumns, colIdx,
               (pColSchema != NULL) ? pColSchema->name : "");

      return -1;  // build msg failed
    }

    pQueryMsg->colList[i].colId = htons(pColSchema->colId);
    pQueryMsg->colList[i].bytes = htons(pColSchema->bytes);
    pQueryMsg->colList[i].type = htons(pColSchema->type);
    pQueryMsg->colList[i].numOfFilters = htons(pCol->numOfFilters);

    // append the filter information after the basic column information
    for (int32_t f = 0; f < pCol->numOfFilters; ++f) {
      SColumnFilterInfo *pColFilter = &pCol->filterInfo[f];

      SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)pMsg;
      pFilterMsg->filterOnBinary = htons(pColFilter->filterOnBinary);

      pMsg += sizeof(SColumnFilterInfo);

      if (pColFilter->filterOnBinary) {
        pFilterMsg->len = htobe64(pColFilter->len);
        memcpy(pMsg, (void *)pColFilter->pz, pColFilter->len + 1);
        pMsg += (pColFilter->len + 1);  // append the additional filter binary info
      } else {
        pFilterMsg->lowerBndi = htobe64(pColFilter->lowerBndi);
        pFilterMsg->upperBndi = htobe64(pColFilter->upperBndi);
      }

      pFilterMsg->lowerRelOptr = htons(pColFilter->lowerRelOptr);
      pFilterMsg->upperRelOptr = htons(pColFilter->upperRelOptr);

      if (pColFilter->lowerRelOptr == TSDB_RELATION_INVALID && pColFilter->upperRelOptr == TSDB_RELATION_INVALID) {
        tscError("invalid filter info");
        return -1;
      }
    }
  }

  bool hasArithmeticFunction = false;

  SSqlFuncExprMsg *pSqlFuncExpr = (SSqlFuncExprMsg *)pMsg;

  for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
    SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);

    if (pExpr->functionId == TSDB_FUNC_ARITHM) {
      hasArithmeticFunction = true;
    }

    if (!tscValidateColumnId(pCmd, pExpr->colInfo.colId)) {
      /* column id is not valid according to the cached metermeta, the meter meta is expired */
      tscError("%p table schema is not matched with parsed sql", pSql);
      return -1;
    }

    pSqlFuncExpr->colInfo.colId = htons(pExpr->colInfo.colId);
    pSqlFuncExpr->colInfo.colIdx = htons(pExpr->colInfo.colIdx);
    pSqlFuncExpr->colInfo.flag = htons(pExpr->colInfo.flag);

    pSqlFuncExpr->functionId = htons(pExpr->functionId);
    pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams);
    pMsg += sizeof(SSqlFuncExprMsg);

    for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
      pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType);
      pSqlFuncExpr->arg[j].argBytes = htons(pExpr->param[j].nLen);

      if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) {
        memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen);

        // by plus one char to make the string null-terminated
        pMsg += pExpr->param[j].nLen + 1;
      } else {
        pSqlFuncExpr->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64Key);
      }
    }

    // the next expression starts behind the inline binary parameters of this one
    pSqlFuncExpr = (SSqlFuncExprMsg *)pMsg;
  }

  int32_t len = 0;
  if (hasArithmeticFunction) {
    SColumnBase *pColBase = pQueryInfo->colList.pColList;
    for (int32_t i = 0; i < pQueryInfo->colList.numOfCols; ++i) {
      char *  name = pSchema[pColBase[i].colIndex.columnIndex].name;
      int32_t lenx = strlen(name);
      memcpy(pMsg, name, lenx);
      *(pMsg + lenx) = ',';

      len += (lenx + 1);  // one for comma
      pMsg += (lenx + 1);
    }
  }

  pQueryMsg->colNameLen = htonl(len);

  // serialize the table info (sid, uid, tags)
  pMsg = doSerializeTableInfo(pSql, numOfMeters, htons(pQueryMsg->vnode), pMsg);

  // only include the required tag column schema. If a tag is not required, it won't be sent to vnode
  if (pMeterMetaInfo->numOfTags > 0) {
    // always transfer tag schema to vnode if exists
    SSchema *pTagSchema = tsGetTagSchema(pMeterMeta);

    for (int32_t j = 0; j < pMeterMetaInfo->numOfTags; ++j) {
      if (pMeterMetaInfo->tagColumnIndex[j] == TSDB_TBNAME_COLUMN_INDEX) {
        SSchema tbSchema = {
            .bytes = TSDB_METER_NAME_LEN, .colId = TSDB_TBNAME_COLUMN_INDEX, .type = TSDB_DATA_TYPE_BINARY};
        memcpy(pMsg, &tbSchema, sizeof(SSchema));
      } else {
        memcpy(pMsg, &pTagSchema[pMeterMetaInfo->tagColumnIndex[j]], sizeof(SSchema));
      }

      pMsg += sizeof(SSchema);
    }
  }

  SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr;
  if (pGroupbyExpr->numOfGroupCols != 0) {
    pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex);
    pQueryMsg->orderType = htons(pGroupbyExpr->orderType);

    for (int32_t j = 0; j < pGroupbyExpr->numOfGroupCols; ++j) {
      SColIndexEx *pCol = &pGroupbyExpr->columnInfo[j];

      // every field is assigned, never accumulated, so the result does not
      // depend on the previous content of the payload
      *((int16_t *)pMsg) = pCol->colId;
      pMsg += sizeof(pCol->colId);

      *((int16_t *)pMsg) = pCol->colIdx;
      pMsg += sizeof(pCol->colIdx);

      *((int16_t *)pMsg) = pCol->colIdxInBuf;
      pMsg += sizeof(pCol->colIdxInBuf);

      *((int16_t *)pMsg) = pCol->flag;
      pMsg += sizeof(pCol->flag);
    }
  }

  if (pQueryInfo->interpoType != TSDB_INTERPO_NONE) {
    for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
      *((int64_t *)pMsg) = htobe64(pQueryInfo->defaultVal[i]);
      pMsg += sizeof(pQueryInfo->defaultVal[0]);
    }
  }

  // compressed ts block
  pQueryMsg->tsOffset = htonl(pMsg - pStart);
  int32_t tsLen = 0;
  int32_t numOfBlocks = 0;

  if (pQueryInfo->tsBuf != NULL) {
    STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pQueryInfo->tsBuf, pMeterMetaInfo->vnodeIndex);
    assert(QUERY_IS_JOIN_QUERY(pQueryInfo->type) && pBlockInfo != NULL);  // this query should not be sent

    // todo refactor
    // do not send a message whose ts block could not be loaded completely
    if (fseek(pQueryInfo->tsBuf->f, pBlockInfo->offset, SEEK_SET) != 0 ||
        (pBlockInfo->compLen > 0 && fread(pMsg, pBlockInfo->compLen, 1, pQueryInfo->tsBuf->f) != 1)) {
      tscError("%p failed to read compressed ts block from ts buffer file", pSql);
      return -1;
    }

    pMsg += pBlockInfo->compLen;
    tsLen = pBlockInfo->compLen;
    numOfBlocks = pBlockInfo->numOfBlocks;
  }

  pQueryMsg->tsLen = htonl(tsLen);
  pQueryMsg->tsNumOfBlocks = htonl(numOfBlocks);
  if (pQueryInfo->tsBuf != NULL) {
    pQueryMsg->tsOrder = htonl(pQueryInfo->tsBuf->tsOrder);
  }

  msgLen = pMsg - pStart;

  tscTrace("%p msg built success,len:%d bytes", pSql, msgLen);
  pCmd->payloadLen = msgLen;
  pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY;

  // the estimate made at the top must cover everything that was written
  assert(msgLen + minMsgSize() <= size);

  return TSDB_CODE_SUCCESS;
}

1907
/*
 * Build the create-database message: the management head followed by
 * SCreateDbMsg. Only the db name is written here.
 * NOTE(review): the other SCreateDbMsg fields are presumably filled into the
 * payload before this call - confirm against the parser.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SCreateDbMsg *pCreateDbMsg;
  char *        pMsg, *pStart;

  SSqlCmd *pCmd = &pSql->cmd;

  pMsg = doBuildMsgHeader(pSql, &pStart);
  pCreateDbMsg = (SCreateDbMsg *)pMsg;

  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0);

  // strncpy() does not terminate the target if the source fills it completely,
  // so keep the last byte for the terminator
  strncpy(pCreateDbMsg->db, pMeterMetaInfo->name, tListLen(pCreateDbMsg->db) - 1);
  pCreateDbMsg->db[tListLen(pCreateDbMsg->db) - 1] = '\0';
  pMsg += sizeof(SCreateDbMsg);

  pCmd->payloadLen = pMsg - pStart;
  pCmd->msgType = TSDB_MSG_TYPE_CREATE_DB;

  return TSDB_CODE_SUCCESS;
}

1926
/*
 * Build the create-dnode message: the management head followed by
 * SCreateDnodeMsg, whose ip is taken from the first DCL token.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SCreateDnodeMsg *pCreate;

  char *pMsg, *pStart;

  SSqlCmd *pCmd = &pSql->cmd;

  pMsg = doBuildMsgHeader(pSql, &pStart);

  pCreate = (SCreateDnodeMsg *)pMsg;

  // the token length is the only bound strncpy() would get, so clamp it to the
  // target field and terminate explicitly
  size_t ipLen = pInfo->pDCLInfo->a[0].n;
  if (ipLen >= tListLen(pCreate->ip)) {
    ipLen = tListLen(pCreate->ip) - 1;
  }

  strncpy(pCreate->ip, pInfo->pDCLInfo->a[0].z, ipLen);
  pCreate->ip[ipLen] = '\0';

  pMsg += sizeof(SCreateDnodeMsg);

  pCmd->payloadLen = pMsg - pStart;
  pCmd->msgType = TSDB_MSG_TYPE_CREATE_DNODE;

  return TSDB_CODE_SUCCESS;
}

1946 1947 1948 1949
/*
 * Build the create-account message: the management head followed by
 * SCreateAcctMsg (user, password and the account limits of the DCL info).
 *
 * cfg.accessState is -1 if no state was given, otherwise it is derived from the
 * state token ("r", "w", "all" or "no").
 * NOTE(review): any other state string leaves accessState as it is in the
 * payload - confirm that such input is rejected before this point.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t tscBuildAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SCreateAcctMsg *pAlterMsg;
  char *          pMsg, *pStart;
  int             msgLen = 0;

  SSqlCmd *pCmd = &pSql->cmd;

  pMsg = doBuildMsgHeader(pSql, &pStart);

  pAlterMsg = (SCreateAcctMsg *)pMsg;

  SSQLToken *pName = &pInfo->pDCLInfo->user.user;
  SSQLToken *pPwd = &pInfo->pDCLInfo->user.passwd;

  // the tokens are bounded by their own length only, so clamp each copy to the
  // target field and terminate explicitly
  size_t nameLen = pName->n;
  if (nameLen >= tListLen(pAlterMsg->user)) {
    nameLen = tListLen(pAlterMsg->user) - 1;
  }

  strncpy(pAlterMsg->user, pName->z, nameLen);
  pAlterMsg->user[nameLen] = '\0';

  size_t pwdLen = pPwd->n;
  if (pwdLen >= tListLen(pAlterMsg->pass)) {
    pwdLen = tListLen(pAlterMsg->pass) - 1;
  }

  strncpy(pAlterMsg->pass, pPwd->z, pwdLen);
  pAlterMsg->pass[pwdLen] = '\0';

  pMsg += sizeof(SCreateAcctMsg);

  SCreateAcctSQL *pAcctOpt = &pInfo->pDCLInfo->acctOpt;

  pAlterMsg->cfg.maxUsers = htonl(pAcctOpt->maxUsers);
  pAlterMsg->cfg.maxDbs = htonl(pAcctOpt->maxDbs);
  pAlterMsg->cfg.maxTimeSeries = htonl(pAcctOpt->maxTimeSeries);
  pAlterMsg->cfg.maxStreams = htonl(pAcctOpt->maxStreams);
  pAlterMsg->cfg.maxPointsPerSecond = htonl(pAcctOpt->maxPointsPerSecond);
  pAlterMsg->cfg.maxStorage = htobe64(pAcctOpt->maxStorage);
  pAlterMsg->cfg.maxQueryTime = htobe64(pAcctOpt->maxQueryTime);
  pAlterMsg->cfg.maxConnections = htonl(pAcctOpt->maxConnections);

  if (pAcctOpt->stat.n == 0) {
    pAlterMsg->cfg.accessState = -1;
  } else {
    if (pAcctOpt->stat.z[0] == 'r' && pAcctOpt->stat.n == 1) {
      pAlterMsg->cfg.accessState = TSDB_VN_READ_ACCCESS;
    } else if (pAcctOpt->stat.z[0] == 'w' && pAcctOpt->stat.n == 1) {
      pAlterMsg->cfg.accessState = TSDB_VN_WRITE_ACCCESS;
    } else if (strncmp(pAcctOpt->stat.z, "all", 3) == 0 && pAcctOpt->stat.n == 3) {
      pAlterMsg->cfg.accessState = TSDB_VN_ALL_ACCCESS;
    } else if (strncmp(pAcctOpt->stat.z, "no", 2) == 0 && pAcctOpt->stat.n == 2) {
      pAlterMsg->cfg.accessState = 0;
    }
  }

  msgLen = pMsg - pStart;
  pCmd->payloadLen = msgLen;

  pCmd->msgType = TSDB_MSG_TYPE_CREATE_ACCT;
  return TSDB_CODE_SUCCESS;
}

1997 1998 1999
/*
 * Build the create-user / alter-user message: the management head followed by
 * SAlterUserMsg.
 *
 * The message type is TSDB_MSG_TYPE_ALTER_USER if the user info requests a
 * password or privilege change, and TSDB_MSG_TYPE_CREATE_USER otherwise.
 * NOTE(review): in the create-user case no password is copied here - confirm
 * that it is provided elsewhere.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t tscBuildUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SAlterUserMsg *pAlterMsg;
  char *         pMsg, *pStart;

  SSqlCmd *pCmd = &pSql->cmd;

  pMsg = doBuildMsgHeader(pSql, &pStart);
  pAlterMsg = (SAlterUserMsg *)pMsg;

  SUserInfo *pUser = &pInfo->pDCLInfo->user;

  // the token is bounded by its own length only, so clamp the copy to the
  // target field and terminate explicitly
  size_t nameLen = pUser->user.n;
  if (nameLen >= tListLen(pAlterMsg->user)) {
    nameLen = tListLen(pAlterMsg->user) - 1;
  }

  strncpy(pAlterMsg->user, pUser->user.z, nameLen);
  pAlterMsg->user[nameLen] = '\0';
  pAlterMsg->flag = pUser->type;

  if (pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
    pAlterMsg->privilege = (char)pCmd->count;
  } else if (pUser->type == TSDB_ALTER_USER_PASSWD) {
    // an unclamped copy could run past pass[] and clobber the fields set above
    size_t pwdLen = pUser->passwd.n;
    if (pwdLen >= tListLen(pAlterMsg->pass)) {
      pwdLen = tListLen(pAlterMsg->pass) - 1;
    }

    strncpy(pAlterMsg->pass, pUser->passwd.z, pwdLen);
    pAlterMsg->pass[pwdLen] = '\0';
  }

  pMsg += sizeof(SAlterUserMsg);

  pCmd->payloadLen = pMsg - pStart;

  if (pUser->type == TSDB_ALTER_USER_PASSWD || pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
    pCmd->msgType = TSDB_MSG_TYPE_ALTER_USER;
  } else {
    pCmd->msgType = TSDB_MSG_TYPE_CREATE_USER;
  }

  return TSDB_CODE_SUCCESS;
}

2029 2030 2031
/*
 * Finish the dnode configuration message: write the management head and account
 * for a SCfgMsg behind it. The SCfgMsg content itself is not written here.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t tscBuildCfgDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  char *pBody = NULL;

  // pEnd points behind the management head plus one SCfgMsg
  char *pEnd = doBuildMsgHeader(pSql, &pBody);
  pEnd += sizeof(SCfgMsg);

  pSql->cmd.payloadLen = pEnd - pBody;
  pSql->cmd.msgType = TSDB_MSG_TYPE_CFG_PNODE;

  return TSDB_CODE_SUCCESS;
}
S
slguan 已提交
2041

2042 2043 2044 2045 2046 2047
/*
 * Write the management head (current db of the connection) at the beginning of
 * the message body.
 *
 * pStart receives the start of the body, i.e. the position right behind the rpc
 * head. Returns the position right behind the management head.
 */
char *doBuildMsgHeader(SSqlObj *pSql, char **pStart) {
  char *pBody = pSql->cmd.payload + tsRpcHeadSize;
  *pStart = pBody;

  SMgmtHead *pHead = (SMgmtHead *)pBody;
  strcpy(pHead->db, pSql->pTscObj->db);

  return pBody + sizeof(SMgmtHead);
}

2057 2058 2059
/*
 * Build the drop-database message: the management head followed by SDropDbMsg
 * (db name and the "ignore if not exists" flag).
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SDropDbMsg *pDropDbMsg;
  char *      pMsg, *pStart;

  SSqlCmd *pCmd = &pSql->cmd;

  pMsg = doBuildMsgHeader(pSql, &pStart);
  pDropDbMsg = (SDropDbMsg *)pMsg;

  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0);

  // strncpy() does not terminate the target if the source fills it completely,
  // so keep the last byte for the terminator
  strncpy(pDropDbMsg->db, pMeterMetaInfo->name, tListLen(pDropDbMsg->db) - 1);
  pDropDbMsg->db[tListLen(pDropDbMsg->db) - 1] = '\0';
  pDropDbMsg->ignoreNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;

  pMsg += sizeof(SDropDbMsg);

  pCmd->payloadLen = pMsg - pStart;
  pCmd->msgType = TSDB_MSG_TYPE_DROP_DB;

  return TSDB_CODE_SUCCESS;
}

2078 2079 2080 2081
/*
 * Build the drop-table message: the management head followed by SDropTableMsg
 * (table id and the "ignore if not exists" flag).
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  char *   pBody = NULL;

  SDropTableMsg *pDrop = (SDropTableMsg *)doBuildMsgHeader(pSql, &pBody);

  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0);
  strcpy(pDrop->meterId, pMeterMetaInfo->name);

  if (pInfo->pDCLInfo->existsCheck) {
    pDrop->igNotExists = 1;
  } else {
    pDrop->igNotExists = 0;
  }

  // the message ends right behind the drop-table struct
  char *pEnd = (char *)pDrop + sizeof(SDropTableMsg);

  int bodyLen = pEnd - pBody;
  pCmd->payloadLen = bodyLen;
  pCmd->msgType = TSDB_MSG_TYPE_DROP_TABLE;

  return TSDB_CODE_SUCCESS;
}

2101 2102 2103
/*
 * Build the drop-dnode message: the management head followed by SDropDnodeMsg.
 * The ip is taken from the name slot of the first meter meta info.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *       pCmd = &pSql->cmd;
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0);

  char *         pBody = NULL;
  SDropDnodeMsg *pDropDnode = (SDropDnodeMsg *)doBuildMsgHeader(pSql, &pBody);

  strcpy(pDropDnode->ip, pMeterMetaInfo->name);

  // the message ends right behind the drop-dnode struct
  char *pEnd = (char *)pDropDnode + sizeof(SDropDnodeMsg);

  pCmd->payloadLen = pEnd - pBody;
  pCmd->msgType = TSDB_MSG_TYPE_DROP_DNODE;

  return TSDB_CODE_SUCCESS;
}

2121 2122
/*
 * Build the drop-account / drop-user message: the management head followed by
 * SDropUserMsg. The name is taken from the name slot of the first meter meta
 * info.
 *
 * Always returns TSDB_CODE_SUCCESS, like the other drop message builders.
 */
int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SDropUserMsg *pDropMsg;
  char *        pMsg, *pStart;

  SSqlCmd *pCmd = &pSql->cmd;

  pMsg = doBuildMsgHeader(pSql, &pStart);
  pDropMsg = (SDropUserMsg *)pMsg;

  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0);
  strcpy(pDropMsg->user, pMeterMetaInfo->name);

  pMsg += sizeof(SDropUserMsg);

  pCmd->payloadLen = pMsg - pStart;
  pCmd->msgType = TSDB_MSG_TYPE_DROP_USER;

  // report the status explicitly instead of returning a length variable that was
  // never assigned
  return TSDB_CODE_SUCCESS;
}

2142
/*
 * Build the use-database message: the management head followed by SUseDbMsg
 * carrying the db name.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t tscBuildUseDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  char *   pBody = NULL;

  SUseDbMsg *pUse = (SUseDbMsg *)doBuildMsgHeader(pSql, &pBody);

  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0);
  strcpy(pUse->db, pMeterMetaInfo->name);

  // the message ends right behind the use-db struct
  char *pEnd = (char *)pUse + sizeof(SUseDbMsg);

  pCmd->payloadLen = pEnd - pBody;
  pCmd->msgType = TSDB_MSG_TYPE_USE_DB;

  return TSDB_CODE_SUCCESS;
}

2162
/*
 * Build the show message: the management head followed by the show request and
 * its variable-length payload (a pattern, or an ip address for the vnode list).
 *
 * The db of the management head is the name slot of the first meter meta info
 * if it is not empty, and the current db of the connection otherwise.
 *
 * Returns TSDB_CODE_SUCCESS, or -1 if the payload cannot be allocated.
 */
int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SShowMsg *pShowMsg;
  char *    pMsg, *pStart;
  int       msgLen = 0;

  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;

  int32_t size = minMsgSize() + sizeof(SMgmtHead) + sizeof(SShowTableMsg) + pCmd->payloadLen + TSDB_EXTRA_PAYLOAD_SIZE;
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for show msg", pSql);
    return -1;
  }

  pMsg = pCmd->payload + tsRpcHeadSize;
  pStart = pMsg;

  SMgmtHead *pMgmt = (SMgmtHead *)pMsg;

  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0);
  size_t          nameLen = strlen(pMeterMetaInfo->name);

  if (nameLen > 0) {
    strcpy(pMgmt->db, pMeterMetaInfo->name);  // prefix is set here
  } else {
    strcpy(pMgmt->db, pObj->db);
  }

  pMsg += sizeof(SMgmtHead);

  pShowMsg = (SShowMsg *)pMsg;
  SShowInfo *pShowInfo = &pInfo->pDCLInfo->showOpt;

  pShowMsg->type = pShowInfo->showType;

  if (pShowInfo->showType != TSDB_MGMT_TABLE_VNODES) {
    SSQLToken *pPattern = &pShowInfo->pattern;
    if (pPattern->type > 0) {  // only show tables support wildcard query
      strncpy(pShowMsg->payload, pPattern->z, pPattern->n);
      pShowMsg->payloadLen = htons(pPattern->n);
    }
    pMsg += (sizeof(SShowTableMsg) + pPattern->n);
  } else {
    SSQLToken *pIpAddr = &pShowInfo->prefix;
    assert(pIpAddr->n > 0 && pIpAddr->type > 0);

    strncpy(pShowMsg->payload, pIpAddr->z, pIpAddr->n);
    pShowMsg->payloadLen = htons(pIpAddr->n);

    pMsg += (sizeof(SShowTableMsg) + pIpAddr->n);
  }

  // msgLen must hold the real length, otherwise the size check below is vacuous
  msgLen = pMsg - pStart;
  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_SHOW;

  assert(msgLen + minMsgSize() <= size);

  return TSDB_CODE_SUCCESS;
}

2222
/*
 * Build a kill message (query, connection or stream): the management head
 * followed by SKillQuery. The id is taken from the ip token of the DCL info and
 * the message type follows pCmd->command.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SKillQuery *pKill;
  char *      pMsg, *pStart;

  SSqlCmd *pCmd = &pSql->cmd;

  pMsg = doBuildMsgHeader(pSql, &pStart);
  pKill = (SKillQuery *)pMsg;

  pKill->handle = 0;

  // the token is bounded by its own length only, so clamp the copy to the
  // target field and terminate explicitly
  size_t idLen = pInfo->pDCLInfo->ip.n;
  if (idLen >= tListLen(pKill->queryId)) {
    idLen = tListLen(pKill->queryId) - 1;
  }

  strncpy(pKill->queryId, pInfo->pDCLInfo->ip.z, idLen);
  pKill->queryId[idLen] = '\0';

  pMsg += sizeof(SKillQuery);

  pCmd->payloadLen = pMsg - pStart;

  switch (pCmd->command) {
    case TSDB_SQL_KILL_QUERY:
      pCmd->msgType = TSDB_MSG_TYPE_KILL_QUERY;
      break;
    case TSDB_SQL_KILL_CONNECTION:
      pCmd->msgType = TSDB_MSG_TYPE_KILL_CONNECTION;
      break;
    case TSDB_SQL_KILL_STREAM:
      pCmd->msgType = TSDB_MSG_TYPE_KILL_STREAM;
      break;
    default:
      // any other command leaves msgType untouched
      break;
  }
  return TSDB_CODE_SUCCESS;
}

2252
/*
 * Estimate the payload size required by a create-table request.
 *
 * The estimate is the sum of:
 *   - minMsgSize() + SMgmtHead + SCreateTableMsg,
 *   - one STagData when the table is created from a super table, otherwise
 *     one SSchema per column and tag (pCmd->numOfCols + pCmd->count),
 *   - the select statement length plus one byte when a select clause exists,
 *   - TSDB_EXTRA_PAYLOAD_SIZE.
 */
int tscEstimateCreateTableMsgLength(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *        pCmd = &(pSql->cmd);
  SCreateTableSQL *pCreate = pInfo->pCreateTableInfo;

  int32_t total = minMsgSize() + sizeof(SMgmtHead) + sizeof(SCreateTableMsg);

  if (pCreate->type != TSQL_CREATE_TABLE_FROM_STABLE) {
    total += sizeof(SSchema) * (pCmd->numOfCols + pCmd->count);
  } else {
    total += sizeof(STagData);
  }

  if (pCreate->pSelect != NULL) {
    total += (pCreate->pSelect->selectToken.n + 1);
  }

  return total + TSDB_EXTRA_PAYLOAD_SIZE;
}

2271
/*
 * Build the create-table request (normal table, super table, table created
 * from a super table, or stream).
 *
 * Layout written after the RPC head:
 *   SMgmtHead | SCreateTableMsg | STagData            (create from super table)
 *   SMgmtHead | SCreateTableMsg | SSchema[] [| sql]   (all other kinds)
 *
 * The payload is (re)allocated from tscEstimateCreateTableMsgLength(). The
 * field info of the query is cleared once the schema has been serialized.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SCreateTableMsg *pCreateTableMsg;
  char *           pMsg, *pStart;
  int              msgLen = 0;
  SSchema *        pSchema;
  int              size = 0;

  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);

  // Reallocate the payload size
  size = tscEstimateCreateTableMsgLength(pSql, pInfo);
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for create table msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  pMsg = pCmd->payload + tsRpcHeadSize;
  pStart = pMsg;

  SMgmtHead *pMgmt = (SMgmtHead *)pMsg;

  // use dbinfo from table id without modifying current db info
  tscGetDBInfoFromMeterId(pMeterMetaInfo->name, pMgmt->db);

  pMsg += sizeof(SMgmtHead);

  pCreateTableMsg = (SCreateTableMsg *)pMsg;
  strcpy(pCreateTableMsg->meterId, pMeterMetaInfo->name);

  SCreateTableSQL *pCreateTable = pInfo->pCreateTableInfo;

  pCreateTableMsg->igExists = pCreateTable->existCheck ? 1 : 0;

  pCreateTableMsg->numOfColumns = htons(pCmd->numOfCols);
  pCreateTableMsg->numOfTags = htons(pCmd->count);

  pCreateTableMsg->sqlLen = 0;
  pMsg = (char *)pCreateTableMsg->schema;

  int8_t type = pInfo->pCreateTableInfo->type;
  if (type == TSQL_CREATE_TABLE_FROM_STABLE) {  // create by using super table, tags value
    memcpy(pMsg, &pInfo->pCreateTableInfo->usingInfo.tagdata, sizeof(STagData));
    pMsg += sizeof(STagData);
  } else {  // create (super) table
    pSchema = pCreateTableMsg->schema;

    for (int i = 0; i < pCmd->numOfCols + pCmd->count; ++i) {
      TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i);

      pSchema->type = pField->type;
      strcpy(pSchema->name, pField->name);
      pSchema->bytes = htons(pField->bytes);

      pSchema++;
    }

    pMsg = (char *)pSchema;
    if (type == TSQL_CREATE_STREAM) {  // check if it is a stream sql
      SQuerySQL *pQuerySql = pInfo->pCreateTableInfo->pSelect;

      /*
       * The select token is a (pointer, length) pair and the byte following it
       * is not necessarily '\0'. Copy exactly n bytes and terminate explicitly
       * so the statement sent to the server is always a proper C string; the
       * advertised length (n + 1) still includes the terminator.
       */
      memcpy(pMsg, pQuerySql->selectToken.z, pQuerySql->selectToken.n);
      pMsg[pQuerySql->selectToken.n] = '\0';

      pCreateTableMsg->sqlLen = htons(pQuerySql->selectToken.n + 1);
      pMsg += pQuerySql->selectToken.n + 1;
    }
  }

  tscClearFieldInfo(&pQueryInfo->fieldsInfo);

  msgLen = pMsg - pStart;
  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_CREATE_TABLE;

  assert(msgLen + minMsgSize() <= size);
  return TSDB_CODE_SUCCESS;
}

/*
 * Estimate the payload size required by an alter-table request:
 * minMsgSize() + SMgmtHead + SAlterTableMsg + one SSchema per field of the
 * first query info + TSDB_EXTRA_PAYLOAD_SIZE.
 */
int tscEstimateAlterTableMsgLength(SSqlCmd *pCmd) {
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

  return minMsgSize() + sizeof(SMgmtHead) + sizeof(SAlterTableMsg) +
         sizeof(SSchema) * tscNumOfFields(pQueryInfo) +  // schema of every altered field
         TSDB_EXTRA_PAYLOAD_SIZE;
}

2356
/*
 * Build the alter-table request.
 *
 * Layout written after the RPC head:
 *   SMgmtHead | SAlterTableMsg (meterId, type, numOfCols, tagVal) | SSchema[]
 *
 * The payload is (re)allocated from tscEstimateAlterTableMsgLength().
 *
 * Returns TSDB_CODE_SUCCESS, or -1 when the payload cannot be allocated.
 */
int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *       pCmd = &pSql->cmd;
  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);

  int size = tscEstimateAlterTableMsgLength(pCmd);
  if (tscAllocPayload(pCmd, size) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for alter table msg", pSql);
    return -1;
  }

  char *pStart = pCmd->payload + tsRpcHeadSize;

  // management head: database name derived from the table id
  SMgmtHead *pMgmt = (SMgmtHead *)pStart;
  tscGetDBInfoFromMeterId(pMeterMetaInfo->name, pMgmt->db);

  SAlterTableSQL *pAlterInfo = pInfo->pAlterInfo;
  SAlterTableMsg *pAlterTableMsg = (SAlterTableMsg *)(pStart + sizeof(SMgmtHead));

  strcpy(pAlterTableMsg->meterId, pMeterMetaInfo->name);
  pAlterTableMsg->type = htons(pAlterInfo->type);
  pAlterTableMsg->numOfCols = htons(tscNumOfFields(pQueryInfo));
  memcpy(pAlterTableMsg->tagVal, pAlterInfo->tagData.data, TSDB_MAX_TAGS_LEN);

  // one schema entry per field of the statement
  SSchema *pDst = pAlterTableMsg->schema;
  for (int i = 0; i < tscNumOfFields(pQueryInfo); ++i, ++pDst) {
    TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i);

    pDst->type = pField->type;
    strcpy(pDst->name, pField->name);
    pDst->bytes = htons(pField->bytes);
  }

  // the message ends right after the last schema entry
  int msgLen = (char *)pDst - pStart;
  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_ALTER_TABLE;

  assert(msgLen + minMsgSize() <= size);

  return TSDB_CODE_SUCCESS;
}

2410
/*
 * Build the alter-database request: SMgmtHead (current db of the connection)
 * followed by one SAlterDbMsg whose db field is the name stored in the first
 * meter-meta info of the command.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *       pCmd = &pSql->cmd;
  STscObj *       pObj = pSql->pTscObj;
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0);

  char *pStart = pCmd->payload + tsRpcHeadSize;

  SMgmtHead *pMgmt = (SMgmtHead *)pStart;
  strcpy(pMgmt->db, pObj->db);

  SAlterDbMsg *pAlterDbMsg = (SAlterDbMsg *)(pStart + sizeof(SMgmtHead));
  strcpy(pAlterDbMsg->db, pMeterMetaInfo->name);

  // head + body, both of fixed size
  char *pEnd = (char *)pAlterDbMsg + sizeof(SAlterDbMsg);

  int msgLen = pEnd - pStart;
  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_ALTER_DB;

  return TSDB_CODE_SUCCESS;
}

2438
/*
 * Build the retrieve request sent to the management node.
 *
 * Layout written after the RPC head:
 *   SMgmtHead | qhandle (uint64_t) | query type (htons, written as uint16_t)
 *
 * The db field of the head is the name in the first meter-meta info when that
 * name is not empty, otherwise the current db of the connection.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;

  char *pStart = pCmd->payload + tsRpcHeadSize;
  char *pMsg = pStart;

  SMgmtHead *pMgmt = (SMgmtHead *)pMsg;

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);

  const char *dbName = (strlen(pMeterMetaInfo->name) > 0) ? pMeterMetaInfo->name : pObj->db;
  strcpy(pMgmt->db, dbName);
  pMsg += sizeof(SMgmtHead);

  // query handle is sent as stored, without byte-order conversion
  *((uint64_t *)pMsg) = pSql->res.qhandle;
  pMsg += sizeof(pSql->res.qhandle);

  *((uint16_t *)pMsg) = htons(pQueryInfo->type);
  pMsg += sizeof(pQueryInfo->type);

  int msgLen = pMsg - pStart;
  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_RETRIEVE;

  return TSDB_CODE_SUCCESS;
}

2474
/*
 * Point every output column of pRes at its data inside pRes->data.
 *
 * For column i the data starts at pRes->data + offset(i) * numOfRows. With a
 * descending order the pointer is moved to the last row of that column and
 * pRes->bytes[i] is stored negated; otherwise bytes[i] is the field width.
 *
 * Returns pRes->code when tscCreateResPointerInfo() fails, 0 otherwise.
 */
static int tscSetResultPointer(SQueryInfo *pQueryInfo, SSqlRes *pRes) {
  if (tscCreateResPointerInfo(pQueryInfo, pRes) != TSDB_CODE_SUCCESS) {
    return pRes->code;
  }

  int numOfCols = pQueryInfo->fieldsInfo.numOfOutputCols;
  for (int col = 0; col < numOfCols; ++col) {
    TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, col);
    int16_t     offset = tscFieldInfoGetOffset(pQueryInfo, col);

    pRes->bytes[col] = pField->bytes;

    if (pQueryInfo->order.order != TSQL_SO_DESC) {
      pRes->tsrow[col] = (pRes->data + offset * pRes->numOfRows);
      continue;
    }

    // descending: negative width, start from the last row of the column
    pRes->bytes[col] = -pRes->bytes[col];
    pRes->tsrow[col] = ((pRes->data + offset * pRes->numOfRows) + (pRes->numOfRows - 1) * pField->bytes);
  }

  return 0;
}

/*
 * Common tail of the locally-built result handlers.
 *
 * pRes->rspType tracks whether the result has already been delivered:
 *   - rspType == 0: publish numOfRes rows, set up the column pointers and
 *     switch rspType to 1;
 *   - otherwise: there is no more result, only reset pRes for the next
 *     retrieve.
 *
 * When the statement is asynchronous (pSql->fp is set) the callback is invoked
 * on success, or the error is queued through tscQueueAsyncRes().
 *
 * Returns the result code, narrowed to uint8_t.
 */
static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

  pRes->code = TSDB_CODE_SUCCESS;

  if (pRes->rspType != 0) {
    tscResetForNextRetrieve(pRes);
  } else {
    pRes->numOfRows = numOfRes;
    pRes->row = 0;
    pRes->rspType = 1;

    tscSetResultPointer(pQueryInfo, pRes);
    pRes->row = 0;
  }

  uint8_t code = pSql->res.code;
  if (!pSql->fp) {
    return code;
  }

  // asynchronous statement: notify the caller
  if (code != TSDB_CODE_SUCCESS) {
    tscQueueAsyncRes(pSql);
  } else {
    (*pSql->fp)(pSql->param, pSql, pSql->res.numOfRows);
  }

  return code;
}

/*
 * Result handler for "describe table": one row is produced per column and per
 * tag of the table's cached meta, then the common local-result builder runs.
 */
int tscProcessDescribeTableRsp(SSqlObj *pSql) {
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
  SMeterMeta *    pMeta = pMeterMetaInfo->pMeterMeta;

  int32_t numOfRows = pMeta->numOfColumns + pMeta->numOfTags;
  return tscLocalResultCommonBuilder(pSql, numOfRows);
}

/*
 * Result handler for tag retrieval. A tag projection (TSDB_FUNC_TAGPRJ as the
 * first expression) yields one row per meter of the metric meta; any other
 * function (e.g. count) yields a single row.
 */
int tscProcessTagRetrieveRsp(SSqlObj *pSql) {
  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);

  int32_t numOfRows = 1;  // for count function, there is only one output.
  if (tscSqlExprGet(pQueryInfo, 0)->functionId == TSDB_FUNC_TAGPRJ) {
    numOfRows = pMeterMetaInfo->pMetricMeta->numOfMeters;
  }

  return tscLocalResultCommonBuilder(pSql, numOfRows);
}

/*
 * Result handler for metric (super table) retrieval.
 *
 * Runs the local reduce step, sets up the column pointers when rows were
 * produced successfully and rewinds the row cursor. For an asynchronous
 * statement the callback is invoked on success, otherwise the error is queued
 * through tscQueueAsyncRes().
 *
 * Returns the result code, narrowed to uint8_t.
 */
int tscProcessRetrieveMetricRsp(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  pRes->code = tscLocalDoReduce(pSql);
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

  int hasRows = (pRes->code == TSDB_CODE_SUCCESS) && (pRes->numOfRows > 0);
  if (hasRows) {
    tscSetResultPointer(pQueryInfo, pRes);
  }

  pRes->row = 0;

  uint8_t code = pRes->code;
  if (!pSql->fp) {
    return code;
  }

  // async retrieve metric data
  if (pRes->code != TSDB_CODE_SUCCESS) {
    tscQueueAsyncRes(pSql);
  } else {
    (*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
  }

  return code;
}

S
slguan 已提交
2582
/* Result handler for statements that produce no rows. */
int tscProcessEmptyResultRsp(SSqlObj *pSql) {
  const int32_t numOfRows = 0;
  return tscLocalResultCommonBuilder(pSql, numOfRows);
}
H
hzcheng 已提交
2583

2584
/*
 * Build the connect request: a single SConnectMsg placed right after the RPC
 * head. The db field receives the part of pObj->db that follows the first
 * TS_PATH_DELIMITER (the whole string when there is no delimiter), and
 * clientVersion receives the client version string.
 *
 * Returns the length of the message body.
 */
int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;

  char *       pStart = pCmd->payload + tsRpcHeadSize;
  SConnectMsg *pConnect = (SConnectMsg *)pStart;

  // skip everything up to and including the first path delimiter, if any
  char *db = pObj->db;
  char *pDelimiter = strstr(pObj->db, TS_PATH_DELIMITER);
  if (pDelimiter != NULL) {
    db = pDelimiter + 1;
  }

  strcpy(pConnect->db, db);
  strcpy(pConnect->clientVersion, version);

  int msgLen = (pStart + sizeof(SConnectMsg)) - pStart;
  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_CONNECT;

  return msgLen;
}

2612
/*
 * Build the meter-meta request.
 *
 * Layout written after the RPC head:
 *   SMgmtHead | SMeterInfoMsg [| STagData when createOnDemand is set]
 *
 * The current payload is snapshotted first because the message is written
 * over it; for create-on-demand the tag data is taken from that snapshot.
 *
 * Returns the message length on success, or TSDB_CODE_CLI_OUT_OF_MEMORY when
 * the snapshot buffer cannot be allocated.
 */
int tscBuildMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  char *tmpData = NULL;
  if (pCmd->allocSize > 0) {
    tmpData = calloc(1, pCmd->allocSize);
    if (tmpData == NULL) {
      return TSDB_CODE_CLI_OUT_OF_MEMORY;
    }

    // STagData is in binary format, strncpy is not available
    memcpy(tmpData, pCmd->payload, pCmd->allocSize);
  }

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);

  char *pStart = pCmd->payload + tsRpcHeadSize;
  char *pMsg = pStart;

  SMgmtHead *pMgmt = (SMgmtHead *)pMsg;
  tscGetDBInfoFromMeterId(pMeterMetaInfo->name, pMgmt->db);
  pMsg += sizeof(SMgmtHead);

  SMeterInfoMsg *pInfoMsg = (SMeterInfoMsg *)pMsg;
  strcpy(pInfoMsg->meterId, pMeterMetaInfo->name);
  pInfoMsg->createFlag = htons(pCmd->createOnDemand ? 1 : 0);
  pMsg += sizeof(SMeterInfoMsg);

  if (pCmd->createOnDemand) {
    memcpy(pInfoMsg->tags, tmpData, sizeof(STagData));
    pMsg += sizeof(STagData);
  }

  int msgLen = pMsg - pStart;
  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_METERINFO;

  tfree(tmpData);

  assert(msgLen + minMsgSize() <= pCmd->allocSize);
  return msgLen;
}

S
slguan 已提交
2661 2662 2663 2664 2665
/**
 *  multi meter meta req pkg format:
 *  | SMgmtHead | SMultiMeterInfoMsg | meterId0 | meterId1 | meterId2 | ......
 *      no used         4B
 **/
2666
int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
2667 2668 2669 2670 2671 2672 2673 2674 2675 2676 2677 2678 2679 2680 2681 2682 2683 2684 2685 2686 2687 2688 2689 2690 2691 2692 2693 2694 2695 2696 2697 2698 2699 2700
  SSqlCmd *pCmd = &pSql->cmd;

  // copy payload content to temp buff
  char *tmpData = 0;
  if (pCmd->payloadLen > 0) {
    tmpData = calloc(1, pCmd->payloadLen + 1);
    if (NULL == tmpData) return -1;
    memcpy(tmpData, pCmd->payload, pCmd->payloadLen);
  }

  // fill head info
  SMgmtHead *pMgmt = (SMgmtHead *)(pCmd->payload + tsRpcHeadSize);
  memset(pMgmt->db, 0, TSDB_METER_ID_LEN);  // server don't need the db

  SMultiMeterInfoMsg *pInfoMsg = (SMultiMeterInfoMsg *)(pCmd->payload + tsRpcHeadSize + sizeof(SMgmtHead));
  pInfoMsg->numOfMeters = htonl((int32_t)pCmd->count);

  if (pCmd->payloadLen > 0) {
    memcpy(pInfoMsg->meterId, tmpData, pCmd->payloadLen);
  }

  tfree(tmpData);

  pCmd->payloadLen += sizeof(SMgmtHead) + sizeof(SMultiMeterInfoMsg);
  pCmd->msgType = TSDB_MSG_TYPE_MULTI_METERINFO;

  assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize);

  tscTrace("%p build load multi-metermeta msg completed, numOfMeters:%d, msg size:%d", pSql, pCmd->count,
           pCmd->payloadLen);

  return pCmd->payloadLen;
}

H
hzcheng 已提交
2701 2702 2703
/*
 * Estimate the payload size required by a metric-meta request.
 *
 * The estimate covers the fixed heads plus TSDB_MAX_TAGS int16_t slots, every
 * tag condition and the table-name condition (both scaled by TSDB_NCHAR_SIZE),
 * the join condition (two meter ids with one int16_t each) and one
 * SMetricMetaElemMsg per table. The result is never smaller than
 * TSDB_DEFAULT_PAYLOAD_SIZE.
 */
static int32_t tscEstimateMetricMetaMsgSize(SSqlCmd *pCmd) {
  const int32_t defaultSize =
      minMsgSize() + sizeof(SMetricMetaMsg) + sizeof(SMgmtHead) + sizeof(int16_t) * TSDB_MAX_TAGS;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STagCond *  pTagCond = &pQueryInfo->tagCond;

  // total number of characters over all tag conditions
  int32_t condChars = 0;
  for (int32_t idx = 0; idx < pTagCond->numOfTagCond; ++idx) {
    condChars += strlen(pTagCond->cond[idx].cond);
  }

  int32_t tagLen = condChars * TSDB_NCHAR_SIZE;
  if (pTagCond->tbnameCond.cond != NULL) {
    tagLen += strlen(pTagCond->tbnameCond.cond) * TSDB_NCHAR_SIZE;
  }

  int32_t joinCondLen = (TSDB_METER_ID_LEN + sizeof(int16_t)) * 2;
  int32_t elemSize = sizeof(SMetricMetaElemMsg) * pQueryInfo->numOfTables;

  int32_t len = tagLen + joinCondLen + elemSize + defaultSize;

  return MAX(len, TSDB_DEFAULT_PAYLOAD_SIZE);
}

2724
/*
 * Build the metric-meta (super table meta) request.
 *
 * Layout written after the RPC head:
 *   SMgmtHead | SMetricMetaMsg | join condition | elem0 | elem1 | ...
 * where the join condition is (left meterId, left tagCol, right meterId,
 * right tagCol) and every element is an SMetricMetaElemMsg followed by its
 * variable part: tag condition (converted to UCS4), table-name condition and
 * the group-by column list. All offsets stored in the message are relative to
 * the start of SMetricMetaMsg.
 *
 * Returns the message length on success, -1 when the payload cannot be
 * allocated, and 0 when a tag condition cannot be converted to UCS4.
 */
int tscBuildMetricMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SMetricMetaMsg *pMetaMsg;
  char *          pMsg, *pStart;
  int             msgLen = 0;
  int             tableIndex = 0;

  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);

  STagCond *pTagCond = &pQueryInfo->tagCond;

  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, tableIndex);

  int32_t size = tscEstimateMetricMetaMsgSize(pCmd);
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for metric meter msg", pSql);
    return -1;
  }

  pStart = pCmd->payload + tsRpcHeadSize;
  pMsg = pStart;

  SMgmtHead *pMgmt = (SMgmtHead *)pMsg;
  tscGetDBInfoFromMeterId(pMeterMetaInfo->name, pMgmt->db);

  pMsg += sizeof(SMgmtHead);

  pMetaMsg = (SMetricMetaMsg *)pMsg;
  pMetaMsg->numOfMeters = htonl(pQueryInfo->numOfTables);

  pMsg += sizeof(SMetricMetaMsg);

  int32_t offset = pMsg - (char *)pMetaMsg;
  pMetaMsg->join = htonl(offset);

  // todo refactor
  pMetaMsg->joinCondLen = htonl((TSDB_METER_ID_LEN + sizeof(int16_t)) * 2);

  memcpy(pMsg, pTagCond->joinInfo.left.meterId, TSDB_METER_ID_LEN);
  pMsg += TSDB_METER_ID_LEN;

  *(int16_t *)pMsg = pTagCond->joinInfo.left.tagCol;
  pMsg += sizeof(int16_t);

  memcpy(pMsg, pTagCond->joinInfo.right.meterId, TSDB_METER_ID_LEN);
  pMsg += TSDB_METER_ID_LEN;

  *(int16_t *)pMsg = pTagCond->joinInfo.right.tagCol;
  pMsg += sizeof(int16_t);

  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
    pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, i);
    uint64_t uid = pMeterMetaInfo->pMeterMeta->uid;

    offset = pMsg - (char *)pMetaMsg;
    pMetaMsg->metaElem[i] = htonl(offset);

    SMetricMetaElemMsg *pElem = (SMetricMetaElemMsg *)pMsg;
    pMsg += sizeof(SMetricMetaElemMsg);

    // convert to unicode before sending to mnode for metric query
    int32_t condLen = 0;
    if (pTagCond->numOfTagCond > 0) {
      SCond *pCond = tsGetMetricQueryCondPos(pTagCond, uid);
      if (pCond != NULL) {
        condLen = strlen(pCond->cond) + 1;

        bool ret = taosMbsToUcs4(pCond->cond, condLen, pMsg, condLen * TSDB_NCHAR_SIZE);
        if (!ret) {
          // log the condition text itself; "%s" must receive a string, not the SCond
          tscError("%p mbs to ucs4 failed:%s", pSql, pCond->cond);
          return 0;
        }
      }
    }

    pElem->condLen = htonl(condLen);

    offset = pMsg - (char *)pMetaMsg;
    pElem->cond = htonl(offset);
    pMsg += condLen * TSDB_NCHAR_SIZE;

    pElem->rel = htons(pTagCond->relType);
    if (pTagCond->tbnameCond.uid == uid) {
      offset = pMsg - (char *)pMetaMsg;

      pElem->tableCond = htonl(offset);

      uint32_t len = strlen(pTagCond->tbnameCond.cond);
      pElem->tableCondLen = htonl(len);

      memcpy(pMsg, pTagCond->tbnameCond.cond, len);
      pMsg += len;
    }

    SSqlGroupbyExpr *pGroupby = &pQueryInfo->groupbyExpr;

    if (pGroupby->tableIndex != i && pGroupby->numOfGroupCols > 0) {
      // the group-by clause belongs to another table of the query
      pElem->orderType = 0;
      pElem->orderIndex = 0;
      pElem->numOfGroupCols = 0;
    } else {
      pElem->numOfGroupCols = htons(pGroupby->numOfGroupCols);
      for (int32_t j = 0; j < pMeterMetaInfo->numOfTags; ++j) {
        pElem->tagCols[j] = htons(pMeterMetaInfo->tagColumnIndex[j]);
      }

      if (pGroupby->numOfGroupCols != 0) {
        pElem->orderIndex = htons(pGroupby->orderIndex);
        pElem->orderType = htons(pGroupby->orderType);
        offset = pMsg - (char *)pMetaMsg;

        pElem->groupbyTagColumnList = htonl(offset);
        for (int32_t j = 0; j < pGroupby->numOfGroupCols; ++j) {
          SColIndexEx *pCol = &pGroupby->columnInfo[j];
          SColIndexEx *pDestCol = (SColIndexEx *)pMsg;

          // every field is taken from the source column; the destination is
          // raw message memory and must never be read
          pDestCol->colIdxInBuf = 0;
          pDestCol->colIdx = htons(pCol->colIdx);
          pDestCol->colId = htons(pCol->colId);
          pDestCol->flag = htons(pCol->flag);

          pMsg += sizeof(SColIndexEx);
        }
      }
    }

    strcpy(pElem->meterId, pMeterMetaInfo->name);
    pElem->numOfTags = htons(pMeterMetaInfo->numOfTags);

    int16_t len = pMsg - (char *)pElem;
    pElem->elemLen = htons(len);  // redundant data for integrate check
  }

  msgLen = pMsg - pStart;
  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_METRIC_META;
  assert(msgLen + minMsgSize() <= size);
  return msgLen;
}

2864
/*
 * Estimate the payload size required by a heartbeat request: RPC head,
 * SMgmtHead, an SQList with one SQDesc per entry of pObj->sqlList, an SSList
 * with one SSDesc per entry of pObj->streamList, plus TSDB_EXTRA_PAYLOAD_SIZE.
 */
int tscEstimateHeartBeatMsgLength(SSqlObj *pSql) {
  STscObj *pObj = pSql->pTscObj;

  int total = 0;
  total += tsRpcHeadSize + sizeof(SMgmtHead);

  // running queries
  total += sizeof(SQList);
  for (SSqlObj *pQuery = pObj->sqlList; pQuery; pQuery = pQuery->next) {
    total += sizeof(SQDesc);
  }

  // running streams
  total += sizeof(SSList);
  for (SSqlStream *pStream = pObj->streamList; pStream; pStream = pStream->next) {
    total += sizeof(SSDesc);
  }

  return total + TSDB_EXTRA_PAYLOAD_SIZE;
}

2887
/*
 * Build the heartbeat request: SMgmtHead (current db of the connection)
 * followed by the query/stream description produced by
 * tscBuildQueryStreamDesc().
 *
 * pObj->mutex is held from the size estimation until the description has been
 * written, so that the lists cannot change between the two steps. The mutex
 * is released on every exit path.
 *
 * Returns the message length on success, or -1 when the payload cannot be
 * allocated.
 */
int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  char *pMsg, *pStart;
  int   msgLen = 0;
  int   size = 0;

  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;

  pthread_mutex_lock(&pObj->mutex);

  size = tscEstimateHeartBeatMsgLength(pSql);
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    // do not leave the connection mutex locked on the error path
    pthread_mutex_unlock(&pObj->mutex);

    tscError("%p failed to malloc for heartbeat msg", pSql);
    return -1;
  }

  pMsg = pCmd->payload + tsRpcHeadSize;
  pStart = pMsg;

  SMgmtHead *pMgmt = (SMgmtHead *)pMsg;
  strcpy(pMgmt->db, pObj->db);
  pMsg += sizeof(SMgmtHead);

  pMsg = tscBuildQueryStreamDesc(pMsg, pObj);
  pthread_mutex_unlock(&pObj->mutex);

  msgLen = pMsg - pStart;
  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_HEARTBEAT;

  assert(msgLen + minMsgSize() <= size);
  return msgLen;
}

/*
 * Process the meter-meta response.
 *
 * Response layout: ieType (1 byte) | SMeterMeta | SSchema[columns + tags]
 * [| tag values when the meter is TSDB_METER_MTABLE].
 *
 * The meta and its schemas are converted to host byte order in place,
 * validated, rowSize is computed from the column widths (tags excluded) and
 * the whole block is stored in the meta cache under the meter name.
 *
 * Returns TSDB_CODE_INVALID_IE / TSDB_CODE_INVALID_VALUE for a malformed
 * response. Otherwise returns 0 when the cache insert yields NULL and
 * TSDB_CODE_OTHERS when it succeeds.
 * NOTE(review): this final return convention is preserved as found; confirm
 * how the dispatcher interprets a zero / non-zero result.
 */
int tscProcessMeterMetaRsp(SSqlObj *pSql) {
  char *rsp = pSql->res.pRsp;

  uint8_t ieType = *rsp;
  if (ieType != TSDB_IE_TYPE_META) {
    tscError("invalid ie type:%d", ieType);
    return TSDB_CODE_INVALID_IE;
  }

  rsp++;
  SMeterMeta *pMeta = (SMeterMeta *)rsp;

  pMeta->sid = htonl(pMeta->sid);
  pMeta->sversion = htons(pMeta->sversion);
  pMeta->vgid = htonl(pMeta->vgid);
  pMeta->uid = htobe64(pMeta->uid);

  if (pMeta->sid < 0 || pMeta->vgid < 0) {
    tscError("invalid meter vgid:%d, sid%d", pMeta->vgid, pMeta->sid);
    return TSDB_CODE_INVALID_VALUE;
  }

  pMeta->numOfColumns = htons(pMeta->numOfColumns);

  if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
    tscError("invalid numOfTags:%d", pMeta->numOfTags);
    return TSDB_CODE_INVALID_VALUE;
  }

  if (pMeta->numOfColumns > TSDB_MAX_COLUMNS || pMeta->numOfColumns <= 0) {
    tscError("invalid numOfColumns:%d", pMeta->numOfColumns);
    return TSDB_CODE_INVALID_VALUE;
  }

  for (int v = 0; v < TSDB_VNODES_SUPPORT; ++v) {
    pMeta->vpeerDesc[v].vnode = htonl(pMeta->vpeerDesc[v].vnode);
  }

  pMeta->rowSize = 0;
  rsp += sizeof(SMeterMeta);

  // schemas of the columns first, then of the tags
  SSchema *pSchemaList = (SSchema *)rsp;
  int32_t  numOfTotalCols = pMeta->numOfColumns + pMeta->numOfTags;

  for (int c = 0; c < numOfTotalCols; ++c) {
    SSchema *pSchema = &pSchemaList[c];

    pSchema->bytes = htons(pSchema->bytes);
    pSchema->colId = htons(pSchema->colId);

    // ignore the tags length
    if (c < pMeta->numOfColumns) {
      pMeta->rowSize += pSchema->bytes;
    }
  }

  rsp += numOfTotalCols * sizeof(SSchema);

  int32_t  tagLen = 0;
  SSchema *pTagsSchema = tsGetTagSchema(pMeta);

  if (pMeta->meterType == TSDB_METER_MTABLE) {
    for (int32_t t = 0; t < pMeta->numOfTags; ++t) {
      tagLen += pTagsSchema[t].bytes;
    }
  }

  rsp += tagLen;
  int32_t size = (int32_t)(rsp - (char *)pMeta);

  pMeta->index = 0;

  // todo add one more function: taosAddDataIfNotExists();
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
  assert(pMeterMetaInfo->pMeterMeta == NULL);

  pMeterMetaInfo->pMeterMeta = (SMeterMeta *)taosAddDataIntoCache(tscCacheHandle, pMeterMetaInfo->name, (char *)pMeta,
                                                                  size, tsMeterMetaKeepTimer);

  // todo handle out of memory case
  if (pMeterMetaInfo->pMeterMeta == NULL) {
    return 0;
  }

  return TSDB_CODE_OTHERS;
}

S
slguan 已提交
3008 3009 3010 3011 3012 3013 3014 3015 3016 3017 3018 3019 3020 3021 3022 3023 3024 3025 3026 3027 3028 3029 3030 3031 3032 3033 3034 3035 3036 3037 3038 3039 3040 3041 3042 3043 3044 3045 3046 3047 3048 3049 3050 3051 3052 3053 3054 3055 3056 3057 3058 3059 3060 3061 3062 3063 3064 3065 3066 3067 3068 3069 3070 3071 3072 3073 3074 3075 3076 3077 3078 3079 3080 3081 3082 3083 3084 3085 3086 3087 3088 3089 3090 3091 3092 3093 3094 3095 3096 3097 3098 3099 3100 3101 3102 3103 3104 3105 3106 3107 3108 3109 3110 3111 3112 3113 3114 3115 3116 3117
/**
 *  Process the multi-meter-meta response.
 *
 *  Response package format:
 *  | STaosRsp | ieType | SMultiMeterInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | SMeterMeta2 | SSchema2
 *  |...... 1B        1B            4B
 *
 *  Every meta is converted to host byte order in place, validated and stored
 *  in the meta cache under its meter id. pSql->res.numOfTotal receives the
 *  number of metas handled before the function stopped.
 *
 *  Returns TSDB_CODE_SUCCESS when all metas were processed. On a malformed
 *  response pSql->res.code is set to the specific error and TSDB_CODE_OTHERS
 *  is returned.
 **/
int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
  SSchema *pSchema;
  uint8_t  ieType;
  int32_t  totalNum;
  int32_t  i;

  char *rsp = pSql->res.pRsp;

  ieType = *rsp;
  if (ieType != TSDB_IE_TYPE_META) {
    tscError("invalid ie type:%d", ieType);
    pSql->res.code = TSDB_CODE_INVALID_IE;
    pSql->res.numOfTotal = 0;
    return TSDB_CODE_OTHERS;
  }

  rsp++;

  SMultiMeterInfoMsg *pInfo = (SMultiMeterInfoMsg *)rsp;
  totalNum = htonl(pInfo->numOfMeters);
  rsp += sizeof(SMultiMeterInfoMsg);

  for (i = 0; i < totalNum; i++) {
    SMultiMeterMeta *pMultiMeta = (SMultiMeterMeta *)rsp;
    SMeterMeta *     pMeta = &pMultiMeta->meta;

    pMeta->sid = htonl(pMeta->sid);
    pMeta->sversion = htons(pMeta->sversion);
    pMeta->vgid = htonl(pMeta->vgid);
    pMeta->uid = htobe64(pMeta->uid);

    if (pMeta->sid <= 0 || pMeta->vgid < 0) {
      tscError("invalid meter vgid:%d, sid%d", pMeta->vgid, pMeta->sid);
      pSql->res.code = TSDB_CODE_INVALID_VALUE;
      pSql->res.numOfTotal = i;
      return TSDB_CODE_OTHERS;
    }

    pMeta->numOfColumns = htons(pMeta->numOfColumns);

    // a single range check is enough; a second identical one could never fire
    if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
      tscError("invalid tag value count:%d", pMeta->numOfTags);
      pSql->res.code = TSDB_CODE_INVALID_VALUE;
      pSql->res.numOfTotal = i;
      return TSDB_CODE_OTHERS;
    }

    if (pMeta->numOfColumns > TSDB_MAX_COLUMNS || pMeta->numOfColumns < 0) {
      tscError("invalid numOfColumns:%d", pMeta->numOfColumns);
      pSql->res.code = TSDB_CODE_INVALID_VALUE;
      pSql->res.numOfTotal = i;
      return TSDB_CODE_OTHERS;
    }

    for (int j = 0; j < TSDB_VNODES_SUPPORT; ++j) {
      pMeta->vpeerDesc[j].vnode = htonl(pMeta->vpeerDesc[j].vnode);
    }

    pMeta->rowSize = 0;
    rsp += sizeof(SMultiMeterMeta);
    pSchema = (SSchema *)rsp;

    int32_t numOfTotalCols = pMeta->numOfColumns + pMeta->numOfTags;
    for (int j = 0; j < numOfTotalCols; ++j) {
      pSchema->bytes = htons(pSchema->bytes);
      pSchema->colId = htons(pSchema->colId);

      // ignore the tags length
      if (j < pMeta->numOfColumns) {
        pMeta->rowSize += pSchema->bytes;
      }
      pSchema++;
    }

    rsp += numOfTotalCols * sizeof(SSchema);

    int32_t  tagLen = 0;
    SSchema *pTagsSchema = tsGetTagSchema(pMeta);

    if (pMeta->meterType == TSDB_METER_MTABLE) {
      for (int32_t j = 0; j < pMeta->numOfTags; ++j) {
        tagLen += pTagsSchema[j].bytes;
      }
    }

    rsp += tagLen;
    int32_t size = (int32_t)(rsp - ((char *)pMeta));  // Consistent with SMeterMeta in cache

    pMeta->index = 0;
    (void)taosAddDataIntoCache(tscCacheHandle, pMultiMeta->meterId, (char *)pMeta, size, tsMeterMetaKeepTimer);
  }

  pSql->res.code = TSDB_CODE_SUCCESS;
  pSql->res.numOfTotal = i;
  tscTrace("%p load multi-metermeta resp complete num:%d", pSql, pSql->res.numOfTotal);
  return TSDB_CODE_SUCCESS;
}

H
hzcheng 已提交
3118 3119 3120
/**
 * Parse a metric meta response and put every parsed entry into the client cache.
 *
 * The response in pSql->res.pRsp is read as: one IE-type byte, a 16-bit entry count, and then,
 * for each entry, a SMetricMeta header followed by numOfVnodes SVnodeSidList records, each of
 * which is followed by numOfSids items of (sizeof(SMeterSidExtInfo) + tagLen) bytes.
 * Each entry is rebuilt in a newly allocated buffer in which list[] and pSidExtInfoList[] store
 * byte offsets, and the rebuilt buffer is then added to tscCacheHandle.
 *
 * @param pSql  sql object that holds the response and the query info to be updated
 * @return      TSDB_CODE_INVALID_IE if the IE type is not TSDB_IE_TYPE_META, otherwise
 *              pSql->res.code, which is set to TSDB_CODE_CLI_OUT_OF_MEMORY when an allocation
 *              or the cache insertion fails
 */
int tscProcessMetricMetaRsp(SSqlObj *pSql) {
  char *rsp = pSql->res.pRsp;

  uint8_t ieType = *rsp;
  if (ieType != TSDB_IE_TYPE_META) {
    tscError("invalid ie type:%d", ieType);
    return TSDB_CODE_INVALID_IE;
  }

  rsp += 1;

  int32_t numOfMetas = htons(*(int16_t *)rsp);
  rsp += sizeof(int16_t);

  void **  metaList = calloc(1, POINTER_BYTES * numOfMetas);
  int32_t *metaSizes = calloc(1, sizeof(int32_t) * numOfMetas);

  // nothing else has been allocated yet, bail out directly
  if (metaList == NULL || metaSizes == NULL) {
    tfree(metaList);
    tfree(metaSizes);

    pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;
    return pSql->res.code;
  }

  for (int32_t k = 0; k < numOfMetas; ++k) {
    SMetricMeta *pSrcMeta = (SMetricMeta *)rsp;
    rsp += sizeof(SMetricMeta);

    // the header fields are converted in place in the response buffer
    pSrcMeta->numOfMeters = htonl(pSrcMeta->numOfMeters);
    pSrcMeta->numOfVnodes = htonl(pSrcMeta->numOfVnodes);
    pSrcMeta->tagLen = htons(pSrcMeta->tagLen);

    // response length (without the IE byte) plus room for the two offset tables
    size_t bufLen = (size_t)pSql->res.rspLen - 1;
    bufLen += pSrcMeta->numOfVnodes * sizeof(SVnodeSidList *) + pSrcMeta->numOfMeters * sizeof(SMeterSidExtInfo *);

    char *pCursor = calloc(1, bufLen);
    if (pCursor == NULL) {
      pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;
      goto _cleanup;
    }

    SMetricMeta *pCopy = (SMetricMeta *)pCursor;
    metaList[k] = pCopy;

    pCopy->numOfMeters = pSrcMeta->numOfMeters;
    pCopy->numOfVnodes = pSrcMeta->numOfVnodes;
    pCopy->tagLen = pSrcMeta->tagLen;

    // skip the header and the per-vnode offset table
    pCursor += sizeof(SMetricMeta) + pCopy->numOfVnodes * sizeof(SVnodeSidList *);

    for (int32_t v = 0; v < pSrcMeta->numOfVnodes; ++v) {
      SVnodeSidList *pSrcList = (SVnodeSidList *)rsp;
      SVnodeSidList *pDstList = (SVnodeSidList *)pCursor;

      memcpy(pDstList, pSrcList, sizeof(SVnodeSidList));
      pCopy->list[v] = pCursor - (char *)pCopy;  // stored as an offset from the start of the copy

      tscTrace("%p metricmeta:vid:%d,numOfMeters:%d", pSql, v, pDstList->numOfSids);

      // skip the list header and its per-sid offset table
      pCursor += sizeof(SVnodeSidList) + sizeof(SMeterSidExtInfo *) * pSrcList->numOfSids;
      rsp += sizeof(SVnodeSidList);

      size_t itemLen = sizeof(SMeterSidExtInfo) + pCopy->tagLen;
      for (int32_t j = 0; j < pSrcList->numOfSids; ++j) {
        SMeterSidExtInfo *pItem = (SMeterSidExtInfo *)pCursor;

        pDstList->pSidExtInfoList[j] = pCursor - (char *)pDstList;  // offset from the start of this list
        memcpy(pItem, rsp, itemLen);

        pItem->uid = htobe64(pItem->uid);
        pItem->sid = htonl(pItem->sid);

        rsp += itemLen;
        pCursor += itemLen;
      }
    }

    metaSizes[k] = pCursor - (char *)pCopy;
  }

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
  for (int32_t i = 0; i < numOfMetas; ++i) {
    char cacheKey[TSDB_MAX_TAGS_LEN + 1] = {0};

    SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i);
    tscGetMetricMetaCacheKey(&pSql->cmd, 0, cacheKey, pMeterMetaInfo->pMeterMeta->uid);

#ifdef _DEBUG_VIEW
    printf("generate the metric key:%s, index:%d\n", cacheKey, i);
#endif

    // drop the reference to the metricmeta currently held, then install the new one
    taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), false);

    pMeterMetaInfo->pMetricMeta = (SMetricMeta *)taosAddDataIntoCache(tscCacheHandle, cacheKey, (char *)metaList[i],
                                                                      metaSizes[i], tsMetricMetaKeepTimer);
    tfree(metaList[i]);

    // the cache refused the entry
    if (pMeterMetaInfo->pMetricMeta == NULL) {
      pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;
      break;
    }
  }

_cleanup:
  // release whatever temporary buffers are still owned here
  for (int32_t i = 0; i < numOfMetas; ++i) {
    tfree(metaList[i]);
  }

  free(metaSizes);
  free(metaList);

  return pSql->res.code;
}

/**
 * Process the response of a SHOW command.
 *
 * The meter meta embedded in the SShowRspMsg is converted to host byte order, stored in
 * tscCacheHandle under a key built from the message type, and then used to fill the column list
 * and the output field info of query clause 0.
 *
 * @param pSql  sql object whose res.pRsp holds the SShowRspMsg
 * @return      0 on success; TSDB_CODE_CLI_OUT_OF_MEMORY (also stored in pSql->res.code) if the
 *              meta cannot be added to the cache
 */
int tscProcessShowRsp(SSqlObj *pSql) {
  char key[20];

  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);

  SShowRspMsg *pShow = (SShowRspMsg *)pRes->pRsp;
  pRes->qhandle = pShow->qhandle;

  tscResetForNextRetrieve(pRes);
  SMeterMeta *pMeta = &(pShow->meterMeta);

  pMeta->numOfColumns = ntohs(pMeta->numOfColumns);

  // the column schema array immediately follows the SMeterMeta header in the response
  SSchema *pSchema = (SSchema *)((char *)pMeta + sizeof(SMeterMeta));
  pMeta->sid = ntohs(pMeta->sid);
  for (int i = 0; i < pMeta->numOfColumns; ++i) {
    pSchema->bytes = htons(pSchema->bytes);
    pSchema++;
  }

  // cache key: one character derived from the message type followed by "showlist"
  key[0] = pCmd->msgType + 'a';
  strcpy(key + 1, "showlist");

  taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), false);

  int32_t size = pMeta->numOfColumns * sizeof(SSchema) + sizeof(SMeterMeta);
  pMeterMetaInfo->pMeterMeta =
      (SMeterMeta *)taosAddDataIntoCache(tscCacheHandle, key, (char *)pMeta, size, tsMeterMetaKeepTimer);

  // the schema below is read from the cached copy, so a failed insertion must not be ignored
  if (pMeterMetaInfo->pMeterMeta == NULL) {
    tscError("%p failed to add show result meta into cache", pSql);
    pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
    return pRes->code;
  }

  pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutputCols;
  SSchema *pMeterSchema = tsGetSchema(pMeterMetaInfo->pMeterMeta);

  tscColumnBaseInfoReserve(&pQueryInfo->colList, pMeta->numOfColumns);
  SColumnIndex index = {0};

  for (int16_t i = 0; i < pMeta->numOfColumns; ++i) {
    index.columnIndex = i;
    tscColumnBaseInfoInsert(pQueryInfo, &index);
    tscFieldInfoSetValFromSchema(&pQueryInfo->fieldsInfo, i, &pMeterSchema[i]);
  }

  tscFieldInfoCalOffset(pQueryInfo);
  return 0;
}

/**
 * Process the response of a CONNECT request.
 *
 * Copies the account id and server version from the response into the connection object,
 * prefixes pObj->db with "<acctId><TS_PATH_DELIMITER>", records the write/super authority flags
 * and re-arms the activity timer. When built with CLUSTER, the management node ip list that
 * follows the SConnectRsp is copied into tscMgmtIpList as well.
 *
 * @param pSql  sql object whose res.pRsp holds the SConnectRsp
 * @return      always 0
 */
int tscProcessConnectRsp(SSqlObj *pSql) {
  char temp[TSDB_METER_ID_LEN * 2];

  STscObj *pObj = pSql->pTscObj;
  SSqlRes *pRes = &pSql->res;

  SConnectRsp *pConnect = (SConnectRsp *)pRes->pRsp;
  strcpy(pObj->acctId, pConnect->acctId);  // copy acctId from response

  // bounded formatting: never write past temp, whatever the response contains
  int32_t len = snprintf(temp, sizeof(temp), "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db);

  // the full name must fit in pObj->db together with its terminating NUL
  assert(len >= 0 && len < tListLen(pObj->db));

  // strncpy does not terminate the destination on truncation, so terminate explicitly
  strncpy(pObj->db, temp, tListLen(pObj->db) - 1);
  pObj->db[tListLen(pObj->db) - 1] = 0;

#ifdef CLUSTER
  // the management node ip list is placed right after the SConnectRsp
  // NOTE(review): numOfIps comes from the response and is not checked against the capacity of
  // tscMgmtIpList here - confirm the bound is enforced elsewhere
  SIpList *pIpList = (SIpList *)(pRes->pRsp + sizeof(SConnectRsp));
  tscMgmtIpList.numOfIps = pIpList->numOfIps;
  for (int i = 0; i < pIpList->numOfIps; ++i) {
    tinet_ntoa(tscMgmtIpList.ipstr[i], pIpList->ip[i]);
    tscMgmtIpList.ip[i] = pIpList->ip[i];
  }

  tscPrintMgmtIp();
#endif
  strcpy(pObj->sversion, pConnect->version);
  pObj->writeAuth = pConnect->writeAuth;
  pObj->superAuth = pConnect->superAuth;
  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);

  return 0;
}

/**
 * Process the response of a USE DB request: the name stored in the first meter meta info of the
 * command becomes the current database of the connection.
 *
 * @param pSql  sql object of the USE DB request
 * @return      always 0
 */
int tscProcessUseDbRsp(SSqlObj *pSql) {
  STscObj *pConn = pSql->pTscObj;

  SMeterMetaInfo *pInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
  strcpy(pConn->db, pInfo->name);

  return 0;
}

/**
 * Process the response of a DROP DB request by clearing the whole client data cache.
 *
 * @return always 0
 */
int tscProcessDropDbRsp(SSqlObj *UNUSED_PARAM(pSql)) {
  taosClearDataCache(tscCacheHandle);

  return 0;
}

/**
 * Process the response of a DROP TABLE request.
 *
 * If the meter meta of the dropped table is found in the cache it is force-removed, and the
 * metermeta/metricmeta referenced by the command's first meter meta info are force-removed too.
 *
 * Rationale kept from the original author: dropping the only table of a vnode removes that
 * vnode; a table created later with the same name but another schema then resides in a new
 * vnode, so the cached information must not survive the drop.
 *
 * @param pSql  sql object of the DROP TABLE request
 * @return      always 0
 */
int tscProcessDropTableRsp(SSqlObj *pSql) {
  SMeterMetaInfo *pInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);

  SMeterMeta *pCached = taosGetDataFromCache(tscCacheHandle, pInfo->name);
  if (pCached != NULL) {
    tscTrace("%p force release metermeta after drop table:%s", pSql, pInfo->name);
    taosRemoveDataFromCache(tscCacheHandle, (void **)&pCached, true);

    if (pInfo->pMeterMeta != NULL) {
      taosRemoveDataFromCache(tscCacheHandle, (void **)&(pInfo->pMeterMeta), true);
      taosRemoveDataFromCache(tscCacheHandle, (void **)&(pInfo->pMetricMeta), true);
    }
  }

  // when the table is not in the cache there is nothing to invalidate
  return 0;
}

/**
 * Process the response of an ALTER TABLE request.
 *
 * If the altered table has a meter meta in the cache, that entry is force-removed together with
 * the metermeta/metricmeta referenced by the command's first meter meta info. When the altered
 * table is a super table, the whole data cache is cleared afterwards.
 *
 * @param pSql  sql object of the ALTER TABLE request
 * @return      always 0
 */
int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);

  SMeterMeta *pCached = taosGetDataFromCache(tscCacheHandle, pMeterMetaInfo->name);
  if (pCached == NULL) {
    return 0;  // not in cache, nothing to invalidate
  }

  tscTrace("%p force release metermeta in cache after alter-table: %s", pSql, pMeterMetaInfo->name);
  taosRemoveDataFromCache(tscCacheHandle, (void **)&pCached, true);

  if (pMeterMetaInfo->pMeterMeta == NULL) {
    return 0;
  }

  // the table kind has to be read before the metermeta reference is released
  bool isSuperTable = UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo);

  taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), true);
  taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), true);

  if (!isSuperTable) {
    return 0;
  }

  // a changed super table invalidates the whole query cache
  tscTrace("%p reset query cache since table:%s is stable", pSql, pMeterMetaInfo->name);
  taosClearDataCache(tscCacheHandle);

  return 0;
}

/**
 * Process the response of an ALTER DB request; no client-side state is touched.
 *
 * @param pSql  unused
 * @return      always 0
 */
int tscProcessAlterDbMsgRsp(SSqlObj *pSql) {
  UNUSED(pSql);

  return 0;
}

/**
 * Process the response of a query request: the first 8 bytes of the response are taken as the
 * query handle, the data pointer is cleared and the result is reset for the next retrieve.
 *
 * @param pSql  sql object holding the response
 * @return      always 0
 */
int tscProcessQueryRsp(SSqlObj *pSql) {
  SSqlRes * pResult = &pSql->res;
  uint64_t *pHandle = (uint64_t *)pResult->pRsp;

  pResult->data = NULL;
  pResult->qhandle = *pHandle;

  tscResetForNextRetrieve(pResult);
  return 0;
}

/**
 * Process a retrieve response (SRetrieveMeterRsp).
 *
 * The row count, precision, offset and elapsed time are converted to host byte order and stored
 * in pSql->res, the result column pointers are set up and the row cursor is rewound.
 * The connection is put back into the connection cache when the result is exhausted or when the
 * query only asked the server to free its resources.
 *
 * @param pSql  sql object holding the response
 * @return      always 0
 */
int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;

  SRetrieveMeterRsp *pRetrieve = (SRetrieveMeterRsp *)pRes->pRsp;

  pRes->numOfRows = htonl(pRetrieve->numOfRows);
  pRes->precision = htons(pRetrieve->precision);
  pRes->offset = htobe64(pRetrieve->offset);

  pRes->useconds = htobe64(pRetrieve->useconds);
  pRes->data = pRetrieve->data;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  tscSetResultPointer(pQueryInfo, pRes);
  pRes->row = 0;

  /**
   * If the query result is exhausted, or current query is to free resource at server side,
   * the connection will be recycled. An empty block is not treated as exhausted for a projection
   * query on a super table that still reports a positive offset.
   */
  if ((pRes->numOfRows == 0 && !(tscProjectionQueryOnSTable(pCmd, 0) && pRes->offset > 0)) ||
      ((pQueryInfo->type & TSDB_QUERY_TYPE_FREE_RESOURCE) == TSDB_QUERY_TYPE_FREE_RESOURCE)) {
    tscTrace("%p no result or free resource, recycle connection", pSql);
    taosAddConnIntoCache(tscConnCache, pSql->thandle, pSql->ip, pSql->vnode, pObj->user);
    pSql->thandle = NULL;
  } else {
    // offset is a 64-bit value (filled from htobe64 above), so it must not be printed with %d
    tscTrace("%p numOfRows:%d, offset:%" PRId64 ", not recycle connection", pSql, pRes->numOfRows,
             (int64_t)pRes->offset);
  }

  return 0;
}

/**
 * Process a retrieve response for query clause 0: take the row count and the data pointer from
 * the SRetrieveMeterRsp, set up the result column pointers and rewind the row cursor.
 *
 * @param pSql  sql object holding the response
 * @return      always 0
 */
int tscProcessRetrieveRspFromLocal(SSqlObj *pSql) {
  SSqlRes *   pResult = &pSql->res;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);

  SRetrieveMeterRsp *pRsp = (SRetrieveMeterRsp *)pResult->pRsp;

  pResult->data = pRsp->data;
  pResult->numOfRows = htonl(pRsp->numOfRows);

  tscSetResultPointer(pQueryInfo, pResult);
  pResult->row = 0;

  return 0;
}

void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code);

3463
/**
 * Issue a TSDB_SQL_META request on a temporary SSqlObj to obtain the meter meta of the table
 * named in pMeterMetaInfo.
 *
 * - pSql->fp == NULL: the result is consumed right after tscProcessSql returns. On success the
 *   metermeta obtained by the temporary object is transferred to pMeterMetaInfo; the temporary
 *   object is freed in either case.
 * - pSql->fp != NULL: the temporary object gets tscMeterMetaCallBack as callback and pSql as its
 *   parameter, and TSDB_CODE_ACTION_IN_PROGRESS is returned when the request is issued
 *   successfully. NOTE(review): the temporary object is not freed here on this path - presumably
 *   released by the callback machinery, confirm.
 *
 * @param pSql            originating sql object
 * @param pMeterMetaInfo  meter meta info whose name is looked up and whose pMeterMeta is filled
 * @return                TSDB_CODE_SUCCESS, TSDB_CODE_ACTION_IN_PROGRESS, TSDB_CODE_CLI_OUT_OF_MEMORY,
 *                        or the error code of the failing call
 */
static int32_t doGetMeterMetaFromServer(SSqlObj *pSql, SMeterMetaInfo *pMeterMetaInfo) {
  int32_t code = TSDB_CODE_SUCCESS;

  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
  if (NULL == pNew) {
    tscError("%p malloc failed for new sqlobj to get meter meta", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;
  pNew->cmd.command = TSDB_SQL_META;

  tscAddSubqueryInfo(&pNew->cmd);

  // pNewQueryInfo is dereferenced below, so a failure here must not be ignored
  SQueryInfo *pNewQueryInfo = NULL;
  code = tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo);
  if (code != TSDB_CODE_SUCCESS) {
    tscError("%p failed to get query info of new sqlobj to get meter meta", pSql);
    tscFreeSqlObj(pNew);

    return code;
  }

  pNew->cmd.createOnDemand = pSql->cmd.createOnDemand;  // create table if not exists
  if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
    tscError("%p malloc failed for payload to get meter meta", pSql);

    // the new object already owns a sub-query info, a plain free() would leak it
    tscFreeSqlObj(pNew);

    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SMeterMetaInfo *pNewMeterMetaInfo = tscAddEmptyMeterMetaInfo(pNewQueryInfo);
  assert(pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);

  strcpy(pNewMeterMetaInfo->name, pMeterMetaInfo->name);
  memcpy(pNew->cmd.payload, pSql->cmd.payload, TSDB_DEFAULT_PAYLOAD_SIZE);  // tag information if table does not exists.
  tscTrace("%p new pSqlObj:%p to get meterMeta", pSql, pNew);

  if (pSql->fp == NULL) {
    tsem_init(&pNew->rspSem, 0, 0);
    tsem_init(&pNew->emptyRspSem, 0, 1);

    code = tscProcessSql(pNew);

    /*
     * Update cache only on succeeding in getting metermeta.
     * Transfer the ownership of metermeta to the new object, instead of invoking the release/acquire routine
     */
    if (code == TSDB_CODE_SUCCESS) {
      pMeterMetaInfo->pMeterMeta = taosTransferDataInCache(tscCacheHandle, (void **)&pNewMeterMetaInfo->pMeterMeta);
      assert(pMeterMetaInfo->pMeterMeta != NULL);
    }

    tscTrace("%p get meter meta complete, code:%d, pMeterMeta:%p", pSql, code, pMeterMetaInfo->pMeterMeta);
    tscFreeSqlObj(pNew);

  } else {
    pNew->fp = tscMeterMetaCallBack;
    pNew->param = pSql;

    // strdup(NULL) is undefined, keep the copy empty when the origin has no sql string
    pNew->sqlstr = (pSql->sqlstr != NULL) ? strdup(pSql->sqlstr) : NULL;

    code = tscProcessSql(pNew);
    if (code == TSDB_CODE_SUCCESS) {
      code = TSDB_CODE_ACTION_IN_PROGRESS;
    }
  }

  return code;
}

3528 3529
/**
 * Make pMeterMetaInfo->pMeterMeta point to the meter meta of pMeterMetaInfo->name.
 *
 * A metermeta already referenced by pMeterMetaInfo is released first. The cache is consulted
 * next; only when the table is not cached is the meta requested through doGetMeterMetaFromServer.
 *
 * @param pSql            sql object on whose behalf the meta is needed
 * @param pMeterMetaInfo  meter meta info with a non-empty name
 * @return                TSDB_CODE_SUCCESS on a cache hit, otherwise the result of
 *                        doGetMeterMetaFromServer
 */
int tscGetMeterMeta(SSqlObj *pSql, SMeterMetaInfo *pMeterMetaInfo) {
  assert(pMeterMetaInfo->name[0] != 0);

  // drop the reference held from a previous lookup, if any
  if (pMeterMetaInfo->pMeterMeta != NULL) {
    taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), false);
  }

  SMeterMeta *pCached = (SMeterMeta *)taosGetDataFromCache(tscCacheHandle, pMeterMetaInfo->name);
  pMeterMetaInfo->pMeterMeta = pCached;

  if (pCached == NULL) {
    // cache miss: the meta has to be requested
    return doGetMeterMetaFromServer(pSql, pMeterMetaInfo);
  }

  tscTrace("%p retrieve meterMeta from cache, the number of columns:%d, numOfTags:%d", pSql, pCached->numOfColumns,
           pCached->numOfTags);

  return TSDB_CODE_SUCCESS;
}

3553 3554 3555
/**
 * Same as tscGetMeterMeta, but first records in the command whether the table is to be created
 * on demand.
 *
 * @param pSql               sql object on whose behalf the meta is needed
 * @param pMeterMetaInfo     meter meta info to fill
 * @param createIfNotExists  value stored in pSql->cmd.createOnDemand
 * @return                   result of tscGetMeterMeta
 */
int tscGetMeterMetaEx(SSqlObj *pSql, SMeterMetaInfo *pMeterMetaInfo, bool createIfNotExists) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->createOnDemand = createIfNotExists;

  int code = tscGetMeterMeta(pSql, pMeterMetaInfo);
  return code;
}

/*
 * Used when the metermeta has to be renewed during insertion.
 *
 * If the meter is created on demand during insertion, the routine waits for a short period
 * before the getMeterMeta msg is re-issued, which gives a greater chance that the vnode has
 * successfully created the corresponding table by then.
 */
static void tscWaitingForCreateTable(SSqlCmd *pCmd) {
  if (pCmd->command != TSDB_SQL_INSERT) {
    return;  // only insertion waits
  }

  taosMsleep(50);  // todo: global config
}

/**
 * Renew the metermeta of the first table of query clause 0 without reading it from the cache.
 *
 * The metermeta is force-removed from the cache and requested again, unless a metermeta is
 * present and the command is a query on a metric, in which case nothing is renewed.
 * While the renewal runs, a NULL pSql->fp is replaced by the marker value 0x1 so that the request
 * goes through the async path; the marker is cleared again unless the request is still in progress.
 *
 * @param pSql          sql object
 * @param meterId       meter id (not referenced by this function)
 * @return              status code
 */
int tscRenewMeterMeta(SSqlObj *pSql, char *meterId) {
  SSqlCmd *pCmd = &pSql->cmd;
  int      status = 0;

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);

  // a non-NULL fp forces the async model for the renew operation
  if (pSql->fp == NULL) {
    pSql->fp = (void *)0x1;
  }

  if (pMeterMetaInfo->pMeterMeta != NULL && tscQueryOnMetric(pCmd)) {
    // query on metric with a metermeta at hand: keep what we have
    tscTrace("%p metric query not update metric meta, numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
             pMeterMetaInfo->pMeterMeta->numOfTags, pCmd->numOfCols, pMeterMetaInfo->pMeterMeta->uid,
             pMeterMetaInfo->pMeterMeta);
  } else {
    if (pMeterMetaInfo->pMeterMeta != NULL) {
      tscTrace("%p update meter meta, old: numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
               pMeterMetaInfo->numOfTags, pCmd->numOfCols, pMeterMetaInfo->pMeterMeta->uid, pMeterMetaInfo->pMeterMeta);
    }

    tscWaitingForCreateTable(pCmd);
    taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), true);

    status = doGetMeterMetaFromServer(pSql, pMeterMetaInfo);  // todo ??
  }

  // remove the marker unless the async request is still pending
  if (status != TSDB_CODE_ACTION_IN_PROGRESS && pSql->fp == (void *)0x1) {
    pSql->fp = NULL;
  }

  return status;
}

3618
/**
 * Make sure every table of the given query clause has its metricmeta attached.
 *
 * For each table a cache key is built with tscGetMetricMetaCacheKey and looked up in the cache.
 * If all lookups hit, nothing else happens. Otherwise a temporary SSqlObj carrying a copy of the
 * clause's tables, tag condition, group-by, slimit and order is used to issue a TSDB_SQL_METRIC
 * request:
 * - pSql->fp == NULL: after a successful tscProcessSql the metricmeta of every table is fetched
 *   from the cache again and the temporary object is freed;
 * - pSql->fp != NULL: the temporary object gets tscMeterMetaCallBack as callback and pSql as its
 *   parameter, and TSDB_CODE_ACTION_IN_PROGRESS is returned when the request is issued
 *   successfully.
 *
 * @param pSql         sql object
 * @param clauseIndex  index of the query clause to work on
 * @return             TSDB_CODE_SUCCESS, TSDB_CODE_ACTION_IN_PROGRESS, TSDB_CODE_CLI_OUT_OF_MEMORY,
 *                     or the error code of the failing call
 */
int tscGetMetricMeta(SSqlObj *pSql, int32_t clauseIndex) {
  int      code = TSDB_CODE_NETWORK_UNAVAIL;
  SSqlCmd *pCmd = &pSql->cmd;

  /*
   * the query condition is serialized into pCmd->payload, we need to rebuild key for metricmeta info in cache.
   */
  bool required = false;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
    char tagstr[TSDB_MAX_TAGS_LEN + 1] = {0};

    SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i);
    tscGetMetricMetaCacheKey(pCmd, clauseIndex, tagstr, pMeterMetaInfo->pMeterMeta->uid);

    taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), false);

    SMetricMeta *ppMeta = (SMetricMeta *)taosGetDataFromCache(tscCacheHandle, tagstr);
    if (ppMeta == NULL) {
      required = true;
      break;
    }

    pMeterMetaInfo->pMetricMeta = ppMeta;
  }

  // all metricmeta for one clause are retrieved from cache, no need to retrieve metricmeta from management node
  if (!required) {
    return TSDB_CODE_SUCCESS;
  }

  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
  if (pNew == NULL) {
    tscError("%p malloc failed for new sqlobj to get metric meta", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;

  pNew->cmd.command = TSDB_SQL_METRIC;

  SQueryInfo *pNewQueryInfo = NULL;
  if ((code = tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo)) != TSDB_CODE_SUCCESS) {
    tscFreeSqlObj(pNew);  // do not leak the temporary object
    return code;
  }

  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
    SMeterMetaInfo *pMMInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i);

    SMeterMeta *pMeterMeta = taosGetDataFromCache(tscCacheHandle, pMMInfo->name);
    tscAddMeterMetaInfo(pNewQueryInfo, pMMInfo->name, pMeterMeta, NULL, pMMInfo->numOfTags, pMMInfo->tagColumnIndex);
  }

  if ((code = tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
    tscFreeSqlObj(pNew);
    return code;
  }

  tscTagCondCopy(&pNewQueryInfo->tagCond, &pQueryInfo->tagCond);

  pNewQueryInfo->groupbyExpr = pQueryInfo->groupbyExpr;
  pNewQueryInfo->numOfTables = pQueryInfo->numOfTables;

  pNewQueryInfo->slimit = pQueryInfo->slimit;
  pNewQueryInfo->order = pQueryInfo->order;

  tscTrace("%p allocate new pSqlObj:%p to get metricMeta", pSql, pNew);
  if (pSql->fp == NULL) {
    tsem_init(&pNew->rspSem, 0, 0);
    tsem_init(&pNew->emptyRspSem, 0, 1);

    code = tscProcessSql(pNew);

    if (code == TSDB_CODE_SUCCESS) {  // todo optimize the performance
      for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
        // same buffer size as the lookup at the top of this function
        char tagstr[TSDB_MAX_TAGS_LEN + 1] = {0};

        SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i);

        // the key must be built for the clause being processed, exactly as in the first lookup
        tscGetMetricMetaCacheKey(pCmd, clauseIndex, tagstr, pMeterMetaInfo->pMeterMeta->uid);

#ifdef _DEBUG_VIEW
        printf("create metric key:%s, index:%d\n", tagstr, i);
#endif

        taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), false);
        pMeterMetaInfo->pMetricMeta = (SMetricMeta *)taosGetDataFromCache(tscCacheHandle, tagstr);
      }
    }

    tscFreeSqlObj(pNew);
  } else {
    pNew->fp = tscMeterMetaCallBack;
    pNew->param = pSql;
    code = tscProcessSql(pNew);
    if (code == TSDB_CODE_SUCCESS) {
      code = TSDB_CODE_ACTION_IN_PROGRESS;
    }
  }

  return code;
}

void tscInitMsgs() {
  tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
  tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg;
  tscBuildMsg[TSDB_SQL_FETCH] = tscBuildRetrieveMsg;

  tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg;
3729
  tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg;
H
hzcheng 已提交
3730

3731 3732
  tscBuildMsg[TSDB_SQL_CREATE_ACCT] = tscBuildAcctMsg;
  tscBuildMsg[TSDB_SQL_ALTER_ACCT] = tscBuildAcctMsg;
H
hzcheng 已提交
3733 3734

  tscBuildMsg[TSDB_SQL_CREATE_TABLE] = tscBuildCreateTableMsg;
3735
  tscBuildMsg[TSDB_SQL_DROP_USER] = tscBuildDropAcctMsg;
H
hzcheng 已提交
3736 3737 3738
  tscBuildMsg[TSDB_SQL_DROP_ACCT] = tscBuildDropAcctMsg;
  tscBuildMsg[TSDB_SQL_DROP_DB] = tscBuildDropDbMsg;
  tscBuildMsg[TSDB_SQL_DROP_TABLE] = tscBuildDropTableMsg;
3739
  tscBuildMsg[TSDB_SQL_ALTER_USER] = tscBuildUserMsg;
S
slguan 已提交
3740 3741 3742
  tscBuildMsg[TSDB_SQL_CREATE_DNODE] = tscBuildCreateDnodeMsg;
  tscBuildMsg[TSDB_SQL_DROP_DNODE] = tscBuildDropDnodeMsg;
  tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg;
H
hzcheng 已提交
3743 3744 3745 3746 3747 3748 3749
  tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg;
  tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg;

  tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg;
  tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg;
  tscBuildMsg[TSDB_SQL_META] = tscBuildMeterMetaMsg;
  tscBuildMsg[TSDB_SQL_METRIC] = tscBuildMetricMetaMsg;
S
slguan 已提交
3750
  tscBuildMsg[TSDB_SQL_MULTI_META] = tscBuildMultiMeterMetaMsg;
H
hzcheng 已提交
3751 3752 3753 3754

  tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg;
  tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg;
  tscBuildMsg[TSDB_SQL_RETRIEVE] = tscBuildRetrieveFromMgmtMsg;
3755 3756 3757
  tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg;
H
hzcheng 已提交
3758 3759 3760 3761 3762 3763 3764 3765 3766 3767

  tscProcessMsgRsp[TSDB_SQL_SELECT] = tscProcessQueryRsp;
  tscProcessMsgRsp[TSDB_SQL_FETCH] = tscProcessRetrieveRspFromVnode;

  tscProcessMsgRsp[TSDB_SQL_DROP_DB] = tscProcessDropDbRsp;
  tscProcessMsgRsp[TSDB_SQL_DROP_TABLE] = tscProcessDropTableRsp;
  tscProcessMsgRsp[TSDB_SQL_CONNECT] = tscProcessConnectRsp;
  tscProcessMsgRsp[TSDB_SQL_USE_DB] = tscProcessUseDbRsp;
  tscProcessMsgRsp[TSDB_SQL_META] = tscProcessMeterMetaRsp;
  tscProcessMsgRsp[TSDB_SQL_METRIC] = tscProcessMetricMetaRsp;
S
slguan 已提交
3768
  tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiMeterMetaRsp;
H
hzcheng 已提交
3769 3770

  tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp;
3771
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromVnode;  // rsp handled by same function.
H
hzcheng 已提交
3772
  tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp;
3773

H
hzcheng 已提交
3774
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_TAGS] = tscProcessTagRetrieveRsp;
H
hjxilinx 已提交
3775 3776 3777 3778 3779
  tscProcessMsgRsp[TSDB_SQL_CURRENT_DB] = tscProcessTagRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CURRENT_USER] = tscProcessTagRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_VERSION] = tscProcessTagRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CLI_VERSION] = tscProcessTagRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_STATUS] = tscProcessTagRetrieveRsp;
3780

H
hzcheng 已提交
3781 3782 3783 3784 3785 3786 3787 3788 3789 3790 3791 3792 3793 3794 3795 3796
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp;

  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_METRIC] = tscProcessRetrieveMetricRsp;

  tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp;
  tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp;

  tscKeepConn[TSDB_SQL_SHOW] = 1;
  tscKeepConn[TSDB_SQL_RETRIEVE] = 1;
  tscKeepConn[TSDB_SQL_SELECT] = 1;
  tscKeepConn[TSDB_SQL_FETCH] = 1;
  tscKeepConn[TSDB_SQL_HB] = 1;

  tscUpdateVnodeMsg[TSDB_SQL_SELECT] = tscUpdateVnodeInQueryMsg;
  tscUpdateVnodeMsg[TSDB_SQL_INSERT] = tscUpdateVnodeInSubmitMsg;
}