tscServer.c 81.2 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#include "os.h"
17
#include "qsqltype.h"
H
hzcheng 已提交
18 19
#include "tcache.h"
#include "trpc.h"
20 21
#include "tscLocalMerge.h"
#include "tscLog.h"
H
hzcheng 已提交
22 23 24 25 26 27 28 29 30 31
#include "tscProfile.h"
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"

#define TSC_MGMT_VNODE 999

S
slguan 已提交
32
// End-point list of the management nodes (mnode). It is logged by tscPrintMgmtIp(), rewritten by
// tscSetMgmtIpList()/tscUpdateIpSet(), and copied into a request's ipList for management commands.
SRpcIpSet  tscMgmtIpSet;
S
slguan 已提交
33 34
// NOTE(review): presumably the end-point list of the data nodes; it is not referenced in the
// visible part of this file -- confirm where it is used.
SRpcIpSet  tscDnodeIpSet;

35 36
// Request builders indexed by SQL command id; doProcessSql() invokes the entry of the current
// command to fill the payload before the message is sent.
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};

H
hzcheng 已提交
37 38 39
// Response handlers indexed by SQL command id. They are invoked after a successful response
// (tscProcessMsgFromServer) and directly for locally handled commands (tscProcessSql).
int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql);
// Timer callback that drives the heartbeat; declared here because tscProcessHeartBeatRsp()
// re-arms the timer with it before its definition appears.
void tscProcessActivityTimer(void *handle, void *tmrId);
// NOTE(review): per-command flag table; not referenced in the visible part of this file -- confirm meaning.
int tscKeepConn[TSDB_SQL_MAX] = {0};
40

B
Bomin Zhang 已提交
41
// Defined outside this file. doSerializeTableInfo() uses the returned key as the per-table
// start key, passing the query window start as the default (dflt).
TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid, TSKEY dflt);
42 43
// Subscription progress helpers defined outside this file.
// NOTE(review): neither is called in the visible part of this file -- confirm the callers.
void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts);
void tscSaveSubscriptionProgress(void* sub);
H
hzcheng 已提交
44

S
slguan 已提交
45
/* Minimum message size: the RPC header size plus a fixed 100 extra bytes. */
static int32_t minMsgSize(void) {
  const int32_t extraBytes = 100;
  return tsRpcHeadSize + extraBytes;
}
H
hzcheng 已提交
46

47
/*
 * Copy the end points of one vgroup into the IP list of a SQL object.
 *
 * pSql        - SQL object whose ipList is overwritten
 * pVgroupInfo - vgroup description that supplies numOfIps fqdn/port pairs
 *
 * The list always restarts from its first entry (inUse is reset to 0).
 */
static void tscSetDnodeIpList(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) {
  SRpcIpSet* pIpList = &pSql->ipList;

  pIpList->numOfIps = pVgroupInfo->numOfIps;
  pIpList->inUse    = 0;

  for (int32_t i = 0; i < pVgroupInfo->numOfIps; ++i) {
    // Bounded copy: an over-long or unterminated source name must not overrun the destination slot.
    // Assumes SRpcIpSet.fqdn[i] is a fixed-size char array, so sizeof yields its capacity -- TODO confirm.
    strncpy(pIpList->fqdn[i], pVgroupInfo->ipAddr[i].fqdn, sizeof(pIpList->fqdn[i]) - 1);
    pIpList->fqdn[i][sizeof(pIpList->fqdn[i]) - 1] = '\0';

    pIpList->port[i] = pVgroupInfo->ipAddr[i].port;
  }
}

S
slguan 已提交
59
/* Log every entry of the management-node list, or an error when the list is empty. */
void tscPrintMgmtIp() {
  if (tscMgmtIpSet.numOfIps <= 0) {
    tscError("invalid mnode IP list:%d", tscMgmtIpSet.numOfIps);
    return;
  }

  int idx = 0;
  while (idx < tscMgmtIpSet.numOfIps) {
    tscTrace("mnode index:%d %s:%d", idx, tscMgmtIpSet.fqdn[idx], tscMgmtIpSet.port[idx]);
    ++idx;
  }
}

69
/*
 * Replace the management-node list with the one carried in pIpList.
 * The entry count and the in-use index are copied as they are; every port goes through htons().
 *
 * NOTE(review): only the ports are stored here, the fqdn entries of tscMgmtIpSet are left
 * untouched -- confirm that this is intended.
 */
void tscSetMgmtIpList(SRpcIpSet *pIpList) {
  tscMgmtIpSet.inUse = pIpList->inUse;
  tscMgmtIpSet.numOfIps = pIpList->numOfIps;

  int32_t idx = 0;
  while (idx < tscMgmtIpSet.numOfIps) {
    tscMgmtIpSet.port[idx] = htons(pIpList->port[idx]);
    ++idx;
  }
}

S
slguan 已提交
77
/*
 * Overwrite the management-node list with *pIpSet and trace the new content.
 * ahandle is not used.
 */
void tscUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet) {
  tscMgmtIpSet = *pIpSet;

  tscTrace("mnode IP list is changed for ufp is called, numOfIps:%d inUse:%d", tscMgmtIpSet.numOfIps, tscMgmtIpSet.inUse);

  int32_t idx = 0;
  while (idx < tscMgmtIpSet.numOfIps) {
    tscTrace("index:%d fqdn:%s port:%d", idx, tscMgmtIpSet.fqdn[idx], tscMgmtIpSet.port[idx]);
    ++idx;
  }
}

H
hjxilinx 已提交
85 86 87 88 89 90 91
/*
 * For each management node, try at least twice in case of a poor network connection.
 * If the client first connects to a non-management node, the first attempt may fail because of
 * poor network quality, and the second attempt may then receive a redirection response.
 * With a single management node in the cluster only *two* attempts would be allowed, so the
 * redirected request would never be retried.
 * Therefore, the number of retries is multiplied by a factor of 2 to fix this problem.
 */
92
UNUSED_FUNC
H
hjxilinx 已提交
93 94
static int32_t tscGetMgmtConnMaxRetryTimes() {
  int32_t factor = 2;
S
slguan 已提交
95
  return tscMgmtIpSet.numOfIps * factor;
H
hjxilinx 已提交
96 97
}

H
hzcheng 已提交
98 99 100 101 102 103 104 105 106 107 108 109
/*
 * Callback for the heartbeat request.
 *
 * param - the connection object (STscObj) that owns the heartbeat
 * tres  - not used
 * code  - 0 on success, otherwise an error code that is only traced
 *
 * On success the response may refresh the management-node list, updates the connection id and
 * may ask for the connection, a query or a stream to be killed. Unless the connection object is
 * NULL or has an invalid signature, the activity timer is re-armed before returning.
 */
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
  STscObj *pObj = (STscObj *)param;
  if (pObj == NULL) {
    return;
  }

  if (pObj->signature != pObj) {
    tscError("heart beat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature);
    return;
  }

  SSqlObj *pHbSql = pObj->pHb;

  if (code != 0) {
    tscTrace("heart beat failed, code:%s", tstrerror(code));
  } else {
    SCMHeartBeatRsp *pHbRsp = (SCMHeartBeatRsp *)pHbSql->res.pRsp;

    // an empty list in the response leaves the current management-node list unchanged
    if (pHbRsp->ipList.numOfIps > 0) {
      tscSetMgmtIpList(&pHbRsp->ipList);
    }

    pHbSql->pTscObj->connId = htonl(pHbRsp->connId);

    if (pHbRsp->killConnection) {
      tscKillConnection(pObj);
    } else {
      if (pHbRsp->queryId) {
        tscKillQuery(pObj, htonl(pHbRsp->queryId));
      }
      if (pHbRsp->streamId) {
        tscKillStream(pObj, htonl(pHbRsp->streamId));
      }
    }
  }

  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
}

void tscProcessActivityTimer(void *handle, void *tmrId) {
  STscObj *pObj = (STscObj *)handle;

  if (pObj == NULL) return;
  if (pObj->signature != pObj) return;
  if (pObj->pTimer != tmrId) return;

  if (pObj->pHb == NULL) {
S
slguan 已提交
138 139 140
    SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
    if (NULL == pSql) return;

H
hzcheng 已提交
141
    pSql->fp = tscProcessHeartBeatRsp;
H
hjxilinx 已提交
142
    
143 144 145 146
    SQueryInfo *pQueryInfo = NULL;
    tscGetQueryInfoDetailSafely(&pSql->cmd, 0, &pQueryInfo);
    pQueryInfo->command = TSDB_SQL_HB;
    
147
    pSql->cmd.command = TSDB_SQL_HB;
S
slguan 已提交
148 149 150 151 152
    if (TSDB_CODE_SUCCESS != tscAllocPayload(&(pSql->cmd), TSDB_DEFAULT_PAYLOAD_SIZE)) {
      tfree(pSql);
      return;
    }

153
    pSql->cmd.command = TSDB_SQL_HB;
H
hzcheng 已提交
154 155 156 157
    pSql->param = pObj;
    pSql->pTscObj = pObj;
    pSql->signature = pSql;
    pObj->pHb = pSql;
158
    tscAddSubqueryInfo(&pObj->pHb->cmd);
159

S
slguan 已提交
160
    tscTrace("%p pHb is allocated, pObj:%p", pObj->pHb, pObj);
H
hzcheng 已提交
161 162 163
  }

  if (tscShouldFreeHeatBeat(pObj->pHb)) {
S
slguan 已提交
164
    tscTrace("%p free HB object and release connection", pObj);
H
hzcheng 已提交
165 166 167 168 169 170 171 172 173
    tscFreeSqlObj(pObj->pHb);
    tscCloseTscObj(pObj);
    return;
  }

  tscProcessSql(pObj->pHb);
}

/*
 * Copy the prepared payload of pSql into an RPC buffer and hand it to the RPC layer.
 *
 * Commands below TSDB_SQL_MGMT keep their message body behind a reserved RPC header, so the
 * copy starts tsRpcHeadSize bytes into the payload. All other commands are copied from the
 * start of the payload and are addressed to the management-node list.
 *
 * Returns TSDB_CODE_TSC_OUT_OF_MEMORY when the RPC buffer cannot be allocated, otherwise
 * TSDB_CODE_SUCCESS.
 */
int tscSendMsgToServer(SSqlObj *pSql) {
  STscObj *pObj = pSql->pTscObj;
  SSqlCmd *pCmd = &pSql->cmd;

  char *pCont = rpcMallocCont(pCmd->payloadLen);
  if (pCont == NULL) {
    tscError("%p msg:%s malloc fail", pSql, taosMsg[pCmd->msgType]);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  const char *pSrc = pCmd->payload;
  if (pCmd->command < TSDB_SQL_MGMT) {
    pSrc += tsRpcHeadSize;
  } else {
    pSql->ipList = tscMgmtIpSet;
  }
  memcpy(pCont, pSrc, pCmd->payloadLen);

  SRpcMsg request = {0};
  request.msgType = pCmd->msgType;
  request.pCont   = pCont;
  request.contLen = pCmd->payloadLen;
  request.handle  = pSql;
  request.code    = 0;

  pSql->pRpcCtx = rpcSendRequest(pObj->pDnodeConn, &pSql->ipList, &request);
  return TSDB_CODE_SUCCESS;
}

J
jtao1735 已提交
204
/*
 * RPC callback: process the response (or the failure notification) of a request.
 *
 * rpcMsg - the response; rpcMsg->handle is the SQL object that issued the request. The content
 *          buffer rpcMsg->pCont is released with rpcFreeCont() on every path of this function.
 * pIpSet - when not NULL, the end-point list reported by the RPC layer; it replaces the list of
 *          the SQL object (commands below TSDB_SQL_MGMT) or the management-node list (others).
 *
 * Flow:
 *  1. Drop the response when the SQL object or its connection is no longer valid.
 *  2. For "invalid table id", "invalid vgroup id" and "network unavailable" errors, refresh the
 *     table meta and return early while that is in progress (connect, heartbeat and table-meta
 *     commands are excluded from the refresh).
 *  3. Copy code, type and content of the response into pSql->res, unless the query was cancelled.
 *  4. Run the per-command response handler and then the user callback pSql->fp.
 */
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) {
  SSqlObj *pSql = (SSqlObj *)rpcMsg->handle;
  if (pSql == NULL || pSql->signature != pSql) {
    // pSql may be NULL here, so log the handle itself and never read pSql->signature
    tscError("%p sql is already released", pSql);
    rpcFreeCont(rpcMsg->pCont);  // same as the other early exits: do not leak the response buffer
    return;
  }

  STscObj *pObj = pSql->pTscObj;
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  if (pObj->signature != pObj) {
    tscTrace("%p DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, pObj, pObj->signature);

    tscFreeSqlObj(pSql);
    rpcFreeCont(rpcMsg->pCont);
    return;
  }

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) {
    tscTrace("%p sqlObj needs to be released or DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command,
             pObj, pObj->signature);

    tscFreeSqlObj(pSql);
    rpcFreeCont(rpcMsg->pCont);
    return;
  }

  if (pCmd->command < TSDB_SQL_MGMT) {
    if (pIpSet) pSql->ipList = *pIpSet;
  } else {
    if (pIpSet) tscMgmtIpSet = *pIpSet;
  }

  if (rpcMsg->pCont == NULL) {
    rpcMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
  } else {
    STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
    if (rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID ||
        rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
      if (pCmd->command == TSDB_SQL_CONNECT) {
        rpcMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
        rpcFreeCont(rpcMsg->pCont);
        return;
      } else if (pCmd->command == TSDB_SQL_HB) {
        rpcMsg->code = TSDB_CODE_RPC_NOT_READY;
        rpcFreeCont(rpcMsg->pCont);
        return;
      } else if (pCmd->command == TSDB_SQL_META) {
        // get table meta query will not retry, do nothing
      } else {
        // Count the retry outside of the log call: if tscWarn is a macro that skips its arguments
        // when the level is disabled (definition not visible here), the counter would never advance.
        pSql->retry += 1;
        tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), pSql->retry);

        pSql->res.code = rpcMsg->code;  // keep the previous error code
        if (pSql->retry > pSql->maxRetry) {
          tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry);
        } else {
          rpcMsg->code = tscRenewTableMeta(pSql, pTableMetaInfo->name);

          // if there is an error occurring, proceed to the following error handling procedure.
          // todo add test cases
          if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
            rpcFreeCont(rpcMsg->pCont);
            return;
          }
        }
      }
    }
  }

  pRes->rspLen = 0;

  if (pRes->code != TSDB_CODE_TSC_QUERY_CANCELLED) {
    pRes->code = (rpcMsg->code != TSDB_CODE_SUCCESS) ? rpcMsg->code : TSDB_CODE_RPC_NETWORK_UNAVAIL;
  } else {
    tscTrace("%p query is cancelled, code:%s", pSql, tstrerror(pRes->code));
  }

  // NOTE(review): the assignment above never stores TSDB_CODE_SUCCESS, so this reset looks
  // unreachable at this point -- confirm whether it should follow the copy of rpcMsg->code below.
  if (pRes->code == TSDB_CODE_SUCCESS) {
    tscTrace("%p reset retry counter to be 0 due to success rsp, old:%d", pSql, pSql->retry);
    pSql->retry = 0;
  }

  if (pRes->code != TSDB_CODE_TSC_QUERY_CANCELLED) {
    assert(rpcMsg->msgType == pCmd->msgType + 1);
    pRes->code    = rpcMsg->code;
    pRes->rspType = rpcMsg->msgType;
    pRes->rspLen  = rpcMsg->contLen;

    if (pRes->rspLen > 0 && rpcMsg->pCont) {
      // realloc through a temporary so the old buffer is not lost when the allocation fails
      char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
      if (tmp == NULL) {
        pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      } else {
        pRes->pRsp = tmp;
        memcpy(pRes->pRsp, rpcMsg->pCont, pRes->rspLen);
      }
    } else {
      pRes->pRsp = NULL;
    }

    /*
     * There is not response callback function for submit response.
     * The actual inserted number of points is the first number.
     */
    if (rpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP && pRes->pRsp != NULL) {
      SShellSubmitRspMsg *pMsg = (SShellSubmitRspMsg*)pRes->pRsp;
      pMsg->code = htonl(pMsg->code);
      pMsg->numOfRows = htonl(pMsg->numOfRows);
      pMsg->affectedRows = htonl(pMsg->affectedRows);
      pMsg->failedRows = htonl(pMsg->failedRows);
      pMsg->numOfFailedBlocks = htonl(pMsg->numOfFailedBlocks);

      pRes->numOfRows += pMsg->affectedRows;
      tscTrace("%p SQL cmd:%s, code:%s inserted rows:%d rspLen:%d", pSql, sqlCmd[pCmd->command],
          tstrerror(pRes->code), pMsg->affectedRows, pRes->rspLen);
    } else {
      tscTrace("%p SQL cmd:%s, code:%s rspLen:%d", pSql, sqlCmd[pCmd->command], tstrerror(pRes->code), pRes->rspLen);
    }
  }

  if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) {
    rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
  }

  if (rpcMsg->code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
    // the callback receives the number of rows on success and the error code otherwise
    rpcMsg->code = (pRes->code == TSDB_CODE_SUCCESS) ? pRes->numOfRows: pRes->code;

    // decide before the callback runs whether the object is freed afterwards
    bool shouldFree = tscShouldBeFreed(pSql);
    (*pSql->fp)(pSql->param, pSql, rpcMsg->code);

    if (shouldFree) {
      tscTrace("%p sqlObj is automatically freed", pSql);
      tscFreeSqlObj(pSql);
    }
  }

  rpcFreeCont(rpcMsg->pCont);
}

S
slguan 已提交
345 346 347
/*
 * Build the request message of pSql (for the commands that have a builder) and send it.
 *
 * Returns the build error when building the message fails; the result is queued for the
 * asynchronous callback first. A failure to send is stored in pSql->res.code and queued for the
 * asynchronous callback as well, but the function itself still returns TSDB_CODE_SUCCESS.
 */
int doProcessSql(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  switch (pCmd->command) {
    case TSDB_SQL_SELECT:
    case TSDB_SQL_FETCH:
    case TSDB_SQL_RETRIEVE:
    case TSDB_SQL_INSERT:
    case TSDB_SQL_CONNECT:
    case TSDB_SQL_HB:
    case TSDB_SQL_META:
    case TSDB_SQL_STABLEVGROUP:
      pRes->code = tscBuildMsg[pCmd->command](pSql, NULL);
      break;
    default:
      // every other command arrives with its message already built
      break;
  }

  if (pRes->code != TSDB_CODE_SUCCESS) {
    tscQueueAsyncRes(pSql);
    return pRes->code;
  }

  int32_t sendCode = tscSendMsgToServer(pSql);
  if (sendCode != TSDB_CODE_SUCCESS) {
    pRes->code = sendCode;
    tscQueueAsyncRes(pSql);
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Entry point for executing the command stored in pSql.
 *
 *  - commands below TSDB_SQL_MGMT need table meta info; without it TSDB_CODE_TSC_APP_ERROR is
 *    stored in pSql->res.code and returned
 *  - commands below TSDB_SQL_LOCAL are addressed to the management-node list
 *  - all remaining commands are handled locally by their response handler, whose result is returned
 *
 * For the first two groups the result of doProcessSql() is returned.
 */
int tscProcessSql(SSqlObj *pSql) {
  char *   name = NULL;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  STableMetaInfo *pTableMetaInfo = NULL;
  uint32_t        type = 0;

  if (pQueryInfo != NULL) {
    pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
    if (pTableMetaInfo != NULL) {
      name = pTableMetaInfo->name;
    }

    type = pQueryInfo->type;

    // while numOfTables equals to 0, it must be Heartbeat
    assert((pQueryInfo->numOfTables == 0 && pQueryInfo->command == TSDB_SQL_HB) || pQueryInfo->numOfTables > 0);
  }

  // name stays NULL when there is no query info or no table meta info; passing NULL to a %s
  // conversion is undefined behaviour, so substitute a printable placeholder
  tscTrace("%p SQL cmd:%s will be processed, name:%s, type:%d", pSql, sqlCmd[pCmd->command],
           (name != NULL) ? name : "(null)", type);
  if (pCmd->command < TSDB_SQL_MGMT) { // the pTableMetaInfo cannot be NULL
    if (pTableMetaInfo == NULL) {
      pSql->res.code = TSDB_CODE_TSC_APP_ERROR;
      return pSql->res.code;
    }
  } else if (pCmd->command < TSDB_SQL_LOCAL) {
    pSql->ipList = tscMgmtIpSet; //?
  } else {  // local handler
    return (*tscProcessMsgRsp[pCmd->command])(pSql);
  }

  return doProcessSql(pSql);
}
H
hzcheng 已提交
409

H
hjxilinx 已提交
410
/*
 * Cancel the sub-queries of a two-stage super table query.
 * Does nothing when pSql is not such a query.
 */
void tscKillSTableQuery(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  if (!tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
    return;
  }

  for (int i = 0; i < pSql->numOfSubs; ++i) {
    SSqlObj *pSub = pSql->pSubs[i];
    if (pSub != NULL) {
      /*
       * The command must not be set to TSDB_SQL_KILL_QUERY here. Otherwise the sub-queries may
       * not be released correctly and the master sql object of the super table query may reach
       * an abnormal state.
       */
      pSub->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
      rpcCancelRequest(pSub->pRpcCtx);
    }
  }

  /*
   * 1. If the sub-queries are not launched or only partially launched, wait for the launched
   *    ones to return so that the allocated resources are freed.
   * 2. If no sub-query is launched yet, the super table query is still in the sql parse stage:
   *    set the res.code, and return.
   */
  const int64_t maxWaitMs = 10000;  // 10 Sec.
  const int64_t startMs = taosGetTimestampMs();

  for (;;) {
    if (pCmd->command == TSDB_SQL_RETRIEVE_LOCALMERGE || pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
      break;
    }

    taosMsleep(100);
    if (taosGetTimestampMs() - startMs > maxWaitMs) {
      break;
    }
  }

  tscTrace("%p super table query cancelled", pSql);
}

J
jtao1735 已提交
451
/*
 * Fill the retrieve (fetch) message located behind the RPC header of the payload.
 *
 * The message carries the query handle, the query type (stored in the "free" field), the id of
 * the target vgroup and the payload length, all converted with htobe64/htons/htonl.
 * pInfo is not used. Always returns TSDB_CODE_SUCCESS.
 */
int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *)(pCmd->payload + tsRpcHeadSize);
  pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  pRetrieveMsg->free = htons(pQueryInfo->type);

  // todo valid the vgroupId at the client side
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
    // super table: the vgroup is selected by the current vgroup index
    SVgroupsInfo *pVgroups = pTableMetaInfo->vgroupList;
    pRetrieveMsg->header.vgId = htonl(pVgroups->vgroups[pTableMetaInfo->vgroupIndex].vgId);
  } else {
    pRetrieveMsg->header.vgId = htonl(pTableMetaInfo->pTableMeta->vgroupInfo.vgId);
  }

  pRetrieveMsg->header.contLen = htonl(pCmd->payloadLen);

  pCmd->msgType = TSDB_MSG_TYPE_FETCH;
  return TSDB_CODE_SUCCESS;
}

486
/*
 * Complete the headers of a submit (insert) message whose data is already in the payload.
 *
 * Behind the RPC header the payload holds an SMsgDesc followed by the SSubmitMsg. This function
 * fills both headers, sets the message type and directs the request to the vgroup of the table.
 * pInfo is not used. Always returns TSDB_CODE_SUCCESS.
 */
int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd    *pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMeta *pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;

  char *pBody = pCmd->payload + tsRpcHeadSize;

  // NOTE: shell message size should not include SMsgDesc
  int32_t size = pCmd->payloadLen - sizeof(SMsgDesc);

  SMsgDesc *pMsgDesc = (SMsgDesc *)pBody;
  pMsgDesc->numOfVnodes = htonl(1);       //todo set the right number of vnodes

  SSubmitMsg *pShellMsg = (SSubmitMsg *)(pBody + sizeof(SMsgDesc));
  int32_t     vgId = pTableMeta->vgroupInfo.vgId;

  pShellMsg->header.vgId = htonl(vgId);
  pShellMsg->header.contLen = htonl(size);
  pShellMsg->length = pShellMsg->header.contLen;  // already converted by htonl above

  pShellMsg->numOfBlocks = htonl(pCmd->numOfTablesInSubmit);  // number of meters to be inserted

  // pSql->cmd.payloadLen is set during copying data into payload
  pCmd->msgType = TSDB_MSG_TYPE_SUBMIT;
  tscSetDnodeIpList(pSql, &pTableMeta->vgroupInfo);

  tscTrace("%p build submit msg, vgId:%d numOfVgroup:%d numberOfIP:%d", pSql, vgId, htonl(pMsgDesc->numOfVnodes), pSql->ipList.numOfIps);
  return TSDB_CODE_SUCCESS;
}

/*
518
 * for table query, simply return the size <= 1k
H
hzcheng 已提交
519
 */
520
static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
H
hzcheng 已提交
521
  const static int32_t MIN_QUERY_MSG_PKT_SIZE = TSDB_MAX_BYTES_PER_ROW * 5;
522
  SQueryInfo *         pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
H
hzcheng 已提交
523

524
  int32_t srcColListSize = taosArrayGetSize(pQueryInfo->colList) * sizeof(SColumnInfo);
H
hjxilinx 已提交
525 526 527 528
  
  size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
  int32_t exprSize = sizeof(SSqlFuncMsg) * numOfExprs;
  
529
  return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + exprSize + 4096;
H
hzcheng 已提交
530 531
}

532
/*
 * Write the table id list of a query message and select the vgroup the request is sent to.
 *
 * pQueryMsg - message header; head.vgId and numOfTables are filled in here
 * pSql      - SQL object; its ipList is set to the end points of the selected vgroup
 * pMsg      - write position for the STableIdInfo entries
 *
 * Returns the position right behind the last entry written.
 *
 * A normal table, or a super table without a per-vgroup table list, produces a single entry
 * built from the table meta. Otherwise one entry is written for every table recorded for the
 * current vgroup index. The key of each entry is the subscription progress of the table, with
 * the start of the query window as the default.
 */
static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char *pMsg) {
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
  STableMeta     *pMeta = pMetaInfo->pTableMeta;

  // window.skey is converted with htobe64 here; the result serves as the default progress key
  TSKEY defaultKey = htobe64(pQueryMsg->window.skey);

  if (!(UTIL_TABLE_IS_NORMAL_TABLE(pMetaInfo)) && pMetaInfo->pVgroupTables != NULL) {
    // it is a subquery of the super table query, this IP info is acquired from vgroupInfo
    int32_t vgIndex = pMetaInfo->vgroupIndex;
    int32_t vgCount = taosArrayGetSize(pMetaInfo->pVgroupTables);
    assert(vgIndex >= 0 && vgIndex < vgCount);

    tscTrace("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, vgIndex, vgCount);

    SVgroupTableInfo *pVgTables = taosArrayGet(pMetaInfo->pVgroupTables, vgIndex);

    // set the vgroup info
    tscSetDnodeIpList(pSql, &pVgTables->vgInfo);
    pQueryMsg->head.vgId = htonl(pVgTables->vgInfo.vgId);

    int32_t tableCount = taosArrayGetSize(pVgTables->itemList);
    pQueryMsg->numOfTables = htonl(tableCount);  // set the number of tables

    // serialize each table id info
    int32_t t = 0;
    while (t < tableCount) {
      STableIdInfo *pSrc = taosArrayGet(pVgTables->itemList, t);
      STableIdInfo *pDst = (STableIdInfo *)pMsg;

      pDst->tid = htonl(pSrc->tid);
      pDst->uid = htobe64(pSrc->uid);
      pDst->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pSrc->uid, defaultKey));

      pMsg += sizeof(STableIdInfo);
      ++t;
    }
  } else {
    SCMVgroupInfo *pVgroup = NULL;

    if (UTIL_TABLE_IS_SUPER_TABLE(pMetaInfo)) {
      int32_t vgIndex = pMetaInfo->vgroupIndex;
      assert(vgIndex >= 0);

      pVgroup = &pMetaInfo->vgroupList->vgroups[vgIndex];
      tscTrace("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, vgIndex, pMetaInfo->vgroupList->numOfVgroups);
    } else {
      pVgroup = &pMeta->vgroupInfo;
    }

    tscSetDnodeIpList(pSql, pVgroup);
    pQueryMsg->head.vgId = htonl(pVgroup->vgId);

    STableIdInfo *pDst = (STableIdInfo *)pMsg;
    pDst->tid = htonl(pMeta->sid);
    pDst->uid = htobe64(pMeta->uid);
    pDst->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pMeta->uid, defaultKey));

    pQueryMsg->numOfTables = htonl(1);  // set the number of tables
    pMsg += sizeof(STableIdInfo);
  }

  tscTrace("%p vgId:%d, query on table:%s, tid:%d, uid:%" PRIu64, pSql, htonl(pQueryMsg->head.vgId), pMetaInfo->name,
      pMeta->sid, pMeta->uid);

  return pMsg;
}

594
/*
 * Build the query request (TSDB_MSG_TYPE_QUERY) for the current clause of the
 * command and store it in the command payload.
 *
 * The message starts tsRpcHeadSize bytes into the payload with a fixed
 * SQueryTableMsg header, followed in order by: the column list with the
 * per-column filters, the output expressions, the table id list, the group-by
 * columns, the fill values, the tag column list, the tag condition, the table
 * name condition and finally the compressed timestamp block.
 *
 * @param pSql   sql object that owns the command and its payload
 * @param pInfo  not used by this function
 * @return TSDB_CODE_SUCCESS on success; -1 or TSDB_CODE_TSC_INVALID_SQL when the
 *         payload cannot be allocated or the query information is inconsistent
 */
int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  int32_t size = tscEstimateQueryMsgSize(pCmd, pCmd->clauseIndex);

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for query msg", pSql);
    return -1;  // todo add test for this
  }

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;

  // sanity checks on the parsed query before anything is serialized
  if (taosArrayGetSize(pQueryInfo->colList) <= 0 && !tscQueryTags(pQueryInfo)) {
    tscError("%p illegal value of numOfCols in query msg: %d", pSql, tscGetNumOfColumns(pTableMeta));
    return -1;
  }

  if (pQueryInfo->intervalTime < 0) {
    // the interval is a 64-bit value (it is sent with htobe64 below), so print it with PRId64
    tscError("%p illegal value of aggregation time interval in query msg: %" PRId64, pSql, pQueryInfo->intervalTime);
    return -1;
  }

  if (pQueryInfo->groupbyExpr.numOfGroupCols < 0) {
    tscError("%p illegal value of numOfGroupCols in query msg: %d", pSql, pQueryInfo->groupbyExpr.numOfGroupCols);
    return -1;
  }

  char *pStart = pCmd->payload + tsRpcHeadSize;

  SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pStart;

  int32_t numOfTags = taosArrayGetSize(pTableMetaInfo->tagColList);

  // for a descending query the start/end keys of the window are swapped
  if (pQueryInfo->order.order == TSDB_ORDER_ASC) {
    pQueryMsg->window.skey = htobe64(pQueryInfo->window.skey);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->window.ekey);
  } else {
    pQueryMsg->window.skey = htobe64(pQueryInfo->window.ekey);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->window.skey);
  }

  pQueryMsg->order          = htons(pQueryInfo->order.order);
  pQueryMsg->orderColId     = htons(pQueryInfo->order.orderColId);
  pQueryMsg->fillType       = htons(pQueryInfo->fillType);
  pQueryMsg->limit          = htobe64(pQueryInfo->limit.limit);
  pQueryMsg->offset         = htobe64(pQueryInfo->limit.offset);
  pQueryMsg->numOfCols      = htons(taosArrayGetSize(pQueryInfo->colList));
  pQueryMsg->intervalTime   = htobe64(pQueryInfo->intervalTime);
  pQueryMsg->slidingTime    = htobe64(pQueryInfo->slidingTime);
  pQueryMsg->slidingTimeUnit = pQueryInfo->slidingTimeUnit;
  pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols);
  pQueryMsg->numOfTags      = htonl(numOfTags);
  pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType);
  pQueryMsg->queryType      = htonl(pQueryInfo->type);

  size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo);
  pQueryMsg->numOfOutput = htons(numOfOutput);

  // set column list ids; the variable-length part of the message starts right after the column array
  size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
  char *pMsg = (char *)(pQueryMsg->colList) + numOfCols * sizeof(SColumnInfo);
  SSchema *pSchema = tscGetTableSchema(pTableMeta);

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumn *pCol = taosArrayGetP(pQueryInfo->colList, i);
    SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];

    if (pCol->colIndex.columnIndex >= tscGetNumOfColumns(pTableMeta) || pColSchema->type < TSDB_DATA_TYPE_BOOL ||
        pColSchema->type > TSDB_DATA_TYPE_NCHAR) {
      tscError("%p sid:%d uid:%" PRIu64" id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s",
          pSql, pTableMeta->sid, pTableMeta->uid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta), pCol->colIndex.columnIndex,
               pColSchema->name);

      return TSDB_CODE_TSC_INVALID_SQL;
    }

    pQueryMsg->colList[i].colId = htons(pColSchema->colId);
    pQueryMsg->colList[i].bytes = htons(pColSchema->bytes);
    pQueryMsg->colList[i].type  = htons(pColSchema->type);
    pQueryMsg->colList[i].numOfFilters = htons(pCol->numOfFilters);

    // append the filter information after the basic column information
    for (int32_t f = 0; f < pCol->numOfFilters; ++f) {
      SColumnFilterInfo *pColFilter = &pCol->filterInfo[f];

      SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)pMsg;
      pFilterMsg->filterstr = htons(pColFilter->filterstr);

      pMsg += sizeof(SColumnFilterInfo);

      if (pColFilter->filterstr) {
        pFilterMsg->len = htobe64(pColFilter->len);
        memcpy(pMsg, (void *)pColFilter->pz, pColFilter->len + 1);
        pMsg += (pColFilter->len + 1);  // append the additional filter binary info
      } else {
        pFilterMsg->lowerBndi = htobe64(pColFilter->lowerBndi);
        pFilterMsg->upperBndi = htobe64(pColFilter->upperBndi);
      }

      pFilterMsg->lowerRelOptr = htons(pColFilter->lowerRelOptr);
      pFilterMsg->upperRelOptr = htons(pColFilter->upperRelOptr);

      if (pColFilter->lowerRelOptr == TSDB_RELATION_INVALID && pColFilter->upperRelOptr == TSDB_RELATION_INVALID) {
        tscError("invalid filter info");
        return -1;
      }
    }
  }

  // serialize the output expressions, each followed by the binary data of its parameters
  SSqlFuncMsg *pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
  for (int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) {
    SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);

    if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId)) {
      /* column id is not valid according to the cached table meta, the table meta is expired */
      tscError("%p table schema is not matched with parsed sql", pSql);
      return -1;
    }

    pSqlFuncExpr->colInfo.colId    = htons(pExpr->colInfo.colId);
    pSqlFuncExpr->colInfo.colIndex = htons(pExpr->colInfo.colIndex);
    pSqlFuncExpr->colInfo.flag     = htons(pExpr->colInfo.flag);

    pSqlFuncExpr->functionId  = htons(pExpr->functionId);
    pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams);
    pMsg += sizeof(SSqlFuncMsg);

    for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
      // todo add log
      pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType);
      pSqlFuncExpr->arg[j].argBytes = htons(pExpr->param[j].nLen);

      if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) {
        memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen);
        pMsg += pExpr->param[j].nLen;
      } else {
        pSqlFuncExpr->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64Key);
      }
    }

    pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
  }

  // serialize the table info (sid, uid, tags)
  pMsg = doSerializeTableInfo(pQueryMsg, pSql, pMsg);

  SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr;
  if (pGroupbyExpr->numOfGroupCols > 0) {
    pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex);
    pQueryMsg->orderType = htons(pGroupbyExpr->orderType);

    for (int32_t j = 0; j < pGroupbyExpr->numOfGroupCols; ++j) {
      SColIndex* pCol = taosArrayGet(pGroupbyExpr->columnInfo, j);

      // plain assignment: the value written must not depend on what the buffer held before
      *((int16_t *)pMsg) = pCol->colId;
      pMsg += sizeof(pCol->colId);

      *((int16_t *)pMsg) = pCol->colIndex;
      pMsg += sizeof(pCol->colIndex);

      *((int16_t *)pMsg) = pCol->flag;
      pMsg += sizeof(pCol->flag);

      memcpy(pMsg, pCol->name, tListLen(pCol->name));
      pMsg += tListLen(pCol->name);
    }
  }

  if (pQueryInfo->fillType != TSDB_FILL_NONE) {
    for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
      *((int64_t *)pMsg) = htobe64(pQueryInfo->fillVal[i]);
      pMsg += sizeof(pQueryInfo->fillVal[0]);
    }
  }

  if (numOfTags != 0) {
    int32_t numOfColumns = tscGetNumOfColumns(pTableMeta);
    int32_t numOfTagColumns = tscGetNumOfTags(pTableMeta);
    int32_t total = numOfTagColumns + numOfColumns;

    pSchema = tscGetTableTagSchema(pTableMeta);

    for (int32_t i = 0; i < numOfTags; ++i) {
      SColumn *pCol = taosArrayGetP(pTableMetaInfo->tagColList, i);
      SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];

      // NOTE(review): an index of -1 passes the range check below although pColSchema
      // was already computed from it - confirm whether -1 can reach this point
      if ((pCol->colIndex.columnIndex >= numOfTagColumns || pCol->colIndex.columnIndex < -1) ||
          (pColSchema->type < TSDB_DATA_TYPE_BOOL || pColSchema->type > TSDB_DATA_TYPE_NCHAR)) {
        tscError("%p sid:%d uid:%" PRIu64 " id:%s, tag index out of range, totalCols:%d, numOfTags:%d, index:%d, column name:%s",
                 pSql, pTableMeta->sid, pTableMeta->uid, pTableMetaInfo->name, total, numOfTagColumns,
                 pCol->colIndex.columnIndex, pColSchema->name);

        return TSDB_CODE_TSC_INVALID_SQL;
      }

      SColumnInfo* pTagCol = (SColumnInfo*) pMsg;

      pTagCol->colId = htons(pColSchema->colId);
      pTagCol->bytes = htons(pColSchema->bytes);
      pTagCol->type  = htons(pColSchema->type);
      pTagCol->numOfFilters = 0;

      pMsg += sizeof(SColumnInfo);
    }
  }

  // serialize tag column query condition
  if (pQueryInfo->tagCond.pCond != NULL && taosArrayGetSize(pQueryInfo->tagCond.pCond) > 0) {
    STagCond* pTagCond = &pQueryInfo->tagCond;

    SCond *pCond = tsGetSTableQueryCond(pTagCond, pTableMeta->uid);
    if (pCond != NULL && pCond->cond != NULL) {
      pQueryMsg->tagCondLen = htons(pCond->len);
      memcpy(pMsg, pCond->cond, pCond->len);

      pMsg += pCond->len;
    }
  }

  // the table name condition is always present as a string; an empty one is a single 0 byte
  if (pQueryInfo->tagCond.tbnameCond.cond == NULL) {
    *pMsg = 0;
    pMsg++;
  } else {
    strcpy(pMsg, pQueryInfo->tagCond.tbnameCond.cond);
    pMsg += strlen(pQueryInfo->tagCond.tbnameCond.cond) + 1;
  }

  // compressed ts block
  pQueryMsg->tsOffset = htonl(pMsg - pStart);
  int32_t tsLen = 0;
  int32_t numOfBlocks = 0;

  if (pQueryInfo->tsBuf != NULL) {
    STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pQueryInfo->tsBuf, pTableMetaInfo->vgroupIndex);
    assert(QUERY_IS_JOIN_QUERY(pQueryInfo->type) && pBlockInfo != NULL);  // this query should not be sent

    // todo refactor
    // a failed seek or a short read would ship stale buffer content to the server, so give up instead
    if (fseek(pQueryInfo->tsBuf->f, pBlockInfo->offset, SEEK_SET) != 0) {
      tscError("%p failed to seek in the ts buffer file", pSql);
      return -1;
    }

    if (pBlockInfo->compLen > 0 && fread(pMsg, pBlockInfo->compLen, 1, pQueryInfo->tsBuf->f) != 1) {
      tscError("%p failed to read the compressed ts block from the ts buffer file", pSql);
      return -1;
    }

    pMsg += pBlockInfo->compLen;
    tsLen = pBlockInfo->compLen;
    numOfBlocks = pBlockInfo->numOfBlocks;
  }

  pQueryMsg->tsLen = htonl(tsLen);
  pQueryMsg->tsNumOfBlocks = htonl(numOfBlocks);
  if (pQueryInfo->tsBuf != NULL) {
    pQueryMsg->tsOrder = htonl(pQueryInfo->tsBuf->tsOrder);
  }

  int32_t msgLen = pMsg - pStart;

  tscTrace("%p msg built success,len:%d bytes", pSql, msgLen);
  pCmd->payloadLen = msgLen;
  pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY;

  pQueryMsg->head.contLen = htonl(msgLen);
  assert(msgLen + minMsgSize() <= size);

  return TSDB_CODE_SUCCESS;
}

859 860
/*
 * Finalize the CREATE DATABASE request (TSDB_MSG_TYPE_CM_CREATE_DB) held in the
 * command payload: set the message type, the payload length and the database name.
 *
 * NOTE(review): the payload is not allocated here - presumably the other fields of
 * SCMCreateDbMsg were written into it before this call; confirm against the caller.
 */
int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DB;
  pCmd->payloadLen = sizeof(SCMCreateDbMsg);

  assert(pCmd->numOfClause == 1);

  // the database name is kept in the name field of the first table meta info
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  SCMCreateDbMsg *pCreateMsg = (SCMCreateDbMsg *)pCmd->payload;
  tstrncpy(pCreateMsg->db, pMetaInfo->name, sizeof(pCreateMsg->db));

  return TSDB_CODE_SUCCESS;
}

873 874
/*
 * Build the CREATE DNODE request (TSDB_MSG_TYPE_CM_CREATE_DNODE).
 *
 * The end point is taken from the first token of the DCL info. The token is
 * described by a pointer and a length, so the copy is limited both by the
 * token length and by the size of the destination field.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if the payload cannot be allocated
 */
int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCMCreateDnodeMsg);
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMCreateDnodeMsg *pCreate = (SCMCreateDnodeMsg *)pCmd->payload;

  // never write past the end point field and always leave room for the terminator
  size_t epLen = pInfo->pDCLInfo->a[0].n;
  if (epLen >= sizeof(pCreate->ep)) {
    epLen = sizeof(pCreate->ep) - 1;
  }

  strncpy(pCreate->ep, pInfo->pDCLInfo->a[0].z, epLen);
  pCreate->ep[epLen] = '\0';

  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DNODE;

  return TSDB_CODE_SUCCESS;
}

889 890
/*
 * Build the CREATE ACCOUNT request (TSDB_MSG_TYPE_CM_CREATE_ACCT) from the
 * account name, the password and the account options of the DCL info.
 *
 * The access state is -1 when no state was given; otherwise "r", "w", "all" and
 * "no" select the read, write, all and 0 states. Any other text leaves the field
 * as it was in the freshly allocated payload.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if the payload cannot be allocated
 */
int32_t tscBuildAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCMCreateAcctMsg);
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMCreateAcctMsg *pAlterMsg = (SCMCreateAcctMsg *)pCmd->payload;

  SSQLToken *pName = &pInfo->pDCLInfo->user.user;
  SSQLToken *pPwd = &pInfo->pDCLInfo->user.passwd;

  // tokens are (pointer, length) pairs: bound every copy by the destination size as well
  size_t nameLen = pName->n;
  if (nameLen >= sizeof(pAlterMsg->user)) {
    nameLen = sizeof(pAlterMsg->user) - 1;
  }
  strncpy(pAlterMsg->user, pName->z, nameLen);
  pAlterMsg->user[nameLen] = '\0';

  // NOTE(review): the password field is only protected against overflow; whether a
  // password may legitimately fill the whole field is not visible here, so no
  // terminator is forced
  size_t pwdLen = pPwd->n;
  if (pwdLen > sizeof(pAlterMsg->pass)) {
    pwdLen = sizeof(pAlterMsg->pass);
  }
  strncpy(pAlterMsg->pass, pPwd->z, pwdLen);

  SCreateAcctSQL *pAcctOpt = &pInfo->pDCLInfo->acctOpt;

  pAlterMsg->cfg.maxUsers = htonl(pAcctOpt->maxUsers);
  pAlterMsg->cfg.maxDbs = htonl(pAcctOpt->maxDbs);
  pAlterMsg->cfg.maxTimeSeries = htonl(pAcctOpt->maxTimeSeries);
  pAlterMsg->cfg.maxStreams = htonl(pAcctOpt->maxStreams);
  pAlterMsg->cfg.maxPointsPerSecond = htonl(pAcctOpt->maxPointsPerSecond);
  pAlterMsg->cfg.maxStorage = htobe64(pAcctOpt->maxStorage);
  pAlterMsg->cfg.maxQueryTime = htobe64(pAcctOpt->maxQueryTime);
  pAlterMsg->cfg.maxConnections = htonl(pAcctOpt->maxConnections);

  // check the token length first so that only bytes belonging to the token are compared
  if (pAcctOpt->stat.n == 0) {
    pAlterMsg->cfg.accessState = -1;
  } else if (pAcctOpt->stat.n == 1 && pAcctOpt->stat.z[0] == 'r') {
    pAlterMsg->cfg.accessState = TSDB_VN_READ_ACCCESS;
  } else if (pAcctOpt->stat.n == 1 && pAcctOpt->stat.z[0] == 'w') {
    pAlterMsg->cfg.accessState = TSDB_VN_WRITE_ACCCESS;
  } else if (pAcctOpt->stat.n == 3 && strncmp(pAcctOpt->stat.z, "all", 3) == 0) {
    pAlterMsg->cfg.accessState = TSDB_VN_ALL_ACCCESS;
  } else if (pAcctOpt->stat.n == 2 && strncmp(pAcctOpt->stat.z, "no", 2) == 0) {
    pAlterMsg->cfg.accessState = 0;
  }

  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_ACCT;
  return TSDB_CODE_SUCCESS;
}

934 935
/*
 * Build the CREATE USER / ALTER USER request.
 *
 * The user type selects both the content and the message type:
 *  - TSDB_ALTER_USER_PRIVILEGES: the privilege is taken from pCmd->count, message is ALTER_USER
 *  - TSDB_ALTER_USER_PASSWD:     the password is copied, message is ALTER_USER
 *  - anything else:              the password is copied, message is CREATE_USER
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if the payload cannot be allocated
 */
int32_t tscBuildUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCMCreateUserMsg);

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMCreateUserMsg *pAlterMsg = (SCMCreateUserMsg*)pCmd->payload;

  SUserInfo *pUser = &pInfo->pDCLInfo->user;

  // tokens are (pointer, length) pairs: bound every copy by the destination size as well
  size_t nameLen = pUser->user.n;
  if (nameLen >= sizeof(pAlterMsg->user)) {
    nameLen = sizeof(pAlterMsg->user) - 1;
  }
  strncpy(pAlterMsg->user, pUser->user.z, nameLen);
  pAlterMsg->user[nameLen] = '\0';

  pAlterMsg->flag = pUser->type;

  if (pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
    pAlterMsg->privilege = (char)pCmd->count;
  } else {
    // both "alter user ... pass" and "create user" carry the password
    // NOTE(review): only overflow protection is applied; whether a password may fill
    // the whole field is not visible here, so no terminator is forced
    size_t pwdLen = pUser->passwd.n;
    if (pwdLen > sizeof(pAlterMsg->pass)) {
      pwdLen = sizeof(pAlterMsg->pass);
    }
    strncpy(pAlterMsg->pass, pUser->passwd.z, pwdLen);
  }

  if (pUser->type == TSDB_ALTER_USER_PASSWD || pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
    pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_USER;
  } else {
    pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_USER;
  }

  return TSDB_CODE_SUCCESS;
}

966 967
/*
 * Finalize the CONFIG DNODE request: only the message type and the payload
 * length are set here, the payload itself is not touched.
 */
int32_t tscBuildCfgDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCommand = &pSql->cmd;

  pCommand->msgType = TSDB_MSG_TYPE_CM_CONFIG_DNODE;
  pCommand->payloadLen = sizeof(SCMCfgDnodeMsg);

  return TSDB_CODE_SUCCESS;
}
S
slguan 已提交
972

973 974
/*
 * Build the DROP DATABASE request (TSDB_MSG_TYPE_CM_DROP_DB).
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if the payload cannot be allocated
 */
int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SCMDropDbMsg);
  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  // the database name is kept in the name field of the first table meta info
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  SCMDropDbMsg   *pDropMsg = (SCMDropDbMsg *)pCmd->payload;

  tstrncpy(pDropMsg->db, pMetaInfo->name, sizeof(pDropMsg->db));

  // 1 when the statement carried the exists check, 0 otherwise
  if (pInfo->pDCLInfo->existsCheck) {
    pDropMsg->ignoreNotExists = 1;
  } else {
    pDropMsg->ignoreNotExists = 0;
  }

  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DB;
  return TSDB_CODE_SUCCESS;
}

992 993
/*
 * Build the DROP TABLE request (TSDB_MSG_TYPE_CM_DROP_TABLE).
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if the payload cannot be allocated
 */
int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCMDropTableMsg);

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMDropTableMsg *pDropTableMsg = (SCMDropTableMsg*)pCmd->payload;
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);

  // bounded copy, like the other drop/alter message builders in this file
  tstrncpy(pDropTableMsg->tableId, pTableMetaInfo->name, sizeof(pDropTableMsg->tableId));
  pDropTableMsg->igNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;

  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_TABLE;
  return TSDB_CODE_SUCCESS;
}

1010
/*
 * Build the DROP DNODE request (TSDB_MSG_TYPE_CM_DROP_DNODE).
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if the payload cannot be allocated
 */
int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SCMDropDnodeMsg);
  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  // the end point is read from the name field of the first table meta info
  STableMetaInfo  *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  SCMDropDnodeMsg *pDropMsg = (SCMDropDnodeMsg *)pCmd->payload;

  tstrncpy(pDropMsg->ep, pMetaInfo->name, sizeof(pDropMsg->ep));
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DNODE;

  return TSDB_CODE_SUCCESS;
}

S
[TD-16]  
slguan 已提交
1026
/*
 * Build the DROP USER request (TSDB_MSG_TYPE_CM_DROP_USER).
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if the payload cannot be allocated
 */
int32_t tscBuildDropUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SCMDropUserMsg);
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_USER;

  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  // the user name is read from the name field of the first table meta info
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  SCMDropUserMsg *pUserMsg = (SCMDropUserMsg *)pCmd->payload;
  tstrncpy(pUserMsg->user, pMetaInfo->name, sizeof(pUserMsg->user));

  return TSDB_CODE_SUCCESS;
}

S
[TD-16]  
slguan 已提交
1043 1044 1045 1046 1047 1048 1049
/*
 * Build the DROP ACCOUNT request (TSDB_MSG_TYPE_CM_DROP_ACCT). The message uses
 * the same layout as the drop user request (SCMDropUserMsg).
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if the payload cannot be allocated
 */
int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SCMDropUserMsg);
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_ACCT;

  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  // the account name is read from the name field of the first table meta info
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  SCMDropUserMsg *pAcctMsg = (SCMDropUserMsg *)pCmd->payload;
  tstrncpy(pAcctMsg->user, pMetaInfo->name, sizeof(pAcctMsg->user));

  return TSDB_CODE_SUCCESS;
}

1060 1061
/*
 * Build the USE DATABASE request (TSDB_MSG_TYPE_CM_USE_DB).
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if the payload cannot be allocated
 */
int32_t tscBuildUseDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCMUseDbMsg);

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMUseDbMsg *pUseDbMsg = (SCMUseDbMsg*)pCmd->payload;
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);

  // bounded copy, like the other database message builders in this file
  tstrncpy(pUseDbMsg->db, pTableMetaInfo->name, sizeof(pUseDbMsg->db));
  pCmd->msgType = TSDB_MSG_TYPE_CM_USE_DB;

  return TSDB_CODE_SUCCESS;
}

1077
/*
 * Build the SHOW request (TSDB_MSG_TYPE_CM_SHOW).
 *
 * The database is the name stored in the table meta info when it is not empty,
 * otherwise the current database of the connection. The variable part of the
 * message carries either the wildcard pattern (all show types except
 * TSDB_MGMT_TABLE_VNODES, and only if a pattern was given) or the prefix token
 * (TSDB_MGMT_TABLE_VNODES).
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if the payload cannot be allocated
 */
int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  STscObj *pObj = pSql->pTscObj;
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->msgType = TSDB_MSG_TYPE_CM_SHOW;

  // pick the token that goes into the variable part of the message, if any
  SShowInfo *pShowInfo = &pInfo->pDCLInfo->showOpt;
  SSQLToken *pToken = NULL;

  if (pShowInfo->showType != TSDB_MGMT_TABLE_VNODES) {
    if (pShowInfo->pattern.type > 0) {  // only show tables support wildcard query
      pToken = &pShowInfo->pattern;
    }
  } else {
    pToken = &pShowInfo->prefix;
    assert(pToken->n > 0 && pToken->type > 0);
  }

  // keep the historical 100 bytes of extra space, but grow it when the token needs more
  size_t extraLen = 100;
  if (pToken != NULL && (size_t)pToken->n + 1 > extraLen) {
    extraLen = (size_t)pToken->n + 1;
  }

  pCmd->payloadLen = sizeof(SCMShowMsg) + extraLen;

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMShowMsg *pShowMsg = (SCMShowMsg*)pCmd->payload;

  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  size_t nameLen = strlen(pTableMetaInfo->name);
  if (nameLen > 0) {
    tstrncpy(pShowMsg->db, pTableMetaInfo->name, sizeof(pShowMsg->db));  // prefix is set here
  } else {
    tstrncpy(pShowMsg->db, pObj->db, sizeof(pShowMsg->db));
  }

  pShowMsg->type = pShowInfo->showType;

  // length of the variable part in host byte order; the message field holds the network order value
  uint16_t tokenLen = 0;
  if (pToken != NULL) {
    strncpy(pShowMsg->payload, pToken->z, pToken->n);
    tokenLen = (uint16_t)pToken->n;
    pShowMsg->payloadLen = htons(tokenLen);
  }

  // use the host order length here: adding the htons() value would produce a
  // byte-swapped (far too large) message length on little-endian hosts
  pCmd->payloadLen = sizeof(SCMShowMsg) + tokenLen;
  return TSDB_CODE_SUCCESS;
}

1119
/*
 * Finalize a KILL QUERY / KILL CONNECTION / KILL STREAM request: set the payload
 * length and map the sql command to its message type. For any other command the
 * message type is left unchanged.
 */
int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCMKillQueryMsg);

  if (pCmd->command == TSDB_SQL_KILL_QUERY) {
    pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_QUERY;
  } else if (pCmd->command == TSDB_SQL_KILL_CONNECTION) {
    pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_CONN;
  } else if (pCmd->command == TSDB_SQL_KILL_STREAM) {
    pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_STREAM;
  }

  return TSDB_CODE_SUCCESS;
}

1137
/*
 * Estimate the buffer size needed for a create table message.
 *
 * The estimate is the minimum message size plus the fixed message header, plus
 * either the tag data (table created from a super table) or one schema entry per
 * column and tag, plus the select text when there is one, plus
 * TSDB_EXTRA_PAYLOAD_SIZE.
 */
int tscEstimateCreateTableMsgLength(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd         *pCmd = &(pSql->cmd);
  SCreateTableSQL *pCreateTableInfo = pInfo->pCreateTableInfo;

  int32_t size = minMsgSize() + sizeof(SCMCreateTableMsg);

  if (pCreateTableInfo->type != TSQL_CREATE_TABLE_FROM_STABLE) {
    size += sizeof(SSchema) * (pCmd->numOfCols + pCmd->count);
  } else {
    size += sizeof(STagData);
  }

  if (pCreateTableInfo->pSelect != NULL) {
    // text of the select statement plus one extra byte
    size += (pCreateTableInfo->pSelect->selectToken.n + 1);
  }

  return size + TSDB_EXTRA_PAYLOAD_SIZE;
}

1156
/*
 * Build the CREATE TABLE request (TSDB_MSG_TYPE_CM_CREATE_TABLE).
 *
 * After the fixed header the message carries one of:
 *  - table created from a super table: tag data length, tag name block, tag data
 *  - normal/super table: one SSchema per column and tag, followed by the select
 *    statement text when the statement creates a stream
 *
 * The field list of the query info is cleared once it has been serialized.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if the payload cannot be allocated
 */
int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  int              msgLen = 0;
  SSchema *        pSchema;
  int              size = 0;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  // Reallocate the payload size
  size = tscEstimateCreateTableMsgLength(pSql, pInfo);
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for create table msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMCreateTableMsg *pCreateTableMsg = (SCMCreateTableMsg *)pCmd->payload;
  // bounded copy, like the other message builders in this file
  tstrncpy(pCreateTableMsg->tableId, pTableMetaInfo->name, sizeof(pCreateTableMsg->tableId));

  // use dbinfo from table id without modifying current db info
  tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pCreateTableMsg->db);

  SCreateTableSQL *pCreateTable = pInfo->pCreateTableInfo;

  pCreateTableMsg->igExists = pCreateTable->existCheck ? 1 : 0;
  pCreateTableMsg->numOfColumns = htons(pCmd->numOfCols);
  pCreateTableMsg->numOfTags = htons(pCmd->count);

  pCreateTableMsg->sqlLen = 0;
  char *pMsg = (char *)pCreateTableMsg->schema;

  int8_t type = pInfo->pCreateTableInfo->type;
  if (type == TSQL_CREATE_TABLE_FROM_STABLE) {  // create by using super table, tags value
    STagData* pTag = &pInfo->pCreateTableInfo->usingInfo.tagdata;
    *(int32_t*)pMsg = htonl(pTag->dataLen);
    pMsg += sizeof(int32_t);
    memcpy(pMsg, pTag->name, sizeof(pTag->name));
    pMsg += sizeof(pTag->name);
    memcpy(pMsg, pTag->data, pTag->dataLen);
    pMsg += pTag->dataLen;
  } else {  // create (super) table
    pSchema = (SSchema *)pCreateTableMsg->schema;

    for (int i = 0; i < pCmd->numOfCols + pCmd->count; ++i) {
      TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);

      pSchema->type = pField->type;
      tstrncpy(pSchema->name, pField->name, sizeof(pSchema->name));
      pSchema->bytes = htons(pField->bytes);

      pSchema++;
    }

    pMsg = (char *)pSchema;
    if (type == TSQL_CREATE_STREAM) {  // check if it is a stream sql
      SQuerySQL *pQuerySql = pInfo->pCreateTableInfo->pSelect;

      // the token is a (pointer, length) pair: copy exactly its text and terminate it
      // explicitly instead of relying on whatever byte follows the token
      strncpy(pMsg, pQuerySql->selectToken.z, pQuerySql->selectToken.n);
      pMsg[pQuerySql->selectToken.n] = '\0';

      pCreateTableMsg->sqlLen = htons(pQuerySql->selectToken.n + 1);
      pMsg += pQuerySql->selectToken.n + 1;
    }
  }

  tscFieldInfoClear(&pQueryInfo->fieldsInfo);

  msgLen = pMsg - (char*)pCreateTableMsg;
  pCreateTableMsg->contLen = htonl(msgLen);
  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE;

  assert(msgLen + minMsgSize() <= size);
  return TSDB_CODE_SUCCESS;
}

/*
 * Estimate the buffer size needed for an alter table message: the minimum
 * message size, the fixed message header, one schema entry per field of the
 * query info and TSDB_EXTRA_PAYLOAD_SIZE.
 */
int tscEstimateAlterTableMsgLength(SSqlCmd *pCmd) {
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

  size_t schemaBytes = sizeof(SSchema) * tscNumOfFields(pQueryInfo);

  return minMsgSize() + sizeof(SCMAlterTableMsg) + schemaBytes + TSDB_EXTRA_PAYLOAD_SIZE;
}

1237
/*
 * Build the ALTER TABLE request (TSDB_MSG_TYPE_CM_ALTER_TABLE).
 *
 * After the fixed header the message carries one SSchema per field of the query
 * info, followed by the raw tag data of the alter info.
 *
 * @return TSDB_CODE_SUCCESS, or -1 if the payload cannot be allocated
 */
int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  char *pMsg;
  int   msgLen = 0;

  SSqlCmd    *pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  SAlterTableSQL *pAlterInfo = pInfo->pAlterInfo;

  // the estimate only covers the header and the schema entries; the tag data that is
  // appended below has to be added so that the copy cannot run past the payload
  int size = tscEstimateAlterTableMsgLength(pCmd) + pAlterInfo->tagData.dataLen;
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for alter table msg", pSql);
    return -1;
  }

  SCMAlterTableMsg *pAlterTableMsg = (SCMAlterTableMsg *)pCmd->payload;
  tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pAlterTableMsg->db);

  // bounded copy, like the other message builders in this file
  tstrncpy(pAlterTableMsg->tableId, pTableMetaInfo->name, sizeof(pAlterTableMsg->tableId));
  pAlterTableMsg->type = htons(pAlterInfo->type);

  pAlterTableMsg->numOfCols = htons(tscNumOfFields(pQueryInfo));
  SSchema *pSchema = pAlterTableMsg->schema;
  for (int i = 0; i < tscNumOfFields(pQueryInfo); ++i) {
    TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);

    pSchema->type = pField->type;
    tstrncpy(pSchema->name, pField->name, sizeof(pSchema->name));
    pSchema->bytes = htons(pField->bytes);
    pSchema++;
  }

  // the tag data follows the last schema entry
  pMsg = (char *)pSchema;
  pAlterTableMsg->tagValLen = htonl(pAlterInfo->tagData.dataLen);
  memcpy(pMsg, pAlterInfo->tagData.data, pAlterInfo->tagData.dataLen);
  pMsg += pAlterInfo->tagData.dataLen;

  msgLen = pMsg - (char*)pAlterTableMsg;

  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_TABLE;

  assert(msgLen + minMsgSize() <= size);

  return TSDB_CODE_SUCCESS;
}

1285 1286 1287 1288 1289 1290
/*
 * Finalize the update tag value request (TSDB_MSG_TYPE_UPDATE_TAG_VAL).
 *
 * The payload length is read back from the contLen field of the
 * SUpdateTableTagValMsg located tsRpcHeadSize bytes into the payload (presumably
 * filled in before this call), and the request is directed to the vgroup of the table.
 */
int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
  SSqlCmd* pCmd = &pSql->cmd;

  SUpdateTableTagValMsg* pTagValMsg = (SUpdateTableTagValMsg*) (pCmd->payload + tsRpcHeadSize);

  pCmd->msgType = TSDB_MSG_TYPE_UPDATE_TAG_VAL;
  pCmd->payloadLen = htonl(pTagValMsg->head.contLen);  // byte swap of the stored length

  STableMetaInfo *pMetaInfo = tscGetMetaInfo(tscGetQueryInfoDetail(pCmd, 0), 0);
  tscSetDnodeIpList(pSql, &pMetaInfo->pTableMeta->vgroupInfo);

  return TSDB_CODE_SUCCESS;
}

1298
/*
 * Build an alter-database request: the payload is treated as an SCMAlterDbMsg
 * and its db field is filled with the name stored in the first table meta
 * info of the current clause.
 */
int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd        *pCmd = &pSql->cmd;
  SCMAlterDbMsg  *pMsg = (SCMAlterDbMsg*)pCmd->payload;
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);

  pCmd->msgType    = TSDB_MSG_TYPE_CM_ALTER_DB;
  pCmd->payloadLen = sizeof(SCMAlterDbMsg);

  tstrncpy(pMsg->db, pMetaInfo->name, sizeof(pMsg->db));

  return TSDB_CODE_SUCCESS;
}

1310
/*
 * Build a retrieve request (TSDB_MSG_TYPE_CM_RETRIEVE).
 *
 * Makes sure the payload can hold an SRetrieveTableMsg, then stores the query
 * handle of the previous response (64-bit byte-swapped) and the query type
 * (16-bit byte-swapped, in the "free" field).
 *
 * Returns TSDB_CODE_TSC_OUT_OF_MEMORY when the payload cannot be allocated,
 * TSDB_CODE_SUCCESS otherwise.
 */
int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->msgType    = TSDB_MSG_TYPE_CM_RETRIEVE;
  pCmd->payloadLen = sizeof(SRetrieveTableMsg);

  int32_t allocCode = tscAllocPayload(pCmd, pCmd->payloadLen);
  if (allocCode != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SQueryInfo        *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  SRetrieveTableMsg *pRetrieve = (SRetrieveTableMsg*)pCmd->payload;

  pRetrieve->qhandle = htobe64(pSql->res.qhandle);
  pRetrieve->free    = htons(pQueryInfo->type);

  return TSDB_CODE_SUCCESS;
}

1328
/*
 * Point every output column of pRes->tsrow at its area inside pRes->data.
 * Column i starts at (offset of field i) * (number of rows) bytes into the
 * data buffer.
 *
 * Returns pRes->code when the pointer arrays cannot be created, 0 otherwise.
 */
static int tscSetResultPointer(SQueryInfo *pQueryInfo, SSqlRes *pRes) {
  if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
    return pRes->code;
  }

  int col = 0;
  while (col < pQueryInfo->fieldsInfo.numOfOutput) {
    int16_t colOffset = tscFieldInfoGetOffset(pQueryInfo, col);
    pRes->tsrow[col] = ((char*) pRes->data + colOffset * pRes->numOfRows);
    ++col;
  }

  return 0;
}

/*
 * Common tail of the response handlers whose result set is produced locally.
 *
 * pRes->rspType records whether the result has been handed out already:
 *   - 0: first call; publish numOfRes rows, reset the row cursor, set up the
 *        per-column result pointers and mark the result as delivered (1).
 *   - otherwise: there is no more result; reset pRes for the next retrieve.
 *
 * When pSql->fp is set, the callback is invoked directly on success and
 * tscQueueAsyncRes() is used on failure.
 *
 * Returns the result code that was current before the callback was invoked.
 */
static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  pRes->code = TSDB_CODE_SUCCESS;

  if (pRes->rspType == 0) {
    pRes->numOfRows = numOfRes;
    pRes->row = 0;
    pRes->rspType = 1;

    tscSetResultPointer(pQueryInfo, pRes);
  } else {
    tscResetForNextRetrieve(pRes);
  }

  // Keep the full width of the code: narrowing it to 8 bits would make any
  // error whose low byte is zero compare equal to TSDB_CODE_SUCCESS.
  int32_t code = pSql->res.code;
  if (pSql->fp) {
    if (code == TSDB_CODE_SUCCESS) {
      (*pSql->fp)(pSql->param, pSql, pSql->res.numOfRows);
    } else {
      tscQueueAsyncRes(pSql);
    }
  }

  return code;
}

/*
 * Response handler for describe-table: the local result has one row per
 * column plus one row per tag of the table in the current clause.
 */
int tscProcessDescribeTableRsp(SSqlObj *pSql) {
  SSqlCmd        *pCmd = &pSql->cmd;
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  STableComInfo   info = tscGetTableInfo(pMetaInfo->pTableMeta);

  int32_t rows = info.numOfColumns + info.numOfTags;
  return tscLocalResultCommonBuilder(pSql, rows);
}

H
Haojun Liao 已提交
1387 1388 1389
/*
 * Response handler for a local retrieve: marks the result as completed and
 * publishes exactly one row.
 */
int tscProcessLocalRetrieveRsp(SSqlObj *pSql) {
  pSql->res.completed = true;
  return tscLocalResultCommonBuilder(pSql, 1);
}

H
hjxilinx 已提交
1393
/*
 * Response handler for a locally merged retrieve.
 *
 * Runs the local merge, sets up the per-column result pointers when rows were
 * produced, resets the row cursor and flags the result as completed once the
 * merge yields no rows. The callback in pSql->fp is invoked on success;
 * tscQueueAsyncRes() is used on failure.
 *
 * Returns the result code read before the callback is invoked.
 */
int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  pRes->code = tscDoLocalMerge(pSql);
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  bool hasRows = (pRes->code == TSDB_CODE_SUCCESS) && (pRes->numOfRows > 0);
  if (hasRows) {
    tscSetResultPointer(pQueryInfo, pRes);
  }

  pRes->row = 0;
  pRes->completed = (pRes->numOfRows == 0);

  int32_t code = pRes->code;
  if (code != TSDB_CODE_SUCCESS) {
    tscQueueAsyncRes(pSql);
  } else {
    (*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
  }

  return code;
}

S
slguan 已提交
1417
/* Response handler for commands that produce no rows: publishes an empty local result. */
int tscProcessEmptyResultRsp(SSqlObj *pSql) {
  const int32_t numOfRes = 0;
  return tscLocalResultCommonBuilder(pSql, numOfRes);
}
H
hzcheng 已提交
1418

1419
/*
 * Build a connect request (TSDB_MSG_TYPE_CM_CONNECT).
 *
 * The db field receives the part of pObj->db that follows the first
 * TS_PATH_DELIMITER, or the whole string when no delimiter is present. The
 * client version is copied from the global `version`; the message version is
 * left empty.
 *
 * Returns TSDB_CODE_TSC_OUT_OF_MEMORY when the payload cannot be allocated,
 * TSDB_CODE_SUCCESS otherwise.
 */
int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;

  pCmd->msgType    = TSDB_MSG_TYPE_CM_CONNECT;
  pCmd->payloadLen = sizeof(SCMConnectMsg);

  int32_t allocCode = tscAllocPayload(pCmd, pCmd->payloadLen);
  if (allocCode != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMConnectMsg *pConnect = (SCMConnectMsg*)pCmd->payload;

  // skip everything up to and including the first path delimiter, if any
  char *dbName = pObj->db;
  char *delim = strstr(pObj->db, TS_PATH_DELIMITER);
  if (delim != NULL) {
    dbName = delim + 1;
  }

  tstrncpy(pConnect->db, dbName, sizeof(pConnect->db));
  tstrncpy(pConnect->clientVersion, version, sizeof(pConnect->clientVersion));
  tstrncpy(pConnect->msgVersion, "", sizeof(pConnect->msgVersion));

  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
1442
/*
 * Build a table-meta request (TSDB_MSG_TYPE_CM_TABLE_META).
 *
 * The request starts with an SCMTableInfoMsg holding the table id and the
 * create flag. When the command auto-creates the table and the payload held
 * data on entry, that data is appended as the tags section of the message.
 *
 * Returns TSDB_CODE_TSC_OUT_OF_MEMORY when the temporary tag copy cannot be
 * allocated, TSDB_CODE_SUCCESS otherwise.
 */
int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  /*
   * The message header is written over the start of the payload, which is
   * also where the incoming tag data lives, so the tag data is snapshotted
   * first. The copy is only taken when it will actually be used.
   */
  char    *tmpData = NULL;
  uint32_t len = pCmd->payloadLen;
  if (pCmd->autoCreated && len > 0) {
    tmpData = calloc(1, len);
    if (NULL == tmpData) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }

    // STagData is in binary format, strncpy is not available
    memcpy(tmpData, pCmd->payload, len);
  }

  SQueryInfo     *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  SCMTableInfoMsg *pInfoMsg = (SCMTableInfoMsg *)pCmd->payload;
  strcpy(pInfoMsg->tableId, pTableMetaInfo->name);
  pInfoMsg->createFlag = htons(pCmd->autoCreated ? 1 : 0);

  char *pMsg = (char*)pInfoMsg + sizeof(SCMTableInfoMsg);

  if (tmpData != NULL) {
    memcpy(pInfoMsg->tags, tmpData, len);
    pMsg += len;
  }

  pCmd->payloadLen = pMsg - (char*)pInfoMsg;
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLE_META;

  tfree(tmpData);

  // check the size of the message that was actually built
  assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize);
  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
1484
/**
1485
 *  multi table meta req pkg format:
1486
 *  | SMgmtHead | SCMMultiTableInfoMsg | tableId0 | tableId1 | tableId2 | ......
S
slguan 已提交
1487 1488
 *      no used         4B
 **/
1489
int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
guanshengliang's avatar
guanshengliang 已提交
1490
#if 0
S
slguan 已提交
1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502
  SSqlCmd *pCmd = &pSql->cmd;

  // copy payload content to temp buff
  char *tmpData = 0;
  if (pCmd->payloadLen > 0) {
    tmpData = calloc(1, pCmd->payloadLen + 1);
    if (NULL == tmpData) return -1;
    memcpy(tmpData, pCmd->payload, pCmd->payloadLen);
  }

  // fill head info
  SMgmtHead *pMgmt = (SMgmtHead *)(pCmd->payload + tsRpcHeadSize);
S
slguan 已提交
1503
  memset(pMgmt->db, 0, TSDB_TABLE_ID_LEN);  // server don't need the db
S
slguan 已提交
1504

1505
  SCMMultiTableInfoMsg *pInfoMsg = (SCMMultiTableInfoMsg *)(pCmd->payload + tsRpcHeadSize + sizeof(SMgmtHead));
S
slguan 已提交
1506
  pInfoMsg->numOfTables = htonl((int32_t)pCmd->count);
S
slguan 已提交
1507 1508

  if (pCmd->payloadLen > 0) {
S
slguan 已提交
1509
    memcpy(pInfoMsg->tableIds, tmpData, pCmd->payloadLen);
S
slguan 已提交
1510 1511 1512 1513
  }

  tfree(tmpData);

1514
  pCmd->payloadLen += sizeof(SMgmtHead) + sizeof(SCMMultiTableInfoMsg);
S
slguan 已提交
1515
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLES_META;
S
slguan 已提交
1516 1517 1518

  assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize);

S
slguan 已提交
1519
  tscTrace("%p build load multi-metermeta msg completed, numOfTables:%d, msg size:%d", pSql, pCmd->count,
S
slguan 已提交
1520 1521 1522
           pCmd->payloadLen);

  return pCmd->payloadLen;
guanshengliang's avatar
guanshengliang 已提交
1523 1524
#endif
  return 0;  
S
slguan 已提交
1525 1526
}

H
hjxilinx 已提交
1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552
//static UNUSED_FUNC int32_t tscEstimateMetricMetaMsgSize(SSqlCmd *pCmd) {
////  const int32_t defaultSize =
////      minMsgSize() + sizeof(SSuperTableMetaMsg) + sizeof(SMgmtHead) + sizeof(int16_t) * TSDB_MAX_TAGS;
////  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
////
////  int32_t n = 0;
////  size_t size = taosArrayGetSize(pQueryInfo->tagCond.pCond);
////  for (int32_t i = 0; i < size; ++i) {
////    assert(0);
//////    n += strlen(pQueryInfo->tagCond.cond[i].cond);
////  }
////
////  int32_t tagLen = n * TSDB_NCHAR_SIZE;
////  if (pQueryInfo->tagCond.tbnameCond.cond != NULL) {
////    tagLen += strlen(pQueryInfo->tagCond.tbnameCond.cond) * TSDB_NCHAR_SIZE;
////  }
////
////  int32_t joinCondLen = (TSDB_TABLE_ID_LEN + sizeof(int16_t)) * 2;
////  int32_t elemSize = sizeof(SSuperTableMetaElemMsg) * pQueryInfo->numOfTables;
////
////  int32_t colSize = pQueryInfo->groupbyExpr.numOfGroupCols*sizeof(SColIndex);
////
////  int32_t len = tagLen + joinCondLen + elemSize + colSize + defaultSize;
////
////  return MAX(len, TSDB_DEFAULT_PAYLOAD_SIZE);
//}
H
hzcheng 已提交
1553

H
hjxilinx 已提交
1554 1555 1556
/*
 * Build a super-table vgroup request (TSDB_MSG_TYPE_CM_STABLE_VGROUP).
 *
 * Message layout: an SCMSTableVgroupMsg carrying the number of tables
 * (32-bit byte-swapped), followed by one fixed-width name slot per table of
 * the current clause. Each slot is sizeof(pTableMetaInfo->name) bytes wide.
 *
 * The message is written directly into pCmd->payload; this function does not
 * resize the payload itself.
 *
 * An older implementation that was compiled out with #if 0 has been removed.
 */
int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd    *pCmd = &pSql->cmd;
  char       *pMsg = pCmd->payload;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

  SCMSTableVgroupMsg *pStableVgroupMsg = (SCMSTableVgroupMsg *) pMsg;
  pStableVgroupMsg->numOfTables = htonl(pQueryInfo->numOfTables);
  pMsg += sizeof(SCMSTableVgroupMsg);

  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
    STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i);

    // every name occupies a full slot, regardless of its actual length
    size_t slot = sizeof(pTableMetaInfo->name);
    tstrncpy(pMsg, pTableMetaInfo->name, slot);
    pMsg += slot;
  }

  pCmd->msgType = TSDB_MSG_TYPE_CM_STABLE_VGROUP;
  pCmd->payloadLen = (pMsg - pCmd->payload);

  return TSDB_CODE_SUCCESS;
}

1721 1722
/*
 * Build a heartbeat request (TSDB_MSG_TYPE_CM_HEARTBEAT).
 *
 * While holding pObj->mutex, the query list and stream list of the connection
 * are counted, the payload is sized for them and tscBuildQueryStreamDesc()
 * fills in the descriptors. Both counts start at 2, so the buffer is sized
 * for two more entries of each kind than are currently listed
 * (NOTE(review): presumably headroom - confirm).
 *
 * Returns TSDB_CODE_TSC_OUT_OF_MEMORY when the payload cannot be allocated
 * (the same code the other message builders use), TSDB_CODE_SUCCESS otherwise.
 */
int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;

  pthread_mutex_lock(&pObj->mutex);

  int32_t numOfQueries = 2;
  for (SSqlObj *tpSql = pObj->sqlList; tpSql != NULL; tpSql = tpSql->next) {
    numOfQueries++;
  }

  int32_t numOfStreams = 2;
  for (SSqlStream *pStream = pObj->streamList; pStream != NULL; pStream = pStream->next) {
    numOfStreams++;
  }

  int size = numOfQueries * sizeof(SQueryDesc) + numOfStreams * sizeof(SStreamDesc) + sizeof(SCMHeartBeatMsg) + 100;
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    // release the lock on the failure path as well
    pthread_mutex_unlock(&pObj->mutex);
    tscError("%p failed to malloc for heartbeat msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMHeartBeatMsg *pHeartbeat = (SCMHeartBeatMsg *)pCmd->payload;
  pHeartbeat->numOfQueries = numOfQueries;
  pHeartbeat->numOfStreams = numOfStreams;
  int msgLen = tscBuildQueryStreamDesc(pHeartbeat, pObj);

  pthread_mutex_unlock(&pObj->mutex);

  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_CM_HEARTBEAT;

  return TSDB_CODE_SUCCESS;
}

1761 1762
/*
 * Process a table-meta response.
 *
 * The response in pSql->res.pRsp is converted to host byte order in place and
 * validated, a STableMeta is created from it and stored in the client cache
 * under the table name. The cached copy is attached to the first table meta
 * info of the command.
 *
 * Returns:
 *   TSDB_CODE_TSC_INVALID_VALUE  - sid/numOfIps/numOfTags/numOfColumns out of range
 *   TSDB_CODE_TSC_OUT_OF_MEMORY  - the table meta could not be created or cached
 *   TSDB_CODE_SUCCESS            - otherwise
 */
int tscProcessTableMetaRsp(SSqlObj *pSql) {
  STableMetaMsg *pMetaMsg = (STableMetaMsg *)pSql->res.pRsp;

  // fixed header fields: network -> host byte order
  pMetaMsg->sid = htonl(pMetaMsg->sid);
  pMetaMsg->sversion = htons(pMetaMsg->sversion);
  pMetaMsg->tversion = htons(pMetaMsg->tversion);
  pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId);

  pMetaMsg->uid = htobe64(pMetaMsg->uid);
  pMetaMsg->contLen = htons(pMetaMsg->contLen);
  pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns);

  if (pMetaMsg->sid < 0 || pMetaMsg->vgroup.numOfIps < 0) {
    // the first value printed is the number of ips, so label it as such
    tscError("invalid table meta, numOfIps:%d, sid:%d", pMetaMsg->vgroup.numOfIps, pMetaMsg->sid);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  if (pMetaMsg->numOfTags > TSDB_MAX_TAGS || pMetaMsg->numOfTags < 0) {
    tscError("invalid numOfTags:%d", pMetaMsg->numOfTags);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  if (pMetaMsg->numOfColumns > TSDB_MAX_COLUMNS || pMetaMsg->numOfColumns <= 0) {
    tscError("invalid numOfColumns:%d", pMetaMsg->numOfColumns);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  for (int i = 0; i < pMetaMsg->vgroup.numOfIps; ++i) {
    pMetaMsg->vgroup.ipAddr[i].port = htons(pMetaMsg->vgroup.ipAddr[i].port);
  }

  // schema entries of the columns, followed by those of the tags
  SSchema* pSchema = pMetaMsg->schema;

  int32_t numOfTotalCols = pMetaMsg->numOfColumns + pMetaMsg->numOfTags;
  for (int i = 0; i < numOfTotalCols; ++i) {
    pSchema->bytes = htons(pSchema->bytes);
    pSchema->colId = htons(pSchema->colId);

    assert(pSchema->type >= TSDB_DATA_TYPE_BOOL && pSchema->type <= TSDB_DATA_TYPE_NCHAR);
    pSchema++;
  }

  size_t size = 0;
  STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
  if (pTableMeta == NULL) {
    // NOTE(review): treated as an allocation failure - confirm against
    // tscCreateTableMetaFromMsg. Without this check the NULL pointer would be
    // dereferenced by the trace call below.
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  // todo add one more function: taosAddDataIfNotExists();
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
  assert(pTableMetaInfo->pTableMeta == NULL);

  pTableMetaInfo->pTableMeta =
      (STableMeta *) taosCachePut(tscCacheHandle, pTableMetaInfo->name, pTableMeta, size, tsTableMetaKeepTimer);

  if (pTableMetaInfo->pTableMeta == NULL) {
    free(pTableMeta);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  tscTrace("%p recv table meta, uid:%"PRId64 ", tid:%d, name:%s", pSql, pTableMeta->uid, pTableMeta->sid, pTableMetaInfo->name);

  // the local copy is released here; the pointer returned by the cache stays attached
  free(pTableMeta);

  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
1825
/**
 *  multi table meta rsp pkg format:
 *  | STaosRsp | ieType | SCMMultiTableInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | SMeterMeta2 | SSchema2
 *  |...... 1B        1B            4B
 *
 *  Processing of this response is currently not implemented: the function
 *  ignores the response and returns TSDB_CODE_SUCCESS. The previous
 *  implementation was compiled out with #if 0 (and relied on variables that
 *  were never declared), so it has been removed.
 **/
int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
  (void)pSql;
  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
1934
/*
 * Process a super-table vgroup response.
 *
 * The response holds, after an SCMSTableVgroupRspMsg header, one SVgroupsInfo
 * record per table; each record is a SVgroupsInfo header followed by
 * numOfVgroups SCMVgroupInfo entries. Every record is copied into a newly
 * allocated vgroupList of the matching table meta info of the parent SQL
 * object (taken from pSql->param), and its vgroup ids and ports are converted
 * to host byte order.
 *
 * Returns pSql->res.code.
 */
int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
  SSqlRes* pRes = &pSql->res;

  // NOTE: the order of several table must be preserved.
  SCMSTableVgroupRspMsg *pStableVgroup = (SCMSTableVgroupRspMsg *)pRes->pRsp;
  pStableVgroup->numOfTables = htonl(pStableVgroup->numOfTables);
  char* pMsg = pRes->pRsp + sizeof(SCMSTableVgroupRspMsg);

  // master sqlObj locates in param
  SSqlObj* parent = pSql->param;
  assert(parent != NULL);

  SSqlCmd* pCmd = &parent->cmd;
  for(int32_t i = 0; i < pStableVgroup->numOfTables; ++i) {
    STableMetaInfo *pInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i);
    SVgroupsInfo *  pVgroupInfo = (SVgroupsInfo *)pMsg;
    pVgroupInfo->numOfVgroups = htonl(pVgroupInfo->numOfVgroups);

    // size of this table's whole record: header plus all vgroup entries
    size_t size = sizeof(SCMVgroupInfo) * pVgroupInfo->numOfVgroups + sizeof(SVgroupsInfo);
    pInfo->vgroupList = calloc(1, size);
    assert(pInfo->vgroupList != NULL);

    memcpy(pInfo->vgroupList, pVgroupInfo, size);
    for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) {
      SCMVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j];

      pVgroups->vgId = htonl(pVgroups->vgId);
      assert(pVgroups->numOfIps >= 1);

      for (int32_t k = 0; k < pVgroups->numOfIps; ++k) {
        pVgroups->ipAddr[k].port = htons(pVgroups->ipAddr[k].port);
      }
    }

    // Advance exactly once per table. Doing this inside the vgroup loop would
    // move the cursor numOfVgroups times (or not at all for an empty record),
    // so the next table's record would be read from the wrong offset.
    pMsg += size;
  }

  return pSql->res.code;
}

/*
 * Process the response of a show command.
 *
 * The response carries a query handle and a table meta describing the result
 * columns. The table meta is converted to host byte order, turned into a
 * STableMeta and stored in the client cache under a key derived from the
 * message type; one output field and one dummy expression are then registered
 * per column.
 *
 * Returns TSDB_CODE_TSC_OUT_OF_MEMORY when the table meta cannot be created
 * or cached, 0 otherwise.
 */
int tscProcessShowRsp(SSqlObj *pSql) {
  STableMetaMsg * pMetaMsg;
  SCMShowRsp *pShow;
  SSchema *    pSchema;
  char         key[20];

  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  pShow = (SCMShowRsp *)pRes->pRsp;
  pShow->qhandle = htobe64(pShow->qhandle);
  pRes->qhandle = pShow->qhandle;

  tscResetForNextRetrieve(pRes);
  pMetaMsg = &(pShow->tableMeta);

  pMetaMsg->numOfColumns = ntohs(pMetaMsg->numOfColumns);

  pSchema = pMetaMsg->schema;
  pMetaMsg->sid = ntohs(pMetaMsg->sid);
  for (int i = 0; i < pMetaMsg->numOfColumns; ++i) {
    pSchema->bytes = htons(pSchema->bytes);
    pSchema++;
  }

  // cache key: one character derived from the message type, then "showlist"
  key[0] = pCmd->msgType + 'a';
  strcpy(key + 1, "showlist");

  // drop the reference to the previously attached table meta
  taosCacheRelease(tscCacheHandle, (void *)&(pTableMetaInfo->pTableMeta), false);

  size_t size = 0;
  STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
  if (pTableMeta == NULL) {
    // NOTE(review): treated as an allocation failure - confirm against
    // tscCreateTableMetaFromMsg.
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  pTableMetaInfo->pTableMeta = taosCachePut(tscCacheHandle, key, (char *)pTableMeta, size, tsTableMetaKeepTimer);
  if (pTableMetaInfo->pTableMeta == NULL) {
    // same handling as in tscProcessTableMetaRsp: without a cached copy the
    // schema lookup below would work on a NULL table meta
    tfree(pTableMeta);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SSchema *pTableSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);

  if (pQueryInfo->colList == NULL) {
    pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
  }

  SFieldInfo* pFieldInfo = &pQueryInfo->fieldsInfo;

  SColumnIndex index = {0};
  pSchema = pMetaMsg->schema;

  // one output field plus one dummy expression per result column
  for (int16_t i = 0; i < pMetaMsg->numOfColumns; ++i, ++pSchema) {
    index.columnIndex = i;
    tscColumnListInsert(pQueryInfo->colList, &index);

    TAOS_FIELD f = tscCreateField(pSchema->type, pSchema->name, pSchema->bytes);
    SFieldSupInfo* pInfo = tscFieldInfoAppend(pFieldInfo, &f);

    pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index,
                     pTableSchema[i].type, pTableSchema[i].bytes, pTableSchema[i].bytes, false);
  }

  pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
  tscFieldInfoUpdateOffset(pQueryInfo);

  // release the local copy; the pointer returned by the cache stays attached
  tfree(pTableMeta);
  return 0;
}

/*
 * Process a connect response.
 *
 * Copies the account id from the response, prefixes the connection's database
 * name with "<acctId><TS_PATH_DELIMITER>", installs the management ip list
 * when the response carries one, records the server version, the auth flags
 * and the connection id, and (re)arms the activity timer.
 *
 * Returns 0.
 */
int tscProcessConnectRsp(SSqlObj *pSql) {
  char temp[TSDB_TABLE_ID_LEN * 2];
  STscObj *pObj = pSql->pTscObj;
  SSqlRes *pRes = &pSql->res;

  SCMConnectRsp *pConnect = (SCMConnectRsp *)pRes->pRsp;
  tstrncpy(pObj->acctId, pConnect->acctId, sizeof(pObj->acctId));  // copy acctId from response

  // bounded formatting: never write past the end of temp
  int32_t len = snprintf(temp, sizeof(temp), "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db);

  assert(len >= 0 && (size_t)len <= sizeof(pObj->db));
  tstrncpy(pObj->db, temp, sizeof(pObj->db));

  if (pConnect->ipList.numOfIps > 0) {
    tscSetMgmtIpList(&pConnect->ipList);
  }

  // The version string comes from the server, so bound the copy like the
  // other string fields of this function.
  // NOTE(review): assumes pObj->sversion is a char array, like pObj->db and
  // pObj->acctId - confirm in the STscObj definition.
  tstrncpy(pObj->sversion, pConnect->serverVersion, sizeof(pObj->sversion));
  pObj->writeAuth = pConnect->writeAuth;
  pObj->superAuth = pConnect->superAuth;
  pObj->connId = htonl(pConnect->connId);
  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);

  return 0;
}

/*
 * Response handler registered for TSDB_SQL_USE_DB.
 *
 * Copies the name stored in the first table-meta slot of the command into the
 * connection's current-database field.
 *
 * @param pSql  sql object that issued the command
 * @return      always 0
 */
int tscProcessUseDbRsp(SSqlObj *pSql) {
  STscObj        *pConn = pSql->pTscObj;
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);

  tstrncpy(pConn->db, pMetaInfo->name, sizeof(pConn->db));

  return 0;
}

/*
 * Response handler registered for TSDB_SQL_DROP_DB.
 *
 * Empties the whole client-side cache referenced by tscCacheHandle; the sql
 * object itself is not inspected.
 *
 * @return  always 0
 */
int tscProcessDropDbRsp(SSqlObj *UNUSED_PARAM(pSql)) {
  taosCacheEmpty(tscCacheHandle);

  return 0;
}

/*
 * Response handler registered for TSDB_SQL_DROP_TABLE.
 *
 * If the dropped table still has an entry in the client cache, that entry is
 * force-released, together with the reference held by the command's table
 * meta info (if any).
 *
 * Why the forced release: dropping the only table of a vnode may remove the
 * vnode itself. If a table with the same name but a different schema is then
 * created, it will live in a new vnode and the cached information would be
 * stale.
 *
 * @param pSql  sql object that issued the drop
 * @return      always 0
 */
int tscProcessDropTableRsp(SSqlObj *pSql) {
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
  STableMeta     *pCached = taosCacheAcquireByName(tscCacheHandle, pMetaInfo->name);

  if (pCached != NULL) {
    tscTrace("%p force release table meta after drop table:%s", pSql, pMetaInfo->name);

    // drop the reference we just acquired and evict the entry
    taosCacheRelease(tscCacheHandle, (void **)&pCached, true);

    // drop the reference owned by the command, if it holds one
    if (pMetaInfo->pTableMeta != NULL) {
      taosCacheRelease(tscCacheHandle, (void **)&(pMetaInfo->pTableMeta), true);
    }
  }

  // nothing to do when the table was not cached
  return 0;
}

/*
 * Response handler registered for TSDB_SQL_ALTER_TABLE.
 *
 * Force-releases the cached meta of the altered table. When the command itself
 * still holds a table meta and that table is a super table, the whole cache is
 * emptied afterwards.
 *
 * @param pSql  sql object that issued the alter
 * @return      always 0
 */
int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
  STableMeta     *pCached = taosCacheAcquireByName(tscCacheHandle, pMetaInfo->name);

  if (pCached != NULL) {
    tscTrace("%p force release metermeta in cache after alter-table: %s", pSql, pMetaInfo->name);
    taosCacheRelease(tscCacheHandle, (void **)&pCached, true);

    if (pMetaInfo->pTableMeta != NULL) {
      // the table kind must be read before the meta reference is released
      bool superTable = UTIL_TABLE_IS_SUPER_TABLE(pMetaInfo);
      taosCacheRelease(tscCacheHandle, (void **)&(pMetaInfo->pTableMeta), true);

      if (superTable) {  // if it is a super table, reset whole query cache
        tscTrace("%p reset query cache since table:%s is stable", pSql, pMetaInfo->name);
        taosCacheEmpty(tscCacheHandle);
      }
    }
  }

  // nothing to do when the table was not cached
  return 0;
}

/*
 * Response handler registered for TSDB_SQL_ALTER_DB.
 * The response needs no client-side processing; the argument is ignored.
 *
 * @return  always 0
 */
int tscProcessAlterDbMsgRsp(SSqlObj *pSql) {
  UNUSED(pSql);
  return 0;
}

/*
 * Response handler registered for TSDB_SQL_SELECT.
 *
 * Byte-swaps the query handle inside the response buffer (in place), stores it
 * in the result object, clears the data pointer and resets the result state
 * for the next retrieve.
 *
 * @param pSql  sql object whose res.pRsp holds the SQueryTableRsp
 * @return      always 0
 */
int tscProcessQueryRsp(SSqlObj *pSql) {
  SSqlRes        *pRes = &pSql->res;
  SQueryTableRsp *pRsp = (SQueryTableRsp *)pRes->pRsp;

  // the response buffer itself is updated, then the converted handle is published
  pRsp->qhandle = htobe64(pRsp->qhandle);
  pRes->qhandle = pRsp->qhandle;

  pRes->data = NULL;
  tscResetForNextRetrieve(pRes);

  return 0;
}

H
hjxilinx 已提交
2148
/*
 * Response handler registered for TSDB_SQL_FETCH and TSDB_SQL_RETRIEVE.
 *
 * Converts the header fields of the retrieve response to host byte order,
 * points the result object at the returned data and builds the per-column
 * result pointers. For subscriptions, the progress records that follow the row
 * data are parsed and forwarded to tscUpdateSubscriptionProgress().
 *
 * Layout of the subscription trailer, as consumed below:
 *   int32  numOfTables
 *   numOfTables x { int64 uid, int32 tid (skipped), TSKEY key }
 *
 * The trailer starts at an arbitrary byte offset inside a char buffer, so its
 * fields are loaded with memcpy() instead of being dereferenced through cast
 * pointers; this avoids misaligned access and strict-aliasing violations.
 *
 * @param pSql  sql object whose res.pRsp holds the SRetrieveTableRsp
 * @return      0 on success, pRes->code if the result pointers cannot be created
 */
int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *)pRes->pRsp;

  pRes->numOfRows = htonl(pRetrieve->numOfRows);
  pRes->precision = htons(pRetrieve->precision);
  pRes->offset    = htobe64(pRetrieve->offset);
  pRes->useconds  = htobe64(pRetrieve->useconds);
  pRes->completed = (pRetrieve->completed == 1);
  pRes->data      = pRetrieve->data;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
    return pRes->code;
  }

  if (pSql->pSubscription != NULL) {
    int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput;

    TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, numOfCols - 1);
    int16_t     offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1);

    // the trailer begins right after the last column of the last row
    char *p = pRes->data + (pField->bytes + offset) * pRes->numOfRows;

    int32_t numOfTables = 0;
    memcpy(&numOfTables, p, sizeof(numOfTables));
    numOfTables = htonl(numOfTables);
    p += sizeof(int32_t);

    for (int i = 0; i < numOfTables; i++) {
      int64_t uid = 0;
      memcpy(&uid, p, sizeof(uid));
      uid = htobe64(uid);
      p += sizeof(int64_t);

      p += sizeof(int32_t);  // skip tid

      TSKEY key = 0;
      memcpy(&key, p, sizeof(key));
      key = htobe64(key);
      p += sizeof(TSKEY);

      tscUpdateSubscriptionProgress(pSql->pSubscription, uid, key);
    }
  }

  pRes->row = 0;
  tscTrace("%p numOfRows:%" PRId64 ", offset:%" PRId64 ", complete:%d", pSql, pRes->numOfRows, pRes->offset, pRes->completed);

  return 0;
}

/*
 * Processes a retrieve response that was produced locally.
 *
 * Takes the row count (converted with htonl) and the data pointer from the
 * response, rebinds the result pointers for the first query clause and rewinds
 * the row cursor.
 *
 * @param pSql  sql object whose res.pRsp holds the SRetrieveTableRsp
 * @return      always 0
 */
int tscProcessRetrieveRspFromLocal(SSqlObj *pSql) {
  SSqlRes    *pRes = &pSql->res;
  SQueryInfo *pInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);

  SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)pRes->pRsp;
  pRes->numOfRows = htonl(pRsp->numOfRows);
  pRes->data = pRsp->data;

  tscSetResultPointer(pInfo, pRes);

  pRes->row = 0;
  return 0;
}

H
hjxilinx 已提交
2207
// Forward declaration: installed below as the completion callback (`fp`) of the
// helper SSqlObj instances created to fetch table meta and vgroup info.
// The definition is not in this part of the file.
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code);
H
hzcheng 已提交
2208

2209
/*
 * Launch a helper request (TSDB_SQL_META) that fetches the meta of the table
 * named by pTableMetaInfo.
 *
 * A new SSqlObj is created for the request. It inherits the connection and the
 * autoCreated flag of pSql, receives a copy of pSql's payload (the tag
 * information used when the table has to be created), and is completed through
 * tscTableMetaCallBack with pSql as its parameter.
 *
 * Every failure path after the helper object has been populated releases it
 * with tscFreeSqlObj(), so the query info attached by tscAddSubqueryInfo() is
 * not leaked.
 *
 * @param pSql            the sql object that needs the table meta
 * @param pTableMetaInfo  table meta info carrying the table name
 * @return                TSDB_CODE_TSC_ACTION_IN_PROGRESS when the request was
 *                        submitted, TSDB_CODE_TSC_OUT_OF_MEMORY on allocation
 *                        failure, otherwise the error code of the failing step
 */
static int32_t getTableMetaFromMgmt(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
  if (NULL == pNew) {
    tscError("%p malloc failed for new sqlobj to get table meta", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;
  pNew->cmd.command = TSDB_SQL_META;

  tscAddSubqueryInfo(&pNew->cmd);

  SQueryInfo *pNewQueryInfo = NULL;
  int32_t     code = tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo);
  if (code != TSDB_CODE_SUCCESS) {
    // without a query info there is nothing to attach the table to
    tscFreeSqlObj(pNew);
    return code;
  }

  pNew->cmd.autoCreated = pSql->cmd.autoCreated;  // create table if not exists
  if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE + pSql->cmd.payloadLen)) {
    tscError("%p malloc failed for payload to get table meta", pSql);

    // release the query info owned by pNew as well, not only pNew itself
    tscFreeSqlObj(pNew);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  STableMetaInfo *pNewMeterMetaInfo = tscAddEmptyMetaInfo(pNewQueryInfo);
  assert(pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);

  tstrncpy(pNewMeterMetaInfo->name, pTableMetaInfo->name, sizeof(pNewMeterMetaInfo->name));
  memcpy(pNew->cmd.payload, pSql->cmd.payload, pSql->cmd.payloadLen);  // tag information if table does not exists.
  pNew->cmd.payloadLen = pSql->cmd.payloadLen;
  tscTrace("%p new pSqlObj:%p to get tableMeta, auto create:%d", pSql, pNew, pNew->cmd.autoCreated);

  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;

  code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
    code = TSDB_CODE_TSC_ACTION_IN_PROGRESS;  // notify upper application that current process need to be terminated
  }

  return code;
}

H
hjxilinx 已提交
2252
/*
 * Make sure pTableMetaInfo holds a table meta for its table name.
 *
 * Any meta currently owned by pTableMetaInfo is released first. The cache is
 * then looked up by name; on a miss the meta is requested through
 * getTableMetaFromMgmt().
 *
 * @param pSql            sql object on whose behalf the meta is needed
 * @param pTableMetaInfo  table meta info; its name must not be empty
 * @return                TSDB_CODE_SUCCESS on a cache hit, otherwise the result
 *                        of getTableMetaFromMgmt()
 */
int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
  assert(strlen(pTableMetaInfo->name) != 0);

  // give back the reference currently owned by this STableMetaInfo, if any
  if (pTableMetaInfo->pTableMeta != NULL) {
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), false);
  }

  pTableMetaInfo->pTableMeta = (STableMeta *)taosCacheAcquireByName(tscCacheHandle, pTableMetaInfo->name);
  if (pTableMetaInfo->pTableMeta == NULL) {
    // cache miss: request the meta
    return getTableMetaFromMgmt(pSql, pTableMetaInfo);
  }

  STableComInfo info = tscGetTableInfo(pTableMetaInfo->pTableMeta);
  tscTrace("%p retrieve table Meta from cache, the number of columns:%d, numOfTags:%d, %p", pSql, info.numOfColumns,
           info.numOfTags, pTableMetaInfo->pTableMeta);

  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
2272
/*
 * Variant of tscGetTableMeta() that first records in the command whether the
 * table should be created automatically.
 *
 * @param pSql               sql object on whose behalf the meta is needed
 * @param pTableMetaInfo     table meta info carrying the table name
 * @param createIfNotExists  value stored in pSql->cmd.autoCreated
 * @return                   result of tscGetTableMeta()
 */
int tscGetMeterMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool createIfNotExists) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->autoCreated = createIfNotExists;

  return tscGetTableMeta(pSql, pTableMetaInfo);
}

/**
H
Haojun Liao 已提交
2278
 * retrieve table meta from mnode, and update the local table meta cache.
H
hzcheng 已提交
2279
 * @param pSql          sql object
H
Haojun Liao 已提交
2280
 * @param tableId       table full name
H
hzcheng 已提交
2281 2282
 * @return              status code
 */
H
Haojun Liao 已提交
2283
int tscRenewTableMeta(SSqlObj *pSql, char *tableId) {
H
hzcheng 已提交
2284
  SSqlCmd *pCmd = &pSql->cmd;
2285 2286

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
H
hjxilinx 已提交
2287
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
2288

H
Haojun Liao 已提交
2289 2290 2291 2292
  STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
  if (pTableMetaInfo->pTableMeta) {
    tscTrace("%p update table meta, old meta numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
             tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->uid, pTableMeta);
H
hzcheng 已提交
2293 2294
  }

H
Haojun Liao 已提交
2295 2296
  taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true);
  return getTableMetaFromMgmt(pSql, pTableMetaInfo);
H
hzcheng 已提交
2297 2298
}

H
hjxilinx 已提交
2299
/*
 * Tell whether every table of the given query clause already has a vgroup list.
 *
 * @param pCmd         command holding the query clauses
 * @param clauseIndex  index of the clause to inspect
 * @return             true when no table of the clause has a NULL vgroupList
 *                     (also true for a clause without tables), false otherwise
 */
static bool allVgroupInfoRetrieved(SSqlCmd* pCmd, int32_t clauseIndex) {
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);

  // advance until the first table that still lacks its vgroup list
  int32_t idx = 0;
  while (idx < pQueryInfo->numOfTables && tscGetMetaInfo(pQueryInfo, idx)->vgroupList != NULL) {
    ++idx;
  }

  // reaching the end means nothing is left to retrieve
  return idx >= pQueryInfo->numOfTables;
}
H
hzcheng 已提交
2311

H
hjxilinx 已提交
2312
/*
 * Launch a helper request (TSDB_SQL_STABLEVGROUP) that fetches the vgroup
 * information for all tables of the given query clause.
 *
 * Nothing is sent when every table of the clause already has a vgroup list.
 * Otherwise a new SSqlObj is created, each table of the clause is attached to
 * it with a freshly acquired cache reference to its table meta, and the
 * request is completed through tscTableMetaCallBack with pSql as parameter.
 *
 * @param pSql         sql object that needs the vgroup info
 * @param clauseIndex  index of the query clause to process
 * @return             TSDB_CODE_SUCCESS when nothing has to be fetched,
 *                     TSDB_CODE_TSC_ACTION_IN_PROGRESS when the request was
 *                     submitted, TSDB_CODE_TSC_OUT_OF_MEMORY when the helper
 *                     object cannot be allocated, otherwise the error code of
 *                     the failing step
 */
int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
  int      code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
  SSqlCmd *pCmd = &pSql->cmd;

  if (allVgroupInfoRetrieved(pCmd, clauseIndex)) {
    return TSDB_CODE_SUCCESS;
  }

  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
  if (NULL == pNew) {
    // do not dereference a failed allocation
    tscError("%p malloc failed for new sqlobj to get vgroup info", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;

  pNew->cmd.command = TSDB_SQL_STABLEVGROUP;

  SQueryInfo *pNewQueryInfo = NULL;
  if ((code = tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo)) != TSDB_CODE_SUCCESS) {
    tscFreeSqlObj(pNew);
    return code;
  }

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
    STableMetaInfo *pMInfo = tscGetMetaInfo(pQueryInfo, i);

    // the helper object gets its own cache reference to the table meta
    STableMeta *pTableMeta = taosCacheAcquireByData(tscCacheHandle, pMInfo->pTableMeta);
    tscAddTableMetaInfo(pNewQueryInfo, pMInfo->name, pTableMeta, NULL, pMInfo->tagColList);
  }

  if ((code = tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
    tscFreeSqlObj(pNew);
    return code;
  }

  pNewQueryInfo->numOfTables = pQueryInfo->numOfTables;
  tscTrace("%p new sqlObj:%p to get vgroupInfo, numOfTables:%d", pSql, pNew, pNewQueryInfo->numOfTables);

  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;
  code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
    code = TSDB_CODE_TSC_ACTION_IN_PROGRESS;
  }

  return code;
}

2357
void tscInitMsgsFp() {
S
slguan 已提交
2358 2359
  tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
  tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg;
J
jtao1735 已提交
2360
  tscBuildMsg[TSDB_SQL_FETCH] = tscBuildFetchMsg;
H
hzcheng 已提交
2361 2362

  tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg;
2363
  tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg;
H
hzcheng 已提交
2364

2365 2366
  tscBuildMsg[TSDB_SQL_CREATE_ACCT] = tscBuildAcctMsg;
  tscBuildMsg[TSDB_SQL_ALTER_ACCT] = tscBuildAcctMsg;
H
hzcheng 已提交
2367 2368

  tscBuildMsg[TSDB_SQL_CREATE_TABLE] = tscBuildCreateTableMsg;
S
[TD-16]  
slguan 已提交
2369
  tscBuildMsg[TSDB_SQL_DROP_USER] = tscBuildDropUserMsg;
H
hzcheng 已提交
2370 2371 2372
  tscBuildMsg[TSDB_SQL_DROP_ACCT] = tscBuildDropAcctMsg;
  tscBuildMsg[TSDB_SQL_DROP_DB] = tscBuildDropDbMsg;
  tscBuildMsg[TSDB_SQL_DROP_TABLE] = tscBuildDropTableMsg;
2373
  tscBuildMsg[TSDB_SQL_ALTER_USER] = tscBuildUserMsg;
S
slguan 已提交
2374 2375 2376
  tscBuildMsg[TSDB_SQL_CREATE_DNODE] = tscBuildCreateDnodeMsg;
  tscBuildMsg[TSDB_SQL_DROP_DNODE] = tscBuildDropDnodeMsg;
  tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg;
H
hzcheng 已提交
2377
  tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg;
2378
  tscBuildMsg[TSDB_SQL_UPDATE_TAGS_VAL] = tscBuildUpdateTagMsg;
H
hzcheng 已提交
2379 2380 2381 2382
  tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg;

  tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg;
  tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg;
H
hjxilinx 已提交
2383
  tscBuildMsg[TSDB_SQL_META] = tscBuildTableMetaMsg;
H
hjxilinx 已提交
2384
  tscBuildMsg[TSDB_SQL_STABLEVGROUP] = tscBuildSTableVgroupMsg;
S
slguan 已提交
2385
  tscBuildMsg[TSDB_SQL_MULTI_META] = tscBuildMultiMeterMetaMsg;
H
hzcheng 已提交
2386 2387 2388 2389

  tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg;
  tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg;
  tscBuildMsg[TSDB_SQL_RETRIEVE] = tscBuildRetrieveFromMgmtMsg;
2390 2391 2392
  tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg;
H
hzcheng 已提交
2393 2394

  tscProcessMsgRsp[TSDB_SQL_SELECT] = tscProcessQueryRsp;
H
hjxilinx 已提交
2395
  tscProcessMsgRsp[TSDB_SQL_FETCH] = tscProcessRetrieveRspFromNode;
H
hzcheng 已提交
2396 2397 2398 2399 2400

  tscProcessMsgRsp[TSDB_SQL_DROP_DB] = tscProcessDropDbRsp;
  tscProcessMsgRsp[TSDB_SQL_DROP_TABLE] = tscProcessDropTableRsp;
  tscProcessMsgRsp[TSDB_SQL_CONNECT] = tscProcessConnectRsp;
  tscProcessMsgRsp[TSDB_SQL_USE_DB] = tscProcessUseDbRsp;
2401
  tscProcessMsgRsp[TSDB_SQL_META] = tscProcessTableMetaRsp;
H
hjxilinx 已提交
2402
  tscProcessMsgRsp[TSDB_SQL_STABLEVGROUP] = tscProcessSTableVgroupRsp;
S
slguan 已提交
2403
  tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiMeterMetaRsp;
H
hzcheng 已提交
2404 2405

  tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp;
H
hjxilinx 已提交
2406
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromNode;  // rsp handled by same function.
H
hzcheng 已提交
2407
  tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp;
2408

H
Haojun Liao 已提交
2409 2410 2411 2412 2413
  tscProcessMsgRsp[TSDB_SQL_CURRENT_DB]   = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CURRENT_USER] = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_VERSION] = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CLI_VERSION]  = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_STATUS]  = tscProcessLocalRetrieveRsp;
2414

H
hzcheng 已提交
2415 2416
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp;

H
hjxilinx 已提交
2417
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_LOCALMERGE] = tscProcessRetrieveLocalMergeRsp;
H
hzcheng 已提交
2418 2419 2420 2421 2422 2423 2424 2425 2426 2427

  tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp;
  tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp;

  tscKeepConn[TSDB_SQL_SHOW] = 1;
  tscKeepConn[TSDB_SQL_RETRIEVE] = 1;
  tscKeepConn[TSDB_SQL_SELECT] = 1;
  tscKeepConn[TSDB_SQL_FETCH] = 1;
  tscKeepConn[TSDB_SQL_HB] = 1;
}