tscServer.c 85.4 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#include "os.h"
H
hzcheng 已提交
17 18 19 20
#include "tcache.h"
#include "trpc.h"
#include "tscProfile.h"
#include "tscSecondaryMerge.h"
H
hjxilinx 已提交
21
#include "tscSubquery.h"
H
hzcheng 已提交
22 23 24 25 26 27 28
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
#include "tsocket.h"
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"
S
slguan 已提交
29
#include "tscLog.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
30
#include "qsqltype.h"
H
hzcheng 已提交
31 32 33

#define TSC_MGMT_VNODE 999

S
slguan 已提交
34
SRpcIpSet  tscMgmtIpSet;
S
slguan 已提交
35 36
SRpcIpSet  tscDnodeIpSet;

37 38
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};

H
hzcheng 已提交
39 40 41
int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql);
void tscProcessActivityTimer(void *handle, void *tmrId);
int tscKeepConn[TSDB_SQL_MAX] = {0};
42

B
Bomin Zhang 已提交
43
TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid, TSKEY dflt);
44 45
void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts);
void tscSaveSubscriptionProgress(void* sub);
H
hzcheng 已提交
46

S
slguan 已提交
47
/* Minimum message size: the RPC head size plus 100 bytes. */
static int32_t minMsgSize() {
  return tsRpcHeadSize + 100;
}
H
hzcheng 已提交
48

49
/*
 * Fill pSql->ipList from the endpoints of the given vgroup.
 *
 * The number of endpoints is taken from pVgroupInfo, the in-use index is reset
 * to 0, and the fqdn and port of every endpoint are copied over.
 *
 * NOTE(review): the fqdn is copied with strcpy(), so the destination entry is
 * assumed to be at least as large as the source string -- confirm.
 */
static void tscSetDnodeIpList(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) {
  SRpcIpSet* pTarget = &pSql->ipList;

  pTarget->numOfIps = pVgroupInfo->numOfIps;
  pTarget->inUse    = 0;

  int32_t idx = 0;
  while (idx < pVgroupInfo->numOfIps) {
    strcpy(pTarget->fqdn[idx], pVgroupInfo->ipAddr[idx].fqdn);
    pTarget->port[idx] = pVgroupInfo->ipAddr[idx].port;
    ++idx;
  }
}

S
slguan 已提交
61
/*
 * Log the current management-node IP set: an error when the set is empty
 * (or its count is negative), otherwise one trace line per entry.
 */
void tscPrintMgmtIp() {
  if (tscMgmtIpSet.numOfIps <= 0) {
    tscError("invalid mnode IP list:%d", tscMgmtIpSet.numOfIps);
    return;
  }

  int i = 0;
  while (i < tscMgmtIpSet.numOfIps) {
    tscTrace("mnode index:%d %s:%d", i, tscMgmtIpSet.fqdn[i], tscMgmtIpSet.port[i]);
    ++i;
  }
}

71
/*
 * Update the global management IP set from pIpList.
 *
 * The entry count and the in-use index are copied as they are; every port is
 * passed through htons() before it is stored.
 *
 * NOTE(review): the fqdn entries of tscMgmtIpSet are not touched here -- confirm
 * that this is intended.
 */
void tscSetMgmtIpList(SRpcIpSet *pIpList) {
  tscMgmtIpSet.inUse = pIpList->inUse;
  tscMgmtIpSet.numOfIps = pIpList->numOfIps;

  int32_t idx = 0;
  while (idx < tscMgmtIpSet.numOfIps) {
    tscMgmtIpSet.port[idx] = htons(pIpList->port[idx]);
    ++idx;
  }
}

S
slguan 已提交
79
/*
 * Replace the global management IP set with *pIpSet and trace the new content.
 * The ahandle argument is not used.
 */
void tscUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet) {
  tscMgmtIpSet = *pIpSet;

  tscTrace("mnode IP list is changed for ufp is called, numOfIps:%d inUse:%d", tscMgmtIpSet.numOfIps, tscMgmtIpSet.inUse);

  int32_t idx = 0;
  while (idx < tscMgmtIpSet.numOfIps) {
    tscTrace("index:%d fqdn:%s port:%d", idx, tscMgmtIpSet.fqdn[idx], tscMgmtIpSet.port[idx]);
    ++idx;
  }
}

H
hjxilinx 已提交
87 88 89 90 91 92 93
/*
 * For each management node, try at least twice in case of a poor network connection.
 * If the client starts by connecting to a non-management node, the first attempt may fail due to
 * poor network quality, and the second attempt may then get a response carrying a redirection command.
 * That redirected retry would not be executed, since only *two* attempts are allowed when the cluster
 * has a single management node.
 * Therefore, we need to multiply the retry times by a factor of 2 to fix this problem.
 */
94
UNUSED_FUNC
H
hjxilinx 已提交
95 96
static int32_t tscGetMgmtConnMaxRetryTimes() {
  int32_t factor = 2;
S
slguan 已提交
97
  return tscMgmtIpSet.numOfIps * factor;
H
hjxilinx 已提交
98 99
}

H
hzcheng 已提交
100 101 102 103 104 105 106 107 108 109 110 111
/*
 * Callback invoked when a heartbeat request completes.
 *
 * param : the STscObj that owns the heartbeat; ignored when NULL or when its
 *         signature does not point back to itself.
 * tres  : not used.
 * code  : 0 on success, otherwise an error code that is only traced.
 *
 * On success the response stored in the heartbeat SQL object is applied:
 * a non-empty IP list replaces the management IP set, the connection id is
 * recorded, and the kill requests (connection, or query/stream) are executed.
 * In every valid case the activity timer is re-armed at the end.
 */
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
  STscObj *pObj = (STscObj *)param;
  if (pObj == NULL) {
    return;
  }

  if (pObj != pObj->signature) {
    tscError("heart beat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature);
    return;
  }

  SSqlObj *pHb = pObj->pHb;

  if (code != 0) {
    tscTrace("heart beat failed, code:%s", tstrerror(code));
  } else {
    SCMHeartBeatRsp *pRsp = (SCMHeartBeatRsp *)pHb->res.pRsp;

    if (pRsp->ipList.numOfIps > 0) {
      tscSetMgmtIpList(&pRsp->ipList);
    }

    pHb->pTscObj->connId = htonl(pRsp->connId);

    if (pRsp->killConnection) {
      tscKillConnection(pObj);
    } else {
      if (pRsp->queryId) {
        tscKillQuery(pObj, htonl(pRsp->queryId));
      }
      if (pRsp->streamId) {
        tscKillStream(pObj, htonl(pRsp->streamId));
      }
    }
  }

  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
}

void tscProcessActivityTimer(void *handle, void *tmrId) {
  STscObj *pObj = (STscObj *)handle;

  if (pObj == NULL) return;
  if (pObj->signature != pObj) return;
  if (pObj->pTimer != tmrId) return;

  if (pObj->pHb == NULL) {
S
slguan 已提交
140 141 142
    SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
    if (NULL == pSql) return;

H
hzcheng 已提交
143
    pSql->fp = tscProcessHeartBeatRsp;
H
hjxilinx 已提交
144
    
145 146 147 148
    SQueryInfo *pQueryInfo = NULL;
    tscGetQueryInfoDetailSafely(&pSql->cmd, 0, &pQueryInfo);
    pQueryInfo->command = TSDB_SQL_HB;
    
149
    pSql->cmd.command = TSDB_SQL_HB;
S
slguan 已提交
150 151 152 153 154
    if (TSDB_CODE_SUCCESS != tscAllocPayload(&(pSql->cmd), TSDB_DEFAULT_PAYLOAD_SIZE)) {
      tfree(pSql);
      return;
    }

155
    pSql->cmd.command = TSDB_SQL_HB;
H
hzcheng 已提交
156 157 158 159
    pSql->param = pObj;
    pSql->pTscObj = pObj;
    pSql->signature = pSql;
    pObj->pHb = pSql;
160
    tscAddSubqueryInfo(&pObj->pHb->cmd);
161

S
slguan 已提交
162
    tscTrace("%p pHb is allocated, pObj:%p", pObj->pHb, pObj);
H
hzcheng 已提交
163 164 165
  }

  if (tscShouldFreeHeatBeat(pObj->pHb)) {
S
slguan 已提交
166
    tscTrace("%p free HB object and release connection", pObj);
H
hzcheng 已提交
167 168 169 170 171 172 173 174 175
    tscFreeSqlObj(pObj->pHb);
    tscCloseTscObj(pObj);
    return;
  }

  tscProcessSql(pObj->pHb);
}

/*
 * Copy the payload of pSql into a freshly allocated RPC buffer and send it.
 *
 * For commands below TSDB_SQL_MGMT the payload is read starting tsRpcHeadSize
 * bytes into the command buffer; for all other commands it is read from the
 * start of the buffer and the IP list of pSql is first replaced by the
 * management IP set.
 *
 * Returns TSDB_CODE_CLI_OUT_OF_MEMORY when the RPC buffer cannot be allocated,
 * TSDB_CODE_SUCCESS otherwise.
 */
int tscSendMsgToServer(SSqlObj *pSql) {
  SSqlCmd* pCmd = &pSql->cmd;
  STscObj* pObj = pSql->pTscObj;

  char *pCont = rpcMallocCont(pCmd->payloadLen);
  if (pCont == NULL) {
    tscError("%p msg:%s malloc fail", pSql, taosMsg[pSql->cmd.msgType]);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  char *pSrc = pCmd->payload;
  if (pCmd->command < TSDB_SQL_MGMT) {
    pSrc += tsRpcHeadSize;
  } else {
    pSql->ipList = tscMgmtIpSet;
  }
  memcpy(pCont, pSrc, pCmd->payloadLen);

  SRpcMsg rpcMsg = {0};
  rpcMsg.msgType = pCmd->msgType;
  rpcMsg.pCont   = pCont;
  rpcMsg.contLen = pCmd->payloadLen;
  rpcMsg.handle  = pSql;
  rpcMsg.code    = 0;

  rpcSendRequest(pObj->pDnodeConn, &pSql->ipList, &rpcMsg);

  return TSDB_CODE_SUCCESS;
}

J
jtao1735 已提交
206
/*
 * Handle a response (or a transport failure) delivered by the RPC layer.
 *
 * rpcMsg : the response; rpcMsg->handle is the SSqlObj the request was sent for.
 * pIpSet : optional updated IP set; when present it replaces the IP list of the
 *          SQL object (commands below TSDB_SQL_MGMT) or the management IP set.
 *
 * Steps:
 *  1. Validate the SQL object and its connection object.
 *  2. For the "stale meta / network" error codes either give up (connect,
 *     heartbeat), renew the table meta and resend, or fall through.
 *  3. Copy the response into pSql->res, fix the byte order of a submit response,
 *     run the per-command response handler and finally the user callback.
 *
 * rpcMsg->pCont is released with rpcFreeCont() on every path that gets past the
 * two SQL-object checks at the top.
 */
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) {
  SSqlObj *pSql = (SSqlObj *)rpcMsg->handle;
  if (pSql == NULL) {
    // do not touch pSql->signature here: pSql is NULL
    tscError("%p sql is already released", pSql);
    return;
  }
  if (pSql->signature != pSql) {
    tscError("%p sql is already released, signature:%p", pSql, pSql->signature);
    return;
  }

  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;
  // tscTrace("%p msg:%s is received from server", pSql, taosMsg[rpcMsg->msgType]);

  if (pObj->signature != pObj) {
    tscTrace("%p sql is already released or DB connection is closed, freed:%d pObj:%p signature:%p", pSql, pSql->freed,
             pObj, pObj->signature);
    tscFreeSqlObj(pSql);
    rpcFreeCont(rpcMsg->pCont);
    return;
  }

  if (pCmd->command < TSDB_SQL_MGMT) {
    if (pIpSet) pSql->ipList = *pIpSet;
  } else {
    if (pIpSet) tscMgmtIpSet = *pIpSet;
  }

  if (rpcMsg->pCont == NULL) {
    rpcMsg->code = TSDB_CODE_NETWORK_UNAVAIL;
  } else {
    STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
    if (rpcMsg->code == TSDB_CODE_INVALID_TABLE_ID || rpcMsg->code == TSDB_CODE_INVALID_VGROUP_ID ||
        rpcMsg->code == TSDB_CODE_NETWORK_UNAVAIL) {
      /*
       * not_active_table: 1. the virtual node may fail to create table, since the procedure of create table is
       *                   asynchronous, the virtual node may not have created the table yet, so try again by
       *                   using the new table meta.
       *                   2. the requested table may have been removed by another client, so we need to renew
       *                   the table meta here.
       *
       * not_active_vnode: current vnode is moved to another node due to the node balance procedure or the
       *                   virtual node has been removed. So, renew the table meta and try again.
       * not_active_session: db has been moved to another node, the vnode does not exist on this dnode anymore.
       */
      if (pCmd->command == TSDB_SQL_CONNECT) {
        rpcMsg->code = TSDB_CODE_NETWORK_UNAVAIL;
        rpcFreeCont(rpcMsg->pCont);
        return;
      } else if (pCmd->command == TSDB_SQL_HB) {
        rpcMsg->code = TSDB_CODE_NOT_READY;
        rpcFreeCont(rpcMsg->pCont);
        return;
      } else if (pCmd->command == TSDB_SQL_META) {
        // no retry for a meta request: fall through to the regular response handling
      } else {
        tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), ++pSql->retry);

        pSql->res.code = rpcMsg->code;  // keep the previous error code
        if (pSql->retry > pSql->maxRetry) {
          tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry);
        } else {
          rpcMsg->code = tscRenewMeterMeta(pSql, pTableMetaInfo->name);
          if (pTableMetaInfo->pTableMeta) {
            tscSendMsgToServer(pSql);
          }

          rpcFreeCont(rpcMsg->pCont);
          return;
        }
      }
    }
  }

  pRes->rspLen = 0;

  if (pRes->code != TSDB_CODE_QUERY_CANCELLED) {
    /*
     * Take the rpc code as it is. Mapping a successful code to TSDB_CODE_NETWORK_UNAVAIL at this
     * point would make the retry-counter reset below unreachable; the final value of pRes->code
     * is assigned from rpcMsg->code further down in either case.
     */
    pRes->code = rpcMsg->code;
  } else {
    // tstrerror() yields a string, so it must be printed with %s
    tscTrace("%p query is cancelled, code:%s", pSql, tstrerror(pRes->code));
  }

  if (pRes->code == TSDB_CODE_SUCCESS) {
    tscTrace("%p reset retry counter to be 0 due to success rsp, old:%d", pSql, pSql->retry);
    pSql->retry = 0;
  }

  if (pRes->code != TSDB_CODE_QUERY_CANCELLED) {
    assert(rpcMsg->msgType == pCmd->msgType + 1);
    pRes->code    = rpcMsg->code;
    pRes->rspType = rpcMsg->msgType;
    pRes->rspLen  = rpcMsg->contLen;

    if (pRes->rspLen > 0 && rpcMsg->pCont) {
      // keep the old buffer when realloc fails, only the error code changes
      char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
      if (tmp == NULL) {
        pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
      } else {
        pRes->pRsp = tmp;
        memcpy(pRes->pRsp, rpcMsg->pCont, pRes->rspLen);
      }
    } else {
      // NOTE(review): a previously stored pRsp buffer is dropped here without being freed -- confirm ownership
      pRes->pRsp = NULL;
    }

    /*
     * There is no response callback function for a submit response.
     * The actual number of inserted points is taken from affectedRows.
     */
    if (rpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP && pRes->pRsp != NULL) {
      SShellSubmitRspMsg *pMsg = (SShellSubmitRspMsg*)pRes->pRsp;
      // convert the fields of the copied response in place
      pMsg->code = htonl(pMsg->code);
      pMsg->numOfRows = htonl(pMsg->numOfRows);
      pMsg->affectedRows = htonl(pMsg->affectedRows);
      pMsg->failedRows = htonl(pMsg->failedRows);
      pMsg->numOfFailedBlocks = htonl(pMsg->numOfFailedBlocks);

      pRes->numOfRows += pMsg->affectedRows;
      tscTrace("%p SQL cmd:%s, code:%s inserted rows:%d rspLen:%d", pSql, sqlCmd[pCmd->command], 
          tstrerror(pRes->code), pMsg->affectedRows, pRes->rspLen);
    } else {
      tscTrace("%p SQL cmd:%s, code:%s rspLen:%d", pSql, sqlCmd[pCmd->command], tstrerror(pRes->code), pRes->rspLen);
    }
  }

  if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command])
    rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);

  if (rpcMsg->code != TSDB_CODE_ACTION_IN_PROGRESS) {
    // the callback receives the number of rows on success, the error code otherwise
    rpcMsg->code = (pRes->code == TSDB_CODE_SUCCESS) ? pRes->numOfRows: pRes->code;

    // decide before the callback runs whether the object is to be freed afterwards
    bool shouldFree = tscShouldBeFreed(pSql);
    (*pSql->fp)(pSql->param, pSql, rpcMsg->code);

    if (shouldFree) {
      tscTrace("%p sqlObj is automatically freed", pSql);
      tscFreeSqlObj(pSql);
    }
  }

  rpcFreeCont(rpcMsg->pCont);
}

S
slguan 已提交
351 352 353
/*
 * Build the request message for pSql (for the commands that have a builder
 * invoked here) and send it to the server.
 *
 * When building fails, the result is queued asynchronously and the error code
 * is returned. When sending fails, the error is stored in pSql->res.code and
 * the result is queued asynchronously, but TSDB_CODE_SUCCESS is still returned.
 */
int doProcessSql(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  switch (pCmd->command) {
    case TSDB_SQL_SELECT:
    case TSDB_SQL_FETCH:
    case TSDB_SQL_RETRIEVE:
    case TSDB_SQL_INSERT:
    case TSDB_SQL_CONNECT:
    case TSDB_SQL_HB:
    case TSDB_SQL_META:
    case TSDB_SQL_STABLEVGROUP:
      pRes->code = tscBuildMsg[pCmd->command](pSql, NULL);
      break;
    default:
      break;
  }

  if (pRes->code != TSDB_CODE_SUCCESS) {
    tscQueueAsyncRes(pSql);
    return pRes->code;
  }

  int32_t sendCode = tscSendMsgToServer(pSql);
  if (sendCode != TSDB_CODE_SUCCESS) {
    pRes->code = sendCode;
    tscQueueAsyncRes(pSql);
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Entry point for executing the command stored in pSql.
 *
 *  - commands below TSDB_SQL_MGMT need table meta info; without it the result
 *    code is set to TSDB_CODE_OTHERS and returned;
 *  - commands below TSDB_SQL_LOCAL are sent using the management IP set;
 *  - all remaining commands are handled locally through tscProcessMsgRsp.
 *
 * Returns the code of the local handler, TSDB_CODE_OTHERS, or the result of
 * doProcessSql().
 */
int tscProcessSql(SSqlObj *pSql) {
  char *   name = NULL;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  STableMetaInfo *pTableMetaInfo = NULL;
  uint16_t        type = 0;

  if (pQueryInfo != NULL) {
    pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
    if (pTableMetaInfo != NULL) {
      name = pTableMetaInfo->name;
    }

    type = pQueryInfo->type;

    // while numOfTables equals to 0, it must be Heartbeat
    assert((pQueryInfo->numOfTables == 0 && pQueryInfo->command == TSDB_SQL_HB) || pQueryInfo->numOfTables > 0);
  }

  // name stays NULL when there is no table meta info (e.g. heartbeat); never hand NULL to %s
  tscTrace("%p SQL cmd:%s will be processed, name:%s, type:%d", pSql, sqlCmd[pCmd->command],
           (name != NULL) ? name : "(null)", type);
  if (pCmd->command < TSDB_SQL_MGMT) { // the pTableMetaInfo cannot be NULL
    if (pTableMetaInfo == NULL) {
      pSql->res.code = TSDB_CODE_OTHERS;
      return pSql->res.code;
    }
  } else if (pCmd->command < TSDB_SQL_LOCAL) {
    pSql->ipList = tscMgmtIpSet; //?
  } else {  // local handler
    return (*tscProcessMsgRsp[pCmd->command])(pSql);
  }

  return doProcessSql(pSql);
}
H
hzcheng 已提交
415

H
hjxilinx 已提交
416
/*
 * Cancel a two-stage super table query.
 *
 * Nothing is done unless tscIsTwoStageSTableQuery() accepts the query. Every
 * existing sub query gets its result code set to TSDB_CODE_QUERY_CANCELLED, then
 * the function polls (100 ms steps, roughly 10 seconds at most) until the
 * command of pSql becomes TSDB_SQL_RETRIEVE_LOCALMERGE or
 * TSDB_SQL_RETRIEVE_EMPTY_RESULT.
 */
void tscKillSTableQuery(SSqlObj *pSql) {
  SSqlCmd* pCmd = &pSql->cmd;

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  if (!tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
    return;
  }

  for (int i = 0; i < pSql->numOfSubs; ++i) {
    /*
     * here, we cannot set the command = TSDB_SQL_KILL_QUERY. Otherwise, it may cause
     * sub-queries not correctly released and master sql object of metric query reaches an abnormal state.
     */
    if (pSql->pSubs[i] != NULL) {
      pSql->pSubs[i]->res.code = TSDB_CODE_QUERY_CANCELLED;
    }
  }

  /*
   * 1. if the subqueries are not launched or only partially launched, we need to wait for the launched
   *    queries to return so that the allocated resources are freed.
   * 2. if no subquery is launched yet, which means the metric query is only in the sql parse stage,
   *    set the res.code, and return.
   */
  const int64_t MAX_WAITING_TIME = 10000;  // 10 Sec.
  int64_t       startMs = taosGetTimestampMs();

  for (;;) {
    if (pCmd->command == TSDB_SQL_RETRIEVE_LOCALMERGE || pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
      break;
    }

    taosMsleep(100);
    if (taosGetTimestampMs() - startMs > MAX_WAITING_TIME) {
      break;
    }
  }

  tscTrace("%p super table query cancelled", pSql);
}

J
jtao1735 已提交
458
/*
 * Build the fetch (retrieve) request in the payload of pSql.
 *
 * The SRetrieveTableMsg is written tsRpcHeadSize bytes into the payload buffer.
 * It carries the query handle, the query type (stored in the "free" field), the
 * vgroup id and the payload length, all passed through the byte-order helpers.
 * The vgroup id comes from the vgroup list entry selected by vgroupIndex for a
 * super table, and from the table meta otherwise.
 *
 * pInfo is not used. Always returns TSDB_CODE_SUCCESS.
 */
int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *)(pSql->cmd.payload + tsRpcHeadSize);
  pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
  pRetrieveMsg->free = htons(pQueryInfo->type);

  // todo valid the vgroupId at the client side
  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
    int32_t vgIndex = pTableMetaInfo->vgroupIndex;

    SVgroupsInfo* pVgroupInfo = pTableMetaInfo->vgroupList;
    pRetrieveMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId);
  } else {
    STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
    pRetrieveMsg->header.vgId = htonl(pTableMeta->vgroupInfo.vgId);
  }

  pRetrieveMsg->header.contLen = htonl(pSql->cmd.payloadLen);

  pSql->cmd.msgType = TSDB_MSG_TYPE_FETCH;
  return TSDB_CODE_SUCCESS;
}

493
/*
 * Complete the header of a submit (insert) message that already sits in the
 * payload of pSql.
 *
 * Layout written here, starting tsRpcHeadSize bytes into the payload: an
 * SMsgDesc followed by an SSubmitMsg. The content length stored in the submit
 * header excludes the SMsgDesc. The IP list of pSql is set from the vgroup of
 * the target table.
 *
 * pInfo is not used. Always returns TSDB_CODE_SUCCESS.
 */
int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
  STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;

  // NOTE: shell message size should not include SMsgDesc
  int32_t contentLen = pSql->cmd.payloadLen - sizeof(SMsgDesc);

  char* pCursor = pSql->cmd.payload + tsRpcHeadSize;

  SMsgDesc* pDesc = (SMsgDesc*) pCursor;
  pDesc->numOfVnodes = htonl(1);       //todo set the right number of vnodes
  pCursor += sizeof(SMsgDesc);

  SSubmitMsg *pSubmit = (SSubmitMsg *)pCursor;
  int32_t vgId = pTableMeta->vgroupInfo.vgId;

  pSubmit->header.vgId = htonl(vgId);
  pSubmit->header.contLen = htonl(contentLen);
  pSubmit->length = pSubmit->header.contLen;

  pSubmit->numOfBlocks = htonl(pSql->cmd.numOfTablesInSubmit);  // number of meters to be inserted

  // pSql->cmd.payloadLen is set during copying data into payload
  pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
  tscSetDnodeIpList(pSql, &pTableMeta->vgroupInfo);

  tscTrace("%p build submit msg, vgId:%d numOfVgroup:%d numberOfIP:%d", pSql, vgId, htonl(pDesc->numOfVnodes), pSql->ipList.numOfIps);
  return TSDB_CODE_SUCCESS;
}

/*
525
 * for table query, simply return the size <= 1k
H
hzcheng 已提交
526
 */
527
static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
H
hzcheng 已提交
528
  const static int32_t MIN_QUERY_MSG_PKT_SIZE = TSDB_MAX_BYTES_PER_ROW * 5;
529
  SQueryInfo *         pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
H
hzcheng 已提交
530

531
  int32_t srcColListSize = taosArrayGetSize(pQueryInfo->colList) * sizeof(SColumnInfo);
H
hjxilinx 已提交
532 533 534 535
  
  size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
  int32_t exprSize = sizeof(SSqlFuncMsg) * numOfExprs;
  
536
  return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + exprSize + 4096;
H
hzcheng 已提交
537 538
}

539
/*
 * Serialize the table id list of a query into the message buffer.
 *
 * pQueryMsg : query message header; its vgroup id and table count are set here,
 *             and its window.skey is used as the default subscription key.
 * pSql      : the SQL object; its IP list is set to the vgroup being queried.
 * pMsg      : write position in the message buffer.
 *
 * One STableIdInfo is written when the table is a normal table or when no
 * per-vgroup table list exists; otherwise one STableIdInfo is written for every
 * table of the vgroup selected by vgroupIndex.
 *
 * Returns the write position right after the last STableIdInfo.
 */
static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char *pMsg) {
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
  TSKEY dfltKey = htobe64(pQueryMsg->window.skey);

  STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
  STableIdInfo *pOut = (STableIdInfo *)pMsg;  // next table id slot to fill

  if (!UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) && pTableMetaInfo->pVgroupTables != NULL) {
    int32_t vgIndex = pTableMetaInfo->vgroupIndex;
    int32_t numOfVgroups = taosArrayGetSize(pTableMetaInfo->pVgroupTables);
    assert(vgIndex >= 0 && vgIndex < numOfVgroups);

    tscTrace("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, vgIndex, numOfVgroups);

    SVgroupTableInfo* pVgTables = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex);

    // set the vgroup info
    tscSetDnodeIpList(pSql, &pVgTables->vgInfo);
    pQueryMsg->head.vgId = htonl(pVgTables->vgInfo.vgId);

    int32_t numOfTables = taosArrayGetSize(pVgTables->itemList);
    pQueryMsg->numOfTables = htonl(numOfTables);  // set the number of tables

    // serialize each table id info
    for (int32_t i = 0; i < numOfTables; ++i, ++pOut) {
      STableIdInfo* pItem = taosArrayGet(pVgTables->itemList, i);

      pOut->tid = htonl(pItem->tid);
      pOut->uid = htobe64(pItem->uid);
      pOut->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pItem->uid, dfltKey));
    }
  } else {
    SCMVgroupInfo* pVgroupInfo = NULL;
    if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
      int32_t vgIndex = pTableMetaInfo->vgroupIndex;
      assert(vgIndex >= 0);

      pVgroupInfo = &pTableMetaInfo->vgroupList->vgroups[vgIndex];
      tscTrace("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, vgIndex, pTableMetaInfo->vgroupList->numOfVgroups);
    } else {
      pVgroupInfo = &pTableMeta->vgroupInfo;
    }

    tscSetDnodeIpList(pSql, pVgroupInfo);
    pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId);

    pOut->tid = htonl(pTableMeta->sid);
    pOut->uid = htobe64(pTableMeta->uid);
    pOut->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid, dfltKey));

    pQueryMsg->numOfTables = htonl(1);  // set the number of tables
    ++pOut;
  }

  tscTrace("%p vgId:%d, query on table:%s, tid:%d, uid:%" PRIu64, pSql, htonl(pQueryMsg->head.vgId), pTableMetaInfo->name,
      pTableMeta->sid, pTableMeta->uid);

  return (char *)pOut;
}

601
/*
 * Build the query request (TSDB_MSG_TYPE_QUERY) for the current clause of pSql.
 *
 * The message is written into pCmd->payload starting at offset tsRpcHeadSize, in this order:
 *   1. SQueryTableMsg header, including the fixed-size SColumnInfo list
 *   2. filter descriptors (and filter strings) of every queried column
 *   3. one SSqlFuncMsg per output expression, each followed by its binary parameters
 *   4. table id list, appended by doSerializeTableInfo()
 *   5. group-by column descriptors
 *   6. fill values (only when a fill type is set)
 *   7. tag column descriptors
 *   8. compressed timestamp block (only when pQueryInfo->tsBuf is set)
 *   9. tag query condition and the NUL-terminated table-name condition
 *
 * pInfo is not used by this function.
 *
 * Returns TSDB_CODE_SUCCESS when the message is built, TSDB_CODE_INVALID_SQL when a column or
 * tag index does not match the cached table meta, and -1 for every other failure.
 */
int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  int32_t size = tscEstimateQueryMsgSize(pCmd, pCmd->clauseIndex);
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for query msg", pSql);
    return -1;  // todo add test for this
  }

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  STableMeta *    pTableMeta = pTableMetaInfo->pTableMeta;

  // sanity checks on the parsed query before anything is serialized
  if (taosArrayGetSize(pQueryInfo->colList) <= 0 && !tscQueryTags(pQueryInfo)) {
    tscError("%p illegal value of numOfCols in query msg: %d", pSql, tscGetNumOfColumns(pTableMeta));
    return -1;
  }

  if (pQueryInfo->intervalTime < 0) {
    tscError("%p illegal value of aggregation time interval in query msg: %ld", pSql, pQueryInfo->intervalTime);
    return -1;
  }

  if (pQueryInfo->groupbyExpr.numOfGroupCols < 0) {
    tscError("%p illegal value of numOfGroupCols in query msg: %d", pSql, pQueryInfo->groupbyExpr.numOfGroupCols);
    return -1;
  }

  char *          pStart = pCmd->payload + tsRpcHeadSize;
  SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pStart;

  int32_t numOfTags = taosArrayGetSize(pTableMetaInfo->tagColList);

  // for a descending scan the window boundaries are swapped in the message
  if (pQueryInfo->order.order == TSDB_ORDER_ASC) {
    pQueryMsg->window.skey = htobe64(pQueryInfo->window.skey);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->window.ekey);
  } else {
    pQueryMsg->window.skey = htobe64(pQueryInfo->window.ekey);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->window.skey);
  }

  pQueryMsg->order           = htons(pQueryInfo->order.order);
  pQueryMsg->orderColId      = htons(pQueryInfo->order.orderColId);
  pQueryMsg->fillType        = htons(pQueryInfo->fillType);
  pQueryMsg->limit           = htobe64(pQueryInfo->limit.limit);
  pQueryMsg->offset          = htobe64(pQueryInfo->limit.offset);
  pQueryMsg->numOfCols       = htons(taosArrayGetSize(pQueryInfo->colList));
  pQueryMsg->intervalTime    = htobe64(pQueryInfo->intervalTime);
  pQueryMsg->slidingTime     = htobe64(pQueryInfo->slidingTime);
  pQueryMsg->slidingTimeUnit = pQueryInfo->slidingTimeUnit;
  pQueryMsg->numOfGroupCols  = htons(pQueryInfo->groupbyExpr.numOfGroupCols);
  pQueryMsg->numOfTags       = htonl(numOfTags);
  pQueryMsg->tagNameRelType  = htons(pQueryInfo->tagCond.relType);
  pQueryMsg->queryType       = htons(pQueryInfo->type);

  // numOfOutput is unsigned, so the former "numOfOutput < 0" check could never fire and was dropped
  size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo);
  pQueryMsg->numOfOutput = htons(numOfOutput);

  // set column list ids; variable-length filter data starts right after the SColumnInfo array
  size_t   numOfCols = taosArrayGetSize(pQueryInfo->colList);
  char *   pMsg = (char *)(pQueryMsg->colList) + numOfCols * sizeof(SColumnInfo);
  SSchema *pSchema = tscGetTableSchema(pTableMeta);

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumn *pCol = taosArrayGetP(pQueryInfo->colList, i);
    SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];

    if (pCol->colIndex.columnIndex >= tscGetNumOfColumns(pTableMeta) || pColSchema->type < TSDB_DATA_TYPE_BOOL ||
        pColSchema->type > TSDB_DATA_TYPE_NCHAR) {
      // the integer member is logged; passing the whole colIndex struct to "%d" is undefined behaviour
      // NOTE(review): pColSchema->name is still read here when the index is out of range - confirm that is safe
      tscError("%p sid:%d uid:%" PRIu64" id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s",
          pSql, pTableMeta->sid, pTableMeta->uid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta),
               pCol->colIndex.columnIndex, pColSchema->name);

      return TSDB_CODE_INVALID_SQL;
    }

    pQueryMsg->colList[i].colId = htons(pColSchema->colId);
    pQueryMsg->colList[i].bytes = htons(pColSchema->bytes);
    pQueryMsg->colList[i].type  = htons(pColSchema->type);
    pQueryMsg->colList[i].numOfFilters = htons(pCol->numOfFilters);

    // append the filter information after the basic column information
    for (int32_t f = 0; f < pCol->numOfFilters; ++f) {
      SColumnFilterInfo *pColFilter = &pCol->filterInfo[f];

      SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)pMsg;
      pFilterMsg->filterstr = htons(pColFilter->filterstr);

      pMsg += sizeof(SColumnFilterInfo);

      if (pColFilter->filterstr) {
        pFilterMsg->len = htobe64(pColFilter->len);
        memcpy(pMsg, (void *)pColFilter->pz, pColFilter->len + 1);
        pMsg += (pColFilter->len + 1);  // append the additional filter binary info
      } else {
        pFilterMsg->lowerBndi = htobe64(pColFilter->lowerBndi);
        pFilterMsg->upperBndi = htobe64(pColFilter->upperBndi);
      }

      pFilterMsg->lowerRelOptr = htons(pColFilter->lowerRelOptr);
      pFilterMsg->upperRelOptr = htons(pColFilter->upperRelOptr);

      if (pColFilter->lowerRelOptr == TSDB_RELATION_INVALID && pColFilter->upperRelOptr == TSDB_RELATION_INVALID) {
        tscError("invalid filter info");
        return -1;
      }
    }
  }

  // serialize the output expressions; binary parameters are appended after each SSqlFuncMsg
  SSqlFuncMsg *pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
  for (int32_t i = 0; i < numOfOutput; ++i) {
    SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);

    if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId)) {
      /* column id is not valid according to the cached table meta, the table meta is expired */
      tscError("%p table schema is not matched with parsed sql", pSql);
      return -1;
    }

    pSqlFuncExpr->colInfo.colId    = htons(pExpr->colInfo.colId);
    pSqlFuncExpr->colInfo.colIndex = htons(pExpr->colInfo.colIndex);
    pSqlFuncExpr->colInfo.flag     = htons(pExpr->colInfo.flag);

    pSqlFuncExpr->functionId  = htons(pExpr->functionId);
    pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams);
    pMsg += sizeof(SSqlFuncMsg);

    for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
      pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType);
      pSqlFuncExpr->arg[j].argBytes = htons(pExpr->param[j].nLen);

      if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) {
        memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen);
        pMsg += pExpr->param[j].nLen;
      } else {
        pSqlFuncExpr->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64Key);
      }
    }

    pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
  }

  // serialize the table info (sid, uid, tags)
  pMsg = doSerializeTableInfo(pQueryMsg, pSql, pMsg);

  SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr;
  if (pGroupbyExpr->numOfGroupCols > 0) {
    pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex);
    pQueryMsg->orderType = htons(pGroupbyExpr->orderType);

    for (int32_t j = 0; j < pGroupbyExpr->numOfGroupCols; ++j) {
      SColIndex* pCol = taosArrayGet(pGroupbyExpr->columnInfo, j);

      *((int16_t *)pMsg) = pCol->colId;
      pMsg += sizeof(pCol->colId);

      // plain assignment: the previous "+=" only produced the right value on a zero-filled buffer
      *((int16_t *)pMsg) = pCol->colIndex;
      pMsg += sizeof(pCol->colIndex);

      *((int16_t *)pMsg) = pCol->flag;
      pMsg += sizeof(pCol->flag);

      memcpy(pMsg, pCol->name, tListLen(pCol->name));
      pMsg += tListLen(pCol->name);
    }
  }

  if (pQueryInfo->fillType != TSDB_FILL_NONE) {
    for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
      *((int64_t *)pMsg) = htobe64(pQueryInfo->fillVal[i]);
      pMsg += sizeof(pQueryInfo->fillVal[0]);
    }
  }

  if (numOfTags != 0) {
    int32_t numOfColumns = tscGetNumOfColumns(pTableMeta);
    int32_t numOfTagColumns = tscGetNumOfTags(pTableMeta);
    int32_t total = numOfTagColumns + numOfColumns;

    pSchema = tscGetTableTagSchema(pTableMeta);

    for (int32_t i = 0; i < numOfTags; ++i) {
      SColumn *pCol = taosArrayGetP(pTableMetaInfo->tagColList, i);
      SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];

      if ((pCol->colIndex.columnIndex >= numOfTagColumns || pCol->colIndex.columnIndex < -1) ||
          (pColSchema->type < TSDB_DATA_TYPE_BOOL || pColSchema->type > TSDB_DATA_TYPE_NCHAR)) {
        // log the integer member, not the colIndex struct (see the column list check above)
        tscError("%p sid:%d uid:%" PRIu64 " id:%s, tag index out of range, totalCols:%d, numOfTags:%d, index:%d, column name:%s",
                 pSql, pTableMeta->sid, pTableMeta->uid, pTableMetaInfo->name, total, numOfTagColumns,
                 pCol->colIndex.columnIndex, pColSchema->name);

        return TSDB_CODE_INVALID_SQL;
      }

      SColumnInfo* pTagCol = (SColumnInfo*) pMsg;

      pTagCol->colId = htons(pColSchema->colId);
      pTagCol->bytes = htons(pColSchema->bytes);
      pTagCol->type  = htons(pColSchema->type);
      pTagCol->numOfFilters = 0;

      pMsg += sizeof(SColumnInfo);
    }
  }

  // compressed ts block
  pQueryMsg->tsOffset = htonl(pMsg - pStart);
  int32_t tsLen = 0;
  int32_t numOfBlocks = 0;

  if (pQueryInfo->tsBuf != NULL) {
    STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pQueryInfo->tsBuf, pTableMetaInfo->vgroupIndex);
    assert(QUERY_IS_JOIN_QUERY(pQueryInfo->type) && pBlockInfo != NULL);  // this query should not be sent

    // todo refactor
    // a failed seek/read would ship an incomplete ts block to the server, so fail the build instead;
    // fread() with a zero item size always returns 0, hence the compLen guard
    if (fseek(pQueryInfo->tsBuf->f, pBlockInfo->offset, SEEK_SET) != 0 ||
        (pBlockInfo->compLen > 0 && fread(pMsg, pBlockInfo->compLen, 1, pQueryInfo->tsBuf->f) != 1)) {
      tscError("%p failed to read compressed ts block for query msg", pSql);
      return -1;
    }

    pMsg += pBlockInfo->compLen;
    tsLen = pBlockInfo->compLen;
    numOfBlocks = pBlockInfo->numOfBlocks;
  }

  pQueryMsg->tsLen = htonl(tsLen);
  pQueryMsg->tsNumOfBlocks = htonl(numOfBlocks);
  if (pQueryInfo->tsBuf != NULL) {
    pQueryMsg->tsOrder = htonl(pQueryInfo->tsBuf->tsOrder);
  }

  // serialize tag column query condition
  if (pQueryInfo->tagCond.pCond != NULL && taosArrayGetSize(pQueryInfo->tagCond.pCond) > 0) {
    STagCond* pTagCond = &pQueryInfo->tagCond;

    SCond *pCond = tsGetSTableQueryCond(pTagCond, pTableMeta->uid);
    if (pCond != NULL && pCond->cond != NULL) {
      pQueryMsg->tagCondLen = htons(pCond->len);
      memcpy(pMsg, pCond->cond, pCond->len);

      pMsg += pCond->len;
    }
  }

  // the table-name condition is always present as a C string; an empty string means "no condition"
  if (pQueryInfo->tagCond.tbnameCond.cond == NULL) {
    *pMsg = 0;
    pMsg++;
  } else {
    strcpy(pMsg, pQueryInfo->tagCond.tbnameCond.cond);
    pMsg += strlen(pQueryInfo->tagCond.tbnameCond.cond) + 1;
  }

  int32_t msgLen = pMsg - pStart;

  tscTrace("%p msg built success,len:%d bytes", pSql, msgLen);
  pCmd->payloadLen = msgLen;
  pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY;

  pQueryMsg->head.contLen = htonl(msgLen);
  assert(msgLen + minMsgSize() <= size);

  return TSDB_CODE_SUCCESS;
}

869 870
/*
 * Finish the create-database request (TSDB_MSG_TYPE_CM_CREATE_DB).
 *
 * The SCMCreateDbMsg is written directly at pCmd->payload; this function does not allocate
 * the payload and only fills in the database name taken from the first table meta info of
 * the current clause. pInfo is not used.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCMCreateDbMsg);
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DB;

  SCMCreateDbMsg *pCreateDbMsg = (SCMCreateDbMsg*)pCmd->payload;

  assert(pCmd->numOfClause == 1);
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);

  // strncpy() does not terminate the destination when the source fills it completely,
  // so the last byte is forced to NUL to keep db a valid C string
  strncpy(pCreateDbMsg->db, pTableMetaInfo->name, tListLen(pCreateDbMsg->db));
  pCreateDbMsg->db[tListLen(pCreateDbMsg->db) - 1] = 0;

  return TSDB_CODE_SUCCESS;
}

883 884
/*
 * Build the create-dnode request (TSDB_MSG_TYPE_CM_CREATE_DNODE).
 *
 * The end point string is copied from the first token of pInfo->pDCLInfo into a freshly
 * allocated SCMCreateDnodeMsg payload.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload cannot be allocated.
 */
int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;

  cmd->payloadLen = sizeof(SCMCreateDnodeMsg);
  int32_t code = tscAllocPayload(cmd, cmd->payloadLen);
  if (code != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SCMCreateDnodeMsg *msg = (SCMCreateDnodeMsg *)cmd->payload;

  // exactly n bytes of the token are copied; no terminator is written by this call
  strncpy(msg->ep, pInfo->pDCLInfo->a[0].z, pInfo->pDCLInfo->a[0].n);

  cmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DNODE;
  return TSDB_CODE_SUCCESS;
}

899 900
/*
 * Build the create-account request (TSDB_MSG_TYPE_CM_CREATE_ACCT).
 *
 * User name, password and the account limits are taken from pInfo->pDCLInfo; the numeric
 * limits are converted to network byte order. The access state is derived from the "stat"
 * token: empty -> -1, "r" -> read, "w" -> write, "all" -> all, "no" -> 0. Any other token
 * leaves the field as it is in the allocated payload.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload cannot be allocated.
 */
int32_t tscBuildAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;

  cmd->payloadLen = sizeof(SCMCreateAcctMsg);
  int32_t code = tscAllocPayload(cmd, cmd->payloadLen);
  if (code != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SCMCreateAcctMsg *msg = (SCMCreateAcctMsg *)cmd->payload;

  // credentials
  SSQLToken *userToken = &pInfo->pDCLInfo->user.user;
  SSQLToken *passToken = &pInfo->pDCLInfo->user.passwd;
  strncpy(msg->user, userToken->z, userToken->n);
  strncpy(msg->pass, passToken->z, passToken->n);

  // account limits
  SCreateAcctSQL *opt = &pInfo->pDCLInfo->acctOpt;
  msg->cfg.maxUsers           = htonl(opt->maxUsers);
  msg->cfg.maxDbs             = htonl(opt->maxDbs);
  msg->cfg.maxTimeSeries      = htonl(opt->maxTimeSeries);
  msg->cfg.maxStreams         = htonl(opt->maxStreams);
  msg->cfg.maxPointsPerSecond = htonl(opt->maxPointsPerSecond);
  msg->cfg.maxStorage         = htobe64(opt->maxStorage);
  msg->cfg.maxQueryTime       = htobe64(opt->maxQueryTime);
  msg->cfg.maxConnections     = htonl(opt->maxConnections);

  // access state
  if (opt->stat.n == 0) {
    msg->cfg.accessState = -1;
  } else if (opt->stat.n == 1 && opt->stat.z[0] == 'r') {
    msg->cfg.accessState = TSDB_VN_READ_ACCCESS;
  } else if (opt->stat.n == 1 && opt->stat.z[0] == 'w') {
    msg->cfg.accessState = TSDB_VN_WRITE_ACCCESS;
  } else if (opt->stat.n == 3 && strncmp(opt->stat.z, "all", 3) == 0) {
    msg->cfg.accessState = TSDB_VN_ALL_ACCCESS;
  } else if (opt->stat.n == 2 && strncmp(opt->stat.z, "no", 2) == 0) {
    msg->cfg.accessState = 0;
  }

  cmd->msgType = TSDB_MSG_TYPE_CM_CREATE_ACCT;
  return TSDB_CODE_SUCCESS;
}

944 945
/*
 * Build a create-user or alter-user request.
 *
 * The user name and the operation type come from pInfo->pDCLInfo->user. For a privilege
 * change the new privilege is read from pCmd->count; for every other type the password
 * token is copied. The message type is TSDB_MSG_TYPE_CM_ALTER_USER for password and
 * privilege changes and TSDB_MSG_TYPE_CM_CREATE_USER otherwise.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload cannot be allocated.
 */
int32_t tscBuildUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;

  cmd->payloadLen = sizeof(SCMCreateUserMsg);
  int32_t code = tscAllocPayload(cmd, cmd->payloadLen);
  if (code != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SCMCreateUserMsg *msg = (SCMCreateUserMsg*)cmd->payload;
  SUserInfo *userInfo = &pInfo->pDCLInfo->user;

  strncpy(msg->user, userInfo->user.z, userInfo->user.n);
  msg->flag = userInfo->type;

  if (userInfo->type == TSDB_ALTER_USER_PRIVILEGES) {
    msg->privilege = (char)cmd->count;
  } else {
    // both "alter password" and "create user" carry the password token
    strncpy(msg->pass, userInfo->passwd.z, userInfo->passwd.n);
  }

  switch (userInfo->type) {
    case TSDB_ALTER_USER_PASSWD:
    case TSDB_ALTER_USER_PRIVILEGES:
      cmd->msgType = TSDB_MSG_TYPE_CM_ALTER_USER;
      break;
    default:
      cmd->msgType = TSDB_MSG_TYPE_CM_CREATE_USER;
      break;
  }

  return TSDB_CODE_SUCCESS;
}

976 977
/*
 * Finish the configure-dnode request: only the message type and the payload length are set
 * here, the payload content itself is not touched. pInfo is not used.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t tscBuildCfgDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;

  cmd->msgType    = TSDB_MSG_TYPE_CM_CONFIG_DNODE;
  cmd->payloadLen = sizeof(SCMCfgDnodeMsg);

  return TSDB_CODE_SUCCESS;
}
S
slguan 已提交
982

983 984
/*
 * Build the drop-database request (TSDB_MSG_TYPE_CM_DROP_DB).
 *
 * The database name is taken from the first table meta info of the current clause and the
 * "if exists" flag from pInfo->pDCLInfo->existsCheck.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload cannot be allocated.
 */
int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCMDropDbMsg);

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SCMDropDbMsg *pDropDbMsg = (SCMDropDbMsg*)pCmd->payload;

  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);

  // strncpy() does not terminate the destination when the source fills it completely,
  // so the last byte is forced to NUL to keep db a valid C string
  strncpy(pDropDbMsg->db, pTableMetaInfo->name, tListLen(pDropDbMsg->db));
  pDropDbMsg->db[tListLen(pDropDbMsg->db) - 1] = 0;

  pDropDbMsg->ignoreNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;

  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DB;
  return TSDB_CODE_SUCCESS;
}

1002 1003
/*
 * Build the drop-table request (TSDB_MSG_TYPE_CM_DROP_TABLE).
 *
 * The table id is copied from the first table meta info of the current clause; the
 * "if exists" flag comes from pInfo->pDCLInfo->existsCheck.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload cannot be allocated.
 */
int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;

  cmd->payloadLen = sizeof(SCMDropTableMsg);
  int32_t code = tscAllocPayload(cmd, cmd->payloadLen);
  if (code != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  STableMetaInfo *metaInfo = tscGetTableMetaInfoFromCmd(cmd, cmd->clauseIndex, 0);
  SCMDropTableMsg *msg = (SCMDropTableMsg*)cmd->payload;

  strcpy(msg->tableId, metaInfo->name);
  if (pInfo->pDCLInfo->existsCheck) {
    msg->igNotExists = 1;
  } else {
    msg->igNotExists = 0;
  }

  cmd->msgType = TSDB_MSG_TYPE_CM_DROP_TABLE;
  return TSDB_CODE_SUCCESS;
}

1020
/*
 * Build the drop-dnode request (TSDB_MSG_TYPE_CM_DROP_DNODE).
 *
 * The end point is copied from the name field of the first table meta info of the current
 * clause. pInfo is not used.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload cannot be allocated.
 */
int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;

  cmd->payloadLen = sizeof(SCMDropDnodeMsg);
  int32_t code = tscAllocPayload(cmd, cmd->payloadLen);
  if (code != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  STableMetaInfo *metaInfo = tscGetTableMetaInfoFromCmd(cmd, cmd->clauseIndex, 0);
  SCMDropDnodeMsg *msg = (SCMDropDnodeMsg *)cmd->payload;
  strcpy(msg->ep, metaInfo->name);

  cmd->msgType = TSDB_MSG_TYPE_CM_DROP_DNODE;
  return TSDB_CODE_SUCCESS;
}

S
[TD-16]  
slguan 已提交
1036
/*
 * Build the drop-user request (TSDB_MSG_TYPE_CM_DROP_USER).
 *
 * The user name is copied from the name field of the first table meta info of the current
 * clause. pInfo is not used.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload cannot be allocated.
 */
int32_t tscBuildDropUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;

  cmd->payloadLen = sizeof(SCMDropUserMsg);
  cmd->msgType = TSDB_MSG_TYPE_CM_DROP_USER;

  int32_t code = tscAllocPayload(cmd, cmd->payloadLen);
  if (code != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  STableMetaInfo *metaInfo = tscGetTableMetaInfoFromCmd(cmd, cmd->clauseIndex, 0);
  SCMDropUserMsg *msg = (SCMDropUserMsg*)cmd->payload;
  strcpy(msg->user, metaInfo->name);

  return TSDB_CODE_SUCCESS;
}

S
[TD-16]  
slguan 已提交
1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069
/*
 * Build the drop-account request (TSDB_MSG_TYPE_CM_DROP_ACCT).
 *
 * The request reuses the SCMDropUserMsg layout; the account name is copied from the name
 * field of the first table meta info of the current clause. pInfo is not used.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload cannot be allocated.
 */
int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;

  cmd->payloadLen = sizeof(SCMDropUserMsg);
  cmd->msgType = TSDB_MSG_TYPE_CM_DROP_ACCT;

  int32_t code = tscAllocPayload(cmd, cmd->payloadLen);
  if (code != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  STableMetaInfo *metaInfo = tscGetTableMetaInfoFromCmd(cmd, cmd->clauseIndex, 0);
  SCMDropUserMsg *msg = (SCMDropUserMsg*)cmd->payload;
  strcpy(msg->user, metaInfo->name);

  return TSDB_CODE_SUCCESS;
}

1070 1071
/*
 * Build the use-database request (TSDB_MSG_TYPE_CM_USE_DB).
 *
 * The database name is copied from the name field of the first table meta info of the
 * current clause. pInfo is not used.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload cannot be allocated.
 */
int32_t tscBuildUseDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;

  cmd->payloadLen = sizeof(SCMUseDbMsg);
  int32_t code = tscAllocPayload(cmd, cmd->payloadLen);
  if (code != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  STableMetaInfo *metaInfo = tscGetTableMetaInfoFromCmd(cmd, cmd->clauseIndex, 0);
  SCMUseDbMsg *msg = (SCMUseDbMsg*)cmd->payload;
  strcpy(msg->db, metaInfo->name);

  cmd->msgType = TSDB_MSG_TYPE_CM_USE_DB;
  return TSDB_CODE_SUCCESS;
}

1087
/*
 * Build the show request (TSDB_MSG_TYPE_CM_SHOW).
 *
 * The database is the name stored in the first table meta info of the current clause, or
 * the connection's current database when that name is empty. Depending on the show type the
 * trailing payload carries either the wildcard pattern or the prefix token (used for
 * TSDB_MGMT_TABLE_VNODES); its length is stored in the message in network byte order.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload cannot be allocated.
 */
int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  STscObj *pObj = pSql->pTscObj;
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->msgType = TSDB_MSG_TYPE_CM_SHOW;

  // NOTE(review): only 100 bytes are reserved for the trailing pattern/prefix and the token
  // length is not checked against that limit here - confirm the parser enforces it
  pCmd->payloadLen = sizeof(SCMShowMsg) + 100;

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SCMShowMsg *pShowMsg = (SCMShowMsg*)pCmd->payload;

  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  size_t nameLen = strlen(pTableMetaInfo->name);
  if (nameLen > 0) {
    strcpy(pShowMsg->db, pTableMetaInfo->name);  // prefix is set here
  } else {
    strcpy(pShowMsg->db, pObj->db);
  }

  SShowInfo *pShowInfo = &pInfo->pDCLInfo->showOpt;
  pShowMsg->type = pShowInfo->showType;

  if (pShowInfo->showType != TSDB_MGMT_TABLE_VNODES) {
    SSQLToken *pPattern = &pShowInfo->pattern;
    if (pPattern->type > 0) {  // only show tables support wildcard query
      strncpy(pShowMsg->payload, pPattern->z, pPattern->n);
      pShowMsg->payloadLen = htons(pPattern->n);
    }
  } else {
    SSQLToken *pIpAddr = &pShowInfo->prefix;
    assert(pIpAddr->n > 0 && pIpAddr->type > 0);

    strncpy(pShowMsg->payload, pIpAddr->z, pIpAddr->n);
    pShowMsg->payloadLen = htons(pIpAddr->n);
  }

  // pShowMsg->payloadLen is in network byte order; convert it back before using it as a size,
  // otherwise little-endian hosts report a byte-swapped length far beyond the allocated buffer
  pCmd->payloadLen = sizeof(SCMShowMsg) + ntohs(pShowMsg->payloadLen);
  return TSDB_CODE_SUCCESS;
}

1129
/*
 * Finish a kill request: set the payload length and map the SQL command to the matching
 * message type (query, connection or stream). For any other command the message type is
 * left unchanged. pInfo is not used.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;
  cmd->payloadLen = sizeof(SCMKillQueryMsg);

  if (cmd->command == TSDB_SQL_KILL_QUERY) {
    cmd->msgType = TSDB_MSG_TYPE_CM_KILL_QUERY;
  } else if (cmd->command == TSDB_SQL_KILL_CONNECTION) {
    cmd->msgType = TSDB_MSG_TYPE_CM_KILL_CONN;
  } else if (cmd->command == TSDB_SQL_KILL_STREAM) {
    cmd->msgType = TSDB_MSG_TYPE_CM_KILL_STREAM;
  }

  return TSDB_CODE_SUCCESS;
}

1147
/*
 * Estimate the payload size needed for a create-table request.
 *
 * The estimate is the minimum message size plus SCMCreateTableMsg, plus either one STagData
 * (table created from a super table) or one SSchema per column and tag, plus the select
 * statement text when one is attached, plus TSDB_EXTRA_PAYLOAD_SIZE.
 */
int tscEstimateCreateTableMsgLength(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &(pSql->cmd);
  SCreateTableSQL *createInfo = pInfo->pCreateTableInfo;

  int32_t total = minMsgSize() + sizeof(SCMCreateTableMsg);

  if (createInfo->type != TSQL_CREATE_TABLE_FROM_STABLE) {
    total += sizeof(SSchema) * (cmd->numOfCols + cmd->count);
  } else {
    total += sizeof(STagData);
  }

  if (createInfo->pSelect != NULL) {
    // select text plus one extra byte
    total += (createInfo->pSelect->selectToken.n + 1);
  }

  return total + TSDB_EXTRA_PAYLOAD_SIZE;
}

1166
/*
 * Build the create-table request (TSDB_MSG_TYPE_CM_CREATE_TABLE).
 *
 * After the fixed SCMCreateTableMsg header the message carries either
 *   - the tag data (length, name, values) when the table is created from a super table, or
 *   - one SSchema per column and tag, followed by the select statement for a stream.
 * The field info of the query is cleared once it has been serialized.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload cannot be allocated.
 */
int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *       cmd = &pSql->cmd;
  SQueryInfo *    queryInfo = tscGetQueryInfoDetail(cmd, 0);
  STableMetaInfo *metaInfo = tscGetMetaInfo(queryInfo, 0);

  // Reallocate the payload size
  int allocSize = tscEstimateCreateTableMsgLength(pSql, pInfo);
  if (TSDB_CODE_SUCCESS != tscAllocPayload(cmd, allocSize)) {
    tscError("%p failed to malloc for create table msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SCMCreateTableMsg *msg = (SCMCreateTableMsg *)cmd->payload;
  strcpy(msg->tableId, metaInfo->name);

  // use dbinfo from table id without modifying current db info
  tscGetDBInfoFromMeterId(metaInfo->name, msg->db);

  SCreateTableSQL *createInfo = pInfo->pCreateTableInfo;

  msg->igExists     = createInfo->existCheck ? 1 : 0;
  msg->numOfColumns = htons(cmd->numOfCols);
  msg->numOfTags    = htons(cmd->count);
  msg->sqlLen       = 0;

  char * cursor = (char *)msg->schema;
  int8_t type = pInfo->pCreateTableInfo->type;

  if (type == TSQL_CREATE_TABLE_FROM_STABLE) {  // create by using super table, tags value
    STagData* tagData = &pInfo->pCreateTableInfo->usingInfo.tagdata;

    *(int32_t*)cursor = htonl(tagData->dataLen);
    cursor += sizeof(int32_t);

    memcpy(cursor, tagData->name, sizeof(tagData->name));
    cursor += sizeof(tagData->name);

    memcpy(cursor, tagData->data, tagData->dataLen);
    cursor += tagData->dataLen;
  } else {  // create (super) table
    SSchema *schema = (SSchema *)msg->schema;

    for (int i = 0; i < cmd->numOfCols + cmd->count; ++i, ++schema) {
      TAOS_FIELD *field = tscFieldInfoGetField(&queryInfo->fieldsInfo, i);

      schema->type = field->type;
      strcpy(schema->name, field->name);
      schema->bytes = htons(field->bytes);
    }

    // anything that follows is written right behind the last schema entry
    cursor = (char *)schema;

    if (type == TSQL_CREATE_STREAM) {  // check if it is a stream sql
      SQuerySQL *select = pInfo->pCreateTableInfo->pSelect;

      strncpy(cursor, select->selectToken.z, select->selectToken.n + 1);
      msg->sqlLen = htons(select->selectToken.n + 1);
      cursor += select->selectToken.n + 1;
    }
  }

  tscFieldInfoClear(&queryInfo->fieldsInfo);

  int msgLen = cursor - (char*)msg;
  msg->contLen = htonl(msgLen);
  cmd->payloadLen = msgLen;
  cmd->msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE;

  assert(msgLen + minMsgSize() <= allocSize);
  return TSDB_CODE_SUCCESS;
}

/*
 * Estimate the payload size needed for an alter-table request: the minimum message size,
 * the SCMAlterTableMsg header, one SSchema per field of the first query info and
 * TSDB_EXTRA_PAYLOAD_SIZE.
 */
int tscEstimateAlterTableMsgLength(SSqlCmd *pCmd) {
  SQueryInfo *queryInfo = tscGetQueryInfoDetail(pCmd, 0);

  return minMsgSize() + sizeof(SCMAlterTableMsg) + sizeof(SSchema) * tscNumOfFields(queryInfo) +
         TSDB_EXTRA_PAYLOAD_SIZE;
}

1247
/*
 * Build the alter-table request (TSDB_MSG_TYPE_CM_ALTER_TABLE).
 *
 * The message holds the database and table id, the alter type, one SSchema per field of the
 * first query info and, behind the schema list, the raw tag value taken from
 * pInfo->pAlterInfo->tagData.
 *
 * Returns TSDB_CODE_SUCCESS, or -1 when the payload cannot be allocated.
 */
int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *       cmd = &pSql->cmd;
  SQueryInfo *    queryInfo = tscGetQueryInfoDetail(cmd, 0);
  STableMetaInfo *metaInfo = tscGetMetaInfo(queryInfo, 0);
  SAlterTableSQL *alterInfo = pInfo->pAlterInfo;

  int allocSize = tscEstimateAlterTableMsgLength(cmd);
  if (TSDB_CODE_SUCCESS != tscAllocPayload(cmd, allocSize)) {
    tscError("%p failed to malloc for alter table msg", pSql);
    return -1;
  }

  SCMAlterTableMsg *msg = (SCMAlterTableMsg *)cmd->payload;
  tscGetDBInfoFromMeterId(metaInfo->name, msg->db);

  strcpy(msg->tableId, metaInfo->name);
  msg->type = htons(alterInfo->type);
  msg->numOfCols = htons(tscNumOfFields(queryInfo));

  // schema entries
  SSchema *schema = msg->schema;
  for (int i = 0; i < tscNumOfFields(queryInfo); ++i, ++schema) {
    TAOS_FIELD *field = tscFieldInfoGetField(&queryInfo->fieldsInfo, i);

    schema->type = field->type;
    strcpy(schema->name, field->name);
    schema->bytes = htons(field->bytes);
  }

  // tag value follows the last schema entry
  char *cursor = (char *)schema;
  msg->tagValLen = htonl(alterInfo->tagData.dataLen);
  memcpy(cursor, alterInfo->tagData.data, alterInfo->tagData.dataLen);
  cursor += alterInfo->tagData.dataLen;

  int msgLen = cursor - (char*)msg;

  cmd->payloadLen = msgLen;
  cmd->msgType = TSDB_MSG_TYPE_CM_ALTER_TABLE;

  assert(msgLen + minMsgSize() <= allocSize);

  return TSDB_CODE_SUCCESS;
}

1295 1296 1297 1298 1299 1300 1301 1302 1303 1304
/*
 * Finish the update-tag-value request (TSDB_MSG_TYPE_UPDATE_TAG_VAL).
 *
 * The SUpdateTableTagValMsg is read from pCmd->payload at offset tsRpcHeadSize; this function
 * only sets the message type and derives the payload length from head.contLen of that
 * message. pInfo is not used.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
  SSqlCmd* cmd = &pSql->cmd;
  cmd->msgType = TSDB_MSG_TYPE_UPDATE_TAG_VAL;

  SUpdateTableTagValMsg* msg = (SUpdateTableTagValMsg*) (cmd->payload + tsRpcHeadSize);

  // contLen is byte-swapped with htonl() to obtain the payload length
  int32_t contLen = htonl(msg->head.contLen);
  cmd->payloadLen = contLen;

  return TSDB_CODE_SUCCESS;
}

1305
/*
 * Finish the alter-database request (TSDB_MSG_TYPE_CM_ALTER_DB).
 *
 * The SCMAlterDbMsg is written directly at pCmd->payload; this function does not allocate
 * the payload and only fills in the database name from the first table meta info of the
 * current clause. pInfo is not used.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;

  cmd->payloadLen = sizeof(SCMAlterDbMsg);
  cmd->msgType = TSDB_MSG_TYPE_CM_ALTER_DB;

  STableMetaInfo *metaInfo = tscGetTableMetaInfoFromCmd(cmd, cmd->clauseIndex, 0);
  SCMAlterDbMsg *msg = (SCMAlterDbMsg*)cmd->payload;
  strcpy(msg->db, metaInfo->name);

  return TSDB_CODE_SUCCESS;
}

1317
/*
 * Build a retrieve request (TSDB_MSG_TYPE_CM_RETRIEVE).
 *
 * The payload is a single SRetrieveTableMsg carrying the query handle stored
 * in pSql->res (converted with htobe64) and the query type of the first query
 * info (converted with htons) in its `free` field.
 *
 * pInfo is not used.
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload
 * buffer cannot be allocated.
 */
int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SRetrieveTableMsg);
  pCmd->msgType = TSDB_MSG_TYPE_CM_RETRIEVE;

  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg*)pCmd->payload;
  SQueryInfo        *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

  pRetrieveMsg->free = htons(pQueryInfo->type);
  pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);

  return TSDB_CODE_SUCCESS;
}

1335
/*
 * Point every output column of pRes->tsrow at its slice of pRes->data.
 *
 * For output field i the slice starts at
 *   pRes->data + tscFieldInfoGetOffset(pQueryInfo, i) * pRes->numOfRows.
 *
 * Returns 0 on success. If tscCreateResPointerInfo fails, pRes->code is
 * returned and no pointer is set.
 */
static int tscSetResultPointer(SQueryInfo *pQueryInfo, SSqlRes *pRes) {
  if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
    return pRes->code;
  }

  for (int col = 0; col < pQueryInfo->fieldsInfo.numOfOutput; ++col) {
    int16_t colOffset = tscFieldInfoGetOffset(pQueryInfo, col);
    pRes->tsrow[col] = ((char*) pRes->data + colOffset * pRes->numOfRows);
  }

  return 0;
}

/*
 * Publish a locally generated result set of numOfRes rows.
 *
 * pRes->rspType tracks the state:
 *   - 0: first call. numOfRows is set to numOfRes, the row cursor is reset,
 *        rspType becomes 1 and the per-column result pointers are set up.
 *   - otherwise: the result was already delivered; the result object is reset
 *        through tscResetForNextRetrieve (no more result).
 *
 * If pSql->fp is set, it is invoked directly on success; on failure the
 * result is queued through tscQueueAsyncRes.
 *
 * Returns the result code stored in pSql->res.code.
 */
static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  pRes->code = TSDB_CODE_SUCCESS;

  if (pRes->rspType == 0) {
    pRes->numOfRows = numOfRes;
    pRes->row = 0;
    pRes->rspType = 1;

    // the return value is not checked here; on failure tscSetResultPointer
    // returns pRes->code, which is the value read below
    tscSetResultPointer(pQueryInfo, pRes);
  } else {
    tscResetForNextRetrieve(pRes);
  }

  // keep the full width of the code (was uint8_t, which would truncate any
  // code above 255); read it before the callback runs, as in the original
  int32_t code = pSql->res.code;
  if (pSql->fp) {
    if (code == TSDB_CODE_SUCCESS) {
      (*pSql->fp)(pSql->param, pSql, pSql->res.numOfRows);
    } else {
      tscQueueAsyncRes(pSql);
    }
  }

  return code;
}

/*
 * Deliver the result of a describe-table command.
 *
 * The number of result rows is the number of columns plus the number of tags
 * reported by tscGetTableInfo for the first table of the current clause.
 * Returns whatever tscLocalResultCommonBuilder returns.
 */
int tscProcessDescribeTableRsp(SSqlObj *pSql) {
  SSqlCmd        *pCmd = &pSql->cmd;
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);

  STableComInfo tableInfo = tscGetTableInfo(pMetaInfo->pTableMeta);

  int32_t numOfRows = tableInfo.numOfColumns + tableInfo.numOfTags;
  return tscLocalResultCommonBuilder(pSql, numOfRows);
}

H
Haojun Liao 已提交
1394 1395 1396
/*
 * Deliver a locally produced single-row result: the result is marked as
 * completed and exactly one row is published.
 * Returns whatever tscLocalResultCommonBuilder returns.
 */
int tscProcessLocalRetrieveRsp(SSqlObj *pSql) {
  pSql->res.completed = true;

  return tscLocalResultCommonBuilder(pSql, 1);
}

H
hjxilinx 已提交
1400
/*
 * Run the local merge step and hand its result to the user callback.
 *
 * pRes->code receives the return value of tscDoLocalMerge. When the merge
 * succeeded and produced rows, the per-column result pointers are set up.
 * The result is flagged as completed when the merge produced no rows.
 *
 * On success pSql->fp is called directly; otherwise the result is queued
 * through tscQueueAsyncRes. Returns the result code captured before either
 * of them runs.
 */
int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  pRes->code = tscDoLocalMerge(pSql);
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  if (pRes->numOfRows > 0 && pRes->code == TSDB_CODE_SUCCESS) {
    tscSetResultPointer(pQueryInfo, pRes);
  }

  pRes->completed = (pRes->numOfRows == 0);
  pRes->row = 0;

  // captured before the callback / queueing, as in the original
  const int32_t code = pRes->code;
  if (code != TSDB_CODE_SUCCESS) {
    tscQueueAsyncRes(pSql);
    return code;
  }

  (*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
  return code;
}

S
slguan 已提交
1424
/*
 * Deliver an empty (zero-row) local result.
 * Returns whatever tscLocalResultCommonBuilder returns.
 */
int tscProcessEmptyResultRsp(SSqlObj *pSql) {
  const int32_t numOfRes = 0;
  return tscLocalResultCommonBuilder(pSql, numOfRes);
}
H
hzcheng 已提交
1425

1426
/*
 * Build the connect request (TSDB_MSG_TYPE_CM_CONNECT).
 *
 * The payload is a single SCMConnectMsg holding:
 *   - db:            pObj->db with everything up to and including the first
 *                    TS_PATH_DELIMITER removed (the whole string when there
 *                    is no delimiter);
 *   - clientVersion: the global `version` string;
 *   - msgVersion:    an empty string.
 *
 * pInfo is not used.
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload
 * buffer cannot be allocated.
 */
int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;

  pCmd->payloadLen = sizeof(SCMConnectMsg);
  pCmd->msgType = TSDB_MSG_TYPE_CM_CONNECT;

  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SCMConnectMsg *pConnect = (SCMConnectMsg*)pCmd->payload;

  // drop the prefix in front of the first path delimiter, if there is one
  char *dbName = pObj->db;
  char *delimiter = strstr(pObj->db, TS_PATH_DELIMITER);
  if (delimiter != NULL) {
    dbName = delimiter + 1;
  }

  strcpy(pConnect->db, dbName);
  strcpy(pConnect->clientVersion, version);
  strcpy(pConnect->msgVersion, "");

  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
1449
/*
 * Build the table-meta request (TSDB_MSG_TYPE_CM_TABLE_META).
 *
 * On entry pCmd->payload may already hold pCmd->payloadLen bytes of binary
 * tag data. Because the request header is written over the start of the same
 * buffer, that data is first saved in a temporary copy and, when
 * pCmd->autoCreated is set, copied back behind the header into
 * pInfoMsg->tags.
 *
 * Resulting payload: SCMTableInfoMsg { tableId, createFlag } followed by the
 * tag data (only when autoCreated and the incoming length is > 0).
 *
 * pInfo is not used.
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the
 * temporary copy cannot be allocated.
 */
int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  char    *tmpData = NULL;
  uint32_t len = pCmd->payloadLen;
  if (len > 0) {
    tmpData = calloc(1, len);
    if (NULL == tmpData) {
      return TSDB_CODE_CLI_OUT_OF_MEMORY;
    }

    // STagData is in binary format, strncpy is not available
    memcpy(tmpData, pCmd->payload, len);
  }

  SQueryInfo     *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  SCMTableInfoMsg *pInfoMsg = (SCMTableInfoMsg *)pCmd->payload;
  strcpy(pInfoMsg->tableId, pTableMetaInfo->name);
  pInfoMsg->createFlag = htons(pCmd->autoCreated ? 1 : 0);

  char *pMsg = (char*)pInfoMsg + sizeof(SCMTableInfoMsg);

  if (pCmd->autoCreated && len > 0) {
    memcpy(pInfoMsg->tags, tmpData, len);
    pMsg += len;
  }

  pCmd->payloadLen = pMsg - (char*)pInfoMsg;
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLE_META;

  tfree(tmpData);

  // The original asserted on a local msgLen that was never updated from 0,
  // so the check could not fail. Check the length that was actually built.
  assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize);
  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
1491
/**
1492
 *  multi table meta req pkg format:
1493
 *  | SMgmtHead | SCMMultiTableInfoMsg | tableId0 | tableId1 | tableId2 | ......
S
slguan 已提交
1494 1495
 *      no used         4B
 **/
1496
int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
guanshengliang's avatar
guanshengliang 已提交
1497
#if 0
S
slguan 已提交
1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509
  SSqlCmd *pCmd = &pSql->cmd;

  // copy payload content to temp buff
  char *tmpData = 0;
  if (pCmd->payloadLen > 0) {
    tmpData = calloc(1, pCmd->payloadLen + 1);
    if (NULL == tmpData) return -1;
    memcpy(tmpData, pCmd->payload, pCmd->payloadLen);
  }

  // fill head info
  SMgmtHead *pMgmt = (SMgmtHead *)(pCmd->payload + tsRpcHeadSize);
S
slguan 已提交
1510
  memset(pMgmt->db, 0, TSDB_TABLE_ID_LEN);  // server don't need the db
S
slguan 已提交
1511

1512
  SCMMultiTableInfoMsg *pInfoMsg = (SCMMultiTableInfoMsg *)(pCmd->payload + tsRpcHeadSize + sizeof(SMgmtHead));
S
slguan 已提交
1513
  pInfoMsg->numOfTables = htonl((int32_t)pCmd->count);
S
slguan 已提交
1514 1515

  if (pCmd->payloadLen > 0) {
S
slguan 已提交
1516
    memcpy(pInfoMsg->tableIds, tmpData, pCmd->payloadLen);
S
slguan 已提交
1517 1518 1519 1520
  }

  tfree(tmpData);

1521
  pCmd->payloadLen += sizeof(SMgmtHead) + sizeof(SCMMultiTableInfoMsg);
S
slguan 已提交
1522
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLES_META;
S
slguan 已提交
1523 1524 1525

  assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize);

S
slguan 已提交
1526
  tscTrace("%p build load multi-metermeta msg completed, numOfTables:%d, msg size:%d", pSql, pCmd->count,
S
slguan 已提交
1527 1528 1529
           pCmd->payloadLen);

  return pCmd->payloadLen;
guanshengliang's avatar
guanshengliang 已提交
1530 1531
#endif
  return 0;  
S
slguan 已提交
1532 1533
}

H
hjxilinx 已提交
1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559
//static UNUSED_FUNC int32_t tscEstimateMetricMetaMsgSize(SSqlCmd *pCmd) {
////  const int32_t defaultSize =
////      minMsgSize() + sizeof(SSuperTableMetaMsg) + sizeof(SMgmtHead) + sizeof(int16_t) * TSDB_MAX_TAGS;
////  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
////
////  int32_t n = 0;
////  size_t size = taosArrayGetSize(pQueryInfo->tagCond.pCond);
////  for (int32_t i = 0; i < size; ++i) {
////    assert(0);
//////    n += strlen(pQueryInfo->tagCond.cond[i].cond);
////  }
////
////  int32_t tagLen = n * TSDB_NCHAR_SIZE;
////  if (pQueryInfo->tagCond.tbnameCond.cond != NULL) {
////    tagLen += strlen(pQueryInfo->tagCond.tbnameCond.cond) * TSDB_NCHAR_SIZE;
////  }
////
////  int32_t joinCondLen = (TSDB_TABLE_ID_LEN + sizeof(int16_t)) * 2;
////  int32_t elemSize = sizeof(SSuperTableMetaElemMsg) * pQueryInfo->numOfTables;
////
////  int32_t colSize = pQueryInfo->groupbyExpr.numOfGroupCols*sizeof(SColIndex);
////
////  int32_t len = tagLen + joinCondLen + elemSize + colSize + defaultSize;
////
////  return MAX(len, TSDB_DEFAULT_PAYLOAD_SIZE);
//}
H
hzcheng 已提交
1560

H
hjxilinx 已提交
1561 1562 1563
/*
 * Build the super-table vgroup request (TSDB_MSG_TYPE_CM_STABLE_VGROUP).
 *
 * Payload layout:
 *   | SCMSTableVgroupMsg (numOfTables, network order) |
 *   | name0 | name1 | ...      (TSDB_TABLE_ID_LEN bytes each)
 *
 * One fixed-size name slot is written per table of the first query info.
 * The message is written directly into pCmd->payload; no (re)allocation is
 * done here, so the buffer must already be large enough.
 *
 * The former #if 0 implementation (metric-meta message) has been removed: it
 * called tscEstimateMetricMetaMsgSize, which only exists as commented-out
 * code, and could not be re-enabled as written.
 *
 * pInfo is not used. Always returns TSDB_CODE_SUCCESS.
 */
int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  char* pMsg = pCmd->payload;
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

  SCMSTableVgroupMsg *pStableVgroupMsg = (SCMSTableVgroupMsg *) pMsg;
  pStableVgroupMsg->numOfTables = htonl(pQueryInfo->numOfTables);
  pMsg += sizeof(SCMSTableVgroupMsg);

  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
    STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i);

    // strncpy zero-pads the slot; a name of exactly TSDB_TABLE_ID_LEN bytes
    // is stored without a terminating NUL
    strncpy(pMsg, pTableMetaInfo->name, TSDB_TABLE_ID_LEN);
    pMsg += TSDB_TABLE_ID_LEN;
  }

  pCmd->msgType = TSDB_MSG_TYPE_CM_STABLE_VGROUP;
  pCmd->payloadLen = (pMsg - pCmd->payload);

  return TSDB_CODE_SUCCESS;
}

1727 1728
/*
 * Build the heartbeat request (TSDB_MSG_TYPE_CM_HEARTBEAT).
 *
 * While holding pObj->mutex, the connection's query list and stream list are
 * walked to size the payload, the payload is allocated, and
 * tscBuildQueryStreamDesc fills in the descriptors and returns the message
 * length.
 *
 * pInfo is not used.
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CLI_OUT_OF_MEMORY when the payload
 * cannot be allocated (previously a bare -1, unlike the other builders).
 */
int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;

  pthread_mutex_lock(&pObj->mutex);

  // both counters start at 2 rather than 0 -- presumably headroom for the
  // size estimate below; TODO confirm
  int32_t numOfQueries = 2;
  SSqlObj *tpSql = pObj->sqlList;
  while (tpSql) {
    tpSql = tpSql->next;
    numOfQueries++;
  }

  int32_t numOfStreams = 2;
  SSqlStream *pStream = pObj->streamList;
  while (pStream) {
    pStream = pStream->next;
    numOfStreams++;
  }

  int size = numOfQueries * sizeof(SQueryDesc) + numOfStreams * sizeof(SStreamDesc) + sizeof(SCMHeartBeatMsg) + 100;
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    pthread_mutex_unlock(&pObj->mutex);
    tscError("%p failed to malloc for heartbeat msg", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SCMHeartBeatMsg *pHeartbeat = (SCMHeartBeatMsg *)pCmd->payload;
  pHeartbeat->numOfQueries = numOfQueries;
  pHeartbeat->numOfStreams = numOfStreams;
  int msgLen = tscBuildQueryStreamDesc(pHeartbeat, pObj);

  pthread_mutex_unlock(&pObj->mutex);

  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_CM_HEARTBEAT;

  return TSDB_CODE_SUCCESS;
}

1767 1768
/*
 * Process a table-meta response.
 *
 * The STableMetaMsg in pSql->res.pRsp is byte-swapped in place (header
 * fields, vgroup ports, and the bytes/colId of every column and tag schema),
 * validated, converted into an STableMeta and stored in the table-meta cache
 * under the table name. The cached entry is attached to the first table-meta
 * info of the command, which must not have a meta attached yet.
 *
 * Returns:
 *   TSDB_CODE_SUCCESS            on success;
 *   TSDB_CODE_INVALID_VALUE      when sid, numOfIps, numOfTags or
 *                                numOfColumns is out of range;
 *   TSDB_CODE_CLI_OUT_OF_MEMORY  when the table meta cannot be created or
 *                                cannot be put into the cache.
 */
int tscProcessTableMetaRsp(SSqlObj *pSql) {
  STableMetaMsg *pMetaMsg = (STableMetaMsg *)pSql->res.pRsp;

  pMetaMsg->sid = htonl(pMetaMsg->sid);
  pMetaMsg->sversion = htons(pMetaMsg->sversion);
  pMetaMsg->tversion = htons(pMetaMsg->tversion);
  pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId);

  pMetaMsg->uid = htobe64(pMetaMsg->uid);
  pMetaMsg->contLen = htons(pMetaMsg->contLen);
  pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns);

  if (pMetaMsg->sid < 0 || pMetaMsg->vgroup.numOfIps < 0) {
    // the value printed first is numOfIps (it was mislabelled as vgId)
    tscError("invalid meter numOfIps:%d, sid:%d", pMetaMsg->vgroup.numOfIps, pMetaMsg->sid);
    return TSDB_CODE_INVALID_VALUE;
  }

  if (pMetaMsg->numOfTags > TSDB_MAX_TAGS || pMetaMsg->numOfTags < 0) {
    tscError("invalid numOfTags:%d", pMetaMsg->numOfTags);
    return TSDB_CODE_INVALID_VALUE;
  }

  if (pMetaMsg->numOfColumns > TSDB_MAX_COLUMNS || pMetaMsg->numOfColumns <= 0) {
    tscError("invalid numOfColumns:%d", pMetaMsg->numOfColumns);
    return TSDB_CODE_INVALID_VALUE;
  }

  for (int i = 0; i < pMetaMsg->vgroup.numOfIps; ++i) {
    pMetaMsg->vgroup.ipAddr[i].port = htons(pMetaMsg->vgroup.ipAddr[i].port);
  }

  SSchema* pSchema = pMetaMsg->schema;

  // columns and tags share one schema array
  int32_t numOfTotalCols = pMetaMsg->numOfColumns + pMetaMsg->numOfTags;
  for (int i = 0; i < numOfTotalCols; ++i) {
    pSchema->bytes = htons(pSchema->bytes);
    pSchema->colId = htons(pSchema->colId);

    assert(pSchema->type >= TSDB_DATA_TYPE_BOOL && pSchema->type <= TSDB_DATA_TYPE_NCHAR);
    pSchema++;
  }

  size_t size = 0;
  STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
  if (pTableMeta == NULL) {
    // NOTE(review): assumes a NULL result means allocation failure -- confirm
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  // todo add one more function: taosAddDataIfNotExists();
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
  assert(pTableMetaInfo->pTableMeta == NULL);

  pTableMetaInfo->pTableMeta =
      (STableMeta *) taosCachePut(tscCacheHandle, pTableMetaInfo->name, pTableMeta, size, tsTableMetaKeepTimer);

  if (pTableMetaInfo->pTableMeta == NULL) {
    // the temporary meta is owned by this function on every path; it used to
    // leak here
    free(pTableMeta);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  tscTrace("%p recv table meta: %"PRId64 ", tid:%d, name:%s", pSql, pTableMeta->uid, pTableMeta->sid, pTableMetaInfo->name);
  free(pTableMeta);

  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
1830
/**
 *  multi table meta rsp pkg format:
 *  | STaosRsp | ieType | SCMMultiTableInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | SMeterMeta2 | SSchema2
 *  |...... 1B        1B            4B
 *
 *  This handler is currently a stub: the response is not parsed and
 *  TSDB_CODE_SUCCESS is returned unconditionally.
 *  The previous implementation was compiled out with #if 0 and used
 *  variables that were never declared (ieType, totalNum, i, pSchema), so it
 *  could not be re-enabled as written; it has been removed.
 **/
int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
  (void)pSql;

  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
1939 1940
/*
 * Process a super-table vgroup response.
 *
 * Response layout:
 *   | SCMSTableVgroupRspMsg (numOfTables) |
 *   | SVgroupsInfo + numOfVgroups * SCMVgroupInfo |   <- table 0
 *   | SVgroupsInfo + numOfVgroups * SCMVgroupInfo |   <- table 1
 *   ...
 *
 * For every table a private copy of its vgroup record is allocated and
 * stored in the matching table-meta info of the parent SQL object
 * (pSql->param); vgId and the ports are byte-swapped in that copy.
 * numOfTables and each numOfVgroups are byte-swapped in place in the response.
 *
 * Returns pSql->res.code, or TSDB_CODE_CLI_OUT_OF_MEMORY when a vgroup list
 * cannot be allocated.
 *
 * The former #if 0 implementation (metric-meta parsing) has been removed: it
 * used variables that were never declared (rsp, pMeta) and could not be
 * re-enabled as written.
 */
int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
  SSqlRes* pRes = &pSql->res;

  // NOTE: the order of several table must be preserved.
  SCMSTableVgroupRspMsg *pStableVgroup = (SCMSTableVgroupRspMsg *)pRes->pRsp;
  pStableVgroup->numOfTables = htonl(pStableVgroup->numOfTables);
  char* pMsg = pRes->pRsp + sizeof(SCMSTableVgroupRspMsg);

  // master sqlObj locates in param
  SSqlObj* parent = pSql->param;
  assert(parent != NULL);

  SSqlCmd* pCmd = &parent->cmd;
  for (int32_t i = 0; i < pStableVgroup->numOfTables; ++i) {
    STableMetaInfo *pInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i);
    SVgroupsInfo *  pVgroupInfo = (SVgroupsInfo *)pMsg;
    pVgroupInfo->numOfVgroups = htonl(pVgroupInfo->numOfVgroups);

    // size of this table's whole record: header plus all vgroup entries
    size_t size = sizeof(SCMVgroupInfo) * pVgroupInfo->numOfVgroups + sizeof(SVgroupsInfo);
    pInfo->vgroupList = calloc(1, size);
    if (pInfo->vgroupList == NULL) {
      // was only an assert, which is compiled out in release builds
      return TSDB_CODE_CLI_OUT_OF_MEMORY;
    }

    memcpy(pInfo->vgroupList, pVgroupInfo, size);
    for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) {
      SCMVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j];

      pVgroups->vgId = htonl(pVgroups->vgId);
      assert(pVgroups->numOfIps >= 1);

      for (int32_t k = 0; k < pVgroups->numOfIps; ++k) {
        pVgroups->ipAddr[k].port = htons(pVgroups->ipAddr[k].port);
      }
    }

    // advance exactly once per table; this used to sit inside the vgroup
    // loop, skipping size * numOfVgroups bytes (and nothing for 0 vgroups)
    pMsg += size;
  }

  return pSql->res.code;
}

/*
 * Process the response of a SHOW command. The current process does not use
 * the cache for lookups at all; the table meta built from the response is
 * only stored in the cache so that it is owned like any other table meta.
 *
 * The response header (qhandle, numOfColumns, sid and the per-column byte
 * widths) is converted from network byte order in place. One column index,
 * output field and TSDB_FUNC_TS_DUMMY expression is then appended to query 0
 * for every column of the returned schema.
 *
 * @param pSql  sql object whose res.pRsp holds a SCMShowRsp
 * @return      0 on success, TSDB_CODE_CLI_OUT_OF_MEMORY if the table meta
 *              cannot be created or stored in the cache
 */
int tscProcessShowRsp(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  SCMShowRsp *pShow = (SCMShowRsp *)pRes->pRsp;
  pShow->qhandle = htobe64(pShow->qhandle);
  pRes->qhandle = pShow->qhandle;

  tscResetForNextRetrieve(pRes);

  STableMetaMsg *pMetaMsg = &(pShow->tableMeta);
  pMetaMsg->numOfColumns = ntohs(pMetaMsg->numOfColumns);
  pMetaMsg->sid = ntohs(pMetaMsg->sid);

  SSchema *pSchema = pMetaMsg->schema;
  for (int i = 0; i < pMetaMsg->numOfColumns; ++i, ++pSchema) {
    pSchema->bytes = htons(pSchema->bytes);
  }

  // cache key: one letter derived from the message type followed by "showlist"
  char key[20];
  key[0] = pCmd->msgType + 'a';
  strcpy(key + 1, "showlist");

  taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), false);

  size_t      size = 0;
  STableMeta *pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
  if (pTableMeta == NULL) {
    tscError("%p failed to create table meta from show response", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  pTableMetaInfo->pTableMeta = taosCachePut(tscCacheHandle, key, (char *)pTableMeta, size, tsTableMetaKeepTimer);
  if (pTableMetaInfo->pTableMeta == NULL) {
    tscError("%p failed to put table meta of show response into cache", pSql);
    tfree(pTableMeta);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  SSchema *pTableSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);

  if (pQueryInfo->colList == NULL) {
    pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
  }

  SFieldInfo *pFieldInfo = &pQueryInfo->fieldsInfo;

  SColumnIndex index = {0};
  pSchema = pMetaMsg->schema;

  for (int16_t i = 0; i < pMetaMsg->numOfColumns; ++i, ++pSchema) {
    index.columnIndex = i;
    tscColumnListInsert(pQueryInfo->colList, &index);

    TAOS_FIELD     f = tscCreateField(pSchema->type, pSchema->name, pSchema->bytes);
    SFieldSupInfo *pInfo = tscFieldInfoAppend(pFieldInfo, &f);

    pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index,
                     pTableSchema[i].type, pTableSchema[i].bytes, pTableSchema[i].bytes, false);
  }

  pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
  tscFieldInfoUpdateOffset(pQueryInfo);

  // the locally built meta is released here; only the cache result stays attached
  tfree(pTableMeta);
  return 0;
}

/*
 * Process the connect response: copy the account id, server version and
 * authorization flags into the connection object, prefix the current db name
 * with "<acctId><TS_PATH_DELIMITER>", adopt the mgmt ip list if the response
 * carries one, and (re)start the activity timer.
 *
 * @param pSql  sql object whose res.pRsp holds a SCMConnectRsp
 * @return      always 0
 */
int tscProcessConnectRsp(SSqlObj *pSql) {
  char     temp[TSDB_TABLE_ID_LEN * 2];
  STscObj *pObj = pSql->pTscObj;
  SSqlRes *pRes = &pSql->res;

  SCMConnectRsp *pConnect = (SCMConnectRsp *)pRes->pRsp;
  // NOTE(review): acctId/serverVersion come from the server and are copied unbounded; confirm field sizes match
  strcpy(pObj->acctId, pConnect->acctId);  // copy acctId from response

  // build the full db name in a scratch buffer because pObj->db is also an input
  int32_t len = snprintf(temp, sizeof(temp), "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db);

  // the result must fit into pObj->db including its terminating '\0'
  assert(len >= 0 && (size_t)len < tListLen(pObj->db));
  snprintf(pObj->db, tListLen(pObj->db), "%s", temp);

  if (pConnect->ipList.numOfIps > 0) {
    tscSetMgmtIpList(&pConnect->ipList);
  }

  strcpy(pObj->sversion, pConnect->serverVersion);
  pObj->writeAuth = pConnect->writeAuth;
  pObj->superAuth = pConnect->superAuth;
  pObj->connId = htonl(pConnect->connId);
  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);

  return 0;
}

/*
 * Process the response of USE <db>: the name stored in the first table meta
 * info of the command becomes the current db of the connection.
 *
 * The copy is bounded by the size of pObj->db and always '\0'-terminated, so
 * an over-long name is truncated instead of overflowing the buffer.
 *
 * @param pSql  sql object of the finished USE command
 * @return      always 0
 */
int tscProcessUseDbRsp(SSqlObj *pSql) {
  STscObj *       pObj = pSql->pTscObj;
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);

  snprintf(pObj->db, tListLen(pObj->db), "%s", pTableMetaInfo->name);
  return 0;
}

/*
 * Process the response of DROP DATABASE by emptying the whole client cache
 * (tscCacheHandle). The sql object itself is not inspected.
 *
 * @return  always 0
 */
int tscProcessDropDbRsp(SSqlObj *UNUSED_PARAM(pSql)) {
  taosCacheEmpty(tscCacheHandle);

  return 0;
}

/*
 * Process the response of DROP TABLE: force the cached meta of the dropped
 * table out of the cache.
 *
 * Background: if a user drops the only table of a vnode, the vnode itself may
 * be removed. When a table with the same name but a different schema is
 * created afterwards, it resides in a new vnode, so the cached meta is stale
 * and must not be served again.
 *
 * @param pSql  sql object of the finished DROP TABLE command
 * @return      always 0
 */
int tscProcessDropTableRsp(SSqlObj *pSql) {
  STableMetaInfo *pInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);

  STableMeta *pCached = taosCacheAcquireByName(tscCacheHandle, pInfo->name);
  if (pCached != NULL) {
    tscTrace("%p force release metermeta after drop table:%s", pSql, pInfo->name);

    // drop the reference acquired above and ask the cache to remove the entry
    taosCacheRelease(tscCacheHandle, (void **)&pCached, true);

    // also drop the reference held by the command itself, if any
    if (pInfo->pTableMeta) {
      taosCacheRelease(tscCacheHandle, (void **)&(pInfo->pTableMeta), true);
    }
  }

  // nothing to do when the table meta is not in the cache
  return 0;
}

/*
 * Process the response of ALTER TABLE: force the cached meta of the altered
 * table out of the cache. If the altered table is a super table, the whole
 * cache is emptied as well.
 *
 * @param pSql  sql object of the finished ALTER TABLE command
 * @return      always 0
 */
int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
  STableMetaInfo *pInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);

  STableMeta *pCached = taosCacheAcquireByName(tscCacheHandle, pInfo->name);
  if (pCached != NULL) {
    tscTrace("%p force release metermeta in cache after alter-table: %s", pSql, pInfo->name);
    taosCacheRelease(tscCacheHandle, (void **)&pCached, true);

    if (pInfo->pTableMeta) {
      // evaluate the table kind before the meta reference is released
      bool isSuperTable = UTIL_TABLE_IS_SUPER_TABLE(pInfo);

      taosCacheRelease(tscCacheHandle, (void **)&(pInfo->pTableMeta), true);

      if (isSuperTable) {  // if it is a super table, reset whole query cache
        tscTrace("%p reset query cache since table:%s is stable", pSql, pInfo->name);
        taosCacheEmpty(tscCacheHandle);
      }
    }
  }

  // nothing to do when the table meta is not in the cache
  return 0;
}

/*
 * Process the response of ALTER DATABASE. Nothing has to be done on the
 * client side; the handler only exists to fill its slot in tscProcessMsgRsp.
 *
 * @return  always 0
 */
int tscProcessAlterDbMsgRsp(SSqlObj *pSql) {
  (void)pSql;  // intentionally unused

  return 0;
}

/*
 * Process the response of a query (SELECT) message: convert the query handle
 * in the response in place, remember it in the result object, clear the data
 * pointer and reset the result for the next retrieve.
 *
 * @param pSql  sql object whose res.pRsp holds a SQueryTableRsp
 * @return      always 0
 */
int tscProcessQueryRsp(SSqlObj *pSql) {
  SSqlRes *       pRes = &pSql->res;
  SQueryTableRsp *pRsp = (SQueryTableRsp *)pRes->pRsp;

  pRsp->qhandle = htobe64(pRsp->qhandle);
  pRes->qhandle = pRsp->qhandle;

  // no rows are available until the first fetch returns
  pRes->data = NULL;
  tscResetForNextRetrieve(pRes);

  return 0;
}

H
hjxilinx 已提交
2262
/*
 * Process a retrieve (fetch) response: convert the response header into the
 * result object, build the per-column result pointers and, for subscriptions,
 * consume the progress records that follow the row data.
 *
 * The progress section starts right after the row data and consists of an
 * int32 record count followed by records of {int64 uid, int32 tid, TSKEY key}.
 * Records are 20 bytes long, so they cannot all be naturally aligned; the
 * values are therefore read with memcpy instead of through casted pointers.
 *
 * @param pSql  sql object whose res.pRsp holds a SRetrieveTableRsp
 * @return      0 on success, pRes->code if the result pointers cannot be built
 */
int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *)pRes->pRsp;

  pRes->numOfRows = htonl(pRetrieve->numOfRows);
  pRes->precision = htons(pRetrieve->precision);
  pRes->offset    = htobe64(pRetrieve->offset);
  pRes->useconds  = htobe64(pRetrieve->useconds);
  pRes->completed = (pRetrieve->completed == 1);
  pRes->data      = pRetrieve->data;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
    return pRes->code;
  }

  if (pSql->pSubscription != NULL) {
    int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput;

    TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, numOfCols - 1);
    int16_t     offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1);

    // skip the row data: (offset of last column + its width) bytes per row
    char *p = pRes->data + (pField->bytes + offset) * pRes->numOfRows;

    int32_t numOfTables = 0;
    memcpy(&numOfTables, p, sizeof(numOfTables));
    numOfTables = htonl(numOfTables);
    p += sizeof(int32_t);

    for (int i = 0; i < numOfTables; i++) {
      int64_t uid = 0;
      memcpy(&uid, p, sizeof(uid));
      uid = htobe64(uid);
      p += sizeof(int64_t);

      p += sizeof(int32_t);  // skip tid

      TSKEY key = 0;
      memcpy(&key, p, sizeof(key));
      key = htobe64(key);
      p += sizeof(TSKEY);

      tscUpdateSubscriptionProgress(pSql->pSubscription, uid, key);
    }
  }

  pRes->row = 0;
  tscTrace("%p numOfRows:%d, offset:%" PRId64 ", complete:%d", pSql, (int)pRes->numOfRows, (int64_t)pRes->offset,
           pRes->completed);

  return 0;
}

/*
 * Process a retrieve response whose payload is a SRetrieveTableRsp: take the
 * row count and data pointer from the response, set up the result pointers
 * for query 0 and rewind the row cursor.
 *
 * @param pSql  sql object whose res.pRsp holds a SRetrieveTableRsp
 * @return      always 0
 */
int tscProcessRetrieveRspFromLocal(SSqlObj *pSql) {
  SSqlRes *          pRes = &pSql->res;
  SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)pRes->pRsp;
  SQueryInfo *       pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);

  pRes->numOfRows = htonl(pRsp->numOfRows);
  pRes->data = pRsp->data;
  tscSetResultPointer(pQueryInfo, pRes);

  pRes->row = 0;
  return 0;
}

H
hjxilinx 已提交
2321
// Forward declaration; the definition is not in this part of the file. It is installed in this file
// as the completion callback (fp) of the helper sql objects that fetch table meta / vgroup info.
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code);
H
hzcheng 已提交
2322

2323
/*
 * Issue a TSDB_SQL_META request for the table named in pTableMetaInfo.
 *
 * A helper sql object is created that carries the table name and a copy of the
 * payload of pSql (tag information, used when the table does not exist and is
 * to be created). The helper is linked back to pSql through its param field
 * and completes through tscTableMetaCallBack.
 *
 * On every failure path before the request is submitted the helper is
 * released with tscFreeSqlObj, so that everything attached to its command is
 * released together with it.
 *
 * @param pSql            the sql object that needs the table meta
 * @param pTableMetaInfo  meta info holding the name of the wanted table
 * @return                TSDB_CODE_ACTION_IN_PROGRESS if the request was
 *                        submitted, otherwise an error code
 */
static int32_t getTableMetaFromMgmt(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
  if (NULL == pNew) {
    tscError("%p malloc failed for new sqlobj to get table meta", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;
  pNew->cmd.command = TSDB_SQL_META;

  tscAddSubqueryInfo(&pNew->cmd);

  SQueryInfo *pNewQueryInfo = NULL;
  int32_t     code = tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo);
  if (code != TSDB_CODE_SUCCESS) {
    tscError("%p failed to get query info for new sqlobj to get table meta", pSql);
    tscFreeSqlObj(pNew);
    return code;
  }

  pNew->cmd.autoCreated = pSql->cmd.autoCreated;  // create table if not exists
  if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE + pSql->cmd.payloadLen)) {
    tscError("%p malloc failed for payload to get table meta", pSql);
    tscFreeSqlObj(pNew);

    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  STableMetaInfo *pNewMeterMetaInfo = tscAddEmptyMetaInfo(pNewQueryInfo);
  assert(pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);

  strncpy(pNewMeterMetaInfo->name, pTableMetaInfo->name, tListLen(pNewMeterMetaInfo->name));
  memcpy(pNew->cmd.payload, pSql->cmd.payload, pSql->cmd.payloadLen);  // tag information if table does not exists.
  pNew->cmd.payloadLen = pSql->cmd.payloadLen;
  tscTrace("%p new pSqlObj:%p to get tableMeta, auto create:%d", pSql, pNew, pNew->cmd.autoCreated);

  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;

  code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
    // the request is on its way; the result arrives through the callback
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

  return code;
}

H
hjxilinx 已提交
2366
/*
 * Attach the table meta of pTableMetaInfo->name to pTableMetaInfo.
 *
 * A meta already attached is released first. The meta is then looked up in
 * the cache by name; if it is not cached, a request is issued through
 * getTableMetaFromMgmt.
 *
 * @param pSql            the sql object that needs the table meta
 * @param pTableMetaInfo  meta info with a non-empty table name
 * @return                TSDB_CODE_SUCCESS on a cache hit, otherwise the
 *                        result of getTableMetaFromMgmt
 */
int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
  assert(strlen(pTableMetaInfo->name) != 0);

  // If this STableMetaInfo owns a table meta, release it first
  if (pTableMetaInfo->pTableMeta != NULL) {
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), false);
  }

  STableMeta *pCached = (STableMeta *)taosCacheAcquireByName(tscCacheHandle, pTableMetaInfo->name);
  pTableMetaInfo->pTableMeta = pCached;

  if (pCached == NULL) {
    // cache miss
    return getTableMetaFromMgmt(pSql, pTableMetaInfo);
  }

  STableComInfo tinfo = tscGetTableInfo(pCached);
  tscTrace("%p retrieve table Meta from cache, the number of columns:%d, numOfTags:%d, %p", pSql, tinfo.numOfColumns,
           tinfo.numOfTags, pCached);

  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
2386
/*
 * Same as tscGetTableMeta, but first records in the command whether the table
 * is to be created when it does not exist.
 *
 * @param pSql               the sql object that needs the table meta
 * @param pTableMetaInfo     meta info holding the name of the wanted table
 * @param createIfNotExists  stored in pSql->cmd.autoCreated
 * @return                   result of tscGetTableMeta
 */
int tscGetMeterMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool createIfNotExists) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->autoCreated = createIfNotExists;

  return tscGetTableMeta(pSql, pTableMetaInfo);
}

/*
 * Helper for renewing the table meta during insertion.
 *
 * If the table is created on demand during insertion, wait for a short period
 * before the table meta request is re-issued. This gives a greater chance that
 * the vnode has successfully created the corresponding table by then.
 *
 * Commands other than TSDB_SQL_INSERT return immediately.
 */
static void tscWaitingForCreateTable(SSqlCmd *pCmd) {
  if (pCmd->command != TSDB_SQL_INSERT) {
    return;
  }

  taosMsleep(50);  // todo: global config
}

/**
 * in renew metermeta, do not retrieve metadata in cache.
 * @param pSql          sql object
S
slguan 已提交
2407
 * @param tableId       meter id
H
hzcheng 已提交
2408 2409
 * @return              status code
 */
S
slguan 已提交
2410
int tscRenewMeterMeta(SSqlObj *pSql, char *tableId) {
2411 2412
  int code = 0;

H
hjxilinx 已提交
2413
  // handle table meta renew process
H
hzcheng 已提交
2414
  SSqlCmd *pCmd = &pSql->cmd;
2415 2416

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
H
hjxilinx 已提交
2417
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
2418 2419

  /*
S
slguan 已提交
2420
   * 1. only update the metermeta in force model metricmeta is not updated
H
hzcheng 已提交
2421 2422
   * 2. if get metermeta failed, still get the metermeta
   */
2423
  if (pTableMetaInfo->pTableMeta == NULL || !tscQueryOnSTable(pCmd)) {
2424
    STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
H
hjxilinx 已提交
2425
    if (pTableMetaInfo->pTableMeta) {
2426 2427
      tscTrace("%p update table meta, old: numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
               tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->uid, pTableMeta);
H
hzcheng 已提交
2428
    }
2429

2430
    tscWaitingForCreateTable(pCmd);
H
hjxilinx 已提交
2431
    taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true);
H
hzcheng 已提交
2432

2433
    code = getTableMetaFromMgmt(pSql, pTableMetaInfo);  // todo ??
H
hzcheng 已提交
2434
  } else {
H
hjxilinx 已提交
2435
    tscTrace("%p metric query not update metric meta, numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
H
hjxilinx 已提交
2436 2437
             tscGetNumOfTags(pTableMetaInfo->pTableMeta), pCmd->numOfCols, pTableMetaInfo->pTableMeta->uid,
             pTableMetaInfo->pTableMeta);
H
hzcheng 已提交
2438 2439 2440 2441 2442
  }

  return code;
}

H
hjxilinx 已提交
2443
/*
 * Check whether every table of the given clause already has its vgroup list.
 *
 * @param pCmd         command holding the query
 * @param clauseIndex  index of the clause to inspect
 * @return             true if no table of the clause has a NULL vgroupList, so
 *                     no vgroup info has to be retrieved anymore
 */
static bool allVgroupInfoRetrieved(SSqlCmd* pCmd, int32_t clauseIndex) {
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);

  int32_t idx = 0;
  while (idx < pQueryInfo->numOfTables) {
    if (tscGetMetaInfo(pQueryInfo, idx)->vgroupList == NULL) {
      break;
    }

    ++idx;
  }

  // the loop ran to the end only if every table has its vgroup info
  return idx >= pQueryInfo->numOfTables;
}
H
hzcheng 已提交
2455

H
hjxilinx 已提交
2456 2457 2458 2459 2460
/*
 * Retrieve the vgroup info of all tables of the given clause.
 *
 * If every table already has its vgroup list nothing is done. Otherwise a
 * helper sql object with command TSDB_SQL_STABLEVGROUP is created, one table
 * meta info per table of the clause is added to it (each with its own
 * reference to the cached table meta), and the request is submitted. The
 * helper is linked back to pSql through its param field and completes through
 * tscTableMetaCallBack.
 *
 * On every failure path before the request is submitted the helper is
 * released with tscFreeSqlObj.
 *
 * @param pSql         the sql object that needs the vgroup info
 * @param clauseIndex  index of the clause whose tables are inspected
 * @return             TSDB_CODE_SUCCESS if nothing has to be retrieved,
 *                     TSDB_CODE_ACTION_IN_PROGRESS if the request was
 *                     submitted, otherwise an error code
 */
int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
  SSqlCmd *pCmd = &pSql->cmd;

  if (allVgroupInfoRetrieved(pCmd, clauseIndex)) {
    return TSDB_CODE_SUCCESS;
  }

  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
  if (pNew == NULL) {
    tscError("%p malloc failed for new sqlobj to get vgroup info", pSql);
    return TSDB_CODE_CLI_OUT_OF_MEMORY;
  }

  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;

  pNew->cmd.command = TSDB_SQL_STABLEVGROUP;

  SQueryInfo *pNewQueryInfo = NULL;
  int         code = tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo);
  if (code != TSDB_CODE_SUCCESS) {
    tscFreeSqlObj(pNew);
    return code;
  }

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
    STableMetaInfo *pMInfo = tscGetMetaInfo(pQueryInfo, i);

    // the helper gets its own reference to the cached table meta
    STableMeta *pTableMeta = taosCacheAcquireByData(tscCacheHandle, pMInfo->pTableMeta);
    tscAddTableMetaInfo(pNewQueryInfo, pMInfo->name, pTableMeta, NULL, pMInfo->tagColList);
  }

  if ((code = tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
    tscFreeSqlObj(pNew);
    return code;
  }

  pNewQueryInfo->numOfTables = pQueryInfo->numOfTables;
  tscTrace("%p new sqlObj:%p to get vgroupInfo, numOfTables:%d", pSql, pNew, pNewQueryInfo->numOfTables);

  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;
  code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
    // the request is on its way; the result arrives through the callback
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

  return code;
}

2500
void tscInitMsgsFp() {
S
slguan 已提交
2501 2502
  tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
  tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg;
J
jtao1735 已提交
2503
  tscBuildMsg[TSDB_SQL_FETCH] = tscBuildFetchMsg;
H
hzcheng 已提交
2504 2505

  tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg;
2506
  tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg;
H
hzcheng 已提交
2507

2508 2509
  tscBuildMsg[TSDB_SQL_CREATE_ACCT] = tscBuildAcctMsg;
  tscBuildMsg[TSDB_SQL_ALTER_ACCT] = tscBuildAcctMsg;
H
hzcheng 已提交
2510 2511

  tscBuildMsg[TSDB_SQL_CREATE_TABLE] = tscBuildCreateTableMsg;
S
[TD-16]  
slguan 已提交
2512
  tscBuildMsg[TSDB_SQL_DROP_USER] = tscBuildDropUserMsg;
H
hzcheng 已提交
2513 2514 2515
  tscBuildMsg[TSDB_SQL_DROP_ACCT] = tscBuildDropAcctMsg;
  tscBuildMsg[TSDB_SQL_DROP_DB] = tscBuildDropDbMsg;
  tscBuildMsg[TSDB_SQL_DROP_TABLE] = tscBuildDropTableMsg;
2516
  tscBuildMsg[TSDB_SQL_ALTER_USER] = tscBuildUserMsg;
S
slguan 已提交
2517 2518 2519
  tscBuildMsg[TSDB_SQL_CREATE_DNODE] = tscBuildCreateDnodeMsg;
  tscBuildMsg[TSDB_SQL_DROP_DNODE] = tscBuildDropDnodeMsg;
  tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg;
H
hzcheng 已提交
2520
  tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg;
2521
  tscBuildMsg[TSDB_SQL_UPDATE_TAGS_VAL] = tscBuildUpdateTagMsg;
H
hzcheng 已提交
2522 2523 2524 2525
  tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg;

  tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg;
  tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg;
H
hjxilinx 已提交
2526
  tscBuildMsg[TSDB_SQL_META] = tscBuildTableMetaMsg;
H
hjxilinx 已提交
2527
  tscBuildMsg[TSDB_SQL_STABLEVGROUP] = tscBuildSTableVgroupMsg;
S
slguan 已提交
2528
  tscBuildMsg[TSDB_SQL_MULTI_META] = tscBuildMultiMeterMetaMsg;
H
hzcheng 已提交
2529 2530 2531 2532

  tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg;
  tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg;
  tscBuildMsg[TSDB_SQL_RETRIEVE] = tscBuildRetrieveFromMgmtMsg;
2533 2534 2535
  tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg;
H
hzcheng 已提交
2536 2537

  tscProcessMsgRsp[TSDB_SQL_SELECT] = tscProcessQueryRsp;
H
hjxilinx 已提交
2538
  tscProcessMsgRsp[TSDB_SQL_FETCH] = tscProcessRetrieveRspFromNode;
H
hzcheng 已提交
2539 2540 2541 2542 2543

  tscProcessMsgRsp[TSDB_SQL_DROP_DB] = tscProcessDropDbRsp;
  tscProcessMsgRsp[TSDB_SQL_DROP_TABLE] = tscProcessDropTableRsp;
  tscProcessMsgRsp[TSDB_SQL_CONNECT] = tscProcessConnectRsp;
  tscProcessMsgRsp[TSDB_SQL_USE_DB] = tscProcessUseDbRsp;
2544
  tscProcessMsgRsp[TSDB_SQL_META] = tscProcessTableMetaRsp;
H
hjxilinx 已提交
2545
  tscProcessMsgRsp[TSDB_SQL_STABLEVGROUP] = tscProcessSTableVgroupRsp;
S
slguan 已提交
2546
  tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiMeterMetaRsp;
H
hzcheng 已提交
2547 2548

  tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp;
H
hjxilinx 已提交
2549
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromNode;  // rsp handled by same function.
H
hzcheng 已提交
2550
  tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp;
2551

H
Haojun Liao 已提交
2552 2553 2554 2555 2556
  tscProcessMsgRsp[TSDB_SQL_CURRENT_DB]   = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CURRENT_USER] = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_VERSION] = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CLI_VERSION]  = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_STATUS]  = tscProcessLocalRetrieveRsp;
2557

H
hzcheng 已提交
2558 2559
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp;

H
hjxilinx 已提交
2560
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_LOCALMERGE] = tscProcessRetrieveLocalMergeRsp;
H
hzcheng 已提交
2561 2562 2563 2564 2565 2566 2567 2568 2569 2570

  tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp;
  tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp;

  tscKeepConn[TSDB_SQL_SHOW] = 1;
  tscKeepConn[TSDB_SQL_RETRIEVE] = 1;
  tscKeepConn[TSDB_SQL_SELECT] = 1;
  tscKeepConn[TSDB_SQL_FETCH] = 1;
  tscKeepConn[TSDB_SQL_HB] = 1;
}