tscServer.c 81.0 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#include "os.h"
H
hzcheng 已提交
17
#include "tcache.h"
H
Haojun Liao 已提交
18
#include "tcmdtype.h"
H
hzcheng 已提交
19
#include "trpc.h"
20 21
#include "tscLocalMerge.h"
#include "tscLog.h"
H
hzcheng 已提交
22 23 24 25 26 27
#include "tscProfile.h"
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
#include "ttimer.h"
#include "tutil.h"
dengyihao's avatar
dengyihao 已提交
28
#include "tlockfree.h"
H
hzcheng 已提交
29

30
SRpcCorEpSet  tscMgmtEpSet;
S
slguan 已提交
31

32 33
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};

H
hzcheng 已提交
34 35 36
int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql);
void tscProcessActivityTimer(void *handle, void *tmrId);
int tscKeepConn[TSDB_SQL_MAX] = {0};
37

B
Bomin Zhang 已提交
38
TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid, TSKEY dflt);
39 40
void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts);
void tscSaveSubscriptionProgress(void* sub);
H
hzcheng 已提交
41

S
slguan 已提交
42
static int32_t minMsgSize() { return tsRpcHeadSize + 100; }
H
Haojun Liao 已提交
43 44 45 46 47 48 49 50
static int32_t getWaitingTimeInterval(int32_t count) {
  int32_t initial = 100; // 100 ms by default
  if (count <= 1) {
    return 0;
  }

  return initial * (2<<(count - 2));
}
H
hzcheng 已提交
51

52
static void tscSetDnodeEpSet(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) {
53 54
  assert(pSql != NULL && pVgroupInfo != NULL && pVgroupInfo->numOfEps > 0);

55
  SRpcEpSet* pEpSet = &pSql->epSet;
56 57 58 59
  pEpSet->inUse = 0;

  // apply the FQDN string length check here
  bool hasFqdn = false;
dengyihao's avatar
bugfix  
dengyihao 已提交
60

61 62 63 64
  pEpSet->numOfEps = pVgroupInfo->numOfEps;
  for(int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) {
    strcpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn);
    pEpSet->port[i] = pVgroupInfo->epAddr[i].port;
65 66 67 68

    if (!hasFqdn) {
      hasFqdn = (strlen(pEpSet->fqdn[i]) > 0);
    }
69
  }
70 71

  assert(hasFqdn);
72
}
73

dengyihao's avatar
dengyihao 已提交
74 75 76 77
static void tscDumpMgmtEpSet(SRpcEpSet *epSet) {
  taosCorBeginRead(&tscMgmtEpSet.version);
  *epSet = tscMgmtEpSet.epSet;
  taosCorEndRead(&tscMgmtEpSet.version);
78
}  
dengyihao's avatar
dengyihao 已提交
79 80
static void tscEpSetHtons(SRpcEpSet *s) {
   for (int32_t i = 0; i < s->numOfEps; i++) {
dengyihao's avatar
fixbug  
dengyihao 已提交
81 82 83
      s->port[i] = htons(s->port[i]);    
   }
} 
dengyihao's avatar
dengyihao 已提交
84 85
bool tscEpSetIsEqual(SRpcEpSet *s1, SRpcEpSet *s2) {
   if (s1->numOfEps != s2->numOfEps || s1->inUse != s2->inUse) {
86 87
     return false;
   } 
dengyihao's avatar
dengyihao 已提交
88
   for (int32_t i = 0; i < s1->numOfEps; i++) {
89 90 91 92 93
     if (s1->port[i] != s2->port[i] 
        || strncmp(s1->fqdn[i], s2->fqdn[i], TSDB_FQDN_LEN) != 0)
        return false;
   }
   return true;
S
slguan 已提交
94
}
dengyihao's avatar
dengyihao 已提交
95
void tscUpdateMgmtEpSet(SRpcEpSet *pEpSet) {
96
  // no need to update if equal
dengyihao's avatar
dengyihao 已提交
97 98 99
  taosCorBeginWrite(&tscMgmtEpSet.version);
  tscMgmtEpSet.epSet = *pEpSet;
  taosCorEndWrite(&tscMgmtEpSet.version);
100
}
dengyihao's avatar
dengyihao 已提交
101
static void tscDumpEpSetFromVgroupInfo(SCMCorVgroupInfo *pVgroupInfo, SRpcEpSet *pEpSet) {
dengyihao's avatar
dengyihao 已提交
102
  if (pVgroupInfo == NULL) { return;}
dengyihao's avatar
bugfix  
dengyihao 已提交
103
  taosCorBeginRead(&pVgroupInfo->version);
dengyihao's avatar
dengyihao 已提交
104
  int8_t inUse = pVgroupInfo->inUse;
dengyihao's avatar
dengyihao 已提交
105 106 107
  pEpSet->inUse = (inUse >= 0 && inUse < TSDB_MAX_REPLICA) ? inUse: 0; 
  pEpSet->numOfEps = pVgroupInfo->numOfEps;  
  for (int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) {
Y
yihaoDeng 已提交
108
    tstrncpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn, sizeof(pEpSet->fqdn[i]));
dengyihao's avatar
dengyihao 已提交
109
    pEpSet->port[i] = pVgroupInfo->epAddr[i].port;
S
slguan 已提交
110
  }
dengyihao's avatar
bugfix  
dengyihao 已提交
111
  taosCorEndRead(&pVgroupInfo->version);
S
slguan 已提交
112 113
}

dengyihao's avatar
dengyihao 已提交
114
static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) {
dengyihao's avatar
dengyihao 已提交
115 116
  SSqlCmd *pCmd = &pObj->cmd;
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
dengyihao's avatar
dengyihao 已提交
117
  if (pTableMetaInfo == NULL || pTableMetaInfo->pTableMeta == NULL) { return;}
dengyihao's avatar
bugfix  
dengyihao 已提交
118
  SCMCorVgroupInfo *pVgroupInfo = &pTableMetaInfo->pTableMeta->corVgroupInfo;
dengyihao's avatar
dengyihao 已提交
119

dengyihao's avatar
bugfix  
dengyihao 已提交
120
  taosCorBeginWrite(&pVgroupInfo->version);
dengyihao's avatar
dengyihao 已提交
121
  tscDebug("before: Endpoint in use: %d", pVgroupInfo->inUse);
122 123
  pVgroupInfo->inUse = pEpSet->inUse;
  pVgroupInfo->numOfEps = pEpSet->numOfEps;
dengyihao's avatar
dengyihao 已提交
124
  for (int32_t i = 0; i < pVgroupInfo->numOfEps; i++) {
125
    tstrncpy(pVgroupInfo->epAddr[i].fqdn, pEpSet->fqdn[i], TSDB_FQDN_LEN);
126
    pVgroupInfo->epAddr[i].port = pEpSet->port[i];
dengyihao's avatar
dengyihao 已提交
127
  }
dengyihao's avatar
dengyihao 已提交
128
  tscDebug("after: EndPoint in use: %d", pVgroupInfo->inUse);
dengyihao's avatar
bugfix  
dengyihao 已提交
129
  taosCorEndWrite(&pVgroupInfo->version);
dengyihao's avatar
dengyihao 已提交
130
}
131 132 133 134
void tscPrintMgmtEp() {
  SRpcEpSet dump;
  tscDumpMgmtEpSet(&dump);
  if (dump.numOfEps <= 0) {
dengyihao's avatar
dengyihao 已提交
135
    tscError("invalid mnode EP list:%d", dump.numOfEps);
S
slguan 已提交
136
  } else {
137
    for (int i = 0; i < dump.numOfEps; ++i) {
138
      tscDebug("mnode index:%d %s:%d", i, dump.fqdn[i], dump.port[i]);
S
slguan 已提交
139
    }
S
slguan 已提交
140
  }
S
slguan 已提交
141 142
}

H
hzcheng 已提交
143 144 145 146 147 148 149 150 151 152 153 154
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
  STscObj *pObj = (STscObj *)param;
  if (pObj == NULL) return;
  if (pObj != pObj->signature) {
    tscError("heart beat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature);
    return;
  }

  SSqlObj *pSql = pObj->pHb;
  SSqlRes *pRes = &pSql->res;

  if (code == 0) {
155
    SCMHeartBeatRsp *pRsp = (SCMHeartBeatRsp *)pRes->pRsp;
dengyihao's avatar
dengyihao 已提交
156 157 158 159
    SRpcEpSet *      epSet = &pRsp->epSet;
    if (epSet->numOfEps > 0) {
      tscEpSetHtons(epSet);
      tscUpdateMgmtEpSet(epSet);
dengyihao's avatar
dengyihao 已提交
160
    } 
S
slguan 已提交
161

S
Shengliang Guan 已提交
162 163
    pSql->pTscObj->connId = htonl(pRsp->connId);

H
hzcheng 已提交
164 165
    if (pRsp->killConnection) {
      tscKillConnection(pObj);
166
      return;
H
hzcheng 已提交
167
    } else {
S
slguan 已提交
168 169
      if (pRsp->queryId) tscKillQuery(pObj, htonl(pRsp->queryId));
      if (pRsp->streamId) tscKillStream(pObj, htonl(pRsp->streamId));
H
hzcheng 已提交
170 171
    }
  } else {
172
    tscDebug("heart beat failed, code:%s", tstrerror(code));
H
hzcheng 已提交
173 174 175 176 177 178 179
  }

  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
}

void tscProcessActivityTimer(void *handle, void *tmrId) {
  STscObj *pObj = (STscObj *)handle;
180 181 182
  if (pObj == NULL || pObj->signature != pObj) {
    return;
  }
H
hzcheng 已提交
183

184 185 186
  SSqlObj* pHB = pObj->pHb;
  if (pObj->pTimer != tmrId || pHB == NULL) {
    return;
H
hzcheng 已提交
187 188
  }

189 190
  if (tscShouldFreeHeartBeat(pHB)) {
    tscDebug("%p free HB object and release connection", pHB);
191 192
    pObj->pHb = 0;
    taos_free_result(pHB);
H
Haojun Liao 已提交
193
  } else {
194
    int32_t code = tscProcessSql(pHB);
H
Haojun Liao 已提交
195
    if (code != TSDB_CODE_SUCCESS) {
196
      tscError("%p failed to sent HB to server, reason:%s", pHB, tstrerror(code));
H
Haojun Liao 已提交
197
    }
H
hzcheng 已提交
198 199 200 201
  }
}

int tscSendMsgToServer(SSqlObj *pSql) {
202
  STscObj* pObj = pSql->pTscObj;
H
hjxilinx 已提交
203 204 205
  SSqlCmd* pCmd = &pSql->cmd;
  
  char *pMsg = rpcMallocCont(pCmd->payloadLen);
S
slguan 已提交
206
  if (NULL == pMsg) {
207
    tscError("%p msg:%s malloc failed", pSql, taosMsg[pSql->cmd.msgType]);
208
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
H
hzcheng 已提交
209 210
  }

211 212
  // set the mgmt ip list
  if (pSql->cmd.command >= TSDB_SQL_MGMT) {
dengyihao's avatar
dengyihao 已提交
213
    tscDumpMgmtEpSet(&pSql->epSet);
J
jtao1735 已提交
214 215
  }

216
  memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
217

J
jtao1735 已提交
218
  SRpcMsg rpcMsg = {
219 220 221
      .msgType = pSql->cmd.msgType,
      .pCont   = pMsg,
      .contLen = pSql->cmd.payloadLen,
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
222 223
      .ahandle = pSql,
      .handle  = &pSql->pRpcCtx,
H
hjxilinx 已提交
224
      .code    = 0
J
jtao1735 已提交
225
  };
H
Haojun Liao 已提交
226

H
Haojun Liao 已提交
227 228 229 230
  // NOTE: the rpc context should be acquired before sending data to server.
  // Otherwise, the pSql object may have been released already during the response function, which is
  // processMsgFromServer function. In the meanwhile, the assignment of the rpc context to sql object will absolutely
  // cause crash.
H
Haojun Liao 已提交
231 232
  rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg);
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
233 234
}

235
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
H
Haojun Liao 已提交
236 237 238 239 240
  uint64_t handle = (uint64_t) rpcMsg->ahandle;

  void** p = taosCacheAcquireByKey(tscObjCache, &handle, sizeof(uint64_t));
  if (p == NULL) {
    rpcFreeCont(rpcMsg->pCont);
241 242
    return;
  }
243

H
Haojun Liao 已提交
244 245 246
  SSqlObj* pSql = *p;
  assert(pSql != NULL);

H
Haojun Liao 已提交
247
  STscObj *pObj = pSql->pTscObj;
S
slguan 已提交
248 249
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
H
hzcheng 已提交
250

H
Haojun Liao 已提交
251 252
  assert(*pSql->self == pSql);

H
Haojun Liao 已提交
253
  if (pObj->signature != pObj) {
254
    tscDebug("%p DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, pObj, pObj->signature);
H
Haojun Liao 已提交
255

H
Haojun Liao 已提交
256
    taosCacheRelease(tscObjCache, (void**) &p, true);
H
Haojun Liao 已提交
257 258 259 260
    rpcFreeCont(rpcMsg->pCont);
    return;
  }

261 262
  pSql->pRpcCtx = NULL;    // clear the rpcCtx

H
Haojun Liao 已提交
263 264
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) {
265
    tscDebug("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p",
266
        pSql, pCmd->command, pQueryInfo->type, pObj, pObj->signature);
H
Haojun Liao 已提交
267

H
Haojun Liao 已提交
268 269 270
    void** p1 = p;
    taosCacheRelease(tscObjCache, (void**) &p1, false);

H
Haojun Liao 已提交
271
    taosCacheRelease(tscObjCache, (void**) &p, true);
272
    rpcFreeCont(rpcMsg->pCont);
S
slguan 已提交
273
    return;
H
hzcheng 已提交
274 275
  }

H
Haojun Liao 已提交
276
  if (pEpSet) {
dengyihao's avatar
dengyihao 已提交
277
    if (!tscEpSetIsEqual(&pSql->epSet, pEpSet)) {
H
Haojun Liao 已提交
278 279
      if (pCmd->command < TSDB_SQL_MGMT) {
        tscUpdateVgroupInfo(pSql, pEpSet);
dengyihao's avatar
dengyihao 已提交
280
      } else {
dengyihao's avatar
dengyihao 已提交
281
        tscUpdateMgmtEpSet(pEpSet);
H
Haojun Liao 已提交
282
      }
dengyihao's avatar
dengyihao 已提交
283
    }
J
jtao1735 已提交
284 285
  }

286 287 288 289 290
  int32_t cmd = pCmd->command;
  if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_FETCH || cmd == TSDB_SQL_INSERT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) &&
      (rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID ||
       rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID ||
       rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL ||
S
Shengliang Guan 已提交
291
       rpcMsg->code == TSDB_CODE_APP_NOT_READY ||
292 293 294 295 296 297 298
       rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE)) {
    tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), ++pSql->retry);

    // set the flag to denote that sql string needs to be re-parsed and build submit block with table schema
    if (rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
      pSql->cmd.submitSchema = 1;
    }
299

300 301 302 303
    pSql->res.code = rpcMsg->code;  // keep the previous error code
    if (pSql->retry > pSql->maxRetry) {
      tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry);
    } else {
H
Haojun Liao 已提交
304
      // wait for a little bit moment and then retry, todo do not sleep in rpc callback thread
H
Haojun Liao 已提交
305 306 307 308 309
      if (rpcMsg->code == TSDB_CODE_APP_NOT_READY || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
        int32_t duration = getWaitingTimeInterval(pSql->retry);
        taosMsleep(duration);
      }

B
Bomin Zhang 已提交
310
      rpcMsg->code = tscRenewTableMeta(pSql, 0);
311 312 313

      // if there is an error occurring, proceed to the following error handling procedure.
      if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
H
Haojun Liao 已提交
314
        taosCacheRelease(tscObjCache, (void**) &p, false);
315 316
        rpcFreeCont(rpcMsg->pCont);
        return;
H
hzcheng 已提交
317 318
      }
    }
S
slguan 已提交
319
  }
320

H
hzcheng 已提交
321
  pRes->rspLen = 0;
322
  
H
Haojun Liao 已提交
323
  if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
324
    tscDebug("%p query is cancelled, code:%s", pSql, tstrerror(pRes->code));
H
Haojun Liao 已提交
325 326
  } else {
    pRes->code = rpcMsg->code;
H
hzcheng 已提交
327 328
  }

S
slguan 已提交
329
  if (pRes->code == TSDB_CODE_SUCCESS) {
330
    tscDebug("%p reset retry counter to be 0 due to success rsp, old:%d", pSql, pSql->retry);
S
slguan 已提交
331 332 333
    pSql->retry = 0;
  }

334
  if (pRes->code != TSDB_CODE_TSC_QUERY_CANCELLED) {
335
    assert(rpcMsg->msgType == pCmd->msgType + 1);
336
    pRes->code    = rpcMsg->code;
337
    pRes->rspType = rpcMsg->msgType;
338
    pRes->rspLen  = rpcMsg->contLen;
H
hzcheng 已提交
339

340
    if (pRes->rspLen > 0 && rpcMsg->pCont) {
341 342
      char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
      if (tmp == NULL) {
343
        pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
344 345
      } else {
        pRes->pRsp = tmp;
346
        memcpy(pRes->pRsp, rpcMsg->pCont, pRes->rspLen);
S
slguan 已提交
347
      }
348 349
    } else {
      pRes->pRsp = NULL;
S
slguan 已提交
350 351
    }

H
hzcheng 已提交
352 353 354 355
    /*
     * There is not response callback function for submit response.
     * The actual inserted number of points is the first number.
     */
356
    if (rpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP && pRes->pRsp != NULL) {
357
      SShellSubmitRspMsg *pMsg = (SShellSubmitRspMsg*)pRes->pRsp;
S
slguan 已提交
358 359 360 361 362 363 364
      pMsg->code = htonl(pMsg->code);
      pMsg->numOfRows = htonl(pMsg->numOfRows);
      pMsg->affectedRows = htonl(pMsg->affectedRows);
      pMsg->failedRows = htonl(pMsg->failedRows);
      pMsg->numOfFailedBlocks = htonl(pMsg->numOfFailedBlocks);

      pRes->numOfRows += pMsg->affectedRows;
365
      tscDebug("%p SQL cmd:%s, code:%s inserted rows:%d rspLen:%d", pSql, sqlCmd[pCmd->command], 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
366
          tstrerror(pRes->code), pMsg->affectedRows, pRes->rspLen);
S
slguan 已提交
367
    } else {
368
      tscDebug("%p SQL cmd:%s, code:%s rspLen:%d", pSql, sqlCmd[pCmd->command], tstrerror(pRes->code), pRes->rspLen);
H
hzcheng 已提交
369 370
    }
  }
371
  
H
Haojun Liao 已提交
372
  if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) {
373
    rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
H
Haojun Liao 已提交
374
  }
S
Shengliang Guan 已提交
375

H
Haojun Liao 已提交
376
  bool shouldFree = tscShouldBeFreed(pSql);
377
  if (rpcMsg->code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
S
TD-1057  
Shengliang Guan 已提交
378
    rpcMsg->code = (pRes->code == TSDB_CODE_SUCCESS) ? (int32_t)pRes->numOfRows : pRes->code;
H
Haojun Liao 已提交
379
    (*pSql->fp)(pSql->param, pSql, rpcMsg->code);
H
hzcheng 已提交
380 381
  }

H
Haojun Liao 已提交
382
  void** p1 = p;
H
Haojun Liao 已提交
383
  taosCacheRelease(tscObjCache, (void**) &p1, false);
H
Haojun Liao 已提交
384

H
Haojun Liao 已提交
385 386
  if (shouldFree) { // in case of table-meta/vgrouplist query, automatically free it
    taosCacheRelease(tscObjCache, (void **)&p, true);
H
Haojun Liao 已提交
387 388 389
    tscDebug("%p sqlObj is automatically freed", pSql);
  }

390
  rpcFreeCont(rpcMsg->pCont);
H
hzcheng 已提交
391 392
}

S
slguan 已提交
393 394 395
int doProcessSql(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;
396

H
hjxilinx 已提交
397 398 399 400 401 402 403
  if (pCmd->command == TSDB_SQL_SELECT ||
      pCmd->command == TSDB_SQL_FETCH ||
      pCmd->command == TSDB_SQL_RETRIEVE ||
      pCmd->command == TSDB_SQL_INSERT ||
      pCmd->command == TSDB_SQL_CONNECT ||
      pCmd->command == TSDB_SQL_HB ||
      pCmd->command == TSDB_SQL_META ||
H
hjxilinx 已提交
404
      pCmd->command == TSDB_SQL_STABLEVGROUP) {
405 406 407 408 409 410
    pRes->code = tscBuildMsg[pCmd->command](pSql, NULL);
  }
  
  if (pRes->code != TSDB_CODE_SUCCESS) {
    tscQueueAsyncRes(pSql);
    return pRes->code;
S
slguan 已提交
411
  }
412

413 414 415
  int32_t code = tscSendMsgToServer(pSql);

  // NOTE: if code is TSDB_CODE_SUCCESS, pSql may have been released here already by other threads.
416
  if (code != TSDB_CODE_SUCCESS) {
S
slguan 已提交
417
    pRes->code = code;
H
hjxilinx 已提交
418
    tscQueueAsyncRes(pSql);
H
Haojun Liao 已提交
419
    return code;
S
slguan 已提交
420
  }
H
hjxilinx 已提交
421 422
  
  return TSDB_CODE_SUCCESS;
S
slguan 已提交
423 424 425
}

int tscProcessSql(SSqlObj *pSql) {
426
  char    *name = NULL;
427
  SSqlCmd *pCmd = &pSql->cmd;
428
  
429
  SQueryInfo     *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hjxilinx 已提交
430
  STableMetaInfo *pTableMetaInfo = NULL;
431
  uint32_t        type = 0;
432

433
  if (pQueryInfo != NULL) {
H
hjxilinx 已提交
434
    pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
435
    name = (pTableMetaInfo != NULL)? pTableMetaInfo->name:NULL;
436
    type = pQueryInfo->type;
437

H
hjxilinx 已提交
438
    // while numOfTables equals to 0, it must be Heartbeat
H
hjxilinx 已提交
439
    assert((pQueryInfo->numOfTables == 0 && pQueryInfo->command == TSDB_SQL_HB) || pQueryInfo->numOfTables > 0);
S
slguan 已提交
440
  }
441

442
  tscDebug("%p SQL cmd:%s will be processed, name:%s, type:%d", pSql, sqlCmd[pCmd->command], name, type);
H
hjxilinx 已提交
443
  if (pCmd->command < TSDB_SQL_MGMT) { // the pTableMetaInfo cannot be NULL
H
hjxilinx 已提交
444
    if (pTableMetaInfo == NULL) {
445
      pSql->res.code = TSDB_CODE_TSC_APP_ERROR;
H
hjxilinx 已提交
446 447
      return pSql->res.code;
    }
H
hjxilinx 已提交
448
  } else if (pCmd->command < TSDB_SQL_LOCAL) {
449
    //pSql->epSet = tscMgmtEpSet;
H
hzcheng 已提交
450 451 452
  } else {  // local handler
    return (*tscProcessMsgRsp[pCmd->command])(pSql);
  }
453
  
S
slguan 已提交
454 455
  return doProcessSql(pSql);
}
H
hzcheng 已提交
456

H
hjxilinx 已提交
457
void tscKillSTableQuery(SSqlObj *pSql) {
458 459 460
  SSqlCmd* pCmd = &pSql->cmd;
  
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hjxilinx 已提交
461
  if (!tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
H
hzcheng 已提交
462 463 464
    return;
  }

465 466
  pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;

H
hzcheng 已提交
467
  for (int i = 0; i < pSql->numOfSubs; ++i) {
468
    // NOTE: pSub may have been released already here
H
hzcheng 已提交
469
    SSqlObj *pSub = pSql->pSubs[i];
S
slguan 已提交
470
    if (pSub == NULL) {
H
hzcheng 已提交
471 472
      continue;
    }
S
slguan 已提交
473

dengyihao's avatar
dengyihao 已提交
474
    pSub->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
H
Haojun Liao 已提交
475 476
    if (pSub->pRpcCtx != NULL) {
      rpcCancelRequest(pSub->pRpcCtx);
H
hzcheng 已提交
477
    }
H
Haojun Liao 已提交
478

479
    tscQueueAsyncRes(pSub); // async res? not other functions?
H
hzcheng 已提交
480 481
  }

482
  tscDebug("%p super table query cancelled", pSql);
H
hzcheng 已提交
483 484
}

J
jtao1735 已提交
485
int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
486
  SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *) pSql->cmd.payload;
S
slguan 已提交
487
  pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
S
slguan 已提交
488

H
Haojun Liao 已提交
489
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
S
slguan 已提交
490
  pRetrieveMsg->free = htons(pQueryInfo->type);
H
hzcheng 已提交
491

492
  // todo valid the vgroupId at the client side
H
hjxilinx 已提交
493 494
  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  
weixin_48148422's avatar
weixin_48148422 已提交
495
  if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
H
hjxilinx 已提交
496
    int32_t vgIndex = pTableMetaInfo->vgroupIndex;
497
    
H
hjxilinx 已提交
498
    SVgroupsInfo* pVgroupInfo = pTableMetaInfo->vgroupList;
H
Haojun Liao 已提交
499 500
    assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups);

H
hjxilinx 已提交
501
    pRetrieveMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId);
502
  } else {
H
hjxilinx 已提交
503
    STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
H
hjxilinx 已提交
504
    pRetrieveMsg->header.vgId = htonl(pTableMeta->vgroupInfo.vgId);
505
  }
506 507

  pSql->cmd.payloadLen = sizeof(SRetrieveTableMsg);
J
jtao1735 已提交
508
  pSql->cmd.msgType = TSDB_MSG_TYPE_FETCH;
509 510 511

  pRetrieveMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg));

512
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
513 514
}

515
int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hjxilinx 已提交
516
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
H
hjxilinx 已提交
517
  STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
H
hjxilinx 已提交
518
  
519
  char* pMsg = pSql->cmd.payload;
520 521 522
  
  // NOTE: shell message size should not include SMsgDesc
  int32_t size = pSql->cmd.payloadLen - sizeof(SMsgDesc);
523 524
  int32_t vgId = pTableMeta->vgroupInfo.vgId;

525
  SMsgDesc* pMsgDesc = (SMsgDesc*) pMsg;
526 527
  pMsgDesc->numOfVnodes = htonl(1); // always one vnode

528
  pMsg += sizeof(SMsgDesc);
529
  SSubmitMsg *pShellMsg = (SSubmitMsg *)pMsg;
530

H
hjxilinx 已提交
531
  pShellMsg->header.vgId = htonl(vgId);
H
Haojun Liao 已提交
532
  pShellMsg->header.contLen = htonl(size);      // the length not includes the size of SMsgDesc
533
  pShellMsg->length = pShellMsg->header.contLen;
H
hjxilinx 已提交
534
  
H
Haojun Liao 已提交
535
  pShellMsg->numOfBlocks = htonl(pSql->cmd.numOfTablesInSubmit);  // number of tables to be inserted
H
hzcheng 已提交
536

H
hjxilinx 已提交
537
  // pSql->cmd.payloadLen is set during copying data into payload
S
slguan 已提交
538
  pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
dengyihao's avatar
dengyihao 已提交
539
  tscDumpEpSetFromVgroupInfo(&pTableMeta->corVgroupInfo, &pSql->epSet);
540

541 542
  tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfEP:%d", pSql, vgId, pSql->cmd.numOfTablesInSubmit,
      pSql->epSet.numOfEps);
543
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
544 545 546
}

/*
547
 * for table query, simply return the size <= 1k
H
hzcheng 已提交
548
 */
549
static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
H
hzcheng 已提交
550
  const static int32_t MIN_QUERY_MSG_PKT_SIZE = TSDB_MAX_BYTES_PER_ROW * 5;
551
  SQueryInfo *         pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
H
hzcheng 已提交
552

S
TD-1057  
Shengliang Guan 已提交
553
  int32_t srcColListSize = (int32_t)(taosArrayGetSize(pQueryInfo->colList) * sizeof(SColumnInfo));
H
hjxilinx 已提交
554 555
  
  size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
S
TD-1057  
Shengliang Guan 已提交
556
  int32_t exprSize = (int32_t)(sizeof(SSqlFuncMsg) * numOfExprs);
H
hjxilinx 已提交
557
  
558
  return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + exprSize + 4096;
H
hzcheng 已提交
559 560
}

561
static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char *pMsg) {
562
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
B
Bomin Zhang 已提交
563
  TSKEY dfltKey = htobe64(pQueryMsg->window.skey);
564

H
hjxilinx 已提交
565
  STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
weixin_48148422's avatar
weixin_48148422 已提交
566
  if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) || pTableMetaInfo->pVgroupTables == NULL) {
567 568
    
    SCMVgroupInfo* pVgroupInfo = NULL;
weixin_48148422's avatar
weixin_48148422 已提交
569
    if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
570 571
      int32_t index = pTableMetaInfo->vgroupIndex;
      assert(index >= 0);
H
hjxilinx 已提交
572
  
B
Bomin Zhang 已提交
573
      if (pTableMetaInfo->vgroupList->numOfVgroups > 0) {
H
Haojun Liao 已提交
574
        assert(index < pTableMetaInfo->vgroupList->numOfVgroups);
B
Bomin Zhang 已提交
575 576
        pVgroupInfo = &pTableMetaInfo->vgroupList->vgroups[index];
      }
577
      tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, pTableMetaInfo->vgroupList->numOfVgroups);
H
hjxilinx 已提交
578 579
    } else {
      pVgroupInfo = &pTableMeta->vgroupInfo;
580
    }
weixin_48148422's avatar
weixin_48148422 已提交
581

582 583 584 585
    assert(pVgroupInfo != NULL);

    tscSetDnodeEpSet(pSql, pVgroupInfo);
    pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId);
weixin_48148422's avatar
weixin_48148422 已提交
586

587
    STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
588 589 590
    pTableIdInfo->tid = htonl(pTableMeta->id.tid);
    pTableIdInfo->uid = htobe64(pTableMeta->id.uid);
    pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->id.uid, dfltKey));
weixin_48148422's avatar
weixin_48148422 已提交
591

592 593
    pQueryMsg->numOfTables = htonl(1);  // set the number of tables
    pMsg += sizeof(STableIdInfo);
594
  } else { // it is a subquery of the super table query, this EP info is acquired from vgroupInfo
595
    int32_t index = pTableMetaInfo->vgroupIndex;
S
TD-1057  
Shengliang Guan 已提交
596
    int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
597
    assert(index >= 0 && index < numOfVgroups);
weixin_48148422's avatar
weixin_48148422 已提交
598

599
    tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, numOfVgroups);
weixin_48148422's avatar
weixin_48148422 已提交
600

601
    SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, index);
602

dengyihao's avatar
bugfix  
dengyihao 已提交
603
    // set the vgroup info 
604
    tscSetDnodeEpSet(pSql, &pTableIdList->vgInfo);
605 606
    pQueryMsg->head.vgId = htonl(pTableIdList->vgInfo.vgId);
    
S
TD-1057  
Shengliang Guan 已提交
607
    int32_t numOfTables = (int32_t)taosArrayGetSize(pTableIdList->itemList);
608 609 610 611 612 613 614 615 616
    pQueryMsg->numOfTables = htonl(numOfTables);  // set the number of tables
  
    // serialize each table id info
    for(int32_t i = 0; i < numOfTables; ++i) {
      STableIdInfo* pItem = taosArrayGet(pTableIdList->itemList, i);
      
      STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
      pTableIdInfo->tid = htonl(pItem->tid);
      pTableIdInfo->uid = htobe64(pItem->uid);
B
Bomin Zhang 已提交
617
      pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pItem->uid, dfltKey));
618 619 620 621
      pMsg += sizeof(STableIdInfo);
    }
  }
  
622
  tscDebug("%p vgId:%d, query on table:%s, tid:%d, uid:%" PRIu64, pSql, htonl(pQueryMsg->head.vgId), pTableMetaInfo->name,
623
      pTableMeta->id.tid, pTableMeta->id.uid);
H
hjxilinx 已提交
624
  
625 626 627
  return pMsg;
}

628
int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
629 630
  SSqlCmd *pCmd = &pSql->cmd;

631
  int32_t size = tscEstimateQueryMsgSize(pCmd, pCmd->clauseIndex);
H
hzcheng 已提交
632

S
slguan 已提交
633 634
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for query msg", pSql);
H
Haojun Liao 已提交
635
    return TSDB_CODE_TSC_INVALID_SQL;  // todo add test for this
S
slguan 已提交
636
  }
637
  
638
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hjxilinx 已提交
639
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hjxilinx 已提交
640
  STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
641 642 643 644 645 646

  size_t numOfSrcCols = taosArrayGetSize(pQueryInfo->colList);
  if (numOfSrcCols <= 0 && !tscQueryTags(pQueryInfo)) {
    tscError("%p illegal value of numOfCols in query msg: %"PRIu64", table cols:%d", pSql, numOfSrcCols,
        tscGetNumOfColumns(pTableMeta));

H
Haojun Liao 已提交
647
    return TSDB_CODE_TSC_INVALID_SQL;
648
  }
649
  
650 651
  if (pQueryInfo->interval.interval < 0) {
    tscError("%p illegal value of aggregation time interval in query msg: %ld", pSql, pQueryInfo->interval.interval);
H
Haojun Liao 已提交
652
    return TSDB_CODE_TSC_INVALID_SQL;
653 654 655 656
  }
  
  if (pQueryInfo->groupbyExpr.numOfGroupCols < 0) {
    tscError("%p illegal value of numOfGroupCols in query msg: %d", pSql, pQueryInfo->groupbyExpr.numOfGroupCols);
H
Haojun Liao 已提交
657
    return TSDB_CODE_TSC_INVALID_SQL;
658
  }
659

660
  SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload;
H
hzcheng 已提交
661

S
TD-1057  
Shengliang Guan 已提交
662
  int32_t numOfTags = (int32_t)taosArrayGetSize(pTableMetaInfo->tagColList);
663
  
664
  if (pQueryInfo->order.order == TSDB_ORDER_ASC) {
H
hjxilinx 已提交
665 666
    pQueryMsg->window.skey = htobe64(pQueryInfo->window.skey);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->window.ekey);
H
hzcheng 已提交
667
  } else {
H
hjxilinx 已提交
668 669
    pQueryMsg->window.skey = htobe64(pQueryInfo->window.ekey);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->window.skey);
H
hzcheng 已提交
670 671
  }

672 673
  pQueryMsg->order          = htons(pQueryInfo->order.order);
  pQueryMsg->orderColId     = htons(pQueryInfo->order.orderColId);
H
Haojun Liao 已提交
674
  pQueryMsg->fillType       = htons(pQueryInfo->fillType);
675 676
  pQueryMsg->limit          = htobe64(pQueryInfo->limit.limit);
  pQueryMsg->offset         = htobe64(pQueryInfo->limit.offset);
S
TD-1057  
Shengliang Guan 已提交
677
  pQueryMsg->numOfCols      = htons((int16_t)taosArrayGetSize(pQueryInfo->colList));
678 679 680 681 682 683
  pQueryMsg->interval.interval   = htobe64(pQueryInfo->interval.interval);
  pQueryMsg->interval.sliding   = htobe64(pQueryInfo->interval.sliding);
  pQueryMsg->interval.offset   = htobe64(pQueryInfo->interval.offset);
  pQueryMsg->interval.intervalUnit = pQueryInfo->interval.intervalUnit;
  pQueryMsg->interval.slidingUnit = pQueryInfo->interval.slidingUnit;
  pQueryMsg->interval.offsetUnit = pQueryInfo->interval.offsetUnit;
684
  pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols);
685
  pQueryMsg->numOfTags      = htonl(numOfTags);
weixin_48148422's avatar
weixin_48148422 已提交
686
  pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType);
H
Haojun Liao 已提交
687
  pQueryMsg->queryType      = htonl(pQueryInfo->type);
H
hjxilinx 已提交
688 689
  
  size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo);
S
TD-1057  
Shengliang Guan 已提交
690
  pQueryMsg->numOfOutput = htons((int16_t)numOfOutput);
H
hzcheng 已提交
691 692

  // set column list ids
693 694
  size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
  char *pMsg = (char *)(pQueryMsg->colList) + numOfCols * sizeof(SColumnInfo);
H
hjxilinx 已提交
695
  SSchema *pSchema = tscGetTableSchema(pTableMeta);
696
  
697 698 699
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumn *pCol = taosArrayGetP(pQueryInfo->colList, i);
    SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];
H
hzcheng 已提交
700

701
    if (pCol->colIndex.columnIndex >= tscGetNumOfColumns(pTableMeta) || pColSchema->type < TSDB_DATA_TYPE_BOOL ||
702
        pColSchema->type > TSDB_DATA_TYPE_NCHAR) {
703 704
      tscError("%p tid:%d uid:%" PRIu64" id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s",
          pSql, pTableMeta->id.tid, pTableMeta->id.uid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta), pCol->colIndex.columnIndex,
705 706
               pColSchema->name);

707
      return TSDB_CODE_TSC_INVALID_SQL;
708
    }
H
hzcheng 已提交
709 710 711

    pQueryMsg->colList[i].colId = htons(pColSchema->colId);
    pQueryMsg->colList[i].bytes = htons(pColSchema->bytes);
712
    pQueryMsg->colList[i].type  = htons(pColSchema->type);
S
slguan 已提交
713
    pQueryMsg->colList[i].numOfFilters = htons(pCol->numOfFilters);
H
hzcheng 已提交
714

S
slguan 已提交
715 716 717
    // append the filter information after the basic column information
    for (int32_t f = 0; f < pCol->numOfFilters; ++f) {
      SColumnFilterInfo *pColFilter = &pCol->filterInfo[f];
H
hzcheng 已提交
718

S
slguan 已提交
719
      SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)pMsg;
720
      pFilterMsg->filterstr = htons(pColFilter->filterstr);
S
slguan 已提交
721 722

      pMsg += sizeof(SColumnFilterInfo);
H
hzcheng 已提交
723

724
      if (pColFilter->filterstr) {
S
slguan 已提交
725
        pFilterMsg->len = htobe64(pColFilter->len);
726
        memcpy(pMsg, (void *)pColFilter->pz, (size_t)(pColFilter->len + 1));
S
slguan 已提交
727 728 729 730 731 732 733 734
        pMsg += (pColFilter->len + 1);  // append the additional filter binary info
      } else {
        pFilterMsg->lowerBndi = htobe64(pColFilter->lowerBndi);
        pFilterMsg->upperBndi = htobe64(pColFilter->upperBndi);
      }

      pFilterMsg->lowerRelOptr = htons(pColFilter->lowerRelOptr);
      pFilterMsg->upperRelOptr = htons(pColFilter->upperRelOptr);
H
hzcheng 已提交
735

S
slguan 已提交
736 737
      if (pColFilter->lowerRelOptr == TSDB_RELATION_INVALID && pColFilter->upperRelOptr == TSDB_RELATION_INVALID) {
        tscError("invalid filter info");
H
Haojun Liao 已提交
738
        return TSDB_CODE_TSC_INVALID_SQL;
S
slguan 已提交
739 740
      }
    }
H
hzcheng 已提交
741 742
  }

H
hjxilinx 已提交
743
  SSqlFuncMsg *pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
H
hjxilinx 已提交
744
  for (int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) {
745
    SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
H
hzcheng 已提交
746

747
    if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId, pExpr->numOfParams)) {
748
      /* column id is not valid according to the cached table meta, the table meta is expired */
H
hzcheng 已提交
749
      tscError("%p table schema is not matched with parsed sql", pSql);
H
Haojun Liao 已提交
750
      return TSDB_CODE_TSC_INVALID_SQL;
H
hzcheng 已提交
751 752
    }

753 754 755
    pSqlFuncExpr->colInfo.colId    = htons(pExpr->colInfo.colId);
    pSqlFuncExpr->colInfo.colIndex = htons(pExpr->colInfo.colIndex);
    pSqlFuncExpr->colInfo.flag     = htons(pExpr->colInfo.flag);
H
hzcheng 已提交
756

757
    pSqlFuncExpr->functionId  = htons(pExpr->functionId);
H
hzcheng 已提交
758
    pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams);
H
hjxilinx 已提交
759
    pMsg += sizeof(SSqlFuncMsg);
H
hzcheng 已提交
760 761

    for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
H
Haojun Liao 已提交
762
      // todo add log
H
hzcheng 已提交
763 764 765 766 767
      pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType);
      pSqlFuncExpr->arg[j].argBytes = htons(pExpr->param[j].nLen);

      if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) {
        memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen);
768
        pMsg += pExpr->param[j].nLen;
H
hzcheng 已提交
769 770 771 772 773
      } else {
        pSqlFuncExpr->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64Key);
      }
    }

H
hjxilinx 已提交
774
    pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
H
hzcheng 已提交
775
  }
776
  
777
  // serialize the table info (sid, uid, tags)
778 779
  pMsg = doSerializeTableInfo(pQueryMsg, pSql, pMsg);
  
780
  SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr;
781
  if (pGroupbyExpr->numOfGroupCols > 0) {
S
slguan 已提交
782
    pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex);
H
hzcheng 已提交
783 784
    pQueryMsg->orderType = htons(pGroupbyExpr->orderType);

S
slguan 已提交
785
    for (int32_t j = 0; j < pGroupbyExpr->numOfGroupCols; ++j) {
786 787
      SColIndex* pCol = taosArrayGet(pGroupbyExpr->columnInfo, j);
  
S
slguan 已提交
788 789 790
      *((int16_t *)pMsg) = pCol->colId;
      pMsg += sizeof(pCol->colId);

791 792
      *((int16_t *)pMsg) += pCol->colIndex;
      pMsg += sizeof(pCol->colIndex);
S
slguan 已提交
793 794 795

      *((int16_t *)pMsg) += pCol->flag;
      pMsg += sizeof(pCol->flag);
H
hjxilinx 已提交
796 797 798
      
      memcpy(pMsg, pCol->name, tListLen(pCol->name));
      pMsg += tListLen(pCol->name);
S
slguan 已提交
799 800 801
    }
  }

802
  if (pQueryInfo->fillType != TSDB_FILL_NONE) {
H
hjxilinx 已提交
803
    for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
804 805
      *((int64_t *)pMsg) = htobe64(pQueryInfo->fillVal[i]);
      pMsg += sizeof(pQueryInfo->fillVal[0]);
S
slguan 已提交
806 807
    }
  }
808 809 810 811 812 813 814 815 816
  
  if (numOfTags != 0) {
    int32_t numOfColumns = tscGetNumOfColumns(pTableMeta);
    int32_t numOfTagColumns = tscGetNumOfTags(pTableMeta);
    int32_t total = numOfTagColumns + numOfColumns;
    
    pSchema = tscGetTableTagSchema(pTableMeta);
    
    for (int32_t i = 0; i < numOfTags; ++i) {
H
hjxilinx 已提交
817
      SColumn *pCol = taosArrayGetP(pTableMetaInfo->tagColList, i);
818 819 820 821
      SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];

      if ((pCol->colIndex.columnIndex >= numOfTagColumns || pCol->colIndex.columnIndex < -1) ||
          (pColSchema->type < TSDB_DATA_TYPE_BOOL || pColSchema->type > TSDB_DATA_TYPE_NCHAR)) {
822 823
        tscError("%p tid:%d uid:%" PRIu64 " id:%s, tag index out of range, totalCols:%d, numOfTags:%d, index:%d, column name:%s",
                 pSql, pTableMeta->id.tid, pTableMeta->id.uid, pTableMetaInfo->name, total, numOfTagColumns,
B
Bomin Zhang 已提交
824
                 pCol->colIndex.columnIndex, pColSchema->name);
825

826
        return TSDB_CODE_TSC_INVALID_SQL;
827 828 829 830 831 832 833 834 835 836 837 838
      }
  
      SColumnInfo* pTagCol = (SColumnInfo*) pMsg;
  
      pTagCol->colId = htons(pColSchema->colId);
      pTagCol->bytes = htons(pColSchema->bytes);
      pTagCol->type  = htons(pColSchema->type);
      pTagCol->numOfFilters = 0;
      
      pMsg += sizeof(SColumnInfo);
    }
  }
S
slguan 已提交
839

H
Haojun Liao 已提交
840 841 842 843
  // serialize tag column query condition
  if (pQueryInfo->tagCond.pCond != NULL && taosArrayGetSize(pQueryInfo->tagCond.pCond) > 0) {
    STagCond* pTagCond = &pQueryInfo->tagCond;
    
844
    SCond *pCond = tsGetSTableQueryCond(pTagCond, pTableMeta->id.uid);
H
Haojun Liao 已提交
845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860
    if (pCond != NULL && pCond->cond != NULL) {
      pQueryMsg->tagCondLen = htons(pCond->len);
      memcpy(pMsg, pCond->cond, pCond->len);
      
      pMsg += pCond->len;
    }
  }
  
  if (pQueryInfo->tagCond.tbnameCond.cond == NULL) {
    *pMsg = 0;
    pMsg++;
  } else {
    strcpy(pMsg, pQueryInfo->tagCond.tbnameCond.cond);
    pMsg += strlen(pQueryInfo->tagCond.tbnameCond.cond) + 1;
  }

S
slguan 已提交
861
  // compressed ts block
S
TD-1057  
Shengliang Guan 已提交
862
  pQueryMsg->tsOffset = htonl((int32_t)(pMsg - pCmd->payload));
S
slguan 已提交
863 864 865
  int32_t tsLen = 0;
  int32_t numOfBlocks = 0;

866
  if (pQueryInfo->tsBuf != NULL) {
H
hjxilinx 已提交
867
    STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pQueryInfo->tsBuf, pTableMetaInfo->vgroupIndex);
868
    assert(QUERY_IS_JOIN_QUERY(pQueryInfo->type) && pBlockInfo != NULL);  // this query should not be sent
S
slguan 已提交
869 870

    // todo refactor
B
Bomin Zhang 已提交
871 872 873 874 875
    if (fseek(pQueryInfo->tsBuf->f, pBlockInfo->offset, SEEK_SET) != 0) {
      int code = TAOS_SYSTEM_ERROR(ferror(pQueryInfo->tsBuf->f));
      tscError("%p: fseek failed: %s", pSql, tstrerror(code));
      return code;
    }
H
Haojun Liao 已提交
876 877 878

    size_t s = fread(pMsg, 1, pBlockInfo->compLen, pQueryInfo->tsBuf->f);
    if (s != pBlockInfo->compLen) {
B
Bomin Zhang 已提交
879 880 881 882
      int code = TAOS_SYSTEM_ERROR(ferror(pQueryInfo->tsBuf->f));
      tscError("%p: fread didn't return expected data: %s", pSql, tstrerror(code));
      return code;
    }
S
slguan 已提交
883 884 885 886

    pMsg += pBlockInfo->compLen;
    tsLen = pBlockInfo->compLen;
    numOfBlocks = pBlockInfo->numOfBlocks;
H
hzcheng 已提交
887 888
  }

S
slguan 已提交
889 890
  pQueryMsg->tsLen = htonl(tsLen);
  pQueryMsg->tsNumOfBlocks = htonl(numOfBlocks);
891 892
  if (pQueryInfo->tsBuf != NULL) {
    pQueryMsg->tsOrder = htonl(pQueryInfo->tsBuf->tsOrder);
H
hzcheng 已提交
893 894
  }

S
TD-1057  
Shengliang Guan 已提交
895
  int32_t msgLen = (int32_t)(pMsg - pCmd->payload);
H
hzcheng 已提交
896

897
  tscDebug("%p msg built success,len:%d bytes", pSql, msgLen);
H
hzcheng 已提交
898
  pCmd->payloadLen = msgLen;
S
slguan 已提交
899
  pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY;
H
hjxilinx 已提交
900
  
901
  pQueryMsg->head.contLen = htonl(msgLen);
S
TD-1057  
Shengliang Guan 已提交
902
  assert(msgLen + minMsgSize() <= (int32_t)pCmd->allocSize);
903 904

  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
905 906
}

907 908
int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
909
  pCmd->payloadLen = sizeof(SCMCreateDbMsg);
S
slguan 已提交
910
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DB;
H
hzcheng 已提交
911

912
  SCMCreateDbMsg *pCreateDbMsg = (SCMCreateDbMsg*)pCmd->payload;
913

914
  assert(pCmd->numOfClause == 1);
915
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
916
  tstrncpy(pCreateDbMsg->db, pTableMetaInfo->name, sizeof(pCreateDbMsg->db));
H
hzcheng 已提交
917

918
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
919 920
}

921 922
int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
923
  pCmd->payloadLen = sizeof(SCMCreateDnodeMsg);
S
slguan 已提交
924 925
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
926
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
927
  }
H
hzcheng 已提交
928

929
  SCMCreateDnodeMsg *pCreate = (SCMCreateDnodeMsg *)pCmd->payload;
J
jtao1735 已提交
930 931
  strncpy(pCreate->ep, pInfo->pDCLInfo->a[0].z, pInfo->pDCLInfo->a[0].n);
  
S
slguan 已提交
932
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DNODE;
H
hzcheng 已提交
933

934
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
935 936
}

937 938
int32_t tscBuildAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
939
  pCmd->payloadLen = sizeof(SCMCreateAcctMsg);
S
slguan 已提交
940 941
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
942
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
943
  }
H
hzcheng 已提交
944

945
  SCMCreateAcctMsg *pAlterMsg = (SCMCreateAcctMsg *)pCmd->payload;
H
hzcheng 已提交
946

H
Haojun Liao 已提交
947 948
  SStrToken *pName = &pInfo->pDCLInfo->user.user;
  SStrToken *pPwd = &pInfo->pDCLInfo->user.passwd;
H
hzcheng 已提交
949

950 951
  strncpy(pAlterMsg->user, pName->z, pName->n);
  strncpy(pAlterMsg->pass, pPwd->z, pPwd->n);
S
slguan 已提交
952

953
  SCreateAcctSQL *pAcctOpt = &pInfo->pDCLInfo->acctOpt;
H
hzcheng 已提交
954

955 956 957 958 959 960 961 962
  pAlterMsg->cfg.maxUsers = htonl(pAcctOpt->maxUsers);
  pAlterMsg->cfg.maxDbs = htonl(pAcctOpt->maxDbs);
  pAlterMsg->cfg.maxTimeSeries = htonl(pAcctOpt->maxTimeSeries);
  pAlterMsg->cfg.maxStreams = htonl(pAcctOpt->maxStreams);
  pAlterMsg->cfg.maxPointsPerSecond = htonl(pAcctOpt->maxPointsPerSecond);
  pAlterMsg->cfg.maxStorage = htobe64(pAcctOpt->maxStorage);
  pAlterMsg->cfg.maxQueryTime = htobe64(pAcctOpt->maxQueryTime);
  pAlterMsg->cfg.maxConnections = htonl(pAcctOpt->maxConnections);
H
hzcheng 已提交
963

964 965 966 967 968 969 970 971 972 973 974 975 976
  if (pAcctOpt->stat.n == 0) {
    pAlterMsg->cfg.accessState = -1;
  } else {
    if (pAcctOpt->stat.z[0] == 'r' && pAcctOpt->stat.n == 1) {
      pAlterMsg->cfg.accessState = TSDB_VN_READ_ACCCESS;
    } else if (pAcctOpt->stat.z[0] == 'w' && pAcctOpt->stat.n == 1) {
      pAlterMsg->cfg.accessState = TSDB_VN_WRITE_ACCCESS;
    } else if (strncmp(pAcctOpt->stat.z, "all", 3) == 0 && pAcctOpt->stat.n == 3) {
      pAlterMsg->cfg.accessState = TSDB_VN_ALL_ACCCESS;
    } else if (strncmp(pAcctOpt->stat.z, "no", 2) == 0 && pAcctOpt->stat.n == 2) {
      pAlterMsg->cfg.accessState = 0;
    }
  }
H
hzcheng 已提交
977

S
slguan 已提交
978
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_ACCT;
979
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
980 981
}

982 983
int32_t tscBuildUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
984
  pCmd->payloadLen = sizeof(SCMCreateUserMsg);
S
slguan 已提交
985

S
slguan 已提交
986 987
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
988
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
989 990
  }

991
  SCMCreateUserMsg *pAlterMsg = (SCMCreateUserMsg*)pCmd->payload;
H
hzcheng 已提交
992

993 994
  SUserInfo *pUser = &pInfo->pDCLInfo->user;
  strncpy(pAlterMsg->user, pUser->user.z, pUser->user.n);
S
TD-1057  
Shengliang Guan 已提交
995
  pAlterMsg->flag = (int8_t)pUser->type;
H
hzcheng 已提交
996

997 998 999 1000
  if (pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
    pAlterMsg->privilege = (char)pCmd->count;
  } else if (pUser->type == TSDB_ALTER_USER_PASSWD) {
    strncpy(pAlterMsg->pass, pUser->passwd.z, pUser->passwd.n);
H
hjxilinx 已提交
1001 1002
  } else { // create user password info
    strncpy(pAlterMsg->pass, pUser->passwd.z, pUser->passwd.n);
1003
  }
H
hzcheng 已提交
1004

1005
  if (pUser->type == TSDB_ALTER_USER_PASSWD || pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
S
slguan 已提交
1006
    pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_USER;
1007
  } else {
S
slguan 已提交
1008
    pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_USER;
1009
  }
H
hzcheng 已提交
1010

1011
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1012 1013
}

1014 1015
int32_t tscBuildCfgDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
S
slguan 已提交
1016
  pCmd->payloadLen = sizeof(SCMCfgDnodeMsg);
guanshengliang's avatar
guanshengliang 已提交
1017
  pCmd->msgType = TSDB_MSG_TYPE_CM_CONFIG_DNODE;
1018 1019
  return TSDB_CODE_SUCCESS;
}
S
slguan 已提交
1020

1021 1022
int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
1023
  pCmd->payloadLen = sizeof(SCMDropDbMsg);
H
hzcheng 已提交
1024

S
slguan 已提交
1025 1026
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1027
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1028 1029
  }

1030
  SCMDropDbMsg *pDropDbMsg = (SCMDropDbMsg*)pCmd->payload;
H
hzcheng 已提交
1031

1032
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1033
  tstrncpy(pDropDbMsg->db, pTableMetaInfo->name, sizeof(pDropDbMsg->db));
1034
  pDropDbMsg->ignoreNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;
H
hzcheng 已提交
1035

S
slguan 已提交
1036
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DB;
1037
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1038 1039
}

1040 1041
int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
1042
  pCmd->payloadLen = sizeof(SCMDropTableMsg);
H
hzcheng 已提交
1043

S
slguan 已提交
1044 1045
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1046
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1047
  }
H
hzcheng 已提交
1048

1049
  SCMDropTableMsg *pDropTableMsg = (SCMDropTableMsg*)pCmd->payload;
1050
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1051
  strcpy(pDropTableMsg->tableId, pTableMetaInfo->name);
S
slguan 已提交
1052
  pDropTableMsg->igNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;
H
hzcheng 已提交
1053

S
slguan 已提交
1054
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_TABLE;
1055
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1056 1057
}

1058
int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1059
  SSqlCmd *pCmd = &pSql->cmd;
1060
  pCmd->payloadLen = sizeof(SCMDropDnodeMsg);
S
slguan 已提交
1061 1062
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1063
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1064
  }
H
hzcheng 已提交
1065

1066
  SCMDropDnodeMsg *pDrop = (SCMDropDnodeMsg *)pCmd->payload;
1067
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1068
  tstrncpy(pDrop->ep, pTableMetaInfo->name, sizeof(pDrop->ep));
S
slguan 已提交
1069
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DNODE;
H
hzcheng 已提交
1070

1071
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1072 1073
}

S
[TD-16]  
slguan 已提交
1074
int32_t tscBuildDropUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
1075
  SSqlCmd *pCmd = &pSql->cmd;
1076
  pCmd->payloadLen = sizeof(SCMDropUserMsg);
S
slguan 已提交
1077
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_USER;
H
hzcheng 已提交
1078

S
slguan 已提交
1079 1080
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1081
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1082
  }
H
hzcheng 已提交
1083

1084
  SCMDropUserMsg *pDropMsg = (SCMDropUserMsg*)pCmd->payload;
1085
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1086
  tstrncpy(pDropMsg->user, pTableMetaInfo->name, sizeof(pDropMsg->user));
H
hzcheng 已提交
1087

1088
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1089 1090
}

S
[TD-16]  
slguan 已提交
1091 1092 1093 1094 1095 1096 1097
int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCMDropUserMsg);
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_ACCT;

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1098
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
[TD-16]  
slguan 已提交
1099 1100 1101 1102
  }

  SCMDropUserMsg *pDropMsg = (SCMDropUserMsg*)pCmd->payload;
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1103
  tstrncpy(pDropMsg->user, pTableMetaInfo->name, sizeof(pDropMsg->user));
S
[TD-16]  
slguan 已提交
1104 1105 1106 1107

  return TSDB_CODE_SUCCESS;
}

1108 1109
int32_t tscBuildUseDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
1110
  pCmd->payloadLen = sizeof(SCMUseDbMsg);
H
hzcheng 已提交
1111

S
slguan 已提交
1112 1113
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1114
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1115
  }
1116

1117
  SCMUseDbMsg *pUseDbMsg = (SCMUseDbMsg*)pCmd->payload;
1118
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1119
  strcpy(pUseDbMsg->db, pTableMetaInfo->name);
S
slguan 已提交
1120
  pCmd->msgType = TSDB_MSG_TYPE_CM_USE_DB;
H
hzcheng 已提交
1121

1122
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1123 1124
}

1125
int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1126
  STscObj *pObj = pSql->pTscObj;
S
slguan 已提交
1127
  SSqlCmd *pCmd = &pSql->cmd;
S
slguan 已提交
1128
  pCmd->msgType = TSDB_MSG_TYPE_CM_SHOW;
1129
  pCmd->payloadLen = sizeof(SCMShowMsg) + 100;
H
hzcheng 已提交
1130

S
slguan 已提交
1131 1132
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1133
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1134
  }
H
hzcheng 已提交
1135

1136
  SCMShowMsg *pShowMsg = (SCMShowMsg*)pCmd->payload;
S
slguan 已提交
1137

1138
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1139
  size_t nameLen = strlen(pTableMetaInfo->name);
S
slguan 已提交
1140
  if (nameLen > 0) {
B
Bomin Zhang 已提交
1141
    tstrncpy(pShowMsg->db, pTableMetaInfo->name, sizeof(pShowMsg->db));  // prefix is set here
H
hzcheng 已提交
1142
  } else {
B
Bomin Zhang 已提交
1143
    tstrncpy(pShowMsg->db, pObj->db, sizeof(pShowMsg->db));
H
hzcheng 已提交
1144 1145
  }

1146 1147 1148 1149
  SShowInfo *pShowInfo = &pInfo->pDCLInfo->showOpt;
  pShowMsg->type = pShowInfo->showType;

  if (pShowInfo->showType != TSDB_MGMT_TABLE_VNODES) {
H
Haojun Liao 已提交
1150
    SStrToken *pPattern = &pShowInfo->pattern;
1151 1152 1153 1154 1155
    if (pPattern->type > 0) {  // only show tables support wildcard query
      strncpy(pShowMsg->payload, pPattern->z, pPattern->n);
      pShowMsg->payloadLen = htons(pPattern->n);
    }
  } else {
H
Haojun Liao 已提交
1156
    SStrToken *pEpAddr = &pShowInfo->prefix;
dengyihao's avatar
dengyihao 已提交
1157
    assert(pEpAddr->n > 0 && pEpAddr->type > 0);
H
hzcheng 已提交
1158

dengyihao's avatar
dengyihao 已提交
1159 1160
    strncpy(pShowMsg->payload, pEpAddr->z, pEpAddr->n);
    pShowMsg->payloadLen = htons(pEpAddr->n);
1161 1162
  }

1163
  pCmd->payloadLen = sizeof(SCMShowMsg) + pShowMsg->payloadLen;
1164
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1165 1166
}

1167
int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1168
  SSqlCmd *pCmd = &pSql->cmd;
1169
  pCmd->payloadLen = sizeof(SCMKillQueryMsg);
H
hzcheng 已提交
1170

1171 1172
  switch (pCmd->command) {
    case TSDB_SQL_KILL_QUERY:
S
slguan 已提交
1173
      pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_QUERY;
1174 1175
      break;
    case TSDB_SQL_KILL_CONNECTION:
S
slguan 已提交
1176
      pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_CONN;
1177 1178
      break;
    case TSDB_SQL_KILL_STREAM:
S
slguan 已提交
1179
      pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_STREAM;
1180 1181 1182
      break;
  }
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1183 1184
}

1185
int tscEstimateCreateTableMsgLength(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1186 1187
  SSqlCmd *pCmd = &(pSql->cmd);

guanshengliang's avatar
guanshengliang 已提交
1188
  int32_t size = minMsgSize() + sizeof(SCMCreateTableMsg);
H
hzcheng 已提交
1189

1190
  SCreateTableSQL *pCreateTableInfo = pInfo->pCreateTableInfo;
1191
  if (pCreateTableInfo->type == TSQL_CREATE_TABLE_FROM_STABLE) {
H
hzcheng 已提交
1192 1193
    size += sizeof(STagData);
  } else {
S
slguan 已提交
1194
    size += sizeof(SSchema) * (pCmd->numOfCols + pCmd->count);
H
hzcheng 已提交
1195
  }
1196

1197 1198 1199
  if (pCreateTableInfo->pSelect != NULL) {
    size += (pCreateTableInfo->pSelect->selectToken.n + 1);
  }
H
hzcheng 已提交
1200 1201 1202 1203

  return size + TSDB_EXTRA_PAYLOAD_SIZE;
}

1204
int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1205
  int              msgLen = 0;
S
slguan 已提交
1206
  SSchema *        pSchema;
H
hzcheng 已提交
1207
  int              size = 0;
1208 1209 1210
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
H
hjxilinx 已提交
1211
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
1212 1213

  // Reallocate the payload size
1214
  size = tscEstimateCreateTableMsgLength(pSql, pInfo);
S
slguan 已提交
1215 1216
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for create table msg", pSql);
1217
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1218
  }
H
hzcheng 已提交
1219 1220


1221
  SCMCreateTableMsg *pCreateTableMsg = (SCMCreateTableMsg *)pCmd->payload;
H
hjxilinx 已提交
1222
  strcpy(pCreateTableMsg->tableId, pTableMetaInfo->name);
1223 1224

  // use dbinfo from table id without modifying current db info
H
Haojun Liao 已提交
1225
  tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pCreateTableMsg->db);
H
hzcheng 已提交
1226

1227 1228 1229
  SCreateTableSQL *pCreateTable = pInfo->pCreateTableInfo;

  pCreateTableMsg->igExists = pCreateTable->existCheck ? 1 : 0;
H
hzcheng 已提交
1230 1231 1232 1233
  pCreateTableMsg->numOfColumns = htons(pCmd->numOfCols);
  pCreateTableMsg->numOfTags = htons(pCmd->count);

  pCreateTableMsg->sqlLen = 0;
S
slguan 已提交
1234
  char *pMsg = (char *)pCreateTableMsg->schema;
H
hzcheng 已提交
1235

1236 1237
  int8_t type = pInfo->pCreateTableInfo->type;
  if (type == TSQL_CREATE_TABLE_FROM_STABLE) {  // create by using super table, tags value
1238 1239 1240 1241 1242 1243 1244
    STagData* pTag = &pInfo->pCreateTableInfo->usingInfo.tagdata;
    *(int32_t*)pMsg = htonl(pTag->dataLen);
    pMsg += sizeof(int32_t);
    memcpy(pMsg, pTag->name, sizeof(pTag->name));
    pMsg += sizeof(pTag->name);
    memcpy(pMsg, pTag->data, pTag->dataLen);
    pMsg += pTag->dataLen;
1245
  } else {  // create (super) table
1246
    pSchema = (SSchema *)pCreateTableMsg->schema;
1247

H
hzcheng 已提交
1248
    for (int i = 0; i < pCmd->numOfCols + pCmd->count; ++i) {
H
hjxilinx 已提交
1249
      TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
H
hzcheng 已提交
1250 1251 1252 1253

      pSchema->type = pField->type;
      strcpy(pSchema->name, pField->name);
      pSchema->bytes = htons(pField->bytes);
1254

H
hzcheng 已提交
1255 1256 1257 1258
      pSchema++;
    }

    pMsg = (char *)pSchema;
1259 1260
    if (type == TSQL_CREATE_STREAM) {  // check if it is a stream sql
      SQuerySQL *pQuerySql = pInfo->pCreateTableInfo->pSelect;
H
hzcheng 已提交
1261

1262 1263 1264
      strncpy(pMsg, pQuerySql->selectToken.z, pQuerySql->selectToken.n + 1);
      pCreateTableMsg->sqlLen = htons(pQuerySql->selectToken.n + 1);
      pMsg += pQuerySql->selectToken.n + 1;
H
hzcheng 已提交
1265 1266 1267
    }
  }

H
hjxilinx 已提交
1268
  tscFieldInfoClear(&pQueryInfo->fieldsInfo);
H
hzcheng 已提交
1269

S
TD-1057  
Shengliang Guan 已提交
1270
  msgLen = (int32_t)(pMsg - (char*)pCreateTableMsg);
S
slguan 已提交
1271
  pCreateTableMsg->contLen = htonl(msgLen);
H
hzcheng 已提交
1272
  pCmd->payloadLen = msgLen;
S
slguan 已提交
1273
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE;
H
hzcheng 已提交
1274 1275

  assert(msgLen + minMsgSize() <= size);
1276
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1277 1278 1279
}

int tscEstimateAlterTableMsgLength(SSqlCmd *pCmd) {
1280
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
guanshengliang's avatar
guanshengliang 已提交
1281
  return minMsgSize() + sizeof(SCMAlterTableMsg) + sizeof(SSchema) * tscNumOfFields(pQueryInfo) +
H
hzcheng 已提交
1282 1283 1284
         TSDB_EXTRA_PAYLOAD_SIZE;
}

1285
int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
1286 1287
  char *pMsg;
  int   msgLen = 0;
H
hzcheng 已提交
1288

1289
  SSqlCmd    *pCmd = &pSql->cmd;
1290 1291
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

H
hjxilinx 已提交
1292
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
1293 1294 1295
  
  SAlterTableSQL *pAlterInfo = pInfo->pAlterInfo;
  int size = tscEstimateAlterTableMsgLength(pCmd);
S
slguan 已提交
1296 1297
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for alter table msg", pSql);
H
Haojun Liao 已提交
1298
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1299
  }
1300 1301
  
  SCMAlterTableMsg *pAlterTableMsg = (SCMAlterTableMsg *)pCmd->payload;
H
Haojun Liao 已提交
1302
  tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pAlterTableMsg->db);
H
hzcheng 已提交
1303

H
hjxilinx 已提交
1304
  strcpy(pAlterTableMsg->tableId, pTableMetaInfo->name);
1305
  pAlterTableMsg->type = htons(pAlterInfo->type);
1306

1307
  pAlterTableMsg->numOfCols = htons(tscNumOfFields(pQueryInfo));
S
slguan 已提交
1308
  SSchema *pSchema = pAlterTableMsg->schema;
1309
  for (int i = 0; i < tscNumOfFields(pQueryInfo); ++i) {
H
hjxilinx 已提交
1310
    TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
1311
  
H
hzcheng 已提交
1312 1313 1314 1315 1316 1317 1318
    pSchema->type = pField->type;
    strcpy(pSchema->name, pField->name);
    pSchema->bytes = htons(pField->bytes);
    pSchema++;
  }

  pMsg = (char *)pSchema;
1319 1320 1321
  pAlterTableMsg->tagValLen = htonl(pAlterInfo->tagData.dataLen);
  memcpy(pMsg, pAlterInfo->tagData.data, pAlterInfo->tagData.dataLen);
  pMsg += pAlterInfo->tagData.dataLen;
H
hzcheng 已提交
1322

S
TD-1057  
Shengliang Guan 已提交
1323
  msgLen = (int32_t)(pMsg - (char*)pAlterTableMsg);
1324

H
hzcheng 已提交
1325
  pCmd->payloadLen = msgLen;
S
slguan 已提交
1326
  pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_TABLE;
H
hzcheng 已提交
1327 1328

  assert(msgLen + minMsgSize() <= size);
1329

1330
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1331 1332
}

1333 1334 1335 1336
int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
  SSqlCmd* pCmd = &pSql->cmd;
  pCmd->msgType = TSDB_MSG_TYPE_UPDATE_TAG_VAL;
  
1337
  SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) pCmd->payload;
1338
  pCmd->payloadLen = htonl(pUpdateMsg->head.contLen);
1339

1340 1341
  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
1342

dengyihao's avatar
dengyihao 已提交
1343
  tscDumpEpSetFromVgroupInfo(&pTableMetaInfo->pTableMeta->corVgroupInfo, &pSql->epSet);
1344

1345 1346 1347
  return TSDB_CODE_SUCCESS;
}

1348
int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1349
  SSqlCmd *pCmd = &pSql->cmd;
1350
  pCmd->payloadLen = sizeof(SCMAlterDbMsg);
S
slguan 已提交
1351
  pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_DB;
H
hzcheng 已提交
1352

1353
  SCMAlterDbMsg *pAlterDbMsg = (SCMAlterDbMsg*)pCmd->payload;
1354
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1355
  tstrncpy(pAlterDbMsg->db, pTableMetaInfo->name, sizeof(pAlterDbMsg->db));
H
hzcheng 已提交
1356

1357
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1358 1359
}

1360
int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1361
  SSqlCmd *pCmd = &pSql->cmd;
J
jtao1735 已提交
1362
  pCmd->msgType = TSDB_MSG_TYPE_CM_RETRIEVE;
S
slguan 已提交
1363
  pCmd->payloadLen = sizeof(SRetrieveTableMsg);
S
slguan 已提交
1364

S
slguan 已提交
1365 1366
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1367
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
H
hzcheng 已提交
1368
  }
S
slguan 已提交
1369

S
slguan 已提交
1370 1371 1372 1373
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg*)pCmd->payload;
  pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
  pRetrieveMsg->free = htons(pQueryInfo->type);
H
hzcheng 已提交
1374

1375
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1376 1377
}

1378
static int tscSetResultPointer(SQueryInfo *pQueryInfo, SSqlRes *pRes) {
1379
  if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
H
hzcheng 已提交
1380 1381 1382
    return pRes->code;
  }

H
hjxilinx 已提交
1383
  for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
H
hjxilinx 已提交
1384
    int16_t offset = tscFieldInfoGetOffset(pQueryInfo, i);
H
hjxilinx 已提交
1385
    pRes->tsrow[i] = ((char*) pRes->data + offset * pRes->numOfRows);
H
hzcheng 已提交
1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399
  }

  return 0;
}

/*
 * this function can only be called once.
 * by using pRes->rspType to denote its status
 *
 * if pRes->rspType is 1, no more result
 */
static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
1400

1401
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
1402

H
hzcheng 已提交
1403 1404 1405 1406 1407 1408
  pRes->code = TSDB_CODE_SUCCESS;
  if (pRes->rspType == 0) {
    pRes->numOfRows = numOfRes;
    pRes->row = 0;
    pRes->rspType = 1;

1409
    tscSetResultPointer(pQueryInfo, pRes);
H
hzcheng 已提交
1410
  } else {
S
slguan 已提交
1411
    tscResetForNextRetrieve(pRes);
H
hzcheng 已提交
1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426
  }

  uint8_t code = pSql->res.code;
  if (pSql->fp) {
    if (code == TSDB_CODE_SUCCESS) {
      (*pSql->fp)(pSql->param, pSql, pSql->res.numOfRows);
    } else {
      tscQueueAsyncRes(pSql);
    }
  }

  return code;
}

int tscProcessDescribeTableRsp(SSqlObj *pSql) {
S
slguan 已提交
1427
  SSqlCmd *       pCmd = &pSql->cmd;
1428
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hzcheng 已提交
1429

H
hjxilinx 已提交
1430
  STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
H
hjxilinx 已提交
1431 1432
  
  int32_t numOfRes = tinfo.numOfColumns + tinfo.numOfTags;
H
hzcheng 已提交
1433 1434 1435
  return tscLocalResultCommonBuilder(pSql, numOfRes);
}

H
Haojun Liao 已提交
1436 1437 1438
int tscProcessLocalRetrieveRsp(SSqlObj *pSql) {
  int32_t numOfRes = 1;
  pSql->res.completed = true;
H
hzcheng 已提交
1439 1440 1441
  return tscLocalResultCommonBuilder(pSql, numOfRes);
}

H
hjxilinx 已提交
1442
int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
H
hzcheng 已提交
1443 1444 1445
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

H
Haojun Liao 已提交
1446 1447 1448 1449 1450 1451
  int32_t code = pRes->code;
  if (pRes->code != TSDB_CODE_SUCCESS) {
    tscQueueAsyncRes(pSql);
    return code;
  }

1452
  pRes->code = tscDoLocalMerge(pSql);
H
hjxilinx 已提交
1453
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hzcheng 已提交
1454 1455

  if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) {
1456
    tscCreateResPointerInfo(pRes, pQueryInfo);
H
hzcheng 已提交
1457 1458 1459
  }

  pRes->row = 0;
1460
  pRes->completed = (pRes->numOfRows == 0);
H
hzcheng 已提交
1461

H
Haojun Liao 已提交
1462
  code = pRes->code;
H
hjxilinx 已提交
1463 1464 1465 1466
  if (pRes->code == TSDB_CODE_SUCCESS) {
    (*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
  } else {
    tscQueueAsyncRes(pSql);
H
hzcheng 已提交
1467 1468 1469 1470 1471
  }

  return code;
}

S
slguan 已提交
1472
int tscProcessEmptyResultRsp(SSqlObj *pSql) { return tscLocalResultCommonBuilder(pSql, 0); }
H
hzcheng 已提交
1473

1474
int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1475
  STscObj *pObj = pSql->pTscObj;
S
slguan 已提交
1476
  SSqlCmd *pCmd = &pSql->cmd;
S
slguan 已提交
1477
  pCmd->msgType = TSDB_MSG_TYPE_CM_CONNECT;
1478
  pCmd->payloadLen = sizeof(SCMConnectMsg);
H
hzcheng 已提交
1479

S
slguan 已提交
1480 1481
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1482
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1483 1484
  }

1485
  SCMConnectMsg *pConnect = (SCMConnectMsg*)pCmd->payload;
H
hzcheng 已提交
1486 1487 1488 1489

  char *db;  // ugly code to move the space
  db = strstr(pObj->db, TS_PATH_DELIMITER);
  db = (db == NULL) ? pObj->db : db + 1;
B
Bomin Zhang 已提交
1490 1491 1492
  tstrncpy(pConnect->db, db, sizeof(pConnect->db));
  tstrncpy(pConnect->clientVersion, version, sizeof(pConnect->clientVersion));
  tstrncpy(pConnect->msgVersion, "", sizeof(pConnect->msgVersion));
H
hzcheng 已提交
1493

1494
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1495 1496
}

H
hjxilinx 已提交
1497
int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
1498
  SCMTableInfoMsg *pInfoMsg;
S
slguan 已提交
1499
  char *         pMsg;
H
hzcheng 已提交
1500 1501
  int            msgLen = 0;

B
Bomin Zhang 已提交
1502 1503 1504
  char *tmpData = NULL;
  uint32_t len = pSql->cmd.payloadLen;
  if (len > 0) {
H
Haojun Liao 已提交
1505
    if ((tmpData = calloc(1, len)) == NULL) {
1506
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
1507 1508
    }

H
hzcheng 已提交
1509
    // STagData is in binary format, strncpy is not available
B
Bomin Zhang 已提交
1510
    memcpy(tmpData, pSql->cmd.payload, len);
H
hzcheng 已提交
1511 1512
  }

1513 1514 1515
  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);

H
hjxilinx 已提交
1516
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
S
slguan 已提交
1517

1518
  pInfoMsg = (SCMTableInfoMsg *)pCmd->payload;
H
hjxilinx 已提交
1519
  strcpy(pInfoMsg->tableId, pTableMetaInfo->name);
H
hjxilinx 已提交
1520
  pInfoMsg->createFlag = htons(pSql->cmd.autoCreated ? 1 : 0);
S
slguan 已提交
1521

1522
  pMsg = (char*)pInfoMsg + sizeof(SCMTableInfoMsg);
H
hzcheng 已提交
1523

B
Bomin Zhang 已提交
1524 1525 1526
  if (pSql->cmd.autoCreated && len > 0) {
    memcpy(pInfoMsg->tags, tmpData, len);
    pMsg += len;
H
hzcheng 已提交
1527 1528
  }

S
TD-1057  
Shengliang Guan 已提交
1529
  pCmd->payloadLen = (int32_t)(pMsg - (char*)pInfoMsg);
S
slguan 已提交
1530
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLE_META;
H
hzcheng 已提交
1531

S
Shengliang Guan 已提交
1532
  taosTFree(tmpData);
H
hzcheng 已提交
1533

S
TD-1057  
Shengliang Guan 已提交
1534
  assert(msgLen + minMsgSize() <= (int32_t)pCmd->allocSize);
1535
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1536 1537
}

S
slguan 已提交
1538
/**
1539
 *  multi table meta req pkg format:
1540
 *  | SMgmtHead | SCMMultiTableInfoMsg | tableId0 | tableId1 | tableId2 | ......
S
slguan 已提交
1541 1542
 *      no used         4B
 **/
1543
int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
guanshengliang's avatar
guanshengliang 已提交
1544
#if 0
S
slguan 已提交
1545 1546 1547 1548 1549
  SSqlCmd *pCmd = &pSql->cmd;

  // copy payload content to temp buff
  char *tmpData = 0;
  if (pCmd->payloadLen > 0) {
H
Haojun Liao 已提交
1550
    if ((tmpData = calloc(1, pCmd->payloadLen + 1)) == NULL) return -1;
S
slguan 已提交
1551 1552 1553 1554 1555
    memcpy(tmpData, pCmd->payload, pCmd->payloadLen);
  }

  // fill head info
  SMgmtHead *pMgmt = (SMgmtHead *)(pCmd->payload + tsRpcHeadSize);
H
Haojun Liao 已提交
1556
  memset(pMgmt->db, 0, TSDB_TABLE_FNAME_LEN);  // server don't need the db
S
slguan 已提交
1557

1558
  SCMMultiTableInfoMsg *pInfoMsg = (SCMMultiTableInfoMsg *)(pCmd->payload + tsRpcHeadSize + sizeof(SMgmtHead));
S
slguan 已提交
1559
  pInfoMsg->numOfTables = htonl((int32_t)pCmd->count);
S
slguan 已提交
1560 1561

  if (pCmd->payloadLen > 0) {
S
slguan 已提交
1562
    memcpy(pInfoMsg->tableIds, tmpData, pCmd->payloadLen);
S
slguan 已提交
1563 1564
  }

S
Shengliang Guan 已提交
1565
  taosTFree(tmpData);
S
slguan 已提交
1566

1567
  pCmd->payloadLen += sizeof(SMgmtHead) + sizeof(SCMMultiTableInfoMsg);
S
slguan 已提交
1568
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLES_META;
S
slguan 已提交
1569 1570 1571

  assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize);

1572
  tscDebug("%p build load multi-metermeta msg completed, numOfTables:%d, msg size:%d", pSql, pCmd->count,
S
slguan 已提交
1573 1574 1575
           pCmd->payloadLen);

  return pCmd->payloadLen;
guanshengliang's avatar
guanshengliang 已提交
1576 1577
#endif
  return 0;  
S
slguan 已提交
1578 1579
}

H
hjxilinx 已提交
1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595 1596
//static UNUSED_FUNC int32_t tscEstimateMetricMetaMsgSize(SSqlCmd *pCmd) {
////  const int32_t defaultSize =
////      minMsgSize() + sizeof(SSuperTableMetaMsg) + sizeof(SMgmtHead) + sizeof(int16_t) * TSDB_MAX_TAGS;
////  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
////
////  int32_t n = 0;
////  size_t size = taosArrayGetSize(pQueryInfo->tagCond.pCond);
////  for (int32_t i = 0; i < size; ++i) {
////    assert(0);
//////    n += strlen(pQueryInfo->tagCond.cond[i].cond);
////  }
////
////  int32_t tagLen = n * TSDB_NCHAR_SIZE;
////  if (pQueryInfo->tagCond.tbnameCond.cond != NULL) {
////    tagLen += strlen(pQueryInfo->tagCond.tbnameCond.cond) * TSDB_NCHAR_SIZE;
////  }
////
H
Haojun Liao 已提交
1597
////  int32_t joinCondLen = (TSDB_TABLE_FNAME_LEN + sizeof(int16_t)) * 2;
H
hjxilinx 已提交
1598 1599 1600 1601 1602 1603 1604 1605
////  int32_t elemSize = sizeof(SSuperTableMetaElemMsg) * pQueryInfo->numOfTables;
////
////  int32_t colSize = pQueryInfo->groupbyExpr.numOfGroupCols*sizeof(SColIndex);
////
////  int32_t len = tagLen + joinCondLen + elemSize + colSize + defaultSize;
////
////  return MAX(len, TSDB_DEFAULT_PAYLOAD_SIZE);
//}
H
hzcheng 已提交
1606

H
hjxilinx 已提交
1607 1608
int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
H
hjxilinx 已提交
1609 1610 1611 1612 1613 1614 1615 1616 1617 1618
  
  char* pMsg = pCmd->payload;
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  
  SCMSTableVgroupMsg *pStableVgroupMsg = (SCMSTableVgroupMsg *) pMsg;
  pStableVgroupMsg->numOfTables = htonl(pQueryInfo->numOfTables);
  pMsg += sizeof(SCMSTableVgroupMsg);
  
  for(int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
    STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i);
1619 1620 1621
    size_t size = sizeof(pTableMetaInfo->name);
    tstrncpy(pMsg, pTableMetaInfo->name, size);
    pMsg += size;
H
hjxilinx 已提交
1622
  }
H
hjxilinx 已提交
1623 1624

  pCmd->msgType = TSDB_MSG_TYPE_CM_STABLE_VGROUP;
S
TD-1057  
Shengliang Guan 已提交
1625
  pCmd->payloadLen = (int32_t)(pMsg - pCmd->payload);
H
hjxilinx 已提交
1626

1627
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1628 1629
}

1630 1631
int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
H
hzcheng 已提交
1632 1633
  STscObj *pObj = pSql->pTscObj;

1634
  pthread_mutex_lock(&pObj->mutex);
H
hzcheng 已提交
1635

S
Shengliang Guan 已提交
1636
  int32_t numOfQueries = 2;
H
hzcheng 已提交
1637 1638 1639
  SSqlObj *tpSql = pObj->sqlList;
  while (tpSql) {
    tpSql = tpSql->next;
1640
    numOfQueries++;
H
hzcheng 已提交
1641 1642
  }

S
Shengliang Guan 已提交
1643
  int32_t numOfStreams = 2;
H
hzcheng 已提交
1644 1645 1646
  SSqlStream *pStream = pObj->streamList;
  while (pStream) {
    pStream = pStream->next;
1647
    numOfStreams++;
H
hzcheng 已提交
1648 1649
  }

1650
  int size = numOfQueries * sizeof(SQueryDesc) + numOfStreams * sizeof(SStreamDesc) + sizeof(SCMHeartBeatMsg) + 100;
S
slguan 已提交
1651
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
1652
    pthread_mutex_unlock(&pObj->mutex);
S
slguan 已提交
1653
    tscError("%p failed to malloc for heartbeat msg", pSql);
H
Haojun Liao 已提交
1654
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1655
  }
H
hzcheng 已提交
1656

1657
  SCMHeartBeatMsg *pHeartbeat = (SCMHeartBeatMsg *)pCmd->payload;
1658 1659
  pHeartbeat->numOfQueries = numOfQueries;
  pHeartbeat->numOfStreams = numOfStreams;
1660
  int msgLen = tscBuildQueryStreamDesc(pHeartbeat, pObj);
H
hzcheng 已提交
1661 1662 1663 1664

  pthread_mutex_unlock(&pObj->mutex);

  pCmd->payloadLen = msgLen;
S
slguan 已提交
1665
  pCmd->msgType = TSDB_MSG_TYPE_CM_HEARTBEAT;
H
hzcheng 已提交
1666

1667
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1668 1669
}

1670 1671
int tscProcessTableMetaRsp(SSqlObj *pSql) {
  STableMetaMsg *pMetaMsg = (STableMetaMsg *)pSql->res.pRsp;
H
hzcheng 已提交
1672

1673 1674
  pMetaMsg->sid = htonl(pMetaMsg->sid);
  pMetaMsg->sversion = htons(pMetaMsg->sversion);
1675
  pMetaMsg->tversion = htons(pMetaMsg->tversion);
H
hjxilinx 已提交
1676 1677
  pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId);
  
1678 1679
  pMetaMsg->uid = htobe64(pMetaMsg->uid);
  pMetaMsg->contLen = htons(pMetaMsg->contLen);
1680
  pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns);
H
hzcheng 已提交
1681

H
Haojun Liao 已提交
1682 1683
  if ((pMetaMsg->tableType != TSDB_SUPER_TABLE) &&
      (pMetaMsg->sid <= 0 || pMetaMsg->vgroup.vgId < 2 || pMetaMsg->vgroup.numOfEps <= 0)) {
H
Haojun Liao 已提交
1684 1685
    tscError("invalid value in table numOfEps:%d, vgId:%d tid:%d, name:%s", pMetaMsg->vgroup.numOfEps, pMetaMsg->vgroup.vgId,
             pMetaMsg->sid, pMetaMsg->tableId);
1686
    return TSDB_CODE_TSC_INVALID_VALUE;
H
hzcheng 已提交
1687 1688
  }

B
Bomin Zhang 已提交
1689
  if (pMetaMsg->numOfTags > TSDB_MAX_TAGS) {
1690
    tscError("invalid numOfTags:%d", pMetaMsg->numOfTags);
1691
    return TSDB_CODE_TSC_INVALID_VALUE;
H
hzcheng 已提交
1692 1693
  }

1694 1695
  if (pMetaMsg->numOfColumns > TSDB_MAX_COLUMNS || pMetaMsg->numOfColumns <= 0) {
    tscError("invalid numOfColumns:%d", pMetaMsg->numOfColumns);
1696
    return TSDB_CODE_TSC_INVALID_VALUE;
H
hzcheng 已提交
1697 1698
  }

1699 1700
  for (int i = 0; i < pMetaMsg->vgroup.numOfEps; ++i) {
    pMetaMsg->vgroup.epAddr[i].port = htons(pMetaMsg->vgroup.epAddr[i].port);
H
hzcheng 已提交
1701 1702
  }

1703
  SSchema* pSchema = pMetaMsg->schema;
H
hzcheng 已提交
1704

1705
  int32_t numOfTotalCols = pMetaMsg->numOfColumns + pMetaMsg->numOfTags;
H
hzcheng 已提交
1706 1707 1708
  for (int i = 0; i < numOfTotalCols; ++i) {
    pSchema->bytes = htons(pSchema->bytes);
    pSchema->colId = htons(pSchema->colId);
1709 1710 1711 1712 1713

    if (pSchema->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
      assert(i == 0);
    }

H
hjxilinx 已提交
1714
    assert(pSchema->type >= TSDB_DATA_TYPE_BOOL && pSchema->type <= TSDB_DATA_TYPE_NCHAR);
H
hzcheng 已提交
1715 1716 1717
    pSchema++;
  }

1718 1719
  size_t size = 0;
  STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
1720
  
H
hzcheng 已提交
1721
  // todo add one more function: taosAddDataIfNotExists();
1722
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
H
hjxilinx 已提交
1723
  assert(pTableMetaInfo->pTableMeta == NULL);
H
hzcheng 已提交
1724

H
Haojun Liao 已提交
1725
  pTableMetaInfo->pTableMeta = (STableMeta *) taosCachePut(tscMetaCache, pTableMetaInfo->name,
1726
      strlen(pTableMetaInfo->name), pTableMeta, size, tsTableMetaKeepTimer * 1000);
H
hjxilinx 已提交
1727
  
1728
  // todo handle out of memory case
1729
  if (pTableMetaInfo->pTableMeta == NULL) {
B
Bomin Zhang 已提交
1730
    free(pTableMeta);
1731
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
1732
  }
H
hzcheng 已提交
1733

1734
  tscDebug("%p recv table meta, uid:%"PRId64 ", tid:%d, name:%s", pSql, pTableMeta->id.uid, pTableMeta->id.tid, pTableMetaInfo->name);
1735
  free(pTableMeta);
1736
  
H
hjxilinx 已提交
1737
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1738 1739
}

S
slguan 已提交
1740
/**
1741
 *  multi table meta rsp pkg format:
1742
 *  | STaosRsp | ieType | SCMMultiTableInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | SMeterMeta2 | SSchema2
S
slguan 已提交
1743 1744 1745
 *  |...... 1B        1B            4B
 **/
int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
1746
#if 0
S
slguan 已提交
1747 1748 1749 1750 1751
  char *rsp = pSql->res.pRsp;

  ieType = *rsp;
  if (ieType != TSDB_IE_TYPE_META) {
    tscError("invalid ie type:%d", ieType);
1752
    pSql->res.code = TSDB_CODE_TSC_INVALID_IE;
S
slguan 已提交
1753
    pSql->res.numOfTotal = 0;
1754
    return TSDB_CODE_TSC_APP_ERROR;
S
slguan 已提交
1755 1756 1757 1758
  }

  rsp++;

1759
  SCMMultiTableInfoMsg *pInfo = (SCMMultiTableInfoMsg *)rsp;
S
slguan 已提交
1760
  totalNum = htonl(pInfo->numOfTables);
1761
  rsp += sizeof(SCMMultiTableInfoMsg);
S
slguan 已提交
1762 1763

  for (i = 0; i < totalNum; i++) {
S
slguan 已提交
1764
    SMultiTableMeta *pMultiMeta = (SMultiTableMeta *)rsp;
1765
    STableMeta *     pMeta = pMultiMeta->metas;
S
slguan 已提交
1766 1767 1768

    pMeta->sid = htonl(pMeta->sid);
    pMeta->sversion = htons(pMeta->sversion);
H
hjxilinx 已提交
1769
    pMeta->vgId = htonl(pMeta->vgId);
S
slguan 已提交
1770 1771
    pMeta->uid = htobe64(pMeta->uid);

H
hjxilinx 已提交
1772 1773
    if (pMeta->sid <= 0 || pMeta->vgId < 0) {
      tscError("invalid meter vgId:%d, sid%d", pMeta->vgId, pMeta->sid);
1774
      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
S
slguan 已提交
1775
      pSql->res.numOfTotal = i;
1776
      return TSDB_CODE_TSC_APP_ERROR;
S
slguan 已提交
1777 1778
    }

H
hjxilinx 已提交
1779 1780 1781 1782
    //    pMeta->numOfColumns = htons(pMeta->numOfColumns);
    //
    //    if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
    //      tscError("invalid tag value count:%d", pMeta->numOfTags);
1783
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
H
hjxilinx 已提交
1784
    //      pSql->res.numOfTotal = i;
1785
    //      return TSDB_CODE_TSC_APP_ERROR;
H
hjxilinx 已提交
1786 1787 1788 1789
    //    }
    //
    //    if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
    //      tscError("invalid numOfTags:%d", pMeta->numOfTags);
1790
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
H
hjxilinx 已提交
1791
    //      pSql->res.numOfTotal = i;
1792
    //      return TSDB_CODE_TSC_APP_ERROR;
H
hjxilinx 已提交
1793 1794 1795 1796
    //    }
    //
    //    if (pMeta->numOfColumns > TSDB_MAX_COLUMNS || pMeta->numOfColumns < 0) {
    //      tscError("invalid numOfColumns:%d", pMeta->numOfColumns);
1797
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
H
hjxilinx 已提交
1798
    //      pSql->res.numOfTotal = i;
1799
    //      return TSDB_CODE_TSC_APP_ERROR;
H
hjxilinx 已提交
1800 1801
    //    }
    //
H
hjxilinx 已提交
1802
    //    for (int j = 0; j < TSDB_REPLICA_MAX_NUM; ++j) {
H
hjxilinx 已提交
1803 1804 1805 1806 1807 1808 1809 1810 1811 1812 1813 1814 1815 1816 1817 1818 1819 1820 1821 1822 1823 1824 1825 1826 1827 1828 1829 1830 1831 1832 1833 1834 1835 1836
    //      pMeta->vpeerDesc[j].vnode = htonl(pMeta->vpeerDesc[j].vnode);
    //    }
    //
    //    pMeta->rowSize = 0;
    //    rsp += sizeof(SMultiTableMeta);
    //    pSchema = (SSchema *)rsp;
    //
    //    int32_t numOfTotalCols = pMeta->numOfColumns + pMeta->numOfTags;
    //    for (int j = 0; j < numOfTotalCols; ++j) {
    //      pSchema->bytes = htons(pSchema->bytes);
    //      pSchema->colId = htons(pSchema->colId);
    //
    //      // ignore the tags length
    //      if (j < pMeta->numOfColumns) {
    //        pMeta->rowSize += pSchema->bytes;
    //      }
    //      pSchema++;
    //    }
    //
    //    rsp += numOfTotalCols * sizeof(SSchema);
    //
    //    int32_t  tagLen = 0;
    //    SSchema *pTagsSchema = tscGetTableTagSchema(pMeta);
    //
    //    if (pMeta->tableType == TSDB_CHILD_TABLE) {
    //      for (int32_t j = 0; j < pMeta->numOfTags; ++j) {
    //        tagLen += pTagsSchema[j].bytes;
    //      }
    //    }
    //
    //    rsp += tagLen;
    //    int32_t size = (int32_t)(rsp - ((char *)pMeta));  // Consistent with STableMeta in cache
    //
    //    pMeta->index = 0;
H
Haojun Liao 已提交
1837
    //    (void)taosCachePut(tscMetaCache, pMeta->tableId, (char *)pMeta, size, tsTableMetaKeepTimer);
H
hjxilinx 已提交
1838
    //  }
S
slguan 已提交
1839
  }
H
hjxilinx 已提交
1840
  
S
slguan 已提交
1841 1842
  pSql->res.code = TSDB_CODE_SUCCESS;
  pSql->res.numOfTotal = i;
1843
  tscDebug("%p load multi-metermeta resp from complete num:%d", pSql, pSql->res.numOfTotal);
1844 1845
#endif
  
S
slguan 已提交
1846 1847 1848
  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
1849
int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
1850
  SSqlRes* pRes = &pSql->res;
H
hjxilinx 已提交
1851
  
H
hjxilinx 已提交
1852
  // NOTE: the order of several table must be preserved.
1853
  SCMSTableVgroupRspMsg *pStableVgroup = (SCMSTableVgroupRspMsg *)pRes->pRsp;
H
hjxilinx 已提交
1854 1855
  pStableVgroup->numOfTables = htonl(pStableVgroup->numOfTables);
  char* pMsg = pRes->pRsp + sizeof(SCMSTableVgroupRspMsg);
H
hjxilinx 已提交
1856
  
1857 1858 1859
  // master sqlObj locates in param
  SSqlObj* parent = pSql->param;
  assert(parent != NULL);
H
hjxilinx 已提交
1860
  
1861
  SSqlCmd* pCmd = &parent->cmd;
H
hjxilinx 已提交
1862 1863 1864 1865 1866 1867 1868 1869 1870 1871 1872
  for(int32_t i = 0; i < pStableVgroup->numOfTables; ++i) {
    STableMetaInfo *pInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i);
    SVgroupsInfo *  pVgroupInfo = (SVgroupsInfo *)pMsg;
    pVgroupInfo->numOfVgroups = htonl(pVgroupInfo->numOfVgroups);

    size_t size = sizeof(SCMVgroupInfo) * pVgroupInfo->numOfVgroups + sizeof(SVgroupsInfo);
    pInfo->vgroupList = calloc(1, size);
    assert(pInfo->vgroupList != NULL);

    memcpy(pInfo->vgroupList, pVgroupInfo, size);
    for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) {
dengyihao's avatar
dengyihao 已提交
1873
      //just init, no need to lock
H
hjxilinx 已提交
1874 1875
      SCMVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j];
      pVgroups->vgId = htonl(pVgroups->vgId);
1876
      assert(pVgroups->numOfEps >= 1);
H
hjxilinx 已提交
1877

1878 1879
      for (int32_t k = 0; k < pVgroups->numOfEps; ++k) {
        pVgroups->epAddr[k].port = htons(pVgroups->epAddr[k].port);
H
hjxilinx 已提交
1880
      }
1881
    }
1882 1883

    pMsg += size;
H
hjxilinx 已提交
1884 1885
  }
  
S
slguan 已提交
1886
  return pSql->res.code;
H
hzcheng 已提交
1887 1888 1889 1890 1891 1892
}

/*
 * current process do not use the cache at all
 */
int tscProcessShowRsp(SSqlObj *pSql) {
H
hjxilinx 已提交
1893
  STableMetaMsg * pMetaMsg;
1894
  SCMShowRsp *pShow;
S
slguan 已提交
1895
  SSchema *    pSchema;
H
hzcheng 已提交
1896 1897
  char         key[20];

1898 1899 1900
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

H
hjxilinx 已提交
1901
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
1902

H
hjxilinx 已提交
1903
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
1904

1905
  pShow = (SCMShowRsp *)pRes->pRsp;
S
slguan 已提交
1906
  pShow->qhandle = htobe64(pShow->qhandle);
H
hzcheng 已提交
1907 1908
  pRes->qhandle = pShow->qhandle;

S
slguan 已提交
1909
  tscResetForNextRetrieve(pRes);
H
hjxilinx 已提交
1910
  pMetaMsg = &(pShow->tableMeta);
H
hzcheng 已提交
1911

H
hjxilinx 已提交
1912
  pMetaMsg->numOfColumns = ntohs(pMetaMsg->numOfColumns);
H
hzcheng 已提交
1913

H
hjxilinx 已提交
1914
  pSchema = pMetaMsg->schema;
H
hjxilinx 已提交
1915 1916
  pMetaMsg->sid = ntohs(pMetaMsg->sid);
  for (int i = 0; i < pMetaMsg->numOfColumns; ++i) {
H
hzcheng 已提交
1917 1918 1919 1920
    pSchema->bytes = htons(pSchema->bytes);
    pSchema++;
  }

1921
  key[0] = pCmd->msgType + 'a';
H
hzcheng 已提交
1922 1923
  strcpy(key + 1, "showlist");

H
Haojun Liao 已提交
1924 1925 1926 1927
  if (pTableMetaInfo->pTableMeta != NULL) {
    taosCacheRelease(tscMetaCache, (void *)&(pTableMetaInfo->pTableMeta), false);
  }

H
hjxilinx 已提交
1928 1929 1930
  size_t size = 0;
  STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
  
H
Haojun Liao 已提交
1931
  pTableMetaInfo->pTableMeta = taosCachePut(tscMetaCache, key, strlen(key), (char *)pTableMeta, size,
1932
      tsTableMetaKeepTimer * 1000);
H
hjxilinx 已提交
1933
  SSchema *pTableSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
H
hzcheng 已提交
1934

1935 1936 1937 1938
  if (pQueryInfo->colList == NULL) {
    pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
  }
  
H
hjxilinx 已提交
1939 1940
  SFieldInfo* pFieldInfo = &pQueryInfo->fieldsInfo;
  
S
slguan 已提交
1941
  SColumnIndex index = {0};
H
hjxilinx 已提交
1942 1943 1944
  pSchema = pMetaMsg->schema;
  
  for (int16_t i = 0; i < pMetaMsg->numOfColumns; ++i, ++pSchema) {
S
slguan 已提交
1945
    index.columnIndex = i;
1946 1947
    tscColumnListInsert(pQueryInfo->colList, &index);
    
H
hjxilinx 已提交
1948 1949
    TAOS_FIELD f = tscCreateField(pSchema->type, pSchema->name, pSchema->bytes);
    SFieldSupInfo* pInfo = tscFieldInfoAppend(pFieldInfo, &f);
H
hjxilinx 已提交
1950
    
H
hjxilinx 已提交
1951
    pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index,
1952
                     pTableSchema[i].type, pTableSchema[i].bytes, pTableSchema[i].bytes, false);
H
hzcheng 已提交
1953
  }
H
hjxilinx 已提交
1954 1955
  
  pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
H
hjxilinx 已提交
1956
  tscFieldInfoUpdateOffset(pQueryInfo);
H
hjxilinx 已提交
1957
  
S
Shengliang Guan 已提交
1958
  taosTFree(pTableMeta);
H
hzcheng 已提交
1959 1960 1961
  return 0;
}

1962
// TODO multithread problem
1963 1964 1965 1966 1967 1968 1969 1970 1971 1972
static void createHBObj(STscObj* pObj) {
  if (pObj->pHb != NULL) {
    return;
  }

  SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
  if (NULL == pSql) return;

  pSql->fp = tscProcessHeartBeatRsp;

1973 1974 1975 1976 1977 1978
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetailSafely(&pSql->cmd, 0);
  if (pQueryInfo == NULL) {
    pSql->res.code = terrno;
    return;
  }

1979 1980 1981 1982 1983 1984 1985 1986 1987 1988 1989 1990
  pQueryInfo->command = TSDB_SQL_HB;

  pSql->cmd.command = pQueryInfo->command;
  if (TSDB_CODE_SUCCESS != tscAllocPayload(&(pSql->cmd), TSDB_DEFAULT_PAYLOAD_SIZE)) {
    taosTFree(pSql);
    return;
  }

  pSql->param = pObj;
  pSql->pTscObj = pObj;
  pSql->signature = pSql;
  pObj->pHb = pSql;
H
Haojun Liao 已提交
1991

1992 1993
  tscAddSubqueryInfo(&pObj->pHb->cmd);

1994 1995 1996 1997
  int64_t ad = (int64_t) pSql;
  pSql->self = taosCachePut(tscObjCache, &ad, sizeof(int64_t), &pSql, sizeof(int64_t), 2 * 60 * 1000);
  T_REF_INC(pObj);

1998 1999 2000
  tscDebug("%p HB is allocated, pObj:%p", pObj->pHb, pObj);
}

H
hzcheng 已提交
2001
int tscProcessConnectRsp(SSqlObj *pSql) {
H
Haojun Liao 已提交
2002
  char temp[TSDB_TABLE_FNAME_LEN * 2];
H
hzcheng 已提交
2003 2004 2005
  STscObj *pObj = pSql->pTscObj;
  SSqlRes *pRes = &pSql->res;

2006
  SCMConnectRsp *pConnect = (SCMConnectRsp *)pRes->pRsp;
B
Bomin Zhang 已提交
2007
  tstrncpy(pObj->acctId, pConnect->acctId, sizeof(pObj->acctId));  // copy acctId from response
2008 2009
  int32_t len = sprintf(temp, "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db);

B
Bomin Zhang 已提交
2010 2011
  assert(len <= sizeof(pObj->db));
  tstrncpy(pObj->db, temp, sizeof(pObj->db));
H
hjxilinx 已提交
2012
  
dengyihao's avatar
dengyihao 已提交
2013 2014 2015
  if (pConnect->epSet.numOfEps > 0) {
    tscEpSetHtons(&pConnect->epSet);
    tscUpdateMgmtEpSet(&pConnect->epSet);
dengyihao's avatar
dengyihao 已提交
2016
  } 
H
hzcheng 已提交
2017

S
slguan 已提交
2018
  strcpy(pObj->sversion, pConnect->serverVersion);
H
hzcheng 已提交
2019 2020
  pObj->writeAuth = pConnect->writeAuth;
  pObj->superAuth = pConnect->superAuth;
S
Shengliang Guan 已提交
2021
  pObj->connId = htonl(pConnect->connId);
2022 2023

  createHBObj(pObj);
2024
  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
H
hzcheng 已提交
2025 2026 2027 2028 2029

  return 0;
}

int tscProcessUseDbRsp(SSqlObj *pSql) {
S
slguan 已提交
2030
  STscObj *       pObj = pSql->pTscObj;
2031
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
S
slguan 已提交
2032

B
Bomin Zhang 已提交
2033
  tstrncpy(pObj->db, pTableMetaInfo->name, sizeof(pObj->db));
H
hzcheng 已提交
2034 2035 2036
  return 0;
}

Y
TD-1039  
yihaoDeng 已提交
2037 2038
int tscProcessDropDbRsp(SSqlObj *pSql) {
  pSql->pTscObj->db[0] = 0;
H
Haojun Liao 已提交
2039
  taosCacheEmpty(tscMetaCache);
H
hzcheng 已提交
2040 2041 2042 2043
  return 0;
}

int tscProcessDropTableRsp(SSqlObj *pSql) {
2044
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
S
slguan 已提交
2045

H
Haojun Liao 已提交
2046
  STableMeta *pTableMeta = taosCacheAcquireByKey(tscMetaCache, pTableMetaInfo->name, strlen(pTableMetaInfo->name));
H
Haojun Liao 已提交
2047
  if (pTableMeta == NULL) { /* not in cache, abort */
H
hzcheng 已提交
2048 2049 2050 2051 2052 2053 2054
    return 0;
  }

  /*
   * 1. if a user drops one table, which is the only table in a vnode, remove operation will incur vnode to be removed.
   * 2. Then, a user creates a new metric followed by a table with identical name of removed table but different schema,
   * here the table will reside in a new vnode.
S
slguan 已提交
2055 2056
   * The cached information is expired, however, we may have lost the ref of original meter. So, clear whole cache
   * instead.
H
hzcheng 已提交
2057
   */
2058
  tscDebug("%p force release table meta after drop table:%s", pSql, pTableMetaInfo->name);
H
Haojun Liao 已提交
2059
  taosCacheRelease(tscMetaCache, (void **)&pTableMeta, true);
H
hzcheng 已提交
2060

H
hjxilinx 已提交
2061
  if (pTableMetaInfo->pTableMeta) {
H
Haojun Liao 已提交
2062
    taosCacheRelease(tscMetaCache, (void **)&(pTableMetaInfo->pTableMeta), true);
H
hzcheng 已提交
2063 2064 2065 2066 2067 2068
  }

  return 0;
}

int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
2069
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
S
slguan 已提交
2070

H
Haojun Liao 已提交
2071
  STableMeta *pTableMeta = taosCacheAcquireByKey(tscMetaCache, pTableMetaInfo->name, strlen(pTableMetaInfo->name));
H
hjxilinx 已提交
2072
  if (pTableMeta == NULL) { /* not in cache, abort */
H
hzcheng 已提交
2073 2074 2075
    return 0;
  }

2076
  tscDebug("%p force release metermeta in cache after alter-table: %s", pSql, pTableMetaInfo->name);
H
Haojun Liao 已提交
2077
  taosCacheRelease(tscMetaCache, (void **)&pTableMeta, true);
H
hzcheng 已提交
2078

H
hjxilinx 已提交
2079
  if (pTableMetaInfo->pTableMeta) {
weixin_48148422's avatar
weixin_48148422 已提交
2080
    bool isSuperTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo);
H
Haojun Liao 已提交
2081
    taosCacheRelease(tscMetaCache, (void **)&(pTableMetaInfo->pTableMeta), true);
H
hzcheng 已提交
2082

2083
    if (isSuperTable) {  // if it is a super table, reset whole query cache
2084
      tscDebug("%p reset query cache since table:%s is stable", pSql, pTableMetaInfo->name);
H
Haojun Liao 已提交
2085
      taosCacheEmpty(tscMetaCache);
H
hzcheng 已提交
2086 2087 2088 2089 2090 2091 2092 2093 2094 2095
    }
  }

  return 0;
}

int tscProcessAlterDbMsgRsp(SSqlObj *pSql) {
  UNUSED(pSql);
  return 0;
}
2096 2097 2098
int tscProcessShowCreateRsp(SSqlObj *pSql) {
  return tscLocalResultCommonBuilder(pSql, 1);
}
H
hzcheng 已提交
2099 2100 2101 2102

int tscProcessQueryRsp(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;

2103
  SQueryTableRsp *pQuery = (SQueryTableRsp *)pRes->pRsp;
S
slguan 已提交
2104 2105 2106
  pQuery->qhandle = htobe64(pQuery->qhandle);
  pRes->qhandle = pQuery->qhandle;

H
hzcheng 已提交
2107
  pRes->data = NULL;
S
slguan 已提交
2108
  tscResetForNextRetrieve(pRes);
H
hzcheng 已提交
2109 2110 2111
  return 0;
}

H
hjxilinx 已提交
2112
int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
S
slguan 已提交
2113 2114 2115
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

S
slguan 已提交
2116
  SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *)pRes->pRsp;
2117 2118 2119 2120
  if (pRetrieve == NULL) {
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return pRes->code;
  }
H
hzcheng 已提交
2121 2122 2123

  pRes->numOfRows = htonl(pRetrieve->numOfRows);
  pRes->precision = htons(pRetrieve->precision);
2124 2125
  pRes->offset    = htobe64(pRetrieve->offset);
  pRes->useconds  = htobe64(pRetrieve->useconds);
H
hjxilinx 已提交
2126
  pRes->completed = (pRetrieve->completed == 1);
2127
  pRes->data      = pRetrieve->data;
H
hjxilinx 已提交
2128
  
2129
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
2130 2131 2132 2133
  if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
    return pRes->code;
  }
  
weixin_48148422's avatar
weixin_48148422 已提交
2134
  if (pSql->pSubscription != NULL) {
H
hjxilinx 已提交
2135
    int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
H
hjxilinx 已提交
2136
    
H
hjxilinx 已提交
2137
    TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, numOfCols - 1);
H
hjxilinx 已提交
2138 2139
    int16_t     offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1);
    
weixin_48148422's avatar
weixin_48148422 已提交
2140 2141
    char* p = pRes->data + (pField->bytes + offset) * pRes->numOfRows;

S
slguan 已提交
2142
    int32_t numOfTables = htonl(*(int32_t*)p);
weixin_48148422's avatar
weixin_48148422 已提交
2143
    p += sizeof(int32_t);
S
slguan 已提交
2144
    for (int i = 0; i < numOfTables; i++) {
weixin_48148422's avatar
weixin_48148422 已提交
2145 2146
      int64_t uid = htobe64(*(int64_t*)p);
      p += sizeof(int64_t);
weixin_48148422's avatar
weixin_48148422 已提交
2147
      p += sizeof(int32_t); // skip tid
weixin_48148422's avatar
weixin_48148422 已提交
2148 2149
      TSKEY key = htobe64(*(TSKEY*)p);
      p += sizeof(TSKEY);
2150
      tscUpdateSubscriptionProgress(pSql->pSubscription, uid, key);
weixin_48148422's avatar
weixin_48148422 已提交
2151
    }
2152 2153
  }

H
hzcheng 已提交
2154
  pRes->row = 0;
2155
  tscDebug("%p numOfRows:%" PRId64 ", offset:%" PRId64 ", complete:%d", pSql, pRes->numOfRows, pRes->offset, pRes->completed);
H
hzcheng 已提交
2156 2157 2158 2159

  return 0;
}

H
hjxilinx 已提交
2160
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code);
H
hzcheng 已提交
2161

2162
static int32_t getTableMetaFromMgmt(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
S
slguan 已提交
2163 2164
  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
  if (NULL == pNew) {
2165
    tscError("%p malloc failed for new sqlobj to get table meta", pSql);
2166
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
2167
  }
2168

H
hzcheng 已提交
2169 2170 2171
  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;
  pNew->cmd.command = TSDB_SQL_META;
S
slguan 已提交
2172

H
Haojun Liao 已提交
2173 2174 2175 2176 2177 2178
  T_REF_INC(pNew->pTscObj);

  // TODO add test case on x86 platform
  uint64_t adr = (uint64_t) pNew;
  pNew->self = taosCachePut(tscObjCache, &adr, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2*60*1000);

2179
  tscAddSubqueryInfo(&pNew->cmd);
2180

2181
  SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetailSafely(&pNew->cmd, 0);
2182

H
hjxilinx 已提交
2183
  pNew->cmd.autoCreated = pSql->cmd.autoCreated;  // create table if not exists
B
Bomin Zhang 已提交
2184
  if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE + pSql->cmd.payloadLen)) {
2185
    tscError("%p malloc failed for payload to get table meta", pSql);
S
slguan 已提交
2186
    free(pNew);
2187

2188
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
2189 2190
  }

H
hjxilinx 已提交
2191
  STableMetaInfo *pNewMeterMetaInfo = tscAddEmptyMetaInfo(pNewQueryInfo);
2192
  assert(pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);
H
hzcheng 已提交
2193

B
Bomin Zhang 已提交
2194
  tstrncpy(pNewMeterMetaInfo->name, pTableMetaInfo->name, sizeof(pNewMeterMetaInfo->name));
B
Bomin Zhang 已提交
2195 2196
  memcpy(pNew->cmd.payload, pSql->cmd.payload, pSql->cmd.payloadLen);  // tag information if table does not exists.
  pNew->cmd.payloadLen = pSql->cmd.payloadLen;
2197
  tscDebug("%p new pSqlObj:%p to get tableMeta, auto create:%d", pSql, pNew, pNew->cmd.autoCreated);
H
hzcheng 已提交
2198

H
hjxilinx 已提交
2199 2200
  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;
H
hzcheng 已提交
2201

H
hjxilinx 已提交
2202 2203
  int32_t code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
2204
    code = TSDB_CODE_TSC_ACTION_IN_PROGRESS;  // notify upper application that current process need to be terminated
H
hzcheng 已提交
2205 2206 2207 2208 2209
  }

  return code;
}

H
hjxilinx 已提交
2210
int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
H
hjxilinx 已提交
2211
  assert(strlen(pTableMetaInfo->name) != 0);
S
slguan 已提交
2212

2213
  // If this STableMetaInfo owns a table meta, release it first
H
hjxilinx 已提交
2214
  if (pTableMetaInfo->pTableMeta != NULL) {
H
Haojun Liao 已提交
2215
    taosCacheRelease(tscMetaCache, (void **)&(pTableMetaInfo->pTableMeta), false);
2216 2217
  }
  
H
Haojun Liao 已提交
2218
  pTableMetaInfo->pTableMeta = (STableMeta *)taosCacheAcquireByKey(tscMetaCache, pTableMetaInfo->name, strlen(pTableMetaInfo->name));
H
hjxilinx 已提交
2219
  if (pTableMetaInfo->pTableMeta != NULL) {
H
hjxilinx 已提交
2220
    STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
2221
    tscDebug("%p retrieve table Meta from cache, the number of columns:%d, numOfTags:%d, %p", pSql, tinfo.numOfColumns,
2222
             tinfo.numOfTags, pTableMetaInfo->pTableMeta);
H
hzcheng 已提交
2223 2224 2225

    return TSDB_CODE_SUCCESS;
  }
2226 2227
  
  return getTableMetaFromMgmt(pSql, pTableMetaInfo);
H
hzcheng 已提交
2228 2229
}

H
hjxilinx 已提交
2230
int tscGetMeterMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool createIfNotExists) {
H
hjxilinx 已提交
2231
  pSql->cmd.autoCreated = createIfNotExists;
H
hjxilinx 已提交
2232
  return tscGetTableMeta(pSql, pTableMetaInfo);
H
hzcheng 已提交
2233 2234 2235
}

/**
H
Haojun Liao 已提交
2236
 * retrieve table meta from mnode, and update the local table meta cache.
H
hzcheng 已提交
2237
 * @param pSql          sql object
B
Bomin Zhang 已提交
2238
 * @param tableIndex    table index
H
hzcheng 已提交
2239 2240
 * @return              status code
 */
B
Bomin Zhang 已提交
2241
int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) {
H
hzcheng 已提交
2242
  SSqlCmd *pCmd = &pSql->cmd;
2243 2244

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
B
Bomin Zhang 已提交
2245
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex);
H
hzcheng 已提交
2246

H
Haojun Liao 已提交
2247 2248
  STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
  if (pTableMetaInfo->pTableMeta) {
2249
    tscDebug("%p update table meta, old meta numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
2250
             tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->id.uid, pTableMeta);
H
hzcheng 已提交
2251 2252
  }

H
Haojun Liao 已提交
2253
  taosCacheRelease(tscMetaCache, (void **)&(pTableMetaInfo->pTableMeta), true);
H
Haojun Liao 已提交
2254
  return getTableMetaFromMgmt(pSql, pTableMetaInfo);
H
hzcheng 已提交
2255 2256
}

H
hjxilinx 已提交
2257
static bool allVgroupInfoRetrieved(SSqlCmd* pCmd, int32_t clauseIndex) {
2258
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
2259
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
H
hjxilinx 已提交
2260
    STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
H
hjxilinx 已提交
2261 2262
    if (pTableMetaInfo->vgroupList == NULL) {
      return false;
S
slguan 已提交
2263 2264
    }
  }
H
hjxilinx 已提交
2265 2266 2267 2268
  
  // all super tables vgroupinfo are retrieved, no need to retrieve vgroup info anymore
  return true;
}
H
hzcheng 已提交
2269

H
hjxilinx 已提交
2270
int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
2271
  int      code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
H
hjxilinx 已提交
2272 2273 2274
  SSqlCmd *pCmd = &pSql->cmd;
  
  if (allVgroupInfoRetrieved(pCmd, clauseIndex)) {
H
hzcheng 已提交
2275 2276
    return TSDB_CODE_SUCCESS;
  }
H
hjxilinx 已提交
2277

S
slguan 已提交
2278
  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
H
hzcheng 已提交
2279 2280 2281
  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;

H
hjxilinx 已提交
2282
  pNew->cmd.command = TSDB_SQL_STABLEVGROUP;
H
Haojun Liao 已提交
2283 2284

  // TODO TEST IT
2285 2286
  SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetailSafely(&pNew->cmd, 0);
  if (pNewQueryInfo == NULL) {
B
Bomin Zhang 已提交
2287
    tscFreeSqlObj(pNew);
2288 2289
    return code;
  }
2290
  
H
hjxilinx 已提交
2291
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
2292
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
H
hjxilinx 已提交
2293
    STableMetaInfo *pMInfo = tscGetMetaInfo(pQueryInfo, i);
H
Haojun Liao 已提交
2294
    STableMeta *pTableMeta = taosCacheAcquireByData(tscMetaCache, pMInfo->pTableMeta);
H
hjxilinx 已提交
2295
    tscAddTableMetaInfo(pNewQueryInfo, pMInfo->name, pTableMeta, NULL, pMInfo->tagColList);
S
slguan 已提交
2296 2297 2298 2299 2300 2301
  }

  if ((code = tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
    tscFreeSqlObj(pNew);
    return code;
  }
H
hzcheng 已提交
2302

2303
  pNewQueryInfo->numOfTables = pQueryInfo->numOfTables;
H
Haojun Liao 已提交
2304
  T_REF_INC(pNew->pTscObj);
H
Haojun Liao 已提交
2305 2306 2307

  uint64_t p = (uint64_t) pNew;
  pNew->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2 * 600 * 1000);
2308
  tscDebug("%p new sqlObj:%p to get vgroupInfo, numOfTables:%d", pSql, pNew, pNewQueryInfo->numOfTables);
H
hzcheng 已提交
2309

2310 2311 2312 2313
  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;
  code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
2314
    code = TSDB_CODE_TSC_ACTION_IN_PROGRESS;
H
hzcheng 已提交
2315 2316 2317 2318 2319
  }

  return code;
}

2320
void tscInitMsgsFp() {
S
slguan 已提交
2321 2322
  tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
  tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg;
J
jtao1735 已提交
2323
  tscBuildMsg[TSDB_SQL_FETCH] = tscBuildFetchMsg;
H
hzcheng 已提交
2324 2325

  tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg;
2326
  tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg;
H
hzcheng 已提交
2327

2328 2329
  tscBuildMsg[TSDB_SQL_CREATE_ACCT] = tscBuildAcctMsg;
  tscBuildMsg[TSDB_SQL_ALTER_ACCT] = tscBuildAcctMsg;
H
hzcheng 已提交
2330 2331

  tscBuildMsg[TSDB_SQL_CREATE_TABLE] = tscBuildCreateTableMsg;
S
[TD-16]  
slguan 已提交
2332
  tscBuildMsg[TSDB_SQL_DROP_USER] = tscBuildDropUserMsg;
H
hzcheng 已提交
2333 2334 2335
  tscBuildMsg[TSDB_SQL_DROP_ACCT] = tscBuildDropAcctMsg;
  tscBuildMsg[TSDB_SQL_DROP_DB] = tscBuildDropDbMsg;
  tscBuildMsg[TSDB_SQL_DROP_TABLE] = tscBuildDropTableMsg;
2336
  tscBuildMsg[TSDB_SQL_ALTER_USER] = tscBuildUserMsg;
S
slguan 已提交
2337 2338 2339
  tscBuildMsg[TSDB_SQL_CREATE_DNODE] = tscBuildCreateDnodeMsg;
  tscBuildMsg[TSDB_SQL_DROP_DNODE] = tscBuildDropDnodeMsg;
  tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg;
H
hzcheng 已提交
2340
  tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg;
2341
  tscBuildMsg[TSDB_SQL_UPDATE_TAGS_VAL] = tscBuildUpdateTagMsg;
H
hzcheng 已提交
2342 2343 2344 2345
  tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg;

  tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg;
  tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg;
H
hjxilinx 已提交
2346
  tscBuildMsg[TSDB_SQL_META] = tscBuildTableMetaMsg;
H
hjxilinx 已提交
2347
  tscBuildMsg[TSDB_SQL_STABLEVGROUP] = tscBuildSTableVgroupMsg;
S
slguan 已提交
2348
  tscBuildMsg[TSDB_SQL_MULTI_META] = tscBuildMultiMeterMetaMsg;
H
hzcheng 已提交
2349 2350 2351 2352

  tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg;
  tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg;
  tscBuildMsg[TSDB_SQL_RETRIEVE] = tscBuildRetrieveFromMgmtMsg;
2353 2354 2355
  tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg;
H
hzcheng 已提交
2356 2357

  tscProcessMsgRsp[TSDB_SQL_SELECT] = tscProcessQueryRsp;
H
hjxilinx 已提交
2358
  tscProcessMsgRsp[TSDB_SQL_FETCH] = tscProcessRetrieveRspFromNode;
H
hzcheng 已提交
2359 2360 2361 2362 2363

  tscProcessMsgRsp[TSDB_SQL_DROP_DB] = tscProcessDropDbRsp;
  tscProcessMsgRsp[TSDB_SQL_DROP_TABLE] = tscProcessDropTableRsp;
  tscProcessMsgRsp[TSDB_SQL_CONNECT] = tscProcessConnectRsp;
  tscProcessMsgRsp[TSDB_SQL_USE_DB] = tscProcessUseDbRsp;
2364
  tscProcessMsgRsp[TSDB_SQL_META] = tscProcessTableMetaRsp;
H
hjxilinx 已提交
2365
  tscProcessMsgRsp[TSDB_SQL_STABLEVGROUP] = tscProcessSTableVgroupRsp;
S
slguan 已提交
2366
  tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiMeterMetaRsp;
H
hzcheng 已提交
2367 2368

  tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp;
H
hjxilinx 已提交
2369
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromNode;  // rsp handled by same function.
H
hzcheng 已提交
2370
  tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp;
2371

H
Haojun Liao 已提交
2372 2373 2374 2375 2376
  tscProcessMsgRsp[TSDB_SQL_CURRENT_DB]   = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CURRENT_USER] = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_VERSION] = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CLI_VERSION]  = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_STATUS]  = tscProcessLocalRetrieveRsp;
2377

H
hzcheng 已提交
2378 2379
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp;

H
hjxilinx 已提交
2380
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_LOCALMERGE] = tscProcessRetrieveLocalMergeRsp;
H
hzcheng 已提交
2381 2382 2383 2384

  tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp;
  tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp;

2385 2386 2387 2388
  tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_TABLE] = tscProcessShowCreateRsp;
  tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_DATABASE] = tscProcessShowCreateRsp;
  

H
hzcheng 已提交
2389 2390 2391 2392 2393 2394
  tscKeepConn[TSDB_SQL_SHOW] = 1;
  tscKeepConn[TSDB_SQL_RETRIEVE] = 1;
  tscKeepConn[TSDB_SQL_SELECT] = 1;
  tscKeepConn[TSDB_SQL_FETCH] = 1;
  tscKeepConn[TSDB_SQL_HB] = 1;
}