tscServer.c 81.2 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#include "os.h"
H
hzcheng 已提交
17
#include "tcache.h"
H
Haojun Liao 已提交
18
#include "tcmdtype.h"
H
hzcheng 已提交
19
#include "trpc.h"
20 21
#include "tscLocalMerge.h"
#include "tscLog.h"
H
hzcheng 已提交
22 23 24 25 26
#include "tscProfile.h"
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
#include "ttimer.h"
dengyihao's avatar
dengyihao 已提交
27
#include "tlockfree.h"
H
hzcheng 已提交
28

29
SRpcCorEpSet  tscMgmtEpSet;
S
slguan 已提交
30

31 32
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};

H
hzcheng 已提交
33 34 35
int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql);
void tscProcessActivityTimer(void *handle, void *tmrId);
int tscKeepConn[TSDB_SQL_MAX] = {0};
36

B
Bomin Zhang 已提交
37
TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid, TSKEY dflt);
38 39
void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts);
void tscSaveSubscriptionProgress(void* sub);
H
hzcheng 已提交
40

S
slguan 已提交
41
static int32_t minMsgSize() { return tsRpcHeadSize + 100; }
H
Haojun Liao 已提交
42 43 44 45 46 47 48 49
static int32_t getWaitingTimeInterval(int32_t count) {
  int32_t initial = 100; // 100 ms by default
  if (count <= 1) {
    return 0;
  }

  return initial * (2<<(count - 2));
}
H
hzcheng 已提交
50

51
static void tscSetDnodeEpSet(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) {
52 53
  assert(pSql != NULL && pVgroupInfo != NULL && pVgroupInfo->numOfEps > 0);

54
  SRpcEpSet* pEpSet = &pSql->epSet;
H
Haojun Liao 已提交
55 56 57 58

  // Issue the query to one of the vnode among a vgroup randomly.
  // change the inUse property would not affect the isUse attribute of STableMeta
  pEpSet->inUse = rand() % pVgroupInfo->numOfEps;
59 60 61

  // apply the FQDN string length check here
  bool hasFqdn = false;
dengyihao's avatar
bugfix  
dengyihao 已提交
62

63 64 65 66
  pEpSet->numOfEps = pVgroupInfo->numOfEps;
  for(int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) {
    strcpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn);
    pEpSet->port[i] = pVgroupInfo->epAddr[i].port;
67 68 69 70

    if (!hasFqdn) {
      hasFqdn = (strlen(pEpSet->fqdn[i]) > 0);
    }
71
  }
72 73

  assert(hasFqdn);
74
}
75

dengyihao's avatar
dengyihao 已提交
76 77 78 79
static void tscDumpMgmtEpSet(SRpcEpSet *epSet) {
  taosCorBeginRead(&tscMgmtEpSet.version);
  *epSet = tscMgmtEpSet.epSet;
  taosCorEndRead(&tscMgmtEpSet.version);
80
}  
dengyihao's avatar
dengyihao 已提交
81 82
static void tscEpSetHtons(SRpcEpSet *s) {
   for (int32_t i = 0; i < s->numOfEps; i++) {
dengyihao's avatar
fixbug  
dengyihao 已提交
83 84 85
      s->port[i] = htons(s->port[i]);    
   }
} 
dengyihao's avatar
dengyihao 已提交
86 87
bool tscEpSetIsEqual(SRpcEpSet *s1, SRpcEpSet *s2) {
   if (s1->numOfEps != s2->numOfEps || s1->inUse != s2->inUse) {
88 89
     return false;
   } 
dengyihao's avatar
dengyihao 已提交
90
   for (int32_t i = 0; i < s1->numOfEps; i++) {
91 92 93 94 95
     if (s1->port[i] != s2->port[i] 
        || strncmp(s1->fqdn[i], s2->fqdn[i], TSDB_FQDN_LEN) != 0)
        return false;
   }
   return true;
S
slguan 已提交
96
}
dengyihao's avatar
dengyihao 已提交
97
void tscUpdateMgmtEpSet(SRpcEpSet *pEpSet) {
98
  // no need to update if equal
dengyihao's avatar
dengyihao 已提交
99 100 101
  taosCorBeginWrite(&tscMgmtEpSet.version);
  tscMgmtEpSet.epSet = *pEpSet;
  taosCorEndWrite(&tscMgmtEpSet.version);
102
}
dengyihao's avatar
dengyihao 已提交
103
static void tscDumpEpSetFromVgroupInfo(SCMCorVgroupInfo *pVgroupInfo, SRpcEpSet *pEpSet) {
dengyihao's avatar
dengyihao 已提交
104
  if (pVgroupInfo == NULL) { return;}
dengyihao's avatar
bugfix  
dengyihao 已提交
105
  taosCorBeginRead(&pVgroupInfo->version);
dengyihao's avatar
dengyihao 已提交
106
  int8_t inUse = pVgroupInfo->inUse;
dengyihao's avatar
dengyihao 已提交
107 108 109
  pEpSet->inUse = (inUse >= 0 && inUse < TSDB_MAX_REPLICA) ? inUse: 0; 
  pEpSet->numOfEps = pVgroupInfo->numOfEps;  
  for (int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) {
Y
yihaoDeng 已提交
110
    tstrncpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn, sizeof(pEpSet->fqdn[i]));
dengyihao's avatar
dengyihao 已提交
111
    pEpSet->port[i] = pVgroupInfo->epAddr[i].port;
S
slguan 已提交
112
  }
dengyihao's avatar
bugfix  
dengyihao 已提交
113
  taosCorEndRead(&pVgroupInfo->version);
S
slguan 已提交
114 115
}

dengyihao's avatar
dengyihao 已提交
116
static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) {
dengyihao's avatar
dengyihao 已提交
117 118
  SSqlCmd *pCmd = &pObj->cmd;
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
dengyihao's avatar
dengyihao 已提交
119
  if (pTableMetaInfo == NULL || pTableMetaInfo->pTableMeta == NULL) { return;}
dengyihao's avatar
bugfix  
dengyihao 已提交
120
  SCMCorVgroupInfo *pVgroupInfo = &pTableMetaInfo->pTableMeta->corVgroupInfo;
dengyihao's avatar
dengyihao 已提交
121

dengyihao's avatar
bugfix  
dengyihao 已提交
122
  taosCorBeginWrite(&pVgroupInfo->version);
dengyihao's avatar
dengyihao 已提交
123
  tscDebug("before: Endpoint in use: %d", pVgroupInfo->inUse);
124 125
  pVgroupInfo->inUse = pEpSet->inUse;
  pVgroupInfo->numOfEps = pEpSet->numOfEps;
dengyihao's avatar
dengyihao 已提交
126
  for (int32_t i = 0; i < pVgroupInfo->numOfEps; i++) {
H
Haojun Liao 已提交
127 128
    taosTFree(pVgroupInfo->epAddr[i].fqdn);
    pVgroupInfo->epAddr[i].fqdn = strndup(pEpSet->fqdn[i], tListLen(pEpSet->fqdn[i]));
129
    pVgroupInfo->epAddr[i].port = pEpSet->port[i];
dengyihao's avatar
dengyihao 已提交
130
  }
H
Haojun Liao 已提交
131

dengyihao's avatar
dengyihao 已提交
132
  tscDebug("after: EndPoint in use: %d", pVgroupInfo->inUse);
dengyihao's avatar
bugfix  
dengyihao 已提交
133
  taosCorEndWrite(&pVgroupInfo->version);
dengyihao's avatar
dengyihao 已提交
134
}
H
Haojun Liao 已提交
135

136 137 138 139
void tscPrintMgmtEp() {
  SRpcEpSet dump;
  tscDumpMgmtEpSet(&dump);
  if (dump.numOfEps <= 0) {
dengyihao's avatar
dengyihao 已提交
140
    tscError("invalid mnode EP list:%d", dump.numOfEps);
S
slguan 已提交
141
  } else {
142
    for (int i = 0; i < dump.numOfEps; ++i) {
143
      tscDebug("mnode index:%d %s:%d", i, dump.fqdn[i], dump.port[i]);
S
slguan 已提交
144
    }
S
slguan 已提交
145
  }
S
slguan 已提交
146 147
}

H
hzcheng 已提交
148 149 150
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
  STscObj *pObj = (STscObj *)param;
  if (pObj == NULL) return;
H
Haojun Liao 已提交
151

H
hzcheng 已提交
152 153 154 155 156
  if (pObj != pObj->signature) {
    tscError("heart beat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature);
    return;
  }

H
Haojun Liao 已提交
157
  SSqlObj *pSql = tres;
H
hzcheng 已提交
158 159 160
  SSqlRes *pRes = &pSql->res;

  if (code == 0) {
161
    SCMHeartBeatRsp *pRsp = (SCMHeartBeatRsp *)pRes->pRsp;
dengyihao's avatar
dengyihao 已提交
162 163 164 165
    SRpcEpSet *      epSet = &pRsp->epSet;
    if (epSet->numOfEps > 0) {
      tscEpSetHtons(epSet);
      tscUpdateMgmtEpSet(epSet);
dengyihao's avatar
dengyihao 已提交
166
    } 
S
slguan 已提交
167

S
Shengliang Guan 已提交
168 169
    pSql->pTscObj->connId = htonl(pRsp->connId);

H
hzcheng 已提交
170 171
    if (pRsp->killConnection) {
      tscKillConnection(pObj);
172
      return;
H
hzcheng 已提交
173
    } else {
S
slguan 已提交
174 175
      if (pRsp->queryId) tscKillQuery(pObj, htonl(pRsp->queryId));
      if (pRsp->streamId) tscKillStream(pObj, htonl(pRsp->streamId));
H
hzcheng 已提交
176 177
    }
  } else {
H
Haojun Liao 已提交
178
    tscDebug("heartbeat failed, code:%s", tstrerror(code));
H
hzcheng 已提交
179 180
  }

H
Haojun Liao 已提交
181
  if (pObj->pHb != NULL) {
182
    int32_t waitingDuring = tsShellActivityTimer * 500;
183
    tscDebug("%p start heartbeat in %dms", pSql, waitingDuring);
H
Haojun Liao 已提交
184 185 186 187 188

    taosTmrReset(tscProcessActivityTimer, waitingDuring, pObj, tscTmr, &pObj->pTimer);
  } else {
    tscDebug("%p start to close tscObj:%p, not send heartbeat again", pSql, pObj);
  }
H
hzcheng 已提交
189 190 191 192
}

void tscProcessActivityTimer(void *handle, void *tmrId) {
  STscObj *pObj = (STscObj *)handle;
193 194 195
  if (pObj == NULL || pObj->signature != pObj) {
    return;
  }
H
hzcheng 已提交
196

197 198 199
  SSqlObj* pHB = pObj->pHb;
  if (pObj->pTimer != tmrId || pHB == NULL) {
    return;
H
hzcheng 已提交
200 201
  }

H
Haojun Liao 已提交
202
  void** p = taosCacheAcquireByKey(tscObjCache, &pHB, sizeof(TSDB_CACHE_PTR_TYPE));
H
Haojun Liao 已提交
203 204 205 206 207 208 209 210 211 212 213 214
  if (p == NULL) {
    tscWarn("%p HB object has been released already", pHB);
    return;
  }

  assert(*pHB->self == pHB);

  int32_t code = tscProcessSql(pHB);
  taosCacheRelease(tscObjCache, (void**) &p, false);

  if (code != TSDB_CODE_SUCCESS) {
    tscError("%p failed to sent HB to server, reason:%s", pHB, tstrerror(code));
H
hzcheng 已提交
215 216 217 218
  }
}

int tscSendMsgToServer(SSqlObj *pSql) {
219
  STscObj* pObj = pSql->pTscObj;
H
hjxilinx 已提交
220 221 222
  SSqlCmd* pCmd = &pSql->cmd;
  
  char *pMsg = rpcMallocCont(pCmd->payloadLen);
S
slguan 已提交
223
  if (NULL == pMsg) {
224
    tscError("%p msg:%s malloc failed", pSql, taosMsg[pSql->cmd.msgType]);
225
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
H
hzcheng 已提交
226 227
  }

228 229
  // set the mgmt ip list
  if (pSql->cmd.command >= TSDB_SQL_MGMT) {
dengyihao's avatar
dengyihao 已提交
230
    tscDumpMgmtEpSet(&pSql->epSet);
J
jtao1735 已提交
231 232
  }

233
  memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
234

J
jtao1735 已提交
235
  SRpcMsg rpcMsg = {
236 237 238
      .msgType = pSql->cmd.msgType,
      .pCont   = pMsg,
      .contLen = pSql->cmd.payloadLen,
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
239 240
      .ahandle = pSql,
      .handle  = &pSql->pRpcCtx,
H
hjxilinx 已提交
241
      .code    = 0
J
jtao1735 已提交
242
  };
H
Haojun Liao 已提交
243

H
Haojun Liao 已提交
244 245 246 247
  // NOTE: the rpc context should be acquired before sending data to server.
  // Otherwise, the pSql object may have been released already during the response function, which is
  // processMsgFromServer function. In the meanwhile, the assignment of the rpc context to sql object will absolutely
  // cause crash.
H
Haojun Liao 已提交
248 249
  rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg);
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
250 251
}

252
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
S
TD-1530  
Shengliang Guan 已提交
253 254
  TSDB_CACHE_PTR_TYPE handle = (TSDB_CACHE_PTR_TYPE) rpcMsg->ahandle;
  void** p = taosCacheAcquireByKey(tscObjCache, &handle, sizeof(TSDB_CACHE_PTR_TYPE));
H
Haojun Liao 已提交
255 256
  if (p == NULL) {
    rpcFreeCont(rpcMsg->pCont);
257 258
    return;
  }
259

H
Haojun Liao 已提交
260 261 262
  SSqlObj* pSql = *p;
  assert(pSql != NULL);

H
Haojun Liao 已提交
263
  STscObj *pObj = pSql->pTscObj;
S
slguan 已提交
264 265
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
H
hzcheng 已提交
266

H
Haojun Liao 已提交
267
  assert(*pSql->self == pSql);
H
Haojun Liao 已提交
268
  pSql->pRpcCtx = NULL;
H
Haojun Liao 已提交
269

H
Haojun Liao 已提交
270
  if (pObj->signature != pObj) {
271
    tscDebug("%p DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, pObj, pObj->signature);
H
Haojun Liao 已提交
272

H
Haojun Liao 已提交
273
    taosCacheRelease(tscObjCache, (void**) &p, true);
H
Haojun Liao 已提交
274 275 276 277 278 279
    rpcFreeCont(rpcMsg->pCont);
    return;
  }

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) {
280
    tscDebug("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p",
281
        pSql, pCmd->command, pQueryInfo->type, pObj, pObj->signature);
H
Haojun Liao 已提交
282

H
Haojun Liao 已提交
283 284 285
    void** p1 = p;
    taosCacheRelease(tscObjCache, (void**) &p1, false);

H
Haojun Liao 已提交
286
    taosCacheRelease(tscObjCache, (void**) &p, true);
287
    rpcFreeCont(rpcMsg->pCont);
S
slguan 已提交
288
    return;
H
hzcheng 已提交
289 290
  }

H
Haojun Liao 已提交
291
  if (pEpSet) {
dengyihao's avatar
dengyihao 已提交
292
    if (!tscEpSetIsEqual(&pSql->epSet, pEpSet)) {
H
Haojun Liao 已提交
293 294
      if (pCmd->command < TSDB_SQL_MGMT) {
        tscUpdateVgroupInfo(pSql, pEpSet);
dengyihao's avatar
dengyihao 已提交
295
      } else {
dengyihao's avatar
dengyihao 已提交
296
        tscUpdateMgmtEpSet(pEpSet);
H
Haojun Liao 已提交
297
      }
dengyihao's avatar
dengyihao 已提交
298
    }
J
jtao1735 已提交
299 300
  }

301 302 303 304 305
  int32_t cmd = pCmd->command;
  if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_FETCH || cmd == TSDB_SQL_INSERT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) &&
      (rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID ||
       rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID ||
       rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL ||
S
Shengliang Guan 已提交
306
       rpcMsg->code == TSDB_CODE_APP_NOT_READY ||
307 308 309 310 311 312 313
       rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE)) {
    tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), ++pSql->retry);

    // set the flag to denote that sql string needs to be re-parsed and build submit block with table schema
    if (rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
      pSql->cmd.submitSchema = 1;
    }
314

315 316 317 318
    pSql->res.code = rpcMsg->code;  // keep the previous error code
    if (pSql->retry > pSql->maxRetry) {
      tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry);
    } else {
H
Haojun Liao 已提交
319
      // wait for a little bit moment and then retry, todo do not sleep in rpc callback thread
H
Haojun Liao 已提交
320 321 322 323 324
      if (rpcMsg->code == TSDB_CODE_APP_NOT_READY || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
        int32_t duration = getWaitingTimeInterval(pSql->retry);
        taosMsleep(duration);
      }

B
Bomin Zhang 已提交
325
      rpcMsg->code = tscRenewTableMeta(pSql, 0);
326 327 328

      // if there is an error occurring, proceed to the following error handling procedure.
      if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
H
Haojun Liao 已提交
329
        taosCacheRelease(tscObjCache, (void**) &p, false);
330 331
        rpcFreeCont(rpcMsg->pCont);
        return;
H
hzcheng 已提交
332 333
      }
    }
S
slguan 已提交
334
  }
335

H
hzcheng 已提交
336
  pRes->rspLen = 0;
337
  
H
Haojun Liao 已提交
338
  if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
339
    tscDebug("%p query is cancelled, code:%s", pSql, tstrerror(pRes->code));
H
Haojun Liao 已提交
340 341
  } else {
    pRes->code = rpcMsg->code;
H
hzcheng 已提交
342 343
  }

S
slguan 已提交
344
  if (pRes->code == TSDB_CODE_SUCCESS) {
345
    tscDebug("%p reset retry counter to be 0 due to success rsp, old:%d", pSql, pSql->retry);
S
slguan 已提交
346 347 348
    pSql->retry = 0;
  }

349
  if (pRes->code != TSDB_CODE_TSC_QUERY_CANCELLED) {
350
    assert(rpcMsg->msgType == pCmd->msgType + 1);
351
    pRes->code    = rpcMsg->code;
352
    pRes->rspType = rpcMsg->msgType;
353
    pRes->rspLen  = rpcMsg->contLen;
H
hzcheng 已提交
354

355
    if (pRes->rspLen > 0 && rpcMsg->pCont) {
356 357
      char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
      if (tmp == NULL) {
358
        pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
359 360
      } else {
        pRes->pRsp = tmp;
361
        memcpy(pRes->pRsp, rpcMsg->pCont, pRes->rspLen);
S
slguan 已提交
362
      }
363 364
    } else {
      pRes->pRsp = NULL;
S
slguan 已提交
365 366
    }

H
hzcheng 已提交
367 368 369 370
    /*
     * There is not response callback function for submit response.
     * The actual inserted number of points is the first number.
     */
371
    if (rpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP && pRes->pRsp != NULL) {
372
      SShellSubmitRspMsg *pMsg = (SShellSubmitRspMsg*)pRes->pRsp;
S
slguan 已提交
373 374 375 376 377 378 379
      pMsg->code = htonl(pMsg->code);
      pMsg->numOfRows = htonl(pMsg->numOfRows);
      pMsg->affectedRows = htonl(pMsg->affectedRows);
      pMsg->failedRows = htonl(pMsg->failedRows);
      pMsg->numOfFailedBlocks = htonl(pMsg->numOfFailedBlocks);

      pRes->numOfRows += pMsg->affectedRows;
380
      tscDebug("%p SQL cmd:%s, code:%s inserted rows:%d rspLen:%d", pSql, sqlCmd[pCmd->command], 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
381
          tstrerror(pRes->code), pMsg->affectedRows, pRes->rspLen);
S
slguan 已提交
382
    } else {
383
      tscDebug("%p SQL cmd:%s, code:%s rspLen:%d", pSql, sqlCmd[pCmd->command], tstrerror(pRes->code), pRes->rspLen);
H
hzcheng 已提交
384 385
    }
  }
386
  
H
Haojun Liao 已提交
387
  if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) {
388
    rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
H
Haojun Liao 已提交
389
  }
S
Shengliang Guan 已提交
390

H
Haojun Liao 已提交
391
  bool shouldFree = tscShouldBeFreed(pSql);
392
  if (rpcMsg->code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
S
TD-1057  
Shengliang Guan 已提交
393
    rpcMsg->code = (pRes->code == TSDB_CODE_SUCCESS) ? (int32_t)pRes->numOfRows : pRes->code;
H
Haojun Liao 已提交
394
    (*pSql->fp)(pSql->param, pSql, rpcMsg->code);
H
hzcheng 已提交
395 396
  }

H
Haojun Liao 已提交
397
  void** p1 = p;
H
Haojun Liao 已提交
398
  taosCacheRelease(tscObjCache, (void**) &p1, false);
H
Haojun Liao 已提交
399

H
Haojun Liao 已提交
400 401
  if (shouldFree) { // in case of table-meta/vgrouplist query, automatically free it
    taosCacheRelease(tscObjCache, (void **)&p, true);
H
Haojun Liao 已提交
402 403 404
    tscDebug("%p sqlObj is automatically freed", pSql);
  }

405
  rpcFreeCont(rpcMsg->pCont);
H
hzcheng 已提交
406 407
}

S
slguan 已提交
408 409 410
int doProcessSql(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;
411

H
hjxilinx 已提交
412 413 414 415 416 417 418
  if (pCmd->command == TSDB_SQL_SELECT ||
      pCmd->command == TSDB_SQL_FETCH ||
      pCmd->command == TSDB_SQL_RETRIEVE ||
      pCmd->command == TSDB_SQL_INSERT ||
      pCmd->command == TSDB_SQL_CONNECT ||
      pCmd->command == TSDB_SQL_HB ||
      pCmd->command == TSDB_SQL_META ||
H
hjxilinx 已提交
419
      pCmd->command == TSDB_SQL_STABLEVGROUP) {
420 421 422 423 424 425
    pRes->code = tscBuildMsg[pCmd->command](pSql, NULL);
  }
  
  if (pRes->code != TSDB_CODE_SUCCESS) {
    tscQueueAsyncRes(pSql);
    return pRes->code;
S
slguan 已提交
426
  }
427

428 429 430
  int32_t code = tscSendMsgToServer(pSql);

  // NOTE: if code is TSDB_CODE_SUCCESS, pSql may have been released here already by other threads.
431
  if (code != TSDB_CODE_SUCCESS) {
S
slguan 已提交
432
    pRes->code = code;
H
hjxilinx 已提交
433
    tscQueueAsyncRes(pSql);
H
Haojun Liao 已提交
434
    return code;
S
slguan 已提交
435
  }
H
hjxilinx 已提交
436 437
  
  return TSDB_CODE_SUCCESS;
S
slguan 已提交
438 439 440
}

int tscProcessSql(SSqlObj *pSql) {
441
  char    *name = NULL;
442
  SSqlCmd *pCmd = &pSql->cmd;
443
  
444
  SQueryInfo     *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hjxilinx 已提交
445
  STableMetaInfo *pTableMetaInfo = NULL;
446
  uint32_t        type = 0;
447

448
  if (pQueryInfo != NULL) {
H
hjxilinx 已提交
449
    pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
450
    name = (pTableMetaInfo != NULL)? pTableMetaInfo->name:NULL;
451
    type = pQueryInfo->type;
452

H
hjxilinx 已提交
453
    // while numOfTables equals to 0, it must be Heartbeat
H
hjxilinx 已提交
454
    assert((pQueryInfo->numOfTables == 0 && pQueryInfo->command == TSDB_SQL_HB) || pQueryInfo->numOfTables > 0);
S
slguan 已提交
455
  }
456

457
  tscDebug("%p SQL cmd:%s will be processed, name:%s, type:%d", pSql, sqlCmd[pCmd->command], name, type);
H
hjxilinx 已提交
458
  if (pCmd->command < TSDB_SQL_MGMT) { // the pTableMetaInfo cannot be NULL
H
hjxilinx 已提交
459
    if (pTableMetaInfo == NULL) {
460
      pSql->res.code = TSDB_CODE_TSC_APP_ERROR;
H
hjxilinx 已提交
461 462
      return pSql->res.code;
    }
H
hjxilinx 已提交
463
  } else if (pCmd->command < TSDB_SQL_LOCAL) {
464
    //pSql->epSet = tscMgmtEpSet;
H
hzcheng 已提交
465 466 467
  } else {  // local handler
    return (*tscProcessMsgRsp[pCmd->command])(pSql);
  }
468
  
S
slguan 已提交
469 470
  return doProcessSql(pSql);
}
H
hzcheng 已提交
471

J
jtao1735 已提交
472
int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
473
  SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *) pSql->cmd.payload;
S
slguan 已提交
474
  pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
S
slguan 已提交
475

H
Haojun Liao 已提交
476
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
S
slguan 已提交
477
  pRetrieveMsg->free = htons(pQueryInfo->type);
H
hzcheng 已提交
478

479
  // todo valid the vgroupId at the client side
H
hjxilinx 已提交
480 481
  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  
weixin_48148422's avatar
weixin_48148422 已提交
482
  if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
H
hjxilinx 已提交
483
    int32_t vgIndex = pTableMetaInfo->vgroupIndex;
H
Haojun Liao 已提交
484 485 486 487 488 489 490 491 492
    if (pTableMetaInfo->pVgroupTables == NULL) {
      SVgroupsInfo *pVgroupInfo = pTableMetaInfo->vgroupList;
      assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups);

      pRetrieveMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId);
      tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d", pSql, pVgroupInfo->vgroups[vgIndex].vgId, vgIndex);
    } else {
      int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
      assert(vgIndex >= 0 && vgIndex < numOfVgroups);
H
Haojun Liao 已提交
493

H
Haojun Liao 已提交
494 495 496 497 498
      SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex);

      pRetrieveMsg->header.vgId = htonl(pTableIdList->vgInfo.vgId);
      tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d", pSql, pTableIdList->vgInfo.vgId, vgIndex);
    }
499
  } else {
H
hjxilinx 已提交
500
    STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
H
hjxilinx 已提交
501
    pRetrieveMsg->header.vgId = htonl(pTableMeta->vgroupInfo.vgId);
H
Haojun Liao 已提交
502
    tscDebug("%p build fetch msg from only one vgroup, vgId:%d", pSql, pTableMeta->vgroupInfo.vgId);
503
  }
504 505

  pSql->cmd.payloadLen = sizeof(SRetrieveTableMsg);
J
jtao1735 已提交
506
  pSql->cmd.msgType = TSDB_MSG_TYPE_FETCH;
507 508 509

  pRetrieveMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg));

510
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
511 512
}

513
int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hjxilinx 已提交
514
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
H
hjxilinx 已提交
515
  STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
H
hjxilinx 已提交
516
  
517
  char* pMsg = pSql->cmd.payload;
518 519 520
  
  // NOTE: shell message size should not include SMsgDesc
  int32_t size = pSql->cmd.payloadLen - sizeof(SMsgDesc);
521 522
  int32_t vgId = pTableMeta->vgroupInfo.vgId;

523
  SMsgDesc* pMsgDesc = (SMsgDesc*) pMsg;
524 525
  pMsgDesc->numOfVnodes = htonl(1); // always one vnode

526
  pMsg += sizeof(SMsgDesc);
527
  SSubmitMsg *pShellMsg = (SSubmitMsg *)pMsg;
528

H
hjxilinx 已提交
529
  pShellMsg->header.vgId = htonl(vgId);
H
Haojun Liao 已提交
530
  pShellMsg->header.contLen = htonl(size);      // the length not includes the size of SMsgDesc
531
  pShellMsg->length = pShellMsg->header.contLen;
H
hjxilinx 已提交
532
  
H
Haojun Liao 已提交
533
  pShellMsg->numOfBlocks = htonl(pSql->cmd.numOfTablesInSubmit);  // number of tables to be inserted
H
hzcheng 已提交
534

H
hjxilinx 已提交
535
  // pSql->cmd.payloadLen is set during copying data into payload
S
slguan 已提交
536
  pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
dengyihao's avatar
dengyihao 已提交
537
  tscDumpEpSetFromVgroupInfo(&pTableMeta->corVgroupInfo, &pSql->epSet);
538

539 540
  tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfEP:%d", pSql, vgId, pSql->cmd.numOfTablesInSubmit,
      pSql->epSet.numOfEps);
541
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
542 543 544
}

/*
545
 * for table query, simply return the size <= 1k
H
hzcheng 已提交
546
 */
547
static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
H
hzcheng 已提交
548
  const static int32_t MIN_QUERY_MSG_PKT_SIZE = TSDB_MAX_BYTES_PER_ROW * 5;
549
  SQueryInfo *         pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
H
hzcheng 已提交
550

S
TD-1057  
Shengliang Guan 已提交
551
  int32_t srcColListSize = (int32_t)(taosArrayGetSize(pQueryInfo->colList) * sizeof(SColumnInfo));
H
hjxilinx 已提交
552 553
  
  size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
S
TD-1057  
Shengliang Guan 已提交
554
  int32_t exprSize = (int32_t)(sizeof(SSqlFuncMsg) * numOfExprs);
H
hjxilinx 已提交
555
  
556
  return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + exprSize + 4096;
H
hzcheng 已提交
557 558
}

559
static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char *pMsg) {
560
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
B
Bomin Zhang 已提交
561
  TSKEY dfltKey = htobe64(pQueryMsg->window.skey);
562

H
hjxilinx 已提交
563
  STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
weixin_48148422's avatar
weixin_48148422 已提交
564
  if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) || pTableMetaInfo->pVgroupTables == NULL) {
565 566
    
    SCMVgroupInfo* pVgroupInfo = NULL;
weixin_48148422's avatar
weixin_48148422 已提交
567
    if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
568 569
      int32_t index = pTableMetaInfo->vgroupIndex;
      assert(index >= 0);
H
hjxilinx 已提交
570
  
B
Bomin Zhang 已提交
571
      if (pTableMetaInfo->vgroupList->numOfVgroups > 0) {
H
Haojun Liao 已提交
572
        assert(index < pTableMetaInfo->vgroupList->numOfVgroups);
B
Bomin Zhang 已提交
573 574
        pVgroupInfo = &pTableMetaInfo->vgroupList->vgroups[index];
      }
H
Haojun Liao 已提交
575
      tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, pTableMetaInfo->vgroupList->numOfVgroups);
H
hjxilinx 已提交
576 577
    } else {
      pVgroupInfo = &pTableMeta->vgroupInfo;
578
    }
weixin_48148422's avatar
weixin_48148422 已提交
579

580 581 582 583
    assert(pVgroupInfo != NULL);

    tscSetDnodeEpSet(pSql, pVgroupInfo);
    pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId);
weixin_48148422's avatar
weixin_48148422 已提交
584

585
    STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
586 587 588
    pTableIdInfo->tid = htonl(pTableMeta->id.tid);
    pTableIdInfo->uid = htobe64(pTableMeta->id.uid);
    pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->id.uid, dfltKey));
weixin_48148422's avatar
weixin_48148422 已提交
589

590 591
    pQueryMsg->numOfTables = htonl(1);  // set the number of tables
    pMsg += sizeof(STableIdInfo);
592
  } else { // it is a subquery of the super table query, this EP info is acquired from vgroupInfo
593
    int32_t index = pTableMetaInfo->vgroupIndex;
S
TD-1057  
Shengliang Guan 已提交
594
    int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
595
    assert(index >= 0 && index < numOfVgroups);
weixin_48148422's avatar
weixin_48148422 已提交
596

597
    tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, numOfVgroups);
weixin_48148422's avatar
weixin_48148422 已提交
598

599
    SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, index);
600

dengyihao's avatar
bugfix  
dengyihao 已提交
601
    // set the vgroup info 
602
    tscSetDnodeEpSet(pSql, &pTableIdList->vgInfo);
603 604
    pQueryMsg->head.vgId = htonl(pTableIdList->vgInfo.vgId);
    
S
TD-1057  
Shengliang Guan 已提交
605
    int32_t numOfTables = (int32_t)taosArrayGetSize(pTableIdList->itemList);
606 607 608 609 610 611 612 613 614
    pQueryMsg->numOfTables = htonl(numOfTables);  // set the number of tables
  
    // serialize each table id info
    for(int32_t i = 0; i < numOfTables; ++i) {
      STableIdInfo* pItem = taosArrayGet(pTableIdList->itemList, i);
      
      STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
      pTableIdInfo->tid = htonl(pItem->tid);
      pTableIdInfo->uid = htobe64(pItem->uid);
B
Bomin Zhang 已提交
615
      pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pItem->uid, dfltKey));
616 617 618 619
      pMsg += sizeof(STableIdInfo);
    }
  }
  
H
Haojun Liao 已提交
620 621
  tscDebug("%p vgId:%d, query on table:%s, tid:%d, uid:%" PRIu64, pSql, htonl(pQueryMsg->head.vgId), pTableMetaInfo->name,
      pTableMeta->id.tid, pTableMeta->id.uid);
H
hjxilinx 已提交
622
  
623 624 625
  return pMsg;
}

626
int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
627 628
  SSqlCmd *pCmd = &pSql->cmd;

629
  int32_t size = tscEstimateQueryMsgSize(pCmd, pCmd->clauseIndex);
H
hzcheng 已提交
630

S
slguan 已提交
631 632
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for query msg", pSql);
H
Haojun Liao 已提交
633
    return TSDB_CODE_TSC_INVALID_SQL;  // todo add test for this
S
slguan 已提交
634
  }
635
  
636
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hjxilinx 已提交
637
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hjxilinx 已提交
638
  STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
639 640 641

  size_t numOfSrcCols = taosArrayGetSize(pQueryInfo->colList);
  if (numOfSrcCols <= 0 && !tscQueryTags(pQueryInfo)) {
S
Shengliang Guan 已提交
642
    tscError("%p illegal value of numOfCols in query msg: %" PRIu64 ", table cols:%d", pSql, (uint64_t)numOfSrcCols,
643 644
        tscGetNumOfColumns(pTableMeta));

H
Haojun Liao 已提交
645
    return TSDB_CODE_TSC_INVALID_SQL;
646
  }
647
  
648
  if (pQueryInfo->interval.interval < 0) {
S
TD-1530  
Shengliang Guan 已提交
649
    tscError("%p illegal value of aggregation time interval in query msg: %" PRId64, pSql, (int64_t)pQueryInfo->interval.interval);
H
Haojun Liao 已提交
650
    return TSDB_CODE_TSC_INVALID_SQL;
651 652 653 654
  }
  
  if (pQueryInfo->groupbyExpr.numOfGroupCols < 0) {
    tscError("%p illegal value of numOfGroupCols in query msg: %d", pSql, pQueryInfo->groupbyExpr.numOfGroupCols);
H
Haojun Liao 已提交
655
    return TSDB_CODE_TSC_INVALID_SQL;
656
  }
657

658
  SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload;
H
hzcheng 已提交
659

S
TD-1057  
Shengliang Guan 已提交
660
  int32_t numOfTags = (int32_t)taosArrayGetSize(pTableMetaInfo->tagColList);
661
  
662
  if (pQueryInfo->order.order == TSDB_ORDER_ASC) {
H
hjxilinx 已提交
663 664
    pQueryMsg->window.skey = htobe64(pQueryInfo->window.skey);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->window.ekey);
H
hzcheng 已提交
665
  } else {
H
hjxilinx 已提交
666 667
    pQueryMsg->window.skey = htobe64(pQueryInfo->window.ekey);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->window.skey);
H
hzcheng 已提交
668 669
  }

670 671
  pQueryMsg->order          = htons(pQueryInfo->order.order);
  pQueryMsg->orderColId     = htons(pQueryInfo->order.orderColId);
H
Haojun Liao 已提交
672
  pQueryMsg->fillType       = htons(pQueryInfo->fillType);
673 674
  pQueryMsg->limit          = htobe64(pQueryInfo->limit.limit);
  pQueryMsg->offset         = htobe64(pQueryInfo->limit.offset);
S
TD-1057  
Shengliang Guan 已提交
675
  pQueryMsg->numOfCols      = htons((int16_t)taosArrayGetSize(pQueryInfo->colList));
H
Haojun Liao 已提交
676 677
  pQueryMsg->interval.interval = htobe64(pQueryInfo->interval.interval);
  pQueryMsg->interval.sliding  = htobe64(pQueryInfo->interval.sliding);
678 679
  pQueryMsg->interval.offset   = htobe64(pQueryInfo->interval.offset);
  pQueryMsg->interval.intervalUnit = pQueryInfo->interval.intervalUnit;
H
Haojun Liao 已提交
680 681
  pQueryMsg->interval.slidingUnit  = pQueryInfo->interval.slidingUnit;
  pQueryMsg->interval.offsetUnit   = pQueryInfo->interval.offsetUnit;
682
  pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols);
683
  pQueryMsg->numOfTags      = htonl(numOfTags);
weixin_48148422's avatar
weixin_48148422 已提交
684
  pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType);
H
Haojun Liao 已提交
685
  pQueryMsg->queryType      = htonl(pQueryInfo->type);
H
hjxilinx 已提交
686 687
  
  size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo);
S
TD-1057  
Shengliang Guan 已提交
688
  pQueryMsg->numOfOutput = htons((int16_t)numOfOutput);
H
hzcheng 已提交
689 690

  // set column list ids
691 692
  size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
  char *pMsg = (char *)(pQueryMsg->colList) + numOfCols * sizeof(SColumnInfo);
H
hjxilinx 已提交
693
  SSchema *pSchema = tscGetTableSchema(pTableMeta);
694
  
695 696 697
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumn *pCol = taosArrayGetP(pQueryInfo->colList, i);
    SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];
H
hzcheng 已提交
698

699
    if (pCol->colIndex.columnIndex >= tscGetNumOfColumns(pTableMeta) || pColSchema->type < TSDB_DATA_TYPE_BOOL ||
700
        pColSchema->type > TSDB_DATA_TYPE_NCHAR) {
701 702
      tscError("%p tid:%d uid:%" PRIu64" id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s",
          pSql, pTableMeta->id.tid, pTableMeta->id.uid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta), pCol->colIndex.columnIndex,
703 704
               pColSchema->name);

705
      return TSDB_CODE_TSC_INVALID_SQL;
706
    }
H
hzcheng 已提交
707 708 709

    pQueryMsg->colList[i].colId = htons(pColSchema->colId);
    pQueryMsg->colList[i].bytes = htons(pColSchema->bytes);
710
    pQueryMsg->colList[i].type  = htons(pColSchema->type);
S
slguan 已提交
711
    pQueryMsg->colList[i].numOfFilters = htons(pCol->numOfFilters);
H
hzcheng 已提交
712

S
slguan 已提交
713 714 715
    // append the filter information after the basic column information
    for (int32_t f = 0; f < pCol->numOfFilters; ++f) {
      SColumnFilterInfo *pColFilter = &pCol->filterInfo[f];
H
hzcheng 已提交
716

S
slguan 已提交
717
      SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)pMsg;
718
      pFilterMsg->filterstr = htons(pColFilter->filterstr);
S
slguan 已提交
719 720

      pMsg += sizeof(SColumnFilterInfo);
H
hzcheng 已提交
721

722
      if (pColFilter->filterstr) {
S
slguan 已提交
723
        pFilterMsg->len = htobe64(pColFilter->len);
724
        memcpy(pMsg, (void *)pColFilter->pz, (size_t)(pColFilter->len + 1));
S
slguan 已提交
725 726 727 728 729 730 731 732
        pMsg += (pColFilter->len + 1);  // append the additional filter binary info
      } else {
        pFilterMsg->lowerBndi = htobe64(pColFilter->lowerBndi);
        pFilterMsg->upperBndi = htobe64(pColFilter->upperBndi);
      }

      pFilterMsg->lowerRelOptr = htons(pColFilter->lowerRelOptr);
      pFilterMsg->upperRelOptr = htons(pColFilter->upperRelOptr);
H
hzcheng 已提交
733

S
slguan 已提交
734 735
      if (pColFilter->lowerRelOptr == TSDB_RELATION_INVALID && pColFilter->upperRelOptr == TSDB_RELATION_INVALID) {
        tscError("invalid filter info");
H
Haojun Liao 已提交
736
        return TSDB_CODE_TSC_INVALID_SQL;
S
slguan 已提交
737 738
      }
    }
H
hzcheng 已提交
739 740
  }

H
hjxilinx 已提交
741
  SSqlFuncMsg *pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
H
hjxilinx 已提交
742
  for (int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) {
743
    SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
H
hzcheng 已提交
744

745
    if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId, pExpr->numOfParams)) {
H
hzcheng 已提交
746
      tscError("%p table schema is not matched with parsed sql", pSql);
H
Haojun Liao 已提交
747
      return TSDB_CODE_TSC_INVALID_SQL;
H
hzcheng 已提交
748 749
    }

750 751 752
    pSqlFuncExpr->colInfo.colId    = htons(pExpr->colInfo.colId);
    pSqlFuncExpr->colInfo.colIndex = htons(pExpr->colInfo.colIndex);
    pSqlFuncExpr->colInfo.flag     = htons(pExpr->colInfo.flag);
H
hzcheng 已提交
753

754
    pSqlFuncExpr->functionId  = htons(pExpr->functionId);
H
hzcheng 已提交
755
    pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams);
H
hjxilinx 已提交
756
    pMsg += sizeof(SSqlFuncMsg);
H
hzcheng 已提交
757 758

    for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
H
Haojun Liao 已提交
759
      // todo add log
H
hzcheng 已提交
760 761 762 763 764
      pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType);
      pSqlFuncExpr->arg[j].argBytes = htons(pExpr->param[j].nLen);

      if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) {
        memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen);
765
        pMsg += pExpr->param[j].nLen;
H
hzcheng 已提交
766 767 768 769 770
      } else {
        pSqlFuncExpr->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64Key);
      }
    }

H
hjxilinx 已提交
771
    pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
H
hzcheng 已提交
772
  }
773
  
774
  // serialize the table info (sid, uid, tags)
775 776
  pMsg = doSerializeTableInfo(pQueryMsg, pSql, pMsg);
  
777
  SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr;
778
  if (pGroupbyExpr->numOfGroupCols > 0) {
S
slguan 已提交
779
    pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex);
H
hzcheng 已提交
780 781
    pQueryMsg->orderType = htons(pGroupbyExpr->orderType);

S
slguan 已提交
782
    for (int32_t j = 0; j < pGroupbyExpr->numOfGroupCols; ++j) {
783 784
      SColIndex* pCol = taosArrayGet(pGroupbyExpr->columnInfo, j);
  
S
slguan 已提交
785 786 787
      *((int16_t *)pMsg) = pCol->colId;
      pMsg += sizeof(pCol->colId);

788 789
      *((int16_t *)pMsg) += pCol->colIndex;
      pMsg += sizeof(pCol->colIndex);
S
slguan 已提交
790 791 792

      *((int16_t *)pMsg) += pCol->flag;
      pMsg += sizeof(pCol->flag);
H
hjxilinx 已提交
793 794 795
      
      memcpy(pMsg, pCol->name, tListLen(pCol->name));
      pMsg += tListLen(pCol->name);
S
slguan 已提交
796 797 798
    }
  }

799
  if (pQueryInfo->fillType != TSDB_FILL_NONE) {
H
hjxilinx 已提交
800
    for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
801 802
      *((int64_t *)pMsg) = htobe64(pQueryInfo->fillVal[i]);
      pMsg += sizeof(pQueryInfo->fillVal[0]);
S
slguan 已提交
803 804
    }
  }
805 806 807 808 809 810 811 812 813
  
  if (numOfTags != 0) {
    int32_t numOfColumns = tscGetNumOfColumns(pTableMeta);
    int32_t numOfTagColumns = tscGetNumOfTags(pTableMeta);
    int32_t total = numOfTagColumns + numOfColumns;
    
    pSchema = tscGetTableTagSchema(pTableMeta);
    
    for (int32_t i = 0; i < numOfTags; ++i) {
H
hjxilinx 已提交
814
      SColumn *pCol = taosArrayGetP(pTableMetaInfo->tagColList, i);
815 816 817 818
      SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];

      if ((pCol->colIndex.columnIndex >= numOfTagColumns || pCol->colIndex.columnIndex < -1) ||
          (pColSchema->type < TSDB_DATA_TYPE_BOOL || pColSchema->type > TSDB_DATA_TYPE_NCHAR)) {
819 820
        tscError("%p tid:%d uid:%" PRIu64 " id:%s, tag index out of range, totalCols:%d, numOfTags:%d, index:%d, column name:%s",
                 pSql, pTableMeta->id.tid, pTableMeta->id.uid, pTableMetaInfo->name, total, numOfTagColumns,
B
Bomin Zhang 已提交
821
                 pCol->colIndex.columnIndex, pColSchema->name);
822

823
        return TSDB_CODE_TSC_INVALID_SQL;
824 825 826 827 828 829 830 831 832 833 834 835
      }
  
      SColumnInfo* pTagCol = (SColumnInfo*) pMsg;
  
      pTagCol->colId = htons(pColSchema->colId);
      pTagCol->bytes = htons(pColSchema->bytes);
      pTagCol->type  = htons(pColSchema->type);
      pTagCol->numOfFilters = 0;
      
      pMsg += sizeof(SColumnInfo);
    }
  }
S
slguan 已提交
836

H
Haojun Liao 已提交
837 838 839 840
  // serialize tag column query condition
  if (pQueryInfo->tagCond.pCond != NULL && taosArrayGetSize(pQueryInfo->tagCond.pCond) > 0) {
    STagCond* pTagCond = &pQueryInfo->tagCond;
    
841
    SCond *pCond = tsGetSTableQueryCond(pTagCond, pTableMeta->id.uid);
H
Haojun Liao 已提交
842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857
    if (pCond != NULL && pCond->cond != NULL) {
      pQueryMsg->tagCondLen = htons(pCond->len);
      memcpy(pMsg, pCond->cond, pCond->len);
      
      pMsg += pCond->len;
    }
  }
  
  if (pQueryInfo->tagCond.tbnameCond.cond == NULL) {
    *pMsg = 0;
    pMsg++;
  } else {
    strcpy(pMsg, pQueryInfo->tagCond.tbnameCond.cond);
    pMsg += strlen(pQueryInfo->tagCond.tbnameCond.cond) + 1;
  }

S
slguan 已提交
858
  // compressed ts block
S
TD-1057  
Shengliang Guan 已提交
859
  pQueryMsg->tsOffset = htonl((int32_t)(pMsg - pCmd->payload));
S
slguan 已提交
860 861 862
  int32_t tsLen = 0;
  int32_t numOfBlocks = 0;

863
  if (pQueryInfo->tsBuf != NULL) {
H
Haojun Liao 已提交
864 865
    int32_t vnodeId = htonl(pQueryMsg->head.vgId);
    STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pQueryInfo->tsBuf, vnodeId);
866
    assert(QUERY_IS_JOIN_QUERY(pQueryInfo->type) && pBlockInfo != NULL);  // this query should not be sent
S
slguan 已提交
867 868

    // todo refactor
B
Bomin Zhang 已提交
869 870 871 872 873
    if (fseek(pQueryInfo->tsBuf->f, pBlockInfo->offset, SEEK_SET) != 0) {
      int code = TAOS_SYSTEM_ERROR(ferror(pQueryInfo->tsBuf->f));
      tscError("%p: fseek failed: %s", pSql, tstrerror(code));
      return code;
    }
H
Haojun Liao 已提交
874 875 876

    size_t s = fread(pMsg, 1, pBlockInfo->compLen, pQueryInfo->tsBuf->f);
    if (s != pBlockInfo->compLen) {
B
Bomin Zhang 已提交
877 878 879 880
      int code = TAOS_SYSTEM_ERROR(ferror(pQueryInfo->tsBuf->f));
      tscError("%p: fread didn't return expected data: %s", pSql, tstrerror(code));
      return code;
    }
S
slguan 已提交
881 882 883 884

    pMsg += pBlockInfo->compLen;
    tsLen = pBlockInfo->compLen;
    numOfBlocks = pBlockInfo->numOfBlocks;
H
hzcheng 已提交
885 886
  }

S
slguan 已提交
887 888
  pQueryMsg->tsLen = htonl(tsLen);
  pQueryMsg->tsNumOfBlocks = htonl(numOfBlocks);
889 890
  if (pQueryInfo->tsBuf != NULL) {
    pQueryMsg->tsOrder = htonl(pQueryInfo->tsBuf->tsOrder);
H
hzcheng 已提交
891 892
  }

S
TD-1057  
Shengliang Guan 已提交
893
  int32_t msgLen = (int32_t)(pMsg - pCmd->payload);
H
hzcheng 已提交
894

895
  tscDebug("%p msg built success,len:%d bytes", pSql, msgLen);
H
hzcheng 已提交
896
  pCmd->payloadLen = msgLen;
S
slguan 已提交
897
  pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY;
H
hjxilinx 已提交
898
  
899
  pQueryMsg->head.contLen = htonl(msgLen);
S
TD-1057  
Shengliang Guan 已提交
900
  assert(msgLen + minMsgSize() <= (int32_t)pCmd->allocSize);
901 902

  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
903 904
}

905 906
int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
907
  pCmd->payloadLen = sizeof(SCMCreateDbMsg);
S
slguan 已提交
908
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DB;
H
hzcheng 已提交
909

910
  SCMCreateDbMsg *pCreateDbMsg = (SCMCreateDbMsg*)pCmd->payload;
911

912
  assert(pCmd->numOfClause == 1);
913
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
914
  tstrncpy(pCreateDbMsg->db, pTableMetaInfo->name, sizeof(pCreateDbMsg->db));
H
hzcheng 已提交
915

916
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
917 918
}

919 920
int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
921
  pCmd->payloadLen = sizeof(SCMCreateDnodeMsg);
S
slguan 已提交
922 923
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
924
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
925
  }
H
hzcheng 已提交
926

927
  SCMCreateDnodeMsg *pCreate = (SCMCreateDnodeMsg *)pCmd->payload;
J
jtao1735 已提交
928 929
  strncpy(pCreate->ep, pInfo->pDCLInfo->a[0].z, pInfo->pDCLInfo->a[0].n);
  
S
slguan 已提交
930
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DNODE;
H
hzcheng 已提交
931

932
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
933 934
}

935 936
int32_t tscBuildAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
937
  pCmd->payloadLen = sizeof(SCMCreateAcctMsg);
S
slguan 已提交
938 939
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
940
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
941
  }
H
hzcheng 已提交
942

943
  SCMCreateAcctMsg *pAlterMsg = (SCMCreateAcctMsg *)pCmd->payload;
H
hzcheng 已提交
944

H
Haojun Liao 已提交
945 946
  SStrToken *pName = &pInfo->pDCLInfo->user.user;
  SStrToken *pPwd = &pInfo->pDCLInfo->user.passwd;
H
hzcheng 已提交
947

948 949
  strncpy(pAlterMsg->user, pName->z, pName->n);
  strncpy(pAlterMsg->pass, pPwd->z, pPwd->n);
S
slguan 已提交
950

951
  SCreateAcctSQL *pAcctOpt = &pInfo->pDCLInfo->acctOpt;
H
hzcheng 已提交
952

953 954 955 956 957 958 959 960
  pAlterMsg->cfg.maxUsers = htonl(pAcctOpt->maxUsers);
  pAlterMsg->cfg.maxDbs = htonl(pAcctOpt->maxDbs);
  pAlterMsg->cfg.maxTimeSeries = htonl(pAcctOpt->maxTimeSeries);
  pAlterMsg->cfg.maxStreams = htonl(pAcctOpt->maxStreams);
  pAlterMsg->cfg.maxPointsPerSecond = htonl(pAcctOpt->maxPointsPerSecond);
  pAlterMsg->cfg.maxStorage = htobe64(pAcctOpt->maxStorage);
  pAlterMsg->cfg.maxQueryTime = htobe64(pAcctOpt->maxQueryTime);
  pAlterMsg->cfg.maxConnections = htonl(pAcctOpt->maxConnections);
H
hzcheng 已提交
961

962 963 964 965 966 967 968 969 970 971 972 973 974
  if (pAcctOpt->stat.n == 0) {
    pAlterMsg->cfg.accessState = -1;
  } else {
    if (pAcctOpt->stat.z[0] == 'r' && pAcctOpt->stat.n == 1) {
      pAlterMsg->cfg.accessState = TSDB_VN_READ_ACCCESS;
    } else if (pAcctOpt->stat.z[0] == 'w' && pAcctOpt->stat.n == 1) {
      pAlterMsg->cfg.accessState = TSDB_VN_WRITE_ACCCESS;
    } else if (strncmp(pAcctOpt->stat.z, "all", 3) == 0 && pAcctOpt->stat.n == 3) {
      pAlterMsg->cfg.accessState = TSDB_VN_ALL_ACCCESS;
    } else if (strncmp(pAcctOpt->stat.z, "no", 2) == 0 && pAcctOpt->stat.n == 2) {
      pAlterMsg->cfg.accessState = 0;
    }
  }
H
hzcheng 已提交
975

S
slguan 已提交
976
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_ACCT;
977
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
978 979
}

980 981
int32_t tscBuildUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
982
  pCmd->payloadLen = sizeof(SCMCreateUserMsg);
S
slguan 已提交
983

S
slguan 已提交
984 985
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
986
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
987 988
  }

989
  SCMCreateUserMsg *pAlterMsg = (SCMCreateUserMsg*)pCmd->payload;
H
hzcheng 已提交
990

991 992
  SUserInfo *pUser = &pInfo->pDCLInfo->user;
  strncpy(pAlterMsg->user, pUser->user.z, pUser->user.n);
S
TD-1057  
Shengliang Guan 已提交
993
  pAlterMsg->flag = (int8_t)pUser->type;
H
hzcheng 已提交
994

995 996 997 998
  if (pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
    pAlterMsg->privilege = (char)pCmd->count;
  } else if (pUser->type == TSDB_ALTER_USER_PASSWD) {
    strncpy(pAlterMsg->pass, pUser->passwd.z, pUser->passwd.n);
H
hjxilinx 已提交
999 1000
  } else { // create user password info
    strncpy(pAlterMsg->pass, pUser->passwd.z, pUser->passwd.n);
1001
  }
H
hzcheng 已提交
1002

1003
  if (pUser->type == TSDB_ALTER_USER_PASSWD || pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
S
slguan 已提交
1004
    pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_USER;
1005
  } else {
S
slguan 已提交
1006
    pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_USER;
1007
  }
H
hzcheng 已提交
1008

1009
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1010 1011
}

1012 1013
int32_t tscBuildCfgDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
S
slguan 已提交
1014
  pCmd->payloadLen = sizeof(SCMCfgDnodeMsg);
guanshengliang's avatar
guanshengliang 已提交
1015
  pCmd->msgType = TSDB_MSG_TYPE_CM_CONFIG_DNODE;
1016 1017
  return TSDB_CODE_SUCCESS;
}
S
slguan 已提交
1018

1019 1020
int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
1021
  pCmd->payloadLen = sizeof(SCMDropDbMsg);
H
hzcheng 已提交
1022

S
slguan 已提交
1023 1024
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1025
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1026 1027
  }

1028
  SCMDropDbMsg *pDropDbMsg = (SCMDropDbMsg*)pCmd->payload;
H
hzcheng 已提交
1029

1030
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1031
  tstrncpy(pDropDbMsg->db, pTableMetaInfo->name, sizeof(pDropDbMsg->db));
1032
  pDropDbMsg->ignoreNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;
H
hzcheng 已提交
1033

S
slguan 已提交
1034
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DB;
1035
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1036 1037
}

1038 1039
int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
1040
  pCmd->payloadLen = sizeof(SCMDropTableMsg);
H
hzcheng 已提交
1041

S
slguan 已提交
1042 1043
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1044
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1045
  }
H
hzcheng 已提交
1046

1047
  SCMDropTableMsg *pDropTableMsg = (SCMDropTableMsg*)pCmd->payload;
1048
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1049
  strcpy(pDropTableMsg->tableId, pTableMetaInfo->name);
S
slguan 已提交
1050
  pDropTableMsg->igNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;
H
hzcheng 已提交
1051

S
slguan 已提交
1052
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_TABLE;
1053
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1054 1055
}

1056
int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1057
  SSqlCmd *pCmd = &pSql->cmd;
1058
  pCmd->payloadLen = sizeof(SCMDropDnodeMsg);
S
slguan 已提交
1059 1060
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1061
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1062
  }
H
hzcheng 已提交
1063

1064
  SCMDropDnodeMsg *pDrop = (SCMDropDnodeMsg *)pCmd->payload;
1065
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1066
  tstrncpy(pDrop->ep, pTableMetaInfo->name, sizeof(pDrop->ep));
S
slguan 已提交
1067
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DNODE;
H
hzcheng 已提交
1068

1069
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1070 1071
}

S
[TD-16]  
slguan 已提交
1072
int32_t tscBuildDropUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
1073
  SSqlCmd *pCmd = &pSql->cmd;
1074
  pCmd->payloadLen = sizeof(SCMDropUserMsg);
S
slguan 已提交
1075
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_USER;
H
hzcheng 已提交
1076

S
slguan 已提交
1077 1078
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1079
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1080
  }
H
hzcheng 已提交
1081

1082
  SCMDropUserMsg *pDropMsg = (SCMDropUserMsg*)pCmd->payload;
1083
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1084
  tstrncpy(pDropMsg->user, pTableMetaInfo->name, sizeof(pDropMsg->user));
H
hzcheng 已提交
1085

1086
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1087 1088
}

S
[TD-16]  
slguan 已提交
1089 1090 1091 1092 1093 1094 1095
int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCMDropUserMsg);
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_ACCT;

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1096
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
[TD-16]  
slguan 已提交
1097 1098 1099 1100
  }

  SCMDropUserMsg *pDropMsg = (SCMDropUserMsg*)pCmd->payload;
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1101
  tstrncpy(pDropMsg->user, pTableMetaInfo->name, sizeof(pDropMsg->user));
S
[TD-16]  
slguan 已提交
1102 1103 1104 1105

  return TSDB_CODE_SUCCESS;
}

1106 1107
int32_t tscBuildUseDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
1108
  pCmd->payloadLen = sizeof(SCMUseDbMsg);
H
hzcheng 已提交
1109

S
slguan 已提交
1110 1111
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1112
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1113
  }
1114

1115
  SCMUseDbMsg *pUseDbMsg = (SCMUseDbMsg*)pCmd->payload;
1116
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1117
  strcpy(pUseDbMsg->db, pTableMetaInfo->name);
S
slguan 已提交
1118
  pCmd->msgType = TSDB_MSG_TYPE_CM_USE_DB;
H
hzcheng 已提交
1119

1120
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1121 1122
}

1123
int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1124
  STscObj *pObj = pSql->pTscObj;
S
slguan 已提交
1125
  SSqlCmd *pCmd = &pSql->cmd;
S
slguan 已提交
1126
  pCmd->msgType = TSDB_MSG_TYPE_CM_SHOW;
1127
  pCmd->payloadLen = sizeof(SCMShowMsg) + 100;
H
hzcheng 已提交
1128

S
slguan 已提交
1129 1130
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1131
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1132
  }
H
hzcheng 已提交
1133

1134
  SCMShowMsg *pShowMsg = (SCMShowMsg*)pCmd->payload;
S
slguan 已提交
1135

1136
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1137
  size_t nameLen = strlen(pTableMetaInfo->name);
S
slguan 已提交
1138
  if (nameLen > 0) {
B
Bomin Zhang 已提交
1139
    tstrncpy(pShowMsg->db, pTableMetaInfo->name, sizeof(pShowMsg->db));  // prefix is set here
H
hzcheng 已提交
1140
  } else {
B
Bomin Zhang 已提交
1141
    tstrncpy(pShowMsg->db, pObj->db, sizeof(pShowMsg->db));
H
hzcheng 已提交
1142 1143
  }

1144 1145 1146 1147
  SShowInfo *pShowInfo = &pInfo->pDCLInfo->showOpt;
  pShowMsg->type = pShowInfo->showType;

  if (pShowInfo->showType != TSDB_MGMT_TABLE_VNODES) {
H
Haojun Liao 已提交
1148
    SStrToken *pPattern = &pShowInfo->pattern;
1149 1150 1151 1152 1153
    if (pPattern->type > 0) {  // only show tables support wildcard query
      strncpy(pShowMsg->payload, pPattern->z, pPattern->n);
      pShowMsg->payloadLen = htons(pPattern->n);
    }
  } else {
H
Haojun Liao 已提交
1154
    SStrToken *pEpAddr = &pShowInfo->prefix;
dengyihao's avatar
dengyihao 已提交
1155
    assert(pEpAddr->n > 0 && pEpAddr->type > 0);
H
hzcheng 已提交
1156

dengyihao's avatar
dengyihao 已提交
1157 1158
    strncpy(pShowMsg->payload, pEpAddr->z, pEpAddr->n);
    pShowMsg->payloadLen = htons(pEpAddr->n);
1159 1160
  }

1161
  pCmd->payloadLen = sizeof(SCMShowMsg) + pShowMsg->payloadLen;
1162
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1163 1164
}

1165
int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1166
  SSqlCmd *pCmd = &pSql->cmd;
1167
  pCmd->payloadLen = sizeof(SCMKillQueryMsg);
H
hzcheng 已提交
1168

1169 1170
  switch (pCmd->command) {
    case TSDB_SQL_KILL_QUERY:
S
slguan 已提交
1171
      pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_QUERY;
1172 1173
      break;
    case TSDB_SQL_KILL_CONNECTION:
S
slguan 已提交
1174
      pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_CONN;
1175 1176
      break;
    case TSDB_SQL_KILL_STREAM:
S
slguan 已提交
1177
      pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_STREAM;
1178 1179 1180
      break;
  }
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1181 1182
}

1183
int tscEstimateCreateTableMsgLength(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1184 1185
  SSqlCmd *pCmd = &(pSql->cmd);

guanshengliang's avatar
guanshengliang 已提交
1186
  int32_t size = minMsgSize() + sizeof(SCMCreateTableMsg);
H
hzcheng 已提交
1187

1188
  SCreateTableSQL *pCreateTableInfo = pInfo->pCreateTableInfo;
1189
  if (pCreateTableInfo->type == TSQL_CREATE_TABLE_FROM_STABLE) {
H
hzcheng 已提交
1190 1191
    size += sizeof(STagData);
  } else {
S
slguan 已提交
1192
    size += sizeof(SSchema) * (pCmd->numOfCols + pCmd->count);
H
hzcheng 已提交
1193
  }
1194

1195 1196 1197
  if (pCreateTableInfo->pSelect != NULL) {
    size += (pCreateTableInfo->pSelect->selectToken.n + 1);
  }
H
hzcheng 已提交
1198 1199 1200 1201

  return size + TSDB_EXTRA_PAYLOAD_SIZE;
}

1202
int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1203
  int              msgLen = 0;
S
slguan 已提交
1204
  SSchema *        pSchema;
H
hzcheng 已提交
1205
  int              size = 0;
1206 1207 1208
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
H
hjxilinx 已提交
1209
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
1210 1211

  // Reallocate the payload size
1212
  size = tscEstimateCreateTableMsgLength(pSql, pInfo);
S
slguan 已提交
1213 1214
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for create table msg", pSql);
1215
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1216
  }
H
hzcheng 已提交
1217 1218


1219
  SCMCreateTableMsg *pCreateTableMsg = (SCMCreateTableMsg *)pCmd->payload;
H
hjxilinx 已提交
1220
  strcpy(pCreateTableMsg->tableId, pTableMetaInfo->name);
1221 1222

  // use dbinfo from table id without modifying current db info
H
Haojun Liao 已提交
1223
  tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pCreateTableMsg->db);
H
hzcheng 已提交
1224

1225 1226 1227
  SCreateTableSQL *pCreateTable = pInfo->pCreateTableInfo;

  pCreateTableMsg->igExists = pCreateTable->existCheck ? 1 : 0;
H
hzcheng 已提交
1228 1229 1230 1231
  pCreateTableMsg->numOfColumns = htons(pCmd->numOfCols);
  pCreateTableMsg->numOfTags = htons(pCmd->count);

  pCreateTableMsg->sqlLen = 0;
S
slguan 已提交
1232
  char *pMsg = (char *)pCreateTableMsg->schema;
H
hzcheng 已提交
1233

1234 1235
  int8_t type = pInfo->pCreateTableInfo->type;
  if (type == TSQL_CREATE_TABLE_FROM_STABLE) {  // create by using super table, tags value
1236 1237 1238 1239 1240 1241 1242
    STagData* pTag = &pInfo->pCreateTableInfo->usingInfo.tagdata;
    *(int32_t*)pMsg = htonl(pTag->dataLen);
    pMsg += sizeof(int32_t);
    memcpy(pMsg, pTag->name, sizeof(pTag->name));
    pMsg += sizeof(pTag->name);
    memcpy(pMsg, pTag->data, pTag->dataLen);
    pMsg += pTag->dataLen;
1243
  } else {  // create (super) table
1244
    pSchema = (SSchema *)pCreateTableMsg->schema;
1245

H
hzcheng 已提交
1246
    for (int i = 0; i < pCmd->numOfCols + pCmd->count; ++i) {
H
hjxilinx 已提交
1247
      TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
H
hzcheng 已提交
1248 1249 1250 1251

      pSchema->type = pField->type;
      strcpy(pSchema->name, pField->name);
      pSchema->bytes = htons(pField->bytes);
1252

H
hzcheng 已提交
1253 1254 1255 1256
      pSchema++;
    }

    pMsg = (char *)pSchema;
1257 1258
    if (type == TSQL_CREATE_STREAM) {  // check if it is a stream sql
      SQuerySQL *pQuerySql = pInfo->pCreateTableInfo->pSelect;
H
hzcheng 已提交
1259

1260 1261 1262
      strncpy(pMsg, pQuerySql->selectToken.z, pQuerySql->selectToken.n + 1);
      pCreateTableMsg->sqlLen = htons(pQuerySql->selectToken.n + 1);
      pMsg += pQuerySql->selectToken.n + 1;
H
hzcheng 已提交
1263 1264 1265
    }
  }

H
hjxilinx 已提交
1266
  tscFieldInfoClear(&pQueryInfo->fieldsInfo);
H
hzcheng 已提交
1267

S
TD-1057  
Shengliang Guan 已提交
1268
  msgLen = (int32_t)(pMsg - (char*)pCreateTableMsg);
S
slguan 已提交
1269
  pCreateTableMsg->contLen = htonl(msgLen);
H
hzcheng 已提交
1270
  pCmd->payloadLen = msgLen;
S
slguan 已提交
1271
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE;
H
hzcheng 已提交
1272 1273

  assert(msgLen + minMsgSize() <= size);
1274
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1275 1276 1277
}

int tscEstimateAlterTableMsgLength(SSqlCmd *pCmd) {
1278
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
guanshengliang's avatar
guanshengliang 已提交
1279
  return minMsgSize() + sizeof(SCMAlterTableMsg) + sizeof(SSchema) * tscNumOfFields(pQueryInfo) +
H
hzcheng 已提交
1280 1281 1282
         TSDB_EXTRA_PAYLOAD_SIZE;
}

1283
int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
1284 1285
  char *pMsg;
  int   msgLen = 0;
H
hzcheng 已提交
1286

1287
  SSqlCmd    *pCmd = &pSql->cmd;
1288 1289
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

H
hjxilinx 已提交
1290
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
1291 1292 1293
  
  SAlterTableSQL *pAlterInfo = pInfo->pAlterInfo;
  int size = tscEstimateAlterTableMsgLength(pCmd);
S
slguan 已提交
1294 1295
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for alter table msg", pSql);
H
Haojun Liao 已提交
1296
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1297
  }
1298 1299
  
  SCMAlterTableMsg *pAlterTableMsg = (SCMAlterTableMsg *)pCmd->payload;
H
Haojun Liao 已提交
1300
  tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pAlterTableMsg->db);
H
hzcheng 已提交
1301

H
hjxilinx 已提交
1302
  strcpy(pAlterTableMsg->tableId, pTableMetaInfo->name);
1303
  pAlterTableMsg->type = htons(pAlterInfo->type);
1304

1305
  pAlterTableMsg->numOfCols = htons(tscNumOfFields(pQueryInfo));
S
slguan 已提交
1306
  SSchema *pSchema = pAlterTableMsg->schema;
1307
  for (int i = 0; i < tscNumOfFields(pQueryInfo); ++i) {
H
hjxilinx 已提交
1308
    TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
1309
  
H
hzcheng 已提交
1310 1311 1312 1313 1314 1315 1316
    pSchema->type = pField->type;
    strcpy(pSchema->name, pField->name);
    pSchema->bytes = htons(pField->bytes);
    pSchema++;
  }

  pMsg = (char *)pSchema;
1317 1318 1319
  pAlterTableMsg->tagValLen = htonl(pAlterInfo->tagData.dataLen);
  memcpy(pMsg, pAlterInfo->tagData.data, pAlterInfo->tagData.dataLen);
  pMsg += pAlterInfo->tagData.dataLen;
H
hzcheng 已提交
1320

S
TD-1057  
Shengliang Guan 已提交
1321
  msgLen = (int32_t)(pMsg - (char*)pAlterTableMsg);
1322

H
hzcheng 已提交
1323
  pCmd->payloadLen = msgLen;
S
slguan 已提交
1324
  pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_TABLE;
H
hzcheng 已提交
1325 1326

  assert(msgLen + minMsgSize() <= size);
1327

1328
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1329 1330
}

1331 1332 1333 1334
int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
  SSqlCmd* pCmd = &pSql->cmd;
  pCmd->msgType = TSDB_MSG_TYPE_UPDATE_TAG_VAL;
  
1335
  SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) pCmd->payload;
1336
  pCmd->payloadLen = htonl(pUpdateMsg->head.contLen);
1337

1338 1339
  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
1340

dengyihao's avatar
dengyihao 已提交
1341
  tscDumpEpSetFromVgroupInfo(&pTableMetaInfo->pTableMeta->corVgroupInfo, &pSql->epSet);
1342

1343 1344 1345
  return TSDB_CODE_SUCCESS;
}

1346
int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1347
  SSqlCmd *pCmd = &pSql->cmd;
1348
  pCmd->payloadLen = sizeof(SCMAlterDbMsg);
S
slguan 已提交
1349
  pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_DB;
H
hzcheng 已提交
1350

1351
  SCMAlterDbMsg *pAlterDbMsg = (SCMAlterDbMsg*)pCmd->payload;
1352
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1353
  tstrncpy(pAlterDbMsg->db, pTableMetaInfo->name, sizeof(pAlterDbMsg->db));
H
hzcheng 已提交
1354

1355
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1356 1357
}

1358
int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1359
  SSqlCmd *pCmd = &pSql->cmd;
J
jtao1735 已提交
1360
  pCmd->msgType = TSDB_MSG_TYPE_CM_RETRIEVE;
S
slguan 已提交
1361
  pCmd->payloadLen = sizeof(SRetrieveTableMsg);
S
slguan 已提交
1362

S
slguan 已提交
1363 1364
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1365
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
H
hzcheng 已提交
1366
  }
S
slguan 已提交
1367

S
slguan 已提交
1368 1369 1370 1371
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg*)pCmd->payload;
  pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
  pRetrieveMsg->free = htons(pQueryInfo->type);
H
hzcheng 已提交
1372

1373
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1374 1375
}

1376
static int tscSetResultPointer(SQueryInfo *pQueryInfo, SSqlRes *pRes) {
1377
  if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
H
hzcheng 已提交
1378 1379 1380
    return pRes->code;
  }

H
hjxilinx 已提交
1381
  for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
H
hjxilinx 已提交
1382
    int16_t offset = tscFieldInfoGetOffset(pQueryInfo, i);
1383
    pRes->tsrow[i] = (unsigned char*)((char*) pRes->data + offset * pRes->numOfRows);
H
hzcheng 已提交
1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397
  }

  return 0;
}

/*
 * this function can only be called once.
 * by using pRes->rspType to denote its status
 *
 * if pRes->rspType is 1, no more result
 */
static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
1398

1399
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
1400

H
hzcheng 已提交
1401 1402 1403 1404 1405 1406
  pRes->code = TSDB_CODE_SUCCESS;
  if (pRes->rspType == 0) {
    pRes->numOfRows = numOfRes;
    pRes->row = 0;
    pRes->rspType = 1;

1407
    tscSetResultPointer(pQueryInfo, pRes);
H
hzcheng 已提交
1408
  } else {
S
slguan 已提交
1409
    tscResetForNextRetrieve(pRes);
H
hzcheng 已提交
1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424
  }

  uint8_t code = pSql->res.code;
  if (pSql->fp) {
    if (code == TSDB_CODE_SUCCESS) {
      (*pSql->fp)(pSql->param, pSql, pSql->res.numOfRows);
    } else {
      tscQueueAsyncRes(pSql);
    }
  }

  return code;
}

int tscProcessDescribeTableRsp(SSqlObj *pSql) {
S
slguan 已提交
1425
  SSqlCmd *       pCmd = &pSql->cmd;
1426
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hzcheng 已提交
1427

H
hjxilinx 已提交
1428
  STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
H
hjxilinx 已提交
1429 1430
  
  int32_t numOfRes = tinfo.numOfColumns + tinfo.numOfTags;
H
hzcheng 已提交
1431 1432 1433
  return tscLocalResultCommonBuilder(pSql, numOfRes);
}

H
Haojun Liao 已提交
1434 1435 1436
int tscProcessLocalRetrieveRsp(SSqlObj *pSql) {
  int32_t numOfRes = 1;
  pSql->res.completed = true;
H
hzcheng 已提交
1437 1438 1439
  return tscLocalResultCommonBuilder(pSql, numOfRes);
}

H
hjxilinx 已提交
1440
int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
H
hzcheng 已提交
1441
  SSqlRes *pRes = &pSql->res;
H
Haojun Liao 已提交
1442
  SSqlCmd* pCmd = &pSql->cmd;
H
hzcheng 已提交
1443

H
Haojun Liao 已提交
1444 1445 1446 1447 1448 1449
  int32_t code = pRes->code;
  if (pRes->code != TSDB_CODE_SUCCESS) {
    tscQueueAsyncRes(pSql);
    return code;
  }

1450
  pRes->code = tscDoLocalMerge(pSql);
H
hjxilinx 已提交
1451
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hzcheng 已提交
1452 1453

  if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) {
1454
    tscCreateResPointerInfo(pRes, pQueryInfo);
H
hzcheng 已提交
1455 1456 1457
  }

  pRes->row = 0;
1458
  pRes->completed = (pRes->numOfRows == 0);
H
hzcheng 已提交
1459

H
Haojun Liao 已提交
1460
  code = pRes->code;
H
hjxilinx 已提交
1461 1462 1463 1464
  if (pRes->code == TSDB_CODE_SUCCESS) {
    (*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
  } else {
    tscQueueAsyncRes(pSql);
H
hzcheng 已提交
1465 1466 1467 1468 1469
  }

  return code;
}

S
slguan 已提交
1470
int tscProcessEmptyResultRsp(SSqlObj *pSql) { return tscLocalResultCommonBuilder(pSql, 0); }
H
hzcheng 已提交
1471

1472
int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1473
  STscObj *pObj = pSql->pTscObj;
S
slguan 已提交
1474
  SSqlCmd *pCmd = &pSql->cmd;
S
slguan 已提交
1475
  pCmd->msgType = TSDB_MSG_TYPE_CM_CONNECT;
1476
  pCmd->payloadLen = sizeof(SCMConnectMsg);
H
hzcheng 已提交
1477

S
slguan 已提交
1478 1479
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1480
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1481 1482
  }

1483
  SCMConnectMsg *pConnect = (SCMConnectMsg*)pCmd->payload;
H
hzcheng 已提交
1484

H
Haojun Liao 已提交
1485
  // TODO refactor full_name
H
hzcheng 已提交
1486 1487 1488
  char *db;  // ugly code to move the space
  db = strstr(pObj->db, TS_PATH_DELIMITER);
  db = (db == NULL) ? pObj->db : db + 1;
B
Bomin Zhang 已提交
1489 1490 1491
  tstrncpy(pConnect->db, db, sizeof(pConnect->db));
  tstrncpy(pConnect->clientVersion, version, sizeof(pConnect->clientVersion));
  tstrncpy(pConnect->msgVersion, "", sizeof(pConnect->msgVersion));
H
hzcheng 已提交
1492

H
Haojun Liao 已提交
1493 1494 1495
  pConnect->pid = htonl(taosGetPId());
  taosGetCurrentAPPName(pConnect->appName, NULL);

1496
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1497 1498
}

H
hjxilinx 已提交
1499
int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
1500 1501 1502
  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);

H
hjxilinx 已提交
1503
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
S
slguan 已提交
1504

B
Bomin Zhang 已提交
1505
  SCMTableInfoMsg* pInfoMsg = (SCMTableInfoMsg *)pCmd->payload;
H
hjxilinx 已提交
1506
  strcpy(pInfoMsg->tableId, pTableMetaInfo->name);
H
hjxilinx 已提交
1507
  pInfoMsg->createFlag = htons(pSql->cmd.autoCreated ? 1 : 0);
S
slguan 已提交
1508

B
Bomin Zhang 已提交
1509
  char* pMsg = (char*)pInfoMsg + sizeof(SCMTableInfoMsg);
H
hzcheng 已提交
1510

B
Bomin Zhang 已提交
1511 1512 1513 1514 1515 1516 1517
  size_t len = htonl(pCmd->tagData.dataLen);
  if (pSql->cmd.autoCreated) {
    if (len > 0) {
      len += sizeof(pCmd->tagData.name) + sizeof(pCmd->tagData.dataLen);
      memcpy(pInfoMsg->tags, &pCmd->tagData, len);
      pMsg += len;
    }
H
hzcheng 已提交
1518 1519
  }

S
TD-1057  
Shengliang Guan 已提交
1520
  pCmd->payloadLen = (int32_t)(pMsg - (char*)pInfoMsg);
S
slguan 已提交
1521
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLE_META;
H
hzcheng 已提交
1522

1523
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1524 1525
}

S
slguan 已提交
1526
/**
1527
 *  multi table meta req pkg format:
1528
 *  | SMgmtHead | SCMMultiTableInfoMsg | tableId0 | tableId1 | tableId2 | ......
S
slguan 已提交
1529 1530
 *      no used         4B
 **/
1531
int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
guanshengliang's avatar
guanshengliang 已提交
1532
#if 0
S
slguan 已提交
1533 1534 1535 1536 1537
  SSqlCmd *pCmd = &pSql->cmd;

  // copy payload content to temp buff
  char *tmpData = 0;
  if (pCmd->payloadLen > 0) {
H
Haojun Liao 已提交
1538
    if ((tmpData = calloc(1, pCmd->payloadLen + 1)) == NULL) return -1;
S
slguan 已提交
1539 1540 1541 1542 1543
    memcpy(tmpData, pCmd->payload, pCmd->payloadLen);
  }

  // fill head info
  SMgmtHead *pMgmt = (SMgmtHead *)(pCmd->payload + tsRpcHeadSize);
H
Haojun Liao 已提交
1544
  memset(pMgmt->db, 0, TSDB_TABLE_FNAME_LEN);  // server don't need the db
S
slguan 已提交
1545

1546
  SCMMultiTableInfoMsg *pInfoMsg = (SCMMultiTableInfoMsg *)(pCmd->payload + tsRpcHeadSize + sizeof(SMgmtHead));
S
slguan 已提交
1547
  pInfoMsg->numOfTables = htonl((int32_t)pCmd->count);
S
slguan 已提交
1548 1549

  if (pCmd->payloadLen > 0) {
S
slguan 已提交
1550
    memcpy(pInfoMsg->tableIds, tmpData, pCmd->payloadLen);
S
slguan 已提交
1551 1552
  }

S
Shengliang Guan 已提交
1553
  taosTFree(tmpData);
S
slguan 已提交
1554

1555
  pCmd->payloadLen += sizeof(SMgmtHead) + sizeof(SCMMultiTableInfoMsg);
S
slguan 已提交
1556
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLES_META;
S
slguan 已提交
1557 1558 1559

  assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize);

1560
  tscDebug("%p build load multi-metermeta msg completed, numOfTables:%d, msg size:%d", pSql, pCmd->count,
S
slguan 已提交
1561 1562 1563
           pCmd->payloadLen);

  return pCmd->payloadLen;
guanshengliang's avatar
guanshengliang 已提交
1564 1565
#endif
  return 0;  
S
slguan 已提交
1566 1567
}

H
hjxilinx 已提交
1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584
//static UNUSED_FUNC int32_t tscEstimateMetricMetaMsgSize(SSqlCmd *pCmd) {
////  const int32_t defaultSize =
////      minMsgSize() + sizeof(SSuperTableMetaMsg) + sizeof(SMgmtHead) + sizeof(int16_t) * TSDB_MAX_TAGS;
////  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
////
////  int32_t n = 0;
////  size_t size = taosArrayGetSize(pQueryInfo->tagCond.pCond);
////  for (int32_t i = 0; i < size; ++i) {
////    assert(0);
//////    n += strlen(pQueryInfo->tagCond.cond[i].cond);
////  }
////
////  int32_t tagLen = n * TSDB_NCHAR_SIZE;
////  if (pQueryInfo->tagCond.tbnameCond.cond != NULL) {
////    tagLen += strlen(pQueryInfo->tagCond.tbnameCond.cond) * TSDB_NCHAR_SIZE;
////  }
////
H
Haojun Liao 已提交
1585
////  int32_t joinCondLen = (TSDB_TABLE_FNAME_LEN + sizeof(int16_t)) * 2;
H
hjxilinx 已提交
1586 1587 1588 1589 1590 1591 1592 1593
////  int32_t elemSize = sizeof(SSuperTableMetaElemMsg) * pQueryInfo->numOfTables;
////
////  int32_t colSize = pQueryInfo->groupbyExpr.numOfGroupCols*sizeof(SColIndex);
////
////  int32_t len = tagLen + joinCondLen + elemSize + colSize + defaultSize;
////
////  return MAX(len, TSDB_DEFAULT_PAYLOAD_SIZE);
//}
H
hzcheng 已提交
1594

H
hjxilinx 已提交
1595 1596
int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
H
hjxilinx 已提交
1597 1598 1599 1600 1601 1602 1603 1604 1605 1606
  
  char* pMsg = pCmd->payload;
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  
  SCMSTableVgroupMsg *pStableVgroupMsg = (SCMSTableVgroupMsg *) pMsg;
  pStableVgroupMsg->numOfTables = htonl(pQueryInfo->numOfTables);
  pMsg += sizeof(SCMSTableVgroupMsg);
  
  for(int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
    STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i);
1607 1608 1609
    size_t size = sizeof(pTableMetaInfo->name);
    tstrncpy(pMsg, pTableMetaInfo->name, size);
    pMsg += size;
H
hjxilinx 已提交
1610
  }
H
hjxilinx 已提交
1611 1612

  pCmd->msgType = TSDB_MSG_TYPE_CM_STABLE_VGROUP;
S
TD-1057  
Shengliang Guan 已提交
1613
  pCmd->payloadLen = (int32_t)(pMsg - pCmd->payload);
H
hjxilinx 已提交
1614

1615
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1616 1617
}

1618 1619
int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
H
hzcheng 已提交
1620 1621
  STscObj *pObj = pSql->pTscObj;

1622
  pthread_mutex_lock(&pObj->mutex);
H
hzcheng 已提交
1623

S
Shengliang Guan 已提交
1624
  int32_t numOfQueries = 2;
H
hzcheng 已提交
1625 1626 1627
  SSqlObj *tpSql = pObj->sqlList;
  while (tpSql) {
    tpSql = tpSql->next;
1628
    numOfQueries++;
H
hzcheng 已提交
1629 1630
  }

S
Shengliang Guan 已提交
1631
  int32_t numOfStreams = 2;
H
hzcheng 已提交
1632 1633 1634
  SSqlStream *pStream = pObj->streamList;
  while (pStream) {
    pStream = pStream->next;
1635
    numOfStreams++;
H
hzcheng 已提交
1636 1637
  }

1638
  int size = numOfQueries * sizeof(SQueryDesc) + numOfStreams * sizeof(SStreamDesc) + sizeof(SCMHeartBeatMsg) + 100;
S
slguan 已提交
1639
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
1640
    pthread_mutex_unlock(&pObj->mutex);
S
slguan 已提交
1641
    tscError("%p failed to malloc for heartbeat msg", pSql);
H
Haojun Liao 已提交
1642
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1643
  }
H
hzcheng 已提交
1644

1645
  SCMHeartBeatMsg *pHeartbeat = (SCMHeartBeatMsg *)pCmd->payload;
1646 1647
  pHeartbeat->numOfQueries = numOfQueries;
  pHeartbeat->numOfStreams = numOfStreams;
H
Haojun Liao 已提交
1648 1649 1650 1651

  pHeartbeat->pid = htonl(taosGetPId());
  taosGetCurrentAPPName(pHeartbeat->appName, NULL);

1652
  int msgLen = tscBuildQueryStreamDesc(pHeartbeat, pObj);
H
hzcheng 已提交
1653 1654 1655 1656

  pthread_mutex_unlock(&pObj->mutex);

  pCmd->payloadLen = msgLen;
S
slguan 已提交
1657
  pCmd->msgType = TSDB_MSG_TYPE_CM_HEARTBEAT;
H
hzcheng 已提交
1658

1659
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1660 1661
}

1662 1663
int tscProcessTableMetaRsp(SSqlObj *pSql) {
  STableMetaMsg *pMetaMsg = (STableMetaMsg *)pSql->res.pRsp;
H
hzcheng 已提交
1664

H
Haojun Liao 已提交
1665
  pMetaMsg->tid = htonl(pMetaMsg->tid);
1666
  pMetaMsg->sversion = htons(pMetaMsg->sversion);
1667
  pMetaMsg->tversion = htons(pMetaMsg->tversion);
H
hjxilinx 已提交
1668 1669
  pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId);
  
1670 1671
  pMetaMsg->uid = htobe64(pMetaMsg->uid);
  pMetaMsg->contLen = htons(pMetaMsg->contLen);
1672
  pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns);
H
hzcheng 已提交
1673

H
Haojun Liao 已提交
1674
  if ((pMetaMsg->tableType != TSDB_SUPER_TABLE) &&
H
Haojun Liao 已提交
1675
      (pMetaMsg->tid <= 0 || pMetaMsg->vgroup.vgId < 2 || pMetaMsg->vgroup.numOfEps <= 0)) {
H
Haojun Liao 已提交
1676
    tscError("invalid value in table numOfEps:%d, vgId:%d tid:%d, name:%s", pMetaMsg->vgroup.numOfEps, pMetaMsg->vgroup.vgId,
H
Haojun Liao 已提交
1677
             pMetaMsg->tid, pMetaMsg->tableId);
1678
    return TSDB_CODE_TSC_INVALID_VALUE;
H
hzcheng 已提交
1679 1680
  }

B
Bomin Zhang 已提交
1681
  if (pMetaMsg->numOfTags > TSDB_MAX_TAGS) {
1682
    tscError("invalid numOfTags:%d", pMetaMsg->numOfTags);
1683
    return TSDB_CODE_TSC_INVALID_VALUE;
H
hzcheng 已提交
1684 1685
  }

1686 1687
  if (pMetaMsg->numOfColumns > TSDB_MAX_COLUMNS || pMetaMsg->numOfColumns <= 0) {
    tscError("invalid numOfColumns:%d", pMetaMsg->numOfColumns);
1688
    return TSDB_CODE_TSC_INVALID_VALUE;
H
hzcheng 已提交
1689 1690
  }

1691 1692
  for (int i = 0; i < pMetaMsg->vgroup.numOfEps; ++i) {
    pMetaMsg->vgroup.epAddr[i].port = htons(pMetaMsg->vgroup.epAddr[i].port);
H
hzcheng 已提交
1693 1694
  }

1695
  SSchema* pSchema = pMetaMsg->schema;
H
hzcheng 已提交
1696

1697
  int32_t numOfTotalCols = pMetaMsg->numOfColumns + pMetaMsg->numOfTags;
H
hzcheng 已提交
1698 1699 1700
  for (int i = 0; i < numOfTotalCols; ++i) {
    pSchema->bytes = htons(pSchema->bytes);
    pSchema->colId = htons(pSchema->colId);
1701 1702 1703 1704 1705

    if (pSchema->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
      assert(i == 0);
    }

H
hjxilinx 已提交
1706
    assert(pSchema->type >= TSDB_DATA_TYPE_BOOL && pSchema->type <= TSDB_DATA_TYPE_NCHAR);
H
hzcheng 已提交
1707 1708 1709
    pSchema++;
  }

1710 1711
  size_t size = 0;
  STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
1712
  
H
hzcheng 已提交
1713
  // todo add one more function: taosAddDataIfNotExists();
1714
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
H
hjxilinx 已提交
1715
  assert(pTableMetaInfo->pTableMeta == NULL);
H
hzcheng 已提交
1716

H
Haojun Liao 已提交
1717
  pTableMetaInfo->pTableMeta = (STableMeta *) taosCachePut(tscMetaCache, pTableMetaInfo->name,
1718
      strlen(pTableMetaInfo->name), pTableMeta, size, tsTableMetaKeepTimer * 1000);
H
hjxilinx 已提交
1719
  
1720
  // todo handle out of memory case
1721
  if (pTableMetaInfo->pTableMeta == NULL) {
B
Bomin Zhang 已提交
1722
    free(pTableMeta);
1723
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
1724
  }
H
hzcheng 已提交
1725

1726
  tscDebug("%p recv table meta, uid:%"PRId64 ", tid:%d, name:%s", pSql, pTableMeta->id.uid, pTableMeta->id.tid, pTableMetaInfo->name);
1727
  free(pTableMeta);
1728
  
H
hjxilinx 已提交
1729
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1730 1731
}

S
slguan 已提交
1732
/**
1733
 *  multi table meta rsp pkg format:
1734
 *  | STaosRsp | ieType | SCMMultiTableInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | SMeterMeta2 | SSchema2
S
slguan 已提交
1735 1736 1737
 *  |...... 1B        1B            4B
 **/
int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
1738
#if 0
S
slguan 已提交
1739 1740 1741 1742 1743
  char *rsp = pSql->res.pRsp;

  ieType = *rsp;
  if (ieType != TSDB_IE_TYPE_META) {
    tscError("invalid ie type:%d", ieType);
1744
    pSql->res.code = TSDB_CODE_TSC_INVALID_IE;
S
slguan 已提交
1745
    pSql->res.numOfTotal = 0;
1746
    return TSDB_CODE_TSC_APP_ERROR;
S
slguan 已提交
1747 1748 1749 1750
  }

  rsp++;

1751
  SCMMultiTableInfoMsg *pInfo = (SCMMultiTableInfoMsg *)rsp;
S
slguan 已提交
1752
  totalNum = htonl(pInfo->numOfTables);
1753
  rsp += sizeof(SCMMultiTableInfoMsg);
S
slguan 已提交
1754 1755

  for (i = 0; i < totalNum; i++) {
S
slguan 已提交
1756
    SMultiTableMeta *pMultiMeta = (SMultiTableMeta *)rsp;
1757
    STableMeta *     pMeta = pMultiMeta->metas;
S
slguan 已提交
1758 1759 1760

    pMeta->sid = htonl(pMeta->sid);
    pMeta->sversion = htons(pMeta->sversion);
H
hjxilinx 已提交
1761
    pMeta->vgId = htonl(pMeta->vgId);
S
slguan 已提交
1762 1763
    pMeta->uid = htobe64(pMeta->uid);

H
hjxilinx 已提交
1764 1765
    if (pMeta->sid <= 0 || pMeta->vgId < 0) {
      tscError("invalid meter vgId:%d, sid%d", pMeta->vgId, pMeta->sid);
1766
      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
S
slguan 已提交
1767
      pSql->res.numOfTotal = i;
1768
      return TSDB_CODE_TSC_APP_ERROR;
S
slguan 已提交
1769 1770
    }

H
hjxilinx 已提交
1771 1772 1773 1774
    //    pMeta->numOfColumns = htons(pMeta->numOfColumns);
    //
    //    if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
    //      tscError("invalid tag value count:%d", pMeta->numOfTags);
1775
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
H
hjxilinx 已提交
1776
    //      pSql->res.numOfTotal = i;
1777
    //      return TSDB_CODE_TSC_APP_ERROR;
H
hjxilinx 已提交
1778 1779 1780 1781
    //    }
    //
    //    if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
    //      tscError("invalid numOfTags:%d", pMeta->numOfTags);
1782
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
H
hjxilinx 已提交
1783
    //      pSql->res.numOfTotal = i;
1784
    //      return TSDB_CODE_TSC_APP_ERROR;
H
hjxilinx 已提交
1785 1786 1787 1788
    //    }
    //
    //    if (pMeta->numOfColumns > TSDB_MAX_COLUMNS || pMeta->numOfColumns < 0) {
    //      tscError("invalid numOfColumns:%d", pMeta->numOfColumns);
1789
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
H
hjxilinx 已提交
1790
    //      pSql->res.numOfTotal = i;
1791
    //      return TSDB_CODE_TSC_APP_ERROR;
H
hjxilinx 已提交
1792 1793
    //    }
    //
H
hjxilinx 已提交
1794
    //    for (int j = 0; j < TSDB_REPLICA_MAX_NUM; ++j) {
H
hjxilinx 已提交
1795 1796 1797 1798 1799 1800 1801 1802 1803 1804 1805 1806 1807 1808 1809 1810 1811 1812 1813 1814 1815 1816 1817 1818 1819 1820 1821 1822 1823 1824 1825 1826 1827 1828
    //      pMeta->vpeerDesc[j].vnode = htonl(pMeta->vpeerDesc[j].vnode);
    //    }
    //
    //    pMeta->rowSize = 0;
    //    rsp += sizeof(SMultiTableMeta);
    //    pSchema = (SSchema *)rsp;
    //
    //    int32_t numOfTotalCols = pMeta->numOfColumns + pMeta->numOfTags;
    //    for (int j = 0; j < numOfTotalCols; ++j) {
    //      pSchema->bytes = htons(pSchema->bytes);
    //      pSchema->colId = htons(pSchema->colId);
    //
    //      // ignore the tags length
    //      if (j < pMeta->numOfColumns) {
    //        pMeta->rowSize += pSchema->bytes;
    //      }
    //      pSchema++;
    //    }
    //
    //    rsp += numOfTotalCols * sizeof(SSchema);
    //
    //    int32_t  tagLen = 0;
    //    SSchema *pTagsSchema = tscGetTableTagSchema(pMeta);
    //
    //    if (pMeta->tableType == TSDB_CHILD_TABLE) {
    //      for (int32_t j = 0; j < pMeta->numOfTags; ++j) {
    //        tagLen += pTagsSchema[j].bytes;
    //      }
    //    }
    //
    //    rsp += tagLen;
    //    int32_t size = (int32_t)(rsp - ((char *)pMeta));  // Consistent with STableMeta in cache
    //
    //    pMeta->index = 0;
H
Haojun Liao 已提交
1829
    //    (void)taosCachePut(tscMetaCache, pMeta->tableId, (char *)pMeta, size, tsTableMetaKeepTimer);
H
hjxilinx 已提交
1830
    //  }
S
slguan 已提交
1831
  }
H
hjxilinx 已提交
1832
  
S
slguan 已提交
1833 1834
  pSql->res.code = TSDB_CODE_SUCCESS;
  pSql->res.numOfTotal = i;
1835
  tscDebug("%p load multi-metermeta resp from complete num:%d", pSql, pSql->res.numOfTotal);
1836 1837
#endif
  
S
slguan 已提交
1838 1839 1840
  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
1841
int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
1842
  SSqlRes* pRes = &pSql->res;
H
hjxilinx 已提交
1843
  
H
hjxilinx 已提交
1844
  // NOTE: the order of several table must be preserved.
1845
  SCMSTableVgroupRspMsg *pStableVgroup = (SCMSTableVgroupRspMsg *)pRes->pRsp;
H
hjxilinx 已提交
1846 1847
  pStableVgroup->numOfTables = htonl(pStableVgroup->numOfTables);
  char* pMsg = pRes->pRsp + sizeof(SCMSTableVgroupRspMsg);
H
hjxilinx 已提交
1848
  
1849 1850 1851
  // master sqlObj locates in param
  SSqlObj* parent = pSql->param;
  assert(parent != NULL);
H
hjxilinx 已提交
1852
  
1853
  SSqlCmd* pCmd = &parent->cmd;
H
hjxilinx 已提交
1854 1855 1856
  for(int32_t i = 0; i < pStableVgroup->numOfTables; ++i) {
    STableMetaInfo *pInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i);

H
Haojun Liao 已提交
1857 1858 1859
    SVgroupsMsg *  pVgroupMsg = (SVgroupsMsg *) pMsg;
    pVgroupMsg->numOfVgroups = htonl(pVgroupMsg->numOfVgroups);

H
Haojun Liao 已提交
1860 1861 1862 1863
    size_t size = sizeof(SCMVgroupMsg) * pVgroupMsg->numOfVgroups + sizeof(SVgroupsMsg);

    size_t vgroupsz = sizeof(SCMVgroupInfo) * pVgroupMsg->numOfVgroups + sizeof(SVgroupsInfo);
    pInfo->vgroupList = calloc(1, vgroupsz);
H
hjxilinx 已提交
1864 1865
    assert(pInfo->vgroupList != NULL);

H
Haojun Liao 已提交
1866
    pInfo->vgroupList->numOfVgroups = pVgroupMsg->numOfVgroups;
H
hjxilinx 已提交
1867
    for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) {
dengyihao's avatar
dengyihao 已提交
1868
      //just init, no need to lock
H
hjxilinx 已提交
1869
      SCMVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j];
H
Haojun Liao 已提交
1870

H
Haojun Liao 已提交
1871 1872 1873
      SCMVgroupMsg *vmsg = &pVgroupMsg->vgroups[j];
      pVgroups->vgId = htonl(vmsg->vgId);
      pVgroups->numOfEps = vmsg->numOfEps;
H
Haojun Liao 已提交
1874

H
Haojun Liao 已提交
1875
      assert(pVgroups->numOfEps >= 1 && pVgroups->vgId >= 1);
H
hjxilinx 已提交
1876

1877
      for (int32_t k = 0; k < pVgroups->numOfEps; ++k) {
H
Haojun Liao 已提交
1878 1879
        pVgroups->epAddr[k].port = htons(vmsg->epAddr[k].port);
        pVgroups->epAddr[k].fqdn = strndup(vmsg->epAddr[k].fqdn, tListLen(vmsg->epAddr[k].fqdn));
H
hjxilinx 已提交
1880
      }
1881
    }
1882 1883

    pMsg += size;
H
hjxilinx 已提交
1884 1885
  }
  
S
slguan 已提交
1886
  return pSql->res.code;
H
hzcheng 已提交
1887 1888 1889 1890 1891 1892
}

/*
 * current process do not use the cache at all
 */
int tscProcessShowRsp(SSqlObj *pSql) {
H
hjxilinx 已提交
1893
  STableMetaMsg * pMetaMsg;
1894
  SCMShowRsp *pShow;
S
slguan 已提交
1895
  SSchema *    pSchema;
H
hzcheng 已提交
1896 1897
  char         key[20];

1898 1899 1900
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

H
hjxilinx 已提交
1901
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
1902

H
hjxilinx 已提交
1903
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
1904

1905
  pShow = (SCMShowRsp *)pRes->pRsp;
S
slguan 已提交
1906
  pShow->qhandle = htobe64(pShow->qhandle);
H
hzcheng 已提交
1907 1908
  pRes->qhandle = pShow->qhandle;

S
slguan 已提交
1909
  tscResetForNextRetrieve(pRes);
H
hjxilinx 已提交
1910
  pMetaMsg = &(pShow->tableMeta);
H
hzcheng 已提交
1911

H
hjxilinx 已提交
1912
  pMetaMsg->numOfColumns = ntohs(pMetaMsg->numOfColumns);
H
hzcheng 已提交
1913

H
hjxilinx 已提交
1914
  pSchema = pMetaMsg->schema;
H
Haojun Liao 已提交
1915
  pMetaMsg->tid = ntohs(pMetaMsg->tid);
H
hjxilinx 已提交
1916
  for (int i = 0; i < pMetaMsg->numOfColumns; ++i) {
H
hzcheng 已提交
1917 1918 1919 1920
    pSchema->bytes = htons(pSchema->bytes);
    pSchema++;
  }

1921
  key[0] = pCmd->msgType + 'a';
H
hzcheng 已提交
1922 1923
  strcpy(key + 1, "showlist");

H
Haojun Liao 已提交
1924 1925 1926 1927
  if (pTableMetaInfo->pTableMeta != NULL) {
    taosCacheRelease(tscMetaCache, (void *)&(pTableMetaInfo->pTableMeta), false);
  }

H
hjxilinx 已提交
1928 1929 1930
  size_t size = 0;
  STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
  
H
Haojun Liao 已提交
1931
  pTableMetaInfo->pTableMeta = taosCachePut(tscMetaCache, key, strlen(key), (char *)pTableMeta, size,
1932
      tsTableMetaKeepTimer * 1000);
H
hjxilinx 已提交
1933
  SSchema *pTableSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
H
hzcheng 已提交
1934

1935 1936 1937 1938
  if (pQueryInfo->colList == NULL) {
    pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
  }
  
H
hjxilinx 已提交
1939 1940
  SFieldInfo* pFieldInfo = &pQueryInfo->fieldsInfo;
  
S
slguan 已提交
1941
  SColumnIndex index = {0};
H
hjxilinx 已提交
1942 1943 1944
  pSchema = pMetaMsg->schema;
  
  for (int16_t i = 0; i < pMetaMsg->numOfColumns; ++i, ++pSchema) {
S
slguan 已提交
1945
    index.columnIndex = i;
1946 1947
    tscColumnListInsert(pQueryInfo->colList, &index);
    
H
hjxilinx 已提交
1948
    TAOS_FIELD f = tscCreateField(pSchema->type, pSchema->name, pSchema->bytes);
H
Haojun Liao 已提交
1949
    SInternalField* pInfo = tscFieldInfoAppend(pFieldInfo, &f);
H
hjxilinx 已提交
1950
    
H
hjxilinx 已提交
1951
    pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index,
1952
                     pTableSchema[i].type, pTableSchema[i].bytes, pTableSchema[i].bytes, false);
H
hzcheng 已提交
1953
  }
H
hjxilinx 已提交
1954 1955
  
  pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
H
hjxilinx 已提交
1956
  tscFieldInfoUpdateOffset(pQueryInfo);
H
hjxilinx 已提交
1957
  
S
Shengliang Guan 已提交
1958
  taosTFree(pTableMeta);
H
hzcheng 已提交
1959 1960 1961
  return 0;
}

1962
// TODO multithread problem
1963 1964 1965 1966 1967 1968 1969 1970 1971 1972
static void createHBObj(STscObj* pObj) {
  if (pObj->pHb != NULL) {
    return;
  }

  SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
  if (NULL == pSql) return;

  pSql->fp = tscProcessHeartBeatRsp;

1973 1974 1975 1976 1977 1978
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetailSafely(&pSql->cmd, 0);
  if (pQueryInfo == NULL) {
    pSql->res.code = terrno;
    return;
  }

1979 1980 1981 1982 1983 1984 1985 1986 1987 1988 1989 1990
  pQueryInfo->command = TSDB_SQL_HB;

  pSql->cmd.command = pQueryInfo->command;
  if (TSDB_CODE_SUCCESS != tscAllocPayload(&(pSql->cmd), TSDB_DEFAULT_PAYLOAD_SIZE)) {
    taosTFree(pSql);
    return;
  }

  pSql->param = pObj;
  pSql->pTscObj = pObj;
  pSql->signature = pSql;

1991 1992
  registerSqlObj(pSql);
  tscDebug("%p HB is allocated, pObj:%p", pSql, pObj);
1993

1994
  pObj->pHb = pSql;
1995 1996
}

H
hzcheng 已提交
1997
int tscProcessConnectRsp(SSqlObj *pSql) {
H
Haojun Liao 已提交
1998
  char temp[TSDB_TABLE_FNAME_LEN * 2];
H
hzcheng 已提交
1999 2000 2001
  STscObj *pObj = pSql->pTscObj;
  SSqlRes *pRes = &pSql->res;

2002
  SCMConnectRsp *pConnect = (SCMConnectRsp *)pRes->pRsp;
B
Bomin Zhang 已提交
2003
  tstrncpy(pObj->acctId, pConnect->acctId, sizeof(pObj->acctId));  // copy acctId from response
2004 2005
  int32_t len = sprintf(temp, "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db);

B
Bomin Zhang 已提交
2006 2007
  assert(len <= sizeof(pObj->db));
  tstrncpy(pObj->db, temp, sizeof(pObj->db));
H
hjxilinx 已提交
2008
  
dengyihao's avatar
dengyihao 已提交
2009 2010 2011
  if (pConnect->epSet.numOfEps > 0) {
    tscEpSetHtons(&pConnect->epSet);
    tscUpdateMgmtEpSet(&pConnect->epSet);
dengyihao's avatar
dengyihao 已提交
2012
  } 
H
hzcheng 已提交
2013

S
slguan 已提交
2014
  strcpy(pObj->sversion, pConnect->serverVersion);
H
hzcheng 已提交
2015 2016
  pObj->writeAuth = pConnect->writeAuth;
  pObj->superAuth = pConnect->superAuth;
S
Shengliang Guan 已提交
2017
  pObj->connId = htonl(pConnect->connId);
2018 2019

  createHBObj(pObj);
2020
  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
H
hzcheng 已提交
2021 2022 2023 2024 2025

  return 0;
}

int tscProcessUseDbRsp(SSqlObj *pSql) {
S
slguan 已提交
2026
  STscObj *       pObj = pSql->pTscObj;
2027
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
S
slguan 已提交
2028

B
Bomin Zhang 已提交
2029
  tstrncpy(pObj->db, pTableMetaInfo->name, sizeof(pObj->db));
H
hzcheng 已提交
2030 2031 2032
  return 0;
}

Y
TD-1039  
yihaoDeng 已提交
2033 2034
int tscProcessDropDbRsp(SSqlObj *pSql) {
  pSql->pTscObj->db[0] = 0;
H
Haojun Liao 已提交
2035
  taosCacheEmpty(tscMetaCache);
H
hzcheng 已提交
2036 2037 2038 2039
  return 0;
}

int tscProcessDropTableRsp(SSqlObj *pSql) {
2040
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
S
slguan 已提交
2041

H
Haojun Liao 已提交
2042
  STableMeta *pTableMeta = taosCacheAcquireByKey(tscMetaCache, pTableMetaInfo->name, strlen(pTableMetaInfo->name));
H
Haojun Liao 已提交
2043
  if (pTableMeta == NULL) { /* not in cache, abort */
H
hzcheng 已提交
2044 2045 2046 2047 2048 2049 2050
    return 0;
  }

  /*
   * 1. if a user drops one table, which is the only table in a vnode, remove operation will incur vnode to be removed.
   * 2. Then, a user creates a new metric followed by a table with identical name of removed table but different schema,
   * here the table will reside in a new vnode.
S
slguan 已提交
2051 2052
   * The cached information is expired, however, we may have lost the ref of original meter. So, clear whole cache
   * instead.
H
hzcheng 已提交
2053
   */
2054
  tscDebug("%p force release table meta after drop table:%s", pSql, pTableMetaInfo->name);
H
Haojun Liao 已提交
2055
  taosCacheRelease(tscMetaCache, (void **)&pTableMeta, true);
H
hzcheng 已提交
2056

H
hjxilinx 已提交
2057
  if (pTableMetaInfo->pTableMeta) {
H
Haojun Liao 已提交
2058
    taosCacheRelease(tscMetaCache, (void **)&(pTableMetaInfo->pTableMeta), true);
H
hzcheng 已提交
2059 2060 2061 2062 2063 2064
  }

  return 0;
}

int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
2065
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
S
slguan 已提交
2066

H
Haojun Liao 已提交
2067
  STableMeta *pTableMeta = taosCacheAcquireByKey(tscMetaCache, pTableMetaInfo->name, strlen(pTableMetaInfo->name));
H
hjxilinx 已提交
2068
  if (pTableMeta == NULL) { /* not in cache, abort */
H
hzcheng 已提交
2069 2070 2071
    return 0;
  }

2072
  tscDebug("%p force release metermeta in cache after alter-table: %s", pSql, pTableMetaInfo->name);
H
Haojun Liao 已提交
2073
  taosCacheRelease(tscMetaCache, (void **)&pTableMeta, true);
H
hzcheng 已提交
2074

H
hjxilinx 已提交
2075
  if (pTableMetaInfo->pTableMeta) {
weixin_48148422's avatar
weixin_48148422 已提交
2076
    bool isSuperTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo);
H
Haojun Liao 已提交
2077
    taosCacheRelease(tscMetaCache, (void **)&(pTableMetaInfo->pTableMeta), true);
H
hzcheng 已提交
2078

2079
    if (isSuperTable) {  // if it is a super table, reset whole query cache
2080
      tscDebug("%p reset query cache since table:%s is stable", pSql, pTableMetaInfo->name);
H
Haojun Liao 已提交
2081
      taosCacheEmpty(tscMetaCache);
H
hzcheng 已提交
2082 2083 2084 2085 2086 2087 2088 2089 2090 2091
    }
  }

  return 0;
}

int tscProcessAlterDbMsgRsp(SSqlObj *pSql) {
  UNUSED(pSql);
  return 0;
}
2092 2093 2094
int tscProcessShowCreateRsp(SSqlObj *pSql) {
  return tscLocalResultCommonBuilder(pSql, 1);
}
H
hzcheng 已提交
2095 2096 2097 2098

int tscProcessQueryRsp(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;

2099
  SQueryTableRsp *pQuery = (SQueryTableRsp *)pRes->pRsp;
S
slguan 已提交
2100 2101 2102
  pQuery->qhandle = htobe64(pQuery->qhandle);
  pRes->qhandle = pQuery->qhandle;

H
hzcheng 已提交
2103
  pRes->data = NULL;
S
slguan 已提交
2104
  tscResetForNextRetrieve(pRes);
H
hzcheng 已提交
2105 2106 2107
  return 0;
}

H
hjxilinx 已提交
2108
int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
S
slguan 已提交
2109 2110 2111
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

S
slguan 已提交
2112
  SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *)pRes->pRsp;
2113 2114 2115 2116
  if (pRetrieve == NULL) {
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return pRes->code;
  }
H
hzcheng 已提交
2117 2118 2119

  pRes->numOfRows = htonl(pRetrieve->numOfRows);
  pRes->precision = htons(pRetrieve->precision);
2120 2121
  pRes->offset    = htobe64(pRetrieve->offset);
  pRes->useconds  = htobe64(pRetrieve->useconds);
H
hjxilinx 已提交
2122
  pRes->completed = (pRetrieve->completed == 1);
2123
  pRes->data      = pRetrieve->data;
H
hjxilinx 已提交
2124
  
2125
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
2126 2127 2128 2129
  if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
    return pRes->code;
  }
  
weixin_48148422's avatar
weixin_48148422 已提交
2130
  if (pSql->pSubscription != NULL) {
H
hjxilinx 已提交
2131
    int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
H
hjxilinx 已提交
2132
    
H
hjxilinx 已提交
2133
    TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, numOfCols - 1);
H
hjxilinx 已提交
2134 2135
    int16_t     offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1);
    
weixin_48148422's avatar
weixin_48148422 已提交
2136 2137
    char* p = pRes->data + (pField->bytes + offset) * pRes->numOfRows;

S
slguan 已提交
2138
    int32_t numOfTables = htonl(*(int32_t*)p);
weixin_48148422's avatar
weixin_48148422 已提交
2139
    p += sizeof(int32_t);
S
slguan 已提交
2140
    for (int i = 0; i < numOfTables; i++) {
weixin_48148422's avatar
weixin_48148422 已提交
2141 2142
      int64_t uid = htobe64(*(int64_t*)p);
      p += sizeof(int64_t);
weixin_48148422's avatar
weixin_48148422 已提交
2143
      p += sizeof(int32_t); // skip tid
weixin_48148422's avatar
weixin_48148422 已提交
2144 2145
      TSKEY key = htobe64(*(TSKEY*)p);
      p += sizeof(TSKEY);
2146
      tscUpdateSubscriptionProgress(pSql->pSubscription, uid, key);
weixin_48148422's avatar
weixin_48148422 已提交
2147
    }
2148 2149
  }

H
hzcheng 已提交
2150
  pRes->row = 0;
2151
  tscDebug("%p numOfRows:%" PRId64 ", offset:%" PRId64 ", complete:%d", pSql, pRes->numOfRows, pRes->offset, pRes->completed);
H
hzcheng 已提交
2152 2153 2154 2155

  return 0;
}

H
hjxilinx 已提交
2156
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code);
H
hzcheng 已提交
2157

2158
static int32_t getTableMetaFromMgmt(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
S
slguan 已提交
2159 2160
  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
  if (NULL == pNew) {
2161
    tscError("%p malloc failed for new sqlobj to get table meta", pSql);
2162
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
2163
  }
2164

H
hzcheng 已提交
2165 2166 2167
  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;
  pNew->cmd.command = TSDB_SQL_META;
S
slguan 已提交
2168

2169
  registerSqlObj(pNew);
H
Haojun Liao 已提交
2170

2171
  tscAddSubqueryInfo(&pNew->cmd);
2172

2173
  SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetailSafely(&pNew->cmd, 0);
2174

H
hjxilinx 已提交
2175
  pNew->cmd.autoCreated = pSql->cmd.autoCreated;  // create table if not exists
B
Bomin Zhang 已提交
2176
  if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE + pSql->cmd.payloadLen)) {
2177
    tscError("%p malloc failed for payload to get table meta", pSql);
S
slguan 已提交
2178
    free(pNew);
2179

2180
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
2181 2182
  }

H
hjxilinx 已提交
2183
  STableMetaInfo *pNewMeterMetaInfo = tscAddEmptyMetaInfo(pNewQueryInfo);
2184
  assert(pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);
H
hzcheng 已提交
2185

B
Bomin Zhang 已提交
2186
  tstrncpy(pNewMeterMetaInfo->name, pTableMetaInfo->name, sizeof(pNewMeterMetaInfo->name));
B
Bomin Zhang 已提交
2187
  memcpy(&pNew->cmd.tagData, &pSql->cmd.tagData, sizeof(pSql->cmd.tagData));
2188
  tscDebug("%p new pSqlObj:%p to get tableMeta, auto create:%d", pSql, pNew, pNew->cmd.autoCreated);
H
hzcheng 已提交
2189

H
hjxilinx 已提交
2190 2191
  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;
H
hzcheng 已提交
2192

H
hjxilinx 已提交
2193 2194
  int32_t code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
2195
    code = TSDB_CODE_TSC_ACTION_IN_PROGRESS;  // notify upper application that current process need to be terminated
H
hzcheng 已提交
2196 2197 2198 2199 2200
  }

  return code;
}

H
hjxilinx 已提交
2201
int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
H
hjxilinx 已提交
2202
  assert(strlen(pTableMetaInfo->name) != 0);
S
slguan 已提交
2203

2204
  // If this STableMetaInfo owns a table meta, release it first
H
hjxilinx 已提交
2205
  if (pTableMetaInfo->pTableMeta != NULL) {
H
Haojun Liao 已提交
2206
    taosCacheRelease(tscMetaCache, (void **)&(pTableMetaInfo->pTableMeta), false);
2207 2208
  }
  
H
Haojun Liao 已提交
2209
  pTableMetaInfo->pTableMeta = (STableMeta *)taosCacheAcquireByKey(tscMetaCache, pTableMetaInfo->name, strlen(pTableMetaInfo->name));
H
hjxilinx 已提交
2210
  if (pTableMetaInfo->pTableMeta != NULL) {
H
hjxilinx 已提交
2211
    STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
2212
    tscDebug("%p retrieve table Meta from cache, the number of columns:%d, numOfTags:%d, %p", pSql, tinfo.numOfColumns,
2213
             tinfo.numOfTags, pTableMetaInfo->pTableMeta);
H
hzcheng 已提交
2214 2215 2216

    return TSDB_CODE_SUCCESS;
  }
2217 2218
  
  return getTableMetaFromMgmt(pSql, pTableMetaInfo);
H
hzcheng 已提交
2219 2220
}

H
hjxilinx 已提交
2221
int tscGetMeterMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool createIfNotExists) {
H
hjxilinx 已提交
2222
  pSql->cmd.autoCreated = createIfNotExists;
H
hjxilinx 已提交
2223
  return tscGetTableMeta(pSql, pTableMetaInfo);
H
hzcheng 已提交
2224 2225 2226
}

/**
H
Haojun Liao 已提交
2227
 * retrieve table meta from mnode, and update the local table meta cache.
H
hzcheng 已提交
2228
 * @param pSql          sql object
B
Bomin Zhang 已提交
2229
 * @param tableIndex    table index
H
hzcheng 已提交
2230 2231
 * @return              status code
 */
B
Bomin Zhang 已提交
2232
int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) {
H
hzcheng 已提交
2233
  SSqlCmd *pCmd = &pSql->cmd;
2234 2235

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
B
Bomin Zhang 已提交
2236
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex);
H
hzcheng 已提交
2237

H
Haojun Liao 已提交
2238 2239
  STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
  if (pTableMetaInfo->pTableMeta) {
2240
    tscDebug("%p update table meta, old meta numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
2241
             tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->id.uid, pTableMeta);
H
hzcheng 已提交
2242 2243
  }

H
Haojun Liao 已提交
2244
  taosCacheRelease(tscMetaCache, (void **)&(pTableMetaInfo->pTableMeta), true);
H
Haojun Liao 已提交
2245
  return getTableMetaFromMgmt(pSql, pTableMetaInfo);
H
hzcheng 已提交
2246 2247
}

H
hjxilinx 已提交
2248
static bool allVgroupInfoRetrieved(SSqlCmd* pCmd, int32_t clauseIndex) {
2249
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
2250
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
H
hjxilinx 已提交
2251
    STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
H
hjxilinx 已提交
2252 2253
    if (pTableMetaInfo->vgroupList == NULL) {
      return false;
S
slguan 已提交
2254 2255
    }
  }
H
hjxilinx 已提交
2256 2257 2258 2259
  
  // all super tables vgroupinfo are retrieved, no need to retrieve vgroup info anymore
  return true;
}
H
hzcheng 已提交
2260

H
hjxilinx 已提交
2261
int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
2262
  int      code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
H
hjxilinx 已提交
2263 2264 2265
  SSqlCmd *pCmd = &pSql->cmd;
  
  if (allVgroupInfoRetrieved(pCmd, clauseIndex)) {
H
hzcheng 已提交
2266 2267
    return TSDB_CODE_SUCCESS;
  }
H
hjxilinx 已提交
2268

S
slguan 已提交
2269
  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
H
hzcheng 已提交
2270 2271 2272
  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;

H
hjxilinx 已提交
2273
  pNew->cmd.command = TSDB_SQL_STABLEVGROUP;
H
Haojun Liao 已提交
2274 2275

  // TODO TEST IT
2276 2277
  SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetailSafely(&pNew->cmd, 0);
  if (pNewQueryInfo == NULL) {
B
Bomin Zhang 已提交
2278
    tscFreeSqlObj(pNew);
2279 2280
    return code;
  }
2281
  
H
hjxilinx 已提交
2282
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
2283
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
H
hjxilinx 已提交
2284
    STableMetaInfo *pMInfo = tscGetMetaInfo(pQueryInfo, i);
H
Haojun Liao 已提交
2285
    STableMeta *pTableMeta = taosCacheAcquireByData(tscMetaCache, pMInfo->pTableMeta);
H
Haojun Liao 已提交
2286
    tscAddTableMetaInfo(pNewQueryInfo, pMInfo->name, pTableMeta, NULL, pMInfo->tagColList, pMInfo->pVgroupTables);
S
slguan 已提交
2287 2288 2289 2290 2291 2292
  }

  if ((code = tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
    tscFreeSqlObj(pNew);
    return code;
  }
H
hzcheng 已提交
2293

2294
  pNewQueryInfo->numOfTables = pQueryInfo->numOfTables;
2295
  registerSqlObj(pNew);
H
Haojun Liao 已提交
2296

2297
  tscDebug("%p new sqlObj:%p to get vgroupInfo, numOfTables:%d", pSql, pNew, pNewQueryInfo->numOfTables);
H
hzcheng 已提交
2298

2299 2300 2301 2302
  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;
  code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
2303
    code = TSDB_CODE_TSC_ACTION_IN_PROGRESS;
H
hzcheng 已提交
2304 2305 2306 2307 2308
  }

  return code;
}

2309
void tscInitMsgsFp() {
S
slguan 已提交
2310 2311
  tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
  tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg;
J
jtao1735 已提交
2312
  tscBuildMsg[TSDB_SQL_FETCH] = tscBuildFetchMsg;
H
hzcheng 已提交
2313 2314

  tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg;
2315
  tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg;
H
hzcheng 已提交
2316

2317 2318
  tscBuildMsg[TSDB_SQL_CREATE_ACCT] = tscBuildAcctMsg;
  tscBuildMsg[TSDB_SQL_ALTER_ACCT] = tscBuildAcctMsg;
H
hzcheng 已提交
2319 2320

  tscBuildMsg[TSDB_SQL_CREATE_TABLE] = tscBuildCreateTableMsg;
S
[TD-16]  
slguan 已提交
2321
  tscBuildMsg[TSDB_SQL_DROP_USER] = tscBuildDropUserMsg;
H
hzcheng 已提交
2322 2323 2324
  tscBuildMsg[TSDB_SQL_DROP_ACCT] = tscBuildDropAcctMsg;
  tscBuildMsg[TSDB_SQL_DROP_DB] = tscBuildDropDbMsg;
  tscBuildMsg[TSDB_SQL_DROP_TABLE] = tscBuildDropTableMsg;
2325
  tscBuildMsg[TSDB_SQL_ALTER_USER] = tscBuildUserMsg;
S
slguan 已提交
2326 2327 2328
  tscBuildMsg[TSDB_SQL_CREATE_DNODE] = tscBuildCreateDnodeMsg;
  tscBuildMsg[TSDB_SQL_DROP_DNODE] = tscBuildDropDnodeMsg;
  tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg;
H
hzcheng 已提交
2329
  tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg;
2330
  tscBuildMsg[TSDB_SQL_UPDATE_TAGS_VAL] = tscBuildUpdateTagMsg;
H
hzcheng 已提交
2331 2332 2333 2334
  tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg;

  tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg;
  tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg;
H
hjxilinx 已提交
2335
  tscBuildMsg[TSDB_SQL_META] = tscBuildTableMetaMsg;
H
hjxilinx 已提交
2336
  tscBuildMsg[TSDB_SQL_STABLEVGROUP] = tscBuildSTableVgroupMsg;
S
slguan 已提交
2337
  tscBuildMsg[TSDB_SQL_MULTI_META] = tscBuildMultiMeterMetaMsg;
H
hzcheng 已提交
2338 2339 2340 2341

  tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg;
  tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg;
  tscBuildMsg[TSDB_SQL_RETRIEVE] = tscBuildRetrieveFromMgmtMsg;
2342 2343 2344
  tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg;
H
hzcheng 已提交
2345 2346

  tscProcessMsgRsp[TSDB_SQL_SELECT] = tscProcessQueryRsp;
H
hjxilinx 已提交
2347
  tscProcessMsgRsp[TSDB_SQL_FETCH] = tscProcessRetrieveRspFromNode;
H
hzcheng 已提交
2348 2349 2350 2351 2352

  tscProcessMsgRsp[TSDB_SQL_DROP_DB] = tscProcessDropDbRsp;
  tscProcessMsgRsp[TSDB_SQL_DROP_TABLE] = tscProcessDropTableRsp;
  tscProcessMsgRsp[TSDB_SQL_CONNECT] = tscProcessConnectRsp;
  tscProcessMsgRsp[TSDB_SQL_USE_DB] = tscProcessUseDbRsp;
2353
  tscProcessMsgRsp[TSDB_SQL_META] = tscProcessTableMetaRsp;
H
hjxilinx 已提交
2354
  tscProcessMsgRsp[TSDB_SQL_STABLEVGROUP] = tscProcessSTableVgroupRsp;
S
slguan 已提交
2355
  tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiMeterMetaRsp;
H
hzcheng 已提交
2356 2357

  tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp;
H
hjxilinx 已提交
2358
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromNode;  // rsp handled by same function.
H
hzcheng 已提交
2359
  tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp;
2360

H
Haojun Liao 已提交
2361 2362 2363 2364 2365
  tscProcessMsgRsp[TSDB_SQL_CURRENT_DB]   = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CURRENT_USER] = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_VERSION] = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CLI_VERSION]  = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_STATUS]  = tscProcessLocalRetrieveRsp;
2366

H
hzcheng 已提交
2367 2368
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp;

H
hjxilinx 已提交
2369
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_LOCALMERGE] = tscProcessRetrieveLocalMergeRsp;
H
hzcheng 已提交
2370 2371 2372 2373

  tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp;
  tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp;

2374 2375 2376 2377
  tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_TABLE] = tscProcessShowCreateRsp;
  tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_DATABASE] = tscProcessShowCreateRsp;
  

H
hzcheng 已提交
2378 2379 2380 2381 2382 2383
  tscKeepConn[TSDB_SQL_SHOW] = 1;
  tscKeepConn[TSDB_SQL_RETRIEVE] = 1;
  tscKeepConn[TSDB_SQL_SELECT] = 1;
  tscKeepConn[TSDB_SQL_FETCH] = 1;
  tscKeepConn[TSDB_SQL_HB] = 1;
}