tscServer.c 117.7 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#include "os.h"
H
hzcheng 已提交
17 18
#include "tcache.h"
#include "trpc.h"
S
slguan 已提交
19
#include "tscJoinProcess.h"
H
hzcheng 已提交
20
#include "tscProfile.h"
21
#include "tscSQLParser.h"
H
hzcheng 已提交
22 23 24 25
#include "tscSecondaryMerge.h"
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
S
slguan 已提交
26
#include "tscompression.h"
H
hzcheng 已提交
27 28 29 30 31 32 33
#include "tsocket.h"
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"

#define TSC_MGMT_VNODE 999

S
slguan 已提交
34 35
int        tsMasterIndex = 0;
int        tsSlaveIndex = 1;
H
hzcheng 已提交
36

S
slguan 已提交
37
SRpcIpSet  tscMgmtIpList;
S
slguan 已提交
38 39
SRpcIpSet  tscDnodeIpSet;

40 41
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};

H
hzcheng 已提交
42
int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql);
S
slguan 已提交
43
void (*tscUpdateVnodeMsg[TSDB_SQL_MAX])(SSqlObj *pSql, char *buf);
H
hzcheng 已提交
44 45
void tscProcessActivityTimer(void *handle, void *tmrId);
int tscKeepConn[TSDB_SQL_MAX] = {0};
46 47 48
TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid);
void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts);
void tscSaveSubscriptionProgress(void* sub);
H
hzcheng 已提交
49

S
slguan 已提交
50
/* Smallest message buffer size: the RPC head size plus a fixed 100 bytes. */
static int32_t minMsgSize() {
  return tsRpcHeadSize + 100;
}
H
hzcheng 已提交
51

S
slguan 已提交
52 53
/*
 * Trace every entry of the management IP list.
 * Logs an error instead when the list holds no entries.
 */
void tscPrintMgmtIp() {
  if (tscMgmtIpList.numOfIps > 0) {
    for (int idx = 0; idx < tscMgmtIpList.numOfIps; idx++) {
      tscTrace("mgmt index:%d ip:%d", idx, tscMgmtIpList.ip[idx]);
    }
    return;
  }

  tscError("invalid mgmt IP list:%d", tscMgmtIpList.numOfIps);
}

S
slguan 已提交
62 63
/*
 * Replace the management IP list with the list reported by a cluster edition server.
 *
 * numOfIps, inUse and port are converted with htons(); each ip[] entry is copied unconverted.
 * NOTE(review): numOfIps is not checked against the capacity of ip[], and htons() is only correct
 * if these fields are 16 bits wide on the wire; confirm against the SRpcIpSet definition.
 */
void tscSetMgmtIpListFromCluster(SRpcIpSet *pIpList) {
  tscMgmtIpList.numOfIps = htons(pIpList->numOfIps);
  tscMgmtIpList.inUse    = htons(pIpList->inUse);
  tscMgmtIpList.port     = htons(pIpList->port);

  int32_t idx = 0;
  while (idx < tscMgmtIpList.numOfIps) {
    tscMgmtIpList.ip[idx] = pIpList->ip[idx];
    ++idx;
  }
}

void tscSetMgmtIpListFromEdge() {
S
slguan 已提交
72 73
  if (tscMgmtIpList.numOfIps != 1) {
    tscMgmtIpList.numOfIps = 1;
S
slguan 已提交
74
    tscMgmtIpList.inUse = 0;
S
slguan 已提交
75
    tscMgmtIpList.port = tsMgmtShellPort;
S
slguan 已提交
76 77 78 79 80 81
    tscMgmtIpList.ip[0] = inet_addr(tsMasterIp);
    tscTrace("edge mgmt IP list:");
    tscPrintMgmtIp();
  }
}

S
slguan 已提交
82
/*
 * Update the management IP list from a server-provided list.
 *
 * The cluster edition returns its current management nodes; the edge edition returns an empty
 * list, in which case the locally configured master is used instead.
 */
void tscSetMgmtIpList(SRpcIpSet *pIpList) {
  if (pIpList->numOfIps == 0) {
    tscSetMgmtIpListFromEdge();
    return;
  }

  tscSetMgmtIpListFromCluster(pIpList);
}

H
hjxilinx 已提交
94 95 96 97 98 99 100
/*
 * For each management node, try twice at least in case of poor network situation.
 * If the client start to connect to a non-management node from the client, and the first retry may fail due to
 * the poor network quality. And then, the second retry get the response with redirection command.
 * The retry will not be executed since only *two* retry is allowed in case of single management node in the cluster.
 * Therefore, we need to multiply the retry times by factor of 2 to fix this problem.
 */
101
UNUSED_FUNC
H
hjxilinx 已提交
102 103 104 105 106
static int32_t tscGetMgmtConnMaxRetryTimes() {
  int32_t factor = 2;
  return tscMgmtIpList.numOfIps * factor;
}

H
hzcheng 已提交
107 108 109 110 111 112 113 114 115 116 117 118
/*
 * Async callback for the heartbeat request of connection `param` (an STscObj).
 *
 * On success (code == 0), the management IP list is refreshed from the response. Any kill request
 * carried by the response is then applied: the whole connection, or else the given query and/or
 * stream ids (converted with htonl). On failure the error code is traced.
 *
 * Unless `param` is NULL or has an invalid signature, the activity timer is re-armed with a delay
 * of tsShellActivityTimer * 500 timer units.
 */
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
  STscObj *pObj = (STscObj *)param;
  if (pObj == NULL) {
    return;
  }

  if (pObj->signature != pObj) {
    tscError("heart beat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature);
    return;
  }

  if (code != 0) {
    tscTrace("heart beat failed, code:%d", code);
  } else {
    SHeartBeatRsp *pRsp = (SHeartBeatRsp *)pObj->pHb->res.pRsp;
    tscSetMgmtIpList(&pRsp->ipList);

    if (pRsp->killConnection) {
      tscKillConnection(pObj);
    } else {
      if (pRsp->queryId != 0) {
        tscKillQuery(pObj, htonl(pRsp->queryId));
      }
      if (pRsp->streamId != 0) {
        tscKillStream(pObj, htonl(pRsp->streamId));
      }
    }
  }

  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
}

void tscProcessActivityTimer(void *handle, void *tmrId) {
  STscObj *pObj = (STscObj *)handle;

  if (pObj == NULL) return;
  if (pObj->signature != pObj) return;
  if (pObj->pTimer != tmrId) return;

  if (pObj->pHb == NULL) {
S
slguan 已提交
144 145 146
    SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
    if (NULL == pSql) return;

H
hzcheng 已提交
147
    pSql->fp = tscProcessHeartBeatRsp;
H
hjxilinx 已提交
148
    
149 150 151 152
    SQueryInfo *pQueryInfo = NULL;
    tscGetQueryInfoDetailSafely(&pSql->cmd, 0, &pQueryInfo);
    pQueryInfo->command = TSDB_SQL_HB;
    
S
slguan 已提交
153 154 155 156 157
    if (TSDB_CODE_SUCCESS != tscAllocPayload(&(pSql->cmd), TSDB_DEFAULT_PAYLOAD_SIZE)) {
      tfree(pSql);
      return;
    }

H
hzcheng 已提交
158 159 160 161
    pSql->param = pObj;
    pSql->pTscObj = pObj;
    pSql->signature = pSql;
    pObj->pHb = pSql;
162
    tscAddSubqueryInfo(&pObj->pHb->cmd);
163

S
slguan 已提交
164
    tscTrace("%p pHb is allocated, pObj:%p", pObj->pHb, pObj);
H
hzcheng 已提交
165 166 167
  }

  if (tscShouldFreeHeatBeat(pObj->pHb)) {
S
slguan 已提交
168
    tscTrace("%p free HB object and release connection", pObj);
H
hzcheng 已提交
169 170 171 172 173 174 175 176 177
    tscFreeSqlObj(pObj->pHb);
    tscCloseTscObj(pObj);
    return;
  }

  tscProcessSql(pObj->pHb);
}

/*
 * Send the request held in pSql->cmd to the server over RPC.
 *
 * Routing depends on the command:
 *  - commands below TSDB_SQL_MGMT go to tsVnodeShellPort through pVnodeConn, and the payload is
 *    copied starting tsRpcHeadSize bytes into pSql->cmd.payload;
 *  - all other commands go to tsMgmtShellPort through pTscMgmtConn, and the payload is copied from
 *    its start.
 * In both cases payloadLen bytes are copied into a buffer from rpcMallocCont(), which is handed to
 * rpcSendRequest() with pSql as the request handle.
 *
 * Side effect: pSql->ipList->port is overwritten. pSql->ipList may point at the shared
 * tscMgmtIpList (see tscProcessSql).
 *
 * A debug leftover that forced ip[0] to 192.168.0.1 on every send (clobbering the shared
 * management list) has been removed.
 *
 * NOTE(review): the vnode branch reads tsRpcHeadSize + payloadLen bytes from payload; confirm the
 * payload buffer is at least that large.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY if the message buffer cannot be
 * allocated.
 */
int tscSendMsgToServer(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;

  char *pMsg = rpcMallocCont(pCmd->payloadLen);
  if (NULL == pMsg) {
    tscError("%p msg:%s malloc fail", pSql, taosMsg[pCmd->msgType]);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SRpcMsg rpcMsg = {
      .msgType = pCmd->msgType,
      .pCont   = pMsg,
      .contLen = pCmd->payloadLen,
      .handle  = pSql,
      .code    = 0,
  };

  if (pCmd->command < TSDB_SQL_MGMT) {
    pSql->ipList->port = tsVnodeShellPort;
    tscPrint("%p msg:%s is sent to server %d", pSql, taosMsg[pCmd->msgType], pSql->ipList->port);
    memcpy(pMsg, pCmd->payload + tsRpcHeadSize, pCmd->payloadLen);
    rpcSendRequest(pVnodeConn, pSql->ipList, &rpcMsg);
  } else {
    pSql->ipList->port = tsMgmtShellPort;
    tscPrint("%p msg:%s is sent to server %d", pSql, taosMsg[pCmd->msgType], pSql->ipList->port);
    memcpy(pMsg, pCmd->payload, pCmd->payloadLen);
    rpcSendRequest(pTscMgmtConn, pSql->ipList, &rpcMsg);
  }

  return TSDB_CODE_SUCCESS;
}

217 218 219
/*
 * RPC response handler for requests sent by tscSendMsgToServer().
 *
 * rpcMsg->handle is the SSqlObj that issued the request. The handler works as follows:
 *  1. Drop the response if the SSqlObj is invalid, already freed, or its connection is closed.
 *  2. For table/vnode related errors (other than on connect/heartbeat), renew the table meta and
 *     re-send the request.
 *  3. Otherwise copy the response into pSql->res. For synchronous requests (fp == NULL), post
 *     rspSem. For async requests, run the per-command response handler and then the user callback.
 *     When tscShouldFreeAsyncSqlObj() said so, free the SSqlObj (or close the connection on a
 *     failed async connect).
 *
 * rpcMsg->pCont is released with rpcFreeCont() on every path.
 */
void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
  tscPrint("response:%s is received, len:%d error:%s", taosMsg[rpcMsg->msgType], rpcMsg->contLen,
           tstrerror(rpcMsg->code));
  SSqlObj *pSql = (SSqlObj *)rpcMsg->handle;

  // A NULL handle must not be dereferenced while logging; the payload is released on this path too.
  if (pSql == NULL) {
    tscError("%p sql is already released", pSql);
    rpcFreeCont(rpcMsg->pCont);
    return;
  }

  if (pSql->signature != pSql) {
    tscError("%p sql is already released, signature:%p", pSql, pSql->signature);
    rpcFreeCont(rpcMsg->pCont);
    return;
  }

  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;
  tscTrace("%p msg:%p is received from server", pSql, rpcMsg->pCont);

  if (pSql->freed || pObj->signature != pObj) {
    tscTrace("%p sql is already released or DB connection is closed, freed:%d pObj:%p signature:%p", pSql, pSql->freed,
             pObj, pObj->signature);
    tscFreeSqlObj(pSql);
    rpcFreeCont(rpcMsg->pCont);
    return;
  }

  if (rpcMsg->pCont == NULL) {
    rpcMsg->code = TSDB_CODE_NETWORK_UNAVAIL;
  } else {
    SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
    if (rpcMsg->code == TSDB_CODE_NOT_ACTIVE_TABLE || rpcMsg->code == TSDB_CODE_INVALID_TABLE_ID ||
        rpcMsg->code == TSDB_CODE_INVALID_VNODE_ID || rpcMsg->code == TSDB_CODE_NOT_ACTIVE_VNODE ||
        rpcMsg->code == TSDB_CODE_NETWORK_UNAVAIL || rpcMsg->code == TSDB_CODE_NOT_ACTIVE_SESSION ||
        rpcMsg->code == TSDB_CODE_TABLE_ID_MISMATCH) {
      /*
       * not_active_table: 1. the virtual node may fail to create table, since the procedure of create table is asynchronized,
       *                   the virtual node may have not create table till now, so try again by using the new metermeta.
       *                   2. this requested table may have been removed by other client, so we need to renew the
       *                   metermeta here.
       *
       * not_active_vnode: current vnode is move to other node due to node balance procedure or virtual node have been
       *                   removed. So, renew metermeta and try again.
       * not_active_session: db has been move to other node, the vnode does not exist on this dnode anymore.
       */
      if (pCmd->command == TSDB_SQL_CONNECT) {
        // NOTE(review): returns without posting rspSem or invoking the async callback; confirm intended.
        rpcMsg->code = TSDB_CODE_NETWORK_UNAVAIL;
        rpcFreeCont(rpcMsg->pCont);
        return;
      } else if (pCmd->command == TSDB_SQL_HB) {
        rpcMsg->code = TSDB_CODE_NOT_READY;
        rpcFreeCont(rpcMsg->pCont);
        return;
      } else {
        tscTrace("%p it shall renew meter meta, code:%d", pSql, rpcMsg->code);

        pSql->maxRetry = TSDB_VNODES_SUPPORT * 2;
        pSql->res.code = rpcMsg->code;  // keep the previous error code

        rpcMsg->code = tscRenewMeterMeta(pSql, pMeterMetaInfo->name);

        if (pMeterMetaInfo->pMeterMeta) {
          tscSendMsgToServer(pSql);
          rpcFreeCont(rpcMsg->pCont);
          return;
        }
      }
    }
  }

  pSql->retry = 0;

  if (pSql->fp == NULL) tsem_wait(&pSql->emptyRspSem);

  pRes->rspLen = 0;
  if (pRes->code != TSDB_CODE_QUERY_CANCELLED) {
    pRes->code = (rpcMsg->code != TSDB_CODE_SUCCESS) ? rpcMsg->code : TSDB_CODE_NETWORK_UNAVAIL;
  } else {
    tscTrace("%p query is cancelled, code:%d", pSql, pRes->code);
  }

  if (pRes->code != TSDB_CODE_QUERY_CANCELLED) {
    assert(rpcMsg->msgType == pCmd->msgType + 1);
    pRes->code = (int32_t)rpcMsg->code;
    pRes->rspType = rpcMsg->msgType;
    pRes->rspLen = (rpcMsg->pCont != NULL) ? rpcMsg->contLen : 0;  // nothing to copy without a payload

    // realloc(p, 0) may free p and return NULL. That would be misreported as out-of-memory and leave
    // pRes->pRsp dangling, so the buffer is only resized when there is data to copy.
    if (pRes->rspLen > 0) {
      char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
      if (tmp == NULL) {
        pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
        pRes->rspLen = 0;  // the old buffer does not hold this response
      } else {
        pRes->pRsp = tmp;
        memcpy(pRes->pRsp, rpcMsg->pCont, pRes->rspLen);
      }
    }

    // ignore the error information returned from mnode when set ignore flag in sql
    if (pRes->code == TSDB_CODE_DB_ALREADY_EXIST && pCmd->existsCheck && pRes->rspType == TSDB_MSG_TYPE_CREATE_DB_RSP) {
      pRes->code = TSDB_CODE_SUCCESS;
    }

    /*
     * There is no response callback function for a submit response.
     * The actual number of inserted points is decoded here from the response body, which is only
     * read when a complete SShellSubmitRspMsg was received.
     */
    if (rpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP && pRes->pRsp != NULL &&
        pRes->rspLen >= (int32_t)sizeof(SShellSubmitRspMsg)) {
      SShellSubmitRspMsg *pMsg = (SShellSubmitRspMsg *)pRes->pRsp;
      pMsg->code = htonl(pMsg->code);
      pMsg->numOfRows = htonl(pMsg->numOfRows);
      pMsg->affectedRows = htonl(pMsg->affectedRows);
      pMsg->failedRows = htonl(pMsg->failedRows);
      pMsg->numOfFailedBlocks = htonl(pMsg->numOfFailedBlocks);

      pRes->numOfRows += pMsg->affectedRows;
      tscTrace("%p cmd:%d code:%d, inserted rows:%d, rsp len:%d", pSql, pCmd->command, pRes->code,
               *(int32_t *)pRes->pRsp, pRes->rspLen);
    } else {
      tscTrace("%p cmd:%d code:%d rsp len:%d", pSql, pCmd->command, pRes->code, pRes->rspLen);
    }
  }

  if (pSql->fp == NULL) {
    tsem_post(&pSql->rspSem);
  } else {
    if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command])
      rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);

    if (rpcMsg->code != TSDB_CODE_ACTION_IN_PROGRESS) {
      int   command = pCmd->command;
      void *taosres = tscKeepConn[command] ? pSql : NULL;
      rpcMsg->code = pRes->code ? -pRes->code : pRes->numOfRows;

      tscTrace("%p Async SQL result:%d res:%p", pSql, rpcMsg->code, taosres);

      /*
       * Whether to free sqlObj or not should be decided before call the user defined function, since this SqlObj
       * may be freed in UDF, and reused by other threads before tscShouldFreeAsyncSqlObj called, in which case
       * tscShouldFreeAsyncSqlObj checks an object which is actually allocated by other threads.
       *
       * If this block of memory is re-allocated for an insert thread, in which tscKeepConn[command] equals to 0,
       * the tscShouldFreeAsyncSqlObj will success and tscFreeSqlObj free it immediately.
       */
      bool shouldFree = tscShouldFreeAsyncSqlObj(pSql);
      if (command == TSDB_SQL_INSERT) {  // handle multi-vnode insertion situation
        (*pSql->fp)(pSql, taosres, rpcMsg->code);
      } else {
        (*pSql->fp)(pSql->param, taosres, rpcMsg->code);
      }

      if (shouldFree) {
        // If it is failed, all objects allocated during execution taos_connect_a should be released
        if (command == TSDB_SQL_CONNECT) {
          taos_close(pObj);
          tscTrace("%p Async sql close failed connection", pSql);
        } else {
          tscFreeSqlObj(pSql);
          tscTrace("%p Async sql is automatically freed", pSql);
        }
      }
    }
  }

  rpcFreeCont(rpcMsg->pCont);
}

S
slguan 已提交
377
/*
 * Forward declarations of file-local (static) helpers that are used before their definitions.
 */
static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport,
                                           SSqlObj *prevSqlObj);
static int tscLaunchSTableSubqueries(SSqlObj *pSql);
H
hzcheng 已提交
379

S
slguan 已提交
380
// todo merge with callback
H
hjxilinx 已提交
381
/*
 * Create and launch the subquery for table `tableIndex` of a join query.
 *
 * The subquery object is appended to pSql->pSubs. That array is allocated on first use with room
 * for pSupporter->pState->numOfTotal entries.
 *
 * For a join query, the new subquery is turned into a first-stage ts_comp query. Its select list,
 * fields, tag condition, interval, limit and group-by are moved into pSupporter. The only outputs
 * are the ts_comp column (parameterized by the join tag column index) and the filtered columns.
 * Otherwise the subquery is only marked with TSDB_QUERY_TYPE_SUBQUERY.
 *
 * Returns TSDB_CODE_CLI_OUT_OF_MEMORY if the subquery cannot be created, else the result of
 * tscProcessSql() on the new subquery.
 */
int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySupporter *pSupporter) {
  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  pSql->res.qhandle = 0x1;
  pSql->res.numOfRows = 0;

  if (pSql->pSubs == NULL) {
    pSql->pSubs = calloc(pSupporter->pState->numOfTotal, POINTER_BYTES);
    if (pSql->pSubs == NULL) {
      return TSDB_CODE_CLI_OUT_OF_MEMORY;
    }
  }

  SSqlObj *pNew = createSubqueryObj(pSql, tableIndex, tscJoinQueryCallback, pSupporter, NULL);
  if (pNew == NULL) {
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  // check the capacity before writing, not after the (possibly out-of-bounds) store
  assert(pSql->numOfSubs < pSupporter->pState->numOfTotal);
  pSql->pSubs[pSql->numOfSubs++] = pNew;

  if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) {
    addGroupInfoForSubquery(pSql, pNew, 0, tableIndex);

    // refactor as one method
    SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
    assert(pNewQueryInfo != NULL);

    tscColumnBaseInfoUpdateTableIndex(&pNewQueryInfo->colList, 0);
    tscColumnBaseInfoCopy(&pSupporter->colList, &pNewQueryInfo->colList, 0);

    tscSqlExprCopy(&pSupporter->exprsInfo, &pNewQueryInfo->exprsInfo, pSupporter->uid, false);
    tscFieldInfoCopyAll(&pSupporter->fieldsInfo, &pNewQueryInfo->fieldsInfo);

    tscTagCondCopy(&pSupporter->tagCond, &pNewQueryInfo->tagCond);

    pNew->cmd.numOfCols = 0;
    pNewQueryInfo->intervalTime = 0;
    memset(&pNewQueryInfo->limit, 0, sizeof(SLimitVal));

    // backup the data and clear it in the sqlcmd object
    pSupporter->groupbyExpr = pNewQueryInfo->groupbyExpr;
    memset(&pNewQueryInfo->groupbyExpr, 0, sizeof(SSqlGroupbyExpr));

    // this data needs to be transfer to support struct
    pNewQueryInfo->fieldsInfo.numOfOutputCols = 0;
    pNewQueryInfo->exprsInfo.numOfExprs = 0;

    // set the ts,tags that involved in join, as the output column of intermediate result
    tscClearSubqueryInfo(&pNew->cmd);

    SSchema      colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1};
    SColumnIndex index = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX};

    tscAddSpecialColumnForSelect(pNewQueryInfo, 0, TSDB_FUNC_TS_COMP, &index, &colSchema, TSDB_COL_NORMAL);

    // set the tags value for ts_comp function
    SSqlExpr *pExpr = tscSqlExprGet(pNewQueryInfo, 0);

    SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pNewQueryInfo, 0);
    int16_t         tagColIndex = tscGetJoinTagColIndexByUid(&pSupporter->tagCond, pMeterMetaInfo->pMeterMeta->uid);

    pExpr->param->i64Key = tagColIndex;
    pExpr->numOfParams = 1;

    // add the filter tag column
    for (int32_t i = 0; i < pSupporter->colList.numOfCols; ++i) {
      SColumnBase *pColBase = &pSupporter->colList.pColList[i];
      if (pColBase->numOfFilters > 0) {  // copy to the pNew->cmd.colList if it is filtered.
        tscColumnBaseCopy(&pNewQueryInfo->colList.pColList[pNewQueryInfo->colList.numOfCols], pColBase);
        pNewQueryInfo->colList.numOfCols++;
      }
    }

    // logged once (the original emitted this trace and select-clause dump twice in a row)
    tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, transfer to ts_comp query to retrieve timestamps, "
             "exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s",
             pSql, pNew, tableIndex, pMeterMetaInfo->vnodeIndex, pNewQueryInfo->type,
             pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols,
             pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pMeterInfo[0]->name);
    tscPrintSelectClause(pNew, 0);
  } else {
    SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
    assert(pNewQueryInfo != NULL);
    pNewQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY;
  }

#ifdef _DEBUG_VIEW
  tscPrintSelectClause(pNew, 0);
#endif

  return tscProcessSql(pNew);
}

/*
 * Build (for commands that have a message builder invoked here) and send the request of pSql.
 *
 * Asynchronous requests (pSql->fp set): a send failure is stored in pRes->code and queued through
 * tscQueueAsyncRes(). 0 is always returned.
 *
 * Synchronous requests: a send failure is returned directly. Otherwise the function blocks on
 * rspSem until the response handler posts it, runs the per-command response handler on success,
 * posts emptyRspSem and returns pRes->code.
 */
int doProcessSql(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  // Read the callback before sending: the async response handler may free pSql once the request is
  // in flight.
  void *asyncFp = pSql->fp;

  switch (pCmd->command) {
    case TSDB_SQL_SELECT:
    case TSDB_SQL_FETCH:
    case TSDB_SQL_RETRIEVE:
    case TSDB_SQL_INSERT:
    case TSDB_SQL_CONNECT:
    case TSDB_SQL_HB:
    case TSDB_SQL_META:
    case TSDB_SQL_METRIC:
      tscBuildMsg[pCmd->command](pSql, NULL);
      break;
    default:
      break;
  }

  int32_t code = tscSendMsgToServer(pSql);
  if (code != TSDB_CODE_SUCCESS) {
    pRes->code = code;
    if (asyncFp == NULL) {
      return code;
    }
    tscQueueAsyncRes(pSql);
    return 0;
  }

  if (asyncFp != NULL) {
    return 0;
  }

  tsem_wait(&pSql->rspSem);

  if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command] != NULL) {
    (*tscProcessMsgRsp[pCmd->command])(pSql);
  }

  tsem_post(&pSql->emptyRspSem);
  return pRes->code;
}

/*
 * Entry point for executing the command held in pSql->cmd.
 *
 * - Commands below TSDB_SQL_MGMT require table meta and are routed (temporarily) through the
 *   management IP list. Commands below TSDB_SQL_LOCAL also use the management IP list. Local
 *   commands run their response handler directly.
 * - A top-level join query creates one subquery per table, waits for them (rspSem) and switches the
 *   command to the join retrieve (or empty-result) command.
 * - A two-stage super table query launches per-vnode subqueries. In synchronous mode it waits for
 *   them and switches to TSDB_SQL_RETRIEVE_METRIC.
 * - Everything else goes through doProcessSql().
 *
 * Returns pSql->res.code (or the doProcessSql() result).
 */
int tscProcessSql(SSqlObj *pSql) {
  char *   name = NULL;
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  SMeterMetaInfo *pMeterMetaInfo = NULL;
  int16_t         type = 0;

  if (pQueryInfo != NULL) {
    pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
    if (pMeterMetaInfo != NULL) {
      name = pMeterMetaInfo->name;
    }

    type = pQueryInfo->type;

    // for heartbeat, numOfTables == 0;
    assert((pQueryInfo->numOfTables == 0 && pQueryInfo->command == TSDB_SQL_HB) || pQueryInfo->numOfTables > 0);
  }

  tscTrace("%p SQL cmd:%d will be processed, name:%s, type:%d", pSql, pCmd->command, name, type);
  if (pSql->cmd.command < TSDB_SQL_MGMT) {
    // the pMeterMetaInfo cannot be NULL
    if (pMeterMetaInfo == NULL) {
      pSql->res.code = TSDB_CODE_OTHERS;
      return pSql->res.code;
    }

    // temp
    pSql->ipList = &tscMgmtIpList;
//    if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) {
//      pSql->index = pMeterMetaInfo->pMeterMeta->index;
//    } else {  // it must be the parent SSqlObj for super table query
//      if ((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) != 0) {
//        int32_t idx = pMeterMetaInfo->vnodeIndex;
//
//        SVnodeSidList *pSidList = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, idx);
//        pSql->index = pSidList->index;
//      }
//    }
  } else if (pSql->cmd.command < TSDB_SQL_LOCAL) {
    pSql->ipList = &tscMgmtIpList;
  } else {  // local handler
    return (*tscProcessMsgRsp[pCmd->command])(pSql);
  }

  // todo handle async situation
  if (QUERY_IS_JOIN_QUERY(type)) {
    if ((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0) {
      SSubqueryState *pState = calloc(1, sizeof(SSubqueryState));
      if (pState == NULL) {  // previously dereferenced unchecked
        tscError("%p failed to allocate subquery state for join query", pSql);
        pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
        return pRes->code;
      }

      pState->numOfTotal = pQueryInfo->numOfTables;

      for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
        SJoinSubquerySupporter *pSupporter = tscCreateJoinSupporter(pSql, pState, i);

        if (pSupporter == NULL) {  // failed to create support struct, abort current query
          tscError("%p tableIndex:%d, failed to allocate join support object, abort further query", pSql, i);
          pState->numOfCompleted = pQueryInfo->numOfTables - i - 1;
          pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;

          return pSql->res.code;
        }

        int32_t code = tscLaunchJoinSubquery(pSql, i, pSupporter);
        if (code != TSDB_CODE_SUCCESS) {  // failed to create subquery object, quit query
          tscDestroyJoinSupporter(pSupporter);
          pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;

          break;
        }
      }

      tsem_post(&pSql->emptyRspSem);
      tsem_wait(&pSql->rspSem);

      tsem_post(&pSql->emptyRspSem);

      if (pSql->numOfSubs <= 0) {
        pSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
      } else {
        pSql->cmd.command = TSDB_SQL_METRIC_JOIN_RETRIEVE;
      }

      return TSDB_CODE_SUCCESS;
    } else {
      // for first stage sub query, iterate all vnodes to get all timestamp
      if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) != TSDB_QUERY_TYPE_JOIN_SEC_STAGE) {
        return doProcessSql(pSql);
      }
    }
  }

  if (tscIsTwoStageMergeMetricQuery(pQueryInfo, 0)) {
    /*
     * Before this function returns from tscLaunchSTableSubqueries and continues, pSql may have been released at user
     * program context after retrieving all data from vnodes. User function is called at tscRetrieveFromVnodeCallBack.
     *
     * when pSql being released, pSql->fp == NULL, it may pass the check of pSql->fp == NULL,
     * which causes deadlock. So we keep it as local variable.
     */
    void *fp = pSql->fp;

    if (tscLaunchSTableSubqueries(pSql) != TSDB_CODE_SUCCESS) {
      return pRes->code;
    }

    if (fp == NULL) {
      tsem_post(&pSql->emptyRspSem);
      tsem_wait(&pSql->rspSem);
      tsem_post(&pSql->emptyRspSem);

      // set the command flag must be after the semaphore been correctly set.
      pSql->cmd.command = TSDB_SQL_RETRIEVE_METRIC;
    }

    return pSql->res.code;
  }

  return doProcessSql(pSql);
}
H
hzcheng 已提交
639

640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656
/*
 * Release the first numOfSubs subqueries of pSql that were prepared (but not launched) by
 * tscLaunchSTableSubqueries(). Each subquery's SRetrieveSupport (local buffer, queryMutex) and the
 * shared SSubqueryState are released as well.
 *
 * The released slots in pSql->pSubs are cleared and pSql->numOfSubs is reset to 0. Otherwise the
 * parent would keep dangling subquery pointers that a later cleanup of pSql could free again.
 */
static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs, SSubqueryState* pState) {
  assert(numOfSubs <= pSql->numOfSubs && numOfSubs >= 0 && pState != NULL);

  for (int32_t i = 0; i < numOfSubs; ++i) {
    SSqlObj *pSub = pSql->pSubs[i];
    assert(pSub != NULL);

    SRetrieveSupport *pSupport = pSub->param;

    tfree(pSupport->localBuffer);

    pthread_mutex_unlock(&pSupport->queryMutex);
    pthread_mutex_destroy(&pSupport->queryMutex);

    tfree(pSupport);

    tscFreeSqlObj(pSub);
    pSql->pSubs[i] = NULL;  // do not leave a dangling pointer in the parent
  }

  pSql->numOfSubs = 0;
  free(pState);
}

H
hjxilinx 已提交
662
/*
 * Prepare and launch one subquery per vnode of the super table queried by pSql.
 *
 * The local reducer environment (external memory buffers, order descriptor, final column model) is
 * created first. One SRetrieveSupport with a 64KB local buffer and a recursive queryMutex is then
 * created per subquery. All subqueries share one SSubqueryState. They are launched with
 * tscProcessSql() only after all of them were prepared successfully.
 *
 * On any allocation failure, everything allocated so far is released and pRes->code is set to
 * TSDB_CODE_CLI_OUT_OF_MEMORY. For async queries the result is queued via tscQueueAsyncRes(),
 * matching the reducer-environment failure path. Returns pRes->code on failure or cancellation,
 * TSDB_CODE_SUCCESS otherwise.
 */
int tscLaunchSTableSubqueries(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  // pRes->code check only serves in launching metric sub-queries
  if (pRes->code == TSDB_CODE_QUERY_CANCELLED) {
    pCmd->command = TSDB_SQL_RETRIEVE_METRIC;  // enable the abort of kill metric function.
    return pRes->code;
  }

  tExtMemBuffer **  pMemoryBuf = NULL;
  tOrderDescriptor *pDesc = NULL;
  SColumnModel *    pModel = NULL;

  pRes->qhandle = 1;  // hack the qhandle check

  const uint32_t nBufferSize = (1 << 16);  // 64KB

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
  int32_t         numOfSubQueries = pMeterMetaInfo->pMetricMeta->numOfVnodes;
  assert(numOfSubQueries > 0);

  int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, nBufferSize);
  if (ret != 0) {
    pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
    if (pSql->fp) {
      tscQueueAsyncRes(pSql);
    }
    return pRes->code;
  }

  pSql->pSubs = calloc(numOfSubQueries, POINTER_BYTES);
  if (pSql->pSubs == NULL) {  // previously used unchecked
    tscError("%p failed to malloc buffer for subquery list, reason:%s", pSql, strerror(errno));
    pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
    tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, numOfSubQueries);
    if (pSql->fp) {
      tscQueueAsyncRes(pSql);
    }
    return pRes->code;
  }
  pSql->numOfSubs = numOfSubQueries;

  tscTrace("%p retrieved query data from %d vnode(s)", pSql, numOfSubQueries);
  SSubqueryState *pState = calloc(1, sizeof(SSubqueryState));
  if (pState == NULL) {  // previously dereferenced unchecked
    tscError("%p failed to malloc buffer for subquery state, reason:%s", pSql, strerror(errno));
    pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
    tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, numOfSubQueries);
    tfree(pSql->pSubs);
    pSql->numOfSubs = 0;
    if (pSql->fp) {
      tscQueueAsyncRes(pSql);
    }
    return pRes->code;
  }
  pState->numOfTotal = numOfSubQueries;
  pRes->code = TSDB_CODE_SUCCESS;

  int32_t i = 0;
  for (; i < numOfSubQueries; ++i) {
    SRetrieveSupport *trs = (SRetrieveSupport *)calloc(1, sizeof(SRetrieveSupport));
    if (trs == NULL) {
      tscError("%p failed to malloc buffer for SRetrieveSupport, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
      break;
    }

    trs->pExtMemBuffer = pMemoryBuf;
    trs->pOrderDescriptor = pDesc;
    trs->pState = pState;

    trs->localBuffer = (tFilePage *)calloc(1, nBufferSize + sizeof(tFilePage));
    if (trs->localBuffer == NULL) {
      tscError("%p failed to malloc buffer for local buffer, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
      tfree(trs);
      break;
    }

    trs->subqueryIndex = i;
    trs->pParentSqlObj = pSql;
    trs->pFinalColModel = pModel;

    // POSIX requires the attribute object to be initialized before it is used
    pthread_mutexattr_t mutexattr = {0};
    pthread_mutexattr_init(&mutexattr);
    pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_RECURSIVE_NP);
    pthread_mutex_init(&trs->queryMutex, &mutexattr);
    pthread_mutexattr_destroy(&mutexattr);

    SSqlObj *pNew = tscCreateSqlObjForSubquery(pSql, trs, NULL);
    if (pNew == NULL) {
      tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
      tfree(trs->localBuffer);
      tfree(trs);
      break;
    }

    // todo handle multi-vnode situation
    if (pQueryInfo->tsBuf) {
      SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
      pNewQueryInfo->tsBuf = tsBufClone(pQueryInfo->tsBuf);
    }

    tscTrace("%p sub:%p create subquery success. orderOfSub:%d", pSql, pNew, trs->subqueryIndex);
  }

  if (i < numOfSubQueries) {
    tscError("%p failed to prepare subquery structure and launch subqueries", pSql);
    pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;

    // free all allocated resource
    tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, numOfSubQueries);
    doCleanupSubqueries(pSql, i, pState);

    // report the failure to async callers, as the reducer-environment failure path does
    if (pSql->fp) {
      tscQueueAsyncRes(pSql);
    }
    return pRes->code;
  }

  if (pRes->code == TSDB_CODE_QUERY_CANCELLED) {
    tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, numOfSubQueries);
    doCleanupSubqueries(pSql, i, pState);
    return pRes->code;
  }

  for (int32_t j = 0; j < numOfSubQueries; ++j) {
    SSqlObj *         pSub = pSql->pSubs[j];
    SRetrieveSupport *pSupport = pSub->param;

    tscTrace("%p sub:%p launch subquery, orderOfSub:%d.", pSql, pSub, pSupport->subqueryIndex);
    tscProcessSql(pSub);
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Release one finished (or aborted) sub-query of a super table query.
 *
 * The sub-query result is freed only when the sub-query ended without error. Then the
 * per-subquery local buffer is released, the query mutex (held by the caller) is unlocked
 * and destroyed, and the support structure itself is freed. trsupport must not be used
 * after this call.
 */
static void tscFreeSubSqlObj(SRetrieveSupport *trsupport, SSqlObj *pSql) {
  tscTrace("%p start to free subquery result", pSql);

  SSqlRes *pSubRes = &pSql->res;
  if (pSubRes->code == TSDB_CODE_SUCCESS) {
    taos_free_result(pSql);
  }

  tfree(trsupport->localBuffer);

  pthread_mutex_t *pMutex = &trsupport->queryMutex;
  pthread_mutex_unlock(pMutex);
  pthread_mutex_destroy(pMutex);

  tfree(trsupport);
}

S
slguan 已提交
788 789
/* Forward declaration: the retrieve callback is re-entered by the abort helper below (tscAbortFurtherRetryRetrieval). */
static void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows);

H
hzcheng 已提交
790
/*
 * Stop one sub-query of a super table query for good.
 *
 * Logs the last system error, stores -errCode as the global error code in pState, disables
 * any further retry of this sub-query, releases trsupport->queryMutex (held by the caller)
 * and then re-enters tscRetrieveFromVnodeCallBack() with that negative code so the regular
 * error/cleanup path runs.
 *
 * errCode: TSDB_CODE_* reason for the abort (e.g. TSDB_CODE_CLI_NO_DISKSPACE).
 */
static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES *tres, int32_t errCode) {
  // set no disk space error info
#ifdef WINDOWS
  LPVOID lpMsgBuf;
  FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL,
                GetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),  // Default language
                (LPTSTR)&lpMsgBuf, 0, NULL);
  tscError("sub:%p failed to flush data to disk:reason:%s", tres, lpMsgBuf);
  LocalFree(lpMsgBuf);
#else
  /*
   * Do not use strerror_r() with a local buffer here. This file relies on GNU extensions
   * (PTHREAD_MUTEX_RECURSIVE_NP), which selects the GNU strerror_r(). That variant returns the
   * message pointer and may leave the caller's buffer untouched, so the logged reason could be
   * empty. strerror() works with either libc flavour and is already used elsewhere in this file.
   */
  tscError("sub:%p failed to flush data to disk:reason:%s", tres, strerror(errno));
#endif

  // error codes in pState are kept negative
  trsupport->pState->code = -errCode;
  trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;

  pthread_mutex_unlock(&trsupport->queryMutex);

  tscRetrieveFromVnodeCallBack(trsupport, tres, trsupport->pState->code);
}

/*
 * Error path of the super table retrieve callback. Called with trsupport->queryMutex held;
 * the lock is released here (directly before a retry, or via tscFreeSubSqlObj()).
 *
 * Handles three situations:
 *  - the parent query was cancelled/failed: its code is recorded and retries are disabled;
 *  - this sub-query succeeded (numOfRows >= 0) but another one failed: abort it as well;
 *  - this sub-query failed (numOfRows < 0, a negative error code): relaunch it while retries
 *    remain, otherwise publish numOfRows as the global error code (first error wins).
 *
 * Whenever the sub-query is not relaunched it is counted as completed. The last completed
 * sub-query releases the shared merge resources and notifies the parent query (wakes a sync
 * caller, or invokes/queues the async callback).
 */
static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows) {
  SSqlObj *pPObj = trsupport->pParentSqlObj;
  int32_t  subqueryIndex = trsupport->subqueryIndex;

  assert(pSql != NULL);
  SSubqueryState* pState = trsupport->pState;
  assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 &&
         pPObj->numOfSubs == pState->numOfTotal);

  /* retrieved in subquery failed. OR query cancelled in retrieve phase. */
  if (pState->code == TSDB_CODE_SUCCESS && pPObj->res.code != TSDB_CODE_SUCCESS) {
    pState->code = -(int)pPObj->res.code;

    /*
     * kill current sub-query connection, which may retrieve data from vnodes;
     * Here we get: pPObj->res.code == TSDB_CODE_QUERY_CANCELLED
     */
    pSql->res.numOfRows = 0;
    trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;  // disable retry efforts
    tscTrace("%p query is cancelled, sub:%p, orderOfSub:%d abort retrieve, code:%d", trsupport->pParentSqlObj, pSql,
             subqueryIndex, pState->code);
  }

  if (numOfRows >= 0) {  // current query is successful, but other sub query failed, still abort current query.
    tscTrace("%p sub:%p retrieve numOfRows:%d,orderOfSub:%d", pPObj, pSql, numOfRows, subqueryIndex);
    tscError("%p sub:%p abort further retrieval due to other queries failure,orderOfSub:%d,code:%d", pPObj, pSql,
             subqueryIndex, pState->code);
  } else {
    if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && pState->code == TSDB_CODE_SUCCESS) {
      /*
       * The current query failed and retries remain: clear the data retrieved so far,
       * then launch a new sub query.
       */
      tExtMemBufferClear(trsupport->pExtMemBuffer[subqueryIndex]);

      // clear local saved number of results
      trsupport->localBuffer->numOfElems = 0;
      pthread_mutex_unlock(&trsupport->queryMutex);

      tscTrace("%p sub:%p retrieve failed, code:%d, orderOfSub:%d, retry:%d", trsupport->pParentSqlObj, pSql, numOfRows,
               subqueryIndex, trsupport->numOfRetry);

      SSqlObj *pNew = tscCreateSqlObjForSubquery(trsupport->pParentSqlObj, trsupport, pSql);
      if (pNew == NULL) {
        tscError("%p sub:%p failed to create new subquery sqlobj due to out of memory, abort retry",
                 trsupport->pParentSqlObj, pSql);

        // Error codes in pState->code are negative (as in tscRetrieveDataRes()); keep the first error.
        atomic_val_compare_exchange_32(&pState->code, TSDB_CODE_SUCCESS, -TSDB_CODE_CLI_OUT_OF_MEMORY);
        trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;

        /*
         * This sub-query must still be counted as completed, otherwise the parent query never
         * finishes. The mutex was released above, so re-enter the retrieve callback (it takes the
         * lock again). With retries disabled it comes back here and takes the abort path below.
         */
        tscRetrieveFromVnodeCallBack(trsupport, pSql, pState->code);
        return;
      }

      tscProcessSql(pNew);
      return;
    } else {  // reach the maximum retry count, abort
      atomic_val_compare_exchange_32(&pState->code, TSDB_CODE_SUCCESS, numOfRows);
      tscError("%p sub:%p retrieve failed,code:%d,orderOfSub:%d failed.no more retry,set global code:%d", pPObj, pSql,
               numOfRows, subqueryIndex, pState->code);
    }
  }

  // Keep a local copy: once the counter below reaches numOfTotal, the last sub-query may release pState.
  int32_t numOfTotal = pState->numOfTotal;

  int32_t finished = atomic_add_fetch_32(&pState->numOfCompleted, 1);
  if (finished < numOfTotal) {
    tscTrace("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pPObj, pSql, trsupport->subqueryIndex, finished);
    tscFreeSubSqlObj(trsupport, pSql);
    return;
  }

  // the last sub-query has finished and the super table query failed
  tscError("%p retrieve from %d vnode(s) completed,code:%d.FAILED.", pPObj, pState->numOfTotal, pState->code);
  pPObj->res.code = -(pState->code);

  // release allocated resource
  tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, trsupport->pFinalColModel,
                            pState->numOfTotal);

  tfree(trsupport->pState);
  tscFreeSubSqlObj(trsupport, pSql);

  if (pPObj->fp == NULL) {
    // sync query, wait for the master SSqlObj to proceed
    tsem_wait(&pPObj->emptyRspSem);
    tsem_wait(&pPObj->emptyRspSem);

    tsem_post(&pPObj->rspSem);

    pPObj->cmd.command = TSDB_SQL_RETRIEVE_METRIC;
  } else {
    // in case of second stage join subquery, invoke its callback function instead of regular QueueAsyncRes
    SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0);

    if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) == TSDB_QUERY_TYPE_JOIN_SEC_STAGE) {
      (*pPObj->fp)(pPObj->param, pPObj, pPObj->res.code);
    } else {  // regular super table query
      if (pPObj->res.code != TSDB_CODE_SUCCESS) {
        tscQueueAsyncRes(pPObj);
      }
    }
  }
}

/*
 * Retrieve callback of a super table sub-query (one sub-query per vnode), passed to
 * taos_fetch_rows_a().
 *
 * param:     the SRetrieveSupport of this sub-query.
 * tres:      the sub-query SSqlObj; NULL when it was already released in the error path.
 * numOfRows: rows fetched in this round; 0 when the vnode has no more data, negative on error.
 *
 * Fetched rows are saved into the sub-query's external memory buffer and the next fetch is
 * issued. When the vnode is exhausted, the buffered data is flushed and the sub-query is
 * counted as completed. The last sub-query to complete builds the local reducer (loser tree)
 * for the final merge and notifies the parent query.
 */
void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
  SRetrieveSupport *trsupport = (SRetrieveSupport *)param;
  int32_t           idx = trsupport->subqueryIndex;
  SSqlObj *         pPObj = trsupport->pParentSqlObj;
  tOrderDescriptor *pDesc = trsupport->pOrderDescriptor;

  SSqlObj *pSql = (SSqlObj *)tres;
  if (pSql == NULL) {  // sql object has been released in error process, return immediately
    tscTrace("%p subquery has been released, idx:%d, abort", pPObj, idx);
    return;
  }

  SSubqueryState* pState = trsupport->pState;
  assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 &&
      pPObj->numOfSubs == pState->numOfTotal);

  // query process and cancel query process may execute at the same time
  pthread_mutex_lock(&trsupport->queryMutex);

  if (numOfRows < 0 || pState->code < 0 || pPObj->res.code != TSDB_CODE_SUCCESS) {
    // tscHandleSubRetrievalError() is responsible for releasing the lock taken above
    tscHandleSubRetrievalError(trsupport, pSql, numOfRows);
    return;
  }

  SSqlRes *   pRes = &pSql->res;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);

  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);

  SVnodeSidList *vnodeInfo = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, idx);
  SVPeerDesc *   pSvd = &vnodeInfo->vpeerDesc[vnodeInfo->index];

  if (numOfRows > 0) {
    assert(pRes->numOfRows == numOfRows);
    int64_t num = atomic_add_fetch_64(&pState->numOfRetrievedRows, numOfRows);

    // Log the value returned by the atomic add: it is 64-bit (PRId64, not %d) and, unlike a second
    // read of pState->numOfRetrievedRows, it cannot be changed concurrently by other sub-queries.
    tscTrace("%p sub:%p retrieve numOfRows:%d totalNumOfRows:%" PRId64 " from ip:%u,vid:%d,orderOfSub:%d", pPObj, pSql,
             pRes->numOfRows, num, pSvd->ip, pSvd->vnode, idx);

    if (num > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pQueryInfo, 0)) {
      tscError("%p sub:%p num of OrderedRes is too many, max allowed:%" PRId64 " , current:%" PRId64,
          pPObj, pSql, tsMaxNumOfOrderedResults, num);
      tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_SORTED_RES_TOO_MANY);
      return;
    }

#ifdef _DEBUG_VIEW
    printf("received data from vnode: %d rows\n", pRes->numOfRows);
    SSrcColumnInfo colInfo[256] = {0};

    tscGetSrcColumnInfo(colInfo, pQueryInfo);
    tColModelDisplayEx(pDesc->pColumnModel, pRes->data, pRes->numOfRows, pRes->numOfRows, colInfo);
#endif

    if (tsTotalTmpDirGB != 0 && tsAvailTmpDirGB < tsMinimalTmpDirGB) {
      tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pPObj, pSql,
               tsAvailTmpDirGB, tsMinimalTmpDirGB);
      tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE);
      return;
    }

    int32_t ret = saveToBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer, pRes->data,
                               pRes->numOfRows, pQueryInfo->groupbyExpr.orderType);
    if (ret < 0) {
      // set no disk space error info, and abort retry
      tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE);
    } else {
      pthread_mutex_unlock(&trsupport->queryMutex);
      taos_fetch_rows_a(tres, tscRetrieveFromVnodeCallBack, param);
    }

  } else {  // all data has been retrieved to client
    /* data in from current vnode is stored in cache and disk */
    uint32_t numOfRowsFromVnode = trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->numOfElems;
    tscTrace("%p sub:%p all data retrieved from ip:%u,vid:%d, numOfRows:%u, orderOfSub:%d", pPObj, pSql, pSvd->ip,
             pSvd->vnode, numOfRowsFromVnode, idx);

    tColModelCompact(pDesc->pColumnModel, trsupport->localBuffer, pDesc->pColumnModel->capacity);

#ifdef _DEBUG_VIEW
    printf("%" PRIu64 " rows data flushed to disk:\n", trsupport->localBuffer->numOfElems);
    SSrcColumnInfo colInfo[256] = {0};
    tscGetSrcColumnInfo(colInfo, pQueryInfo);
    tColModelDisplayEx(pDesc->pColumnModel, trsupport->localBuffer->data, trsupport->localBuffer->numOfElems,
                       trsupport->localBuffer->numOfElems, colInfo);
#endif

    if (tsTotalTmpDirGB != 0 && tsAvailTmpDirGB < tsMinimalTmpDirGB) {
      tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pPObj, pSql,
               tsAvailTmpDirGB, tsMinimalTmpDirGB);
      tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE);
      return;
    }

    // each result for a vnode is ordered as an independent list,
    // then used as an input of loser tree for disk-based merge routine
    int32_t ret = tscFlushTmpBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer,
                                    pQueryInfo->groupbyExpr.orderType);
    if (ret != 0) {
      /* set no disk space error info, and abort retry */
      tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE);
      return;
    }

    // Keep this value in a local variable. Once the atomic add below raises the finished count to
    // pState->numOfTotal (all sub-queries completed), pState may be released by another thread, so
    // comparing the finished count with pState->numOfTotal afterwards would not be safe.
    int32_t numOfTotal = pState->numOfTotal;

    int32_t finished = atomic_add_fetch_32(&pState->numOfCompleted, 1);
    if (finished < numOfTotal) {
      tscTrace("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pPObj, pSql, trsupport->subqueryIndex, finished);
      tscFreeSubSqlObj(trsupport, pSql);
      return;
    }

    // all sub-queries are returned, start to local merge process
    pDesc->pColumnModel->capacity = trsupport->pExtMemBuffer[idx]->numOfElemsPerPage;

    tscTrace("%p retrieve from %d vnodes completed.final NumOfRows:%" PRId64 ",start to build loser tree", pPObj,
             pState->numOfTotal, pState->numOfRetrievedRows);

    SQueryInfo *pPQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0);
    tscClearInterpInfo(pPQueryInfo);

    tscCreateLocalReducer(trsupport->pExtMemBuffer, pState->numOfTotal, pDesc, trsupport->pFinalColModel,
                          &pPObj->cmd, &pPObj->res);
    tscTrace("%p build loser tree completed", pPObj);

    pPObj->res.precision = pSql->res.precision;
    pPObj->res.numOfRows = 0;
    pPObj->res.row = 0;

    // only free once
    tfree(trsupport->pState);

    tscFreeSubSqlObj(trsupport, pSql);

    if (pPObj->fp == NULL) {
      tsem_wait(&pPObj->emptyRspSem);
      tsem_wait(&pPObj->emptyRspSem);

      tsem_post(&pPObj->rspSem);
    } else {
      // the command flag must be set only after the semaphore has been correctly set.
      pPObj->cmd.command = TSDB_SQL_RETRIEVE_METRIC;
      if (pPObj->res.code == TSDB_CODE_SUCCESS) {
        (*pPObj->fp)(pPObj->param, pPObj, 0);
      } else {
        tscQueueAsyncRes(pPObj);
      }
    }
  }
}

/*
 * Cancel a running two-stage super table query. Does nothing for other query kinds.
 *
 * Every launched sub-query is marked TSDB_CODE_QUERY_CANCELLED. The function then polls every
 * 100 ms, for at most 10 seconds, until the parent command becomes TSDB_SQL_RETRIEVE_METRIC or
 * TSDB_SQL_RETRIEVE_EMPTY_RESULT. This covers sub-queries that are not launched, or only
 * partially launched: the launched ones must return so their resources can be freed. If no
 * sub-query was launched yet (the query is still in the SQL parsing stage), setting res.code
 * is all that is needed.
 */
void tscKillMetricQuery(SSqlObj *pSql) {
  SSqlCmd*    pCmd = &pSql->cmd;
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  if (!tscIsTwoStageMergeMetricQuery(pQueryInfo, 0)) {
    return;
  }

  for (int i = 0; i < pSql->numOfSubs; ++i) {
    SSqlObj *pSub = pSql->pSubs[i];
    if (pSub != NULL) {
      /*
       * Do not set command = TSDB_SQL_KILL_QUERY here: the sub-queries would not be released
       * correctly and the master sql object of the super table query could reach an abnormal state.
       */
      pSub->res.code = TSDB_CODE_QUERY_CANCELLED;
    }
  }

  const int64_t maxWaitMs = 10000;  // 10 Sec.
  int64_t       startMs = taosGetTimestampMs();

  while (pCmd->command != TSDB_SQL_RETRIEVE_METRIC && pCmd->command != TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
    taosMsleep(100);

    int64_t elapsedMs = taosGetTimestampMs() - startMs;
    if (elapsedMs > maxWaitMs) {
      break;
    }
  }

  tscTrace("%p metric query is cancelled", pSql);
}

S
slguan 已提交
1110
/* Forward declaration: the sub-query callback handed to createSubqueryObj() in tscCreateSqlObjForSubquery(). */
static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int retCode);
H
hzcheng 已提交
1111

S
slguan 已提交
1112
/*
 * Create the SSqlObj of the sub-query that handles vnode number trsupport->subqueryIndex of the
 * super table query pSql, and register it in pSql->pSubs[].
 *
 * prevSqlObj is the previous object of this sub-query when retrying (NULL on first launch) and
 * is forwarded to createSubqueryObj(). The new object uses tscRetrieveDataRes as its callback,
 * with trsupport as the callback parameter.
 *
 * Returns the new object, or NULL if createSubqueryObj() failed (pSubs[] is left untouched).
 */
static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj) {
  const int32_t table_index = 0;

  SSqlObj *pNew = createSubqueryObj(pSql, table_index, tscRetrieveDataRes, trsupport, prevSqlObj);
  if (pNew == NULL) {
    return NULL;
  }

  // mark it as a sub query of a two-stage super table query
  SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
  pNewQueryInfo->type |= TSDB_QUERY_TYPE_STABLE_SUBQUERY;
  assert(pNewQueryInfo->numOfTables == 1 && pNew->cmd.numOfClause == 1);

  // one sub-query is launched per vnode, so the vnode index equals the sub-query index
  int32_t         vnodeIdx = trsupport->subqueryIndex;
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pNewQueryInfo, table_index);
  pMeterMetaInfo->vnodeIndex = vnodeIdx;

  pSql->pSubs[vnodeIdx] = pNew;
  return pNew;
}

S
slguan 已提交
1132
/*
 * Query-phase callback of a super table sub-query (registered by tscCreateSqlObjForSubquery()).
 *
 * param: the SRetrieveSupport of this sub-query; tres: the sub-query SSqlObj;
 * code:  result of the query phase (TSDB_CODE_SUCCESS on success).
 *
 * If the parent query was cancelled or a sub-query already failed, this sub-query is treated as
 * failed with retries disabled. A failed sub-query is relaunched while retries remain. Otherwise
 * its code becomes the global code (first error wins), and tscRetrieveFromVnodeCallBack() is
 * invoked with the global code to run the abort/cleanup path. On success the first data fetch
 * is issued through taos_fetch_rows_a().
 *
 * NOTE: must be thread-safe; sub-queries complete concurrently.
 */
void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
  SRetrieveSupport *trsupport = (SRetrieveSupport *)param;
  SSqlObj *         pParentSql = trsupport->pParentSqlObj;
  SSqlObj *         pSql = (SSqlObj *)tres;

  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
  assert(pSql->cmd.numOfClause == 1 && pSql->cmd.pQueryInfo[0]->numOfTables == 1);

  // the vnode descriptor (used for logging only) is available only when the super table meta is attached
  SVnodeSidList *vnodeInfo = NULL;
  SVPeerDesc *   pSvd = NULL;
  if (pMeterMetaInfo->pMetricMeta != NULL) {
    vnodeInfo = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, pMeterMetaInfo->vnodeIndex);
    pSvd = &vnodeInfo->vpeerDesc[vnodeInfo->index];
  }

  SSubqueryState *pState = trsupport->pState;
  assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 &&
         pParentSql->numOfSubs == pState->numOfTotal);

  if (pParentSql->res.code != TSDB_CODE_SUCCESS || pState->code != TSDB_CODE_SUCCESS) {
    // super table query is killed or another sub-query failed; Note: code must be less than 0
    trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
    code = (pParentSql->res.code != TSDB_CODE_SUCCESS) ? -(int)(pParentSql->res.code) : pState->code;

    tscTrace("%p query cancelled or failed, sub:%p, orderOfSub:%d abort, code:%d", pParentSql, pSql,
             trsupport->subqueryIndex, code);
  }

  /*
   * Once a query on one vnode has failed, later retrieve operations on the other vnodes are not
   * needed; tscRetrieveFromVnodeCallBack() is called to abort the current and remaining retrievals.
   */
  if (code != TSDB_CODE_SUCCESS) {
    if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY) {
      // retries remain: relaunch this sub-query
      tscTrace("%p sub:%p failed code:%d, retry:%d", pParentSql, pSql, code, trsupport->numOfRetry);

      SSqlObj *pNew = tscCreateSqlObjForSubquery(pParentSql, trsupport, pSql);
      if (pNew != NULL) {
        SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
        assert(pNewQueryInfo->pMeterInfo[0]->pMeterMeta != NULL && pNewQueryInfo->pMeterInfo[0]->pMetricMeta != NULL);
        tscProcessSql(pNew);
        return;
      }

      tscError("%p sub:%p failed to create new subquery due to out of memory, abort retry, vid:%d, orderOfSub:%d",
               pParentSql, pSql, pSvd != NULL ? pSvd->vnode : -1, trsupport->subqueryIndex);

      pState->code = -TSDB_CODE_CLI_OUT_OF_MEMORY;
      trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
    } else {
      tscTrace("%p sub:%p reach the max retry count,set global code:%d", pParentSql, pSql, code);
      atomic_val_compare_exchange_32(&pState->code, 0, code);
    }
  }

  if (pState->code == TSDB_CODE_SUCCESS) {  // success, proceed to retrieve data from dnode
    if (pSvd == NULL) {
      tscTrace("%p sub:%p query complete, orderOfSub:%d,retrieve data", pParentSql, pSql,
               trsupport->subqueryIndex);
    } else {
      tscTrace("%p sub:%p query complete,ip:%u,vid:%d,orderOfSub:%d,retrieve data", pParentSql, pSql,
               pSvd->ip, pSvd->vnode, trsupport->subqueryIndex);
    }

    taos_fetch_rows_a(tres, tscRetrieveFromVnodeCallBack, param);
    return;
  }

  // failed, abort
  if (pSvd == NULL) {
    tscTrace("%p sub:%p query failed,orderOfSub:%d,global code:%d", pParentSql, pSql,
             trsupport->subqueryIndex, pState->code);
  } else {
    tscTrace("%p sub:%p query failed,ip:%u,vid:%d,orderOfSub:%d,global code:%d", pParentSql, pSql,
             pSvd->ip, pSvd->vnode, trsupport->subqueryIndex, pState->code);
  }

  tscRetrieveFromVnodeCallBack(param, tres, pState->code);
}

1221
/*
 * Build the retrieve (fetch) message of pSql in its payload, right after the RPC header area.
 *
 * Message layout: the query handle (converted with htobe64), followed by the query type flags
 * of clause 0 (converted with htons) stored in the 'free' field. Sets payloadLen and msgType.
 * pInfo is not used. Always returns TSDB_CODE_SUCCESS.
 *
 * NOTE(review): the length counts sizeof(pQueryInfo->type) while the value goes through htons();
 * confirm that both agree with the width of SRetrieveTableMsg::free.
 */
int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  char *   pStart = pCmd->payload + tsRpcHeadSize;

  SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *)pStart;
  pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  pRetrieveMsg->free = htons(pQueryInfo->type);

  // message body: the query handle followed by the query type field
  pCmd->payloadLen = sizeof(pSql->res.qhandle) + sizeof(pQueryInfo->type);
  pCmd->msgType = TSDB_MSG_TYPE_RETRIEVE;

  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
1241
/*
 * Hook for patching the target vnode into an already built submit message.
 *
 * Currently a no-op: setting the IP list / vnode is still a TODO. The disabled implementation
 * rewrote SShellSubmitMsg::vnode (located at buf + tsRpcHeadSize) from the meter meta's vpeerDesc.
 */
void tscUpdateVnodeInSubmitMsg(SSqlObj *pSql, char *buf) {
  // TODO: set iplist
  (void)pSql;
  (void)buf;
}

1257
/*
 * Fill in the SShellSubmitMsg header at the start of the payload (after the RPC header area).
 *
 * Header fields written:
 *  - import: 0 for an INSERT, 1 otherwise;
 *  - vnode: always 0 for now;
 *  - numOfSid: the number of tables in this submit.
 *
 * Sets msgType to TSDB_MSG_TYPE_SUBMIT and payloadLen to sizeof(SShellSubmitMsg).
 * pInfo is not used. Always returns TSDB_CODE_SUCCESS.
 *
 * NOTE(review): payloadLen is described as set during SQL parsing, yet it is overwritten here
 * with the header size only. Confirm this is intended.
 */
int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
  STableMeta *    pMeterMeta = pMeterMetaInfo->pMeterMeta;

  SShellSubmitMsg *pShellMsg = (SShellSubmitMsg *)(pCmd->payload + tsRpcHeadSize);

  pShellMsg->import = htons(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT) ? 0 : 1);
  pShellMsg->vnode = 0;  // TODO: take the vnode from pMeterMeta->vpeerDesc once the IP list is supported
  pShellMsg->numOfSid = htonl(pCmd->numOfTablesInSubmit);  // number of tables to be inserted

  pCmd->msgType = TSDB_MSG_TYPE_SUBMIT;
  tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pMeterMeta->vpeerDesc[pMeterMeta->index].ip),
           htons(pShellMsg->vnode));

  pCmd->payloadLen = sizeof(SShellSubmitMsg);
  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
1285
/*
 * Hook for rewriting the target vnode in an already built query message.
 *
 * Currently a no-op (TODO). The disabled implementation took the vnode from the meter meta for a
 * normal table, or from the vnode list selected by vnodeIndex for a super table, and stored it
 * in SQueryTableMsg::vnode (located at buf + tsRpcHeadSize).
 */
void tscUpdateVnodeInQueryMsg(SSqlObj *pSql, char *buf) {
  // TODO
  (void)pSql;
  (void)buf;
}

/*
 * for meter query, simply return the size <= 1k
 * for metric query, estimate size according to meter tags
 */
1307
static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
H
hzcheng 已提交
1308
  const static int32_t MIN_QUERY_MSG_PKT_SIZE = TSDB_MAX_BYTES_PER_ROW * 5;
1309
  SQueryInfo *         pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
H
hzcheng 已提交
1310

1311
  int32_t srcColListSize = pQueryInfo->colList.numOfCols * sizeof(SColumnInfo);
1312

H
hjxilinx 已提交
1313
  int32_t         exprSize = sizeof(SSqlFuncExprMsg) * pQueryInfo->exprsInfo.numOfExprs;
1314
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
H
hzcheng 已提交
1315 1316

  // meter query without tags values
1317
  if (!UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)) {
S
slguan 已提交
1318
    return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + exprSize;
H
hzcheng 已提交
1319 1320
  }

S
slguan 已提交
1321
  SSuperTableMeta *pMetricMeta = pMeterMetaInfo->pMetricMeta;
H
hjxilinx 已提交
1322
  SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex);
H
hzcheng 已提交
1323

S
slguan 已提交
1324
  int32_t meterInfoSize = (pMetricMeta->tagLen + sizeof(STableSidExtInfo)) * pVnodeSidList->numOfSids;
H
hjxilinx 已提交
1325
  int32_t outputColumnSize = pQueryInfo->exprsInfo.numOfExprs * sizeof(SSqlFuncExprMsg);
H
hzcheng 已提交
1326

S
slguan 已提交
1327
  int32_t size = meterInfoSize + outputColumnSize + srcColListSize + exprSize + MIN_QUERY_MSG_PKT_SIZE;
1328 1329
  if (pQueryInfo->tsBuf != NULL) {
    size += pQueryInfo->tsBuf->fileSize;
S
slguan 已提交
1330 1331 1332
  }

  return size;
H
hzcheng 已提交
1333 1334
}

S
slguan 已提交
1335
/*
 * Serialize the table list of a query at pMsg and return the position just past it.
 *
 * Normal table: a single STableSidExtInfo (sid, uid, subscription progress key) for the table
 * itself; numOfTables is not consulted.
 * Super table: for each of the first numOfTables tables of the vnode selected by vnodeIndex,
 * an STableSidExtInfo followed by tagLen bytes of tag values.
 *
 * Integer fields are converted with htonl()/htobe64(). vnodeId is only used for logging.
 */
static char *doSerializeTableInfo(SSqlObj *pSql, int32_t numOfTables, int32_t vnodeId, char *pMsg) {
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, pSql->cmd.clauseIndex, 0);

  tscTrace("%p vid:%d, query on %d meters", pSql, vnodeId, numOfTables);

  if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) {
    STableMeta *pMeterMeta = pMeterMetaInfo->pMeterMeta;
#ifdef _DEBUG_VIEW
    tscTrace("%p sid:%d, uid:%" PRIu64, pSql, pMeterMeta->sid, pMeterMeta->uid);
#endif
    STableSidExtInfo *pOut = (STableSidExtInfo *)pMsg;
    pOut->sid = htonl(pMeterMeta->sid);
    pOut->uid = htobe64(pMeterMeta->uid);
    pOut->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pMeterMeta->uid));

    return pMsg + sizeof(STableSidExtInfo);
  }

  SSuperTableMeta *pMetricMeta = pMeterMetaInfo->pMetricMeta;
  SVnodeSidList *  pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex);
  char *           pCursor = pMsg;

  for (int32_t i = 0; i < numOfTables; ++i) {
    STableSidExtInfo *pSrc = tscGetMeterSidInfo(pVnodeSidList, i);
    STableSidExtInfo *pOut = (STableSidExtInfo *)pCursor;

    pOut->sid = htonl(pSrc->sid);
    pOut->uid = htobe64(pSrc->uid);
    pOut->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pSrc->uid));
    pCursor += sizeof(STableSidExtInfo);

    // the tag values of the table follow its entry
    memcpy(pCursor, pSrc->tags, pMetricMeta->tagLen);
    pCursor += pMetricMeta->tagLen;

#ifdef _DEBUG_VIEW
    tscTrace("%p sid:%d, uid:%" PRId64, pSql, pSrc->sid, pSrc->uid);
#endif
  }

  return pCursor;
}

1376
/*
 * Build the query message sent to a vnode for the current clause.
 *
 * After the fixed SQueryTableMsg header, the variable-length parts are
 * appended in this order:
 *   1. column descriptors, each followed by its filters;
 *   2. function expressions and their parameters;
 *   3. the comma-separated column-name list (only when an arithmetic
 *      expression is present);
 *   4. table descriptors (doSerializeTableInfo);
 *   5. required tag schemas;
 *   6. group-by columns;
 *   7. interpolation default values;
 *   8. for join queries, one compressed timestamp block.
 *
 * Returns TSDB_CODE_SUCCESS on success and -1 on any allocation,
 * validation or I/O failure.
 */
int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  int32_t size = tscEstimateQueryMsgSize(pCmd, pCmd->clauseIndex);

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for query msg", pSql);
    return -1;
  }

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);

  char *pStart = pCmd->payload + tsRpcHeadSize;

  STableMeta *     pMeterMeta = pMeterMetaInfo->pMeterMeta;
  SSuperTableMeta *pMetricMeta = pMeterMetaInfo->pMetricMeta;

  SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pStart;

  int32_t msgLen = 0;
  int32_t numOfTables = 0;

  if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) {
    numOfTables = 1;

    tscTrace("%p query on vnode: %d, number of sid:%d, meter id: %s", pSql,
             pMeterMeta->vpeerDesc[pMeterMeta->index].vnode, 1, pMeterMetaInfo->name);

    pQueryMsg->vnode = htons(pMeterMeta->vpeerDesc[pMeterMeta->index].vnode);
    pQueryMsg->uid = pMeterMeta->uid;
    pQueryMsg->numOfTagsCols = 0;
  } else {  // query on super table
    if (pMeterMetaInfo->vnodeIndex < 0) {
      tscError("%p error vnodeIdx:%d", pSql, pMeterMetaInfo->vnodeIndex);
      return -1;
    }

    SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex);
    uint32_t       vnodeId = pVnodeSidList->vpeerDesc[pVnodeSidList->index].vnode;

    numOfTables = pVnodeSidList->numOfSids;
    if (numOfTables <= 0) {
      tscError("%p vid:%d,error numOfTables in query message:%d", pSql, vnodeId, numOfTables);
      return -1;  // error
    }

    tscTrace("%p query on vid:%d, number of sid:%d", pSql, vnodeId, numOfTables);
    pQueryMsg->vnode = htons(vnodeId);
  }

  pQueryMsg->numOfSids = htonl(numOfTables);
  pQueryMsg->numOfTagsCols = htons(pMeterMetaInfo->numOfTags);

  // for descending order the time range is sent with skey/ekey swapped
  if (pQueryInfo->order.order == TSQL_SO_ASC) {
    pQueryMsg->skey = htobe64(pQueryInfo->stime);
    pQueryMsg->ekey = htobe64(pQueryInfo->etime);
  } else {
    pQueryMsg->skey = htobe64(pQueryInfo->etime);
    pQueryMsg->ekey = htobe64(pQueryInfo->stime);
  }

  pQueryMsg->order = htons(pQueryInfo->order.order);
  pQueryMsg->orderColId = htons(pQueryInfo->order.orderColId);

  pQueryMsg->interpoType = htons(pQueryInfo->interpoType);

  pQueryMsg->limit = htobe64(pQueryInfo->limit.limit);
  pQueryMsg->offset = htobe64(pQueryInfo->limit.offset);

  pQueryMsg->numOfCols = htons(pQueryInfo->colList.numOfCols);

  if (pQueryInfo->colList.numOfCols <= 0) {
    tscError("%p illegal value of numOfCols in query msg: %d", pSql, pQueryInfo->colList.numOfCols);
    return -1;
  }

  if (pMeterMeta->numOfTags < 0) {
    tscError("%p illegal value of numOfTagsCols in query msg: %d", pSql, pMeterMeta->numOfTags);
    return -1;
  }

  pQueryMsg->intervalTime = htobe64(pQueryInfo->intervalTime);
  pQueryMsg->intervalTimeUnit = pQueryInfo->intervalTimeUnit;
  pQueryMsg->slidingTime = htobe64(pQueryInfo->slidingTime);

  if (pQueryInfo->intervalTime < 0) {
    // PRId64 instead of %ld: intervalTime is 64-bit on every platform, long is not
    tscError("%p illegal value of aggregation time interval in query msg: %" PRId64, pSql,
             (int64_t)pQueryInfo->intervalTime);
    return -1;
  }

  if (pQueryInfo->groupbyExpr.numOfGroupCols < 0) {
    tscError("%p illegal value of numOfGroupCols in query msg: %d", pSql, pQueryInfo->groupbyExpr.numOfGroupCols);
    return -1;
  }

  pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols);

  if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) {  // query on meter
    pQueryMsg->tagLength = 0;
  } else {  // query on metric
    pQueryMsg->tagLength = htons(pMetricMeta->tagLen);
  }

  pQueryMsg->queryType = htons(pQueryInfo->type);
  pQueryMsg->numOfOutputCols = htons(pQueryInfo->exprsInfo.numOfExprs);

  if (pQueryInfo->fieldsInfo.numOfOutputCols < 0) {
    tscError("%p illegal value of number of output columns in query msg: %d", pSql,
             pQueryInfo->fieldsInfo.numOfOutputCols);
    return -1;
  }

  // set column list ids
  char *   pMsg = (char *)(pQueryMsg->colList) + pQueryInfo->colList.numOfCols * sizeof(SColumnInfo);
  SSchema *pSchema = tsGetSchema(pMeterMeta);

  for (int32_t i = 0; i < pQueryInfo->colList.numOfCols; ++i) {
    SColumnBase *pCol = tscColumnBaseInfoGet(&pQueryInfo->colList, i);
    int32_t      colIndex = pCol->colIndex.columnIndex;

    // Validate the index BEFORE touching pSchema[colIndex]; the schema entry
    // (including its name) must not be read when the index is out of range.
    if (colIndex < 0 || colIndex >= pMeterMeta->numOfColumns) {
      tscError("%p vid:%d sid:%d id:%s, column index out of range, numOfColumns:%d, index:%d", pSql,
               htons(pQueryMsg->vnode), pMeterMeta->sid, pMeterMetaInfo->name, pMeterMeta->numOfColumns, colIndex);
      return -1;  // 0 means build msg failed
    }

    SSchema *pColSchema = &pSchema[colIndex];
    if (pColSchema->type < TSDB_DATA_TYPE_BOOL || pColSchema->type > TSDB_DATA_TYPE_NCHAR) {
      tscError("%p vid:%d sid:%d id:%s, invalid column type:%d, index:%d, column name:%s", pSql,
               htons(pQueryMsg->vnode), pMeterMeta->sid, pMeterMetaInfo->name, pColSchema->type, colIndex,
               pColSchema->name);
      return -1;
    }

    pQueryMsg->colList[i].colId = htons(pColSchema->colId);
    pQueryMsg->colList[i].bytes = htons(pColSchema->bytes);
    pQueryMsg->colList[i].type = htons(pColSchema->type);
    pQueryMsg->colList[i].numOfFilters = htons(pCol->numOfFilters);

    // append the filter information after the basic column information
    for (int32_t f = 0; f < pCol->numOfFilters; ++f) {
      SColumnFilterInfo *pColFilter = &pCol->filterInfo[f];

      SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)pMsg;
      pFilterMsg->filterOnBinary = htons(pColFilter->filterOnBinary);

      pMsg += sizeof(SColumnFilterInfo);

      if (pColFilter->filterOnBinary) {
        pFilterMsg->len = htobe64(pColFilter->len);
        memcpy(pMsg, (void *)pColFilter->pz, pColFilter->len + 1);
        pMsg += (pColFilter->len + 1);  // append the additional filter binary info
      } else {
        pFilterMsg->lowerBndi = htobe64(pColFilter->lowerBndi);
        pFilterMsg->upperBndi = htobe64(pColFilter->upperBndi);
      }

      pFilterMsg->lowerRelOptr = htons(pColFilter->lowerRelOptr);
      pFilterMsg->upperRelOptr = htons(pColFilter->upperRelOptr);

      if (pColFilter->lowerRelOptr == TSDB_RELATION_INVALID && pColFilter->upperRelOptr == TSDB_RELATION_INVALID) {
        tscError("invalid filter info");
        return -1;
      }
    }
  }

  bool hasArithmeticFunction = false;

  SSqlFuncExprMsg *pSqlFuncExpr = (SSqlFuncExprMsg *)pMsg;

  for (int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) {
    SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);

    if (pExpr->functionId == TSDB_FUNC_ARITHM) {
      hasArithmeticFunction = true;
    }

    if (!tscValidateColumnId(pMeterMetaInfo, pExpr->colInfo.colId)) {
      /* column id is not valid according to the cached metermeta, the meter meta is expired */
      tscError("%p table schema is not matched with parsed sql", pSql);
      return -1;
    }

    pSqlFuncExpr->colInfo.colId = htons(pExpr->colInfo.colId);
    pSqlFuncExpr->colInfo.colIdx = htons(pExpr->colInfo.colIdx);
    pSqlFuncExpr->colInfo.flag = htons(pExpr->colInfo.flag);

    pSqlFuncExpr->functionId = htons(pExpr->functionId);
    pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams);
    pMsg += sizeof(SSqlFuncExprMsg);

    for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
      pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType);
      pSqlFuncExpr->arg[j].argBytes = htons(pExpr->param[j].nLen);

      if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) {
        memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen);

        // by plus one char to make the string null-terminated
        pMsg += pExpr->param[j].nLen + 1;
      } else {
        pSqlFuncExpr->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64Key);
      }
    }

    pSqlFuncExpr = (SSqlFuncExprMsg *)pMsg;
  }

  // arithmetic expressions refer to columns by name: ship "name1,name2,..." for them
  int32_t len = 0;
  if (hasArithmeticFunction) {
    SColumnBase *pColBase = pQueryInfo->colList.pColList;
    for (int32_t i = 0; i < pQueryInfo->colList.numOfCols; ++i) {
      char *  name = pSchema[pColBase[i].colIndex.columnIndex].name;
      int32_t lenx = strlen(name);
      memcpy(pMsg, name, lenx);
      *(pMsg + lenx) = ',';

      len += (lenx + 1);  // one for comma
      pMsg += (lenx + 1);
    }
  }

  pQueryMsg->colNameLen = htonl(len);

  // serialize the table info (sid, uid, tags)
  pMsg = doSerializeTableInfo(pSql, numOfTables, htons(pQueryMsg->vnode), pMsg);

  // only include the required tag column schema. If a tag is not required, it won't be sent to vnode
  if (pMeterMetaInfo->numOfTags > 0) {
    // always transfer tag schema to vnode if exists
    SSchema *pTagSchema = tsGetTagSchema(pMeterMeta);

    for (int32_t j = 0; j < pMeterMetaInfo->numOfTags; ++j) {
      if (pMeterMetaInfo->tagColumnIndex[j] == TSDB_TBNAME_COLUMN_INDEX) {
        SSchema tbSchema = {
            .bytes = TSDB_TABLE_NAME_LEN, .colId = TSDB_TBNAME_COLUMN_INDEX, .type = TSDB_DATA_TYPE_BINARY};
        memcpy(pMsg, &tbSchema, sizeof(SSchema));
      } else {
        memcpy(pMsg, &pTagSchema[pMeterMetaInfo->tagColumnIndex[j]], sizeof(SSchema));
      }

      pMsg += sizeof(SSchema);
    }
  }

  SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr;
  if (pGroupbyExpr->numOfGroupCols != 0) {
    pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex);
    pQueryMsg->orderType = htons(pGroupbyExpr->orderType);

    for (int32_t j = 0; j < pGroupbyExpr->numOfGroupCols; ++j) {
      SColIndexEx *pCol = &pGroupbyExpr->columnInfo[j];

      // pMsg has no alignment guarantee at this point (variable-length data precedes it),
      // so write each field with memcpy. The fields are assigned, not accumulated ("+=")
      // onto whatever the payload already contained.
      memcpy(pMsg, &pCol->colId, sizeof(pCol->colId));
      pMsg += sizeof(pCol->colId);

      memcpy(pMsg, &pCol->colIdx, sizeof(pCol->colIdx));
      pMsg += sizeof(pCol->colIdx);

      memcpy(pMsg, &pCol->colIdxInBuf, sizeof(pCol->colIdxInBuf));
      pMsg += sizeof(pCol->colIdxInBuf);

      memcpy(pMsg, &pCol->flag, sizeof(pCol->flag));
      pMsg += sizeof(pCol->flag);

      memcpy(pMsg, pCol->name, tListLen(pCol->name));
      pMsg += tListLen(pCol->name);
    }
  }

  if (pQueryInfo->interpoType != TSDB_INTERPO_NONE) {
    for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
      *((int64_t *)pMsg) = htobe64(pQueryInfo->defaultVal[i]);
      pMsg += sizeof(pQueryInfo->defaultVal[0]);
    }
  }

  // compressed ts block
  pQueryMsg->tsOffset = htonl(pMsg - pStart);
  int32_t tsLen = 0;
  int32_t numOfBlocks = 0;

  if (pQueryInfo->tsBuf != NULL) {
    STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pQueryInfo->tsBuf, pMeterMetaInfo->vnodeIndex);
    assert(QUERY_IS_JOIN_QUERY(pQueryInfo->type) && pBlockInfo != NULL);  // this query should not be sent

    // todo refactor
    // Read byte-wise so a zero-length block is not mistaken for a short read,
    // and fail instead of shipping an uninitialized block on I/O errors.
    size_t compLen = (size_t)pBlockInfo->compLen;
    if (fseek(pQueryInfo->tsBuf->f, pBlockInfo->offset, SEEK_SET) != 0 ||
        fread(pMsg, 1, compLen, pQueryInfo->tsBuf->f) != compLen) {
      tscError("%p failed to read compressed ts block from ts buffer", pSql);
      return -1;
    }

    pMsg += pBlockInfo->compLen;
    tsLen = pBlockInfo->compLen;
    numOfBlocks = pBlockInfo->numOfBlocks;
  }

  pQueryMsg->tsLen = htonl(tsLen);
  pQueryMsg->tsNumOfBlocks = htonl(numOfBlocks);
  if (pQueryInfo->tsBuf != NULL) {
    pQueryMsg->tsOrder = htonl(pQueryInfo->tsBuf->tsOrder);
  }

  msgLen = pMsg - pStart;

  tscTrace("%p msg built success,len:%d bytes", pSql, msgLen);
  pCmd->payloadLen = msgLen;
  pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY;

  assert(msgLen + minMsgSize() <= size);

  return TSDB_CODE_SUCCESS;
}

1685 1686
/*
 * Build a CREATE DATABASE request for the management node.
 * The database name is taken from the first table slot of the current clause.
 * Returns TSDB_CODE_CLI_OUT_OF_MEMORY if the payload cannot be allocated.
 */
int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCreateDbMsg);
  pCmd->msgType = TSDB_MSG_TYPE_CREATE_DB;

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SCreateDbMsg *pCreateDbMsg = (SCreateDbMsg*)pCmd->payload;

  assert(pCmd->numOfClause == 1);
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);

  // strncpy leaves the buffer unterminated when the source fills it:
  // reserve the last byte for the terminator
  strncpy(pCreateDbMsg->db, pMeterMetaInfo->name, tListLen(pCreateDbMsg->db) - 1);
  pCreateDbMsg->db[tListLen(pCreateDbMsg->db) - 1] = 0;

  return TSDB_CODE_SUCCESS;
}

1704 1705
/*
 * Build a CREATE DNODE request. The dnode address is the first DCL token.
 * Returns TSDB_CODE_CLI_OUT_OF_MEMORY if the payload cannot be allocated.
 */
int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCreateDnodeMsg);
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SCreateDnodeMsg *pCreate = (SCreateDnodeMsg *)pCmd->payload;

  // The token length comes from the SQL text, not from the destination:
  // clamp it to the ip buffer and terminate explicitly.
  size_t ipLen = pInfo->pDCLInfo->a[0].n;
  if (ipLen >= tListLen(pCreate->ip)) {
    ipLen = tListLen(pCreate->ip) - 1;
  }
  strncpy(pCreate->ip, pInfo->pDCLInfo->a[0].z, ipLen);
  pCreate->ip[ipLen] = 0;

  pCmd->msgType = TSDB_MSG_TYPE_CREATE_DNODE;

  return TSDB_CODE_SUCCESS;
}

1719 1720
/*
 * Build a CREATE ACCOUNT request: account name, password and resource limits.
 *
 * accessState is -1 when no state was given. It is set to the read/write/all
 * access flag for "r", "w" and "all", and to 0 for "no".
 * Returns TSDB_CODE_CLI_OUT_OF_MEMORY if the payload cannot be allocated.
 */
int32_t tscBuildAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCreateAcctMsg);
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SCreateAcctMsg *pAlterMsg = (SCreateAcctMsg *)pCmd->payload;

  SSQLToken *pName = &pInfo->pDCLInfo->user.user;
  SSQLToken *pPwd = &pInfo->pDCLInfo->user.passwd;

  // Token lengths come from the SQL text: clamp them to the destination
  // buffers and terminate explicitly so an over-long token cannot overflow.
  size_t userLen = pName->n;
  if (userLen >= tListLen(pAlterMsg->user)) {
    userLen = tListLen(pAlterMsg->user) - 1;
  }
  strncpy(pAlterMsg->user, pName->z, userLen);
  pAlterMsg->user[userLen] = 0;

  size_t passLen = pPwd->n;
  if (passLen >= tListLen(pAlterMsg->pass)) {
    passLen = tListLen(pAlterMsg->pass) - 1;
  }
  strncpy(pAlterMsg->pass, pPwd->z, passLen);
  pAlterMsg->pass[passLen] = 0;

  SCreateAcctSQL *pAcctOpt = &pInfo->pDCLInfo->acctOpt;

  pAlterMsg->cfg.maxUsers = htonl(pAcctOpt->maxUsers);
  pAlterMsg->cfg.maxDbs = htonl(pAcctOpt->maxDbs);
  pAlterMsg->cfg.maxTimeSeries = htonl(pAcctOpt->maxTimeSeries);
  pAlterMsg->cfg.maxStreams = htonl(pAcctOpt->maxStreams);
  pAlterMsg->cfg.maxPointsPerSecond = htonl(pAcctOpt->maxPointsPerSecond);
  pAlterMsg->cfg.maxStorage = htobe64(pAcctOpt->maxStorage);
  pAlterMsg->cfg.maxQueryTime = htobe64(pAcctOpt->maxQueryTime);
  pAlterMsg->cfg.maxConnections = htonl(pAcctOpt->maxConnections);

  if (pAcctOpt->stat.n == 0) {
    pAlterMsg->cfg.accessState = -1;
  } else {
    if (pAcctOpt->stat.z[0] == 'r' && pAcctOpt->stat.n == 1) {
      pAlterMsg->cfg.accessState = TSDB_VN_READ_ACCCESS;
    } else if (pAcctOpt->stat.z[0] == 'w' && pAcctOpt->stat.n == 1) {
      pAlterMsg->cfg.accessState = TSDB_VN_WRITE_ACCCESS;
    } else if (strncmp(pAcctOpt->stat.z, "all", 3) == 0 && pAcctOpt->stat.n == 3) {
      pAlterMsg->cfg.accessState = TSDB_VN_ALL_ACCCESS;
    } else if (strncmp(pAcctOpt->stat.z, "no", 2) == 0 && pAcctOpt->stat.n == 2) {
      pAlterMsg->cfg.accessState = 0;
    }
  }

  pCmd->msgType = TSDB_MSG_TYPE_CREATE_ACCT;
  return TSDB_CODE_SUCCESS;
}

1764 1765
/*
 * Build a CREATE USER or ALTER USER request.
 *
 * For TSDB_ALTER_USER_PRIVILEGES the new privilege is taken from
 * pCmd->count. For every other type (password change or user creation) the
 * password token is shipped. ALTER USER is used for the two alter types and
 * CREATE USER otherwise.
 * Returns TSDB_CODE_CLI_OUT_OF_MEMORY if the payload cannot be allocated.
 */
int32_t tscBuildUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCreateUserMsg);

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SCreateUserMsg *pAlterMsg = (SCreateUserMsg*)pCmd->payload;

  SUserInfo *pUser = &pInfo->pDCLInfo->user;

  // token lengths come from the SQL text: clamp to the destination and terminate
  size_t userLen = pUser->user.n;
  if (userLen >= tListLen(pAlterMsg->user)) {
    userLen = tListLen(pAlterMsg->user) - 1;
  }
  strncpy(pAlterMsg->user, pUser->user.z, userLen);
  pAlterMsg->user[userLen] = 0;

  pAlterMsg->flag = pUser->type;

  if (pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
    pAlterMsg->privilege = (char)pCmd->count;
  } else {  // password change or create user: both carry the password
    size_t passLen = pUser->passwd.n;
    if (passLen >= tListLen(pAlterMsg->pass)) {
      passLen = tListLen(pAlterMsg->pass) - 1;
    }
    strncpy(pAlterMsg->pass, pUser->passwd.z, passLen);
    pAlterMsg->pass[passLen] = 0;
  }

  if (pUser->type == TSDB_ALTER_USER_PASSWD || pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
    pCmd->msgType = TSDB_MSG_TYPE_ALTER_USER;
  } else {
    pCmd->msgType = TSDB_MSG_TYPE_CREATE_USER;
  }

  return TSDB_CODE_SUCCESS;
}

1796 1797
/*
 * Prepare a dnode configuration request: reserve an SCMCfgDnodeMsg-sized
 * payload and tag the command with the config-dnode message type.
 * The message body itself is not filled in here.
 * Returns TSDB_CODE_CLI_OUT_OF_MEMORY if the payload cannot be allocated.
 */
int32_t tscBuildCfgDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCMCfgDnodeMsg);

  int32_t allocCode = tscAllocPayload(pCmd, pCmd->payloadLen);
  if (allocCode == TSDB_CODE_SUCCESS) {
    pCmd->msgType = TSDB_MSG_TYPE_MD_CONFIG_DNODE;
    return TSDB_CODE_SUCCESS;
  }

  tscError("%p failed to malloc for query msg", pSql);
  return TSDB_CODE_CLI_OUT_OF_MEMORY;
}
S
slguan 已提交
1808

1809 1810
/*
 * Build a DROP DATABASE request.
 * The database name is the first table slot of the current clause, and
 * ignoreNotExists mirrors the DCL existsCheck flag.
 * Returns TSDB_CODE_CLI_OUT_OF_MEMORY if the payload cannot be allocated.
 */
int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SDropDbMsg);

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SDropDbMsg *pDropDbMsg = (SDropDbMsg*)pCmd->payload;

  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);

  // strncpy leaves the buffer unterminated when the source fills it:
  // reserve the last byte for the terminator
  strncpy(pDropDbMsg->db, pMeterMetaInfo->name, tListLen(pDropDbMsg->db) - 1);
  pDropDbMsg->db[tListLen(pDropDbMsg->db) - 1] = 0;
  pDropDbMsg->ignoreNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;

  pCmd->msgType = TSDB_MSG_TYPE_DROP_DB;
  return TSDB_CODE_SUCCESS;
}

1828 1829
/*
 * Build a DROP TABLE request.
 * The table id is the first table slot of the current clause, and igNotExists
 * mirrors the DCL existsCheck flag.
 * Returns TSDB_CODE_CLI_OUT_OF_MEMORY if the payload cannot be allocated.
 */
int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SDropTableMsg);

  int32_t allocCode = tscAllocPayload(pCmd, pCmd->payloadLen);
  if (allocCode != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SMeterMetaInfo *pTableInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
  SDropTableMsg * pMsg = (SDropTableMsg *)pCmd->payload;

  strcpy(pMsg->tableId, pTableInfo->name);
  pMsg->igNotExists = (pInfo->pDCLInfo->existsCheck) ? 1 : 0;

  pCmd->msgType = TSDB_MSG_TYPE_DROP_TABLE;
  return TSDB_CODE_SUCCESS;
}

1846
/*
 * Build a DROP DNODE request.
 * The dnode address is stored in the name of the first table slot of the
 * current clause.
 * Returns TSDB_CODE_CLI_OUT_OF_MEMORY if the payload cannot be allocated.
 */
int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SDropDnodeMsg);

  int32_t allocCode = tscAllocPayload(pCmd, pCmd->payloadLen);
  if (allocCode != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  const char *dnodeAddr = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0)->name;
  strcpy(((SDropDnodeMsg *)pCmd->payload)->ip, dnodeAddr);

  pCmd->msgType = TSDB_MSG_TYPE_DROP_DNODE;
  return TSDB_CODE_SUCCESS;
}

1862 1863
/*
 * Build a drop-account request. It reuses the SDropUserMsg layout and the
 * DROP USER message type. The account name is the first table slot of the
 * current clause.
 * Returns TSDB_CODE_CLI_OUT_OF_MEMORY if the payload cannot be allocated.
 */
int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  // both fields are set before allocating, so they are in place even on failure
  pCmd->payloadLen = sizeof(SDropUserMsg);
  pCmd->msgType = TSDB_MSG_TYPE_DROP_USER;

  int32_t allocCode = tscAllocPayload(pCmd, pCmd->payloadLen);
  if (allocCode != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SMeterMetaInfo *pAcctInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
  strcpy(((SDropUserMsg *)pCmd->payload)->user, pAcctInfo->name);

  return TSDB_CODE_SUCCESS;
}

1879 1880
/*
 * Build a USE DATABASE request. The database name is the first table slot of
 * the current clause.
 * Returns TSDB_CODE_CLI_OUT_OF_MEMORY if the payload cannot be allocated.
 */
int32_t tscBuildUseDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SUseDbMsg);

  int32_t allocCode = tscAllocPayload(pCmd, pCmd->payloadLen);
  if (allocCode != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SUseDbMsg *     pUse = (SUseDbMsg *)pCmd->payload;
  SMeterMetaInfo *pDbInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
  strcpy(pUse->db, pDbInfo->name);

  pCmd->msgType = TSDB_MSG_TYPE_USE_DB;
  return TSDB_CODE_SUCCESS;
}

1896
/*
 * Build a SHOW request.
 *
 * The database prefix is the first table slot's name if it is set, otherwise
 * the connection's current database. A wildcard pattern (non-VNODES show
 * types, when present) or the IP prefix (SHOW VNODES) is appended as a
 * variable-length payload.
 * Returns TSDB_CODE_CLI_OUT_OF_MEMORY if the payload cannot be allocated.
 */
int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  STscObj *pObj = pSql->pTscObj;
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->msgType = TSDB_MSG_TYPE_SHOW;

  SShowInfo *pShowInfo = &pInfo->pDCLInfo->showOpt;

  // pick the token (if any) that travels in the variable-length payload
  SSQLToken *pToken = NULL;
  if (pShowInfo->showType != TSDB_MGMT_TABLE_VNODES) {
    if (pShowInfo->pattern.type > 0) {  // only show tables support wildcard query
      pToken = &pShowInfo->pattern;
    }
  } else {
    pToken = &pShowInfo->prefix;
    assert(pToken->n > 0 && pToken->type > 0);
  }

  // payload length in HOST byte order; only the copy stored in the message is converted
  uint16_t payloadLen = (pToken != NULL) ? (uint16_t)pToken->n : 0;

  // keep the original 100-byte headroom and also make room for a long token
  pCmd->payloadLen = sizeof(SShowMsg) + 100 + payloadLen;
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SShowMsg *pShowMsg = (SShowMsg*)pCmd->payload;

  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
  size_t nameLen = strlen(pMeterMetaInfo->name);
  if (nameLen > 0) {
    strcpy(pShowMsg->db, pMeterMetaInfo->name);  // prefix is set here
  } else {
    strcpy(pShowMsg->db, pObj->db);
  }

  pShowMsg->type = pShowInfo->showType;

  if (pToken != NULL) {
    strncpy(pShowMsg->payload, pToken->z, payloadLen);
    pShowMsg->payloadLen = htons(payloadLen);
  }

  // Must use the host-order length: pShowMsg->payloadLen has already been
  // converted with htons and would give a wrong length on little-endian hosts.
  pCmd->payloadLen = sizeof(SShowMsg) + payloadLen;
  return TSDB_CODE_SUCCESS;
}

1938
/*
 * Build a KILL QUERY / KILL CONNECTION / KILL STREAM request.
 * The identifier to kill is the DCL `ip` token, and the message type follows
 * pCmd->command.
 * Returns TSDB_CODE_CLI_OUT_OF_MEMORY if the payload cannot be allocated.
 */
int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SKillQueryMsg);

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SKillQueryMsg *pKill = (SKillQueryMsg*)pCmd->payload;

  // The token length comes from the SQL text: clamp it to queryId and terminate.
  size_t idLen = pInfo->pDCLInfo->ip.n;
  if (idLen >= tListLen(pKill->queryId)) {
    idLen = tListLen(pKill->queryId) - 1;
  }
  strncpy(pKill->queryId, pInfo->pDCLInfo->ip.z, idLen);
  pKill->queryId[idLen] = 0;

  switch (pCmd->command) {
    case TSDB_SQL_KILL_QUERY:
      pCmd->msgType = TSDB_MSG_TYPE_KILL_QUERY;
      break;
    case TSDB_SQL_KILL_CONNECTION:
      pCmd->msgType = TSDB_MSG_TYPE_KILL_CONNECTION;
      break;
    case TSDB_SQL_KILL_STREAM:
      pCmd->msgType = TSDB_MSG_TYPE_KILL_STREAM;
      break;
    default:
      // any other command leaves msgType untouched, as before
      break;
  }
  return TSDB_CODE_SUCCESS;
}

1963
/*
 * Upper bound, in bytes, of the payload needed for a CREATE TABLE message.
 *
 * The bound is the fixed headers plus:
 *  - the tag data (tables created from a super table), or one SSchema per
 *    column and tag (everything else);
 *  - the stream SELECT text plus a terminator, when present;
 *  - TSDB_EXTRA_PAYLOAD_SIZE.
 */
int tscEstimateCreateTableMsgLength(SSqlObj *pSql, SSqlInfo *pInfo) {
  SCreateTableSQL *pCreate = pInfo->pCreateTableInfo;
  SSqlCmd *        pCmd = &(pSql->cmd);

  int32_t estimate = minMsgSize() + sizeof(SMgmtHead) + sizeof(SCreateTableMsg);

  if (pCreate->type != TSQL_CREATE_TABLE_FROM_STABLE) {
    estimate += sizeof(SSchema) * (pCmd->numOfCols + pCmd->count);
  } else {
    estimate += sizeof(STagData);
  }

  SQuerySQL *pSelect = pCreate->pSelect;
  if (pSelect != NULL) {
    estimate += (pSelect->selectToken.n + 1);
  }

  return estimate + TSDB_EXTRA_PAYLOAD_SIZE;
}

1982
/*
 * Build a CREATE TABLE / CREATE STABLE / CREATE STREAM request.
 *
 * After the header, either the STagData of the "USING super table" clause
 * follows, or one SSchema per column and tag. For stream tables the
 * NUL-terminated SELECT text follows the schema.
 * Field info of the clause is cleared once the schema has been serialized.
 * Returns TSDB_CODE_CLI_OUT_OF_MEMORY if the payload cannot be allocated.
 */
int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);

  // Reallocate the payload size
  int size = tscEstimateCreateTableMsgLength(pSql, pInfo);
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for create table msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SCreateTableMsg *pCreateTableMsg = (SCreateTableMsg *)pCmd->payload;
  strcpy(pCreateTableMsg->tableId, pMeterMetaInfo->name);

  // use dbinfo from table id without modifying current db info
  tscGetDBInfoFromMeterId(pMeterMetaInfo->name, pCreateTableMsg->db);

  SCreateTableSQL *pCreateTable = pInfo->pCreateTableInfo;

  pCreateTableMsg->igExists = pCreateTable->existCheck ? 1 : 0;

  pCreateTableMsg->numOfColumns = htons(pCmd->numOfCols);
  pCreateTableMsg->numOfTags = htons(pCmd->count);

  pCreateTableMsg->sqlLen = 0;
  char *pMsg = (char *)pCreateTableMsg->schema;

  int8_t type = pCreateTable->type;
  if (type == TSQL_CREATE_TABLE_FROM_STABLE) {  // create by using super table, tags value
    memcpy(pMsg, &pCreateTable->usingInfo.tagdata, sizeof(STagData));
    pMsg += sizeof(STagData);
  } else {  // create (super) table
    SSchema *pSchema = (SSchema *)pCreateTableMsg->schema;

    for (int i = 0; i < pCmd->numOfCols + pCmd->count; ++i) {
      TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i);

      pSchema->type = pField->type;
      strcpy(pSchema->name, pField->name);
      pSchema->bytes = htons(pField->bytes);

      pSchema++;
    }

    pMsg = (char *)pSchema;
    if (type == TSQL_CREATE_STREAM) {  // check if it is a stream sql
      SQuerySQL *pQuerySql = pCreateTable->pSelect;

      // The token is a slice of the SQL text, not a C string: copy its n bytes
      // and terminate explicitly. Copying n + 1 bytes would take whatever
      // character follows the SELECT clause in the original statement.
      size_t sqlLen = pQuerySql->selectToken.n;
      memcpy(pMsg, pQuerySql->selectToken.z, sqlLen);
      pMsg[sqlLen] = 0;

      pCreateTableMsg->sqlLen = htons(sqlLen + 1);
      pMsg += sqlLen + 1;
    }
  }

  tscClearFieldInfo(&pQueryInfo->fieldsInfo);

  int msgLen = pMsg - (char *)pCreateTableMsg;
  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_CREATE_TABLE;

  assert(msgLen + minMsgSize() <= size);
  return TSDB_CODE_SUCCESS;
}

/*
 * Upper bound, in bytes, of the payload needed for an ALTER TABLE message.
 * The bound is the fixed headers, one SSchema per field of the first clause,
 * and TSDB_EXTRA_PAYLOAD_SIZE.
 */
int tscEstimateAlterTableMsgLength(SSqlCmd *pCmd) {
  SQueryInfo *pFirstClause = tscGetQueryInfoDetail(pCmd, 0);
  size_t      schemaBytes = sizeof(SSchema) * tscNumOfFields(pFirstClause);

  return minMsgSize() + sizeof(SMgmtHead) + sizeof(SAlterTableMsg) + schemaBytes + TSDB_EXTRA_PAYLOAD_SIZE;
}

2058
/*
 * Build an ALTER TABLE request.
 *
 * The message carries the db and table id, the alter type, the tag value
 * buffer (TSDB_MAX_TAGS_LEN bytes), and one SSchema per field of the first
 * clause.
 * Returns -1 if the payload cannot be allocated.
 */
int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *       pCmd = &pSql->cmd;
  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  SMeterMetaInfo *pTableInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);

  int capacity = tscEstimateAlterTableMsgLength(pCmd);
  if (tscAllocPayload(pCmd, capacity) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for alter table msg", pSql);
    return -1;
  }

  SAlterTableMsg *pAlter = (SAlterTableMsg *)pCmd->payload;
  SAlterTableSQL *pAlterInfo = pInfo->pAlterInfo;

  tscGetDBInfoFromMeterId(pTableInfo->name, pAlter->db);
  strcpy(pAlter->tableId, pTableInfo->name);
  pAlter->type = htons(pAlterInfo->type);

  int numOfFields = tscNumOfFields(pQueryInfo);
  pAlter->numOfCols = htons(numOfFields);
  memcpy(pAlter->tagVal, pAlterInfo->tagData.data, TSDB_MAX_TAGS_LEN);

  // one schema entry per field, written right after the fixed header
  SSchema *pEntry = pAlter->schema;
  for (int k = 0; k < numOfFields; ++k, ++pEntry) {
    TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, k);

    pEntry->type = pField->type;
    strcpy(pEntry->name, pField->name);
    pEntry->bytes = htons(pField->bytes);
  }

  int written = (char *)pEntry - (char *)pAlter;
  pCmd->payloadLen = written;
  pCmd->msgType = TSDB_MSG_TYPE_ALTER_TABLE;

  assert(written + minMsgSize() <= capacity);
  return TSDB_CODE_SUCCESS;
}

2108
/*
 * Build the ALTER DATABASE request: a fixed-size SAlterDbMsg whose db field is
 * the name stored in the current clause's first meter info.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SAlterDbMsg);
  pCmd->msgType = TSDB_MSG_TYPE_ALTER_DB;

  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
  SAlterDbMsg *   pAlterDb = (SAlterDbMsg *)pCmd->payload;

  strcpy(pAlterDb->db, pMeterMetaInfo->name);
  return TSDB_CODE_SUCCESS;
}

2125
/*
 * Build a RETRIEVE request addressed to mgmt for the query handle kept in
 * pSql->res. The message's `free` field carries the first clause's query type.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->msgType = TSDB_MSG_TYPE_RETRIEVE;
  pCmd->payloadLen = sizeof(SRetrieveTableMsg);

  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SRetrieveTableMsg *pRetrieve = (SRetrieveTableMsg *)pCmd->payload;
  SQueryInfo *       pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

  pRetrieve->qhandle = htobe64(pSql->res.qhandle);
  pRetrieve->free = htons(pQueryInfo->type);

  return TSDB_CODE_SUCCESS;
}

2143
/*
 * Create pRes's per-column row pointer table for pQueryInfo's output columns,
 * then aim tsrow[i] at pRes->data + offset_i * pRes->numOfRows, where offset_i
 * comes from tscFieldInfoGetOffset. (Presumably this means the data buffer is
 * column-major with offset_i being the per-row byte offset of column i; TODO
 * confirm.)
 *
 * Returns pRes->code when the pointer table cannot be created, otherwise 0.
 */
static int tscSetResultPointer(SQueryInfo *pQueryInfo, SSqlRes *pRes) {
  int32_t ret = tscCreateResPointerInfo(pRes, pQueryInfo);
  if (ret != TSDB_CODE_SUCCESS) {
    return pRes->code;
  }

  for (int col = 0; col < pQueryInfo->fieldsInfo.numOfOutputCols; col++) {
    int16_t colOffset = tscFieldInfoGetOffset(pQueryInfo, col);
    pRes->tsrow[col] = pRes->data + colOffset * pRes->numOfRows;
  }

  return 0;
}

/*
 * Publish a result set of numOfRes rows that was produced locally.
 *
 * Meant to be called once per result; pRes->rspType records the state:
 *   - rspType == 0: set row count and column pointers, then mark rspType = 1;
 *   - rspType == 1: no more results, just reset for the next retrieve.
 *
 * For async queries (pSql->fp set) the user callback receives the row count
 * on success; otherwise the error is queued via tscQueueAsyncRes.
 * Returns the result code.
 */
static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  pRes->code = TSDB_CODE_SUCCESS;

  if (pRes->rspType == 0) {
    pRes->numOfRows = numOfRes;
    pRes->row = 0;
    pRes->rspType = 1;

    // the return value is ignored; failures are expected to surface in pRes->code
    tscSetResultPointer(pQueryInfo, pRes);
  } else {
    tscResetForNextRetrieve(pRes);
  }

  // keep the full width of the code: a uint8_t would truncate codes above 255
  int32_t code = pSql->res.code;
  if (pSql->fp) {
    if (code == TSDB_CODE_SUCCESS) {
      (*pSql->fp)(pSql->param, pSql, pSql->res.numOfRows);
    } else {
      tscQueueAsyncRes(pSql);
    }
  }

  return code;
}

/*
 * DESCRIBE is answered locally: one output row per column plus one per tag of
 * the table held by the current clause's first meter info.
 */
int tscProcessDescribeTableRsp(SSqlObj *pSql) {
  SSqlCmd *       pCmd = &pSql->cmd;
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
  STableMeta *    pMeta = pMeterMetaInfo->pMeterMeta;

  return tscLocalResultCommonBuilder(pSql, pMeta->numOfColumns + pMeta->numOfTags);
}

/*
 * Answer a tag query locally. A tag projection (TSDB_FUNC_TAGPRJ as the first
 * expression) produces one row per table of the super table; any other
 * function (e.g. count) produces a single row.
 */
int tscProcessTagRetrieveRsp(SSqlObj *pSql) {
  SSqlCmd *       pCmd = &pSql->cmd;
  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);

  bool    isTagProjection = (tscSqlExprGet(pQueryInfo, 0)->functionId == TSDB_FUNC_TAGPRJ);
  int32_t numOfRes = isTagProjection ? pMeterMetaInfo->pMetricMeta->numOfTables : 1;

  return tscLocalResultCommonBuilder(pSql, numOfRes);
}

/*
 * Finish a super-table retrieve on the client: run the client-side reduce
 * (tscDoLocalreduce), expose any produced rows through the result pointers
 * and, for async queries, invoke the user callback on success or queue the
 * error. Returns the result code.
 */
int tscProcessRetrieveMetricRsp(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  pRes->code = tscDoLocalreduce(pSql);
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) {
    tscSetResultPointer(pQueryInfo, pRes);
  }

  pRes->row = 0;

  // keep the full width of the code: a uint8_t would truncate codes above 255
  int32_t code = pRes->code;
  if (pSql->fp) {  // async retrieve metric data
    if (pRes->code == TSDB_CODE_SUCCESS) {
      (*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
    } else {
      tscQueueAsyncRes(pSql);
    }
  }

  return code;
}

S
slguan 已提交
2241
/* Answer a query that is known to produce no rows. */
int tscProcessEmptyResultRsp(SSqlObj *pSql) {
  const int32_t noRows = 0;
  return tscLocalResultCommonBuilder(pSql, noRows);
}
H
hzcheng 已提交
2242

2243
/*
 * Build the CONNECT request: the database name, the client version string and
 * an empty message version.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload
 * cannot be allocated.
 */
int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  STscObj *pObj = pSql->pTscObj;
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->msgType = TSDB_MSG_TYPE_CONNECT;
  pCmd->payloadLen = sizeof(SConnectMsg);

  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SConnectMsg *pConnect = (SConnectMsg *)pCmd->payload;

  // send only the part of pObj->db after the first TS_PATH_DELIMITER,
  // or the whole string when it contains no delimiter
  char *      pDelim = strstr(pObj->db, TS_PATH_DELIMITER);
  const char *db = (pDelim != NULL) ? pDelim + 1 : pObj->db;

  strcpy(pConnect->db, db);
  strcpy(pConnect->clientVersion, version);
  strcpy(pConnect->msgVersion, "");

  return TSDB_CODE_SUCCESS;
}

2266
/*
 * Build the TABLE_META request for the first table of the first clause.
 *
 * When pCmd->createOnDemand is set, the caller has left an STagData at the
 * start of pCmd->payload; it is forwarded in pInfoMsg->tags. The message is
 * written into that same buffer, so the tag data is saved to a temporary copy
 * first. Only sizeof(STagData) bytes are saved, bounded by the buffer size, and
 * only when they are actually needed.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the
 * temporary copy cannot be allocated.
 */
int tscBuildMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  // STagData is binary, so it is copied with memcpy rather than a string copy
  STagData *pTagData = NULL;
  if (pCmd->createOnDemand) {
    pTagData = calloc(1, sizeof(STagData));
    if (pTagData == NULL) {
      return TSDB_CODE_CLI_OUT_OF_MEMORY;
    }

    if (pCmd->payload != NULL && pCmd->allocSize > 0) {
      size_t avail = (size_t)pCmd->allocSize;
      memcpy(pTagData, pCmd->payload, (avail < sizeof(STagData)) ? avail : sizeof(STagData));
    }
  }

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);

  STableInfoMsg *pInfoMsg = (STableInfoMsg *)pCmd->payload;
  strcpy(pInfoMsg->tableId, pMeterMetaInfo->name);
  pInfoMsg->createFlag = htons(pCmd->createOnDemand ? 1 : 0);

  char *pMsg = (char *)pInfoMsg + sizeof(STableInfoMsg);
  if (pTagData != NULL) {
    memcpy(pInfoMsg->tags, pTagData, sizeof(STagData));
    pMsg += sizeof(STagData);
  }

  int msgLen = (int)(pMsg - (char *)pInfoMsg);
  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_TABLE_META;

  tfree(pTagData);

  assert(msgLen + minMsgSize() <= pCmd->allocSize);
  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
2308 2309
/**
 *  multi meter meta req pkg format:
S
slguan 已提交
2310
 *  | SMgmtHead | SMultiTableInfoMsg | tableId0 | tableId1 | tableId2 | ......
S
slguan 已提交
2311 2312
 *      no used         4B
 **/
2313
int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
2314 2315 2316 2317 2318 2319 2320 2321 2322 2323 2324 2325
  SSqlCmd *pCmd = &pSql->cmd;

  // copy payload content to temp buff
  char *tmpData = 0;
  if (pCmd->payloadLen > 0) {
    tmpData = calloc(1, pCmd->payloadLen + 1);
    if (NULL == tmpData) return -1;
    memcpy(tmpData, pCmd->payload, pCmd->payloadLen);
  }

  // fill head info
  SMgmtHead *pMgmt = (SMgmtHead *)(pCmd->payload + tsRpcHeadSize);
S
slguan 已提交
2326
  memset(pMgmt->db, 0, TSDB_TABLE_ID_LEN);  // server don't need the db
S
slguan 已提交
2327

S
slguan 已提交
2328 2329
  SMultiTableInfoMsg *pInfoMsg = (SMultiTableInfoMsg *)(pCmd->payload + tsRpcHeadSize + sizeof(SMgmtHead));
  pInfoMsg->numOfTables = htonl((int32_t)pCmd->count);
S
slguan 已提交
2330 2331

  if (pCmd->payloadLen > 0) {
S
slguan 已提交
2332
    memcpy(pInfoMsg->tableIds, tmpData, pCmd->payloadLen);
S
slguan 已提交
2333 2334 2335 2336
  }

  tfree(tmpData);

S
slguan 已提交
2337
  pCmd->payloadLen += sizeof(SMgmtHead) + sizeof(SMultiTableInfoMsg);
S
slguan 已提交
2338
  pCmd->msgType = TSDB_MSG_TYPE_MULTI_TABLE_META;
S
slguan 已提交
2339 2340 2341

  assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize);

S
slguan 已提交
2342
  tscTrace("%p build load multi-metermeta msg completed, numOfTables:%d, msg size:%d", pSql, pCmd->count,
S
slguan 已提交
2343 2344 2345 2346 2347
           pCmd->payloadLen);

  return pCmd->payloadLen;
}

H
hzcheng 已提交
2348 2349
/*
 * Upper bound for the STABLE_META request built by tscBuildMetricMetaMsg. It
 * covers the fixed headers, every tag condition (sent as TSDB_NCHAR_SIZE bytes
 * per character), the table-name condition, the join condition, one element
 * per table and the group-by column list. The result is never less than
 * TSDB_DEFAULT_PAYLOAD_SIZE.
 */
static int32_t tscEstimateMetricMetaMsgSize(SSqlCmd *pCmd) {
  const int32_t defaultSize =
      minMsgSize() + sizeof(SSuperTableMetaMsg) + sizeof(SMgmtHead) + sizeof(int16_t) * TSDB_MAX_TAGS;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

  // the builder sends each tag condition including its terminating NUL
  // (strlen + 1 characters), so reserve room for that character too
  int32_t n = 0;
  for (int32_t i = 0; i < pQueryInfo->tagCond.numOfTagCond; ++i) {
    const char *cond = pQueryInfo->tagCond.cond[i].cond;
    if (cond != NULL) {
      n += strlen(cond) + 1;
    }
  }

  int32_t tagLen = n * TSDB_NCHAR_SIZE;
  if (pQueryInfo->tagCond.tbnameCond.cond != NULL) {
    tagLen += strlen(pQueryInfo->tagCond.tbnameCond.cond) * TSDB_NCHAR_SIZE;
  }

  int32_t joinCondLen = (TSDB_TABLE_ID_LEN + sizeof(int16_t)) * 2;
  int32_t elemSize = sizeof(SSuperTableMetaElemMsg) * pQueryInfo->numOfTables;

  int32_t colSize = pQueryInfo->groupbyExpr.numOfGroupCols * sizeof(SColIndexEx);

  int32_t len = tagLen + joinCondLen + elemSize + colSize + defaultSize;

  return MAX(len, TSDB_DEFAULT_PAYLOAD_SIZE);
}

2373
/*
 * Build the STABLE_META (super table meta) request for every table of the
 * first query clause.
 *
 * Layout after the tsRpcHeadSize bytes reserved at the front of the payload:
 *   SMgmtHead | SSuperTableMetaMsg | join condition | element(table 0) | ...
 * The join condition is: left tableId, left tagCol, right tableId, right tagCol.
 * Each element is an SSuperTableMetaElemMsg followed by the table's tag
 * condition (converted with taosMbsToUcs4, NUL included), its table-name
 * condition and its group-by columns. Offsets stored in the message are
 * relative to the SSuperTableMetaMsg header.
 *
 * Returns TSDB_CODE_SUCCESS, or -1 when the payload cannot be allocated or a
 * tag condition cannot be converted.
 */
int tscBuildMetricMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSuperTableMetaMsg *pMetaMsg;
  char *              pMsg, *pStart;
  int                 msgLen = 0;
  int                 tableIndex = 0;

  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);

  STagCond *pTagCond = &pQueryInfo->tagCond;

  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, tableIndex);

  int32_t size = tscEstimateMetricMetaMsgSize(pCmd);
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for metric meter msg", pSql);
    return -1;
  }

  pStart = pCmd->payload + tsRpcHeadSize;
  pMsg = pStart;

  SMgmtHead *pMgmt = (SMgmtHead *)pMsg;
  tscGetDBInfoFromMeterId(pMeterMetaInfo->name, pMgmt->db);
  pMsg += sizeof(SMgmtHead);

  pMetaMsg = (SSuperTableMetaMsg *)pMsg;
  pMetaMsg->numOfTables = htonl(pQueryInfo->numOfTables);
  pMsg += sizeof(SSuperTableMetaMsg);

  int32_t offset = pMsg - (char *)pMetaMsg;
  pMetaMsg->join = htonl(offset);

  // todo refactor
  pMetaMsg->joinCondLen = htonl((TSDB_TABLE_ID_LEN + sizeof(int16_t)) * 2);

  memcpy(pMsg, pTagCond->joinInfo.left.tableId, TSDB_TABLE_ID_LEN);
  pMsg += TSDB_TABLE_ID_LEN;

  *(int16_t *)pMsg = pTagCond->joinInfo.left.tagCol;
  pMsg += sizeof(int16_t);

  memcpy(pMsg, pTagCond->joinInfo.right.tableId, TSDB_TABLE_ID_LEN);
  pMsg += TSDB_TABLE_ID_LEN;

  *(int16_t *)pMsg = pTagCond->joinInfo.right.tagCol;
  pMsg += sizeof(int16_t);

  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
    pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, i);
    uint64_t uid = pMeterMetaInfo->pMeterMeta->uid;

    offset = pMsg - (char *)pMetaMsg;
    pMetaMsg->metaElem[i] = htonl(offset);

    SSuperTableMetaElemMsg *pElem = (SSuperTableMetaElemMsg *)pMsg;
    pMsg += sizeof(SSuperTableMetaElemMsg);

    // convert to unicode before sending to mnode for metric query
    int32_t condLen = 0;
    if (pTagCond->numOfTagCond > 0) {
      SCond *pCond = tsGetMetricQueryCondPos(pTagCond, uid);
      if (pCond != NULL && pCond->cond != NULL) {
        condLen = strlen(pCond->cond) + 1;

        bool ret = taosMbsToUcs4(pCond->cond, condLen, pMsg, condLen * TSDB_NCHAR_SIZE);
        if (!ret) {
          // log the condition text (%s needs a char *, not the SCond *) and
          // fail: reporting success here would send a half-built message
          tscError("%p mbs to ucs4 failed:%s", pSql, pCond->cond);
          return -1;
        }
      }
    }

    pElem->condLen = htonl(condLen);

    offset = pMsg - (char *)pMetaMsg;
    pElem->cond = htonl(offset);
    pMsg += condLen * TSDB_NCHAR_SIZE;

    pElem->rel = htons(pTagCond->relType);
    if (pTagCond->tbnameCond.uid == uid) {
      offset = pMsg - (char *)pMetaMsg;

      pElem->tableCond = htonl(offset);

      uint32_t len = 0;
      if (pTagCond->tbnameCond.cond != NULL) {
        len = strlen(pTagCond->tbnameCond.cond);
        memcpy(pMsg, pTagCond->tbnameCond.cond, len);
      }

      pElem->tableCondLen = htonl(len);
      pMsg += len;
    }

    SSqlGroupbyExpr *pGroupby = &pQueryInfo->groupbyExpr;

    if (pGroupby->tableIndex != i && pGroupby->numOfGroupCols > 0) {
      pElem->orderType = 0;
      pElem->orderIndex = 0;
      pElem->numOfGroupCols = 0;
    } else {
      pElem->numOfGroupCols = htons(pGroupby->numOfGroupCols);
      for (int32_t j = 0; j < pMeterMetaInfo->numOfTags; ++j) {
        pElem->tagCols[j] = htons(pMeterMetaInfo->tagColumnIndex[j]);
      }

      if (pGroupby->numOfGroupCols != 0) {
        pElem->orderIndex = htons(pGroupby->orderIndex);
        pElem->orderType = htons(pGroupby->orderType);
        offset = pMsg - (char *)pMetaMsg;

        pElem->groupbyTagColumnList = htonl(offset);
        for (int32_t j = 0; j < pQueryInfo->groupbyExpr.numOfGroupCols; ++j) {
          SColIndexEx *pCol = &pQueryInfo->groupbyExpr.columnInfo[j];
          SColIndexEx *pDestCol = (SColIndexEx *)pMsg;

          // every field comes from the source column; the destination has not
          // been written yet, so reading it would send stale/zero values
          pDestCol->colIdxInBuf = 0;
          pDestCol->colIdx = htons(pCol->colIdx);
          pDestCol->colId = htons(pCol->colId);
          pDestCol->flag = htons(pCol->flag);
          strncpy(pDestCol->name, pCol->name, tListLen(pCol->name));

          pMsg += sizeof(SColIndexEx);
        }
      }
    }

    strcpy(pElem->tableId, pMeterMetaInfo->name);
    pElem->numOfTags = htons(pMeterMetaInfo->numOfTags);

    int16_t len = pMsg - (char *)pElem;
    pElem->elemLen = htons(len);  // redundant data for integrate check
  }

  msgLen = pMsg - pStart;
  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_STABLE_META;
  assert(msgLen + minMsgSize() <= size);

  return TSDB_CODE_SUCCESS;
}

2518
/*
 * Size needed for a heartbeat message. This is the sum of:
 *   - the RPC and mgmt headers;
 *   - the query list header plus one SQueryDesc per SQL object on pObj->sqlList;
 *   - the stream list header plus one SStreamDesc per stream on pObj->streamList;
 *   - TSDB_EXTRA_PAYLOAD_SIZE of slack.
 */
int tscEstimateHeartBeatMsgLength(SSqlObj *pSql) {
  STscObj *pObj = pSql->pTscObj;

  int numOfQueries = 0;
  for (SSqlObj *pQuery = pObj->sqlList; pQuery != NULL; pQuery = pQuery->next) {
    ++numOfQueries;
  }

  int numOfStreams = 0;
  for (SSqlStream *pStream = pObj->streamList; pStream != NULL; pStream = pStream->next) {
    ++numOfStreams;
  }

  int size = tsRpcHeadSize + sizeof(SMgmtHead);
  size += sizeof(SQqueryList) + numOfQueries * sizeof(SQueryDesc);
  size += sizeof(SStreamList) + numOfStreams * sizeof(SStreamDesc);

  return size + TSDB_EXTRA_PAYLOAD_SIZE;
}

2541
/*
 * Build a HEARTBEAT message describing the connection's queries and streams.
 *
 * pObj->mutex is held while the size is estimated and the descriptors are
 * serialized (presumably the same mutex guards sqlList/streamList). It must be
 * released on every exit path.
 *
 * Returns the message length (not counting tsRpcHeadSize), or -1 when the
 * payload cannot be allocated.
 */
int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  char *pMsg, *pStart;
  int   msgLen = 0;
  int   size = 0;

  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;

  pthread_mutex_lock(&pObj->mutex);

  size = tscEstimateHeartBeatMsgLength(pSql);
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    // unlock on the error path too; returning with the mutex held would
    // deadlock every later user of this connection
    pthread_mutex_unlock(&pObj->mutex);
    tscError("%p failed to malloc for heartbeat msg", pSql);
    return -1;
  }

  pMsg = pCmd->payload + tsRpcHeadSize;
  pStart = pMsg;

  SMgmtHead *pMgmt = (SMgmtHead *)pMsg;
  strcpy(pMgmt->db, pObj->db);
  pMsg += sizeof(SMgmtHead);

  pMsg = tscBuildQueryStreamDesc(pMsg, pObj);
  pthread_mutex_unlock(&pObj->mutex);

  msgLen = pMsg - pStart;
  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_HEARTBEAT;

  assert(msgLen + minMsgSize() <= size);
  return msgLen;
}

/*
 * Decode a single-table meta response in place and cache it.
 *
 * pSql->res.pRsp holds an STableMeta followed by numOfColumns + numOfTags
 * SSchema entries, multi-byte fields in network byte order. These steps run in
 * order:
 *   - header and schema fields are converted to host order;
 *   - rowSize is recomputed from the column (non-tag) widths;
 *   - pMeta->contLen bytes are added to tscCacheHandle under the table name.
 *
 * Returns TSDB_CODE_INVALID_VALUE for a malformed header.
 * NOTE(review): the remaining return values look inverted. The function
 * returns 0 when the cache insert FAILS and TSDB_CODE_OTHERS when it succeeds.
 * Confirm what the caller expects before changing this.
 */
int tscProcessMeterMetaRsp(SSqlObj *pSql) {
  STableMeta *pMeta = (STableMeta *)pSql->res.pRsp;

  pMeta->sid = htonl(pMeta->sid);
  pMeta->sversion = htons(pMeta->sversion);
  pMeta->vgid = htonl(pMeta->vgid);
  pMeta->uid = htobe64(pMeta->uid);
  pMeta->contLen = htons(pMeta->contLen);

  if (pMeta->sid < 0 || pMeta->vgid < 0) {
    tscError("invalid meter vgid:%d, sid%d", pMeta->vgid, pMeta->sid);
    return TSDB_CODE_INVALID_VALUE;
  }

  pMeta->numOfColumns = htons(pMeta->numOfColumns);

  if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
    tscError("invalid numOfTags:%d", pMeta->numOfTags);
    return TSDB_CODE_INVALID_VALUE;
  }

  if (pMeta->numOfColumns > TSDB_MAX_COLUMNS || pMeta->numOfColumns <= 0) {
    tscError("invalid numOfColumns:%d", pMeta->numOfColumns);
    return TSDB_CODE_INVALID_VALUE;
  }

  for (int v = 0; v < TSDB_VNODES_SUPPORT; ++v) {
    pMeta->vpeerDesc[v].vnode = htonl(pMeta->vpeerDesc[v].vnode);
  }

  // byte-swap each schema entry (columns first, then tags); only the columns
  // contribute to the row size
  pMeta->rowSize = 0;
  SSchema *pSchema = (SSchema *)(pSql->res.pRsp + sizeof(STableMeta));
  int32_t  numOfTotalCols = pMeta->numOfColumns + pMeta->numOfTags;
  for (int32_t c = 0; c < numOfTotalCols; ++c, ++pSchema) {
    pSchema->bytes = htons(pSchema->bytes);
    pSchema->colId = htons(pSchema->colId);

    if (c < pMeta->numOfColumns) {
      pMeta->rowSize += pSchema->bytes;
    }
  }

  // todo add one more function: taosAddDataIfNotExists();
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
  assert(pMeterMetaInfo->pMeterMeta == NULL);

  pMeterMetaInfo->pMeterMeta = (STableMeta *)taosAddDataIntoCache(tscCacheHandle, pMeterMetaInfo->name, (char *)pMeta,
                                                                  pMeta->contLen, tsMeterMetaKeepTimer);

  // todo handle out of memory case
  if (pMeterMetaInfo->pMeterMeta == NULL) {
    return 0;
  }

  return TSDB_CODE_OTHERS;
}

S
slguan 已提交
2652 2653
/**
 *  multi meter meta rsp pkg format:
 *  | STaosRsp | ieType | SMultiTableInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | SMeterMeta2 | SSchema2
 *  |...... 1B        1B            4B
 *
 * Each entry is an SMultiTableMeta followed by its schema (columns + tags) and,
 * for child tables, the tag values. Every entry is decoded in place and added
 * to tscCacheHandle under its tableId.
 *
 * On a malformed entry, three things happen:
 *   - pSql->res.code is set to the error code;
 *   - pSql->res.numOfTotal is set to the number of entries already cached;
 *   - TSDB_CODE_OTHERS is returned.
 **/
int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
  SSchema *pSchema;
  uint8_t  ieType;
  int32_t  totalNum;
  int32_t  i;

  char *rsp = pSql->res.pRsp;

  ieType = *rsp;
  if (ieType != TSDB_IE_TYPE_META) {
    tscError("invalid ie type:%d", ieType);
    pSql->res.code = TSDB_CODE_INVALID_IE;
    pSql->res.numOfTotal = 0;
    return TSDB_CODE_OTHERS;
  }

  rsp++;

  SMultiTableInfoMsg *pInfo = (SMultiTableInfoMsg *)rsp;
  totalNum = htonl(pInfo->numOfTables);
  rsp += sizeof(SMultiTableInfoMsg);

  for (i = 0; i < totalNum; i++) {
    SMultiTableMeta *pMultiMeta = (SMultiTableMeta *)rsp;
    STableMeta *     pMeta = pMultiMeta->metas;

    pMeta->sid = htonl(pMeta->sid);
    pMeta->sversion = htons(pMeta->sversion);
    pMeta->vgid = htonl(pMeta->vgid);
    pMeta->uid = htobe64(pMeta->uid);

    if (pMeta->sid <= 0 || pMeta->vgid < 0) {
      tscError("invalid meter vgid:%d, sid%d", pMeta->vgid, pMeta->sid);
      pSql->res.code = TSDB_CODE_INVALID_VALUE;
      pSql->res.numOfTotal = i;
      return TSDB_CODE_OTHERS;
    }

    pMeta->numOfColumns = htons(pMeta->numOfColumns);

    // a single numOfTags check; the former second, identical check was unreachable
    if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
      tscError("invalid tag value count:%d", pMeta->numOfTags);
      pSql->res.code = TSDB_CODE_INVALID_VALUE;
      pSql->res.numOfTotal = i;
      return TSDB_CODE_OTHERS;
    }

    // reject zero columns as well, matching the single-table meta handler
    if (pMeta->numOfColumns > TSDB_MAX_COLUMNS || pMeta->numOfColumns <= 0) {
      tscError("invalid numOfColumns:%d", pMeta->numOfColumns);
      pSql->res.code = TSDB_CODE_INVALID_VALUE;
      pSql->res.numOfTotal = i;
      return TSDB_CODE_OTHERS;
    }

    for (int j = 0; j < TSDB_VNODES_SUPPORT; ++j) {
      pMeta->vpeerDesc[j].vnode = htonl(pMeta->vpeerDesc[j].vnode);
    }

    pMeta->rowSize = 0;
    rsp += sizeof(SMultiTableMeta);
    pSchema = (SSchema *)rsp;

    int32_t numOfTotalCols = pMeta->numOfColumns + pMeta->numOfTags;
    for (int j = 0; j < numOfTotalCols; ++j) {
      pSchema->bytes = htons(pSchema->bytes);
      pSchema->colId = htons(pSchema->colId);

      // ignore the tags length
      if (j < pMeta->numOfColumns) {
        pMeta->rowSize += pSchema->bytes;
      }
      pSchema++;
    }

    rsp += numOfTotalCols * sizeof(SSchema);

    // child tables are followed by their tag values
    int32_t  tagLen = 0;
    SSchema *pTagsSchema = tsGetTagSchema(pMeta);

    if (pMeta->tableType == TSDB_TABLE_TYPE_CHILD_TABLE) {
      for (int32_t j = 0; j < pMeta->numOfTags; ++j) {
        tagLen += pTagsSchema[j].bytes;
      }
    }

    rsp += tagLen;
    int32_t size = (int32_t)(rsp - ((char *)pMeta));  // Consistent with STableMeta in cache

    pMeta->index = 0;
    (void)taosAddDataIntoCache(tscCacheHandle, pMeta->tableId, (char *)pMeta, size, tsMeterMetaKeepTimer);
  }

  pSql->res.code = TSDB_CODE_SUCCESS;
  pSql->res.numOfTotal = i;
  tscTrace("%p load multi-metermeta resp complete num:%d", pSql, pSql->res.numOfTotal);
  return TSDB_CODE_SUCCESS;
}

H
hzcheng 已提交
2762
/*
 * Decode a super-table (metric) meta response and cache one SSuperTableMeta
 * per entry.
 *
 * Response layout: ieType (1B) | num (int16) | num entries. Each entry is an
 * SSuperTableMeta header followed by numOfVnodes SVnodeSidList records, and
 * each record is followed by numOfSids elements of
 * (STableSidExtInfo + tagLen bytes).
 *
 * Each entry is rebuilt into a private buffer whose list/element references
 * are stored as offsets. The buffer is then inserted into tscCacheHandle under
 * the key from tscGetMetricMetaCacheKey and attached to the i-th meter info.
 *
 * Returns TSDB_CODE_INVALID_IE on a bad ie type. Otherwise returns
 * pSql->res.code, which is TSDB_CODE_CLI_OUT_OF_MEMORY on allocation or cache
 * failure.
 */
int tscProcessMetricMetaRsp(SSqlObj *pSql) {
  SSuperTableMeta *pMeta;
  uint8_t          ieType;
  void **          metricMetaList = NULL;
  int32_t *        sizes = NULL;

  char *rsp = pSql->res.pRsp;

  ieType = *rsp;
  if (ieType != TSDB_IE_TYPE_META) {
    tscError("invalid ie type:%d", ieType);
    return TSDB_CODE_INVALID_IE;
  }

  rsp++;

  int32_t num = htons(*(int16_t *)rsp);
  rsp += sizeof(int16_t);

  // Nothing to cache. Returning early also avoids calloc(..., 0), which may
  // legally return NULL and would then be misreported as out of memory.
  if (num <= 0) {
    return pSql->res.code;
  }

  metricMetaList = calloc(1, POINTER_BYTES * num);
  sizes = calloc(1, sizeof(int32_t) * num);

  // return with error code
  if (metricMetaList == NULL || sizes == NULL) {
    tfree(metricMetaList);
    tfree(sizes);
    pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;

    return pSql->res.code;
  }

  for (int32_t k = 0; k < num; ++k) {
    pMeta = (SSuperTableMeta *)rsp;

    size_t size = (size_t)pSql->res.rspLen - 1;
    rsp = rsp + sizeof(SSuperTableMeta);

    pMeta->numOfTables = htonl(pMeta->numOfTables);
    pMeta->numOfVnodes = htonl(pMeta->numOfVnodes);
    pMeta->tagLen = htons(pMeta->tagLen);

    size += pMeta->numOfVnodes * sizeof(SVnodeSidList *) + pMeta->numOfTables * sizeof(STableSidExtInfo *);

    char *pBuf = calloc(1, size);
    if (pBuf == NULL) {
      pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;
      goto _error_clean;
    }

    SSuperTableMeta *pNewMetricMeta = (SSuperTableMeta *)pBuf;
    metricMetaList[k] = pNewMetricMeta;

    pNewMetricMeta->numOfTables = pMeta->numOfTables;
    pNewMetricMeta->numOfVnodes = pMeta->numOfVnodes;
    pNewMetricMeta->tagLen = pMeta->tagLen;

    pBuf = pBuf + sizeof(SSuperTableMeta) + pNewMetricMeta->numOfVnodes * sizeof(SVnodeSidList *);

    for (int32_t i = 0; i < pMeta->numOfVnodes; ++i) {
      SVnodeSidList *pSidLists = (SVnodeSidList *)rsp;
      memcpy(pBuf, pSidLists, sizeof(SVnodeSidList));

      pNewMetricMeta->list[i] = pBuf - (char *)pNewMetricMeta;  // offset value
      SVnodeSidList *pLists = (SVnodeSidList *)pBuf;

      tscTrace("%p metricmeta:vid:%d,numOfTables:%d", pSql, i, pLists->numOfSids);

      pBuf += sizeof(SVnodeSidList) + sizeof(STableSidExtInfo *) * pSidLists->numOfSids;
      rsp += sizeof(SVnodeSidList);

      size_t elemSize = sizeof(STableSidExtInfo) + pNewMetricMeta->tagLen;
      for (int32_t j = 0; j < pSidLists->numOfSids; ++j) {
        pLists->pSidExtInfoList[j] = pBuf - (char *)pLists;
        memcpy(pBuf, rsp, elemSize);

        ((STableSidExtInfo *)pBuf)->uid = htobe64(((STableSidExtInfo *)pBuf)->uid);
        ((STableSidExtInfo *)pBuf)->sid = htonl(((STableSidExtInfo *)pBuf)->sid);

        rsp += elemSize;
        pBuf += elemSize;
      }
    }

    sizes[k] = pBuf - (char *)pNewMetricMeta;
  }

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
  for (int32_t i = 0; i < num; ++i) {
    char name[TSDB_MAX_TAGS_LEN + 1] = {0};

    SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i);
    tscGetMetricMetaCacheKey(pQueryInfo, name, pMeterMetaInfo->pMeterMeta->uid);

#ifdef _DEBUG_VIEW
    printf("generate the metric key:%s, index:%d\n", name, i);
#endif

    // release the used metricmeta
    taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), false);

    pMeterMetaInfo->pMetricMeta = (SSuperTableMeta *)taosAddDataIntoCache(tscCacheHandle, name, (char *)metricMetaList[i],
                                                                      sizes[i], tsMetricMetaKeepTimer);
    tfree(metricMetaList[i]);

    // failed to put into cache
    if (pMeterMetaInfo->pMetricMeta == NULL) {
      pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;
      goto _error_clean;
    }
  }

  // Reached on success too. It frees the temporary buffers; the cache appears
  // to keep its own copy, since each buffer is freed right after the insert.
_error_clean:
  for (int32_t i = 0; i < num; ++i) {
    tfree(metricMetaList[i]);
  }

  free(sizes);
  free(metricMetaList);

  return pSql->res.code;
}

/*
 * Decode a SHOW response.
 *
 * The schema carried by the response is installed fresh every time. Any meta
 * previously attached to the first meter info is released, and a copy of this
 * one is cached under the key "<msgType + 'a'>showlist". Then the first
 * clause's column list, field info and TS_DUMMY expressions are rebuilt from
 * that schema.
 *
 * Returns 0 on success. Returns TSDB_CODE_CLI_OUT_OF_MEMORY (also stored in
 * pSql->res.code) when the schema cannot be put into the cache.
 */
int tscProcessShowRsp(SSqlObj *pSql) {
  STableMeta *pMeta;
  SShowRsp *  pShow;
  SSchema *   pSchema;
  char        key[20];

  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);

  pShow = (SShowRsp *)pRes->pRsp;
  pShow->qhandle = htobe64(pShow->qhandle);
  pRes->qhandle = pShow->qhandle;

  tscResetForNextRetrieve(pRes);
  pMeta = &(pShow->tableMeta);

  pMeta->numOfColumns = ntohs(pMeta->numOfColumns);

  pSchema = (SSchema *)((char *)pMeta + sizeof(STableMeta));
  // NOTE(review): the other meta handlers decode sid with htonl; confirm the
  // width used by the server for SHOW responses
  pMeta->sid = ntohs(pMeta->sid);
  for (int i = 0; i < pMeta->numOfColumns; ++i) {
    pSchema->bytes = htons(pSchema->bytes);
    pSchema++;
  }

  key[0] = pCmd->msgType + 'a';
  strcpy(key + 1, "showlist");

  taosRemoveDataFromCache(tscCacheHandle, (void *)&(pMeterMetaInfo->pMeterMeta), false);

  int32_t size = pMeta->numOfColumns * sizeof(SSchema) + sizeof(STableMeta);
  pMeterMetaInfo->pMeterMeta =
      (STableMeta *)taosAddDataIntoCache(tscCacheHandle, key, (char *)pMeta, size, tsMeterMetaKeepTimer);

  // without a cached meta the schema below would be read through a NULL pointer
  if (pMeterMetaInfo->pMeterMeta == NULL) {
    tscError("%p failed to put show meta into cache", pSql);
    pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
    return pRes->code;
  }

  pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutputCols;
  SSchema *pMeterSchema = tsGetSchema(pMeterMetaInfo->pMeterMeta);

  tscColumnBaseInfoReserve(&pQueryInfo->colList, pMeta->numOfColumns);
  SColumnIndex index = {0};

  for (int16_t i = 0; i < pMeta->numOfColumns; ++i) {
    index.columnIndex = i;
    tscColumnBaseInfoInsert(pQueryInfo, &index);
    tscFieldInfoSetValFromSchema(&pQueryInfo->fieldsInfo, i, &pMeterSchema[i]);

    pQueryInfo->fieldsInfo.pSqlExpr[i] = tscSqlExprInsert(pQueryInfo, i, TSDB_FUNC_TS_DUMMY, &index,
                     pMeterSchema[i].type, pMeterSchema[i].bytes, pMeterSchema[i].bytes);
  }

  tscFieldInfoCalOffset(pQueryInfo);
  return 0;
}

/*
 * Handle the management node's reply to a connect request.
 *
 * Copies the account id and server version from the SConnectRsp into the
 * connection object, rewrites pObj->db as "<acctId><TS_PATH_DELIMITER><db>",
 * stores the write/super authority flags and (re)arms the activity timer.
 *
 * All copies are bounded by the destination size: the strings come from a
 * network reply, and strncpy would leave pObj->db unterminated when the
 * qualified name exactly fills it.
 *
 * @param pSql  sql object whose res.pRsp holds an SConnectRsp
 * @return      always 0
 */
int tscProcessConnectRsp(SSqlObj *pSql) {
  char temp[TSDB_TABLE_ID_LEN * 2];

  STscObj *pObj = pSql->pTscObj;
  SSqlRes *pRes = &pSql->res;

  SConnectRsp *pConnect = (SConnectRsp *)pRes->pRsp;

  snprintf(pObj->acctId, sizeof(pObj->acctId), "%s", pConnect->acctId);  // copy acctId from response

  int32_t len = snprintf(temp, sizeof(temp), "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db);

  // the qualified name AND its NUL terminator must fit into pObj->db
  assert(len >= 0 && (size_t)len < sizeof(pObj->db));
  snprintf(pObj->db, sizeof(pObj->db), "%s", temp);

  snprintf(pObj->sversion, sizeof(pObj->sversion), "%s", pConnect->serverVersion);
  pObj->writeAuth = pConnect->writeAuth;
  pObj->superAuth = pConnect->superAuth;
  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);

  return 0;
}

/*
 * Handle the reply to "use <db>": the name stored in table slot 0 of clause 0
 * becomes the connection's current database.
 *
 * The copy is bounded by sizeof(pObj->db).
 * NOTE(review): the name buffer may be larger than pObj->db, so an unbounded
 * strcpy could overflow it; confirm the field sizes.
 *
 * @param pSql  sql object of the "use" command
 * @return      always 0
 */
int tscProcessUseDbRsp(SSqlObj *pSql) {
  STscObj *       pObj = pSql->pTscObj;
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);

  snprintf(pObj->db, sizeof(pObj->db), "%s", pMeterMetaInfo->name);
  return 0;
}

/*
 * Handle the reply to "drop database" by flushing the entire client-side
 * meta cache. The sql object itself is not inspected.
 */
int tscProcessDropDbRsp(SSqlObj *UNUSED_PARAM(pSql)) {
  taosClearDataCache(tscCacheHandle);

  return 0;
}

/*
 * Handle the reply to "drop table".
 *
 * If the dropped table's meta is in the cache, that cache entry is released
 * with the force flag set. The meter and metric meta held by table slot 0 of
 * the command are released the same way. This matters because a later table
 * with the same name may live in a different vnode with a different schema,
 * so the old meta must not be reused. If nothing is cached under the name,
 * the reply is ignored.
 *
 * @param pSql  sql object of the "drop table" command
 * @return      always 0
 */
int tscProcessDropTableRsp(SSqlObj *pSql) {
  SMeterMetaInfo *pTableInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);

  STableMeta *pCached = taosGetDataFromCache(tscCacheHandle, pTableInfo->name);
  if (pCached != NULL) {
    tscTrace("%p force release metermeta after drop table:%s", pSql, pTableInfo->name);
    taosRemoveDataFromCache(tscCacheHandle, (void **)&pCached, true);

    // also drop the references this command holds itself
    if (pTableInfo->pMeterMeta != NULL) {
      taosRemoveDataFromCache(tscCacheHandle, (void **)&(pTableInfo->pMeterMeta), true);
      taosRemoveDataFromCache(tscCacheHandle, (void **)&(pTableInfo->pMetricMeta), true);
    }
  }

  return 0;
}

/*
 * Handle the reply to "alter table".
 *
 * The table's cached meta is force-released, together with the meter and
 * metric meta held by table slot 0 of the command. When the altered table is
 * a super table, the entire client cache is cleared as well. If nothing is
 * cached under the table name, the reply is ignored.
 *
 * @param pSql  sql object of the "alter table" command
 * @return      always 0
 */
int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
  SMeterMetaInfo *pTableInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);

  STableMeta *pCached = taosGetDataFromCache(tscCacheHandle, pTableInfo->name);
  if (pCached == NULL) {
    return 0;  // not cached: nothing to invalidate
  }

  tscTrace("%p force release metermeta in cache after alter-table: %s", pSql, pTableInfo->name);
  taosRemoveDataFromCache(tscCacheHandle, (void **)&pCached, true);

  if (pTableInfo->pMeterMeta == NULL) {
    return 0;
  }

  // decided before the meta is released below (the macro presumably inspects it)
  bool alteredSuperTable = UTIL_METER_IS_SUPERTABLE(pTableInfo);

  taosRemoveDataFromCache(tscCacheHandle, (void **)&(pTableInfo->pMeterMeta), true);
  taosRemoveDataFromCache(tscCacheHandle, (void **)&(pTableInfo->pMetricMeta), true);

  if (alteredSuperTable) {
    tscTrace("%p reset query cache since table:%s is stable", pSql, pTableInfo->name);
    taosClearDataCache(tscCacheHandle);
  }

  return 0;
}

/*
 * Handle the reply to "alter database". No client-side state is updated;
 * the reply is simply acknowledged.
 */
int tscProcessAlterDbMsgRsp(SSqlObj *pSql) {
  (void)pSql;  // intentionally unused

  return 0;
}

/*
 * Handle a vnode's reply to a query request.
 *
 * The query handle is converted from big-endian in place inside the reply,
 * copied into the result object, and the result cursor is reset for the
 * upcoming fetches.
 *
 * @param pSql  sql object whose res.pRsp holds an SQueryTableRsp
 * @return      always 0
 */
int tscProcessQueryRsp(SSqlObj *pSql) {
  SSqlRes *       pRes = &pSql->res;
  SQueryTableRsp *pRsp = (SQueryTableRsp *)pRes->pRsp;

  pRsp->qhandle = htobe64(pRsp->qhandle);
  pRes->qhandle = pRsp->qhandle;

  pRes->data = NULL;
  tscResetForNextRetrieve(pRes);

  return 0;
}

/*
 * Handle a retrieve (fetch) reply; also registered for TSDB_SQL_RETRIEVE.
 *
 * Decodes the row count, precision, offset and elapsed time from network
 * byte order and points the result columns of the current clause into the
 * reply payload. For a subscription, the payload is followed by a progress
 * trailer placed after the last column of the last row:
 *   int32 numOfTables, then numOfTables x (int64 uid, TSKEY key),
 * all big-endian. Each pair is forwarded to tscUpdateSubscriptionProgress().
 *
 * @param pSql  sql object whose res.pRsp holds an SRetrieveTableRsp
 * @return      always 0
 */
int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *)pRes->pRsp;

  pRes->numOfRows = htonl(pRetrieve->numOfRows);
  pRes->precision = htons(pRetrieve->precision);
  pRes->offset = htobe64(pRetrieve->offset);
  pRes->useconds = htobe64(pRetrieve->useconds);
  pRes->data = pRetrieve->data;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  tscSetResultPointer(pQueryInfo, pRes);

  if (pSql->pSubscription != NULL) {
    int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutputCols;

    TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, numOfCols - 1);
    int16_t     offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1);

    char *p = pRes->data + (pField->bytes + offset) * pRes->numOfRows;

    /*
     * The trailer starts at an arbitrary byte offset of the payload. Reading it
     * through int32_t/int64_t pointer casts is a misaligned access on
     * strict-alignment targets and violates strict aliasing, so copy it out.
     */
    int32_t numOfTables = 0;
    memcpy(&numOfTables, p, sizeof(numOfTables));
    numOfTables = htonl(numOfTables);
    p += sizeof(int32_t);

    for (int32_t i = 0; i < numOfTables; i++) {
      int64_t uid = 0;
      memcpy(&uid, p, sizeof(uid));
      uid = htobe64(uid);
      p += sizeof(int64_t);

      TSKEY key = 0;
      memcpy(&key, p, sizeof(key));
      key = htobe64(key);
      p += sizeof(TSKEY);

      tscUpdateSubscriptionProgress(pSql->pSubscription, uid, key);
    }
  }

  pRes->row = 0;

  // offset is decoded as a 64-bit value; "%d" would be a mismatched specifier
  tscTrace("%p numOfRows:%d, offset:%" PRId64, pSql, pRes->numOfRows, (int64_t)pRes->offset);

  return 0;
}

/*
 * Handle a retrieve reply on the local path.
 *
 * Decodes the row count, points the result columns of clause 0 into the
 * reply payload and rewinds the row cursor.
 *
 * @param pSql  sql object whose res.pRsp holds an SRetrieveTableRsp
 * @return      always 0
 */
int tscProcessRetrieveRspFromLocal(SSqlObj *pSql) {
  SQueryInfo *pFirstClause = tscGetQueryInfoDetail(&pSql->cmd, 0);
  SSqlRes *   pRes = &pSql->res;

  SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)pRes->pRsp;
  pRes->numOfRows = htonl(pRsp->numOfRows);
  pRes->data = pRsp->data;

  tscSetResultPointer(pFirstClause, pRes);
  pRes->row = 0;

  return 0;
}

void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code);

3110
static int32_t doGetMeterMetaFromServer(SSqlObj *pSql, SMeterMetaInfo *pMeterMetaInfo) {
H
hzcheng 已提交
3111
  int32_t code = TSDB_CODE_SUCCESS;
3112

S
slguan 已提交
3113 3114 3115 3116 3117
  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
  if (NULL == pNew) {
    tscError("%p malloc failed for new sqlobj to get meter meta", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }
3118

H
hzcheng 已提交
3119 3120 3121
  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;
  pNew->cmd.command = TSDB_SQL_META;
S
slguan 已提交
3122

3123
  tscAddSubqueryInfo(&pNew->cmd);
3124 3125 3126 3127 3128

  SQueryInfo *pNewQueryInfo = NULL;
  tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo);

  pNew->cmd.createOnDemand = pSql->cmd.createOnDemand;  // create table if not exists
S
slguan 已提交
3129 3130 3131
  if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
    tscError("%p malloc failed for payload to get meter meta", pSql);
    free(pNew);
3132

S
slguan 已提交
3133 3134 3135
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

3136 3137
  SMeterMetaInfo *pNewMeterMetaInfo = tscAddEmptyMeterMetaInfo(pNewQueryInfo);
  assert(pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);
H
hzcheng 已提交
3138

3139 3140
  strcpy(pNewMeterMetaInfo->name, pMeterMetaInfo->name);
  memcpy(pNew->cmd.payload, pSql->cmd.payload, TSDB_DEFAULT_PAYLOAD_SIZE);  // tag information if table does not exists.
S
slguan 已提交
3141
  tscTrace("%p new pSqlObj:%p to get tableMeta", pSql, pNew);
H
hzcheng 已提交
3142 3143

  if (pSql->fp == NULL) {
S
slguan 已提交
3144 3145
    tsem_init(&pNew->rspSem, 0, 0);
    tsem_init(&pNew->emptyRspSem, 0, 1);
H
hzcheng 已提交
3146 3147

    code = tscProcessSql(pNew);
S
slguan 已提交
3148

3149 3150 3151 3152
    /*
     * Update cache only on succeeding in getting metermeta.
     * Transfer the ownership of metermeta to the new object, instead of invoking the release/acquire routine
     */
H
hzcheng 已提交
3153
    if (code == TSDB_CODE_SUCCESS) {
3154
      pMeterMetaInfo->pMeterMeta = taosTransferDataInCache(tscCacheHandle, (void**) &pNewMeterMetaInfo->pMeterMeta);
3155
      assert(pMeterMetaInfo->pMeterMeta != NULL);
H
hzcheng 已提交
3156 3157
    }

3158
    tscTrace("%p get meter meta complete, code:%d, pMeterMeta:%p", pSql, code, pMeterMetaInfo->pMeterMeta);
H
hzcheng 已提交
3159 3160 3161 3162 3163
    tscFreeSqlObj(pNew);

  } else {
    pNew->fp = tscMeterMetaCallBack;
    pNew->param = pSql;
3164
    pNew->sqlstr = strdup(pSql->sqlstr);
H
hzcheng 已提交
3165 3166 3167 3168 3169 3170 3171 3172 3173 3174

    code = tscProcessSql(pNew);
    if (code == TSDB_CODE_SUCCESS) {
      code = TSDB_CODE_ACTION_IN_PROGRESS;
    }
  }

  return code;
}

3175 3176
/*
 * Make pMeterMetaInfo->pMeterMeta refer to the meta of table
 * pMeterMetaInfo->name.
 *
 * Any meta the info object currently holds is released first (non-forced).
 * The client cache is consulted next, and only on a cache miss is the meta
 * requested from the management node.
 *
 * @param pSql            sql object issuing the request
 * @param pMeterMetaInfo  table slot to fill; its name must be non-empty
 * @return TSDB_CODE_SUCCESS on a cache hit, otherwise the result of
 *         doGetMeterMetaFromServer() (TSDB_CODE_ACTION_IN_PROGRESS when the
 *         request continues asynchronously)
 */
int tscGetMeterMeta(SSqlObj *pSql, SMeterMetaInfo *pMeterMetaInfo) {
  assert(pMeterMetaInfo->name[0] != '\0');

  // drop whatever this slot still references before looking the table up again
  if (pMeterMetaInfo->pMeterMeta != NULL) {
    taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), false);
  }

  STableMeta *pCached = (STableMeta *)taosGetDataFromCache(tscCacheHandle, pMeterMetaInfo->name);
  pMeterMetaInfo->pMeterMeta = pCached;

  if (pCached == NULL) {
    // cache miss: ask the management node
    return doGetMeterMetaFromServer(pSql, pMeterMetaInfo);
  }

  tscTrace("%p retrieve tableMeta from cache, the number of columns:%d, numOfTags:%d", pSql, pCached->numOfColumns,
           pCached->numOfTags);
  return TSDB_CODE_SUCCESS;
}

3200 3201 3202
/*
 * Variant of tscGetMeterMeta() that first records on the command whether the
 * table may be created on demand when it does not exist.
 */
int tscGetMeterMetaEx(SSqlObj *pSql, SMeterMetaInfo *pMeterMetaInfo, bool createIfNotExists) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->createOnDemand = createIfNotExists;

  return tscGetMeterMeta(pSql, pMeterMetaInfo);
}

/*
 * Helper for renewing table meta during insertion.
 *
 * A table created on demand by an insert may not exist on its vnode yet when
 * its meta is requested again. Insert commands therefore pause for 50 ms
 * before the getMeterMeta request is re-issued, which gives the vnode a better
 * chance to have created the table. Every other command returns immediately.
 */
static void tscWaitingForCreateTable(SSqlCmd *pCmd) {
  if (pCmd->command != TSDB_SQL_INSERT) {
    return;
  }

  taosMsleep(50);  // todo: global config
}

/**
 * in renew metermeta, do not retrieve metadata in cache.
 * @param pSql          sql object
S
slguan 已提交
3221
 * @param tableId       meter id
H
hzcheng 已提交
3222 3223
 * @return              status code
 */
S
slguan 已提交
3224
int tscRenewMeterMeta(SSqlObj *pSql, char *tableId) {
3225 3226
  int code = 0;

H
hzcheng 已提交
3227 3228
  // handle metric meta renew process
  SSqlCmd *pCmd = &pSql->cmd;
3229 3230

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
3231
  SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
H
hzcheng 已提交
3232 3233 3234 3235 3236

  // enforce the renew metermeta operation in async model
  if (pSql->fp == NULL) pSql->fp = (void *)0x1;

  /*
S
slguan 已提交
3237
   * 1. only update the metermeta in force model metricmeta is not updated
H
hzcheng 已提交
3238 3239
   * 2. if get metermeta failed, still get the metermeta
   */
S
slguan 已提交
3240 3241
  if (pMeterMetaInfo->pMeterMeta == NULL || !tscQueryOnMetric(pCmd)) {
    if (pMeterMetaInfo->pMeterMeta) {
H
hjxilinx 已提交
3242
      tscTrace("%p update meter meta, old: numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
S
slguan 已提交
3243
               pMeterMetaInfo->numOfTags, pCmd->numOfCols, pMeterMetaInfo->pMeterMeta->uid, pMeterMetaInfo->pMeterMeta);
H
hzcheng 已提交
3244
    }
3245

3246
    tscWaitingForCreateTable(pCmd);
S
slguan 已提交
3247
    taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), true);
H
hzcheng 已提交
3248

3249
    code = doGetMeterMetaFromServer(pSql, pMeterMetaInfo);  // todo ??
H
hzcheng 已提交
3250
  } else {
H
hjxilinx 已提交
3251
    tscTrace("%p metric query not update metric meta, numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
S
slguan 已提交
3252 3253
             pMeterMetaInfo->pMeterMeta->numOfTags, pCmd->numOfCols, pMeterMetaInfo->pMeterMeta->uid,
             pMeterMetaInfo->pMeterMeta);
H
hzcheng 已提交
3254 3255 3256 3257 3258 3259 3260 3261 3262 3263 3264
  }

  if (code != TSDB_CODE_ACTION_IN_PROGRESS) {
    if (pSql->fp == (void *)0x1) {
      pSql->fp = NULL;
    }
  }

  return code;
}

3265
/*
 * Make every table slot of clause clauseIndex refer to its super table
 * (metric) meta.
 *
 * The query condition is serialized into pCmd->payload, so each slot's cache
 * key is rebuilt from the clause and the table uid. If every key is found in
 * the cache, nothing is sent. Otherwise a child SSqlObj with command
 * TSDB_SQL_METRIC is built from the clause's tables, tag condition, group-by,
 * slimit and order, and:
 *  - synchronous parent (pSql->fp == NULL): the request is awaited here, and
 *    on success every slot is re-pointed at the freshly cached entry;
 *  - asynchronous parent: the child completes through tscMeterMetaCallBack
 *    and TSDB_CODE_ACTION_IN_PROGRESS is returned.
 *
 * @param pSql         parent sql object
 * @param clauseIndex  index of the clause whose tables are resolved
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_ACTION_IN_PROGRESS or an error code
 */
int tscGetMetricMeta(SSqlObj *pSql, int32_t clauseIndex) {
  int      code = TSDB_CODE_NETWORK_UNAVAIL;
  SSqlCmd *pCmd = &pSql->cmd;

  bool required = false;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
    char tagstr[TSDB_MAX_TAGS_LEN + 1] = {0};

    SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i);
    tscGetMetricMetaCacheKey(pQueryInfo, tagstr, pMeterMetaInfo->pMeterMeta->uid);

    taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), false);

    SSuperTableMeta *ppMeta = (SSuperTableMeta *)taosGetDataFromCache(tscCacheHandle, tagstr);
    if (ppMeta == NULL) {
      required = true;
      break;
    } else {
      pMeterMetaInfo->pMetricMeta = ppMeta;
    }
  }

  // all metricmeta for one clause are retrieved from cache, no need to retrieve metricmeta from management node
  if (!required) {
    return TSDB_CODE_SUCCESS;
  }

  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
  if (NULL == pNew) {
    tscError("%p malloc failed for new sqlobj to get metric meta", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;

  pNew->cmd.command = TSDB_SQL_METRIC;

  SQueryInfo *pNewQueryInfo = NULL;
  if ((code = tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo)) != TSDB_CODE_SUCCESS) {
    tscFreeSqlObj(pNew);  // do not leak the child object on this error path
    return code;
  }

  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
    SMeterMetaInfo *pMMInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i);

    STableMeta *pMeterMeta = taosGetDataFromCache(tscCacheHandle, pMMInfo->name);
    tscAddMeterMetaInfo(pNewQueryInfo, pMMInfo->name, pMeterMeta, NULL, pMMInfo->numOfTags, pMMInfo->tagColumnIndex);
  }

  if ((code = tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
    tscFreeSqlObj(pNew);
    return code;
  }

  tscTagCondCopy(&pNewQueryInfo->tagCond, &pQueryInfo->tagCond);

  pNewQueryInfo->groupbyExpr = pQueryInfo->groupbyExpr;
  pNewQueryInfo->numOfTables = pQueryInfo->numOfTables;

  pNewQueryInfo->slimit = pQueryInfo->slimit;
  pNewQueryInfo->order = pQueryInfo->order;

  STagCond* pTagCond = &pNewQueryInfo->tagCond;
  tscTrace("%p new sqlobj:%p info, numOfTables:%d, slimit:%" PRId64 ", soffset:%" PRId64 ", order:%d, tbname cond:%s",
      pSql, pNew, pNewQueryInfo->numOfTables, pNewQueryInfo->slimit.limit, pNewQueryInfo->slimit.offset,
      pNewQueryInfo->order.order, pTagCond->tbnameCond.cond);

  tscTrace("%p allocate new pSqlObj:%p to get metricMeta", pSql, pNew);
  if (pSql->fp == NULL) {
    tsem_init(&pNew->rspSem, 0, 0);
    tsem_init(&pNew->emptyRspSem, 0, 1);

    code = tscProcessSql(pNew);

    if (code == TSDB_CODE_SUCCESS) {//todo optimize the performance
      for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
        // same size as the key buffer of the lookup loop above: both are filled by tscGetMetricMetaCacheKey
        char tagstr[TSDB_MAX_TAGS_LEN + 1] = {0};

        SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i);
        tscGetMetricMetaCacheKey(pQueryInfo, tagstr, pMeterMetaInfo->pMeterMeta->uid);

#ifdef _DEBUG_VIEW
        printf("create metric key:%s, index:%d\n", tagstr, i);
#endif

        taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), false);
        pMeterMetaInfo->pMetricMeta = (SSuperTableMeta *)taosGetDataFromCache(tscCacheHandle, tagstr);
      }
    }

    tscFreeSqlObj(pNew);
  } else {
    pNew->fp = tscMeterMetaCallBack;
    pNew->param = pSql;
    code = tscProcessSql(pNew);
    if (code == TSDB_CODE_SUCCESS) {
      code = TSDB_CODE_ACTION_IN_PROGRESS;
    }
  }

  return code;
}

void tscInitMsgs() {
S
slguan 已提交
3375 3376 3377
  tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
  tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg;
  tscBuildMsg[TSDB_SQL_FETCH] = tscBuildRetrieveMsg;
H
hzcheng 已提交
3378 3379

  tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg;
3380
  tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg;
H
hzcheng 已提交
3381

3382 3383
  tscBuildMsg[TSDB_SQL_CREATE_ACCT] = tscBuildAcctMsg;
  tscBuildMsg[TSDB_SQL_ALTER_ACCT] = tscBuildAcctMsg;
H
hzcheng 已提交
3384 3385

  tscBuildMsg[TSDB_SQL_CREATE_TABLE] = tscBuildCreateTableMsg;
3386
  tscBuildMsg[TSDB_SQL_DROP_USER] = tscBuildDropAcctMsg;
H
hzcheng 已提交
3387 3388 3389
  tscBuildMsg[TSDB_SQL_DROP_ACCT] = tscBuildDropAcctMsg;
  tscBuildMsg[TSDB_SQL_DROP_DB] = tscBuildDropDbMsg;
  tscBuildMsg[TSDB_SQL_DROP_TABLE] = tscBuildDropTableMsg;
3390
  tscBuildMsg[TSDB_SQL_ALTER_USER] = tscBuildUserMsg;
S
slguan 已提交
3391 3392 3393
  tscBuildMsg[TSDB_SQL_CREATE_DNODE] = tscBuildCreateDnodeMsg;
  tscBuildMsg[TSDB_SQL_DROP_DNODE] = tscBuildDropDnodeMsg;
  tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg;
H
hzcheng 已提交
3394 3395 3396 3397 3398 3399 3400
  tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg;
  tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg;

  tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg;
  tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg;
  tscBuildMsg[TSDB_SQL_META] = tscBuildMeterMetaMsg;
  tscBuildMsg[TSDB_SQL_METRIC] = tscBuildMetricMetaMsg;
S
slguan 已提交
3401
  tscBuildMsg[TSDB_SQL_MULTI_META] = tscBuildMultiMeterMetaMsg;
H
hzcheng 已提交
3402 3403 3404 3405

  tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg;
  tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg;
  tscBuildMsg[TSDB_SQL_RETRIEVE] = tscBuildRetrieveFromMgmtMsg;
3406 3407 3408
  tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg;
H
hzcheng 已提交
3409 3410 3411 3412 3413 3414 3415 3416 3417 3418

  tscProcessMsgRsp[TSDB_SQL_SELECT] = tscProcessQueryRsp;
  tscProcessMsgRsp[TSDB_SQL_FETCH] = tscProcessRetrieveRspFromVnode;

  tscProcessMsgRsp[TSDB_SQL_DROP_DB] = tscProcessDropDbRsp;
  tscProcessMsgRsp[TSDB_SQL_DROP_TABLE] = tscProcessDropTableRsp;
  tscProcessMsgRsp[TSDB_SQL_CONNECT] = tscProcessConnectRsp;
  tscProcessMsgRsp[TSDB_SQL_USE_DB] = tscProcessUseDbRsp;
  tscProcessMsgRsp[TSDB_SQL_META] = tscProcessMeterMetaRsp;
  tscProcessMsgRsp[TSDB_SQL_METRIC] = tscProcessMetricMetaRsp;
S
slguan 已提交
3419
  tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiMeterMetaRsp;
H
hzcheng 已提交
3420 3421

  tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp;
3422
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromVnode;  // rsp handled by same function.
H
hzcheng 已提交
3423
  tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp;
3424

H
hzcheng 已提交
3425
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_TAGS] = tscProcessTagRetrieveRsp;
H
hjxilinx 已提交
3426 3427 3428 3429 3430
  tscProcessMsgRsp[TSDB_SQL_CURRENT_DB] = tscProcessTagRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CURRENT_USER] = tscProcessTagRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_VERSION] = tscProcessTagRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CLI_VERSION] = tscProcessTagRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_STATUS] = tscProcessTagRetrieveRsp;
3431

H
hzcheng 已提交
3432 3433 3434 3435 3436 3437 3438 3439 3440 3441 3442 3443 3444 3445 3446 3447
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp;

  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_METRIC] = tscProcessRetrieveMetricRsp;

  tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp;
  tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp;

  tscKeepConn[TSDB_SQL_SHOW] = 1;
  tscKeepConn[TSDB_SQL_RETRIEVE] = 1;
  tscKeepConn[TSDB_SQL_SELECT] = 1;
  tscKeepConn[TSDB_SQL_FETCH] = 1;
  tscKeepConn[TSDB_SQL_HB] = 1;

  tscUpdateVnodeMsg[TSDB_SQL_SELECT] = tscUpdateVnodeInQueryMsg;
  tscUpdateVnodeMsg[TSDB_SQL_INSERT] = tscUpdateVnodeInSubmitMsg;
}