tscServer.c 81.5 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#include "os.h"
H
hzcheng 已提交
17
#include "tcache.h"
H
Haojun Liao 已提交
18
#include "tcmdtype.h"
H
hzcheng 已提交
19
#include "trpc.h"
20 21
#include "tscLocalMerge.h"
#include "tscLog.h"
H
hzcheng 已提交
22 23 24 25 26
#include "tscProfile.h"
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
#include "ttimer.h"
dengyihao's avatar
dengyihao 已提交
27
#include "tlockfree.h"
H
hzcheng 已提交
28

29
SRpcCorEpSet  tscMgmtEpSet;
S
slguan 已提交
30

31 32
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};

H
hzcheng 已提交
33 34 35
int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql);
void tscProcessActivityTimer(void *handle, void *tmrId);
int tscKeepConn[TSDB_SQL_MAX] = {0};
36

B
Bomin Zhang 已提交
37
TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid, TSKEY dflt);
38 39
void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts);
void tscSaveSubscriptionProgress(void* sub);
H
hzcheng 已提交
40

S
slguan 已提交
41
static int32_t minMsgSize() { return tsRpcHeadSize + 100; }
H
Haojun Liao 已提交
42 43 44 45 46 47 48 49
static int32_t getWaitingTimeInterval(int32_t count) {
  int32_t initial = 100; // 100 ms by default
  if (count <= 1) {
    return 0;
  }

  return initial * (2<<(count - 2));
}
H
hzcheng 已提交
50

51
static void tscSetDnodeEpSet(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) {
52 53
  assert(pSql != NULL && pVgroupInfo != NULL && pVgroupInfo->numOfEps > 0);

54
  SRpcEpSet* pEpSet = &pSql->epSet;
H
Haojun Liao 已提交
55 56 57 58

  // Issue the query to one of the vnode among a vgroup randomly.
  // change the inUse property would not affect the isUse attribute of STableMeta
  pEpSet->inUse = rand() % pVgroupInfo->numOfEps;
59 60 61

  // apply the FQDN string length check here
  bool hasFqdn = false;
dengyihao's avatar
bugfix  
dengyihao 已提交
62

63 64 65 66
  pEpSet->numOfEps = pVgroupInfo->numOfEps;
  for(int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) {
    strcpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn);
    pEpSet->port[i] = pVgroupInfo->epAddr[i].port;
67 68 69 70

    if (!hasFqdn) {
      hasFqdn = (strlen(pEpSet->fqdn[i]) > 0);
    }
71
  }
72 73

  assert(hasFqdn);
74
}
75

dengyihao's avatar
dengyihao 已提交
76 77 78 79
static void tscDumpMgmtEpSet(SRpcEpSet *epSet) {
  taosCorBeginRead(&tscMgmtEpSet.version);
  *epSet = tscMgmtEpSet.epSet;
  taosCorEndRead(&tscMgmtEpSet.version);
80
}  
dengyihao's avatar
dengyihao 已提交
81 82
static void tscEpSetHtons(SRpcEpSet *s) {
   for (int32_t i = 0; i < s->numOfEps; i++) {
dengyihao's avatar
fixbug  
dengyihao 已提交
83 84 85
      s->port[i] = htons(s->port[i]);    
   }
} 
dengyihao's avatar
dengyihao 已提交
86 87
bool tscEpSetIsEqual(SRpcEpSet *s1, SRpcEpSet *s2) {
   if (s1->numOfEps != s2->numOfEps || s1->inUse != s2->inUse) {
88 89
     return false;
   } 
dengyihao's avatar
dengyihao 已提交
90
   for (int32_t i = 0; i < s1->numOfEps; i++) {
91 92 93 94 95
     if (s1->port[i] != s2->port[i] 
        || strncmp(s1->fqdn[i], s2->fqdn[i], TSDB_FQDN_LEN) != 0)
        return false;
   }
   return true;
S
slguan 已提交
96
}
dengyihao's avatar
dengyihao 已提交
97
void tscUpdateMgmtEpSet(SRpcEpSet *pEpSet) {
98
  // no need to update if equal
dengyihao's avatar
dengyihao 已提交
99 100 101
  taosCorBeginWrite(&tscMgmtEpSet.version);
  tscMgmtEpSet.epSet = *pEpSet;
  taosCorEndWrite(&tscMgmtEpSet.version);
102
}
dengyihao's avatar
dengyihao 已提交
103
static void tscDumpEpSetFromVgroupInfo(SCMCorVgroupInfo *pVgroupInfo, SRpcEpSet *pEpSet) {
dengyihao's avatar
dengyihao 已提交
104
  if (pVgroupInfo == NULL) { return;}
dengyihao's avatar
bugfix  
dengyihao 已提交
105
  taosCorBeginRead(&pVgroupInfo->version);
dengyihao's avatar
dengyihao 已提交
106
  int8_t inUse = pVgroupInfo->inUse;
dengyihao's avatar
dengyihao 已提交
107 108 109
  pEpSet->inUse = (inUse >= 0 && inUse < TSDB_MAX_REPLICA) ? inUse: 0; 
  pEpSet->numOfEps = pVgroupInfo->numOfEps;  
  for (int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) {
Y
yihaoDeng 已提交
110
    tstrncpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn, sizeof(pEpSet->fqdn[i]));
dengyihao's avatar
dengyihao 已提交
111
    pEpSet->port[i] = pVgroupInfo->epAddr[i].port;
S
slguan 已提交
112
  }
dengyihao's avatar
bugfix  
dengyihao 已提交
113
  taosCorEndRead(&pVgroupInfo->version);
S
slguan 已提交
114 115
}

dengyihao's avatar
dengyihao 已提交
116
static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) {
dengyihao's avatar
dengyihao 已提交
117 118
  SSqlCmd *pCmd = &pObj->cmd;
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
dengyihao's avatar
dengyihao 已提交
119
  if (pTableMetaInfo == NULL || pTableMetaInfo->pTableMeta == NULL) { return;}
dengyihao's avatar
bugfix  
dengyihao 已提交
120
  SCMCorVgroupInfo *pVgroupInfo = &pTableMetaInfo->pTableMeta->corVgroupInfo;
dengyihao's avatar
dengyihao 已提交
121

dengyihao's avatar
bugfix  
dengyihao 已提交
122
  taosCorBeginWrite(&pVgroupInfo->version);
dengyihao's avatar
dengyihao 已提交
123
  tscDebug("before: Endpoint in use: %d", pVgroupInfo->inUse);
124 125
  pVgroupInfo->inUse = pEpSet->inUse;
  pVgroupInfo->numOfEps = pEpSet->numOfEps;
dengyihao's avatar
dengyihao 已提交
126
  for (int32_t i = 0; i < pVgroupInfo->numOfEps; i++) {
H
Haojun Liao 已提交
127 128
    taosTFree(pVgroupInfo->epAddr[i].fqdn);
    pVgroupInfo->epAddr[i].fqdn = strndup(pEpSet->fqdn[i], tListLen(pEpSet->fqdn[i]));
129
    pVgroupInfo->epAddr[i].port = pEpSet->port[i];
dengyihao's avatar
dengyihao 已提交
130
  }
H
Haojun Liao 已提交
131

dengyihao's avatar
dengyihao 已提交
132
  tscDebug("after: EndPoint in use: %d", pVgroupInfo->inUse);
dengyihao's avatar
bugfix  
dengyihao 已提交
133
  taosCorEndWrite(&pVgroupInfo->version);
dengyihao's avatar
dengyihao 已提交
134
}
H
Haojun Liao 已提交
135

136 137 138 139
void tscPrintMgmtEp() {
  SRpcEpSet dump;
  tscDumpMgmtEpSet(&dump);
  if (dump.numOfEps <= 0) {
dengyihao's avatar
dengyihao 已提交
140
    tscError("invalid mnode EP list:%d", dump.numOfEps);
S
slguan 已提交
141
  } else {
142
    for (int i = 0; i < dump.numOfEps; ++i) {
143
      tscDebug("mnode index:%d %s:%d", i, dump.fqdn[i], dump.port[i]);
S
slguan 已提交
144
    }
S
slguan 已提交
145
  }
S
slguan 已提交
146 147
}

H
hzcheng 已提交
148 149 150
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
  STscObj *pObj = (STscObj *)param;
  if (pObj == NULL) return;
H
Haojun Liao 已提交
151

H
hzcheng 已提交
152
  if (pObj != pObj->signature) {
H
Haojun Liao 已提交
153
    tscError("heartbeat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature);
H
hzcheng 已提交
154 155 156
    return;
  }

H
Haojun Liao 已提交
157
  SSqlObj *pSql = tres;
H
hzcheng 已提交
158 159 160
  SSqlRes *pRes = &pSql->res;

  if (code == 0) {
161
    SCMHeartBeatRsp *pRsp = (SCMHeartBeatRsp *)pRes->pRsp;
dengyihao's avatar
dengyihao 已提交
162 163 164 165
    SRpcEpSet *      epSet = &pRsp->epSet;
    if (epSet->numOfEps > 0) {
      tscEpSetHtons(epSet);
      tscUpdateMgmtEpSet(epSet);
dengyihao's avatar
dengyihao 已提交
166
    } 
S
slguan 已提交
167

S
Shengliang Guan 已提交
168 169
    pSql->pTscObj->connId = htonl(pRsp->connId);

H
hzcheng 已提交
170 171
    if (pRsp->killConnection) {
      tscKillConnection(pObj);
172
      return;
H
hzcheng 已提交
173
    } else {
S
slguan 已提交
174 175
      if (pRsp->queryId) tscKillQuery(pObj, htonl(pRsp->queryId));
      if (pRsp->streamId) tscKillStream(pObj, htonl(pRsp->streamId));
H
hzcheng 已提交
176 177
    }
  } else {
H
Haojun Liao 已提交
178
    tscDebug("%p heartbeat failed, code:%s", pObj->pHb, tstrerror(code));
H
hzcheng 已提交
179 180
  }

H
Haojun Liao 已提交
181
  if (pObj->pHb != NULL) {
182
    int32_t waitingDuring = tsShellActivityTimer * 500;
H
Haojun Liao 已提交
183
    tscDebug("%p send heartbeat in %dms", pSql, waitingDuring);
H
Haojun Liao 已提交
184 185 186 187 188

    taosTmrReset(tscProcessActivityTimer, waitingDuring, pObj, tscTmr, &pObj->pTimer);
  } else {
    tscDebug("%p start to close tscObj:%p, not send heartbeat again", pSql, pObj);
  }
H
hzcheng 已提交
189 190 191 192
}

void tscProcessActivityTimer(void *handle, void *tmrId) {
  STscObj *pObj = (STscObj *)handle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
193 194 195 196

  int ret = taosAcquireRef(tscRefId, pObj);
  if (ret < 0) {
    tscTrace("%p failed to acquire TSC obj, reason:%s", pObj, tstrerror(ret));
197 198
    return;
  }
H
hzcheng 已提交
199

200
  SSqlObj* pHB = pObj->pHb;
H
hzcheng 已提交
201

H
Haojun Liao 已提交
202
  void** p = taosCacheAcquireByKey(tscObjCache, &pHB, sizeof(TSDB_CACHE_PTR_TYPE));
H
Haojun Liao 已提交
203 204
  if (p == NULL) {
    tscWarn("%p HB object has been released already", pHB);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
205
    taosReleaseRef(tscRefId, pObj);
H
Haojun Liao 已提交
206 207 208 209 210
    return;
  }

  assert(*pHB->self == pHB);

211
  pHB->retry = 0;
H
Haojun Liao 已提交
212 213 214 215 216
  int32_t code = tscProcessSql(pHB);
  taosCacheRelease(tscObjCache, (void**) &p, false);

  if (code != TSDB_CODE_SUCCESS) {
    tscError("%p failed to sent HB to server, reason:%s", pHB, tstrerror(code));
H
hzcheng 已提交
217
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
218 219

  taosReleaseRef(tscRefId, pObj);
H
hzcheng 已提交
220 221 222
}

int tscSendMsgToServer(SSqlObj *pSql) {
223
  STscObj* pObj = pSql->pTscObj;
H
hjxilinx 已提交
224 225 226
  SSqlCmd* pCmd = &pSql->cmd;
  
  char *pMsg = rpcMallocCont(pCmd->payloadLen);
S
slguan 已提交
227
  if (NULL == pMsg) {
228
    tscError("%p msg:%s malloc failed", pSql, taosMsg[pSql->cmd.msgType]);
229
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
H
hzcheng 已提交
230 231
  }

232 233
  // set the mgmt ip list
  if (pSql->cmd.command >= TSDB_SQL_MGMT) {
dengyihao's avatar
dengyihao 已提交
234
    tscDumpMgmtEpSet(&pSql->epSet);
J
jtao1735 已提交
235 236
  }

237
  memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
238

J
jtao1735 已提交
239
  SRpcMsg rpcMsg = {
240 241 242
      .msgType = pSql->cmd.msgType,
      .pCont   = pMsg,
      .contLen = pSql->cmd.payloadLen,
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
243 244
      .ahandle = pSql,
      .handle  = &pSql->pRpcCtx,
H
hjxilinx 已提交
245
      .code    = 0
J
jtao1735 已提交
246
  };
H
Haojun Liao 已提交
247

H
Haojun Liao 已提交
248 249 250 251
  // NOTE: the rpc context should be acquired before sending data to server.
  // Otherwise, the pSql object may have been released already during the response function, which is
  // processMsgFromServer function. In the meanwhile, the assignment of the rpc context to sql object will absolutely
  // cause crash.
H
Haojun Liao 已提交
252 253
  rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg);
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
254 255
}

256
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
S
TD-1530  
Shengliang Guan 已提交
257 258
  TSDB_CACHE_PTR_TYPE handle = (TSDB_CACHE_PTR_TYPE) rpcMsg->ahandle;
  void** p = taosCacheAcquireByKey(tscObjCache, &handle, sizeof(TSDB_CACHE_PTR_TYPE));
H
Haojun Liao 已提交
259 260
  if (p == NULL) {
    rpcFreeCont(rpcMsg->pCont);
261 262
    return;
  }
263

H
Haojun Liao 已提交
264 265 266
  SSqlObj* pSql = *p;
  assert(pSql != NULL);

H
Haojun Liao 已提交
267
  STscObj *pObj = pSql->pTscObj;
S
slguan 已提交
268 269
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
H
hzcheng 已提交
270

H
Haojun Liao 已提交
271
  assert(*pSql->self == pSql);
H
Haojun Liao 已提交
272
  pSql->pRpcCtx = NULL;
H
Haojun Liao 已提交
273

H
Haojun Liao 已提交
274
  if (pObj->signature != pObj) {
275
    tscDebug("%p DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, pObj, pObj->signature);
H
Haojun Liao 已提交
276

H
Haojun Liao 已提交
277
    taosCacheRelease(tscObjCache, (void**) &p, true);
H
Haojun Liao 已提交
278 279 280 281 282 283
    rpcFreeCont(rpcMsg->pCont);
    return;
  }

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) {
284
    tscDebug("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p",
285
        pSql, pCmd->command, pQueryInfo->type, pObj, pObj->signature);
H
Haojun Liao 已提交
286

H
Haojun Liao 已提交
287 288 289
    void** p1 = p;
    taosCacheRelease(tscObjCache, (void**) &p1, false);

H
Haojun Liao 已提交
290
    taosCacheRelease(tscObjCache, (void**) &p, true);
291
    rpcFreeCont(rpcMsg->pCont);
S
slguan 已提交
292
    return;
H
hzcheng 已提交
293 294
  }

H
Haojun Liao 已提交
295
  if (pEpSet) {
dengyihao's avatar
dengyihao 已提交
296
    if (!tscEpSetIsEqual(&pSql->epSet, pEpSet)) {
H
Haojun Liao 已提交
297 298
      if (pCmd->command < TSDB_SQL_MGMT) {
        tscUpdateVgroupInfo(pSql, pEpSet);
dengyihao's avatar
dengyihao 已提交
299
      } else {
dengyihao's avatar
dengyihao 已提交
300
        tscUpdateMgmtEpSet(pEpSet);
H
Haojun Liao 已提交
301
      }
dengyihao's avatar
dengyihao 已提交
302
    }
J
jtao1735 已提交
303 304
  }

305 306 307 308 309
  int32_t cmd = pCmd->command;
  if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_FETCH || cmd == TSDB_SQL_INSERT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) &&
      (rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID ||
       rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID ||
       rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL ||
S
Shengliang Guan 已提交
310
       rpcMsg->code == TSDB_CODE_APP_NOT_READY ||
311 312 313 314 315 316 317
       rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE)) {
    tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), ++pSql->retry);

    // set the flag to denote that sql string needs to be re-parsed and build submit block with table schema
    if (rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
      pSql->cmd.submitSchema = 1;
    }
318

319 320 321 322
    pSql->res.code = rpcMsg->code;  // keep the previous error code
    if (pSql->retry > pSql->maxRetry) {
      tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry);
    } else {
H
Haojun Liao 已提交
323
      // wait for a little bit moment and then retry, todo do not sleep in rpc callback thread
H
Haojun Liao 已提交
324 325 326 327 328
      if (rpcMsg->code == TSDB_CODE_APP_NOT_READY || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
        int32_t duration = getWaitingTimeInterval(pSql->retry);
        taosMsleep(duration);
      }

B
Bomin Zhang 已提交
329
      rpcMsg->code = tscRenewTableMeta(pSql, 0);
330 331 332

      // if there is an error occurring, proceed to the following error handling procedure.
      if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
H
Haojun Liao 已提交
333
        taosCacheRelease(tscObjCache, (void**) &p, false);
334 335
        rpcFreeCont(rpcMsg->pCont);
        return;
H
hzcheng 已提交
336 337
      }
    }
S
slguan 已提交
338
  }
339

H
hzcheng 已提交
340
  pRes->rspLen = 0;
341
  
H
Haojun Liao 已提交
342
  if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
343
    tscDebug("%p query is cancelled, code:%s", pSql, tstrerror(pRes->code));
H
Haojun Liao 已提交
344 345
  } else {
    pRes->code = rpcMsg->code;
H
hzcheng 已提交
346 347
  }

S
slguan 已提交
348
  if (pRes->code == TSDB_CODE_SUCCESS) {
349
    tscDebug("%p reset retry counter to be 0 due to success rsp, old:%d", pSql, pSql->retry);
S
slguan 已提交
350 351 352
    pSql->retry = 0;
  }

353
  if (pRes->code != TSDB_CODE_TSC_QUERY_CANCELLED) {
354
    assert(rpcMsg->msgType == pCmd->msgType + 1);
355
    pRes->code    = rpcMsg->code;
356
    pRes->rspType = rpcMsg->msgType;
357
    pRes->rspLen  = rpcMsg->contLen;
H
hzcheng 已提交
358

359
    if (pRes->rspLen > 0 && rpcMsg->pCont) {
360 361
      char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
      if (tmp == NULL) {
362
        pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
363 364
      } else {
        pRes->pRsp = tmp;
365
        memcpy(pRes->pRsp, rpcMsg->pCont, pRes->rspLen);
S
slguan 已提交
366
      }
367 368
    } else {
      pRes->pRsp = NULL;
S
slguan 已提交
369 370
    }

H
hzcheng 已提交
371 372 373 374
    /*
     * There is not response callback function for submit response.
     * The actual inserted number of points is the first number.
     */
375
    if (rpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP && pRes->pRsp != NULL) {
376
      SShellSubmitRspMsg *pMsg = (SShellSubmitRspMsg*)pRes->pRsp;
S
slguan 已提交
377 378 379 380 381 382 383
      pMsg->code = htonl(pMsg->code);
      pMsg->numOfRows = htonl(pMsg->numOfRows);
      pMsg->affectedRows = htonl(pMsg->affectedRows);
      pMsg->failedRows = htonl(pMsg->failedRows);
      pMsg->numOfFailedBlocks = htonl(pMsg->numOfFailedBlocks);

      pRes->numOfRows += pMsg->affectedRows;
384
      tscDebug("%p SQL cmd:%s, code:%s inserted rows:%d rspLen:%d", pSql, sqlCmd[pCmd->command], 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
385
          tstrerror(pRes->code), pMsg->affectedRows, pRes->rspLen);
S
slguan 已提交
386
    } else {
387
      tscDebug("%p SQL cmd:%s, code:%s rspLen:%d", pSql, sqlCmd[pCmd->command], tstrerror(pRes->code), pRes->rspLen);
H
hzcheng 已提交
388 389
    }
  }
390
  
H
Haojun Liao 已提交
391
  if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) {
392
    rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
H
Haojun Liao 已提交
393
  }
S
Shengliang Guan 已提交
394

H
Haojun Liao 已提交
395
  bool shouldFree = tscShouldBeFreed(pSql);
396
  if (rpcMsg->code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
S
TD-1057  
Shengliang Guan 已提交
397
    rpcMsg->code = (pRes->code == TSDB_CODE_SUCCESS) ? (int32_t)pRes->numOfRows : pRes->code;
H
Haojun Liao 已提交
398
    (*pSql->fp)(pSql->param, pSql, rpcMsg->code);
H
hzcheng 已提交
399 400
  }

H
Haojun Liao 已提交
401
  void** p1 = p;
H
Haojun Liao 已提交
402
  taosCacheRelease(tscObjCache, (void**) &p1, false);
H
Haojun Liao 已提交
403

H
Haojun Liao 已提交
404 405
  if (shouldFree) { // in case of table-meta/vgrouplist query, automatically free it
    taosCacheRelease(tscObjCache, (void **)&p, true);
H
Haojun Liao 已提交
406 407 408
    tscDebug("%p sqlObj is automatically freed", pSql);
  }

409
  rpcFreeCont(rpcMsg->pCont);
H
hzcheng 已提交
410 411
}

S
slguan 已提交
412 413 414
int doProcessSql(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;
415

H
hjxilinx 已提交
416 417 418 419 420 421 422
  if (pCmd->command == TSDB_SQL_SELECT ||
      pCmd->command == TSDB_SQL_FETCH ||
      pCmd->command == TSDB_SQL_RETRIEVE ||
      pCmd->command == TSDB_SQL_INSERT ||
      pCmd->command == TSDB_SQL_CONNECT ||
      pCmd->command == TSDB_SQL_HB ||
      pCmd->command == TSDB_SQL_META ||
H
hjxilinx 已提交
423
      pCmd->command == TSDB_SQL_STABLEVGROUP) {
424 425 426 427 428 429
    pRes->code = tscBuildMsg[pCmd->command](pSql, NULL);
  }
  
  if (pRes->code != TSDB_CODE_SUCCESS) {
    tscQueueAsyncRes(pSql);
    return pRes->code;
S
slguan 已提交
430
  }
431

432 433 434
  int32_t code = tscSendMsgToServer(pSql);

  // NOTE: if code is TSDB_CODE_SUCCESS, pSql may have been released here already by other threads.
435
  if (code != TSDB_CODE_SUCCESS) {
S
slguan 已提交
436
    pRes->code = code;
H
hjxilinx 已提交
437
    tscQueueAsyncRes(pSql);
H
Haojun Liao 已提交
438
    return code;
S
slguan 已提交
439
  }
H
hjxilinx 已提交
440 441
  
  return TSDB_CODE_SUCCESS;
S
slguan 已提交
442 443 444
}

int tscProcessSql(SSqlObj *pSql) {
445
  char    *name = NULL;
446
  SSqlCmd *pCmd = &pSql->cmd;
447
  
448
  SQueryInfo     *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hjxilinx 已提交
449
  STableMetaInfo *pTableMetaInfo = NULL;
450
  uint32_t        type = 0;
451

452
  if (pQueryInfo != NULL) {
H
hjxilinx 已提交
453
    pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
454
    name = (pTableMetaInfo != NULL)? pTableMetaInfo->name:NULL;
455
    type = pQueryInfo->type;
456

H
hjxilinx 已提交
457
    // while numOfTables equals to 0, it must be Heartbeat
H
hjxilinx 已提交
458
    assert((pQueryInfo->numOfTables == 0 && pQueryInfo->command == TSDB_SQL_HB) || pQueryInfo->numOfTables > 0);
S
slguan 已提交
459
  }
460

461
  tscDebug("%p SQL cmd:%s will be processed, name:%s, type:%d", pSql, sqlCmd[pCmd->command], name, type);
H
hjxilinx 已提交
462
  if (pCmd->command < TSDB_SQL_MGMT) { // the pTableMetaInfo cannot be NULL
H
hjxilinx 已提交
463
    if (pTableMetaInfo == NULL) {
464
      pSql->res.code = TSDB_CODE_TSC_APP_ERROR;
H
hjxilinx 已提交
465 466
      return pSql->res.code;
    }
H
hjxilinx 已提交
467
  } else if (pCmd->command < TSDB_SQL_LOCAL) {
468
    //pSql->epSet = tscMgmtEpSet;
H
hzcheng 已提交
469 470 471
  } else {  // local handler
    return (*tscProcessMsgRsp[pCmd->command])(pSql);
  }
472
  
S
slguan 已提交
473 474
  return doProcessSql(pSql);
}
H
hzcheng 已提交
475

J
jtao1735 已提交
476
int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
477
  SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *) pSql->cmd.payload;
S
slguan 已提交
478
  pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
S
slguan 已提交
479

H
Haojun Liao 已提交
480
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
S
slguan 已提交
481
  pRetrieveMsg->free = htons(pQueryInfo->type);
H
hzcheng 已提交
482

483
  // todo valid the vgroupId at the client side
H
hjxilinx 已提交
484 485
  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  
weixin_48148422's avatar
weixin_48148422 已提交
486
  if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
H
hjxilinx 已提交
487
    int32_t vgIndex = pTableMetaInfo->vgroupIndex;
H
Haojun Liao 已提交
488 489 490 491 492 493 494 495 496
    if (pTableMetaInfo->pVgroupTables == NULL) {
      SVgroupsInfo *pVgroupInfo = pTableMetaInfo->vgroupList;
      assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups);

      pRetrieveMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId);
      tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d", pSql, pVgroupInfo->vgroups[vgIndex].vgId, vgIndex);
    } else {
      int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
      assert(vgIndex >= 0 && vgIndex < numOfVgroups);
H
Haojun Liao 已提交
497

H
Haojun Liao 已提交
498 499 500 501 502
      SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex);

      pRetrieveMsg->header.vgId = htonl(pTableIdList->vgInfo.vgId);
      tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d", pSql, pTableIdList->vgInfo.vgId, vgIndex);
    }
503
  } else {
H
hjxilinx 已提交
504
    STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
H
hjxilinx 已提交
505
    pRetrieveMsg->header.vgId = htonl(pTableMeta->vgroupInfo.vgId);
H
Haojun Liao 已提交
506
    tscDebug("%p build fetch msg from only one vgroup, vgId:%d", pSql, pTableMeta->vgroupInfo.vgId);
507
  }
508 509

  pSql->cmd.payloadLen = sizeof(SRetrieveTableMsg);
J
jtao1735 已提交
510
  pSql->cmd.msgType = TSDB_MSG_TYPE_FETCH;
511 512 513

  pRetrieveMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg));

514
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
515 516
}

517
int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hjxilinx 已提交
518
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
H
hjxilinx 已提交
519
  STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
H
hjxilinx 已提交
520
  
521
  char* pMsg = pSql->cmd.payload;
522 523 524
  
  // NOTE: shell message size should not include SMsgDesc
  int32_t size = pSql->cmd.payloadLen - sizeof(SMsgDesc);
525 526
  int32_t vgId = pTableMeta->vgroupInfo.vgId;

527
  SMsgDesc* pMsgDesc = (SMsgDesc*) pMsg;
528 529
  pMsgDesc->numOfVnodes = htonl(1); // always one vnode

530
  pMsg += sizeof(SMsgDesc);
531
  SSubmitMsg *pShellMsg = (SSubmitMsg *)pMsg;
532

H
hjxilinx 已提交
533
  pShellMsg->header.vgId = htonl(vgId);
H
Haojun Liao 已提交
534
  pShellMsg->header.contLen = htonl(size);      // the length not includes the size of SMsgDesc
535
  pShellMsg->length = pShellMsg->header.contLen;
H
hjxilinx 已提交
536
  
H
Haojun Liao 已提交
537
  pShellMsg->numOfBlocks = htonl(pSql->cmd.numOfTablesInSubmit);  // number of tables to be inserted
H
hzcheng 已提交
538

H
hjxilinx 已提交
539
  // pSql->cmd.payloadLen is set during copying data into payload
S
slguan 已提交
540
  pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
dengyihao's avatar
dengyihao 已提交
541
  tscDumpEpSetFromVgroupInfo(&pTableMeta->corVgroupInfo, &pSql->epSet);
542

543 544
  tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfEP:%d", pSql, vgId, pSql->cmd.numOfTablesInSubmit,
      pSql->epSet.numOfEps);
545
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
546 547 548
}

/*
549
 * for table query, simply return the size <= 1k
H
hzcheng 已提交
550
 */
551
static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
H
hzcheng 已提交
552
  const static int32_t MIN_QUERY_MSG_PKT_SIZE = TSDB_MAX_BYTES_PER_ROW * 5;
553
  SQueryInfo *         pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
H
hzcheng 已提交
554

S
TD-1057  
Shengliang Guan 已提交
555
  int32_t srcColListSize = (int32_t)(taosArrayGetSize(pQueryInfo->colList) * sizeof(SColumnInfo));
H
Haojun Liao 已提交
556 557

  size_t  numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
S
TD-1057  
Shengliang Guan 已提交
558
  int32_t exprSize = (int32_t)(sizeof(SSqlFuncMsg) * numOfExprs);
H
Haojun Liao 已提交
559

H
Haojun Liao 已提交
560 561 562 563 564 565 566 567 568 569
  int32_t tsBufSize = (pQueryInfo->tsBuf != NULL) ? pQueryInfo->tsBuf->fileSize : 0;

  int32_t tableSerialize = 0;
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  if (pTableMetaInfo->pVgroupTables != NULL) {
    size_t numOfGroups = taosArrayGetSize(pTableMetaInfo->pVgroupTables);

    int32_t totalTables = 0;
    for (int32_t i = 0; i < numOfGroups; ++i) {
      SVgroupTableInfo *pTableInfo = taosArrayGet(pTableMetaInfo->pVgroupTables, i);
H
Haojun Liao 已提交
570
      totalTables += (int32_t) taosArrayGetSize(pTableInfo->itemList);
H
Haojun Liao 已提交
571 572 573 574
    }

    tableSerialize = totalTables * sizeof(STableIdInfo);
  }
H
Haojun Liao 已提交
575

H
Haojun Liao 已提交
576 577
  return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + exprSize + tsBufSize +
         tableSerialize + 4096;
H
hzcheng 已提交
578 579
}

580
static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char *pMsg) {
581
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
B
Bomin Zhang 已提交
582
  TSKEY dfltKey = htobe64(pQueryMsg->window.skey);
583

H
hjxilinx 已提交
584
  STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
weixin_48148422's avatar
weixin_48148422 已提交
585
  if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) || pTableMetaInfo->pVgroupTables == NULL) {
586 587
    
    SCMVgroupInfo* pVgroupInfo = NULL;
weixin_48148422's avatar
weixin_48148422 已提交
588
    if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
589 590
      int32_t index = pTableMetaInfo->vgroupIndex;
      assert(index >= 0);
H
hjxilinx 已提交
591
  
B
Bomin Zhang 已提交
592
      if (pTableMetaInfo->vgroupList->numOfVgroups > 0) {
H
Haojun Liao 已提交
593
        assert(index < pTableMetaInfo->vgroupList->numOfVgroups);
B
Bomin Zhang 已提交
594 595
        pVgroupInfo = &pTableMetaInfo->vgroupList->vgroups[index];
      }
H
Haojun Liao 已提交
596
      tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, pTableMetaInfo->vgroupList->numOfVgroups);
H
hjxilinx 已提交
597 598
    } else {
      pVgroupInfo = &pTableMeta->vgroupInfo;
599
    }
weixin_48148422's avatar
weixin_48148422 已提交
600

601 602 603 604
    assert(pVgroupInfo != NULL);

    tscSetDnodeEpSet(pSql, pVgroupInfo);
    pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId);
weixin_48148422's avatar
weixin_48148422 已提交
605

606
    STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
607 608 609
    pTableIdInfo->tid = htonl(pTableMeta->id.tid);
    pTableIdInfo->uid = htobe64(pTableMeta->id.uid);
    pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->id.uid, dfltKey));
weixin_48148422's avatar
weixin_48148422 已提交
610

611 612
    pQueryMsg->numOfTables = htonl(1);  // set the number of tables
    pMsg += sizeof(STableIdInfo);
613
  } else { // it is a subquery of the super table query, this EP info is acquired from vgroupInfo
614
    int32_t index = pTableMetaInfo->vgroupIndex;
S
TD-1057  
Shengliang Guan 已提交
615
    int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
616
    assert(index >= 0 && index < numOfVgroups);
weixin_48148422's avatar
weixin_48148422 已提交
617

618
    tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, numOfVgroups);
weixin_48148422's avatar
weixin_48148422 已提交
619

620
    SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, index);
621

dengyihao's avatar
bugfix  
dengyihao 已提交
622
    // set the vgroup info 
623
    tscSetDnodeEpSet(pSql, &pTableIdList->vgInfo);
624 625
    pQueryMsg->head.vgId = htonl(pTableIdList->vgInfo.vgId);
    
S
TD-1057  
Shengliang Guan 已提交
626
    int32_t numOfTables = (int32_t)taosArrayGetSize(pTableIdList->itemList);
627 628 629 630 631 632 633 634 635
    pQueryMsg->numOfTables = htonl(numOfTables);  // set the number of tables
  
    // serialize each table id info
    for(int32_t i = 0; i < numOfTables; ++i) {
      STableIdInfo* pItem = taosArrayGet(pTableIdList->itemList, i);
      
      STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
      pTableIdInfo->tid = htonl(pItem->tid);
      pTableIdInfo->uid = htobe64(pItem->uid);
B
Bomin Zhang 已提交
636
      pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pItem->uid, dfltKey));
637 638 639 640
      pMsg += sizeof(STableIdInfo);
    }
  }
  
H
Haojun Liao 已提交
641 642
  tscDebug("%p vgId:%d, query on table:%s, tid:%d, uid:%" PRIu64, pSql, htonl(pQueryMsg->head.vgId), pTableMetaInfo->name,
      pTableMeta->id.tid, pTableMeta->id.uid);
H
hjxilinx 已提交
643
  
644 645 646
  return pMsg;
}

647
int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
648 649
  SSqlCmd *pCmd = &pSql->cmd;

650
  int32_t size = tscEstimateQueryMsgSize(pCmd, pCmd->clauseIndex);
H
hzcheng 已提交
651

S
slguan 已提交
652 653
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for query msg", pSql);
H
Haojun Liao 已提交
654
    return TSDB_CODE_TSC_INVALID_SQL;  // todo add test for this
S
slguan 已提交
655
  }
656
  
657
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hjxilinx 已提交
658
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hjxilinx 已提交
659
  STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
660 661 662

  size_t numOfSrcCols = taosArrayGetSize(pQueryInfo->colList);
  if (numOfSrcCols <= 0 && !tscQueryTags(pQueryInfo)) {
S
Shengliang Guan 已提交
663
    tscError("%p illegal value of numOfCols in query msg: %" PRIu64 ", table cols:%d", pSql, (uint64_t)numOfSrcCols,
664 665
        tscGetNumOfColumns(pTableMeta));

H
Haojun Liao 已提交
666
    return TSDB_CODE_TSC_INVALID_SQL;
667
  }
668
  
669
  if (pQueryInfo->interval.interval < 0) {
S
TD-1530  
Shengliang Guan 已提交
670
    tscError("%p illegal value of aggregation time interval in query msg: %" PRId64, pSql, (int64_t)pQueryInfo->interval.interval);
H
Haojun Liao 已提交
671
    return TSDB_CODE_TSC_INVALID_SQL;
672 673 674 675
  }
  
  if (pQueryInfo->groupbyExpr.numOfGroupCols < 0) {
    tscError("%p illegal value of numOfGroupCols in query msg: %d", pSql, pQueryInfo->groupbyExpr.numOfGroupCols);
H
Haojun Liao 已提交
676
    return TSDB_CODE_TSC_INVALID_SQL;
677
  }
678

679
  SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload;
H
hzcheng 已提交
680

S
TD-1057  
Shengliang Guan 已提交
681
  int32_t numOfTags = (int32_t)taosArrayGetSize(pTableMetaInfo->tagColList);
682
  
683
  if (pQueryInfo->order.order == TSDB_ORDER_ASC) {
H
hjxilinx 已提交
684 685
    pQueryMsg->window.skey = htobe64(pQueryInfo->window.skey);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->window.ekey);
H
hzcheng 已提交
686
  } else {
H
hjxilinx 已提交
687 688
    pQueryMsg->window.skey = htobe64(pQueryInfo->window.ekey);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->window.skey);
H
hzcheng 已提交
689 690
  }

691 692
  pQueryMsg->order          = htons(pQueryInfo->order.order);
  pQueryMsg->orderColId     = htons(pQueryInfo->order.orderColId);
H
Haojun Liao 已提交
693
  pQueryMsg->fillType       = htons(pQueryInfo->fillType);
694 695
  pQueryMsg->limit          = htobe64(pQueryInfo->limit.limit);
  pQueryMsg->offset         = htobe64(pQueryInfo->limit.offset);
S
TD-1057  
Shengliang Guan 已提交
696
  pQueryMsg->numOfCols      = htons((int16_t)taosArrayGetSize(pQueryInfo->colList));
H
Haojun Liao 已提交
697 698
  pQueryMsg->interval.interval = htobe64(pQueryInfo->interval.interval);
  pQueryMsg->interval.sliding  = htobe64(pQueryInfo->interval.sliding);
699 700
  pQueryMsg->interval.offset   = htobe64(pQueryInfo->interval.offset);
  pQueryMsg->interval.intervalUnit = pQueryInfo->interval.intervalUnit;
H
Haojun Liao 已提交
701 702
  pQueryMsg->interval.slidingUnit  = pQueryInfo->interval.slidingUnit;
  pQueryMsg->interval.offsetUnit   = pQueryInfo->interval.offsetUnit;
703
  pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols);
704
  pQueryMsg->numOfTags      = htonl(numOfTags);
weixin_48148422's avatar
weixin_48148422 已提交
705
  pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType);
H
Haojun Liao 已提交
706
  pQueryMsg->queryType      = htonl(pQueryInfo->type);
H
hjxilinx 已提交
707 708
  
  size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo);
S
TD-1057  
Shengliang Guan 已提交
709
  pQueryMsg->numOfOutput = htons((int16_t)numOfOutput);
H
hzcheng 已提交
710 711

  // set column list ids
712 713
  size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
  char *pMsg = (char *)(pQueryMsg->colList) + numOfCols * sizeof(SColumnInfo);
H
hjxilinx 已提交
714
  SSchema *pSchema = tscGetTableSchema(pTableMeta);
715
  
716 717 718
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumn *pCol = taosArrayGetP(pQueryInfo->colList, i);
    SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];
H
hzcheng 已提交
719

720
    if (pCol->colIndex.columnIndex >= tscGetNumOfColumns(pTableMeta) || pColSchema->type < TSDB_DATA_TYPE_BOOL ||
721
        pColSchema->type > TSDB_DATA_TYPE_NCHAR) {
722 723
      tscError("%p tid:%d uid:%" PRIu64" id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s",
          pSql, pTableMeta->id.tid, pTableMeta->id.uid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta), pCol->colIndex.columnIndex,
724 725
               pColSchema->name);

726
      return TSDB_CODE_TSC_INVALID_SQL;
727
    }
H
hzcheng 已提交
728 729 730

    pQueryMsg->colList[i].colId = htons(pColSchema->colId);
    pQueryMsg->colList[i].bytes = htons(pColSchema->bytes);
731
    pQueryMsg->colList[i].type  = htons(pColSchema->type);
S
slguan 已提交
732
    pQueryMsg->colList[i].numOfFilters = htons(pCol->numOfFilters);
H
hzcheng 已提交
733

S
slguan 已提交
734 735 736
    // append the filter information after the basic column information
    for (int32_t f = 0; f < pCol->numOfFilters; ++f) {
      SColumnFilterInfo *pColFilter = &pCol->filterInfo[f];
H
hzcheng 已提交
737

S
slguan 已提交
738
      SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)pMsg;
739
      pFilterMsg->filterstr = htons(pColFilter->filterstr);
S
slguan 已提交
740 741

      pMsg += sizeof(SColumnFilterInfo);
H
hzcheng 已提交
742

743
      if (pColFilter->filterstr) {
S
slguan 已提交
744
        pFilterMsg->len = htobe64(pColFilter->len);
745
        memcpy(pMsg, (void *)pColFilter->pz, (size_t)(pColFilter->len + 1));
S
slguan 已提交
746 747 748 749 750 751 752 753
        pMsg += (pColFilter->len + 1);  // append the additional filter binary info
      } else {
        pFilterMsg->lowerBndi = htobe64(pColFilter->lowerBndi);
        pFilterMsg->upperBndi = htobe64(pColFilter->upperBndi);
      }

      pFilterMsg->lowerRelOptr = htons(pColFilter->lowerRelOptr);
      pFilterMsg->upperRelOptr = htons(pColFilter->upperRelOptr);
H
hzcheng 已提交
754

S
slguan 已提交
755 756
      if (pColFilter->lowerRelOptr == TSDB_RELATION_INVALID && pColFilter->upperRelOptr == TSDB_RELATION_INVALID) {
        tscError("invalid filter info");
H
Haojun Liao 已提交
757
        return TSDB_CODE_TSC_INVALID_SQL;
S
slguan 已提交
758 759
      }
    }
H
hzcheng 已提交
760 761
  }

H
hjxilinx 已提交
762
  SSqlFuncMsg *pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
H
hjxilinx 已提交
763
  for (int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) {
764
    SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
H
hzcheng 已提交
765

766
    if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId, pExpr->numOfParams)) {
H
hzcheng 已提交
767
      tscError("%p table schema is not matched with parsed sql", pSql);
H
Haojun Liao 已提交
768
      return TSDB_CODE_TSC_INVALID_SQL;
H
hzcheng 已提交
769 770
    }

771 772 773
    pSqlFuncExpr->colInfo.colId    = htons(pExpr->colInfo.colId);
    pSqlFuncExpr->colInfo.colIndex = htons(pExpr->colInfo.colIndex);
    pSqlFuncExpr->colInfo.flag     = htons(pExpr->colInfo.flag);
H
hzcheng 已提交
774

775
    pSqlFuncExpr->functionId  = htons(pExpr->functionId);
H
hzcheng 已提交
776
    pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams);
H
hjxilinx 已提交
777
    pMsg += sizeof(SSqlFuncMsg);
H
hzcheng 已提交
778 779

    for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
H
Haojun Liao 已提交
780
      // todo add log
H
hzcheng 已提交
781 782 783 784 785
      pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType);
      pSqlFuncExpr->arg[j].argBytes = htons(pExpr->param[j].nLen);

      if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) {
        memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen);
786
        pMsg += pExpr->param[j].nLen;
H
hzcheng 已提交
787 788 789 790 791
      } else {
        pSqlFuncExpr->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64Key);
      }
    }

H
hjxilinx 已提交
792
    pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
H
hzcheng 已提交
793
  }
794
  
795
  // serialize the table info (sid, uid, tags)
796 797
  pMsg = doSerializeTableInfo(pQueryMsg, pSql, pMsg);
  
798
  SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr;
799
  if (pGroupbyExpr->numOfGroupCols > 0) {
S
slguan 已提交
800
    pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex);
H
hzcheng 已提交
801 802
    pQueryMsg->orderType = htons(pGroupbyExpr->orderType);

S
slguan 已提交
803
    for (int32_t j = 0; j < pGroupbyExpr->numOfGroupCols; ++j) {
804 805
      SColIndex* pCol = taosArrayGet(pGroupbyExpr->columnInfo, j);
  
S
slguan 已提交
806 807 808
      *((int16_t *)pMsg) = pCol->colId;
      pMsg += sizeof(pCol->colId);

809 810
      *((int16_t *)pMsg) += pCol->colIndex;
      pMsg += sizeof(pCol->colIndex);
S
slguan 已提交
811 812 813

      *((int16_t *)pMsg) += pCol->flag;
      pMsg += sizeof(pCol->flag);
H
hjxilinx 已提交
814 815 816
      
      memcpy(pMsg, pCol->name, tListLen(pCol->name));
      pMsg += tListLen(pCol->name);
S
slguan 已提交
817 818 819
    }
  }

820
  if (pQueryInfo->fillType != TSDB_FILL_NONE) {
H
hjxilinx 已提交
821
    for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
822 823
      *((int64_t *)pMsg) = htobe64(pQueryInfo->fillVal[i]);
      pMsg += sizeof(pQueryInfo->fillVal[0]);
S
slguan 已提交
824 825
    }
  }
826 827 828 829 830 831 832 833 834
  
  if (numOfTags != 0) {
    int32_t numOfColumns = tscGetNumOfColumns(pTableMeta);
    int32_t numOfTagColumns = tscGetNumOfTags(pTableMeta);
    int32_t total = numOfTagColumns + numOfColumns;
    
    pSchema = tscGetTableTagSchema(pTableMeta);
    
    for (int32_t i = 0; i < numOfTags; ++i) {
H
hjxilinx 已提交
835
      SColumn *pCol = taosArrayGetP(pTableMetaInfo->tagColList, i);
836 837 838 839
      SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];

      if ((pCol->colIndex.columnIndex >= numOfTagColumns || pCol->colIndex.columnIndex < -1) ||
          (pColSchema->type < TSDB_DATA_TYPE_BOOL || pColSchema->type > TSDB_DATA_TYPE_NCHAR)) {
840 841
        tscError("%p tid:%d uid:%" PRIu64 " id:%s, tag index out of range, totalCols:%d, numOfTags:%d, index:%d, column name:%s",
                 pSql, pTableMeta->id.tid, pTableMeta->id.uid, pTableMetaInfo->name, total, numOfTagColumns,
B
Bomin Zhang 已提交
842
                 pCol->colIndex.columnIndex, pColSchema->name);
843

844
        return TSDB_CODE_TSC_INVALID_SQL;
845 846 847 848 849 850 851 852 853 854 855 856
      }
  
      SColumnInfo* pTagCol = (SColumnInfo*) pMsg;
  
      pTagCol->colId = htons(pColSchema->colId);
      pTagCol->bytes = htons(pColSchema->bytes);
      pTagCol->type  = htons(pColSchema->type);
      pTagCol->numOfFilters = 0;
      
      pMsg += sizeof(SColumnInfo);
    }
  }
S
slguan 已提交
857

H
Haojun Liao 已提交
858 859 860 861
  // serialize tag column query condition
  if (pQueryInfo->tagCond.pCond != NULL && taosArrayGetSize(pQueryInfo->tagCond.pCond) > 0) {
    STagCond* pTagCond = &pQueryInfo->tagCond;
    
862
    SCond *pCond = tsGetSTableQueryCond(pTagCond, pTableMeta->id.uid);
H
Haojun Liao 已提交
863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878
    if (pCond != NULL && pCond->cond != NULL) {
      pQueryMsg->tagCondLen = htons(pCond->len);
      memcpy(pMsg, pCond->cond, pCond->len);
      
      pMsg += pCond->len;
    }
  }
  
  if (pQueryInfo->tagCond.tbnameCond.cond == NULL) {
    *pMsg = 0;
    pMsg++;
  } else {
    strcpy(pMsg, pQueryInfo->tagCond.tbnameCond.cond);
    pMsg += strlen(pQueryInfo->tagCond.tbnameCond.cond) + 1;
  }

S
slguan 已提交
879
  // compressed ts block
S
TD-1057  
Shengliang Guan 已提交
880
  pQueryMsg->tsOffset = htonl((int32_t)(pMsg - pCmd->payload));
S
slguan 已提交
881

882
  if (pQueryInfo->tsBuf != NULL) {
H
Haojun Liao 已提交
883
    int32_t vnodeId = htonl(pQueryMsg->head.vgId);
884 885
    int32_t code = dumpFileBlockByVnodeId(pQueryInfo->tsBuf, vnodeId, pMsg, &pQueryMsg->tsLen, &pQueryMsg->tsNumOfBlocks);
    if (code != TSDB_CODE_SUCCESS) {
B
Bomin Zhang 已提交
886 887
      return code;
    }
S
slguan 已提交
888

889
    pMsg += pQueryMsg->tsLen;
H
hzcheng 已提交
890

891
    pQueryMsg->tsOrder = htonl(pQueryInfo->tsBuf->tsOrder);
892 893
    pQueryMsg->tsLen   = htonl(pQueryMsg->tsLen);
    pQueryMsg->tsNumOfBlocks = htonl(pQueryMsg->tsNumOfBlocks);
H
hzcheng 已提交
894 895
  }

S
TD-1057  
Shengliang Guan 已提交
896
  int32_t msgLen = (int32_t)(pMsg - pCmd->payload);
H
hzcheng 已提交
897

898
  tscDebug("%p msg built success,len:%d bytes", pSql, msgLen);
H
hzcheng 已提交
899
  pCmd->payloadLen = msgLen;
S
slguan 已提交
900
  pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY;
H
hjxilinx 已提交
901
  
902
  pQueryMsg->head.contLen = htonl(msgLen);
S
TD-1057  
Shengliang Guan 已提交
903
  assert(msgLen + minMsgSize() <= (int32_t)pCmd->allocSize);
904 905

  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
906 907
}

908 909
int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
910
  pCmd->payloadLen = sizeof(SCMCreateDbMsg);
S
slguan 已提交
911
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DB;
H
hzcheng 已提交
912

913
  SCMCreateDbMsg *pCreateDbMsg = (SCMCreateDbMsg*)pCmd->payload;
914

915
  assert(pCmd->numOfClause == 1);
916
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
917
  tstrncpy(pCreateDbMsg->db, pTableMetaInfo->name, sizeof(pCreateDbMsg->db));
H
hzcheng 已提交
918

919
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
920 921
}

922 923
int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
924
  pCmd->payloadLen = sizeof(SCMCreateDnodeMsg);
S
slguan 已提交
925 926
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
927
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
928
  }
H
hzcheng 已提交
929

930
  SCMCreateDnodeMsg *pCreate = (SCMCreateDnodeMsg *)pCmd->payload;
J
jtao1735 已提交
931 932
  strncpy(pCreate->ep, pInfo->pDCLInfo->a[0].z, pInfo->pDCLInfo->a[0].n);
  
S
slguan 已提交
933
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DNODE;
H
hzcheng 已提交
934

935
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
936 937
}

938 939
int32_t tscBuildAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
940
  pCmd->payloadLen = sizeof(SCMCreateAcctMsg);
S
slguan 已提交
941 942
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
943
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
944
  }
H
hzcheng 已提交
945

946
  SCMCreateAcctMsg *pAlterMsg = (SCMCreateAcctMsg *)pCmd->payload;
H
hzcheng 已提交
947

H
Haojun Liao 已提交
948 949
  SStrToken *pName = &pInfo->pDCLInfo->user.user;
  SStrToken *pPwd = &pInfo->pDCLInfo->user.passwd;
H
hzcheng 已提交
950

951 952
  strncpy(pAlterMsg->user, pName->z, pName->n);
  strncpy(pAlterMsg->pass, pPwd->z, pPwd->n);
S
slguan 已提交
953

954
  SCreateAcctSQL *pAcctOpt = &pInfo->pDCLInfo->acctOpt;
H
hzcheng 已提交
955

956 957 958 959 960 961 962 963
  pAlterMsg->cfg.maxUsers = htonl(pAcctOpt->maxUsers);
  pAlterMsg->cfg.maxDbs = htonl(pAcctOpt->maxDbs);
  pAlterMsg->cfg.maxTimeSeries = htonl(pAcctOpt->maxTimeSeries);
  pAlterMsg->cfg.maxStreams = htonl(pAcctOpt->maxStreams);
  pAlterMsg->cfg.maxPointsPerSecond = htonl(pAcctOpt->maxPointsPerSecond);
  pAlterMsg->cfg.maxStorage = htobe64(pAcctOpt->maxStorage);
  pAlterMsg->cfg.maxQueryTime = htobe64(pAcctOpt->maxQueryTime);
  pAlterMsg->cfg.maxConnections = htonl(pAcctOpt->maxConnections);
H
hzcheng 已提交
964

965 966 967 968 969 970 971 972 973 974 975 976 977
  if (pAcctOpt->stat.n == 0) {
    pAlterMsg->cfg.accessState = -1;
  } else {
    if (pAcctOpt->stat.z[0] == 'r' && pAcctOpt->stat.n == 1) {
      pAlterMsg->cfg.accessState = TSDB_VN_READ_ACCCESS;
    } else if (pAcctOpt->stat.z[0] == 'w' && pAcctOpt->stat.n == 1) {
      pAlterMsg->cfg.accessState = TSDB_VN_WRITE_ACCCESS;
    } else if (strncmp(pAcctOpt->stat.z, "all", 3) == 0 && pAcctOpt->stat.n == 3) {
      pAlterMsg->cfg.accessState = TSDB_VN_ALL_ACCCESS;
    } else if (strncmp(pAcctOpt->stat.z, "no", 2) == 0 && pAcctOpt->stat.n == 2) {
      pAlterMsg->cfg.accessState = 0;
    }
  }
H
hzcheng 已提交
978

S
slguan 已提交
979
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_ACCT;
980
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
981 982
}

983 984
int32_t tscBuildUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
985
  pCmd->payloadLen = sizeof(SCMCreateUserMsg);
S
slguan 已提交
986

S
slguan 已提交
987 988
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
989
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
990 991
  }

992
  SCMCreateUserMsg *pAlterMsg = (SCMCreateUserMsg*)pCmd->payload;
H
hzcheng 已提交
993

994 995
  SUserInfo *pUser = &pInfo->pDCLInfo->user;
  strncpy(pAlterMsg->user, pUser->user.z, pUser->user.n);
S
TD-1057  
Shengliang Guan 已提交
996
  pAlterMsg->flag = (int8_t)pUser->type;
H
hzcheng 已提交
997

998 999 1000 1001
  if (pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
    pAlterMsg->privilege = (char)pCmd->count;
  } else if (pUser->type == TSDB_ALTER_USER_PASSWD) {
    strncpy(pAlterMsg->pass, pUser->passwd.z, pUser->passwd.n);
H
hjxilinx 已提交
1002 1003
  } else { // create user password info
    strncpy(pAlterMsg->pass, pUser->passwd.z, pUser->passwd.n);
1004
  }
H
hzcheng 已提交
1005

1006
  if (pUser->type == TSDB_ALTER_USER_PASSWD || pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
S
slguan 已提交
1007
    pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_USER;
1008
  } else {
S
slguan 已提交
1009
    pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_USER;
1010
  }
H
hzcheng 已提交
1011

1012
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1013 1014
}

1015 1016
int32_t tscBuildCfgDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
S
slguan 已提交
1017
  pCmd->payloadLen = sizeof(SCMCfgDnodeMsg);
guanshengliang's avatar
guanshengliang 已提交
1018
  pCmd->msgType = TSDB_MSG_TYPE_CM_CONFIG_DNODE;
1019 1020
  return TSDB_CODE_SUCCESS;
}
S
slguan 已提交
1021

1022 1023
int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
1024
  pCmd->payloadLen = sizeof(SCMDropDbMsg);
H
hzcheng 已提交
1025

S
slguan 已提交
1026 1027
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1028
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1029 1030
  }

1031
  SCMDropDbMsg *pDropDbMsg = (SCMDropDbMsg*)pCmd->payload;
H
hzcheng 已提交
1032

1033
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1034
  tstrncpy(pDropDbMsg->db, pTableMetaInfo->name, sizeof(pDropDbMsg->db));
1035
  pDropDbMsg->ignoreNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;
H
hzcheng 已提交
1036

S
slguan 已提交
1037
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DB;
1038
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1039 1040
}

1041 1042
int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
1043
  pCmd->payloadLen = sizeof(SCMDropTableMsg);
H
hzcheng 已提交
1044

S
slguan 已提交
1045 1046
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1047
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1048
  }
H
hzcheng 已提交
1049

1050
  SCMDropTableMsg *pDropTableMsg = (SCMDropTableMsg*)pCmd->payload;
1051
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1052
  strcpy(pDropTableMsg->tableId, pTableMetaInfo->name);
S
slguan 已提交
1053
  pDropTableMsg->igNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;
H
hzcheng 已提交
1054

S
slguan 已提交
1055
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_TABLE;
1056
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1057 1058
}

1059
int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1060
  SSqlCmd *pCmd = &pSql->cmd;
1061
  pCmd->payloadLen = sizeof(SCMDropDnodeMsg);
S
slguan 已提交
1062 1063
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1064
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1065
  }
H
hzcheng 已提交
1066

1067
  SCMDropDnodeMsg *pDrop = (SCMDropDnodeMsg *)pCmd->payload;
1068
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1069
  tstrncpy(pDrop->ep, pTableMetaInfo->name, sizeof(pDrop->ep));
S
slguan 已提交
1070
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DNODE;
H
hzcheng 已提交
1071

1072
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1073 1074
}

S
[TD-16]  
slguan 已提交
1075
int32_t tscBuildDropUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
1076
  SSqlCmd *pCmd = &pSql->cmd;
1077
  pCmd->payloadLen = sizeof(SCMDropUserMsg);
S
slguan 已提交
1078
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_USER;
H
hzcheng 已提交
1079

S
slguan 已提交
1080 1081
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1082
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1083
  }
H
hzcheng 已提交
1084

1085
  SCMDropUserMsg *pDropMsg = (SCMDropUserMsg*)pCmd->payload;
1086
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1087
  tstrncpy(pDropMsg->user, pTableMetaInfo->name, sizeof(pDropMsg->user));
H
hzcheng 已提交
1088

1089
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1090 1091
}

S
[TD-16]  
slguan 已提交
1092 1093 1094 1095 1096 1097 1098
int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCMDropUserMsg);
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_ACCT;

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1099
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
[TD-16]  
slguan 已提交
1100 1101 1102 1103
  }

  SCMDropUserMsg *pDropMsg = (SCMDropUserMsg*)pCmd->payload;
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1104
  tstrncpy(pDropMsg->user, pTableMetaInfo->name, sizeof(pDropMsg->user));
S
[TD-16]  
slguan 已提交
1105 1106 1107 1108

  return TSDB_CODE_SUCCESS;
}

1109 1110
int32_t tscBuildUseDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
1111
  pCmd->payloadLen = sizeof(SCMUseDbMsg);
H
hzcheng 已提交
1112

S
slguan 已提交
1113 1114
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1115
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1116
  }
1117

1118
  SCMUseDbMsg *pUseDbMsg = (SCMUseDbMsg*)pCmd->payload;
1119
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1120
  strcpy(pUseDbMsg->db, pTableMetaInfo->name);
S
slguan 已提交
1121
  pCmd->msgType = TSDB_MSG_TYPE_CM_USE_DB;
H
hzcheng 已提交
1122

1123
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1124 1125
}

1126
int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1127
  STscObj *pObj = pSql->pTscObj;
S
slguan 已提交
1128
  SSqlCmd *pCmd = &pSql->cmd;
S
slguan 已提交
1129
  pCmd->msgType = TSDB_MSG_TYPE_CM_SHOW;
1130
  pCmd->payloadLen = sizeof(SCMShowMsg) + 100;
H
hzcheng 已提交
1131

S
slguan 已提交
1132 1133
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1134
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1135
  }
H
hzcheng 已提交
1136

1137
  SCMShowMsg *pShowMsg = (SCMShowMsg*)pCmd->payload;
S
slguan 已提交
1138

1139
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1140
  size_t nameLen = strlen(pTableMetaInfo->name);
S
slguan 已提交
1141
  if (nameLen > 0) {
B
Bomin Zhang 已提交
1142
    tstrncpy(pShowMsg->db, pTableMetaInfo->name, sizeof(pShowMsg->db));  // prefix is set here
H
hzcheng 已提交
1143
  } else {
B
Bomin Zhang 已提交
1144
    tstrncpy(pShowMsg->db, pObj->db, sizeof(pShowMsg->db));
H
hzcheng 已提交
1145 1146
  }

1147 1148 1149 1150
  SShowInfo *pShowInfo = &pInfo->pDCLInfo->showOpt;
  pShowMsg->type = pShowInfo->showType;

  if (pShowInfo->showType != TSDB_MGMT_TABLE_VNODES) {
H
Haojun Liao 已提交
1151
    SStrToken *pPattern = &pShowInfo->pattern;
1152 1153 1154 1155 1156
    if (pPattern->type > 0) {  // only show tables support wildcard query
      strncpy(pShowMsg->payload, pPattern->z, pPattern->n);
      pShowMsg->payloadLen = htons(pPattern->n);
    }
  } else {
H
Haojun Liao 已提交
1157
    SStrToken *pEpAddr = &pShowInfo->prefix;
dengyihao's avatar
dengyihao 已提交
1158
    assert(pEpAddr->n > 0 && pEpAddr->type > 0);
H
hzcheng 已提交
1159

dengyihao's avatar
dengyihao 已提交
1160 1161
    strncpy(pShowMsg->payload, pEpAddr->z, pEpAddr->n);
    pShowMsg->payloadLen = htons(pEpAddr->n);
1162 1163
  }

1164
  pCmd->payloadLen = sizeof(SCMShowMsg) + pShowMsg->payloadLen;
1165
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1166 1167
}

1168
int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1169
  SSqlCmd *pCmd = &pSql->cmd;
1170
  pCmd->payloadLen = sizeof(SCMKillQueryMsg);
H
hzcheng 已提交
1171

1172 1173
  switch (pCmd->command) {
    case TSDB_SQL_KILL_QUERY:
S
slguan 已提交
1174
      pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_QUERY;
1175 1176
      break;
    case TSDB_SQL_KILL_CONNECTION:
S
slguan 已提交
1177
      pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_CONN;
1178 1179
      break;
    case TSDB_SQL_KILL_STREAM:
S
slguan 已提交
1180
      pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_STREAM;
1181 1182 1183
      break;
  }
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1184 1185
}

1186
int tscEstimateCreateTableMsgLength(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1187 1188
  SSqlCmd *pCmd = &(pSql->cmd);

guanshengliang's avatar
guanshengliang 已提交
1189
  int32_t size = minMsgSize() + sizeof(SCMCreateTableMsg);
H
hzcheng 已提交
1190

1191
  SCreateTableSQL *pCreateTableInfo = pInfo->pCreateTableInfo;
1192
  if (pCreateTableInfo->type == TSQL_CREATE_TABLE_FROM_STABLE) {
H
hzcheng 已提交
1193 1194
    size += sizeof(STagData);
  } else {
S
slguan 已提交
1195
    size += sizeof(SSchema) * (pCmd->numOfCols + pCmd->count);
H
hzcheng 已提交
1196
  }
1197

1198 1199 1200
  if (pCreateTableInfo->pSelect != NULL) {
    size += (pCreateTableInfo->pSelect->selectToken.n + 1);
  }
H
hzcheng 已提交
1201 1202 1203 1204

  return size + TSDB_EXTRA_PAYLOAD_SIZE;
}

1205
int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1206
  int              msgLen = 0;
S
slguan 已提交
1207
  SSchema *        pSchema;
H
hzcheng 已提交
1208
  int              size = 0;
1209 1210 1211
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
H
hjxilinx 已提交
1212
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
1213 1214

  // Reallocate the payload size
1215
  size = tscEstimateCreateTableMsgLength(pSql, pInfo);
S
slguan 已提交
1216 1217
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for create table msg", pSql);
1218
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1219
  }
H
hzcheng 已提交
1220 1221


1222
  SCMCreateTableMsg *pCreateTableMsg = (SCMCreateTableMsg *)pCmd->payload;
H
hjxilinx 已提交
1223
  strcpy(pCreateTableMsg->tableId, pTableMetaInfo->name);
1224 1225

  // use dbinfo from table id without modifying current db info
H
Haojun Liao 已提交
1226
  tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pCreateTableMsg->db);
H
hzcheng 已提交
1227

1228 1229 1230
  SCreateTableSQL *pCreateTable = pInfo->pCreateTableInfo;

  pCreateTableMsg->igExists = pCreateTable->existCheck ? 1 : 0;
H
hzcheng 已提交
1231 1232 1233 1234
  pCreateTableMsg->numOfColumns = htons(pCmd->numOfCols);
  pCreateTableMsg->numOfTags = htons(pCmd->count);

  pCreateTableMsg->sqlLen = 0;
S
slguan 已提交
1235
  char *pMsg = (char *)pCreateTableMsg->schema;
H
hzcheng 已提交
1236

1237 1238
  int8_t type = pInfo->pCreateTableInfo->type;
  if (type == TSQL_CREATE_TABLE_FROM_STABLE) {  // create by using super table, tags value
1239 1240 1241 1242 1243 1244 1245
    STagData* pTag = &pInfo->pCreateTableInfo->usingInfo.tagdata;
    *(int32_t*)pMsg = htonl(pTag->dataLen);
    pMsg += sizeof(int32_t);
    memcpy(pMsg, pTag->name, sizeof(pTag->name));
    pMsg += sizeof(pTag->name);
    memcpy(pMsg, pTag->data, pTag->dataLen);
    pMsg += pTag->dataLen;
1246
  } else {  // create (super) table
1247
    pSchema = (SSchema *)pCreateTableMsg->schema;
1248

H
hzcheng 已提交
1249
    for (int i = 0; i < pCmd->numOfCols + pCmd->count; ++i) {
H
hjxilinx 已提交
1250
      TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
H
hzcheng 已提交
1251 1252 1253 1254

      pSchema->type = pField->type;
      strcpy(pSchema->name, pField->name);
      pSchema->bytes = htons(pField->bytes);
1255

H
hzcheng 已提交
1256 1257 1258 1259
      pSchema++;
    }

    pMsg = (char *)pSchema;
1260 1261
    if (type == TSQL_CREATE_STREAM) {  // check if it is a stream sql
      SQuerySQL *pQuerySql = pInfo->pCreateTableInfo->pSelect;
H
hzcheng 已提交
1262

1263 1264 1265
      strncpy(pMsg, pQuerySql->selectToken.z, pQuerySql->selectToken.n + 1);
      pCreateTableMsg->sqlLen = htons(pQuerySql->selectToken.n + 1);
      pMsg += pQuerySql->selectToken.n + 1;
H
hzcheng 已提交
1266 1267 1268
    }
  }

H
hjxilinx 已提交
1269
  tscFieldInfoClear(&pQueryInfo->fieldsInfo);
H
hzcheng 已提交
1270

S
TD-1057  
Shengliang Guan 已提交
1271
  msgLen = (int32_t)(pMsg - (char*)pCreateTableMsg);
S
slguan 已提交
1272
  pCreateTableMsg->contLen = htonl(msgLen);
H
hzcheng 已提交
1273
  pCmd->payloadLen = msgLen;
S
slguan 已提交
1274
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE;
H
hzcheng 已提交
1275 1276

  assert(msgLen + minMsgSize() <= size);
1277
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1278 1279 1280
}

int tscEstimateAlterTableMsgLength(SSqlCmd *pCmd) {
1281
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
guanshengliang's avatar
guanshengliang 已提交
1282
  return minMsgSize() + sizeof(SCMAlterTableMsg) + sizeof(SSchema) * tscNumOfFields(pQueryInfo) +
H
hzcheng 已提交
1283 1284 1285
         TSDB_EXTRA_PAYLOAD_SIZE;
}

1286
int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
1287 1288
  char *pMsg;
  int   msgLen = 0;
H
hzcheng 已提交
1289

1290
  SSqlCmd    *pCmd = &pSql->cmd;
1291 1292
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

H
hjxilinx 已提交
1293
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
1294 1295 1296
  
  SAlterTableSQL *pAlterInfo = pInfo->pAlterInfo;
  int size = tscEstimateAlterTableMsgLength(pCmd);
S
slguan 已提交
1297 1298
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for alter table msg", pSql);
H
Haojun Liao 已提交
1299
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1300
  }
1301 1302
  
  SCMAlterTableMsg *pAlterTableMsg = (SCMAlterTableMsg *)pCmd->payload;
H
Haojun Liao 已提交
1303
  tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pAlterTableMsg->db);
H
hzcheng 已提交
1304

H
hjxilinx 已提交
1305
  strcpy(pAlterTableMsg->tableId, pTableMetaInfo->name);
1306
  pAlterTableMsg->type = htons(pAlterInfo->type);
1307

1308
  pAlterTableMsg->numOfCols = htons(tscNumOfFields(pQueryInfo));
S
slguan 已提交
1309
  SSchema *pSchema = pAlterTableMsg->schema;
1310
  for (int i = 0; i < tscNumOfFields(pQueryInfo); ++i) {
H
hjxilinx 已提交
1311
    TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
1312
  
H
hzcheng 已提交
1313 1314 1315 1316 1317 1318 1319
    pSchema->type = pField->type;
    strcpy(pSchema->name, pField->name);
    pSchema->bytes = htons(pField->bytes);
    pSchema++;
  }

  pMsg = (char *)pSchema;
1320 1321 1322
  pAlterTableMsg->tagValLen = htonl(pAlterInfo->tagData.dataLen);
  memcpy(pMsg, pAlterInfo->tagData.data, pAlterInfo->tagData.dataLen);
  pMsg += pAlterInfo->tagData.dataLen;
H
hzcheng 已提交
1323

S
TD-1057  
Shengliang Guan 已提交
1324
  msgLen = (int32_t)(pMsg - (char*)pAlterTableMsg);
1325

H
hzcheng 已提交
1326
  pCmd->payloadLen = msgLen;
S
slguan 已提交
1327
  pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_TABLE;
H
hzcheng 已提交
1328 1329

  assert(msgLen + minMsgSize() <= size);
1330

1331
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1332 1333
}

1334 1335 1336 1337
int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
  SSqlCmd* pCmd = &pSql->cmd;
  pCmd->msgType = TSDB_MSG_TYPE_UPDATE_TAG_VAL;
  
1338
  SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) pCmd->payload;
1339
  pCmd->payloadLen = htonl(pUpdateMsg->head.contLen);
1340

1341 1342
  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
1343

dengyihao's avatar
dengyihao 已提交
1344
  tscDumpEpSetFromVgroupInfo(&pTableMetaInfo->pTableMeta->corVgroupInfo, &pSql->epSet);
1345

1346 1347 1348
  return TSDB_CODE_SUCCESS;
}

1349
int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1350
  SSqlCmd *pCmd = &pSql->cmd;
1351
  pCmd->payloadLen = sizeof(SCMAlterDbMsg);
S
slguan 已提交
1352
  pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_DB;
H
hzcheng 已提交
1353

1354
  SCMAlterDbMsg *pAlterDbMsg = (SCMAlterDbMsg*)pCmd->payload;
1355
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1356
  tstrncpy(pAlterDbMsg->db, pTableMetaInfo->name, sizeof(pAlterDbMsg->db));
H
hzcheng 已提交
1357

1358
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1359 1360
}

1361
int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1362
  SSqlCmd *pCmd = &pSql->cmd;
J
jtao1735 已提交
1363
  pCmd->msgType = TSDB_MSG_TYPE_CM_RETRIEVE;
S
slguan 已提交
1364
  pCmd->payloadLen = sizeof(SRetrieveTableMsg);
S
slguan 已提交
1365

S
slguan 已提交
1366 1367
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1368
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
H
hzcheng 已提交
1369
  }
S
slguan 已提交
1370

S
slguan 已提交
1371 1372 1373 1374
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg*)pCmd->payload;
  pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
  pRetrieveMsg->free = htons(pQueryInfo->type);
H
hzcheng 已提交
1375

1376
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1377 1378
}

1379
static int tscSetResultPointer(SQueryInfo *pQueryInfo, SSqlRes *pRes) {
1380
  if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
H
hzcheng 已提交
1381 1382 1383
    return pRes->code;
  }

H
hjxilinx 已提交
1384
  for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
H
hjxilinx 已提交
1385
    int16_t offset = tscFieldInfoGetOffset(pQueryInfo, i);
1386
    pRes->tsrow[i] = (unsigned char*)((char*) pRes->data + offset * pRes->numOfRows);
H
hzcheng 已提交
1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400
  }

  return 0;
}

/*
 * this function can only be called once.
 * by using pRes->rspType to denote its status
 *
 * if pRes->rspType is 1, no more result
 */
static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
1401

1402
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
1403

H
hzcheng 已提交
1404 1405 1406 1407 1408 1409
  pRes->code = TSDB_CODE_SUCCESS;
  if (pRes->rspType == 0) {
    pRes->numOfRows = numOfRes;
    pRes->row = 0;
    pRes->rspType = 1;

1410
    tscSetResultPointer(pQueryInfo, pRes);
H
hzcheng 已提交
1411
  } else {
S
slguan 已提交
1412
    tscResetForNextRetrieve(pRes);
H
hzcheng 已提交
1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427
  }

  uint8_t code = pSql->res.code;
  if (pSql->fp) {
    if (code == TSDB_CODE_SUCCESS) {
      (*pSql->fp)(pSql->param, pSql, pSql->res.numOfRows);
    } else {
      tscQueueAsyncRes(pSql);
    }
  }

  return code;
}

int tscProcessDescribeTableRsp(SSqlObj *pSql) {
S
slguan 已提交
1428
  SSqlCmd *       pCmd = &pSql->cmd;
1429
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hzcheng 已提交
1430

H
hjxilinx 已提交
1431
  STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
H
hjxilinx 已提交
1432 1433
  
  int32_t numOfRes = tinfo.numOfColumns + tinfo.numOfTags;
H
hzcheng 已提交
1434 1435 1436
  return tscLocalResultCommonBuilder(pSql, numOfRes);
}

H
Haojun Liao 已提交
1437 1438 1439
int tscProcessLocalRetrieveRsp(SSqlObj *pSql) {
  int32_t numOfRes = 1;
  pSql->res.completed = true;
H
hzcheng 已提交
1440 1441 1442
  return tscLocalResultCommonBuilder(pSql, numOfRes);
}

H
hjxilinx 已提交
1443
int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
H
hzcheng 已提交
1444
  SSqlRes *pRes = &pSql->res;
H
Haojun Liao 已提交
1445
  SSqlCmd* pCmd = &pSql->cmd;
H
hzcheng 已提交
1446

H
Haojun Liao 已提交
1447 1448 1449 1450 1451 1452
  int32_t code = pRes->code;
  if (pRes->code != TSDB_CODE_SUCCESS) {
    tscQueueAsyncRes(pSql);
    return code;
  }

1453
  pRes->code = tscDoLocalMerge(pSql);
H
hjxilinx 已提交
1454
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hzcheng 已提交
1455 1456

  if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) {
1457
    tscCreateResPointerInfo(pRes, pQueryInfo);
H
hzcheng 已提交
1458 1459 1460
  }

  pRes->row = 0;
1461
  pRes->completed = (pRes->numOfRows == 0);
H
hzcheng 已提交
1462

H
Haojun Liao 已提交
1463
  code = pRes->code;
H
hjxilinx 已提交
1464 1465 1466 1467
  if (pRes->code == TSDB_CODE_SUCCESS) {
    (*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
  } else {
    tscQueueAsyncRes(pSql);
H
hzcheng 已提交
1468 1469 1470 1471 1472
  }

  return code;
}

S
slguan 已提交
1473
int tscProcessEmptyResultRsp(SSqlObj *pSql) { return tscLocalResultCommonBuilder(pSql, 0); }
H
hzcheng 已提交
1474

1475
int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1476
  STscObj *pObj = pSql->pTscObj;
S
slguan 已提交
1477
  SSqlCmd *pCmd = &pSql->cmd;
S
slguan 已提交
1478
  pCmd->msgType = TSDB_MSG_TYPE_CM_CONNECT;
1479
  pCmd->payloadLen = sizeof(SCMConnectMsg);
H
hzcheng 已提交
1480

S
slguan 已提交
1481 1482
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1483
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1484 1485
  }

1486
  SCMConnectMsg *pConnect = (SCMConnectMsg*)pCmd->payload;
H
hzcheng 已提交
1487

H
Haojun Liao 已提交
1488
  // TODO refactor full_name
H
hzcheng 已提交
1489 1490 1491
  char *db;  // ugly code to move the space
  db = strstr(pObj->db, TS_PATH_DELIMITER);
  db = (db == NULL) ? pObj->db : db + 1;
B
Bomin Zhang 已提交
1492 1493 1494
  tstrncpy(pConnect->db, db, sizeof(pConnect->db));
  tstrncpy(pConnect->clientVersion, version, sizeof(pConnect->clientVersion));
  tstrncpy(pConnect->msgVersion, "", sizeof(pConnect->msgVersion));
H
hzcheng 已提交
1495

H
Haojun Liao 已提交
1496 1497 1498
  pConnect->pid = htonl(taosGetPId());
  taosGetCurrentAPPName(pConnect->appName, NULL);

1499
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1500 1501
}

H
hjxilinx 已提交
1502
int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
1503 1504 1505
  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);

H
hjxilinx 已提交
1506
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
S
slguan 已提交
1507

B
Bomin Zhang 已提交
1508
  SCMTableInfoMsg* pInfoMsg = (SCMTableInfoMsg *)pCmd->payload;
H
hjxilinx 已提交
1509
  strcpy(pInfoMsg->tableId, pTableMetaInfo->name);
H
hjxilinx 已提交
1510
  pInfoMsg->createFlag = htons(pSql->cmd.autoCreated ? 1 : 0);
S
slguan 已提交
1511

B
Bomin Zhang 已提交
1512
  char* pMsg = (char*)pInfoMsg + sizeof(SCMTableInfoMsg);
H
hzcheng 已提交
1513

B
Bomin Zhang 已提交
1514 1515 1516 1517 1518 1519 1520
  size_t len = htonl(pCmd->tagData.dataLen);
  if (pSql->cmd.autoCreated) {
    if (len > 0) {
      len += sizeof(pCmd->tagData.name) + sizeof(pCmd->tagData.dataLen);
      memcpy(pInfoMsg->tags, &pCmd->tagData, len);
      pMsg += len;
    }
H
hzcheng 已提交
1521 1522
  }

S
TD-1057  
Shengliang Guan 已提交
1523
  pCmd->payloadLen = (int32_t)(pMsg - (char*)pInfoMsg);
S
slguan 已提交
1524
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLE_META;
H
hzcheng 已提交
1525

1526
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1527 1528
}

S
slguan 已提交
1529
/**
1530
 *  multi table meta req pkg format:
1531
 *  | SMgmtHead | SCMMultiTableInfoMsg | tableId0 | tableId1 | tableId2 | ......
S
slguan 已提交
1532 1533
 *      no used         4B
 **/
1534
int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
guanshengliang's avatar
guanshengliang 已提交
1535
#if 0
S
slguan 已提交
1536 1537 1538 1539 1540
  SSqlCmd *pCmd = &pSql->cmd;

  // copy payload content to temp buff
  char *tmpData = 0;
  if (pCmd->payloadLen > 0) {
H
Haojun Liao 已提交
1541
    if ((tmpData = calloc(1, pCmd->payloadLen + 1)) == NULL) return -1;
S
slguan 已提交
1542 1543 1544 1545 1546
    memcpy(tmpData, pCmd->payload, pCmd->payloadLen);
  }

  // fill head info
  SMgmtHead *pMgmt = (SMgmtHead *)(pCmd->payload + tsRpcHeadSize);
H
Haojun Liao 已提交
1547
  memset(pMgmt->db, 0, TSDB_TABLE_FNAME_LEN);  // server don't need the db
S
slguan 已提交
1548

1549
  SCMMultiTableInfoMsg *pInfoMsg = (SCMMultiTableInfoMsg *)(pCmd->payload + tsRpcHeadSize + sizeof(SMgmtHead));
S
slguan 已提交
1550
  pInfoMsg->numOfTables = htonl((int32_t)pCmd->count);
S
slguan 已提交
1551 1552

  if (pCmd->payloadLen > 0) {
S
slguan 已提交
1553
    memcpy(pInfoMsg->tableIds, tmpData, pCmd->payloadLen);
S
slguan 已提交
1554 1555
  }

S
Shengliang Guan 已提交
1556
  taosTFree(tmpData);
S
slguan 已提交
1557

1558
  pCmd->payloadLen += sizeof(SMgmtHead) + sizeof(SCMMultiTableInfoMsg);
S
slguan 已提交
1559
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLES_META;
S
slguan 已提交
1560 1561 1562

  assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize);

1563
  tscDebug("%p build load multi-metermeta msg completed, numOfTables:%d, msg size:%d", pSql, pCmd->count,
S
slguan 已提交
1564 1565 1566
           pCmd->payloadLen);

  return pCmd->payloadLen;
guanshengliang's avatar
guanshengliang 已提交
1567 1568
#endif
  return 0;  
S
slguan 已提交
1569 1570
}

H
hjxilinx 已提交
1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587
//static UNUSED_FUNC int32_t tscEstimateMetricMetaMsgSize(SSqlCmd *pCmd) {
////  const int32_t defaultSize =
////      minMsgSize() + sizeof(SSuperTableMetaMsg) + sizeof(SMgmtHead) + sizeof(int16_t) * TSDB_MAX_TAGS;
////  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
////
////  int32_t n = 0;
////  size_t size = taosArrayGetSize(pQueryInfo->tagCond.pCond);
////  for (int32_t i = 0; i < size; ++i) {
////    assert(0);
//////    n += strlen(pQueryInfo->tagCond.cond[i].cond);
////  }
////
////  int32_t tagLen = n * TSDB_NCHAR_SIZE;
////  if (pQueryInfo->tagCond.tbnameCond.cond != NULL) {
////    tagLen += strlen(pQueryInfo->tagCond.tbnameCond.cond) * TSDB_NCHAR_SIZE;
////  }
////
H
Haojun Liao 已提交
1588
////  int32_t joinCondLen = (TSDB_TABLE_FNAME_LEN + sizeof(int16_t)) * 2;
H
hjxilinx 已提交
1589 1590 1591 1592 1593 1594 1595 1596
////  int32_t elemSize = sizeof(SSuperTableMetaElemMsg) * pQueryInfo->numOfTables;
////
////  int32_t colSize = pQueryInfo->groupbyExpr.numOfGroupCols*sizeof(SColIndex);
////
////  int32_t len = tagLen + joinCondLen + elemSize + colSize + defaultSize;
////
////  return MAX(len, TSDB_DEFAULT_PAYLOAD_SIZE);
//}
H
hzcheng 已提交
1597

H
hjxilinx 已提交
1598 1599
int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
H
hjxilinx 已提交
1600 1601 1602 1603 1604 1605 1606 1607 1608 1609
  
  char* pMsg = pCmd->payload;
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  
  SCMSTableVgroupMsg *pStableVgroupMsg = (SCMSTableVgroupMsg *) pMsg;
  pStableVgroupMsg->numOfTables = htonl(pQueryInfo->numOfTables);
  pMsg += sizeof(SCMSTableVgroupMsg);
  
  for(int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
    STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i);
1610 1611 1612
    size_t size = sizeof(pTableMetaInfo->name);
    tstrncpy(pMsg, pTableMetaInfo->name, size);
    pMsg += size;
H
hjxilinx 已提交
1613
  }
H
hjxilinx 已提交
1614 1615

  pCmd->msgType = TSDB_MSG_TYPE_CM_STABLE_VGROUP;
S
TD-1057  
Shengliang Guan 已提交
1616
  pCmd->payloadLen = (int32_t)(pMsg - pCmd->payload);
H
hjxilinx 已提交
1617

1618
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1619 1620
}

1621 1622
int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
H
hzcheng 已提交
1623 1624
  STscObj *pObj = pSql->pTscObj;

1625
  pthread_mutex_lock(&pObj->mutex);
H
hzcheng 已提交
1626

S
Shengliang Guan 已提交
1627
  int32_t numOfQueries = 2;
H
hzcheng 已提交
1628 1629 1630
  SSqlObj *tpSql = pObj->sqlList;
  while (tpSql) {
    tpSql = tpSql->next;
1631
    numOfQueries++;
H
hzcheng 已提交
1632 1633
  }

S
Shengliang Guan 已提交
1634
  int32_t numOfStreams = 2;
H
hzcheng 已提交
1635 1636 1637
  SSqlStream *pStream = pObj->streamList;
  while (pStream) {
    pStream = pStream->next;
1638
    numOfStreams++;
H
hzcheng 已提交
1639 1640
  }

1641
  int size = numOfQueries * sizeof(SQueryDesc) + numOfStreams * sizeof(SStreamDesc) + sizeof(SCMHeartBeatMsg) + 100;
S
slguan 已提交
1642
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
1643
    pthread_mutex_unlock(&pObj->mutex);
H
Haojun Liao 已提交
1644
    tscError("%p failed to create heartbeat msg", pSql);
H
Haojun Liao 已提交
1645
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1646
  }
H
hzcheng 已提交
1647

H
Haojun Liao 已提交
1648
  // TODO the expired hb and client can not be identified by server till now.
1649
  SCMHeartBeatMsg *pHeartbeat = (SCMHeartBeatMsg *)pCmd->payload;
H
Haojun Liao 已提交
1650 1651
  tstrncpy(pHeartbeat->clientVer, version, tListLen(pHeartbeat->clientVer));

1652 1653
  pHeartbeat->numOfQueries = numOfQueries;
  pHeartbeat->numOfStreams = numOfStreams;
H
Haojun Liao 已提交
1654 1655 1656 1657

  pHeartbeat->pid = htonl(taosGetPId());
  taosGetCurrentAPPName(pHeartbeat->appName, NULL);

1658
  int msgLen = tscBuildQueryStreamDesc(pHeartbeat, pObj);
H
hzcheng 已提交
1659 1660 1661 1662

  pthread_mutex_unlock(&pObj->mutex);

  pCmd->payloadLen = msgLen;
S
slguan 已提交
1663
  pCmd->msgType = TSDB_MSG_TYPE_CM_HEARTBEAT;
H
hzcheng 已提交
1664

1665
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1666 1667
}

1668 1669
int tscProcessTableMetaRsp(SSqlObj *pSql) {
  STableMetaMsg *pMetaMsg = (STableMetaMsg *)pSql->res.pRsp;
H
hzcheng 已提交
1670

H
Haojun Liao 已提交
1671
  pMetaMsg->tid = htonl(pMetaMsg->tid);
1672
  pMetaMsg->sversion = htons(pMetaMsg->sversion);
1673
  pMetaMsg->tversion = htons(pMetaMsg->tversion);
H
hjxilinx 已提交
1674 1675
  pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId);
  
1676 1677
  pMetaMsg->uid = htobe64(pMetaMsg->uid);
  pMetaMsg->contLen = htons(pMetaMsg->contLen);
1678
  pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns);
H
hzcheng 已提交
1679

H
Haojun Liao 已提交
1680
  if ((pMetaMsg->tableType != TSDB_SUPER_TABLE) &&
H
Haojun Liao 已提交
1681
      (pMetaMsg->tid <= 0 || pMetaMsg->vgroup.vgId < 2 || pMetaMsg->vgroup.numOfEps <= 0)) {
H
Haojun Liao 已提交
1682
    tscError("invalid value in table numOfEps:%d, vgId:%d tid:%d, name:%s", pMetaMsg->vgroup.numOfEps, pMetaMsg->vgroup.vgId,
H
Haojun Liao 已提交
1683
             pMetaMsg->tid, pMetaMsg->tableId);
1684
    return TSDB_CODE_TSC_INVALID_VALUE;
H
hzcheng 已提交
1685 1686
  }

B
Bomin Zhang 已提交
1687
  if (pMetaMsg->numOfTags > TSDB_MAX_TAGS) {
1688
    tscError("invalid numOfTags:%d", pMetaMsg->numOfTags);
1689
    return TSDB_CODE_TSC_INVALID_VALUE;
H
hzcheng 已提交
1690 1691
  }

1692 1693
  if (pMetaMsg->numOfColumns > TSDB_MAX_COLUMNS || pMetaMsg->numOfColumns <= 0) {
    tscError("invalid numOfColumns:%d", pMetaMsg->numOfColumns);
1694
    return TSDB_CODE_TSC_INVALID_VALUE;
H
hzcheng 已提交
1695 1696
  }

1697 1698
  for (int i = 0; i < pMetaMsg->vgroup.numOfEps; ++i) {
    pMetaMsg->vgroup.epAddr[i].port = htons(pMetaMsg->vgroup.epAddr[i].port);
H
hzcheng 已提交
1699 1700
  }

1701
  SSchema* pSchema = pMetaMsg->schema;
H
hzcheng 已提交
1702

1703
  int32_t numOfTotalCols = pMetaMsg->numOfColumns + pMetaMsg->numOfTags;
H
hzcheng 已提交
1704 1705 1706
  for (int i = 0; i < numOfTotalCols; ++i) {
    pSchema->bytes = htons(pSchema->bytes);
    pSchema->colId = htons(pSchema->colId);
1707 1708 1709 1710 1711

    if (pSchema->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
      assert(i == 0);
    }

H
hjxilinx 已提交
1712
    assert(pSchema->type >= TSDB_DATA_TYPE_BOOL && pSchema->type <= TSDB_DATA_TYPE_NCHAR);
H
hzcheng 已提交
1713 1714 1715
    pSchema++;
  }

1716 1717
  size_t size = 0;
  STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
1718
  
H
hzcheng 已提交
1719
  // todo add one more function: taosAddDataIfNotExists();
1720
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
H
hjxilinx 已提交
1721
  assert(pTableMetaInfo->pTableMeta == NULL);
H
hzcheng 已提交
1722

H
Haojun Liao 已提交
1723
  pTableMetaInfo->pTableMeta = (STableMeta *) taosCachePut(tscMetaCache, pTableMetaInfo->name,
1724
      strlen(pTableMetaInfo->name), pTableMeta, size, tsTableMetaKeepTimer * 1000);
H
hjxilinx 已提交
1725
  
1726
  // todo handle out of memory case
1727
  if (pTableMetaInfo->pTableMeta == NULL) {
B
Bomin Zhang 已提交
1728
    free(pTableMeta);
1729
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
1730
  }
H
hzcheng 已提交
1731

1732
  tscDebug("%p recv table meta, uid:%"PRId64 ", tid:%d, name:%s", pSql, pTableMeta->id.uid, pTableMeta->id.tid, pTableMetaInfo->name);
1733
  free(pTableMeta);
1734
  
H
hjxilinx 已提交
1735
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1736 1737
}

S
slguan 已提交
1738
/**
1739
 *  multi table meta rsp pkg format:
1740
 *  | STaosRsp | ieType | SCMMultiTableInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | SMeterMeta2 | SSchema2
S
slguan 已提交
1741 1742 1743
 *  |...... 1B        1B            4B
 **/
int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
1744
#if 0
S
slguan 已提交
1745 1746 1747 1748 1749
  char *rsp = pSql->res.pRsp;

  ieType = *rsp;
  if (ieType != TSDB_IE_TYPE_META) {
    tscError("invalid ie type:%d", ieType);
1750
    pSql->res.code = TSDB_CODE_TSC_INVALID_IE;
S
slguan 已提交
1751
    pSql->res.numOfTotal = 0;
1752
    return TSDB_CODE_TSC_APP_ERROR;
S
slguan 已提交
1753 1754 1755 1756
  }

  rsp++;

1757
  SCMMultiTableInfoMsg *pInfo = (SCMMultiTableInfoMsg *)rsp;
S
slguan 已提交
1758
  totalNum = htonl(pInfo->numOfTables);
1759
  rsp += sizeof(SCMMultiTableInfoMsg);
S
slguan 已提交
1760 1761

  for (i = 0; i < totalNum; i++) {
S
slguan 已提交
1762
    SMultiTableMeta *pMultiMeta = (SMultiTableMeta *)rsp;
1763
    STableMeta *     pMeta = pMultiMeta->metas;
S
slguan 已提交
1764 1765 1766

    pMeta->sid = htonl(pMeta->sid);
    pMeta->sversion = htons(pMeta->sversion);
H
hjxilinx 已提交
1767
    pMeta->vgId = htonl(pMeta->vgId);
S
slguan 已提交
1768 1769
    pMeta->uid = htobe64(pMeta->uid);

H
hjxilinx 已提交
1770 1771
    if (pMeta->sid <= 0 || pMeta->vgId < 0) {
      tscError("invalid meter vgId:%d, sid%d", pMeta->vgId, pMeta->sid);
1772
      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
S
slguan 已提交
1773
      pSql->res.numOfTotal = i;
1774
      return TSDB_CODE_TSC_APP_ERROR;
S
slguan 已提交
1775 1776
    }

H
hjxilinx 已提交
1777 1778 1779 1780
    //    pMeta->numOfColumns = htons(pMeta->numOfColumns);
    //
    //    if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
    //      tscError("invalid tag value count:%d", pMeta->numOfTags);
1781
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
H
hjxilinx 已提交
1782
    //      pSql->res.numOfTotal = i;
1783
    //      return TSDB_CODE_TSC_APP_ERROR;
H
hjxilinx 已提交
1784 1785 1786 1787
    //    }
    //
    //    if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
    //      tscError("invalid numOfTags:%d", pMeta->numOfTags);
1788
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
H
hjxilinx 已提交
1789
    //      pSql->res.numOfTotal = i;
1790
    //      return TSDB_CODE_TSC_APP_ERROR;
H
hjxilinx 已提交
1791 1792 1793 1794
    //    }
    //
    //    if (pMeta->numOfColumns > TSDB_MAX_COLUMNS || pMeta->numOfColumns < 0) {
    //      tscError("invalid numOfColumns:%d", pMeta->numOfColumns);
1795
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
H
hjxilinx 已提交
1796
    //      pSql->res.numOfTotal = i;
1797
    //      return TSDB_CODE_TSC_APP_ERROR;
H
hjxilinx 已提交
1798 1799
    //    }
    //
H
hjxilinx 已提交
1800
    //    for (int j = 0; j < TSDB_REPLICA_MAX_NUM; ++j) {
H
hjxilinx 已提交
1801 1802 1803 1804 1805 1806 1807 1808 1809 1810 1811 1812 1813 1814 1815 1816 1817 1818 1819 1820 1821 1822 1823 1824 1825 1826 1827 1828 1829 1830 1831 1832 1833 1834
    //      pMeta->vpeerDesc[j].vnode = htonl(pMeta->vpeerDesc[j].vnode);
    //    }
    //
    //    pMeta->rowSize = 0;
    //    rsp += sizeof(SMultiTableMeta);
    //    pSchema = (SSchema *)rsp;
    //
    //    int32_t numOfTotalCols = pMeta->numOfColumns + pMeta->numOfTags;
    //    for (int j = 0; j < numOfTotalCols; ++j) {
    //      pSchema->bytes = htons(pSchema->bytes);
    //      pSchema->colId = htons(pSchema->colId);
    //
    //      // ignore the tags length
    //      if (j < pMeta->numOfColumns) {
    //        pMeta->rowSize += pSchema->bytes;
    //      }
    //      pSchema++;
    //    }
    //
    //    rsp += numOfTotalCols * sizeof(SSchema);
    //
    //    int32_t  tagLen = 0;
    //    SSchema *pTagsSchema = tscGetTableTagSchema(pMeta);
    //
    //    if (pMeta->tableType == TSDB_CHILD_TABLE) {
    //      for (int32_t j = 0; j < pMeta->numOfTags; ++j) {
    //        tagLen += pTagsSchema[j].bytes;
    //      }
    //    }
    //
    //    rsp += tagLen;
    //    int32_t size = (int32_t)(rsp - ((char *)pMeta));  // Consistent with STableMeta in cache
    //
    //    pMeta->index = 0;
H
Haojun Liao 已提交
1835
    //    (void)taosCachePut(tscMetaCache, pMeta->tableId, (char *)pMeta, size, tsTableMetaKeepTimer);
H
hjxilinx 已提交
1836
    //  }
S
slguan 已提交
1837
  }
H
hjxilinx 已提交
1838
  
S
slguan 已提交
1839 1840
  pSql->res.code = TSDB_CODE_SUCCESS;
  pSql->res.numOfTotal = i;
1841
  tscDebug("%p load multi-metermeta resp from complete num:%d", pSql, pSql->res.numOfTotal);
1842 1843
#endif
  
S
slguan 已提交
1844 1845 1846
  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
1847
int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
1848
  SSqlRes* pRes = &pSql->res;
H
hjxilinx 已提交
1849
  
H
hjxilinx 已提交
1850
  // NOTE: the order of several table must be preserved.
1851
  SCMSTableVgroupRspMsg *pStableVgroup = (SCMSTableVgroupRspMsg *)pRes->pRsp;
H
hjxilinx 已提交
1852 1853
  pStableVgroup->numOfTables = htonl(pStableVgroup->numOfTables);
  char* pMsg = pRes->pRsp + sizeof(SCMSTableVgroupRspMsg);
H
hjxilinx 已提交
1854
  
1855 1856 1857
  // master sqlObj locates in param
  SSqlObj* parent = pSql->param;
  assert(parent != NULL);
H
hjxilinx 已提交
1858
  
1859
  SSqlCmd* pCmd = &parent->cmd;
H
hjxilinx 已提交
1860 1861 1862
  for(int32_t i = 0; i < pStableVgroup->numOfTables; ++i) {
    STableMetaInfo *pInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i);

H
Haojun Liao 已提交
1863 1864 1865
    SVgroupsMsg *  pVgroupMsg = (SVgroupsMsg *) pMsg;
    pVgroupMsg->numOfVgroups = htonl(pVgroupMsg->numOfVgroups);

H
Haojun Liao 已提交
1866 1867 1868 1869
    size_t size = sizeof(SCMVgroupMsg) * pVgroupMsg->numOfVgroups + sizeof(SVgroupsMsg);

    size_t vgroupsz = sizeof(SCMVgroupInfo) * pVgroupMsg->numOfVgroups + sizeof(SVgroupsInfo);
    pInfo->vgroupList = calloc(1, vgroupsz);
H
hjxilinx 已提交
1870 1871
    assert(pInfo->vgroupList != NULL);

H
Haojun Liao 已提交
1872
    pInfo->vgroupList->numOfVgroups = pVgroupMsg->numOfVgroups;
H
hjxilinx 已提交
1873
    for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) {
dengyihao's avatar
dengyihao 已提交
1874
      //just init, no need to lock
H
hjxilinx 已提交
1875
      SCMVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j];
H
Haojun Liao 已提交
1876

H
Haojun Liao 已提交
1877 1878 1879
      SCMVgroupMsg *vmsg = &pVgroupMsg->vgroups[j];
      pVgroups->vgId = htonl(vmsg->vgId);
      pVgroups->numOfEps = vmsg->numOfEps;
H
Haojun Liao 已提交
1880

H
Haojun Liao 已提交
1881
      assert(pVgroups->numOfEps >= 1 && pVgroups->vgId >= 1);
H
hjxilinx 已提交
1882

1883
      for (int32_t k = 0; k < pVgroups->numOfEps; ++k) {
H
Haojun Liao 已提交
1884 1885
        pVgroups->epAddr[k].port = htons(vmsg->epAddr[k].port);
        pVgroups->epAddr[k].fqdn = strndup(vmsg->epAddr[k].fqdn, tListLen(vmsg->epAddr[k].fqdn));
H
hjxilinx 已提交
1886
      }
1887
    }
1888 1889

    pMsg += size;
H
hjxilinx 已提交
1890 1891
  }
  
S
slguan 已提交
1892
  return pSql->res.code;
H
hzcheng 已提交
1893 1894 1895 1896 1897 1898
}

/*
 * current process do not use the cache at all
 */
int tscProcessShowRsp(SSqlObj *pSql) {
H
hjxilinx 已提交
1899
  STableMetaMsg * pMetaMsg;
1900
  SCMShowRsp *pShow;
S
slguan 已提交
1901
  SSchema *    pSchema;
H
hzcheng 已提交
1902 1903
  char         key[20];

1904 1905 1906
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

H
hjxilinx 已提交
1907
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
1908

H
hjxilinx 已提交
1909
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
1910

1911
  pShow = (SCMShowRsp *)pRes->pRsp;
S
slguan 已提交
1912
  pShow->qhandle = htobe64(pShow->qhandle);
H
hzcheng 已提交
1913 1914
  pRes->qhandle = pShow->qhandle;

S
slguan 已提交
1915
  tscResetForNextRetrieve(pRes);
H
hjxilinx 已提交
1916
  pMetaMsg = &(pShow->tableMeta);
H
hzcheng 已提交
1917

H
hjxilinx 已提交
1918
  pMetaMsg->numOfColumns = ntohs(pMetaMsg->numOfColumns);
H
hzcheng 已提交
1919

H
hjxilinx 已提交
1920
  pSchema = pMetaMsg->schema;
H
Haojun Liao 已提交
1921
  pMetaMsg->tid = ntohs(pMetaMsg->tid);
H
hjxilinx 已提交
1922
  for (int i = 0; i < pMetaMsg->numOfColumns; ++i) {
H
hzcheng 已提交
1923 1924 1925 1926
    pSchema->bytes = htons(pSchema->bytes);
    pSchema++;
  }

1927
  key[0] = pCmd->msgType + 'a';
H
hzcheng 已提交
1928 1929
  strcpy(key + 1, "showlist");

H
Haojun Liao 已提交
1930 1931 1932 1933
  if (pTableMetaInfo->pTableMeta != NULL) {
    taosCacheRelease(tscMetaCache, (void *)&(pTableMetaInfo->pTableMeta), false);
  }

H
hjxilinx 已提交
1934 1935 1936
  size_t size = 0;
  STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
  
H
Haojun Liao 已提交
1937
  pTableMetaInfo->pTableMeta = taosCachePut(tscMetaCache, key, strlen(key), (char *)pTableMeta, size,
1938
      tsTableMetaKeepTimer * 1000);
H
hjxilinx 已提交
1939
  SSchema *pTableSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
H
hzcheng 已提交
1940

1941 1942 1943 1944
  if (pQueryInfo->colList == NULL) {
    pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
  }
  
H
hjxilinx 已提交
1945 1946
  SFieldInfo* pFieldInfo = &pQueryInfo->fieldsInfo;
  
S
slguan 已提交
1947
  SColumnIndex index = {0};
H
hjxilinx 已提交
1948 1949 1950
  pSchema = pMetaMsg->schema;
  
  for (int16_t i = 0; i < pMetaMsg->numOfColumns; ++i, ++pSchema) {
S
slguan 已提交
1951
    index.columnIndex = i;
1952 1953
    tscColumnListInsert(pQueryInfo->colList, &index);
    
H
hjxilinx 已提交
1954
    TAOS_FIELD f = tscCreateField(pSchema->type, pSchema->name, pSchema->bytes);
H
Haojun Liao 已提交
1955
    SInternalField* pInfo = tscFieldInfoAppend(pFieldInfo, &f);
H
hjxilinx 已提交
1956
    
H
hjxilinx 已提交
1957
    pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index,
1958
                     pTableSchema[i].type, pTableSchema[i].bytes, pTableSchema[i].bytes, false);
H
hzcheng 已提交
1959
  }
H
hjxilinx 已提交
1960 1961
  
  pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
H
hjxilinx 已提交
1962
  tscFieldInfoUpdateOffset(pQueryInfo);
H
hjxilinx 已提交
1963
  
S
Shengliang Guan 已提交
1964
  taosTFree(pTableMeta);
H
hzcheng 已提交
1965 1966 1967
  return 0;
}

1968
// TODO multithread problem
1969 1970 1971 1972 1973 1974 1975 1976 1977 1978
static void createHBObj(STscObj* pObj) {
  if (pObj->pHb != NULL) {
    return;
  }

  SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
  if (NULL == pSql) return;

  pSql->fp = tscProcessHeartBeatRsp;

1979 1980 1981 1982 1983 1984
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetailSafely(&pSql->cmd, 0);
  if (pQueryInfo == NULL) {
    pSql->res.code = terrno;
    return;
  }

1985 1986 1987 1988 1989 1990 1991 1992 1993 1994 1995 1996
  pQueryInfo->command = TSDB_SQL_HB;

  pSql->cmd.command = pQueryInfo->command;
  if (TSDB_CODE_SUCCESS != tscAllocPayload(&(pSql->cmd), TSDB_DEFAULT_PAYLOAD_SIZE)) {
    taosTFree(pSql);
    return;
  }

  pSql->param = pObj;
  pSql->pTscObj = pObj;
  pSql->signature = pSql;

1997 1998
  registerSqlObj(pSql);
  tscDebug("%p HB is allocated, pObj:%p", pSql, pObj);
1999

2000
  pObj->pHb = pSql;
2001 2002
}

H
hzcheng 已提交
2003 2004 2005 2006
int tscProcessConnectRsp(SSqlObj *pSql) {
  STscObj *pObj = pSql->pTscObj;
  SSqlRes *pRes = &pSql->res;

H
Haojun Liao 已提交
2007 2008
  char temp[TSDB_TABLE_FNAME_LEN * 2] = {0};

2009
  SCMConnectRsp *pConnect = (SCMConnectRsp *)pRes->pRsp;
B
Bomin Zhang 已提交
2010
  tstrncpy(pObj->acctId, pConnect->acctId, sizeof(pObj->acctId));  // copy acctId from response
2011 2012
  int32_t len = sprintf(temp, "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db);

B
Bomin Zhang 已提交
2013 2014
  assert(len <= sizeof(pObj->db));
  tstrncpy(pObj->db, temp, sizeof(pObj->db));
H
hjxilinx 已提交
2015
  
dengyihao's avatar
dengyihao 已提交
2016 2017 2018
  if (pConnect->epSet.numOfEps > 0) {
    tscEpSetHtons(&pConnect->epSet);
    tscUpdateMgmtEpSet(&pConnect->epSet);
dengyihao's avatar
dengyihao 已提交
2019
  } 
H
hzcheng 已提交
2020

S
slguan 已提交
2021
  strcpy(pObj->sversion, pConnect->serverVersion);
H
hzcheng 已提交
2022 2023
  pObj->writeAuth = pConnect->writeAuth;
  pObj->superAuth = pConnect->superAuth;
S
Shengliang Guan 已提交
2024
  pObj->connId = htonl(pConnect->connId);
2025 2026

  createHBObj(pObj);
H
Haojun Liao 已提交
2027 2028

  //launch a timer to send heartbeat to maintain the connection and send status to mnode
2029
  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
H
hzcheng 已提交
2030 2031 2032 2033 2034

  return 0;
}

int tscProcessUseDbRsp(SSqlObj *pSql) {
S
slguan 已提交
2035
  STscObj *       pObj = pSql->pTscObj;
2036
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
S
slguan 已提交
2037

B
Bomin Zhang 已提交
2038
  tstrncpy(pObj->db, pTableMetaInfo->name, sizeof(pObj->db));
H
hzcheng 已提交
2039 2040 2041
  return 0;
}

Y
TD-1039  
yihaoDeng 已提交
2042 2043
int tscProcessDropDbRsp(SSqlObj *pSql) {
  pSql->pTscObj->db[0] = 0;
H
Haojun Liao 已提交
2044
  taosCacheEmpty(tscMetaCache);
H
hzcheng 已提交
2045 2046 2047 2048
  return 0;
}

int tscProcessDropTableRsp(SSqlObj *pSql) {
2049
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
S
slguan 已提交
2050

H
Haojun Liao 已提交
2051
  STableMeta *pTableMeta = taosCacheAcquireByKey(tscMetaCache, pTableMetaInfo->name, strlen(pTableMetaInfo->name));
H
Haojun Liao 已提交
2052
  if (pTableMeta == NULL) { /* not in cache, abort */
H
hzcheng 已提交
2053 2054 2055 2056 2057 2058 2059
    return 0;
  }

  /*
   * 1. if a user drops one table, which is the only table in a vnode, remove operation will incur vnode to be removed.
   * 2. Then, a user creates a new metric followed by a table with identical name of removed table but different schema,
   * here the table will reside in a new vnode.
S
slguan 已提交
2060 2061
   * The cached information is expired, however, we may have lost the ref of original meter. So, clear whole cache
   * instead.
H
hzcheng 已提交
2062
   */
2063
  tscDebug("%p force release table meta after drop table:%s", pSql, pTableMetaInfo->name);
H
Haojun Liao 已提交
2064
  taosCacheRelease(tscMetaCache, (void **)&pTableMeta, true);
H
hzcheng 已提交
2065

H
hjxilinx 已提交
2066
  if (pTableMetaInfo->pTableMeta) {
H
Haojun Liao 已提交
2067
    taosCacheRelease(tscMetaCache, (void **)&(pTableMetaInfo->pTableMeta), true);
H
hzcheng 已提交
2068 2069 2070 2071 2072 2073
  }

  return 0;
}

int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
2074
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
S
slguan 已提交
2075

H
Haojun Liao 已提交
2076
  STableMeta *pTableMeta = taosCacheAcquireByKey(tscMetaCache, pTableMetaInfo->name, strlen(pTableMetaInfo->name));
H
hjxilinx 已提交
2077
  if (pTableMeta == NULL) { /* not in cache, abort */
H
hzcheng 已提交
2078 2079 2080
    return 0;
  }

2081
  tscDebug("%p force release metermeta in cache after alter-table: %s", pSql, pTableMetaInfo->name);
H
Haojun Liao 已提交
2082
  taosCacheRelease(tscMetaCache, (void **)&pTableMeta, true);
H
hzcheng 已提交
2083

H
hjxilinx 已提交
2084
  if (pTableMetaInfo->pTableMeta) {
weixin_48148422's avatar
weixin_48148422 已提交
2085
    bool isSuperTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo);
H
Haojun Liao 已提交
2086
    taosCacheRelease(tscMetaCache, (void **)&(pTableMetaInfo->pTableMeta), true);
H
hzcheng 已提交
2087

2088
    if (isSuperTable) {  // if it is a super table, reset whole query cache
2089
      tscDebug("%p reset query cache since table:%s is stable", pSql, pTableMetaInfo->name);
H
Haojun Liao 已提交
2090
      taosCacheEmpty(tscMetaCache);
H
hzcheng 已提交
2091 2092 2093 2094 2095 2096 2097 2098 2099 2100
    }
  }

  return 0;
}

int tscProcessAlterDbMsgRsp(SSqlObj *pSql) {
  UNUSED(pSql);
  return 0;
}
2101 2102 2103
int tscProcessShowCreateRsp(SSqlObj *pSql) {
  return tscLocalResultCommonBuilder(pSql, 1);
}
H
hzcheng 已提交
2104 2105 2106 2107

int tscProcessQueryRsp(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;

2108
  SQueryTableRsp *pQuery = (SQueryTableRsp *)pRes->pRsp;
S
slguan 已提交
2109 2110 2111
  pQuery->qhandle = htobe64(pQuery->qhandle);
  pRes->qhandle = pQuery->qhandle;

H
hzcheng 已提交
2112
  pRes->data = NULL;
S
slguan 已提交
2113
  tscResetForNextRetrieve(pRes);
H
hzcheng 已提交
2114 2115 2116
  return 0;
}

H
hjxilinx 已提交
2117
int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
S
slguan 已提交
2118 2119 2120
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

S
slguan 已提交
2121
  SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *)pRes->pRsp;
2122 2123 2124 2125
  if (pRetrieve == NULL) {
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return pRes->code;
  }
H
hzcheng 已提交
2126 2127 2128

  pRes->numOfRows = htonl(pRetrieve->numOfRows);
  pRes->precision = htons(pRetrieve->precision);
2129 2130
  pRes->offset    = htobe64(pRetrieve->offset);
  pRes->useconds  = htobe64(pRetrieve->useconds);
H
hjxilinx 已提交
2131
  pRes->completed = (pRetrieve->completed == 1);
2132
  pRes->data      = pRetrieve->data;
H
hjxilinx 已提交
2133
  
2134
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
2135 2136 2137 2138
  if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
    return pRes->code;
  }
  
weixin_48148422's avatar
weixin_48148422 已提交
2139
  if (pSql->pSubscription != NULL) {
H
hjxilinx 已提交
2140
    int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
H
hjxilinx 已提交
2141
    
H
hjxilinx 已提交
2142
    TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, numOfCols - 1);
H
hjxilinx 已提交
2143 2144
    int16_t     offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1);
    
weixin_48148422's avatar
weixin_48148422 已提交
2145 2146
    char* p = pRes->data + (pField->bytes + offset) * pRes->numOfRows;

S
slguan 已提交
2147
    int32_t numOfTables = htonl(*(int32_t*)p);
weixin_48148422's avatar
weixin_48148422 已提交
2148
    p += sizeof(int32_t);
S
slguan 已提交
2149
    for (int i = 0; i < numOfTables; i++) {
weixin_48148422's avatar
weixin_48148422 已提交
2150 2151
      int64_t uid = htobe64(*(int64_t*)p);
      p += sizeof(int64_t);
weixin_48148422's avatar
weixin_48148422 已提交
2152
      p += sizeof(int32_t); // skip tid
weixin_48148422's avatar
weixin_48148422 已提交
2153 2154
      TSKEY key = htobe64(*(TSKEY*)p);
      p += sizeof(TSKEY);
2155
      tscUpdateSubscriptionProgress(pSql->pSubscription, uid, key);
weixin_48148422's avatar
weixin_48148422 已提交
2156
    }
2157 2158
  }

H
hzcheng 已提交
2159
  pRes->row = 0;
2160
  tscDebug("%p numOfRows:%" PRId64 ", offset:%" PRId64 ", complete:%d", pSql, pRes->numOfRows, pRes->offset, pRes->completed);
H
hzcheng 已提交
2161 2162 2163 2164

  return 0;
}

H
hjxilinx 已提交
2165
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code);
H
hzcheng 已提交
2166

2167
static int32_t getTableMetaFromMgmt(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
S
slguan 已提交
2168 2169
  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
  if (NULL == pNew) {
2170
    tscError("%p malloc failed for new sqlobj to get table meta", pSql);
2171
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
2172
  }
2173

H
hzcheng 已提交
2174 2175 2176
  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;
  pNew->cmd.command = TSDB_SQL_META;
S
slguan 已提交
2177

2178
  registerSqlObj(pNew);
H
Haojun Liao 已提交
2179

2180
  tscAddSubqueryInfo(&pNew->cmd);
2181

2182
  SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetailSafely(&pNew->cmd, 0);
2183

H
hjxilinx 已提交
2184
  pNew->cmd.autoCreated = pSql->cmd.autoCreated;  // create table if not exists
B
Bomin Zhang 已提交
2185
  if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE + pSql->cmd.payloadLen)) {
2186
    tscError("%p malloc failed for payload to get table meta", pSql);
S
slguan 已提交
2187
    free(pNew);
2188

2189
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
2190 2191
  }

H
hjxilinx 已提交
2192
  STableMetaInfo *pNewMeterMetaInfo = tscAddEmptyMetaInfo(pNewQueryInfo);
2193
  assert(pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);
H
hzcheng 已提交
2194

B
Bomin Zhang 已提交
2195
  tstrncpy(pNewMeterMetaInfo->name, pTableMetaInfo->name, sizeof(pNewMeterMetaInfo->name));
B
Bomin Zhang 已提交
2196
  memcpy(&pNew->cmd.tagData, &pSql->cmd.tagData, sizeof(pSql->cmd.tagData));
2197
  tscDebug("%p new pSqlObj:%p to get tableMeta, auto create:%d", pSql, pNew, pNew->cmd.autoCreated);
H
hzcheng 已提交
2198

H
hjxilinx 已提交
2199 2200
  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;
H
hzcheng 已提交
2201

H
hjxilinx 已提交
2202 2203
  int32_t code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
2204
    code = TSDB_CODE_TSC_ACTION_IN_PROGRESS;  // notify upper application that current process need to be terminated
H
hzcheng 已提交
2205 2206 2207 2208 2209
  }

  return code;
}

H
hjxilinx 已提交
2210
int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
H
hjxilinx 已提交
2211
  assert(strlen(pTableMetaInfo->name) != 0);
S
slguan 已提交
2212

2213
  // If this STableMetaInfo owns a table meta, release it first
H
hjxilinx 已提交
2214
  if (pTableMetaInfo->pTableMeta != NULL) {
H
Haojun Liao 已提交
2215
    taosCacheRelease(tscMetaCache, (void **)&(pTableMetaInfo->pTableMeta), false);
2216 2217
  }
  
H
Haojun Liao 已提交
2218
  pTableMetaInfo->pTableMeta = (STableMeta *)taosCacheAcquireByKey(tscMetaCache, pTableMetaInfo->name, strlen(pTableMetaInfo->name));
H
hjxilinx 已提交
2219
  if (pTableMetaInfo->pTableMeta != NULL) {
H
hjxilinx 已提交
2220
    STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
2221
    tscDebug("%p retrieve table Meta from cache, the number of columns:%d, numOfTags:%d, %p", pSql, tinfo.numOfColumns,
2222
             tinfo.numOfTags, pTableMetaInfo->pTableMeta);
H
hzcheng 已提交
2223 2224 2225

    return TSDB_CODE_SUCCESS;
  }
2226 2227
  
  return getTableMetaFromMgmt(pSql, pTableMetaInfo);
H
hzcheng 已提交
2228 2229
}

H
hjxilinx 已提交
2230
int tscGetMeterMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool createIfNotExists) {
H
hjxilinx 已提交
2231
  pSql->cmd.autoCreated = createIfNotExists;
H
hjxilinx 已提交
2232
  return tscGetTableMeta(pSql, pTableMetaInfo);
H
hzcheng 已提交
2233 2234 2235
}

/**
H
Haojun Liao 已提交
2236
 * retrieve table meta from mnode, and update the local table meta cache.
H
hzcheng 已提交
2237
 * @param pSql          sql object
B
Bomin Zhang 已提交
2238
 * @param tableIndex    table index
H
hzcheng 已提交
2239 2240
 * @return              status code
 */
B
Bomin Zhang 已提交
2241
int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) {
H
hzcheng 已提交
2242
  SSqlCmd *pCmd = &pSql->cmd;
2243 2244

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
B
Bomin Zhang 已提交
2245
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex);
H
hzcheng 已提交
2246

H
Haojun Liao 已提交
2247 2248
  STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
  if (pTableMetaInfo->pTableMeta) {
2249
    tscDebug("%p update table meta, old meta numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
2250
             tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->id.uid, pTableMeta);
H
hzcheng 已提交
2251 2252
  }

H
Haojun Liao 已提交
2253
  taosCacheRelease(tscMetaCache, (void **)&(pTableMetaInfo->pTableMeta), true);
H
Haojun Liao 已提交
2254
  return getTableMetaFromMgmt(pSql, pTableMetaInfo);
H
hzcheng 已提交
2255 2256
}

H
hjxilinx 已提交
2257
static bool allVgroupInfoRetrieved(SSqlCmd* pCmd, int32_t clauseIndex) {
2258
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
2259
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
H
hjxilinx 已提交
2260
    STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
H
hjxilinx 已提交
2261 2262
    if (pTableMetaInfo->vgroupList == NULL) {
      return false;
S
slguan 已提交
2263 2264
    }
  }
H
hjxilinx 已提交
2265 2266 2267 2268
  
  // all super tables vgroupinfo are retrieved, no need to retrieve vgroup info anymore
  return true;
}
H
hzcheng 已提交
2269

H
hjxilinx 已提交
2270
int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
2271
  int      code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
H
hjxilinx 已提交
2272 2273 2274
  SSqlCmd *pCmd = &pSql->cmd;
  
  if (allVgroupInfoRetrieved(pCmd, clauseIndex)) {
H
hzcheng 已提交
2275 2276
    return TSDB_CODE_SUCCESS;
  }
H
hjxilinx 已提交
2277

S
slguan 已提交
2278
  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
H
hzcheng 已提交
2279 2280 2281
  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;

H
hjxilinx 已提交
2282
  pNew->cmd.command = TSDB_SQL_STABLEVGROUP;
H
Haojun Liao 已提交
2283 2284

  // TODO TEST IT
2285 2286
  SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetailSafely(&pNew->cmd, 0);
  if (pNewQueryInfo == NULL) {
B
Bomin Zhang 已提交
2287
    tscFreeSqlObj(pNew);
2288 2289
    return code;
  }
2290
  
H
hjxilinx 已提交
2291
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
2292
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
H
hjxilinx 已提交
2293
    STableMetaInfo *pMInfo = tscGetMetaInfo(pQueryInfo, i);
H
Haojun Liao 已提交
2294
    STableMeta *pTableMeta = taosCacheAcquireByData(tscMetaCache, pMInfo->pTableMeta);
H
Haojun Liao 已提交
2295
    tscAddTableMetaInfo(pNewQueryInfo, pMInfo->name, pTableMeta, NULL, pMInfo->tagColList, pMInfo->pVgroupTables);
S
slguan 已提交
2296 2297 2298 2299 2300 2301
  }

  if ((code = tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
    tscFreeSqlObj(pNew);
    return code;
  }
H
hzcheng 已提交
2302

2303
  pNewQueryInfo->numOfTables = pQueryInfo->numOfTables;
2304
  registerSqlObj(pNew);
H
Haojun Liao 已提交
2305

2306
  tscDebug("%p new sqlObj:%p to get vgroupInfo, numOfTables:%d", pSql, pNew, pNewQueryInfo->numOfTables);
H
hzcheng 已提交
2307

2308 2309 2310 2311
  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;
  code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
2312
    code = TSDB_CODE_TSC_ACTION_IN_PROGRESS;
H
hzcheng 已提交
2313 2314 2315 2316 2317
  }

  return code;
}

2318
void tscInitMsgsFp() {
S
slguan 已提交
2319 2320
  tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
  tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg;
J
jtao1735 已提交
2321
  tscBuildMsg[TSDB_SQL_FETCH] = tscBuildFetchMsg;
H
hzcheng 已提交
2322 2323

  tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg;
2324
  tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg;
H
hzcheng 已提交
2325

2326 2327
  tscBuildMsg[TSDB_SQL_CREATE_ACCT] = tscBuildAcctMsg;
  tscBuildMsg[TSDB_SQL_ALTER_ACCT] = tscBuildAcctMsg;
H
hzcheng 已提交
2328 2329

  tscBuildMsg[TSDB_SQL_CREATE_TABLE] = tscBuildCreateTableMsg;
S
[TD-16]  
slguan 已提交
2330
  tscBuildMsg[TSDB_SQL_DROP_USER] = tscBuildDropUserMsg;
H
hzcheng 已提交
2331 2332 2333
  tscBuildMsg[TSDB_SQL_DROP_ACCT] = tscBuildDropAcctMsg;
  tscBuildMsg[TSDB_SQL_DROP_DB] = tscBuildDropDbMsg;
  tscBuildMsg[TSDB_SQL_DROP_TABLE] = tscBuildDropTableMsg;
2334
  tscBuildMsg[TSDB_SQL_ALTER_USER] = tscBuildUserMsg;
S
slguan 已提交
2335 2336 2337
  tscBuildMsg[TSDB_SQL_CREATE_DNODE] = tscBuildCreateDnodeMsg;
  tscBuildMsg[TSDB_SQL_DROP_DNODE] = tscBuildDropDnodeMsg;
  tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg;
H
hzcheng 已提交
2338
  tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg;
2339
  tscBuildMsg[TSDB_SQL_UPDATE_TAGS_VAL] = tscBuildUpdateTagMsg;
H
hzcheng 已提交
2340 2341 2342 2343
  tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg;

  tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg;
  tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg;
H
hjxilinx 已提交
2344
  tscBuildMsg[TSDB_SQL_META] = tscBuildTableMetaMsg;
H
hjxilinx 已提交
2345
  tscBuildMsg[TSDB_SQL_STABLEVGROUP] = tscBuildSTableVgroupMsg;
S
slguan 已提交
2346
  tscBuildMsg[TSDB_SQL_MULTI_META] = tscBuildMultiMeterMetaMsg;
H
hzcheng 已提交
2347 2348 2349 2350

  tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg;
  tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg;
  tscBuildMsg[TSDB_SQL_RETRIEVE] = tscBuildRetrieveFromMgmtMsg;
2351 2352 2353
  tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg;
H
hzcheng 已提交
2354 2355

  tscProcessMsgRsp[TSDB_SQL_SELECT] = tscProcessQueryRsp;
H
hjxilinx 已提交
2356
  tscProcessMsgRsp[TSDB_SQL_FETCH] = tscProcessRetrieveRspFromNode;
H
hzcheng 已提交
2357 2358 2359 2360 2361

  tscProcessMsgRsp[TSDB_SQL_DROP_DB] = tscProcessDropDbRsp;
  tscProcessMsgRsp[TSDB_SQL_DROP_TABLE] = tscProcessDropTableRsp;
  tscProcessMsgRsp[TSDB_SQL_CONNECT] = tscProcessConnectRsp;
  tscProcessMsgRsp[TSDB_SQL_USE_DB] = tscProcessUseDbRsp;
2362
  tscProcessMsgRsp[TSDB_SQL_META] = tscProcessTableMetaRsp;
H
hjxilinx 已提交
2363
  tscProcessMsgRsp[TSDB_SQL_STABLEVGROUP] = tscProcessSTableVgroupRsp;
S
slguan 已提交
2364
  tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiMeterMetaRsp;
H
hzcheng 已提交
2365 2366

  tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp;
H
hjxilinx 已提交
2367
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromNode;  // rsp handled by same function.
H
hzcheng 已提交
2368
  tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp;
2369

H
Haojun Liao 已提交
2370 2371 2372 2373 2374
  tscProcessMsgRsp[TSDB_SQL_CURRENT_DB]   = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CURRENT_USER] = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_VERSION] = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CLI_VERSION]  = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_STATUS]  = tscProcessLocalRetrieveRsp;
2375

H
hzcheng 已提交
2376 2377
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp;

H
hjxilinx 已提交
2378
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_LOCALMERGE] = tscProcessRetrieveLocalMergeRsp;
H
hzcheng 已提交
2379 2380 2381 2382

  tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp;
  tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp;

2383 2384 2385 2386
  tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_TABLE] = tscProcessShowCreateRsp;
  tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_DATABASE] = tscProcessShowCreateRsp;
  

H
hzcheng 已提交
2387 2388 2389 2390 2391 2392
  tscKeepConn[TSDB_SQL_SHOW] = 1;
  tscKeepConn[TSDB_SQL_RETRIEVE] = 1;
  tscKeepConn[TSDB_SQL_SELECT] = 1;
  tscKeepConn[TSDB_SQL_FETCH] = 1;
  tscKeepConn[TSDB_SQL_HB] = 1;
}