tscServer.c 80.4 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#include "os.h"
H
hzcheng 已提交
17
#include "tcache.h"
H
Haojun Liao 已提交
18
#include "tcmdtype.h"
H
hzcheng 已提交
19
#include "trpc.h"
20 21
#include "tscLocalMerge.h"
#include "tscLog.h"
H
hzcheng 已提交
22 23 24 25 26
#include "tscProfile.h"
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
#include "ttimer.h"
dengyihao's avatar
dengyihao 已提交
27
#include "tlockfree.h"
H
hzcheng 已提交
28

29
SRpcCorEpSet  tscMgmtEpSet;
S
slguan 已提交
30

31 32
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};

H
hzcheng 已提交
33 34 35
int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql);
void tscProcessActivityTimer(void *handle, void *tmrId);
int tscKeepConn[TSDB_SQL_MAX] = {0};
36

B
Bomin Zhang 已提交
37
TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid, TSKEY dflt);
38 39
void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts);
void tscSaveSubscriptionProgress(void* sub);
H
hzcheng 已提交
40

S
slguan 已提交
41
static int32_t minMsgSize() { return tsRpcHeadSize + 100; }
H
Haojun Liao 已提交
42 43 44 45 46 47 48 49
static int32_t getWaitingTimeInterval(int32_t count) {
  int32_t initial = 100; // 100 ms by default
  if (count <= 1) {
    return 0;
  }

  return initial * (2<<(count - 2));
}
H
hzcheng 已提交
50

51
static void tscSetDnodeEpSet(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) {
52 53
  assert(pSql != NULL && pVgroupInfo != NULL && pVgroupInfo->numOfEps > 0);

54
  SRpcEpSet* pEpSet = &pSql->epSet;
H
Haojun Liao 已提交
55 56 57 58

  // Issue the query to one of the vnode among a vgroup randomly.
  // change the inUse property would not affect the isUse attribute of STableMeta
  pEpSet->inUse = rand() % pVgroupInfo->numOfEps;
59 60 61

  // apply the FQDN string length check here
  bool hasFqdn = false;
dengyihao's avatar
bugfix  
dengyihao 已提交
62

63 64 65 66
  pEpSet->numOfEps = pVgroupInfo->numOfEps;
  for(int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) {
    strcpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn);
    pEpSet->port[i] = pVgroupInfo->epAddr[i].port;
67 68 69 70

    if (!hasFqdn) {
      hasFqdn = (strlen(pEpSet->fqdn[i]) > 0);
    }
71
  }
72 73

  assert(hasFqdn);
74
}
75

dengyihao's avatar
dengyihao 已提交
76 77 78 79
static void tscDumpMgmtEpSet(SRpcEpSet *epSet) {
  taosCorBeginRead(&tscMgmtEpSet.version);
  *epSet = tscMgmtEpSet.epSet;
  taosCorEndRead(&tscMgmtEpSet.version);
80
}  
dengyihao's avatar
dengyihao 已提交
81 82
static void tscEpSetHtons(SRpcEpSet *s) {
   for (int32_t i = 0; i < s->numOfEps; i++) {
dengyihao's avatar
fixbug  
dengyihao 已提交
83 84 85
      s->port[i] = htons(s->port[i]);    
   }
} 
dengyihao's avatar
dengyihao 已提交
86 87
bool tscEpSetIsEqual(SRpcEpSet *s1, SRpcEpSet *s2) {
   if (s1->numOfEps != s2->numOfEps || s1->inUse != s2->inUse) {
88 89
     return false;
   } 
dengyihao's avatar
dengyihao 已提交
90
   for (int32_t i = 0; i < s1->numOfEps; i++) {
91 92 93 94 95
     if (s1->port[i] != s2->port[i] 
        || strncmp(s1->fqdn[i], s2->fqdn[i], TSDB_FQDN_LEN) != 0)
        return false;
   }
   return true;
S
slguan 已提交
96
}
dengyihao's avatar
dengyihao 已提交
97
void tscUpdateMgmtEpSet(SRpcEpSet *pEpSet) {
98
  // no need to update if equal
dengyihao's avatar
dengyihao 已提交
99 100 101
  taosCorBeginWrite(&tscMgmtEpSet.version);
  tscMgmtEpSet.epSet = *pEpSet;
  taosCorEndWrite(&tscMgmtEpSet.version);
102
}
dengyihao's avatar
dengyihao 已提交
103
static void tscDumpEpSetFromVgroupInfo(SCMCorVgroupInfo *pVgroupInfo, SRpcEpSet *pEpSet) {
dengyihao's avatar
dengyihao 已提交
104
  if (pVgroupInfo == NULL) { return;}
dengyihao's avatar
bugfix  
dengyihao 已提交
105
  taosCorBeginRead(&pVgroupInfo->version);
dengyihao's avatar
dengyihao 已提交
106
  int8_t inUse = pVgroupInfo->inUse;
dengyihao's avatar
dengyihao 已提交
107 108 109
  pEpSet->inUse = (inUse >= 0 && inUse < TSDB_MAX_REPLICA) ? inUse: 0; 
  pEpSet->numOfEps = pVgroupInfo->numOfEps;  
  for (int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) {
Y
yihaoDeng 已提交
110
    tstrncpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn, sizeof(pEpSet->fqdn[i]));
dengyihao's avatar
dengyihao 已提交
111
    pEpSet->port[i] = pVgroupInfo->epAddr[i].port;
S
slguan 已提交
112
  }
dengyihao's avatar
bugfix  
dengyihao 已提交
113
  taosCorEndRead(&pVgroupInfo->version);
S
slguan 已提交
114 115
}

dengyihao's avatar
dengyihao 已提交
116
static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) {
dengyihao's avatar
dengyihao 已提交
117 118
  SSqlCmd *pCmd = &pObj->cmd;
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
dengyihao's avatar
dengyihao 已提交
119
  if (pTableMetaInfo == NULL || pTableMetaInfo->pTableMeta == NULL) { return;}
dengyihao's avatar
bugfix  
dengyihao 已提交
120
  SCMCorVgroupInfo *pVgroupInfo = &pTableMetaInfo->pTableMeta->corVgroupInfo;
dengyihao's avatar
dengyihao 已提交
121

dengyihao's avatar
bugfix  
dengyihao 已提交
122
  taosCorBeginWrite(&pVgroupInfo->version);
dengyihao's avatar
dengyihao 已提交
123
  tscDebug("before: Endpoint in use: %d", pVgroupInfo->inUse);
124 125
  pVgroupInfo->inUse = pEpSet->inUse;
  pVgroupInfo->numOfEps = pEpSet->numOfEps;
dengyihao's avatar
dengyihao 已提交
126
  for (int32_t i = 0; i < pVgroupInfo->numOfEps; i++) {
H
Haojun Liao 已提交
127 128
    taosTFree(pVgroupInfo->epAddr[i].fqdn);
    pVgroupInfo->epAddr[i].fqdn = strndup(pEpSet->fqdn[i], tListLen(pEpSet->fqdn[i]));
129
    pVgroupInfo->epAddr[i].port = pEpSet->port[i];
dengyihao's avatar
dengyihao 已提交
130
  }
H
Haojun Liao 已提交
131

dengyihao's avatar
dengyihao 已提交
132
  tscDebug("after: EndPoint in use: %d", pVgroupInfo->inUse);
dengyihao's avatar
bugfix  
dengyihao 已提交
133
  taosCorEndWrite(&pVgroupInfo->version);
dengyihao's avatar
dengyihao 已提交
134
}
H
Haojun Liao 已提交
135

136 137 138 139
void tscPrintMgmtEp() {
  SRpcEpSet dump;
  tscDumpMgmtEpSet(&dump);
  if (dump.numOfEps <= 0) {
dengyihao's avatar
dengyihao 已提交
140
    tscError("invalid mnode EP list:%d", dump.numOfEps);
S
slguan 已提交
141
  } else {
142
    for (int i = 0; i < dump.numOfEps; ++i) {
143
      tscDebug("mnode index:%d %s:%d", i, dump.fqdn[i], dump.port[i]);
S
slguan 已提交
144
    }
S
slguan 已提交
145
  }
S
slguan 已提交
146 147
}

H
hzcheng 已提交
148 149 150
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
  STscObj *pObj = (STscObj *)param;
  if (pObj == NULL) return;
H
Haojun Liao 已提交
151

H
hzcheng 已提交
152 153 154 155 156
  if (pObj != pObj->signature) {
    tscError("heart beat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature);
    return;
  }

H
Haojun Liao 已提交
157
  SSqlObj *pSql = tres;
H
hzcheng 已提交
158 159 160
  SSqlRes *pRes = &pSql->res;

  if (code == 0) {
161
    SCMHeartBeatRsp *pRsp = (SCMHeartBeatRsp *)pRes->pRsp;
dengyihao's avatar
dengyihao 已提交
162 163 164 165
    SRpcEpSet *      epSet = &pRsp->epSet;
    if (epSet->numOfEps > 0) {
      tscEpSetHtons(epSet);
      tscUpdateMgmtEpSet(epSet);
dengyihao's avatar
dengyihao 已提交
166
    } 
S
slguan 已提交
167

S
Shengliang Guan 已提交
168 169
    pSql->pTscObj->connId = htonl(pRsp->connId);

H
hzcheng 已提交
170 171
    if (pRsp->killConnection) {
      tscKillConnection(pObj);
172
      return;
H
hzcheng 已提交
173
    } else {
S
slguan 已提交
174 175
      if (pRsp->queryId) tscKillQuery(pObj, htonl(pRsp->queryId));
      if (pRsp->streamId) tscKillStream(pObj, htonl(pRsp->streamId));
H
hzcheng 已提交
176 177
    }
  } else {
H
Haojun Liao 已提交
178
    tscDebug("heartbeat failed, code:%s", tstrerror(code));
H
hzcheng 已提交
179 180
  }

H
Haojun Liao 已提交
181
  if (pObj->pHb != NULL) {
182
    int32_t waitingDuring = tsShellActivityTimer * 500;
183
    tscDebug("%p start heartbeat in %dms", pSql, waitingDuring);
H
Haojun Liao 已提交
184 185 186 187 188

    taosTmrReset(tscProcessActivityTimer, waitingDuring, pObj, tscTmr, &pObj->pTimer);
  } else {
    tscDebug("%p start to close tscObj:%p, not send heartbeat again", pSql, pObj);
  }
H
hzcheng 已提交
189 190 191 192
}

void tscProcessActivityTimer(void *handle, void *tmrId) {
  STscObj *pObj = (STscObj *)handle;
193 194 195
  if (pObj == NULL || pObj->signature != pObj) {
    return;
  }
H
hzcheng 已提交
196

197 198 199
  SSqlObj* pHB = pObj->pHb;
  if (pObj->pTimer != tmrId || pHB == NULL) {
    return;
H
hzcheng 已提交
200 201
  }

H
Haojun Liao 已提交
202
  void** p = taosCacheAcquireByKey(tscObjCache, &pHB, sizeof(TSDB_CACHE_PTR_TYPE));
H
Haojun Liao 已提交
203 204 205 206 207 208 209 210 211 212 213 214
  if (p == NULL) {
    tscWarn("%p HB object has been released already", pHB);
    return;
  }

  assert(*pHB->self == pHB);

  int32_t code = tscProcessSql(pHB);
  taosCacheRelease(tscObjCache, (void**) &p, false);

  if (code != TSDB_CODE_SUCCESS) {
    tscError("%p failed to sent HB to server, reason:%s", pHB, tstrerror(code));
H
hzcheng 已提交
215 216 217 218
  }
}

int tscSendMsgToServer(SSqlObj *pSql) {
219
  STscObj* pObj = pSql->pTscObj;
H
hjxilinx 已提交
220 221 222
  SSqlCmd* pCmd = &pSql->cmd;
  
  char *pMsg = rpcMallocCont(pCmd->payloadLen);
S
slguan 已提交
223
  if (NULL == pMsg) {
224
    tscError("%p msg:%s malloc failed", pSql, taosMsg[pSql->cmd.msgType]);
225
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
H
hzcheng 已提交
226 227
  }

228 229
  // set the mgmt ip list
  if (pSql->cmd.command >= TSDB_SQL_MGMT) {
dengyihao's avatar
dengyihao 已提交
230
    tscDumpMgmtEpSet(&pSql->epSet);
J
jtao1735 已提交
231 232
  }

233
  memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
234

J
jtao1735 已提交
235
  SRpcMsg rpcMsg = {
236 237 238
      .msgType = pSql->cmd.msgType,
      .pCont   = pMsg,
      .contLen = pSql->cmd.payloadLen,
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
239 240
      .ahandle = pSql,
      .handle  = &pSql->pRpcCtx,
H
hjxilinx 已提交
241
      .code    = 0
J
jtao1735 已提交
242
  };
H
Haojun Liao 已提交
243

H
Haojun Liao 已提交
244 245 246 247
  // NOTE: the rpc context should be acquired before sending data to server.
  // Otherwise, the pSql object may have been released already during the response function, which is
  // processMsgFromServer function. In the meanwhile, the assignment of the rpc context to sql object will absolutely
  // cause crash.
H
Haojun Liao 已提交
248 249
  rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg);
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
250 251
}

252
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
S
TD-1530  
Shengliang Guan 已提交
253 254
  TSDB_CACHE_PTR_TYPE handle = (TSDB_CACHE_PTR_TYPE) rpcMsg->ahandle;
  void** p = taosCacheAcquireByKey(tscObjCache, &handle, sizeof(TSDB_CACHE_PTR_TYPE));
H
Haojun Liao 已提交
255 256
  if (p == NULL) {
    rpcFreeCont(rpcMsg->pCont);
257 258
    return;
  }
259

H
Haojun Liao 已提交
260 261 262
  SSqlObj* pSql = *p;
  assert(pSql != NULL);

H
Haojun Liao 已提交
263
  STscObj *pObj = pSql->pTscObj;
S
slguan 已提交
264 265
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
H
hzcheng 已提交
266

H
Haojun Liao 已提交
267
  assert(*pSql->self == pSql);
H
Haojun Liao 已提交
268
  pSql->pRpcCtx = NULL;
H
Haojun Liao 已提交
269

H
Haojun Liao 已提交
270
  if (pObj->signature != pObj) {
271
    tscDebug("%p DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, pObj, pObj->signature);
H
Haojun Liao 已提交
272

H
Haojun Liao 已提交
273
    taosCacheRelease(tscObjCache, (void**) &p, true);
H
Haojun Liao 已提交
274 275 276 277 278 279
    rpcFreeCont(rpcMsg->pCont);
    return;
  }

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) {
280
    tscDebug("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p",
281
        pSql, pCmd->command, pQueryInfo->type, pObj, pObj->signature);
H
Haojun Liao 已提交
282

H
Haojun Liao 已提交
283 284 285
    void** p1 = p;
    taosCacheRelease(tscObjCache, (void**) &p1, false);

H
Haojun Liao 已提交
286
    taosCacheRelease(tscObjCache, (void**) &p, true);
287
    rpcFreeCont(rpcMsg->pCont);
S
slguan 已提交
288
    return;
H
hzcheng 已提交
289 290
  }

H
Haojun Liao 已提交
291
  if (pEpSet) {
dengyihao's avatar
dengyihao 已提交
292
    if (!tscEpSetIsEqual(&pSql->epSet, pEpSet)) {
H
Haojun Liao 已提交
293 294
      if (pCmd->command < TSDB_SQL_MGMT) {
        tscUpdateVgroupInfo(pSql, pEpSet);
dengyihao's avatar
dengyihao 已提交
295
      } else {
dengyihao's avatar
dengyihao 已提交
296
        tscUpdateMgmtEpSet(pEpSet);
H
Haojun Liao 已提交
297
      }
dengyihao's avatar
dengyihao 已提交
298
    }
J
jtao1735 已提交
299 300
  }

301 302 303 304 305
  int32_t cmd = pCmd->command;
  if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_FETCH || cmd == TSDB_SQL_INSERT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) &&
      (rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID ||
       rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID ||
       rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL ||
S
Shengliang Guan 已提交
306
       rpcMsg->code == TSDB_CODE_APP_NOT_READY ||
307 308 309 310 311 312 313
       rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE)) {
    tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), ++pSql->retry);

    // set the flag to denote that sql string needs to be re-parsed and build submit block with table schema
    if (rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
      pSql->cmd.submitSchema = 1;
    }
314

315 316 317 318
    pSql->res.code = rpcMsg->code;  // keep the previous error code
    if (pSql->retry > pSql->maxRetry) {
      tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry);
    } else {
H
Haojun Liao 已提交
319
      // wait for a little bit moment and then retry, todo do not sleep in rpc callback thread
H
Haojun Liao 已提交
320 321 322 323 324
      if (rpcMsg->code == TSDB_CODE_APP_NOT_READY || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
        int32_t duration = getWaitingTimeInterval(pSql->retry);
        taosMsleep(duration);
      }

B
Bomin Zhang 已提交
325
      rpcMsg->code = tscRenewTableMeta(pSql, 0);
326 327 328

      // if there is an error occurring, proceed to the following error handling procedure.
      if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
H
Haojun Liao 已提交
329
        taosCacheRelease(tscObjCache, (void**) &p, false);
330 331
        rpcFreeCont(rpcMsg->pCont);
        return;
H
hzcheng 已提交
332 333
      }
    }
S
slguan 已提交
334
  }
335

H
hzcheng 已提交
336
  pRes->rspLen = 0;
337
  
H
Haojun Liao 已提交
338
  if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
339
    tscDebug("%p query is cancelled, code:%s", pSql, tstrerror(pRes->code));
H
Haojun Liao 已提交
340 341
  } else {
    pRes->code = rpcMsg->code;
H
hzcheng 已提交
342 343
  }

S
slguan 已提交
344
  if (pRes->code == TSDB_CODE_SUCCESS) {
345
    tscDebug("%p reset retry counter to be 0 due to success rsp, old:%d", pSql, pSql->retry);
S
slguan 已提交
346 347 348
    pSql->retry = 0;
  }

349
  if (pRes->code != TSDB_CODE_TSC_QUERY_CANCELLED) {
350
    assert(rpcMsg->msgType == pCmd->msgType + 1);
351
    pRes->code    = rpcMsg->code;
352
    pRes->rspType = rpcMsg->msgType;
353
    pRes->rspLen  = rpcMsg->contLen;
H
hzcheng 已提交
354

355
    if (pRes->rspLen > 0 && rpcMsg->pCont) {
356 357
      char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
      if (tmp == NULL) {
358
        pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
359 360
      } else {
        pRes->pRsp = tmp;
361
        memcpy(pRes->pRsp, rpcMsg->pCont, pRes->rspLen);
S
slguan 已提交
362
      }
363 364
    } else {
      pRes->pRsp = NULL;
S
slguan 已提交
365 366
    }

H
hzcheng 已提交
367 368 369 370
    /*
     * There is not response callback function for submit response.
     * The actual inserted number of points is the first number.
     */
371
    if (rpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP && pRes->pRsp != NULL) {
372
      SShellSubmitRspMsg *pMsg = (SShellSubmitRspMsg*)pRes->pRsp;
S
slguan 已提交
373 374 375 376 377 378 379
      pMsg->code = htonl(pMsg->code);
      pMsg->numOfRows = htonl(pMsg->numOfRows);
      pMsg->affectedRows = htonl(pMsg->affectedRows);
      pMsg->failedRows = htonl(pMsg->failedRows);
      pMsg->numOfFailedBlocks = htonl(pMsg->numOfFailedBlocks);

      pRes->numOfRows += pMsg->affectedRows;
380
      tscDebug("%p SQL cmd:%s, code:%s inserted rows:%d rspLen:%d", pSql, sqlCmd[pCmd->command], 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
381
          tstrerror(pRes->code), pMsg->affectedRows, pRes->rspLen);
S
slguan 已提交
382
    } else {
383
      tscDebug("%p SQL cmd:%s, code:%s rspLen:%d", pSql, sqlCmd[pCmd->command], tstrerror(pRes->code), pRes->rspLen);
H
hzcheng 已提交
384 385
    }
  }
386
  
H
Haojun Liao 已提交
387
  if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) {
388
    rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
H
Haojun Liao 已提交
389
  }
S
Shengliang Guan 已提交
390

H
Haojun Liao 已提交
391
  bool shouldFree = tscShouldBeFreed(pSql);
392
  if (rpcMsg->code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
S
TD-1057  
Shengliang Guan 已提交
393
    rpcMsg->code = (pRes->code == TSDB_CODE_SUCCESS) ? (int32_t)pRes->numOfRows : pRes->code;
H
Haojun Liao 已提交
394
    (*pSql->fp)(pSql->param, pSql, rpcMsg->code);
H
hzcheng 已提交
395 396
  }

H
Haojun Liao 已提交
397
  void** p1 = p;
H
Haojun Liao 已提交
398
  taosCacheRelease(tscObjCache, (void**) &p1, false);
H
Haojun Liao 已提交
399

H
Haojun Liao 已提交
400 401
  if (shouldFree) { // in case of table-meta/vgrouplist query, automatically free it
    taosCacheRelease(tscObjCache, (void **)&p, true);
H
Haojun Liao 已提交
402 403 404
    tscDebug("%p sqlObj is automatically freed", pSql);
  }

405
  rpcFreeCont(rpcMsg->pCont);
H
hzcheng 已提交
406 407
}

S
slguan 已提交
408 409 410
int doProcessSql(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;
411

H
hjxilinx 已提交
412 413 414 415 416 417 418
  if (pCmd->command == TSDB_SQL_SELECT ||
      pCmd->command == TSDB_SQL_FETCH ||
      pCmd->command == TSDB_SQL_RETRIEVE ||
      pCmd->command == TSDB_SQL_INSERT ||
      pCmd->command == TSDB_SQL_CONNECT ||
      pCmd->command == TSDB_SQL_HB ||
      pCmd->command == TSDB_SQL_META ||
H
hjxilinx 已提交
419
      pCmd->command == TSDB_SQL_STABLEVGROUP) {
420 421 422 423 424 425
    pRes->code = tscBuildMsg[pCmd->command](pSql, NULL);
  }
  
  if (pRes->code != TSDB_CODE_SUCCESS) {
    tscQueueAsyncRes(pSql);
    return pRes->code;
S
slguan 已提交
426
  }
427

428 429 430
  int32_t code = tscSendMsgToServer(pSql);

  // NOTE: if code is TSDB_CODE_SUCCESS, pSql may have been released here already by other threads.
431
  if (code != TSDB_CODE_SUCCESS) {
S
slguan 已提交
432
    pRes->code = code;
H
hjxilinx 已提交
433
    tscQueueAsyncRes(pSql);
H
Haojun Liao 已提交
434
    return code;
S
slguan 已提交
435
  }
H
hjxilinx 已提交
436 437
  
  return TSDB_CODE_SUCCESS;
S
slguan 已提交
438 439 440
}

int tscProcessSql(SSqlObj *pSql) {
441
  char    *name = NULL;
442
  SSqlCmd *pCmd = &pSql->cmd;
443
  
444
  SQueryInfo     *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hjxilinx 已提交
445
  STableMetaInfo *pTableMetaInfo = NULL;
446
  uint32_t        type = 0;
447

448
  if (pQueryInfo != NULL) {
H
hjxilinx 已提交
449
    pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
450
    name = (pTableMetaInfo != NULL)? pTableMetaInfo->name:NULL;
451
    type = pQueryInfo->type;
452

H
hjxilinx 已提交
453
    // while numOfTables equals to 0, it must be Heartbeat
H
hjxilinx 已提交
454
    assert((pQueryInfo->numOfTables == 0 && pQueryInfo->command == TSDB_SQL_HB) || pQueryInfo->numOfTables > 0);
S
slguan 已提交
455
  }
456

457
  tscDebug("%p SQL cmd:%s will be processed, name:%s, type:%d", pSql, sqlCmd[pCmd->command], name, type);
H
hjxilinx 已提交
458
  if (pCmd->command < TSDB_SQL_MGMT) { // the pTableMetaInfo cannot be NULL
H
hjxilinx 已提交
459
    if (pTableMetaInfo == NULL) {
460
      pSql->res.code = TSDB_CODE_TSC_APP_ERROR;
H
hjxilinx 已提交
461 462
      return pSql->res.code;
    }
H
hjxilinx 已提交
463
  } else if (pCmd->command < TSDB_SQL_LOCAL) {
464
    //pSql->epSet = tscMgmtEpSet;
H
hzcheng 已提交
465 466 467
  } else {  // local handler
    return (*tscProcessMsgRsp[pCmd->command])(pSql);
  }
468
  
S
slguan 已提交
469 470
  return doProcessSql(pSql);
}
H
hzcheng 已提交
471

J
jtao1735 已提交
472
int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
473
  SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *) pSql->cmd.payload;
S
slguan 已提交
474
  pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
S
slguan 已提交
475

H
Haojun Liao 已提交
476
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
S
slguan 已提交
477
  pRetrieveMsg->free = htons(pQueryInfo->type);
H
hzcheng 已提交
478

479
  // todo valid the vgroupId at the client side
H
hjxilinx 已提交
480 481
  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  
weixin_48148422's avatar
weixin_48148422 已提交
482
  if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
H
hjxilinx 已提交
483
    int32_t vgIndex = pTableMetaInfo->vgroupIndex;
484
    
H
hjxilinx 已提交
485
    SVgroupsInfo* pVgroupInfo = pTableMetaInfo->vgroupList;
H
Haojun Liao 已提交
486 487
    assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups);

H
hjxilinx 已提交
488
    pRetrieveMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId);
489
  } else {
H
hjxilinx 已提交
490
    STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
H
hjxilinx 已提交
491
    pRetrieveMsg->header.vgId = htonl(pTableMeta->vgroupInfo.vgId);
492
  }
493 494

  pSql->cmd.payloadLen = sizeof(SRetrieveTableMsg);
J
jtao1735 已提交
495
  pSql->cmd.msgType = TSDB_MSG_TYPE_FETCH;
496 497 498

  pRetrieveMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg));

499
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
500 501
}

502
int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hjxilinx 已提交
503
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
H
hjxilinx 已提交
504
  STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
H
hjxilinx 已提交
505
  
506
  char* pMsg = pSql->cmd.payload;
507 508 509
  
  // NOTE: shell message size should not include SMsgDesc
  int32_t size = pSql->cmd.payloadLen - sizeof(SMsgDesc);
510 511
  int32_t vgId = pTableMeta->vgroupInfo.vgId;

512
  SMsgDesc* pMsgDesc = (SMsgDesc*) pMsg;
513 514
  pMsgDesc->numOfVnodes = htonl(1); // always one vnode

515
  pMsg += sizeof(SMsgDesc);
516
  SSubmitMsg *pShellMsg = (SSubmitMsg *)pMsg;
517

H
hjxilinx 已提交
518
  pShellMsg->header.vgId = htonl(vgId);
H
Haojun Liao 已提交
519
  pShellMsg->header.contLen = htonl(size);      // the length not includes the size of SMsgDesc
520
  pShellMsg->length = pShellMsg->header.contLen;
H
hjxilinx 已提交
521
  
H
Haojun Liao 已提交
522
  pShellMsg->numOfBlocks = htonl(pSql->cmd.numOfTablesInSubmit);  // number of tables to be inserted
H
hzcheng 已提交
523

H
hjxilinx 已提交
524
  // pSql->cmd.payloadLen is set during copying data into payload
S
slguan 已提交
525
  pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
dengyihao's avatar
dengyihao 已提交
526
  tscDumpEpSetFromVgroupInfo(&pTableMeta->corVgroupInfo, &pSql->epSet);
527

528 529
  tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfEP:%d", pSql, vgId, pSql->cmd.numOfTablesInSubmit,
      pSql->epSet.numOfEps);
530
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
531 532 533
}

/*
534
 * for table query, simply return the size <= 1k
H
hzcheng 已提交
535
 */
536
static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
H
hzcheng 已提交
537
  const static int32_t MIN_QUERY_MSG_PKT_SIZE = TSDB_MAX_BYTES_PER_ROW * 5;
538
  SQueryInfo *         pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
H
hzcheng 已提交
539

S
TD-1057  
Shengliang Guan 已提交
540
  int32_t srcColListSize = (int32_t)(taosArrayGetSize(pQueryInfo->colList) * sizeof(SColumnInfo));
H
hjxilinx 已提交
541 542
  
  size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
S
TD-1057  
Shengliang Guan 已提交
543
  int32_t exprSize = (int32_t)(sizeof(SSqlFuncMsg) * numOfExprs);
H
hjxilinx 已提交
544
  
545
  return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + exprSize + 4096;
H
hzcheng 已提交
546 547
}

548
static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char *pMsg) {
549
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
B
Bomin Zhang 已提交
550
  TSKEY dfltKey = htobe64(pQueryMsg->window.skey);
551

H
hjxilinx 已提交
552
  STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
weixin_48148422's avatar
weixin_48148422 已提交
553
  if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) || pTableMetaInfo->pVgroupTables == NULL) {
554 555
    
    SCMVgroupInfo* pVgroupInfo = NULL;
weixin_48148422's avatar
weixin_48148422 已提交
556
    if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
557 558
      int32_t index = pTableMetaInfo->vgroupIndex;
      assert(index >= 0);
H
hjxilinx 已提交
559
  
B
Bomin Zhang 已提交
560
      if (pTableMetaInfo->vgroupList->numOfVgroups > 0) {
H
Haojun Liao 已提交
561
        assert(index < pTableMetaInfo->vgroupList->numOfVgroups);
B
Bomin Zhang 已提交
562 563
        pVgroupInfo = &pTableMetaInfo->vgroupList->vgroups[index];
      }
564
      tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, pTableMetaInfo->vgroupList->numOfVgroups);
H
hjxilinx 已提交
565 566
    } else {
      pVgroupInfo = &pTableMeta->vgroupInfo;
567
    }
weixin_48148422's avatar
weixin_48148422 已提交
568

569 570 571 572
    assert(pVgroupInfo != NULL);

    tscSetDnodeEpSet(pSql, pVgroupInfo);
    pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId);
weixin_48148422's avatar
weixin_48148422 已提交
573

574
    STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
575 576 577
    pTableIdInfo->tid = htonl(pTableMeta->id.tid);
    pTableIdInfo->uid = htobe64(pTableMeta->id.uid);
    pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->id.uid, dfltKey));
weixin_48148422's avatar
weixin_48148422 已提交
578

579 580
    pQueryMsg->numOfTables = htonl(1);  // set the number of tables
    pMsg += sizeof(STableIdInfo);
581
  } else { // it is a subquery of the super table query, this EP info is acquired from vgroupInfo
582
    int32_t index = pTableMetaInfo->vgroupIndex;
S
TD-1057  
Shengliang Guan 已提交
583
    int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
584
    assert(index >= 0 && index < numOfVgroups);
weixin_48148422's avatar
weixin_48148422 已提交
585

586
    tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, numOfVgroups);
weixin_48148422's avatar
weixin_48148422 已提交
587

588
    SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, index);
589

dengyihao's avatar
bugfix  
dengyihao 已提交
590
    // set the vgroup info 
591
    tscSetDnodeEpSet(pSql, &pTableIdList->vgInfo);
592 593
    pQueryMsg->head.vgId = htonl(pTableIdList->vgInfo.vgId);
    
S
TD-1057  
Shengliang Guan 已提交
594
    int32_t numOfTables = (int32_t)taosArrayGetSize(pTableIdList->itemList);
595 596 597 598 599 600 601 602 603
    pQueryMsg->numOfTables = htonl(numOfTables);  // set the number of tables
  
    // serialize each table id info
    for(int32_t i = 0; i < numOfTables; ++i) {
      STableIdInfo* pItem = taosArrayGet(pTableIdList->itemList, i);
      
      STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
      pTableIdInfo->tid = htonl(pItem->tid);
      pTableIdInfo->uid = htobe64(pItem->uid);
B
Bomin Zhang 已提交
604
      pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pItem->uid, dfltKey));
605 606 607 608
      pMsg += sizeof(STableIdInfo);
    }
  }
  
609
  tscDebug("%p vgId:%d, query on table:%s, tid:%d, uid:%" PRIu64, pSql, htonl(pQueryMsg->head.vgId), pTableMetaInfo->name,
610
      pTableMeta->id.tid, pTableMeta->id.uid);
H
hjxilinx 已提交
611
  
612 613 614
  return pMsg;
}

615
int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
616 617
  SSqlCmd *pCmd = &pSql->cmd;

618
  int32_t size = tscEstimateQueryMsgSize(pCmd, pCmd->clauseIndex);
H
hzcheng 已提交
619

S
slguan 已提交
620 621
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for query msg", pSql);
H
Haojun Liao 已提交
622
    return TSDB_CODE_TSC_INVALID_SQL;  // todo add test for this
S
slguan 已提交
623
  }
624
  
625
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hjxilinx 已提交
626
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hjxilinx 已提交
627
  STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
628 629 630

  size_t numOfSrcCols = taosArrayGetSize(pQueryInfo->colList);
  if (numOfSrcCols <= 0 && !tscQueryTags(pQueryInfo)) {
S
Shengliang Guan 已提交
631
    tscError("%p illegal value of numOfCols in query msg: %" PRIu64 ", table cols:%d", pSql, (uint64_t)numOfSrcCols,
632 633
        tscGetNumOfColumns(pTableMeta));

H
Haojun Liao 已提交
634
    return TSDB_CODE_TSC_INVALID_SQL;
635
  }
636
  
637
  if (pQueryInfo->interval.interval < 0) {
S
TD-1530  
Shengliang Guan 已提交
638
    tscError("%p illegal value of aggregation time interval in query msg: %" PRId64, pSql, (int64_t)pQueryInfo->interval.interval);
H
Haojun Liao 已提交
639
    return TSDB_CODE_TSC_INVALID_SQL;
640 641 642 643
  }
  
  if (pQueryInfo->groupbyExpr.numOfGroupCols < 0) {
    tscError("%p illegal value of numOfGroupCols in query msg: %d", pSql, pQueryInfo->groupbyExpr.numOfGroupCols);
H
Haojun Liao 已提交
644
    return TSDB_CODE_TSC_INVALID_SQL;
645
  }
646

647
  SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload;
H
hzcheng 已提交
648

S
TD-1057  
Shengliang Guan 已提交
649
  int32_t numOfTags = (int32_t)taosArrayGetSize(pTableMetaInfo->tagColList);
650
  
651
  if (pQueryInfo->order.order == TSDB_ORDER_ASC) {
H
hjxilinx 已提交
652 653
    pQueryMsg->window.skey = htobe64(pQueryInfo->window.skey);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->window.ekey);
H
hzcheng 已提交
654
  } else {
H
hjxilinx 已提交
655 656
    pQueryMsg->window.skey = htobe64(pQueryInfo->window.ekey);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->window.skey);
H
hzcheng 已提交
657 658
  }

659 660
  pQueryMsg->order          = htons(pQueryInfo->order.order);
  pQueryMsg->orderColId     = htons(pQueryInfo->order.orderColId);
H
Haojun Liao 已提交
661
  pQueryMsg->fillType       = htons(pQueryInfo->fillType);
662 663
  pQueryMsg->limit          = htobe64(pQueryInfo->limit.limit);
  pQueryMsg->offset         = htobe64(pQueryInfo->limit.offset);
S
TD-1057  
Shengliang Guan 已提交
664
  pQueryMsg->numOfCols      = htons((int16_t)taosArrayGetSize(pQueryInfo->colList));
665 666 667 668 669 670
  pQueryMsg->interval.interval   = htobe64(pQueryInfo->interval.interval);
  pQueryMsg->interval.sliding   = htobe64(pQueryInfo->interval.sliding);
  pQueryMsg->interval.offset   = htobe64(pQueryInfo->interval.offset);
  pQueryMsg->interval.intervalUnit = pQueryInfo->interval.intervalUnit;
  pQueryMsg->interval.slidingUnit = pQueryInfo->interval.slidingUnit;
  pQueryMsg->interval.offsetUnit = pQueryInfo->interval.offsetUnit;
671
  pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols);
672
  pQueryMsg->numOfTags      = htonl(numOfTags);
weixin_48148422's avatar
weixin_48148422 已提交
673
  pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType);
H
Haojun Liao 已提交
674
  pQueryMsg->queryType      = htonl(pQueryInfo->type);
H
hjxilinx 已提交
675 676
  
  size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo);
S
TD-1057  
Shengliang Guan 已提交
677
  pQueryMsg->numOfOutput = htons((int16_t)numOfOutput);
H
hzcheng 已提交
678 679

  // set column list ids
680 681
  size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
  char *pMsg = (char *)(pQueryMsg->colList) + numOfCols * sizeof(SColumnInfo);
H
hjxilinx 已提交
682
  SSchema *pSchema = tscGetTableSchema(pTableMeta);
683
  
684 685 686
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumn *pCol = taosArrayGetP(pQueryInfo->colList, i);
    SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];
H
hzcheng 已提交
687

688
    if (pCol->colIndex.columnIndex >= tscGetNumOfColumns(pTableMeta) || pColSchema->type < TSDB_DATA_TYPE_BOOL ||
689
        pColSchema->type > TSDB_DATA_TYPE_NCHAR) {
690 691
      tscError("%p tid:%d uid:%" PRIu64" id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s",
          pSql, pTableMeta->id.tid, pTableMeta->id.uid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta), pCol->colIndex.columnIndex,
692 693
               pColSchema->name);

694
      return TSDB_CODE_TSC_INVALID_SQL;
695
    }
H
hzcheng 已提交
696 697 698

    pQueryMsg->colList[i].colId = htons(pColSchema->colId);
    pQueryMsg->colList[i].bytes = htons(pColSchema->bytes);
699
    pQueryMsg->colList[i].type  = htons(pColSchema->type);
S
slguan 已提交
700
    pQueryMsg->colList[i].numOfFilters = htons(pCol->numOfFilters);
H
hzcheng 已提交
701

S
slguan 已提交
702 703 704
    // append the filter information after the basic column information
    for (int32_t f = 0; f < pCol->numOfFilters; ++f) {
      SColumnFilterInfo *pColFilter = &pCol->filterInfo[f];
H
hzcheng 已提交
705

S
slguan 已提交
706
      SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)pMsg;
707
      pFilterMsg->filterstr = htons(pColFilter->filterstr);
S
slguan 已提交
708 709

      pMsg += sizeof(SColumnFilterInfo);
H
hzcheng 已提交
710

711
      if (pColFilter->filterstr) {
S
slguan 已提交
712
        pFilterMsg->len = htobe64(pColFilter->len);
713
        memcpy(pMsg, (void *)pColFilter->pz, (size_t)(pColFilter->len + 1));
S
slguan 已提交
714 715 716 717 718 719 720 721
        pMsg += (pColFilter->len + 1);  // append the additional filter binary info
      } else {
        pFilterMsg->lowerBndi = htobe64(pColFilter->lowerBndi);
        pFilterMsg->upperBndi = htobe64(pColFilter->upperBndi);
      }

      pFilterMsg->lowerRelOptr = htons(pColFilter->lowerRelOptr);
      pFilterMsg->upperRelOptr = htons(pColFilter->upperRelOptr);
H
hzcheng 已提交
722

S
slguan 已提交
723 724
      if (pColFilter->lowerRelOptr == TSDB_RELATION_INVALID && pColFilter->upperRelOptr == TSDB_RELATION_INVALID) {
        tscError("invalid filter info");
H
Haojun Liao 已提交
725
        return TSDB_CODE_TSC_INVALID_SQL;
S
slguan 已提交
726 727
      }
    }
H
hzcheng 已提交
728 729
  }

H
hjxilinx 已提交
730
  SSqlFuncMsg *pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
H
hjxilinx 已提交
731
  for (int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) {
732
    SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
H
hzcheng 已提交
733

734
    if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId, pExpr->numOfParams)) {
H
hzcheng 已提交
735
      tscError("%p table schema is not matched with parsed sql", pSql);
H
Haojun Liao 已提交
736
      return TSDB_CODE_TSC_INVALID_SQL;
H
hzcheng 已提交
737 738
    }

739 740 741
    pSqlFuncExpr->colInfo.colId    = htons(pExpr->colInfo.colId);
    pSqlFuncExpr->colInfo.colIndex = htons(pExpr->colInfo.colIndex);
    pSqlFuncExpr->colInfo.flag     = htons(pExpr->colInfo.flag);
H
hzcheng 已提交
742

743
    pSqlFuncExpr->functionId  = htons(pExpr->functionId);
H
hzcheng 已提交
744
    pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams);
H
hjxilinx 已提交
745
    pMsg += sizeof(SSqlFuncMsg);
H
hzcheng 已提交
746 747

    for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
H
Haojun Liao 已提交
748
      // todo add log
H
hzcheng 已提交
749 750 751 752 753
      pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType);
      pSqlFuncExpr->arg[j].argBytes = htons(pExpr->param[j].nLen);

      if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) {
        memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen);
754
        pMsg += pExpr->param[j].nLen;
H
hzcheng 已提交
755 756 757 758 759
      } else {
        pSqlFuncExpr->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64Key);
      }
    }

H
hjxilinx 已提交
760
    pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
H
hzcheng 已提交
761
  }
762
  
763
  // serialize the table info (sid, uid, tags)
764 765
  pMsg = doSerializeTableInfo(pQueryMsg, pSql, pMsg);
  
766
  SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr;
767
  if (pGroupbyExpr->numOfGroupCols > 0) {
S
slguan 已提交
768
    pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex);
H
hzcheng 已提交
769 770
    pQueryMsg->orderType = htons(pGroupbyExpr->orderType);

S
slguan 已提交
771
    for (int32_t j = 0; j < pGroupbyExpr->numOfGroupCols; ++j) {
772 773
      SColIndex* pCol = taosArrayGet(pGroupbyExpr->columnInfo, j);
  
S
slguan 已提交
774 775 776
      *((int16_t *)pMsg) = pCol->colId;
      pMsg += sizeof(pCol->colId);

777 778
      *((int16_t *)pMsg) += pCol->colIndex;
      pMsg += sizeof(pCol->colIndex);
S
slguan 已提交
779 780 781

      *((int16_t *)pMsg) += pCol->flag;
      pMsg += sizeof(pCol->flag);
H
hjxilinx 已提交
782 783 784
      
      memcpy(pMsg, pCol->name, tListLen(pCol->name));
      pMsg += tListLen(pCol->name);
S
slguan 已提交
785 786 787
    }
  }

788
  if (pQueryInfo->fillType != TSDB_FILL_NONE) {
H
hjxilinx 已提交
789
    for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
790 791
      *((int64_t *)pMsg) = htobe64(pQueryInfo->fillVal[i]);
      pMsg += sizeof(pQueryInfo->fillVal[0]);
S
slguan 已提交
792 793
    }
  }
794 795 796 797 798 799 800 801 802
  
  if (numOfTags != 0) {
    int32_t numOfColumns = tscGetNumOfColumns(pTableMeta);
    int32_t numOfTagColumns = tscGetNumOfTags(pTableMeta);
    int32_t total = numOfTagColumns + numOfColumns;
    
    pSchema = tscGetTableTagSchema(pTableMeta);
    
    for (int32_t i = 0; i < numOfTags; ++i) {
H
hjxilinx 已提交
803
      SColumn *pCol = taosArrayGetP(pTableMetaInfo->tagColList, i);
804 805 806 807
      SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];

      if ((pCol->colIndex.columnIndex >= numOfTagColumns || pCol->colIndex.columnIndex < -1) ||
          (pColSchema->type < TSDB_DATA_TYPE_BOOL || pColSchema->type > TSDB_DATA_TYPE_NCHAR)) {
808 809
        tscError("%p tid:%d uid:%" PRIu64 " id:%s, tag index out of range, totalCols:%d, numOfTags:%d, index:%d, column name:%s",
                 pSql, pTableMeta->id.tid, pTableMeta->id.uid, pTableMetaInfo->name, total, numOfTagColumns,
B
Bomin Zhang 已提交
810
                 pCol->colIndex.columnIndex, pColSchema->name);
811

812
        return TSDB_CODE_TSC_INVALID_SQL;
813 814 815 816 817 818 819 820 821 822 823 824
      }
  
      SColumnInfo* pTagCol = (SColumnInfo*) pMsg;
  
      pTagCol->colId = htons(pColSchema->colId);
      pTagCol->bytes = htons(pColSchema->bytes);
      pTagCol->type  = htons(pColSchema->type);
      pTagCol->numOfFilters = 0;
      
      pMsg += sizeof(SColumnInfo);
    }
  }
S
slguan 已提交
825

H
Haojun Liao 已提交
826 827 828 829
  // serialize tag column query condition
  if (pQueryInfo->tagCond.pCond != NULL && taosArrayGetSize(pQueryInfo->tagCond.pCond) > 0) {
    STagCond* pTagCond = &pQueryInfo->tagCond;
    
830
    SCond *pCond = tsGetSTableQueryCond(pTagCond, pTableMeta->id.uid);
H
Haojun Liao 已提交
831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846
    if (pCond != NULL && pCond->cond != NULL) {
      pQueryMsg->tagCondLen = htons(pCond->len);
      memcpy(pMsg, pCond->cond, pCond->len);
      
      pMsg += pCond->len;
    }
  }
  
  if (pQueryInfo->tagCond.tbnameCond.cond == NULL) {
    *pMsg = 0;
    pMsg++;
  } else {
    strcpy(pMsg, pQueryInfo->tagCond.tbnameCond.cond);
    pMsg += strlen(pQueryInfo->tagCond.tbnameCond.cond) + 1;
  }

S
slguan 已提交
847
  // compressed ts block
S
TD-1057  
Shengliang Guan 已提交
848
  pQueryMsg->tsOffset = htonl((int32_t)(pMsg - pCmd->payload));
S
slguan 已提交
849 850 851
  int32_t tsLen = 0;
  int32_t numOfBlocks = 0;

852
  if (pQueryInfo->tsBuf != NULL) {
H
hjxilinx 已提交
853
    STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pQueryInfo->tsBuf, pTableMetaInfo->vgroupIndex);
854
    assert(QUERY_IS_JOIN_QUERY(pQueryInfo->type) && pBlockInfo != NULL);  // this query should not be sent
S
slguan 已提交
855 856

    // todo refactor
B
Bomin Zhang 已提交
857 858 859 860 861
    if (fseek(pQueryInfo->tsBuf->f, pBlockInfo->offset, SEEK_SET) != 0) {
      int code = TAOS_SYSTEM_ERROR(ferror(pQueryInfo->tsBuf->f));
      tscError("%p: fseek failed: %s", pSql, tstrerror(code));
      return code;
    }
H
Haojun Liao 已提交
862 863 864

    size_t s = fread(pMsg, 1, pBlockInfo->compLen, pQueryInfo->tsBuf->f);
    if (s != pBlockInfo->compLen) {
B
Bomin Zhang 已提交
865 866 867 868
      int code = TAOS_SYSTEM_ERROR(ferror(pQueryInfo->tsBuf->f));
      tscError("%p: fread didn't return expected data: %s", pSql, tstrerror(code));
      return code;
    }
S
slguan 已提交
869 870 871 872

    pMsg += pBlockInfo->compLen;
    tsLen = pBlockInfo->compLen;
    numOfBlocks = pBlockInfo->numOfBlocks;
H
hzcheng 已提交
873 874
  }

S
slguan 已提交
875 876
  pQueryMsg->tsLen = htonl(tsLen);
  pQueryMsg->tsNumOfBlocks = htonl(numOfBlocks);
877 878
  if (pQueryInfo->tsBuf != NULL) {
    pQueryMsg->tsOrder = htonl(pQueryInfo->tsBuf->tsOrder);
H
hzcheng 已提交
879 880
  }

S
TD-1057  
Shengliang Guan 已提交
881
  int32_t msgLen = (int32_t)(pMsg - pCmd->payload);
H
hzcheng 已提交
882

883
  tscDebug("%p msg built success,len:%d bytes", pSql, msgLen);
H
hzcheng 已提交
884
  pCmd->payloadLen = msgLen;
S
slguan 已提交
885
  pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY;
H
hjxilinx 已提交
886
  
887
  pQueryMsg->head.contLen = htonl(msgLen);
S
TD-1057  
Shengliang Guan 已提交
888
  assert(msgLen + minMsgSize() <= (int32_t)pCmd->allocSize);
889 890

  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
891 892
}

893 894
int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
895
  pCmd->payloadLen = sizeof(SCMCreateDbMsg);
S
slguan 已提交
896
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DB;
H
hzcheng 已提交
897

898
  SCMCreateDbMsg *pCreateDbMsg = (SCMCreateDbMsg*)pCmd->payload;
899

900
  assert(pCmd->numOfClause == 1);
901
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
902
  tstrncpy(pCreateDbMsg->db, pTableMetaInfo->name, sizeof(pCreateDbMsg->db));
H
hzcheng 已提交
903

904
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
905 906
}

907 908
int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
909
  pCmd->payloadLen = sizeof(SCMCreateDnodeMsg);
S
slguan 已提交
910 911
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
912
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
913
  }
H
hzcheng 已提交
914

915
  SCMCreateDnodeMsg *pCreate = (SCMCreateDnodeMsg *)pCmd->payload;
J
jtao1735 已提交
916 917
  strncpy(pCreate->ep, pInfo->pDCLInfo->a[0].z, pInfo->pDCLInfo->a[0].n);
  
S
slguan 已提交
918
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DNODE;
H
hzcheng 已提交
919

920
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
921 922
}

923 924
int32_t tscBuildAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
925
  pCmd->payloadLen = sizeof(SCMCreateAcctMsg);
S
slguan 已提交
926 927
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
928
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
929
  }
H
hzcheng 已提交
930

931
  SCMCreateAcctMsg *pAlterMsg = (SCMCreateAcctMsg *)pCmd->payload;
H
hzcheng 已提交
932

H
Haojun Liao 已提交
933 934
  SStrToken *pName = &pInfo->pDCLInfo->user.user;
  SStrToken *pPwd = &pInfo->pDCLInfo->user.passwd;
H
hzcheng 已提交
935

936 937
  strncpy(pAlterMsg->user, pName->z, pName->n);
  strncpy(pAlterMsg->pass, pPwd->z, pPwd->n);
S
slguan 已提交
938

939
  SCreateAcctSQL *pAcctOpt = &pInfo->pDCLInfo->acctOpt;
H
hzcheng 已提交
940

941 942 943 944 945 946 947 948
  pAlterMsg->cfg.maxUsers = htonl(pAcctOpt->maxUsers);
  pAlterMsg->cfg.maxDbs = htonl(pAcctOpt->maxDbs);
  pAlterMsg->cfg.maxTimeSeries = htonl(pAcctOpt->maxTimeSeries);
  pAlterMsg->cfg.maxStreams = htonl(pAcctOpt->maxStreams);
  pAlterMsg->cfg.maxPointsPerSecond = htonl(pAcctOpt->maxPointsPerSecond);
  pAlterMsg->cfg.maxStorage = htobe64(pAcctOpt->maxStorage);
  pAlterMsg->cfg.maxQueryTime = htobe64(pAcctOpt->maxQueryTime);
  pAlterMsg->cfg.maxConnections = htonl(pAcctOpt->maxConnections);
H
hzcheng 已提交
949

950 951 952 953 954 955 956 957 958 959 960 961 962
  if (pAcctOpt->stat.n == 0) {
    pAlterMsg->cfg.accessState = -1;
  } else {
    if (pAcctOpt->stat.z[0] == 'r' && pAcctOpt->stat.n == 1) {
      pAlterMsg->cfg.accessState = TSDB_VN_READ_ACCCESS;
    } else if (pAcctOpt->stat.z[0] == 'w' && pAcctOpt->stat.n == 1) {
      pAlterMsg->cfg.accessState = TSDB_VN_WRITE_ACCCESS;
    } else if (strncmp(pAcctOpt->stat.z, "all", 3) == 0 && pAcctOpt->stat.n == 3) {
      pAlterMsg->cfg.accessState = TSDB_VN_ALL_ACCCESS;
    } else if (strncmp(pAcctOpt->stat.z, "no", 2) == 0 && pAcctOpt->stat.n == 2) {
      pAlterMsg->cfg.accessState = 0;
    }
  }
H
hzcheng 已提交
963

S
slguan 已提交
964
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_ACCT;
965
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
966 967
}

968 969
int32_t tscBuildUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
970
  pCmd->payloadLen = sizeof(SCMCreateUserMsg);
S
slguan 已提交
971

S
slguan 已提交
972 973
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
974
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
975 976
  }

977
  SCMCreateUserMsg *pAlterMsg = (SCMCreateUserMsg*)pCmd->payload;
H
hzcheng 已提交
978

979 980
  SUserInfo *pUser = &pInfo->pDCLInfo->user;
  strncpy(pAlterMsg->user, pUser->user.z, pUser->user.n);
S
TD-1057  
Shengliang Guan 已提交
981
  pAlterMsg->flag = (int8_t)pUser->type;
H
hzcheng 已提交
982

983 984 985 986
  if (pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
    pAlterMsg->privilege = (char)pCmd->count;
  } else if (pUser->type == TSDB_ALTER_USER_PASSWD) {
    strncpy(pAlterMsg->pass, pUser->passwd.z, pUser->passwd.n);
H
hjxilinx 已提交
987 988
  } else { // create user password info
    strncpy(pAlterMsg->pass, pUser->passwd.z, pUser->passwd.n);
989
  }
H
hzcheng 已提交
990

991
  if (pUser->type == TSDB_ALTER_USER_PASSWD || pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
S
slguan 已提交
992
    pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_USER;
993
  } else {
S
slguan 已提交
994
    pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_USER;
995
  }
H
hzcheng 已提交
996

997
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
998 999
}

1000 1001
int32_t tscBuildCfgDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
S
slguan 已提交
1002
  pCmd->payloadLen = sizeof(SCMCfgDnodeMsg);
guanshengliang's avatar
guanshengliang 已提交
1003
  pCmd->msgType = TSDB_MSG_TYPE_CM_CONFIG_DNODE;
1004 1005
  return TSDB_CODE_SUCCESS;
}
S
slguan 已提交
1006

1007 1008
int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
1009
  pCmd->payloadLen = sizeof(SCMDropDbMsg);
H
hzcheng 已提交
1010

S
slguan 已提交
1011 1012
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1013
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1014 1015
  }

1016
  SCMDropDbMsg *pDropDbMsg = (SCMDropDbMsg*)pCmd->payload;
H
hzcheng 已提交
1017

1018
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1019
  tstrncpy(pDropDbMsg->db, pTableMetaInfo->name, sizeof(pDropDbMsg->db));
1020
  pDropDbMsg->ignoreNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;
H
hzcheng 已提交
1021

S
slguan 已提交
1022
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DB;
1023
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1024 1025
}

1026 1027
int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
1028
  pCmd->payloadLen = sizeof(SCMDropTableMsg);
H
hzcheng 已提交
1029

S
slguan 已提交
1030 1031
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1032
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1033
  }
H
hzcheng 已提交
1034

1035
  SCMDropTableMsg *pDropTableMsg = (SCMDropTableMsg*)pCmd->payload;
1036
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1037
  strcpy(pDropTableMsg->tableId, pTableMetaInfo->name);
S
slguan 已提交
1038
  pDropTableMsg->igNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;
H
hzcheng 已提交
1039

S
slguan 已提交
1040
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_TABLE;
1041
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1042 1043
}

1044
int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1045
  SSqlCmd *pCmd = &pSql->cmd;
1046
  pCmd->payloadLen = sizeof(SCMDropDnodeMsg);
S
slguan 已提交
1047 1048
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1049
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1050
  }
H
hzcheng 已提交
1051

1052
  SCMDropDnodeMsg *pDrop = (SCMDropDnodeMsg *)pCmd->payload;
1053
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1054
  tstrncpy(pDrop->ep, pTableMetaInfo->name, sizeof(pDrop->ep));
S
slguan 已提交
1055
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DNODE;
H
hzcheng 已提交
1056

1057
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1058 1059
}

S
[TD-16]  
slguan 已提交
1060
int32_t tscBuildDropUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
1061
  SSqlCmd *pCmd = &pSql->cmd;
1062
  pCmd->payloadLen = sizeof(SCMDropUserMsg);
S
slguan 已提交
1063
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_USER;
H
hzcheng 已提交
1064

S
slguan 已提交
1065 1066
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1067
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1068
  }
H
hzcheng 已提交
1069

1070
  SCMDropUserMsg *pDropMsg = (SCMDropUserMsg*)pCmd->payload;
1071
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1072
  tstrncpy(pDropMsg->user, pTableMetaInfo->name, sizeof(pDropMsg->user));
H
hzcheng 已提交
1073

1074
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1075 1076
}

S
[TD-16]  
slguan 已提交
1077 1078 1079 1080 1081 1082 1083
int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCMDropUserMsg);
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_ACCT;

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1084
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
[TD-16]  
slguan 已提交
1085 1086 1087 1088
  }

  SCMDropUserMsg *pDropMsg = (SCMDropUserMsg*)pCmd->payload;
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1089
  tstrncpy(pDropMsg->user, pTableMetaInfo->name, sizeof(pDropMsg->user));
S
[TD-16]  
slguan 已提交
1090 1091 1092 1093

  return TSDB_CODE_SUCCESS;
}

1094 1095
int32_t tscBuildUseDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
1096
  pCmd->payloadLen = sizeof(SCMUseDbMsg);
H
hzcheng 已提交
1097

S
slguan 已提交
1098 1099
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1100
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1101
  }
1102

1103
  SCMUseDbMsg *pUseDbMsg = (SCMUseDbMsg*)pCmd->payload;
1104
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1105
  strcpy(pUseDbMsg->db, pTableMetaInfo->name);
S
slguan 已提交
1106
  pCmd->msgType = TSDB_MSG_TYPE_CM_USE_DB;
H
hzcheng 已提交
1107

1108
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1109 1110
}

1111
int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1112
  STscObj *pObj = pSql->pTscObj;
S
slguan 已提交
1113
  SSqlCmd *pCmd = &pSql->cmd;
S
slguan 已提交
1114
  pCmd->msgType = TSDB_MSG_TYPE_CM_SHOW;
1115
  pCmd->payloadLen = sizeof(SCMShowMsg) + 100;
H
hzcheng 已提交
1116

S
slguan 已提交
1117 1118
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1119
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1120
  }
H
hzcheng 已提交
1121

1122
  SCMShowMsg *pShowMsg = (SCMShowMsg*)pCmd->payload;
S
slguan 已提交
1123

1124
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1125
  size_t nameLen = strlen(pTableMetaInfo->name);
S
slguan 已提交
1126
  if (nameLen > 0) {
B
Bomin Zhang 已提交
1127
    tstrncpy(pShowMsg->db, pTableMetaInfo->name, sizeof(pShowMsg->db));  // prefix is set here
H
hzcheng 已提交
1128
  } else {
B
Bomin Zhang 已提交
1129
    tstrncpy(pShowMsg->db, pObj->db, sizeof(pShowMsg->db));
H
hzcheng 已提交
1130 1131
  }

1132 1133 1134 1135
  SShowInfo *pShowInfo = &pInfo->pDCLInfo->showOpt;
  pShowMsg->type = pShowInfo->showType;

  if (pShowInfo->showType != TSDB_MGMT_TABLE_VNODES) {
H
Haojun Liao 已提交
1136
    SStrToken *pPattern = &pShowInfo->pattern;
1137 1138 1139 1140 1141
    if (pPattern->type > 0) {  // only show tables support wildcard query
      strncpy(pShowMsg->payload, pPattern->z, pPattern->n);
      pShowMsg->payloadLen = htons(pPattern->n);
    }
  } else {
H
Haojun Liao 已提交
1142
    SStrToken *pEpAddr = &pShowInfo->prefix;
dengyihao's avatar
dengyihao 已提交
1143
    assert(pEpAddr->n > 0 && pEpAddr->type > 0);
H
hzcheng 已提交
1144

dengyihao's avatar
dengyihao 已提交
1145 1146
    strncpy(pShowMsg->payload, pEpAddr->z, pEpAddr->n);
    pShowMsg->payloadLen = htons(pEpAddr->n);
1147 1148
  }

1149
  pCmd->payloadLen = sizeof(SCMShowMsg) + pShowMsg->payloadLen;
1150
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1151 1152
}

1153
int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1154
  SSqlCmd *pCmd = &pSql->cmd;
1155
  pCmd->payloadLen = sizeof(SCMKillQueryMsg);
H
hzcheng 已提交
1156

1157 1158
  switch (pCmd->command) {
    case TSDB_SQL_KILL_QUERY:
S
slguan 已提交
1159
      pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_QUERY;
1160 1161
      break;
    case TSDB_SQL_KILL_CONNECTION:
S
slguan 已提交
1162
      pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_CONN;
1163 1164
      break;
    case TSDB_SQL_KILL_STREAM:
S
slguan 已提交
1165
      pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_STREAM;
1166 1167 1168
      break;
  }
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1169 1170
}

1171
int tscEstimateCreateTableMsgLength(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1172 1173
  SSqlCmd *pCmd = &(pSql->cmd);

guanshengliang's avatar
guanshengliang 已提交
1174
  int32_t size = minMsgSize() + sizeof(SCMCreateTableMsg);
H
hzcheng 已提交
1175

1176
  SCreateTableSQL *pCreateTableInfo = pInfo->pCreateTableInfo;
1177
  if (pCreateTableInfo->type == TSQL_CREATE_TABLE_FROM_STABLE) {
H
hzcheng 已提交
1178 1179
    size += sizeof(STagData);
  } else {
S
slguan 已提交
1180
    size += sizeof(SSchema) * (pCmd->numOfCols + pCmd->count);
H
hzcheng 已提交
1181
  }
1182

1183 1184 1185
  if (pCreateTableInfo->pSelect != NULL) {
    size += (pCreateTableInfo->pSelect->selectToken.n + 1);
  }
H
hzcheng 已提交
1186 1187 1188 1189

  return size + TSDB_EXTRA_PAYLOAD_SIZE;
}

1190
int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1191
  int              msgLen = 0;
S
slguan 已提交
1192
  SSchema *        pSchema;
H
hzcheng 已提交
1193
  int              size = 0;
1194 1195 1196
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
H
hjxilinx 已提交
1197
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
1198 1199

  // Reallocate the payload size
1200
  size = tscEstimateCreateTableMsgLength(pSql, pInfo);
S
slguan 已提交
1201 1202
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for create table msg", pSql);
1203
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1204
  }
H
hzcheng 已提交
1205 1206


1207
  SCMCreateTableMsg *pCreateTableMsg = (SCMCreateTableMsg *)pCmd->payload;
H
hjxilinx 已提交
1208
  strcpy(pCreateTableMsg->tableId, pTableMetaInfo->name);
1209 1210

  // use dbinfo from table id without modifying current db info
H
Haojun Liao 已提交
1211
  tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pCreateTableMsg->db);
H
hzcheng 已提交
1212

1213 1214 1215
  SCreateTableSQL *pCreateTable = pInfo->pCreateTableInfo;

  pCreateTableMsg->igExists = pCreateTable->existCheck ? 1 : 0;
H
hzcheng 已提交
1216 1217 1218 1219
  pCreateTableMsg->numOfColumns = htons(pCmd->numOfCols);
  pCreateTableMsg->numOfTags = htons(pCmd->count);

  pCreateTableMsg->sqlLen = 0;
S
slguan 已提交
1220
  char *pMsg = (char *)pCreateTableMsg->schema;
H
hzcheng 已提交
1221

1222 1223
  int8_t type = pInfo->pCreateTableInfo->type;
  if (type == TSQL_CREATE_TABLE_FROM_STABLE) {  // create by using super table, tags value
1224 1225 1226 1227 1228 1229 1230
    STagData* pTag = &pInfo->pCreateTableInfo->usingInfo.tagdata;
    *(int32_t*)pMsg = htonl(pTag->dataLen);
    pMsg += sizeof(int32_t);
    memcpy(pMsg, pTag->name, sizeof(pTag->name));
    pMsg += sizeof(pTag->name);
    memcpy(pMsg, pTag->data, pTag->dataLen);
    pMsg += pTag->dataLen;
1231
  } else {  // create (super) table
1232
    pSchema = (SSchema *)pCreateTableMsg->schema;
1233

H
hzcheng 已提交
1234
    for (int i = 0; i < pCmd->numOfCols + pCmd->count; ++i) {
H
hjxilinx 已提交
1235
      TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
H
hzcheng 已提交
1236 1237 1238 1239

      pSchema->type = pField->type;
      strcpy(pSchema->name, pField->name);
      pSchema->bytes = htons(pField->bytes);
1240

H
hzcheng 已提交
1241 1242 1243 1244
      pSchema++;
    }

    pMsg = (char *)pSchema;
1245 1246
    if (type == TSQL_CREATE_STREAM) {  // check if it is a stream sql
      SQuerySQL *pQuerySql = pInfo->pCreateTableInfo->pSelect;
H
hzcheng 已提交
1247

1248 1249 1250
      strncpy(pMsg, pQuerySql->selectToken.z, pQuerySql->selectToken.n + 1);
      pCreateTableMsg->sqlLen = htons(pQuerySql->selectToken.n + 1);
      pMsg += pQuerySql->selectToken.n + 1;
H
hzcheng 已提交
1251 1252 1253
    }
  }

H
hjxilinx 已提交
1254
  tscFieldInfoClear(&pQueryInfo->fieldsInfo);
H
hzcheng 已提交
1255

S
TD-1057  
Shengliang Guan 已提交
1256
  msgLen = (int32_t)(pMsg - (char*)pCreateTableMsg);
S
slguan 已提交
1257
  pCreateTableMsg->contLen = htonl(msgLen);
H
hzcheng 已提交
1258
  pCmd->payloadLen = msgLen;
S
slguan 已提交
1259
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE;
H
hzcheng 已提交
1260 1261

  assert(msgLen + minMsgSize() <= size);
1262
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1263 1264 1265
}

int tscEstimateAlterTableMsgLength(SSqlCmd *pCmd) {
1266
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
guanshengliang's avatar
guanshengliang 已提交
1267
  return minMsgSize() + sizeof(SCMAlterTableMsg) + sizeof(SSchema) * tscNumOfFields(pQueryInfo) +
H
hzcheng 已提交
1268 1269 1270
         TSDB_EXTRA_PAYLOAD_SIZE;
}

1271
int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
1272 1273
  char *pMsg;
  int   msgLen = 0;
H
hzcheng 已提交
1274

1275
  SSqlCmd    *pCmd = &pSql->cmd;
1276 1277
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

H
hjxilinx 已提交
1278
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
1279 1280 1281
  
  SAlterTableSQL *pAlterInfo = pInfo->pAlterInfo;
  int size = tscEstimateAlterTableMsgLength(pCmd);
S
slguan 已提交
1282 1283
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for alter table msg", pSql);
H
Haojun Liao 已提交
1284
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1285
  }
1286 1287
  
  SCMAlterTableMsg *pAlterTableMsg = (SCMAlterTableMsg *)pCmd->payload;
H
Haojun Liao 已提交
1288
  tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pAlterTableMsg->db);
H
hzcheng 已提交
1289

H
hjxilinx 已提交
1290
  strcpy(pAlterTableMsg->tableId, pTableMetaInfo->name);
1291
  pAlterTableMsg->type = htons(pAlterInfo->type);
1292

1293
  pAlterTableMsg->numOfCols = htons(tscNumOfFields(pQueryInfo));
S
slguan 已提交
1294
  SSchema *pSchema = pAlterTableMsg->schema;
1295
  for (int i = 0; i < tscNumOfFields(pQueryInfo); ++i) {
H
hjxilinx 已提交
1296
    TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
1297
  
H
hzcheng 已提交
1298 1299 1300 1301 1302 1303 1304
    pSchema->type = pField->type;
    strcpy(pSchema->name, pField->name);
    pSchema->bytes = htons(pField->bytes);
    pSchema++;
  }

  pMsg = (char *)pSchema;
1305 1306 1307
  pAlterTableMsg->tagValLen = htonl(pAlterInfo->tagData.dataLen);
  memcpy(pMsg, pAlterInfo->tagData.data, pAlterInfo->tagData.dataLen);
  pMsg += pAlterInfo->tagData.dataLen;
H
hzcheng 已提交
1308

S
TD-1057  
Shengliang Guan 已提交
1309
  msgLen = (int32_t)(pMsg - (char*)pAlterTableMsg);
1310

H
hzcheng 已提交
1311
  pCmd->payloadLen = msgLen;
S
slguan 已提交
1312
  pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_TABLE;
H
hzcheng 已提交
1313 1314

  assert(msgLen + minMsgSize() <= size);
1315

1316
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1317 1318
}

1319 1320 1321 1322
int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
  SSqlCmd* pCmd = &pSql->cmd;
  pCmd->msgType = TSDB_MSG_TYPE_UPDATE_TAG_VAL;
  
1323
  SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) pCmd->payload;
1324
  pCmd->payloadLen = htonl(pUpdateMsg->head.contLen);
1325

1326 1327
  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
1328

dengyihao's avatar
dengyihao 已提交
1329
  tscDumpEpSetFromVgroupInfo(&pTableMetaInfo->pTableMeta->corVgroupInfo, &pSql->epSet);
1330

1331 1332 1333
  return TSDB_CODE_SUCCESS;
}

1334
int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1335
  SSqlCmd *pCmd = &pSql->cmd;
1336
  pCmd->payloadLen = sizeof(SCMAlterDbMsg);
S
slguan 已提交
1337
  pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_DB;
H
hzcheng 已提交
1338

1339
  SCMAlterDbMsg *pAlterDbMsg = (SCMAlterDbMsg*)pCmd->payload;
1340
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1341
  tstrncpy(pAlterDbMsg->db, pTableMetaInfo->name, sizeof(pAlterDbMsg->db));
H
hzcheng 已提交
1342

1343
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1344 1345
}

1346
int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1347
  SSqlCmd *pCmd = &pSql->cmd;
J
jtao1735 已提交
1348
  pCmd->msgType = TSDB_MSG_TYPE_CM_RETRIEVE;
S
slguan 已提交
1349
  pCmd->payloadLen = sizeof(SRetrieveTableMsg);
S
slguan 已提交
1350

S
slguan 已提交
1351 1352
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1353
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
H
hzcheng 已提交
1354
  }
S
slguan 已提交
1355

S
slguan 已提交
1356 1357 1358 1359
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg*)pCmd->payload;
  pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
  pRetrieveMsg->free = htons(pQueryInfo->type);
H
hzcheng 已提交
1360

1361
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1362 1363
}

1364
static int tscSetResultPointer(SQueryInfo *pQueryInfo, SSqlRes *pRes) {
1365
  if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
H
hzcheng 已提交
1366 1367 1368
    return pRes->code;
  }

H
hjxilinx 已提交
1369
  for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
H
hjxilinx 已提交
1370
    int16_t offset = tscFieldInfoGetOffset(pQueryInfo, i);
1371
    pRes->tsrow[i] = (unsigned char*)((char*) pRes->data + offset * pRes->numOfRows);
H
hzcheng 已提交
1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385
  }

  return 0;
}

/*
 * this function can only be called once.
 * by using pRes->rspType to denote its status
 *
 * if pRes->rspType is 1, no more result
 */
static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
1386

1387
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
1388

H
hzcheng 已提交
1389 1390 1391 1392 1393 1394
  pRes->code = TSDB_CODE_SUCCESS;
  if (pRes->rspType == 0) {
    pRes->numOfRows = numOfRes;
    pRes->row = 0;
    pRes->rspType = 1;

1395
    tscSetResultPointer(pQueryInfo, pRes);
H
hzcheng 已提交
1396
  } else {
S
slguan 已提交
1397
    tscResetForNextRetrieve(pRes);
H
hzcheng 已提交
1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412
  }

  uint8_t code = pSql->res.code;
  if (pSql->fp) {
    if (code == TSDB_CODE_SUCCESS) {
      (*pSql->fp)(pSql->param, pSql, pSql->res.numOfRows);
    } else {
      tscQueueAsyncRes(pSql);
    }
  }

  return code;
}

int tscProcessDescribeTableRsp(SSqlObj *pSql) {
S
slguan 已提交
1413
  SSqlCmd *       pCmd = &pSql->cmd;
1414
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hzcheng 已提交
1415

H
hjxilinx 已提交
1416
  STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
H
hjxilinx 已提交
1417 1418
  
  int32_t numOfRes = tinfo.numOfColumns + tinfo.numOfTags;
H
hzcheng 已提交
1419 1420 1421
  return tscLocalResultCommonBuilder(pSql, numOfRes);
}

H
Haojun Liao 已提交
1422 1423 1424
int tscProcessLocalRetrieveRsp(SSqlObj *pSql) {
  int32_t numOfRes = 1;
  pSql->res.completed = true;
H
hzcheng 已提交
1425 1426 1427
  return tscLocalResultCommonBuilder(pSql, numOfRes);
}

H
hjxilinx 已提交
1428
int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
H
hzcheng 已提交
1429
  SSqlRes *pRes = &pSql->res;
H
Haojun Liao 已提交
1430
  SSqlCmd* pCmd = &pSql->cmd;
H
hzcheng 已提交
1431

H
Haojun Liao 已提交
1432 1433 1434 1435 1436 1437
  int32_t code = pRes->code;
  if (pRes->code != TSDB_CODE_SUCCESS) {
    tscQueueAsyncRes(pSql);
    return code;
  }

1438
  pRes->code = tscDoLocalMerge(pSql);
H
hjxilinx 已提交
1439
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hzcheng 已提交
1440 1441

  if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) {
1442
    tscCreateResPointerInfo(pRes, pQueryInfo);
H
hzcheng 已提交
1443 1444 1445
  }

  pRes->row = 0;
1446
  pRes->completed = (pRes->numOfRows == 0);
H
hzcheng 已提交
1447

H
Haojun Liao 已提交
1448
  code = pRes->code;
H
hjxilinx 已提交
1449 1450 1451 1452
  if (pRes->code == TSDB_CODE_SUCCESS) {
    (*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
  } else {
    tscQueueAsyncRes(pSql);
H
hzcheng 已提交
1453 1454 1455 1456 1457
  }

  return code;
}

S
slguan 已提交
1458
int tscProcessEmptyResultRsp(SSqlObj *pSql) { return tscLocalResultCommonBuilder(pSql, 0); }
H
hzcheng 已提交
1459

1460
int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1461
  STscObj *pObj = pSql->pTscObj;
S
slguan 已提交
1462
  SSqlCmd *pCmd = &pSql->cmd;
S
slguan 已提交
1463
  pCmd->msgType = TSDB_MSG_TYPE_CM_CONNECT;
1464
  pCmd->payloadLen = sizeof(SCMConnectMsg);
H
hzcheng 已提交
1465

S
slguan 已提交
1466 1467
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1468
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1469 1470
  }

1471
  SCMConnectMsg *pConnect = (SCMConnectMsg*)pCmd->payload;
H
hzcheng 已提交
1472

H
Haojun Liao 已提交
1473
  // TODO refactor full_name
H
hzcheng 已提交
1474 1475 1476
  char *db;  // ugly code to move the space
  db = strstr(pObj->db, TS_PATH_DELIMITER);
  db = (db == NULL) ? pObj->db : db + 1;
B
Bomin Zhang 已提交
1477 1478 1479
  tstrncpy(pConnect->db, db, sizeof(pConnect->db));
  tstrncpy(pConnect->clientVersion, version, sizeof(pConnect->clientVersion));
  tstrncpy(pConnect->msgVersion, "", sizeof(pConnect->msgVersion));
H
hzcheng 已提交
1480

H
Haojun Liao 已提交
1481 1482 1483
  pConnect->pid = htonl(taosGetPId());
  taosGetCurrentAPPName(pConnect->appName, NULL);

1484
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1485 1486
}

H
hjxilinx 已提交
1487
int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
1488 1489 1490
  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);

H
hjxilinx 已提交
1491
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
S
slguan 已提交
1492

B
Bomin Zhang 已提交
1493
  SCMTableInfoMsg* pInfoMsg = (SCMTableInfoMsg *)pCmd->payload;
H
hjxilinx 已提交
1494
  strcpy(pInfoMsg->tableId, pTableMetaInfo->name);
H
hjxilinx 已提交
1495
  pInfoMsg->createFlag = htons(pSql->cmd.autoCreated ? 1 : 0);
S
slguan 已提交
1496

B
Bomin Zhang 已提交
1497
  char* pMsg = (char*)pInfoMsg + sizeof(SCMTableInfoMsg);
H
hzcheng 已提交
1498

B
Bomin Zhang 已提交
1499 1500 1501 1502 1503 1504 1505
  size_t len = htonl(pCmd->tagData.dataLen);
  if (pSql->cmd.autoCreated) {
    if (len > 0) {
      len += sizeof(pCmd->tagData.name) + sizeof(pCmd->tagData.dataLen);
      memcpy(pInfoMsg->tags, &pCmd->tagData, len);
      pMsg += len;
    }
H
hzcheng 已提交
1506 1507
  }

S
TD-1057  
Shengliang Guan 已提交
1508
  pCmd->payloadLen = (int32_t)(pMsg - (char*)pInfoMsg);
S
slguan 已提交
1509
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLE_META;
H
hzcheng 已提交
1510

1511
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1512 1513
}

S
slguan 已提交
1514
/**
1515
 *  multi table meta req pkg format:
1516
 *  | SMgmtHead | SCMMultiTableInfoMsg | tableId0 | tableId1 | tableId2 | ......
S
slguan 已提交
1517 1518
 *      no used         4B
 **/
1519
int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
guanshengliang's avatar
guanshengliang 已提交
1520
#if 0
S
slguan 已提交
1521 1522 1523 1524 1525
  SSqlCmd *pCmd = &pSql->cmd;

  // copy payload content to temp buff
  char *tmpData = 0;
  if (pCmd->payloadLen > 0) {
H
Haojun Liao 已提交
1526
    if ((tmpData = calloc(1, pCmd->payloadLen + 1)) == NULL) return -1;
S
slguan 已提交
1527 1528 1529 1530 1531
    memcpy(tmpData, pCmd->payload, pCmd->payloadLen);
  }

  // fill head info
  SMgmtHead *pMgmt = (SMgmtHead *)(pCmd->payload + tsRpcHeadSize);
H
Haojun Liao 已提交
1532
  memset(pMgmt->db, 0, TSDB_TABLE_FNAME_LEN);  // server don't need the db
S
slguan 已提交
1533

1534
  SCMMultiTableInfoMsg *pInfoMsg = (SCMMultiTableInfoMsg *)(pCmd->payload + tsRpcHeadSize + sizeof(SMgmtHead));
S
slguan 已提交
1535
  pInfoMsg->numOfTables = htonl((int32_t)pCmd->count);
S
slguan 已提交
1536 1537

  if (pCmd->payloadLen > 0) {
S
slguan 已提交
1538
    memcpy(pInfoMsg->tableIds, tmpData, pCmd->payloadLen);
S
slguan 已提交
1539 1540
  }

S
Shengliang Guan 已提交
1541
  taosTFree(tmpData);
S
slguan 已提交
1542

1543
  pCmd->payloadLen += sizeof(SMgmtHead) + sizeof(SCMMultiTableInfoMsg);
S
slguan 已提交
1544
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLES_META;
S
slguan 已提交
1545 1546 1547

  assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize);

1548
  tscDebug("%p build load multi-metermeta msg completed, numOfTables:%d, msg size:%d", pSql, pCmd->count,
S
slguan 已提交
1549 1550 1551
           pCmd->payloadLen);

  return pCmd->payloadLen;
guanshengliang's avatar
guanshengliang 已提交
1552 1553
#endif
  return 0;  
S
slguan 已提交
1554 1555
}

H
hjxilinx 已提交
1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572
//static UNUSED_FUNC int32_t tscEstimateMetricMetaMsgSize(SSqlCmd *pCmd) {
////  const int32_t defaultSize =
////      minMsgSize() + sizeof(SSuperTableMetaMsg) + sizeof(SMgmtHead) + sizeof(int16_t) * TSDB_MAX_TAGS;
////  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
////
////  int32_t n = 0;
////  size_t size = taosArrayGetSize(pQueryInfo->tagCond.pCond);
////  for (int32_t i = 0; i < size; ++i) {
////    assert(0);
//////    n += strlen(pQueryInfo->tagCond.cond[i].cond);
////  }
////
////  int32_t tagLen = n * TSDB_NCHAR_SIZE;
////  if (pQueryInfo->tagCond.tbnameCond.cond != NULL) {
////    tagLen += strlen(pQueryInfo->tagCond.tbnameCond.cond) * TSDB_NCHAR_SIZE;
////  }
////
H
Haojun Liao 已提交
1573
////  int32_t joinCondLen = (TSDB_TABLE_FNAME_LEN + sizeof(int16_t)) * 2;
H
hjxilinx 已提交
1574 1575 1576 1577 1578 1579 1580 1581
////  int32_t elemSize = sizeof(SSuperTableMetaElemMsg) * pQueryInfo->numOfTables;
////
////  int32_t colSize = pQueryInfo->groupbyExpr.numOfGroupCols*sizeof(SColIndex);
////
////  int32_t len = tagLen + joinCondLen + elemSize + colSize + defaultSize;
////
////  return MAX(len, TSDB_DEFAULT_PAYLOAD_SIZE);
//}
H
hzcheng 已提交
1582

H
hjxilinx 已提交
1583 1584
int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
H
hjxilinx 已提交
1585 1586 1587 1588 1589 1590 1591 1592 1593 1594
  
  char* pMsg = pCmd->payload;
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  
  SCMSTableVgroupMsg *pStableVgroupMsg = (SCMSTableVgroupMsg *) pMsg;
  pStableVgroupMsg->numOfTables = htonl(pQueryInfo->numOfTables);
  pMsg += sizeof(SCMSTableVgroupMsg);
  
  for(int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
    STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i);
1595 1596 1597
    size_t size = sizeof(pTableMetaInfo->name);
    tstrncpy(pMsg, pTableMetaInfo->name, size);
    pMsg += size;
H
hjxilinx 已提交
1598
  }
H
hjxilinx 已提交
1599 1600

  pCmd->msgType = TSDB_MSG_TYPE_CM_STABLE_VGROUP;
S
TD-1057  
Shengliang Guan 已提交
1601
  pCmd->payloadLen = (int32_t)(pMsg - pCmd->payload);
H
hjxilinx 已提交
1602

1603
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1604 1605
}

1606 1607
int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
H
hzcheng 已提交
1608 1609
  STscObj *pObj = pSql->pTscObj;

1610
  pthread_mutex_lock(&pObj->mutex);
H
hzcheng 已提交
1611

S
Shengliang Guan 已提交
1612
  int32_t numOfQueries = 2;
H
hzcheng 已提交
1613 1614 1615
  SSqlObj *tpSql = pObj->sqlList;
  while (tpSql) {
    tpSql = tpSql->next;
1616
    numOfQueries++;
H
hzcheng 已提交
1617 1618
  }

S
Shengliang Guan 已提交
1619
  int32_t numOfStreams = 2;
H
hzcheng 已提交
1620 1621 1622
  SSqlStream *pStream = pObj->streamList;
  while (pStream) {
    pStream = pStream->next;
1623
    numOfStreams++;
H
hzcheng 已提交
1624 1625
  }

1626
  int size = numOfQueries * sizeof(SQueryDesc) + numOfStreams * sizeof(SStreamDesc) + sizeof(SCMHeartBeatMsg) + 100;
S
slguan 已提交
1627
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
1628
    pthread_mutex_unlock(&pObj->mutex);
S
slguan 已提交
1629
    tscError("%p failed to malloc for heartbeat msg", pSql);
H
Haojun Liao 已提交
1630
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1631
  }
H
hzcheng 已提交
1632

1633
  SCMHeartBeatMsg *pHeartbeat = (SCMHeartBeatMsg *)pCmd->payload;
1634 1635
  pHeartbeat->numOfQueries = numOfQueries;
  pHeartbeat->numOfStreams = numOfStreams;
H
Haojun Liao 已提交
1636 1637 1638 1639

  pHeartbeat->pid = htonl(taosGetPId());
  taosGetCurrentAPPName(pHeartbeat->appName, NULL);

1640
  int msgLen = tscBuildQueryStreamDesc(pHeartbeat, pObj);
H
hzcheng 已提交
1641 1642 1643 1644

  pthread_mutex_unlock(&pObj->mutex);

  pCmd->payloadLen = msgLen;
S
slguan 已提交
1645
  pCmd->msgType = TSDB_MSG_TYPE_CM_HEARTBEAT;
H
hzcheng 已提交
1646

1647
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1648 1649
}

1650 1651
int tscProcessTableMetaRsp(SSqlObj *pSql) {
  STableMetaMsg *pMetaMsg = (STableMetaMsg *)pSql->res.pRsp;
H
hzcheng 已提交
1652

H
Haojun Liao 已提交
1653
  pMetaMsg->tid = htonl(pMetaMsg->tid);
1654
  pMetaMsg->sversion = htons(pMetaMsg->sversion);
1655
  pMetaMsg->tversion = htons(pMetaMsg->tversion);
H
hjxilinx 已提交
1656 1657
  pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId);
  
1658 1659
  pMetaMsg->uid = htobe64(pMetaMsg->uid);
  pMetaMsg->contLen = htons(pMetaMsg->contLen);
1660
  pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns);
H
hzcheng 已提交
1661

H
Haojun Liao 已提交
1662
  if ((pMetaMsg->tableType != TSDB_SUPER_TABLE) &&
H
Haojun Liao 已提交
1663
      (pMetaMsg->tid <= 0 || pMetaMsg->vgroup.vgId < 2 || pMetaMsg->vgroup.numOfEps <= 0)) {
H
Haojun Liao 已提交
1664
    tscError("invalid value in table numOfEps:%d, vgId:%d tid:%d, name:%s", pMetaMsg->vgroup.numOfEps, pMetaMsg->vgroup.vgId,
H
Haojun Liao 已提交
1665
             pMetaMsg->tid, pMetaMsg->tableId);
1666
    return TSDB_CODE_TSC_INVALID_VALUE;
H
hzcheng 已提交
1667 1668
  }

B
Bomin Zhang 已提交
1669
  if (pMetaMsg->numOfTags > TSDB_MAX_TAGS) {
1670
    tscError("invalid numOfTags:%d", pMetaMsg->numOfTags);
1671
    return TSDB_CODE_TSC_INVALID_VALUE;
H
hzcheng 已提交
1672 1673
  }

1674 1675
  if (pMetaMsg->numOfColumns > TSDB_MAX_COLUMNS || pMetaMsg->numOfColumns <= 0) {
    tscError("invalid numOfColumns:%d", pMetaMsg->numOfColumns);
1676
    return TSDB_CODE_TSC_INVALID_VALUE;
H
hzcheng 已提交
1677 1678
  }

1679 1680
  for (int i = 0; i < pMetaMsg->vgroup.numOfEps; ++i) {
    pMetaMsg->vgroup.epAddr[i].port = htons(pMetaMsg->vgroup.epAddr[i].port);
H
hzcheng 已提交
1681 1682
  }

1683
  SSchema* pSchema = pMetaMsg->schema;
H
hzcheng 已提交
1684

1685
  int32_t numOfTotalCols = pMetaMsg->numOfColumns + pMetaMsg->numOfTags;
H
hzcheng 已提交
1686 1687 1688
  for (int i = 0; i < numOfTotalCols; ++i) {
    pSchema->bytes = htons(pSchema->bytes);
    pSchema->colId = htons(pSchema->colId);
1689 1690 1691 1692 1693

    if (pSchema->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
      assert(i == 0);
    }

H
hjxilinx 已提交
1694
    assert(pSchema->type >= TSDB_DATA_TYPE_BOOL && pSchema->type <= TSDB_DATA_TYPE_NCHAR);
H
hzcheng 已提交
1695 1696 1697
    pSchema++;
  }

1698 1699
  size_t size = 0;
  STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
1700
  
H
hzcheng 已提交
1701
  // todo add one more function: taosAddDataIfNotExists();
1702
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
H
hjxilinx 已提交
1703
  assert(pTableMetaInfo->pTableMeta == NULL);
H
hzcheng 已提交
1704

H
Haojun Liao 已提交
1705
  pTableMetaInfo->pTableMeta = (STableMeta *) taosCachePut(tscMetaCache, pTableMetaInfo->name,
1706
      strlen(pTableMetaInfo->name), pTableMeta, size, tsTableMetaKeepTimer * 1000);
H
hjxilinx 已提交
1707
  
1708
  // todo handle out of memory case
1709
  if (pTableMetaInfo->pTableMeta == NULL) {
B
Bomin Zhang 已提交
1710
    free(pTableMeta);
1711
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
1712
  }
H
hzcheng 已提交
1713

1714
  tscDebug("%p recv table meta, uid:%"PRId64 ", tid:%d, name:%s", pSql, pTableMeta->id.uid, pTableMeta->id.tid, pTableMetaInfo->name);
1715
  free(pTableMeta);
1716
  
H
hjxilinx 已提交
1717
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1718 1719
}

S
slguan 已提交
1720
/**
1721
 *  multi table meta rsp pkg format:
1722
 *  | STaosRsp | ieType | SCMMultiTableInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | SMeterMeta2 | SSchema2
S
slguan 已提交
1723 1724 1725
 *  |...... 1B        1B            4B
 **/
int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
1726
#if 0
S
slguan 已提交
1727 1728 1729 1730 1731
  char *rsp = pSql->res.pRsp;

  ieType = *rsp;
  if (ieType != TSDB_IE_TYPE_META) {
    tscError("invalid ie type:%d", ieType);
1732
    pSql->res.code = TSDB_CODE_TSC_INVALID_IE;
S
slguan 已提交
1733
    pSql->res.numOfTotal = 0;
1734
    return TSDB_CODE_TSC_APP_ERROR;
S
slguan 已提交
1735 1736 1737 1738
  }

  rsp++;

1739
  SCMMultiTableInfoMsg *pInfo = (SCMMultiTableInfoMsg *)rsp;
S
slguan 已提交
1740
  totalNum = htonl(pInfo->numOfTables);
1741
  rsp += sizeof(SCMMultiTableInfoMsg);
S
slguan 已提交
1742 1743

  for (i = 0; i < totalNum; i++) {
S
slguan 已提交
1744
    SMultiTableMeta *pMultiMeta = (SMultiTableMeta *)rsp;
1745
    STableMeta *     pMeta = pMultiMeta->metas;
S
slguan 已提交
1746 1747 1748

    pMeta->sid = htonl(pMeta->sid);
    pMeta->sversion = htons(pMeta->sversion);
H
hjxilinx 已提交
1749
    pMeta->vgId = htonl(pMeta->vgId);
S
slguan 已提交
1750 1751
    pMeta->uid = htobe64(pMeta->uid);

H
hjxilinx 已提交
1752 1753
    if (pMeta->sid <= 0 || pMeta->vgId < 0) {
      tscError("invalid meter vgId:%d, sid%d", pMeta->vgId, pMeta->sid);
1754
      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
S
slguan 已提交
1755
      pSql->res.numOfTotal = i;
1756
      return TSDB_CODE_TSC_APP_ERROR;
S
slguan 已提交
1757 1758
    }

H
hjxilinx 已提交
1759 1760 1761 1762
    //    pMeta->numOfColumns = htons(pMeta->numOfColumns);
    //
    //    if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
    //      tscError("invalid tag value count:%d", pMeta->numOfTags);
1763
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
H
hjxilinx 已提交
1764
    //      pSql->res.numOfTotal = i;
1765
    //      return TSDB_CODE_TSC_APP_ERROR;
H
hjxilinx 已提交
1766 1767 1768 1769
    //    }
    //
    //    if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
    //      tscError("invalid numOfTags:%d", pMeta->numOfTags);
1770
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
H
hjxilinx 已提交
1771
    //      pSql->res.numOfTotal = i;
1772
    //      return TSDB_CODE_TSC_APP_ERROR;
H
hjxilinx 已提交
1773 1774 1775 1776
    //    }
    //
    //    if (pMeta->numOfColumns > TSDB_MAX_COLUMNS || pMeta->numOfColumns < 0) {
    //      tscError("invalid numOfColumns:%d", pMeta->numOfColumns);
1777
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
H
hjxilinx 已提交
1778
    //      pSql->res.numOfTotal = i;
1779
    //      return TSDB_CODE_TSC_APP_ERROR;
H
hjxilinx 已提交
1780 1781
    //    }
    //
H
hjxilinx 已提交
1782
    //    for (int j = 0; j < TSDB_REPLICA_MAX_NUM; ++j) {
H
hjxilinx 已提交
1783 1784 1785 1786 1787 1788 1789 1790 1791 1792 1793 1794 1795 1796 1797 1798 1799 1800 1801 1802 1803 1804 1805 1806 1807 1808 1809 1810 1811 1812 1813 1814 1815 1816
    //      pMeta->vpeerDesc[j].vnode = htonl(pMeta->vpeerDesc[j].vnode);
    //    }
    //
    //    pMeta->rowSize = 0;
    //    rsp += sizeof(SMultiTableMeta);
    //    pSchema = (SSchema *)rsp;
    //
    //    int32_t numOfTotalCols = pMeta->numOfColumns + pMeta->numOfTags;
    //    for (int j = 0; j < numOfTotalCols; ++j) {
    //      pSchema->bytes = htons(pSchema->bytes);
    //      pSchema->colId = htons(pSchema->colId);
    //
    //      // ignore the tags length
    //      if (j < pMeta->numOfColumns) {
    //        pMeta->rowSize += pSchema->bytes;
    //      }
    //      pSchema++;
    //    }
    //
    //    rsp += numOfTotalCols * sizeof(SSchema);
    //
    //    int32_t  tagLen = 0;
    //    SSchema *pTagsSchema = tscGetTableTagSchema(pMeta);
    //
    //    if (pMeta->tableType == TSDB_CHILD_TABLE) {
    //      for (int32_t j = 0; j < pMeta->numOfTags; ++j) {
    //        tagLen += pTagsSchema[j].bytes;
    //      }
    //    }
    //
    //    rsp += tagLen;
    //    int32_t size = (int32_t)(rsp - ((char *)pMeta));  // Consistent with STableMeta in cache
    //
    //    pMeta->index = 0;
H
Haojun Liao 已提交
1817
    //    (void)taosCachePut(tscMetaCache, pMeta->tableId, (char *)pMeta, size, tsTableMetaKeepTimer);
H
hjxilinx 已提交
1818
    //  }
S
slguan 已提交
1819
  }
H
hjxilinx 已提交
1820
  
S
slguan 已提交
1821 1822
  pSql->res.code = TSDB_CODE_SUCCESS;
  pSql->res.numOfTotal = i;
1823
  tscDebug("%p load multi-metermeta resp from complete num:%d", pSql, pSql->res.numOfTotal);
1824 1825
#endif
  
S
slguan 已提交
1826 1827 1828
  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
1829
int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
1830
  SSqlRes* pRes = &pSql->res;
H
hjxilinx 已提交
1831
  
H
hjxilinx 已提交
1832
  // NOTE: the order of several table must be preserved.
1833
  SCMSTableVgroupRspMsg *pStableVgroup = (SCMSTableVgroupRspMsg *)pRes->pRsp;
H
hjxilinx 已提交
1834 1835
  pStableVgroup->numOfTables = htonl(pStableVgroup->numOfTables);
  char* pMsg = pRes->pRsp + sizeof(SCMSTableVgroupRspMsg);
H
hjxilinx 已提交
1836
  
1837 1838 1839
  // master sqlObj locates in param
  SSqlObj* parent = pSql->param;
  assert(parent != NULL);
H
hjxilinx 已提交
1840
  
1841
  SSqlCmd* pCmd = &parent->cmd;
H
hjxilinx 已提交
1842 1843 1844
  for(int32_t i = 0; i < pStableVgroup->numOfTables; ++i) {
    STableMetaInfo *pInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i);

H
Haojun Liao 已提交
1845 1846 1847
    SVgroupsMsg *  pVgroupMsg = (SVgroupsMsg *) pMsg;
    pVgroupMsg->numOfVgroups = htonl(pVgroupMsg->numOfVgroups);

H
Haojun Liao 已提交
1848 1849 1850 1851
    size_t size = sizeof(SCMVgroupMsg) * pVgroupMsg->numOfVgroups + sizeof(SVgroupsMsg);

    size_t vgroupsz = sizeof(SCMVgroupInfo) * pVgroupMsg->numOfVgroups + sizeof(SVgroupsInfo);
    pInfo->vgroupList = calloc(1, vgroupsz);
H
hjxilinx 已提交
1852 1853
    assert(pInfo->vgroupList != NULL);

H
Haojun Liao 已提交
1854
    pInfo->vgroupList->numOfVgroups = pVgroupMsg->numOfVgroups;
H
hjxilinx 已提交
1855
    for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) {
dengyihao's avatar
dengyihao 已提交
1856
      //just init, no need to lock
H
hjxilinx 已提交
1857
      SCMVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j];
H
Haojun Liao 已提交
1858

H
Haojun Liao 已提交
1859 1860 1861
      SCMVgroupMsg *vmsg = &pVgroupMsg->vgroups[j];
      pVgroups->vgId = htonl(vmsg->vgId);
      pVgroups->numOfEps = vmsg->numOfEps;
H
Haojun Liao 已提交
1862

H
Haojun Liao 已提交
1863
      assert(pVgroups->numOfEps >= 1 && pVgroups->vgId >= 1);
H
hjxilinx 已提交
1864

1865
      for (int32_t k = 0; k < pVgroups->numOfEps; ++k) {
H
Haojun Liao 已提交
1866 1867
        pVgroups->epAddr[k].port = htons(vmsg->epAddr[k].port);
        pVgroups->epAddr[k].fqdn = strndup(vmsg->epAddr[k].fqdn, tListLen(vmsg->epAddr[k].fqdn));
H
hjxilinx 已提交
1868
      }
1869
    }
1870 1871

    pMsg += size;
H
hjxilinx 已提交
1872 1873
  }
  
S
slguan 已提交
1874
  return pSql->res.code;
H
hzcheng 已提交
1875 1876 1877 1878 1879 1880
}

/*
 * current process do not use the cache at all
 */
int tscProcessShowRsp(SSqlObj *pSql) {
H
hjxilinx 已提交
1881
  STableMetaMsg * pMetaMsg;
1882
  SCMShowRsp *pShow;
S
slguan 已提交
1883
  SSchema *    pSchema;
H
hzcheng 已提交
1884 1885
  char         key[20];

1886 1887 1888
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

H
hjxilinx 已提交
1889
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
1890

H
hjxilinx 已提交
1891
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
1892

1893
  pShow = (SCMShowRsp *)pRes->pRsp;
S
slguan 已提交
1894
  pShow->qhandle = htobe64(pShow->qhandle);
H
hzcheng 已提交
1895 1896
  pRes->qhandle = pShow->qhandle;

S
slguan 已提交
1897
  tscResetForNextRetrieve(pRes);
H
hjxilinx 已提交
1898
  pMetaMsg = &(pShow->tableMeta);
H
hzcheng 已提交
1899

H
hjxilinx 已提交
1900
  pMetaMsg->numOfColumns = ntohs(pMetaMsg->numOfColumns);
H
hzcheng 已提交
1901

H
hjxilinx 已提交
1902
  pSchema = pMetaMsg->schema;
H
Haojun Liao 已提交
1903
  pMetaMsg->tid = ntohs(pMetaMsg->tid);
H
hjxilinx 已提交
1904
  for (int i = 0; i < pMetaMsg->numOfColumns; ++i) {
H
hzcheng 已提交
1905 1906 1907 1908
    pSchema->bytes = htons(pSchema->bytes);
    pSchema++;
  }

1909
  key[0] = pCmd->msgType + 'a';
H
hzcheng 已提交
1910 1911
  strcpy(key + 1, "showlist");

H
Haojun Liao 已提交
1912 1913 1914 1915
  if (pTableMetaInfo->pTableMeta != NULL) {
    taosCacheRelease(tscMetaCache, (void *)&(pTableMetaInfo->pTableMeta), false);
  }

H
hjxilinx 已提交
1916 1917 1918
  size_t size = 0;
  STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
  
H
Haojun Liao 已提交
1919
  pTableMetaInfo->pTableMeta = taosCachePut(tscMetaCache, key, strlen(key), (char *)pTableMeta, size,
1920
      tsTableMetaKeepTimer * 1000);
H
hjxilinx 已提交
1921
  SSchema *pTableSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
H
hzcheng 已提交
1922

1923 1924 1925 1926
  if (pQueryInfo->colList == NULL) {
    pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
  }
  
H
hjxilinx 已提交
1927 1928
  SFieldInfo* pFieldInfo = &pQueryInfo->fieldsInfo;
  
S
slguan 已提交
1929
  SColumnIndex index = {0};
H
hjxilinx 已提交
1930 1931 1932
  pSchema = pMetaMsg->schema;
  
  for (int16_t i = 0; i < pMetaMsg->numOfColumns; ++i, ++pSchema) {
S
slguan 已提交
1933
    index.columnIndex = i;
1934 1935
    tscColumnListInsert(pQueryInfo->colList, &index);
    
H
hjxilinx 已提交
1936
    TAOS_FIELD f = tscCreateField(pSchema->type, pSchema->name, pSchema->bytes);
H
Haojun Liao 已提交
1937
    SInternalField* pInfo = tscFieldInfoAppend(pFieldInfo, &f);
H
hjxilinx 已提交
1938
    
H
hjxilinx 已提交
1939
    pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index,
1940
                     pTableSchema[i].type, pTableSchema[i].bytes, pTableSchema[i].bytes, false);
H
hzcheng 已提交
1941
  }
H
hjxilinx 已提交
1942 1943
  
  pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
H
hjxilinx 已提交
1944
  tscFieldInfoUpdateOffset(pQueryInfo);
H
hjxilinx 已提交
1945
  
S
Shengliang Guan 已提交
1946
  taosTFree(pTableMeta);
H
hzcheng 已提交
1947 1948 1949
  return 0;
}

1950
// TODO multithread problem
1951 1952 1953 1954 1955 1956 1957 1958 1959 1960
static void createHBObj(STscObj* pObj) {
  if (pObj->pHb != NULL) {
    return;
  }

  SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
  if (NULL == pSql) return;

  pSql->fp = tscProcessHeartBeatRsp;

1961 1962 1963 1964 1965 1966
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetailSafely(&pSql->cmd, 0);
  if (pQueryInfo == NULL) {
    pSql->res.code = terrno;
    return;
  }

1967 1968 1969 1970 1971 1972 1973 1974 1975 1976 1977 1978
  pQueryInfo->command = TSDB_SQL_HB;

  pSql->cmd.command = pQueryInfo->command;
  if (TSDB_CODE_SUCCESS != tscAllocPayload(&(pSql->cmd), TSDB_DEFAULT_PAYLOAD_SIZE)) {
    taosTFree(pSql);
    return;
  }

  pSql->param = pObj;
  pSql->pTscObj = pObj;
  pSql->signature = pSql;

1979 1980
  registerSqlObj(pSql);
  tscDebug("%p HB is allocated, pObj:%p", pSql, pObj);
1981

1982
  pObj->pHb = pSql;
1983 1984
}

H
hzcheng 已提交
1985
int tscProcessConnectRsp(SSqlObj *pSql) {
H
Haojun Liao 已提交
1986
  char temp[TSDB_TABLE_FNAME_LEN * 2];
H
hzcheng 已提交
1987 1988 1989
  STscObj *pObj = pSql->pTscObj;
  SSqlRes *pRes = &pSql->res;

1990
  SCMConnectRsp *pConnect = (SCMConnectRsp *)pRes->pRsp;
B
Bomin Zhang 已提交
1991
  tstrncpy(pObj->acctId, pConnect->acctId, sizeof(pObj->acctId));  // copy acctId from response
1992 1993
  int32_t len = sprintf(temp, "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db);

B
Bomin Zhang 已提交
1994 1995
  assert(len <= sizeof(pObj->db));
  tstrncpy(pObj->db, temp, sizeof(pObj->db));
H
hjxilinx 已提交
1996
  
dengyihao's avatar
dengyihao 已提交
1997 1998 1999
  if (pConnect->epSet.numOfEps > 0) {
    tscEpSetHtons(&pConnect->epSet);
    tscUpdateMgmtEpSet(&pConnect->epSet);
dengyihao's avatar
dengyihao 已提交
2000
  } 
H
hzcheng 已提交
2001

S
slguan 已提交
2002
  strcpy(pObj->sversion, pConnect->serverVersion);
H
hzcheng 已提交
2003 2004
  pObj->writeAuth = pConnect->writeAuth;
  pObj->superAuth = pConnect->superAuth;
S
Shengliang Guan 已提交
2005
  pObj->connId = htonl(pConnect->connId);
2006 2007

  createHBObj(pObj);
2008
  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
H
hzcheng 已提交
2009 2010 2011 2012 2013

  return 0;
}

int tscProcessUseDbRsp(SSqlObj *pSql) {
S
slguan 已提交
2014
  STscObj *       pObj = pSql->pTscObj;
2015
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
S
slguan 已提交
2016

B
Bomin Zhang 已提交
2017
  tstrncpy(pObj->db, pTableMetaInfo->name, sizeof(pObj->db));
H
hzcheng 已提交
2018 2019 2020
  return 0;
}

Y
TD-1039  
yihaoDeng 已提交
2021 2022
int tscProcessDropDbRsp(SSqlObj *pSql) {
  pSql->pTscObj->db[0] = 0;
H
Haojun Liao 已提交
2023
  taosCacheEmpty(tscMetaCache);
H
hzcheng 已提交
2024 2025 2026 2027
  return 0;
}

int tscProcessDropTableRsp(SSqlObj *pSql) {
2028
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
S
slguan 已提交
2029

H
Haojun Liao 已提交
2030
  STableMeta *pTableMeta = taosCacheAcquireByKey(tscMetaCache, pTableMetaInfo->name, strlen(pTableMetaInfo->name));
H
Haojun Liao 已提交
2031
  if (pTableMeta == NULL) { /* not in cache, abort */
H
hzcheng 已提交
2032 2033 2034 2035 2036 2037 2038
    return 0;
  }

  /*
   * 1. if a user drops one table, which is the only table in a vnode, remove operation will incur vnode to be removed.
   * 2. Then, a user creates a new metric followed by a table with identical name of removed table but different schema,
   * here the table will reside in a new vnode.
S
slguan 已提交
2039 2040
   * The cached information is expired, however, we may have lost the ref of original meter. So, clear whole cache
   * instead.
H
hzcheng 已提交
2041
   */
2042
  tscDebug("%p force release table meta after drop table:%s", pSql, pTableMetaInfo->name);
H
Haojun Liao 已提交
2043
  taosCacheRelease(tscMetaCache, (void **)&pTableMeta, true);
H
hzcheng 已提交
2044

H
hjxilinx 已提交
2045
  if (pTableMetaInfo->pTableMeta) {
H
Haojun Liao 已提交
2046
    taosCacheRelease(tscMetaCache, (void **)&(pTableMetaInfo->pTableMeta), true);
H
hzcheng 已提交
2047 2048 2049 2050 2051 2052
  }

  return 0;
}

int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
2053
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
S
slguan 已提交
2054

H
Haojun Liao 已提交
2055
  STableMeta *pTableMeta = taosCacheAcquireByKey(tscMetaCache, pTableMetaInfo->name, strlen(pTableMetaInfo->name));
H
hjxilinx 已提交
2056
  if (pTableMeta == NULL) { /* not in cache, abort */
H
hzcheng 已提交
2057 2058 2059
    return 0;
  }

2060
  tscDebug("%p force release metermeta in cache after alter-table: %s", pSql, pTableMetaInfo->name);
H
Haojun Liao 已提交
2061
  taosCacheRelease(tscMetaCache, (void **)&pTableMeta, true);
H
hzcheng 已提交
2062

H
hjxilinx 已提交
2063
  if (pTableMetaInfo->pTableMeta) {
weixin_48148422's avatar
weixin_48148422 已提交
2064
    bool isSuperTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo);
H
Haojun Liao 已提交
2065
    taosCacheRelease(tscMetaCache, (void **)&(pTableMetaInfo->pTableMeta), true);
H
hzcheng 已提交
2066

2067
    if (isSuperTable) {  // if it is a super table, reset whole query cache
2068
      tscDebug("%p reset query cache since table:%s is stable", pSql, pTableMetaInfo->name);
H
Haojun Liao 已提交
2069
      taosCacheEmpty(tscMetaCache);
H
hzcheng 已提交
2070 2071 2072 2073 2074 2075 2076 2077 2078 2079
    }
  }

  return 0;
}

int tscProcessAlterDbMsgRsp(SSqlObj *pSql) {
  UNUSED(pSql);
  return 0;
}
2080 2081 2082
int tscProcessShowCreateRsp(SSqlObj *pSql) {
  return tscLocalResultCommonBuilder(pSql, 1);
}
H
hzcheng 已提交
2083 2084 2085 2086

int tscProcessQueryRsp(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;

2087
  SQueryTableRsp *pQuery = (SQueryTableRsp *)pRes->pRsp;
S
slguan 已提交
2088 2089 2090
  pQuery->qhandle = htobe64(pQuery->qhandle);
  pRes->qhandle = pQuery->qhandle;

H
hzcheng 已提交
2091
  pRes->data = NULL;
S
slguan 已提交
2092
  tscResetForNextRetrieve(pRes);
H
hzcheng 已提交
2093 2094 2095
  return 0;
}

H
hjxilinx 已提交
2096
int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
S
slguan 已提交
2097 2098 2099
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

S
slguan 已提交
2100
  SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *)pRes->pRsp;
2101 2102 2103 2104
  if (pRetrieve == NULL) {
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return pRes->code;
  }
H
hzcheng 已提交
2105 2106 2107

  pRes->numOfRows = htonl(pRetrieve->numOfRows);
  pRes->precision = htons(pRetrieve->precision);
2108 2109
  pRes->offset    = htobe64(pRetrieve->offset);
  pRes->useconds  = htobe64(pRetrieve->useconds);
H
hjxilinx 已提交
2110
  pRes->completed = (pRetrieve->completed == 1);
2111
  pRes->data      = pRetrieve->data;
H
hjxilinx 已提交
2112
  
2113
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
2114 2115 2116 2117
  if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
    return pRes->code;
  }
  
weixin_48148422's avatar
weixin_48148422 已提交
2118
  if (pSql->pSubscription != NULL) {
H
hjxilinx 已提交
2119
    int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
H
hjxilinx 已提交
2120
    
H
hjxilinx 已提交
2121
    TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, numOfCols - 1);
H
hjxilinx 已提交
2122 2123
    int16_t     offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1);
    
weixin_48148422's avatar
weixin_48148422 已提交
2124 2125
    char* p = pRes->data + (pField->bytes + offset) * pRes->numOfRows;

S
slguan 已提交
2126
    int32_t numOfTables = htonl(*(int32_t*)p);
weixin_48148422's avatar
weixin_48148422 已提交
2127
    p += sizeof(int32_t);
S
slguan 已提交
2128
    for (int i = 0; i < numOfTables; i++) {
weixin_48148422's avatar
weixin_48148422 已提交
2129 2130
      int64_t uid = htobe64(*(int64_t*)p);
      p += sizeof(int64_t);
weixin_48148422's avatar
weixin_48148422 已提交
2131
      p += sizeof(int32_t); // skip tid
weixin_48148422's avatar
weixin_48148422 已提交
2132 2133
      TSKEY key = htobe64(*(TSKEY*)p);
      p += sizeof(TSKEY);
2134
      tscUpdateSubscriptionProgress(pSql->pSubscription, uid, key);
weixin_48148422's avatar
weixin_48148422 已提交
2135
    }
2136 2137
  }

H
hzcheng 已提交
2138
  pRes->row = 0;
2139
  tscDebug("%p numOfRows:%" PRId64 ", offset:%" PRId64 ", complete:%d", pSql, pRes->numOfRows, pRes->offset, pRes->completed);
H
hzcheng 已提交
2140 2141 2142 2143

  return 0;
}

H
hjxilinx 已提交
2144
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code);
H
hzcheng 已提交
2145

2146
static int32_t getTableMetaFromMgmt(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
S
slguan 已提交
2147 2148
  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
  if (NULL == pNew) {
2149
    tscError("%p malloc failed for new sqlobj to get table meta", pSql);
2150
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
2151
  }
2152

H
hzcheng 已提交
2153 2154 2155
  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;
  pNew->cmd.command = TSDB_SQL_META;
S
slguan 已提交
2156

2157
  registerSqlObj(pNew);
H
Haojun Liao 已提交
2158

2159
  tscAddSubqueryInfo(&pNew->cmd);
2160

2161
  SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetailSafely(&pNew->cmd, 0);
2162

H
hjxilinx 已提交
2163
  pNew->cmd.autoCreated = pSql->cmd.autoCreated;  // create table if not exists
B
Bomin Zhang 已提交
2164
  if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE + pSql->cmd.payloadLen)) {
2165
    tscError("%p malloc failed for payload to get table meta", pSql);
S
slguan 已提交
2166
    free(pNew);
2167

2168
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
2169 2170
  }

H
hjxilinx 已提交
2171
  STableMetaInfo *pNewMeterMetaInfo = tscAddEmptyMetaInfo(pNewQueryInfo);
2172
  assert(pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);
H
hzcheng 已提交
2173

B
Bomin Zhang 已提交
2174
  tstrncpy(pNewMeterMetaInfo->name, pTableMetaInfo->name, sizeof(pNewMeterMetaInfo->name));
B
Bomin Zhang 已提交
2175
  memcpy(&pNew->cmd.tagData, &pSql->cmd.tagData, sizeof(pSql->cmd.tagData));
2176
  tscDebug("%p new pSqlObj:%p to get tableMeta, auto create:%d", pSql, pNew, pNew->cmd.autoCreated);
H
hzcheng 已提交
2177

H
hjxilinx 已提交
2178 2179
  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;
H
hzcheng 已提交
2180

H
hjxilinx 已提交
2181 2182
  int32_t code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
2183
    code = TSDB_CODE_TSC_ACTION_IN_PROGRESS;  // notify upper application that current process need to be terminated
H
hzcheng 已提交
2184 2185 2186 2187 2188
  }

  return code;
}

H
hjxilinx 已提交
2189
int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
H
hjxilinx 已提交
2190
  assert(strlen(pTableMetaInfo->name) != 0);
S
slguan 已提交
2191

2192
  // If this STableMetaInfo owns a table meta, release it first
H
hjxilinx 已提交
2193
  if (pTableMetaInfo->pTableMeta != NULL) {
H
Haojun Liao 已提交
2194
    taosCacheRelease(tscMetaCache, (void **)&(pTableMetaInfo->pTableMeta), false);
2195 2196
  }
  
H
Haojun Liao 已提交
2197
  pTableMetaInfo->pTableMeta = (STableMeta *)taosCacheAcquireByKey(tscMetaCache, pTableMetaInfo->name, strlen(pTableMetaInfo->name));
H
hjxilinx 已提交
2198
  if (pTableMetaInfo->pTableMeta != NULL) {
H
hjxilinx 已提交
2199
    STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
2200
    tscDebug("%p retrieve table Meta from cache, the number of columns:%d, numOfTags:%d, %p", pSql, tinfo.numOfColumns,
2201
             tinfo.numOfTags, pTableMetaInfo->pTableMeta);
H
hzcheng 已提交
2202 2203 2204

    return TSDB_CODE_SUCCESS;
  }
2205 2206
  
  return getTableMetaFromMgmt(pSql, pTableMetaInfo);
H
hzcheng 已提交
2207 2208
}

H
hjxilinx 已提交
2209
int tscGetMeterMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool createIfNotExists) {
H
hjxilinx 已提交
2210
  pSql->cmd.autoCreated = createIfNotExists;
H
hjxilinx 已提交
2211
  return tscGetTableMeta(pSql, pTableMetaInfo);
H
hzcheng 已提交
2212 2213 2214
}

/**
H
Haojun Liao 已提交
2215
 * retrieve table meta from mnode, and update the local table meta cache.
H
hzcheng 已提交
2216
 * @param pSql          sql object
B
Bomin Zhang 已提交
2217
 * @param tableIndex    table index
H
hzcheng 已提交
2218 2219
 * @return              status code
 */
B
Bomin Zhang 已提交
2220
int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) {
H
hzcheng 已提交
2221
  SSqlCmd *pCmd = &pSql->cmd;
2222 2223

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
B
Bomin Zhang 已提交
2224
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex);
H
hzcheng 已提交
2225

H
Haojun Liao 已提交
2226 2227
  STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
  if (pTableMetaInfo->pTableMeta) {
2228
    tscDebug("%p update table meta, old meta numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
2229
             tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->id.uid, pTableMeta);
H
hzcheng 已提交
2230 2231
  }

H
Haojun Liao 已提交
2232
  taosCacheRelease(tscMetaCache, (void **)&(pTableMetaInfo->pTableMeta), true);
H
Haojun Liao 已提交
2233
  return getTableMetaFromMgmt(pSql, pTableMetaInfo);
H
hzcheng 已提交
2234 2235
}

H
hjxilinx 已提交
2236
static bool allVgroupInfoRetrieved(SSqlCmd* pCmd, int32_t clauseIndex) {
2237
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
2238
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
H
hjxilinx 已提交
2239
    STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
H
hjxilinx 已提交
2240 2241
    if (pTableMetaInfo->vgroupList == NULL) {
      return false;
S
slguan 已提交
2242 2243
    }
  }
H
hjxilinx 已提交
2244 2245 2246 2247
  
  // all super tables vgroupinfo are retrieved, no need to retrieve vgroup info anymore
  return true;
}
H
hzcheng 已提交
2248

H
hjxilinx 已提交
2249
int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
2250
  int      code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
H
hjxilinx 已提交
2251 2252 2253
  SSqlCmd *pCmd = &pSql->cmd;
  
  if (allVgroupInfoRetrieved(pCmd, clauseIndex)) {
H
hzcheng 已提交
2254 2255
    return TSDB_CODE_SUCCESS;
  }
H
hjxilinx 已提交
2256

S
slguan 已提交
2257
  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
H
hzcheng 已提交
2258 2259 2260
  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;

H
hjxilinx 已提交
2261
  pNew->cmd.command = TSDB_SQL_STABLEVGROUP;
H
Haojun Liao 已提交
2262 2263

  // TODO TEST IT
2264 2265
  SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetailSafely(&pNew->cmd, 0);
  if (pNewQueryInfo == NULL) {
B
Bomin Zhang 已提交
2266
    tscFreeSqlObj(pNew);
2267 2268
    return code;
  }
2269
  
H
hjxilinx 已提交
2270
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
2271
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
H
hjxilinx 已提交
2272
    STableMetaInfo *pMInfo = tscGetMetaInfo(pQueryInfo, i);
H
Haojun Liao 已提交
2273
    STableMeta *pTableMeta = taosCacheAcquireByData(tscMetaCache, pMInfo->pTableMeta);
H
hjxilinx 已提交
2274
    tscAddTableMetaInfo(pNewQueryInfo, pMInfo->name, pTableMeta, NULL, pMInfo->tagColList);
S
slguan 已提交
2275 2276 2277 2278 2279 2280
  }

  if ((code = tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
    tscFreeSqlObj(pNew);
    return code;
  }
H
hzcheng 已提交
2281

2282
  pNewQueryInfo->numOfTables = pQueryInfo->numOfTables;
2283
  registerSqlObj(pNew);
H
Haojun Liao 已提交
2284

2285
  tscDebug("%p new sqlObj:%p to get vgroupInfo, numOfTables:%d", pSql, pNew, pNewQueryInfo->numOfTables);
H
hzcheng 已提交
2286

2287 2288 2289 2290
  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;
  code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
2291
    code = TSDB_CODE_TSC_ACTION_IN_PROGRESS;
H
hzcheng 已提交
2292 2293 2294 2295 2296
  }

  return code;
}

2297
void tscInitMsgsFp() {
S
slguan 已提交
2298 2299
  tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
  tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg;
J
jtao1735 已提交
2300
  tscBuildMsg[TSDB_SQL_FETCH] = tscBuildFetchMsg;
H
hzcheng 已提交
2301 2302

  tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg;
2303
  tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg;
H
hzcheng 已提交
2304

2305 2306
  tscBuildMsg[TSDB_SQL_CREATE_ACCT] = tscBuildAcctMsg;
  tscBuildMsg[TSDB_SQL_ALTER_ACCT] = tscBuildAcctMsg;
H
hzcheng 已提交
2307 2308

  tscBuildMsg[TSDB_SQL_CREATE_TABLE] = tscBuildCreateTableMsg;
S
[TD-16]  
slguan 已提交
2309
  tscBuildMsg[TSDB_SQL_DROP_USER] = tscBuildDropUserMsg;
H
hzcheng 已提交
2310 2311 2312
  tscBuildMsg[TSDB_SQL_DROP_ACCT] = tscBuildDropAcctMsg;
  tscBuildMsg[TSDB_SQL_DROP_DB] = tscBuildDropDbMsg;
  tscBuildMsg[TSDB_SQL_DROP_TABLE] = tscBuildDropTableMsg;
2313
  tscBuildMsg[TSDB_SQL_ALTER_USER] = tscBuildUserMsg;
S
slguan 已提交
2314 2315 2316
  tscBuildMsg[TSDB_SQL_CREATE_DNODE] = tscBuildCreateDnodeMsg;
  tscBuildMsg[TSDB_SQL_DROP_DNODE] = tscBuildDropDnodeMsg;
  tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg;
H
hzcheng 已提交
2317
  tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg;
2318
  tscBuildMsg[TSDB_SQL_UPDATE_TAGS_VAL] = tscBuildUpdateTagMsg;
H
hzcheng 已提交
2319 2320 2321 2322
  tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg;

  tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg;
  tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg;
H
hjxilinx 已提交
2323
  tscBuildMsg[TSDB_SQL_META] = tscBuildTableMetaMsg;
H
hjxilinx 已提交
2324
  tscBuildMsg[TSDB_SQL_STABLEVGROUP] = tscBuildSTableVgroupMsg;
S
slguan 已提交
2325
  tscBuildMsg[TSDB_SQL_MULTI_META] = tscBuildMultiMeterMetaMsg;
H
hzcheng 已提交
2326 2327 2328 2329

  tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg;
  tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg;
  tscBuildMsg[TSDB_SQL_RETRIEVE] = tscBuildRetrieveFromMgmtMsg;
2330 2331 2332
  tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg;
H
hzcheng 已提交
2333 2334

  tscProcessMsgRsp[TSDB_SQL_SELECT] = tscProcessQueryRsp;
H
hjxilinx 已提交
2335
  tscProcessMsgRsp[TSDB_SQL_FETCH] = tscProcessRetrieveRspFromNode;
H
hzcheng 已提交
2336 2337 2338 2339 2340

  tscProcessMsgRsp[TSDB_SQL_DROP_DB] = tscProcessDropDbRsp;
  tscProcessMsgRsp[TSDB_SQL_DROP_TABLE] = tscProcessDropTableRsp;
  tscProcessMsgRsp[TSDB_SQL_CONNECT] = tscProcessConnectRsp;
  tscProcessMsgRsp[TSDB_SQL_USE_DB] = tscProcessUseDbRsp;
2341
  tscProcessMsgRsp[TSDB_SQL_META] = tscProcessTableMetaRsp;
H
hjxilinx 已提交
2342
  tscProcessMsgRsp[TSDB_SQL_STABLEVGROUP] = tscProcessSTableVgroupRsp;
S
slguan 已提交
2343
  tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiMeterMetaRsp;
H
hzcheng 已提交
2344 2345

  tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp;
H
hjxilinx 已提交
2346
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromNode;  // rsp handled by same function.
H
hzcheng 已提交
2347
  tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp;
2348

H
Haojun Liao 已提交
2349 2350 2351 2352 2353
  tscProcessMsgRsp[TSDB_SQL_CURRENT_DB]   = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CURRENT_USER] = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_VERSION] = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CLI_VERSION]  = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_STATUS]  = tscProcessLocalRetrieveRsp;
2354

H
hzcheng 已提交
2355 2356
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp;

H
hjxilinx 已提交
2357
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_LOCALMERGE] = tscProcessRetrieveLocalMergeRsp;
H
hzcheng 已提交
2358 2359 2360 2361

  tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp;
  tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp;

2362 2363 2364 2365
  tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_TABLE] = tscProcessShowCreateRsp;
  tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_DATABASE] = tscProcessShowCreateRsp;
  

H
hzcheng 已提交
2366 2367 2368 2369 2370 2371
  tscKeepConn[TSDB_SQL_SHOW] = 1;
  tscKeepConn[TSDB_SQL_RETRIEVE] = 1;
  tscKeepConn[TSDB_SQL_SELECT] = 1;
  tscKeepConn[TSDB_SQL_FETCH] = 1;
  tscKeepConn[TSDB_SQL_HB] = 1;
}