tscServer.c 81.2 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#include "os.h"
H
hzcheng 已提交
17
#include "tcache.h"
H
Haojun Liao 已提交
18
#include "tcmdtype.h"
H
hzcheng 已提交
19
#include "trpc.h"
20 21
#include "tscLocalMerge.h"
#include "tscLog.h"
H
hzcheng 已提交
22 23 24 25 26
#include "tscProfile.h"
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
#include "ttimer.h"
dengyihao's avatar
dengyihao 已提交
27
#include "tlockfree.h"
H
hzcheng 已提交
28

29
SRpcCorEpSet  tscMgmtEpSet;
S
slguan 已提交
30

31 32
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};

H
hzcheng 已提交
33 34 35
int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql);
void tscProcessActivityTimer(void *handle, void *tmrId);
int tscKeepConn[TSDB_SQL_MAX] = {0};
36

B
Bomin Zhang 已提交
37
TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid, TSKEY dflt);
38 39
void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts);
void tscSaveSubscriptionProgress(void* sub);
H
hzcheng 已提交
40

S
slguan 已提交
41
static int32_t minMsgSize() { return tsRpcHeadSize + 100; }
H
Haojun Liao 已提交
42 43 44 45 46 47 48 49
static int32_t getWaitingTimeInterval(int32_t count) {
  int32_t initial = 100; // 100 ms by default
  if (count <= 1) {
    return 0;
  }

  return initial * (2<<(count - 2));
}
H
hzcheng 已提交
50

51
static void tscSetDnodeEpSet(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) {
52 53
  assert(pSql != NULL && pVgroupInfo != NULL && pVgroupInfo->numOfEps > 0);

54
  SRpcEpSet* pEpSet = &pSql->epSet;
H
Haojun Liao 已提交
55 56 57 58

  // Issue the query to one of the vnode among a vgroup randomly.
  // change the inUse property would not affect the isUse attribute of STableMeta
  pEpSet->inUse = rand() % pVgroupInfo->numOfEps;
59 60 61

  // apply the FQDN string length check here
  bool hasFqdn = false;
dengyihao's avatar
bugfix  
dengyihao 已提交
62

63 64 65 66
  pEpSet->numOfEps = pVgroupInfo->numOfEps;
  for(int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) {
    strcpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn);
    pEpSet->port[i] = pVgroupInfo->epAddr[i].port;
67 68 69 70

    if (!hasFqdn) {
      hasFqdn = (strlen(pEpSet->fqdn[i]) > 0);
    }
71
  }
72 73

  assert(hasFqdn);
74
}
75

dengyihao's avatar
dengyihao 已提交
76 77 78 79
static void tscDumpMgmtEpSet(SRpcEpSet *epSet) {
  taosCorBeginRead(&tscMgmtEpSet.version);
  *epSet = tscMgmtEpSet.epSet;
  taosCorEndRead(&tscMgmtEpSet.version);
80
}  
dengyihao's avatar
dengyihao 已提交
81 82
static void tscEpSetHtons(SRpcEpSet *s) {
   for (int32_t i = 0; i < s->numOfEps; i++) {
dengyihao's avatar
fixbug  
dengyihao 已提交
83 84 85
      s->port[i] = htons(s->port[i]);    
   }
} 
dengyihao's avatar
dengyihao 已提交
86 87
bool tscEpSetIsEqual(SRpcEpSet *s1, SRpcEpSet *s2) {
   if (s1->numOfEps != s2->numOfEps || s1->inUse != s2->inUse) {
88 89
     return false;
   } 
dengyihao's avatar
dengyihao 已提交
90
   for (int32_t i = 0; i < s1->numOfEps; i++) {
91 92 93 94 95
     if (s1->port[i] != s2->port[i] 
        || strncmp(s1->fqdn[i], s2->fqdn[i], TSDB_FQDN_LEN) != 0)
        return false;
   }
   return true;
S
slguan 已提交
96
}
dengyihao's avatar
dengyihao 已提交
97
void tscUpdateMgmtEpSet(SRpcEpSet *pEpSet) {
98
  // no need to update if equal
dengyihao's avatar
dengyihao 已提交
99 100 101
  taosCorBeginWrite(&tscMgmtEpSet.version);
  tscMgmtEpSet.epSet = *pEpSet;
  taosCorEndWrite(&tscMgmtEpSet.version);
102
}
dengyihao's avatar
dengyihao 已提交
103
static void tscDumpEpSetFromVgroupInfo(SCMCorVgroupInfo *pVgroupInfo, SRpcEpSet *pEpSet) {
dengyihao's avatar
dengyihao 已提交
104
  if (pVgroupInfo == NULL) { return;}
dengyihao's avatar
bugfix  
dengyihao 已提交
105
  taosCorBeginRead(&pVgroupInfo->version);
dengyihao's avatar
dengyihao 已提交
106
  int8_t inUse = pVgroupInfo->inUse;
dengyihao's avatar
dengyihao 已提交
107 108 109
  pEpSet->inUse = (inUse >= 0 && inUse < TSDB_MAX_REPLICA) ? inUse: 0; 
  pEpSet->numOfEps = pVgroupInfo->numOfEps;  
  for (int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) {
Y
yihaoDeng 已提交
110
    tstrncpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn, sizeof(pEpSet->fqdn[i]));
dengyihao's avatar
dengyihao 已提交
111
    pEpSet->port[i] = pVgroupInfo->epAddr[i].port;
S
slguan 已提交
112
  }
dengyihao's avatar
bugfix  
dengyihao 已提交
113
  taosCorEndRead(&pVgroupInfo->version);
S
slguan 已提交
114 115
}

dengyihao's avatar
dengyihao 已提交
116
static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) {
dengyihao's avatar
dengyihao 已提交
117 118
  SSqlCmd *pCmd = &pObj->cmd;
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
dengyihao's avatar
dengyihao 已提交
119
  if (pTableMetaInfo == NULL || pTableMetaInfo->pTableMeta == NULL) { return;}
dengyihao's avatar
bugfix  
dengyihao 已提交
120
  SCMCorVgroupInfo *pVgroupInfo = &pTableMetaInfo->pTableMeta->corVgroupInfo;
dengyihao's avatar
dengyihao 已提交
121

dengyihao's avatar
bugfix  
dengyihao 已提交
122
  taosCorBeginWrite(&pVgroupInfo->version);
dengyihao's avatar
dengyihao 已提交
123
  tscDebug("before: Endpoint in use: %d", pVgroupInfo->inUse);
124 125
  pVgroupInfo->inUse = pEpSet->inUse;
  pVgroupInfo->numOfEps = pEpSet->numOfEps;
dengyihao's avatar
dengyihao 已提交
126
  for (int32_t i = 0; i < pVgroupInfo->numOfEps; i++) {
H
Haojun Liao 已提交
127 128
    taosTFree(pVgroupInfo->epAddr[i].fqdn);
    pVgroupInfo->epAddr[i].fqdn = strndup(pEpSet->fqdn[i], tListLen(pEpSet->fqdn[i]));
129
    pVgroupInfo->epAddr[i].port = pEpSet->port[i];
dengyihao's avatar
dengyihao 已提交
130
  }
H
Haojun Liao 已提交
131

dengyihao's avatar
dengyihao 已提交
132
  tscDebug("after: EndPoint in use: %d", pVgroupInfo->inUse);
dengyihao's avatar
bugfix  
dengyihao 已提交
133
  taosCorEndWrite(&pVgroupInfo->version);
dengyihao's avatar
dengyihao 已提交
134
}
H
Haojun Liao 已提交
135

136 137 138 139
void tscPrintMgmtEp() {
  SRpcEpSet dump;
  tscDumpMgmtEpSet(&dump);
  if (dump.numOfEps <= 0) {
dengyihao's avatar
dengyihao 已提交
140
    tscError("invalid mnode EP list:%d", dump.numOfEps);
S
slguan 已提交
141
  } else {
142
    for (int i = 0; i < dump.numOfEps; ++i) {
143
      tscDebug("mnode index:%d %s:%d", i, dump.fqdn[i], dump.port[i]);
S
slguan 已提交
144
    }
S
slguan 已提交
145
  }
S
slguan 已提交
146 147
}

H
hzcheng 已提交
148 149 150
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
  STscObj *pObj = (STscObj *)param;
  if (pObj == NULL) return;
H
Haojun Liao 已提交
151

H
hzcheng 已提交
152 153 154 155 156
  if (pObj != pObj->signature) {
    tscError("heart beat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature);
    return;
  }

H
Haojun Liao 已提交
157
  SSqlObj *pSql = tres;
H
hzcheng 已提交
158 159 160
  SSqlRes *pRes = &pSql->res;

  if (code == 0) {
161
    SCMHeartBeatRsp *pRsp = (SCMHeartBeatRsp *)pRes->pRsp;
dengyihao's avatar
dengyihao 已提交
162 163 164 165
    SRpcEpSet *      epSet = &pRsp->epSet;
    if (epSet->numOfEps > 0) {
      tscEpSetHtons(epSet);
      tscUpdateMgmtEpSet(epSet);
dengyihao's avatar
dengyihao 已提交
166
    } 
S
slguan 已提交
167

S
Shengliang Guan 已提交
168 169
    pSql->pTscObj->connId = htonl(pRsp->connId);

H
hzcheng 已提交
170 171
    if (pRsp->killConnection) {
      tscKillConnection(pObj);
172
      return;
H
hzcheng 已提交
173
    } else {
S
slguan 已提交
174 175
      if (pRsp->queryId) tscKillQuery(pObj, htonl(pRsp->queryId));
      if (pRsp->streamId) tscKillStream(pObj, htonl(pRsp->streamId));
H
hzcheng 已提交
176 177
    }
  } else {
H
Haojun Liao 已提交
178
    tscDebug("heartbeat failed, code:%s", tstrerror(code));
H
hzcheng 已提交
179 180
  }

H
Haojun Liao 已提交
181
  if (pObj->pHb != NULL) {
182
    int32_t waitingDuring = tsShellActivityTimer * 500;
183
    tscDebug("%p start heartbeat in %dms", pSql, waitingDuring);
H
Haojun Liao 已提交
184 185 186 187 188

    taosTmrReset(tscProcessActivityTimer, waitingDuring, pObj, tscTmr, &pObj->pTimer);
  } else {
    tscDebug("%p start to close tscObj:%p, not send heartbeat again", pSql, pObj);
  }
H
hzcheng 已提交
189 190 191 192
}

void tscProcessActivityTimer(void *handle, void *tmrId) {
  STscObj *pObj = (STscObj *)handle;
193 194 195
  if (pObj == NULL || pObj->signature != pObj) {
    return;
  }
H
hzcheng 已提交
196

197 198 199
  SSqlObj* pHB = pObj->pHb;
  if (pObj->pTimer != tmrId || pHB == NULL) {
    return;
H
hzcheng 已提交
200 201
  }

H
Haojun Liao 已提交
202
  void** p = taosCacheAcquireByKey(tscObjCache, &pHB, sizeof(TSDB_CACHE_PTR_TYPE));
H
Haojun Liao 已提交
203 204 205 206 207 208 209
  if (p == NULL) {
    tscWarn("%p HB object has been released already", pHB);
    return;
  }

  assert(*pHB->self == pHB);

210
  pHB->retry = 0;
H
Haojun Liao 已提交
211 212 213 214 215
  int32_t code = tscProcessSql(pHB);
  taosCacheRelease(tscObjCache, (void**) &p, false);

  if (code != TSDB_CODE_SUCCESS) {
    tscError("%p failed to sent HB to server, reason:%s", pHB, tstrerror(code));
H
hzcheng 已提交
216 217 218 219
  }
}

int tscSendMsgToServer(SSqlObj *pSql) {
220
  STscObj* pObj = pSql->pTscObj;
H
hjxilinx 已提交
221 222 223
  SSqlCmd* pCmd = &pSql->cmd;
  
  char *pMsg = rpcMallocCont(pCmd->payloadLen);
S
slguan 已提交
224
  if (NULL == pMsg) {
225
    tscError("%p msg:%s malloc failed", pSql, taosMsg[pSql->cmd.msgType]);
226
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
H
hzcheng 已提交
227 228
  }

229 230
  // set the mgmt ip list
  if (pSql->cmd.command >= TSDB_SQL_MGMT) {
dengyihao's avatar
dengyihao 已提交
231
    tscDumpMgmtEpSet(&pSql->epSet);
J
jtao1735 已提交
232 233
  }

234
  memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
235

J
jtao1735 已提交
236
  SRpcMsg rpcMsg = {
237 238 239
      .msgType = pSql->cmd.msgType,
      .pCont   = pMsg,
      .contLen = pSql->cmd.payloadLen,
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
240 241
      .ahandle = pSql,
      .handle  = &pSql->pRpcCtx,
H
hjxilinx 已提交
242
      .code    = 0
J
jtao1735 已提交
243
  };
H
Haojun Liao 已提交
244

H
Haojun Liao 已提交
245 246 247 248
  // NOTE: the rpc context should be acquired before sending data to server.
  // Otherwise, the pSql object may have been released already during the response function, which is
  // processMsgFromServer function. In the meanwhile, the assignment of the rpc context to sql object will absolutely
  // cause crash.
H
Haojun Liao 已提交
249 250
  rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg);
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
251 252
}

253
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
S
TD-1530  
Shengliang Guan 已提交
254 255
  TSDB_CACHE_PTR_TYPE handle = (TSDB_CACHE_PTR_TYPE) rpcMsg->ahandle;
  void** p = taosCacheAcquireByKey(tscObjCache, &handle, sizeof(TSDB_CACHE_PTR_TYPE));
H
Haojun Liao 已提交
256 257
  if (p == NULL) {
    rpcFreeCont(rpcMsg->pCont);
258 259
    return;
  }
260

H
Haojun Liao 已提交
261 262 263
  SSqlObj* pSql = *p;
  assert(pSql != NULL);

H
Haojun Liao 已提交
264
  STscObj *pObj = pSql->pTscObj;
S
slguan 已提交
265 266
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
H
hzcheng 已提交
267

H
Haojun Liao 已提交
268
  assert(*pSql->self == pSql);
H
Haojun Liao 已提交
269
  pSql->pRpcCtx = NULL;
H
Haojun Liao 已提交
270

H
Haojun Liao 已提交
271
  if (pObj->signature != pObj) {
272
    tscDebug("%p DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, pObj, pObj->signature);
H
Haojun Liao 已提交
273

H
Haojun Liao 已提交
274
    taosCacheRelease(tscObjCache, (void**) &p, true);
H
Haojun Liao 已提交
275 276 277 278 279 280
    rpcFreeCont(rpcMsg->pCont);
    return;
  }

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) {
281
    tscDebug("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p",
282
        pSql, pCmd->command, pQueryInfo->type, pObj, pObj->signature);
H
Haojun Liao 已提交
283

H
Haojun Liao 已提交
284 285 286
    void** p1 = p;
    taosCacheRelease(tscObjCache, (void**) &p1, false);

H
Haojun Liao 已提交
287
    taosCacheRelease(tscObjCache, (void**) &p, true);
288
    rpcFreeCont(rpcMsg->pCont);
S
slguan 已提交
289
    return;
H
hzcheng 已提交
290 291
  }

H
Haojun Liao 已提交
292
  if (pEpSet) {
dengyihao's avatar
dengyihao 已提交
293
    if (!tscEpSetIsEqual(&pSql->epSet, pEpSet)) {
H
Haojun Liao 已提交
294 295
      if (pCmd->command < TSDB_SQL_MGMT) {
        tscUpdateVgroupInfo(pSql, pEpSet);
dengyihao's avatar
dengyihao 已提交
296
      } else {
dengyihao's avatar
dengyihao 已提交
297
        tscUpdateMgmtEpSet(pEpSet);
H
Haojun Liao 已提交
298
      }
dengyihao's avatar
dengyihao 已提交
299
    }
J
jtao1735 已提交
300 301
  }

302 303 304 305 306
  int32_t cmd = pCmd->command;
  if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_FETCH || cmd == TSDB_SQL_INSERT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) &&
      (rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID ||
       rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID ||
       rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL ||
S
Shengliang Guan 已提交
307
       rpcMsg->code == TSDB_CODE_APP_NOT_READY ||
308 309 310 311 312 313 314
       rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE)) {
    tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), ++pSql->retry);

    // set the flag to denote that sql string needs to be re-parsed and build submit block with table schema
    if (rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
      pSql->cmd.submitSchema = 1;
    }
315

316 317 318 319
    pSql->res.code = rpcMsg->code;  // keep the previous error code
    if (pSql->retry > pSql->maxRetry) {
      tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry);
    } else {
H
Haojun Liao 已提交
320
      // wait for a little bit moment and then retry, todo do not sleep in rpc callback thread
H
Haojun Liao 已提交
321 322 323 324 325
      if (rpcMsg->code == TSDB_CODE_APP_NOT_READY || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
        int32_t duration = getWaitingTimeInterval(pSql->retry);
        taosMsleep(duration);
      }

B
Bomin Zhang 已提交
326
      rpcMsg->code = tscRenewTableMeta(pSql, 0);
327 328 329

      // if there is an error occurring, proceed to the following error handling procedure.
      if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
H
Haojun Liao 已提交
330
        taosCacheRelease(tscObjCache, (void**) &p, false);
331 332
        rpcFreeCont(rpcMsg->pCont);
        return;
H
hzcheng 已提交
333 334
      }
    }
S
slguan 已提交
335
  }
336

H
hzcheng 已提交
337
  pRes->rspLen = 0;
338
  
H
Haojun Liao 已提交
339
  if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
340
    tscDebug("%p query is cancelled, code:%s", pSql, tstrerror(pRes->code));
H
Haojun Liao 已提交
341 342
  } else {
    pRes->code = rpcMsg->code;
H
hzcheng 已提交
343 344
  }

S
slguan 已提交
345
  if (pRes->code == TSDB_CODE_SUCCESS) {
346
    tscDebug("%p reset retry counter to be 0 due to success rsp, old:%d", pSql, pSql->retry);
S
slguan 已提交
347 348 349
    pSql->retry = 0;
  }

350
  if (pRes->code != TSDB_CODE_TSC_QUERY_CANCELLED) {
351
    assert(rpcMsg->msgType == pCmd->msgType + 1);
352
    pRes->code    = rpcMsg->code;
353
    pRes->rspType = rpcMsg->msgType;
354
    pRes->rspLen  = rpcMsg->contLen;
H
hzcheng 已提交
355

356
    if (pRes->rspLen > 0 && rpcMsg->pCont) {
357 358
      char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
      if (tmp == NULL) {
359
        pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
360 361
      } else {
        pRes->pRsp = tmp;
362
        memcpy(pRes->pRsp, rpcMsg->pCont, pRes->rspLen);
S
slguan 已提交
363
      }
364 365
    } else {
      pRes->pRsp = NULL;
S
slguan 已提交
366 367
    }

H
hzcheng 已提交
368 369 370 371
    /*
     * There is not response callback function for submit response.
     * The actual inserted number of points is the first number.
     */
372
    if (rpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP && pRes->pRsp != NULL) {
373
      SShellSubmitRspMsg *pMsg = (SShellSubmitRspMsg*)pRes->pRsp;
S
slguan 已提交
374 375 376 377 378 379 380
      pMsg->code = htonl(pMsg->code);
      pMsg->numOfRows = htonl(pMsg->numOfRows);
      pMsg->affectedRows = htonl(pMsg->affectedRows);
      pMsg->failedRows = htonl(pMsg->failedRows);
      pMsg->numOfFailedBlocks = htonl(pMsg->numOfFailedBlocks);

      pRes->numOfRows += pMsg->affectedRows;
381
      tscDebug("%p SQL cmd:%s, code:%s inserted rows:%d rspLen:%d", pSql, sqlCmd[pCmd->command], 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
382
          tstrerror(pRes->code), pMsg->affectedRows, pRes->rspLen);
S
slguan 已提交
383
    } else {
384
      tscDebug("%p SQL cmd:%s, code:%s rspLen:%d", pSql, sqlCmd[pCmd->command], tstrerror(pRes->code), pRes->rspLen);
H
hzcheng 已提交
385 386
    }
  }
387
  
H
Haojun Liao 已提交
388
  if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) {
389
    rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
H
Haojun Liao 已提交
390
  }
S
Shengliang Guan 已提交
391

H
Haojun Liao 已提交
392
  bool shouldFree = tscShouldBeFreed(pSql);
393
  if (rpcMsg->code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
S
TD-1057  
Shengliang Guan 已提交
394
    rpcMsg->code = (pRes->code == TSDB_CODE_SUCCESS) ? (int32_t)pRes->numOfRows : pRes->code;
H
Haojun Liao 已提交
395
    (*pSql->fp)(pSql->param, pSql, rpcMsg->code);
H
hzcheng 已提交
396 397
  }

H
Haojun Liao 已提交
398
  void** p1 = p;
H
Haojun Liao 已提交
399
  taosCacheRelease(tscObjCache, (void**) &p1, false);
H
Haojun Liao 已提交
400

H
Haojun Liao 已提交
401 402
  if (shouldFree) { // in case of table-meta/vgrouplist query, automatically free it
    taosCacheRelease(tscObjCache, (void **)&p, true);
H
Haojun Liao 已提交
403 404 405
    tscDebug("%p sqlObj is automatically freed", pSql);
  }

406
  rpcFreeCont(rpcMsg->pCont);
H
hzcheng 已提交
407 408
}

S
slguan 已提交
409 410 411
int doProcessSql(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;
412

H
hjxilinx 已提交
413 414 415 416 417 418 419
  if (pCmd->command == TSDB_SQL_SELECT ||
      pCmd->command == TSDB_SQL_FETCH ||
      pCmd->command == TSDB_SQL_RETRIEVE ||
      pCmd->command == TSDB_SQL_INSERT ||
      pCmd->command == TSDB_SQL_CONNECT ||
      pCmd->command == TSDB_SQL_HB ||
      pCmd->command == TSDB_SQL_META ||
H
hjxilinx 已提交
420
      pCmd->command == TSDB_SQL_STABLEVGROUP) {
421 422 423 424 425 426
    pRes->code = tscBuildMsg[pCmd->command](pSql, NULL);
  }
  
  if (pRes->code != TSDB_CODE_SUCCESS) {
    tscQueueAsyncRes(pSql);
    return pRes->code;
S
slguan 已提交
427
  }
428

429 430 431
  int32_t code = tscSendMsgToServer(pSql);

  // NOTE: if code is TSDB_CODE_SUCCESS, pSql may have been released here already by other threads.
432
  if (code != TSDB_CODE_SUCCESS) {
S
slguan 已提交
433
    pRes->code = code;
H
hjxilinx 已提交
434
    tscQueueAsyncRes(pSql);
H
Haojun Liao 已提交
435
    return code;
S
slguan 已提交
436
  }
H
hjxilinx 已提交
437 438
  
  return TSDB_CODE_SUCCESS;
S
slguan 已提交
439 440 441
}

int tscProcessSql(SSqlObj *pSql) {
442
  char    *name = NULL;
443
  SSqlCmd *pCmd = &pSql->cmd;
444
  
445
  SQueryInfo     *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hjxilinx 已提交
446
  STableMetaInfo *pTableMetaInfo = NULL;
447
  uint32_t        type = 0;
448

449
  if (pQueryInfo != NULL) {
H
hjxilinx 已提交
450
    pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
451
    name = (pTableMetaInfo != NULL)? pTableMetaInfo->name:NULL;
452
    type = pQueryInfo->type;
453

H
hjxilinx 已提交
454
    // while numOfTables equals to 0, it must be Heartbeat
H
hjxilinx 已提交
455
    assert((pQueryInfo->numOfTables == 0 && pQueryInfo->command == TSDB_SQL_HB) || pQueryInfo->numOfTables > 0);
S
slguan 已提交
456
  }
457

458
  tscDebug("%p SQL cmd:%s will be processed, name:%s, type:%d", pSql, sqlCmd[pCmd->command], name, type);
H
hjxilinx 已提交
459
  if (pCmd->command < TSDB_SQL_MGMT) { // the pTableMetaInfo cannot be NULL
H
hjxilinx 已提交
460
    if (pTableMetaInfo == NULL) {
461
      pSql->res.code = TSDB_CODE_TSC_APP_ERROR;
H
hjxilinx 已提交
462 463
      return pSql->res.code;
    }
H
hjxilinx 已提交
464
  } else if (pCmd->command < TSDB_SQL_LOCAL) {
465
    //pSql->epSet = tscMgmtEpSet;
H
hzcheng 已提交
466 467 468
  } else {  // local handler
    return (*tscProcessMsgRsp[pCmd->command])(pSql);
  }
469
  
S
slguan 已提交
470 471
  return doProcessSql(pSql);
}
H
hzcheng 已提交
472

J
jtao1735 已提交
473
int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
474
  SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *) pSql->cmd.payload;
S
slguan 已提交
475
  pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
S
slguan 已提交
476

H
Haojun Liao 已提交
477
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
S
slguan 已提交
478
  pRetrieveMsg->free = htons(pQueryInfo->type);
H
hzcheng 已提交
479

480
  // todo valid the vgroupId at the client side
H
hjxilinx 已提交
481 482
  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  
weixin_48148422's avatar
weixin_48148422 已提交
483
  if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
H
hjxilinx 已提交
484
    int32_t vgIndex = pTableMetaInfo->vgroupIndex;
H
Haojun Liao 已提交
485 486 487 488 489 490 491 492 493
    if (pTableMetaInfo->pVgroupTables == NULL) {
      SVgroupsInfo *pVgroupInfo = pTableMetaInfo->vgroupList;
      assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups);

      pRetrieveMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId);
      tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d", pSql, pVgroupInfo->vgroups[vgIndex].vgId, vgIndex);
    } else {
      int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
      assert(vgIndex >= 0 && vgIndex < numOfVgroups);
H
Haojun Liao 已提交
494

H
Haojun Liao 已提交
495 496 497 498 499
      SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex);

      pRetrieveMsg->header.vgId = htonl(pTableIdList->vgInfo.vgId);
      tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d", pSql, pTableIdList->vgInfo.vgId, vgIndex);
    }
500
  } else {
H
hjxilinx 已提交
501
    STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
H
hjxilinx 已提交
502
    pRetrieveMsg->header.vgId = htonl(pTableMeta->vgroupInfo.vgId);
H
Haojun Liao 已提交
503
    tscDebug("%p build fetch msg from only one vgroup, vgId:%d", pSql, pTableMeta->vgroupInfo.vgId);
504
  }
505 506

  pSql->cmd.payloadLen = sizeof(SRetrieveTableMsg);
J
jtao1735 已提交
507
  pSql->cmd.msgType = TSDB_MSG_TYPE_FETCH;
508 509 510

  pRetrieveMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg));

511
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
512 513
}

514
int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hjxilinx 已提交
515
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
H
hjxilinx 已提交
516
  STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
H
hjxilinx 已提交
517
  
518
  char* pMsg = pSql->cmd.payload;
519 520 521
  
  // NOTE: shell message size should not include SMsgDesc
  int32_t size = pSql->cmd.payloadLen - sizeof(SMsgDesc);
522 523
  int32_t vgId = pTableMeta->vgroupInfo.vgId;

524
  SMsgDesc* pMsgDesc = (SMsgDesc*) pMsg;
525 526
  pMsgDesc->numOfVnodes = htonl(1); // always one vnode

527
  pMsg += sizeof(SMsgDesc);
528
  SSubmitMsg *pShellMsg = (SSubmitMsg *)pMsg;
529

H
hjxilinx 已提交
530
  pShellMsg->header.vgId = htonl(vgId);
H
Haojun Liao 已提交
531
  pShellMsg->header.contLen = htonl(size);      // the length not includes the size of SMsgDesc
532
  pShellMsg->length = pShellMsg->header.contLen;
H
hjxilinx 已提交
533
  
H
Haojun Liao 已提交
534
  pShellMsg->numOfBlocks = htonl(pSql->cmd.numOfTablesInSubmit);  // number of tables to be inserted
H
hzcheng 已提交
535

H
hjxilinx 已提交
536
  // pSql->cmd.payloadLen is set during copying data into payload
S
slguan 已提交
537
  pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
dengyihao's avatar
dengyihao 已提交
538
  tscDumpEpSetFromVgroupInfo(&pTableMeta->corVgroupInfo, &pSql->epSet);
539

540 541
  tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfEP:%d", pSql, vgId, pSql->cmd.numOfTablesInSubmit,
      pSql->epSet.numOfEps);
542
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
543 544 545
}

/*
546
 * for table query, simply return the size <= 1k
H
hzcheng 已提交
547
 */
548
static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
H
hzcheng 已提交
549
  const static int32_t MIN_QUERY_MSG_PKT_SIZE = TSDB_MAX_BYTES_PER_ROW * 5;
550
  SQueryInfo *         pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
H
hzcheng 已提交
551

S
TD-1057  
Shengliang Guan 已提交
552
  int32_t srcColListSize = (int32_t)(taosArrayGetSize(pQueryInfo->colList) * sizeof(SColumnInfo));
H
hjxilinx 已提交
553 554
  
  size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
S
TD-1057  
Shengliang Guan 已提交
555
  int32_t exprSize = (int32_t)(sizeof(SSqlFuncMsg) * numOfExprs);
H
hjxilinx 已提交
556
  
557
  return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + exprSize + 4096;
H
hzcheng 已提交
558 559
}

560
static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char *pMsg) {
561
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
B
Bomin Zhang 已提交
562
  TSKEY dfltKey = htobe64(pQueryMsg->window.skey);
563

H
hjxilinx 已提交
564
  STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
weixin_48148422's avatar
weixin_48148422 已提交
565
  if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) || pTableMetaInfo->pVgroupTables == NULL) {
566 567
    
    SCMVgroupInfo* pVgroupInfo = NULL;
weixin_48148422's avatar
weixin_48148422 已提交
568
    if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
569 570
      int32_t index = pTableMetaInfo->vgroupIndex;
      assert(index >= 0);
H
hjxilinx 已提交
571
  
B
Bomin Zhang 已提交
572
      if (pTableMetaInfo->vgroupList->numOfVgroups > 0) {
H
Haojun Liao 已提交
573
        assert(index < pTableMetaInfo->vgroupList->numOfVgroups);
B
Bomin Zhang 已提交
574 575
        pVgroupInfo = &pTableMetaInfo->vgroupList->vgroups[index];
      }
H
Haojun Liao 已提交
576
      tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, pTableMetaInfo->vgroupList->numOfVgroups);
H
hjxilinx 已提交
577 578
    } else {
      pVgroupInfo = &pTableMeta->vgroupInfo;
579
    }
weixin_48148422's avatar
weixin_48148422 已提交
580

581 582 583 584
    assert(pVgroupInfo != NULL);

    tscSetDnodeEpSet(pSql, pVgroupInfo);
    pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId);
weixin_48148422's avatar
weixin_48148422 已提交
585

586
    STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
587 588 589
    pTableIdInfo->tid = htonl(pTableMeta->id.tid);
    pTableIdInfo->uid = htobe64(pTableMeta->id.uid);
    pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->id.uid, dfltKey));
weixin_48148422's avatar
weixin_48148422 已提交
590

591 592
    pQueryMsg->numOfTables = htonl(1);  // set the number of tables
    pMsg += sizeof(STableIdInfo);
593
  } else { // it is a subquery of the super table query, this EP info is acquired from vgroupInfo
594
    int32_t index = pTableMetaInfo->vgroupIndex;
S
TD-1057  
Shengliang Guan 已提交
595
    int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
596
    assert(index >= 0 && index < numOfVgroups);
weixin_48148422's avatar
weixin_48148422 已提交
597

598
    tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, numOfVgroups);
weixin_48148422's avatar
weixin_48148422 已提交
599

600
    SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, index);
601

dengyihao's avatar
bugfix  
dengyihao 已提交
602
    // set the vgroup info 
603
    tscSetDnodeEpSet(pSql, &pTableIdList->vgInfo);
604 605
    pQueryMsg->head.vgId = htonl(pTableIdList->vgInfo.vgId);
    
S
TD-1057  
Shengliang Guan 已提交
606
    int32_t numOfTables = (int32_t)taosArrayGetSize(pTableIdList->itemList);
607 608 609 610 611 612 613 614 615
    pQueryMsg->numOfTables = htonl(numOfTables);  // set the number of tables
  
    // serialize each table id info
    for(int32_t i = 0; i < numOfTables; ++i) {
      STableIdInfo* pItem = taosArrayGet(pTableIdList->itemList, i);
      
      STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
      pTableIdInfo->tid = htonl(pItem->tid);
      pTableIdInfo->uid = htobe64(pItem->uid);
B
Bomin Zhang 已提交
616
      pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pItem->uid, dfltKey));
617 618 619 620
      pMsg += sizeof(STableIdInfo);
    }
  }
  
H
Haojun Liao 已提交
621 622
  tscDebug("%p vgId:%d, query on table:%s, tid:%d, uid:%" PRIu64, pSql, htonl(pQueryMsg->head.vgId), pTableMetaInfo->name,
      pTableMeta->id.tid, pTableMeta->id.uid);
H
hjxilinx 已提交
623
  
624 625 626
  return pMsg;
}

627
int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
628 629
  SSqlCmd *pCmd = &pSql->cmd;

630
  int32_t size = tscEstimateQueryMsgSize(pCmd, pCmd->clauseIndex);
H
hzcheng 已提交
631

S
slguan 已提交
632 633
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for query msg", pSql);
H
Haojun Liao 已提交
634
    return TSDB_CODE_TSC_INVALID_SQL;  // todo add test for this
S
slguan 已提交
635
  }
636
  
637
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hjxilinx 已提交
638
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hjxilinx 已提交
639
  STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
640 641 642

  size_t numOfSrcCols = taosArrayGetSize(pQueryInfo->colList);
  if (numOfSrcCols <= 0 && !tscQueryTags(pQueryInfo)) {
S
Shengliang Guan 已提交
643
    tscError("%p illegal value of numOfCols in query msg: %" PRIu64 ", table cols:%d", pSql, (uint64_t)numOfSrcCols,
644 645
        tscGetNumOfColumns(pTableMeta));

H
Haojun Liao 已提交
646
    return TSDB_CODE_TSC_INVALID_SQL;
647
  }
648
  
649
  if (pQueryInfo->interval.interval < 0) {
S
TD-1530  
Shengliang Guan 已提交
650
    tscError("%p illegal value of aggregation time interval in query msg: %" PRId64, pSql, (int64_t)pQueryInfo->interval.interval);
H
Haojun Liao 已提交
651
    return TSDB_CODE_TSC_INVALID_SQL;
652 653 654 655
  }
  
  if (pQueryInfo->groupbyExpr.numOfGroupCols < 0) {
    tscError("%p illegal value of numOfGroupCols in query msg: %d", pSql, pQueryInfo->groupbyExpr.numOfGroupCols);
H
Haojun Liao 已提交
656
    return TSDB_CODE_TSC_INVALID_SQL;
657
  }
658

659
  SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload;
H
hzcheng 已提交
660

S
TD-1057  
Shengliang Guan 已提交
661
  int32_t numOfTags = (int32_t)taosArrayGetSize(pTableMetaInfo->tagColList);
662
  
663
  if (pQueryInfo->order.order == TSDB_ORDER_ASC) {
H
hjxilinx 已提交
664 665
    pQueryMsg->window.skey = htobe64(pQueryInfo->window.skey);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->window.ekey);
H
hzcheng 已提交
666
  } else {
H
hjxilinx 已提交
667 668
    pQueryMsg->window.skey = htobe64(pQueryInfo->window.ekey);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->window.skey);
H
hzcheng 已提交
669 670
  }

671 672
  pQueryMsg->order          = htons(pQueryInfo->order.order);
  pQueryMsg->orderColId     = htons(pQueryInfo->order.orderColId);
H
Haojun Liao 已提交
673
  pQueryMsg->fillType       = htons(pQueryInfo->fillType);
674 675
  pQueryMsg->limit          = htobe64(pQueryInfo->limit.limit);
  pQueryMsg->offset         = htobe64(pQueryInfo->limit.offset);
S
TD-1057  
Shengliang Guan 已提交
676
  pQueryMsg->numOfCols      = htons((int16_t)taosArrayGetSize(pQueryInfo->colList));
H
Haojun Liao 已提交
677 678
  pQueryMsg->interval.interval = htobe64(pQueryInfo->interval.interval);
  pQueryMsg->interval.sliding  = htobe64(pQueryInfo->interval.sliding);
679 680
  pQueryMsg->interval.offset   = htobe64(pQueryInfo->interval.offset);
  pQueryMsg->interval.intervalUnit = pQueryInfo->interval.intervalUnit;
H
Haojun Liao 已提交
681 682
  pQueryMsg->interval.slidingUnit  = pQueryInfo->interval.slidingUnit;
  pQueryMsg->interval.offsetUnit   = pQueryInfo->interval.offsetUnit;
683
  pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols);
684
  pQueryMsg->numOfTags      = htonl(numOfTags);
weixin_48148422's avatar
weixin_48148422 已提交
685
  pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType);
H
Haojun Liao 已提交
686
  pQueryMsg->queryType      = htonl(pQueryInfo->type);
H
hjxilinx 已提交
687 688
  
  size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo);
S
TD-1057  
Shengliang Guan 已提交
689
  pQueryMsg->numOfOutput = htons((int16_t)numOfOutput);
H
hzcheng 已提交
690 691

  // set column list ids
692 693
  size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
  char *pMsg = (char *)(pQueryMsg->colList) + numOfCols * sizeof(SColumnInfo);
H
hjxilinx 已提交
694
  SSchema *pSchema = tscGetTableSchema(pTableMeta);
695
  
696 697 698
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumn *pCol = taosArrayGetP(pQueryInfo->colList, i);
    SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];
H
hzcheng 已提交
699

700
    if (pCol->colIndex.columnIndex >= tscGetNumOfColumns(pTableMeta) || pColSchema->type < TSDB_DATA_TYPE_BOOL ||
701
        pColSchema->type > TSDB_DATA_TYPE_NCHAR) {
702 703
      tscError("%p tid:%d uid:%" PRIu64" id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s",
          pSql, pTableMeta->id.tid, pTableMeta->id.uid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta), pCol->colIndex.columnIndex,
704 705
               pColSchema->name);

706
      return TSDB_CODE_TSC_INVALID_SQL;
707
    }
H
hzcheng 已提交
708 709 710

    pQueryMsg->colList[i].colId = htons(pColSchema->colId);
    pQueryMsg->colList[i].bytes = htons(pColSchema->bytes);
711
    pQueryMsg->colList[i].type  = htons(pColSchema->type);
S
slguan 已提交
712
    pQueryMsg->colList[i].numOfFilters = htons(pCol->numOfFilters);
H
hzcheng 已提交
713

S
slguan 已提交
714 715 716
    // append the filter information after the basic column information
    for (int32_t f = 0; f < pCol->numOfFilters; ++f) {
      SColumnFilterInfo *pColFilter = &pCol->filterInfo[f];
H
hzcheng 已提交
717

S
slguan 已提交
718
      SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)pMsg;
719
      pFilterMsg->filterstr = htons(pColFilter->filterstr);
S
slguan 已提交
720 721

      pMsg += sizeof(SColumnFilterInfo);
H
hzcheng 已提交
722

723
      if (pColFilter->filterstr) {
S
slguan 已提交
724
        pFilterMsg->len = htobe64(pColFilter->len);
725
        memcpy(pMsg, (void *)pColFilter->pz, (size_t)(pColFilter->len + 1));
S
slguan 已提交
726 727 728 729 730 731 732 733
        pMsg += (pColFilter->len + 1);  // append the additional filter binary info
      } else {
        pFilterMsg->lowerBndi = htobe64(pColFilter->lowerBndi);
        pFilterMsg->upperBndi = htobe64(pColFilter->upperBndi);
      }

      pFilterMsg->lowerRelOptr = htons(pColFilter->lowerRelOptr);
      pFilterMsg->upperRelOptr = htons(pColFilter->upperRelOptr);
H
hzcheng 已提交
734

S
slguan 已提交
735 736
      if (pColFilter->lowerRelOptr == TSDB_RELATION_INVALID && pColFilter->upperRelOptr == TSDB_RELATION_INVALID) {
        tscError("invalid filter info");
H
Haojun Liao 已提交
737
        return TSDB_CODE_TSC_INVALID_SQL;
S
slguan 已提交
738 739
      }
    }
H
hzcheng 已提交
740 741
  }

H
hjxilinx 已提交
742
  SSqlFuncMsg *pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
H
hjxilinx 已提交
743
  for (int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) {
744
    SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
H
hzcheng 已提交
745

746
    if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId, pExpr->numOfParams)) {
H
hzcheng 已提交
747
      tscError("%p table schema is not matched with parsed sql", pSql);
H
Haojun Liao 已提交
748
      return TSDB_CODE_TSC_INVALID_SQL;
H
hzcheng 已提交
749 750
    }

751 752 753
    pSqlFuncExpr->colInfo.colId    = htons(pExpr->colInfo.colId);
    pSqlFuncExpr->colInfo.colIndex = htons(pExpr->colInfo.colIndex);
    pSqlFuncExpr->colInfo.flag     = htons(pExpr->colInfo.flag);
H
hzcheng 已提交
754

755
    pSqlFuncExpr->functionId  = htons(pExpr->functionId);
H
hzcheng 已提交
756
    pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams);
H
hjxilinx 已提交
757
    pMsg += sizeof(SSqlFuncMsg);
H
hzcheng 已提交
758 759

    for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
H
Haojun Liao 已提交
760
      // todo add log
H
hzcheng 已提交
761 762 763 764 765
      pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType);
      pSqlFuncExpr->arg[j].argBytes = htons(pExpr->param[j].nLen);

      if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) {
        memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen);
766
        pMsg += pExpr->param[j].nLen;
H
hzcheng 已提交
767 768 769 770 771
      } else {
        pSqlFuncExpr->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64Key);
      }
    }

H
hjxilinx 已提交
772
    pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
H
hzcheng 已提交
773
  }
774
  
775
  // serialize the table info (sid, uid, tags)
776 777
  pMsg = doSerializeTableInfo(pQueryMsg, pSql, pMsg);
  
778
  SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr;
779
  if (pGroupbyExpr->numOfGroupCols > 0) {
S
slguan 已提交
780
    pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex);
H
hzcheng 已提交
781 782
    pQueryMsg->orderType = htons(pGroupbyExpr->orderType);

S
slguan 已提交
783
    for (int32_t j = 0; j < pGroupbyExpr->numOfGroupCols; ++j) {
784 785
      SColIndex* pCol = taosArrayGet(pGroupbyExpr->columnInfo, j);
  
S
slguan 已提交
786 787 788
      *((int16_t *)pMsg) = pCol->colId;
      pMsg += sizeof(pCol->colId);

789 790
      *((int16_t *)pMsg) += pCol->colIndex;
      pMsg += sizeof(pCol->colIndex);
S
slguan 已提交
791 792 793

      *((int16_t *)pMsg) += pCol->flag;
      pMsg += sizeof(pCol->flag);
H
hjxilinx 已提交
794 795 796
      
      memcpy(pMsg, pCol->name, tListLen(pCol->name));
      pMsg += tListLen(pCol->name);
S
slguan 已提交
797 798 799
    }
  }

800
  if (pQueryInfo->fillType != TSDB_FILL_NONE) {
H
hjxilinx 已提交
801
    for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
802 803
      *((int64_t *)pMsg) = htobe64(pQueryInfo->fillVal[i]);
      pMsg += sizeof(pQueryInfo->fillVal[0]);
S
slguan 已提交
804 805
    }
  }
806 807 808 809 810 811 812 813 814
  
  if (numOfTags != 0) {
    int32_t numOfColumns = tscGetNumOfColumns(pTableMeta);
    int32_t numOfTagColumns = tscGetNumOfTags(pTableMeta);
    int32_t total = numOfTagColumns + numOfColumns;
    
    pSchema = tscGetTableTagSchema(pTableMeta);
    
    for (int32_t i = 0; i < numOfTags; ++i) {
H
hjxilinx 已提交
815
      SColumn *pCol = taosArrayGetP(pTableMetaInfo->tagColList, i);
816 817 818 819
      SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];

      if ((pCol->colIndex.columnIndex >= numOfTagColumns || pCol->colIndex.columnIndex < -1) ||
          (pColSchema->type < TSDB_DATA_TYPE_BOOL || pColSchema->type > TSDB_DATA_TYPE_NCHAR)) {
820 821
        tscError("%p tid:%d uid:%" PRIu64 " id:%s, tag index out of range, totalCols:%d, numOfTags:%d, index:%d, column name:%s",
                 pSql, pTableMeta->id.tid, pTableMeta->id.uid, pTableMetaInfo->name, total, numOfTagColumns,
B
Bomin Zhang 已提交
822
                 pCol->colIndex.columnIndex, pColSchema->name);
823

824
        return TSDB_CODE_TSC_INVALID_SQL;
825 826 827 828 829 830 831 832 833 834 835 836
      }
  
      SColumnInfo* pTagCol = (SColumnInfo*) pMsg;
  
      pTagCol->colId = htons(pColSchema->colId);
      pTagCol->bytes = htons(pColSchema->bytes);
      pTagCol->type  = htons(pColSchema->type);
      pTagCol->numOfFilters = 0;
      
      pMsg += sizeof(SColumnInfo);
    }
  }
S
slguan 已提交
837

H
Haojun Liao 已提交
838 839 840 841
  // serialize tag column query condition
  if (pQueryInfo->tagCond.pCond != NULL && taosArrayGetSize(pQueryInfo->tagCond.pCond) > 0) {
    STagCond* pTagCond = &pQueryInfo->tagCond;
    
842
    SCond *pCond = tsGetSTableQueryCond(pTagCond, pTableMeta->id.uid);
H
Haojun Liao 已提交
843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858
    if (pCond != NULL && pCond->cond != NULL) {
      pQueryMsg->tagCondLen = htons(pCond->len);
      memcpy(pMsg, pCond->cond, pCond->len);
      
      pMsg += pCond->len;
    }
  }
  
  if (pQueryInfo->tagCond.tbnameCond.cond == NULL) {
    *pMsg = 0;
    pMsg++;
  } else {
    strcpy(pMsg, pQueryInfo->tagCond.tbnameCond.cond);
    pMsg += strlen(pQueryInfo->tagCond.tbnameCond.cond) + 1;
  }

S
slguan 已提交
859
  // compressed ts block
S
TD-1057  
Shengliang Guan 已提交
860
  pQueryMsg->tsOffset = htonl((int32_t)(pMsg - pCmd->payload));
S
slguan 已提交
861 862 863
  int32_t tsLen = 0;
  int32_t numOfBlocks = 0;

864
  if (pQueryInfo->tsBuf != NULL) {
H
Haojun Liao 已提交
865 866
    int32_t vnodeId = htonl(pQueryMsg->head.vgId);
    STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pQueryInfo->tsBuf, vnodeId);
867
    assert(QUERY_IS_JOIN_QUERY(pQueryInfo->type) && pBlockInfo != NULL);  // this query should not be sent
S
slguan 已提交
868 869

    // todo refactor
B
Bomin Zhang 已提交
870 871 872 873 874
    if (fseek(pQueryInfo->tsBuf->f, pBlockInfo->offset, SEEK_SET) != 0) {
      int code = TAOS_SYSTEM_ERROR(ferror(pQueryInfo->tsBuf->f));
      tscError("%p: fseek failed: %s", pSql, tstrerror(code));
      return code;
    }
H
Haojun Liao 已提交
875 876 877

    size_t s = fread(pMsg, 1, pBlockInfo->compLen, pQueryInfo->tsBuf->f);
    if (s != pBlockInfo->compLen) {
B
Bomin Zhang 已提交
878 879 880 881
      int code = TAOS_SYSTEM_ERROR(ferror(pQueryInfo->tsBuf->f));
      tscError("%p: fread didn't return expected data: %s", pSql, tstrerror(code));
      return code;
    }
S
slguan 已提交
882 883 884 885

    pMsg += pBlockInfo->compLen;
    tsLen = pBlockInfo->compLen;
    numOfBlocks = pBlockInfo->numOfBlocks;
H
hzcheng 已提交
886 887
  }

S
slguan 已提交
888 889
  pQueryMsg->tsLen = htonl(tsLen);
  pQueryMsg->tsNumOfBlocks = htonl(numOfBlocks);
890 891
  if (pQueryInfo->tsBuf != NULL) {
    pQueryMsg->tsOrder = htonl(pQueryInfo->tsBuf->tsOrder);
H
hzcheng 已提交
892 893
  }

S
TD-1057  
Shengliang Guan 已提交
894
  int32_t msgLen = (int32_t)(pMsg - pCmd->payload);
H
hzcheng 已提交
895

896
  tscDebug("%p msg built success,len:%d bytes", pSql, msgLen);
H
hzcheng 已提交
897
  pCmd->payloadLen = msgLen;
S
slguan 已提交
898
  pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY;
H
hjxilinx 已提交
899
  
900
  pQueryMsg->head.contLen = htonl(msgLen);
S
TD-1057  
Shengliang Guan 已提交
901
  assert(msgLen + minMsgSize() <= (int32_t)pCmd->allocSize);
902 903

  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
904 905
}

906 907
int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
908
  pCmd->payloadLen = sizeof(SCMCreateDbMsg);
S
slguan 已提交
909
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DB;
H
hzcheng 已提交
910

911
  SCMCreateDbMsg *pCreateDbMsg = (SCMCreateDbMsg*)pCmd->payload;
912

913
  assert(pCmd->numOfClause == 1);
914
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
915
  tstrncpy(pCreateDbMsg->db, pTableMetaInfo->name, sizeof(pCreateDbMsg->db));
H
hzcheng 已提交
916

917
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
918 919
}

920 921
int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
922
  pCmd->payloadLen = sizeof(SCMCreateDnodeMsg);
S
slguan 已提交
923 924
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
925
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
926
  }
H
hzcheng 已提交
927

928
  SCMCreateDnodeMsg *pCreate = (SCMCreateDnodeMsg *)pCmd->payload;
J
jtao1735 已提交
929 930
  strncpy(pCreate->ep, pInfo->pDCLInfo->a[0].z, pInfo->pDCLInfo->a[0].n);
  
S
slguan 已提交
931
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DNODE;
H
hzcheng 已提交
932

933
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
934 935
}

936 937
int32_t tscBuildAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
938
  pCmd->payloadLen = sizeof(SCMCreateAcctMsg);
S
slguan 已提交
939 940
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
941
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
942
  }
H
hzcheng 已提交
943

944
  SCMCreateAcctMsg *pAlterMsg = (SCMCreateAcctMsg *)pCmd->payload;
H
hzcheng 已提交
945

H
Haojun Liao 已提交
946 947
  SStrToken *pName = &pInfo->pDCLInfo->user.user;
  SStrToken *pPwd = &pInfo->pDCLInfo->user.passwd;
H
hzcheng 已提交
948

949 950
  strncpy(pAlterMsg->user, pName->z, pName->n);
  strncpy(pAlterMsg->pass, pPwd->z, pPwd->n);
S
slguan 已提交
951

952
  SCreateAcctSQL *pAcctOpt = &pInfo->pDCLInfo->acctOpt;
H
hzcheng 已提交
953

954 955 956 957 958 959 960 961
  pAlterMsg->cfg.maxUsers = htonl(pAcctOpt->maxUsers);
  pAlterMsg->cfg.maxDbs = htonl(pAcctOpt->maxDbs);
  pAlterMsg->cfg.maxTimeSeries = htonl(pAcctOpt->maxTimeSeries);
  pAlterMsg->cfg.maxStreams = htonl(pAcctOpt->maxStreams);
  pAlterMsg->cfg.maxPointsPerSecond = htonl(pAcctOpt->maxPointsPerSecond);
  pAlterMsg->cfg.maxStorage = htobe64(pAcctOpt->maxStorage);
  pAlterMsg->cfg.maxQueryTime = htobe64(pAcctOpt->maxQueryTime);
  pAlterMsg->cfg.maxConnections = htonl(pAcctOpt->maxConnections);
H
hzcheng 已提交
962

963 964 965 966 967 968 969 970 971 972 973 974 975
  if (pAcctOpt->stat.n == 0) {
    pAlterMsg->cfg.accessState = -1;
  } else {
    if (pAcctOpt->stat.z[0] == 'r' && pAcctOpt->stat.n == 1) {
      pAlterMsg->cfg.accessState = TSDB_VN_READ_ACCCESS;
    } else if (pAcctOpt->stat.z[0] == 'w' && pAcctOpt->stat.n == 1) {
      pAlterMsg->cfg.accessState = TSDB_VN_WRITE_ACCCESS;
    } else if (strncmp(pAcctOpt->stat.z, "all", 3) == 0 && pAcctOpt->stat.n == 3) {
      pAlterMsg->cfg.accessState = TSDB_VN_ALL_ACCCESS;
    } else if (strncmp(pAcctOpt->stat.z, "no", 2) == 0 && pAcctOpt->stat.n == 2) {
      pAlterMsg->cfg.accessState = 0;
    }
  }
H
hzcheng 已提交
976

S
slguan 已提交
977
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_ACCT;
978
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
979 980
}

981 982
int32_t tscBuildUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
983
  pCmd->payloadLen = sizeof(SCMCreateUserMsg);
S
slguan 已提交
984

S
slguan 已提交
985 986
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
987
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
988 989
  }

990
  SCMCreateUserMsg *pAlterMsg = (SCMCreateUserMsg*)pCmd->payload;
H
hzcheng 已提交
991

992 993
  SUserInfo *pUser = &pInfo->pDCLInfo->user;
  strncpy(pAlterMsg->user, pUser->user.z, pUser->user.n);
S
TD-1057  
Shengliang Guan 已提交
994
  pAlterMsg->flag = (int8_t)pUser->type;
H
hzcheng 已提交
995

996 997 998 999
  if (pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
    pAlterMsg->privilege = (char)pCmd->count;
  } else if (pUser->type == TSDB_ALTER_USER_PASSWD) {
    strncpy(pAlterMsg->pass, pUser->passwd.z, pUser->passwd.n);
H
hjxilinx 已提交
1000 1001
  } else { // create user password info
    strncpy(pAlterMsg->pass, pUser->passwd.z, pUser->passwd.n);
1002
  }
H
hzcheng 已提交
1003

1004
  if (pUser->type == TSDB_ALTER_USER_PASSWD || pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
S
slguan 已提交
1005
    pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_USER;
1006
  } else {
S
slguan 已提交
1007
    pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_USER;
1008
  }
H
hzcheng 已提交
1009

1010
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1011 1012
}

1013 1014
int32_t tscBuildCfgDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
S
slguan 已提交
1015
  pCmd->payloadLen = sizeof(SCMCfgDnodeMsg);
guanshengliang's avatar
guanshengliang 已提交
1016
  pCmd->msgType = TSDB_MSG_TYPE_CM_CONFIG_DNODE;
1017 1018
  return TSDB_CODE_SUCCESS;
}
S
slguan 已提交
1019

1020 1021
int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
1022
  pCmd->payloadLen = sizeof(SCMDropDbMsg);
H
hzcheng 已提交
1023

S
slguan 已提交
1024 1025
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1026
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1027 1028
  }

1029
  SCMDropDbMsg *pDropDbMsg = (SCMDropDbMsg*)pCmd->payload;
H
hzcheng 已提交
1030

1031
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1032
  tstrncpy(pDropDbMsg->db, pTableMetaInfo->name, sizeof(pDropDbMsg->db));
1033
  pDropDbMsg->ignoreNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;
H
hzcheng 已提交
1034

S
slguan 已提交
1035
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DB;
1036
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1037 1038
}

1039 1040
int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
1041
  pCmd->payloadLen = sizeof(SCMDropTableMsg);
H
hzcheng 已提交
1042

S
slguan 已提交
1043 1044
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1045
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1046
  }
H
hzcheng 已提交
1047

1048
  SCMDropTableMsg *pDropTableMsg = (SCMDropTableMsg*)pCmd->payload;
1049
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1050
  strcpy(pDropTableMsg->tableId, pTableMetaInfo->name);
S
slguan 已提交
1051
  pDropTableMsg->igNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;
H
hzcheng 已提交
1052

S
slguan 已提交
1053
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_TABLE;
1054
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1055 1056
}

1057
int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1058
  SSqlCmd *pCmd = &pSql->cmd;
1059
  pCmd->payloadLen = sizeof(SCMDropDnodeMsg);
S
slguan 已提交
1060 1061
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1062
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1063
  }
H
hzcheng 已提交
1064

1065
  SCMDropDnodeMsg *pDrop = (SCMDropDnodeMsg *)pCmd->payload;
1066
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1067
  tstrncpy(pDrop->ep, pTableMetaInfo->name, sizeof(pDrop->ep));
S
slguan 已提交
1068
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DNODE;
H
hzcheng 已提交
1069

1070
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1071 1072
}

S
[TD-16]  
slguan 已提交
1073
int32_t tscBuildDropUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
1074
  SSqlCmd *pCmd = &pSql->cmd;
1075
  pCmd->payloadLen = sizeof(SCMDropUserMsg);
S
slguan 已提交
1076
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_USER;
H
hzcheng 已提交
1077

S
slguan 已提交
1078 1079
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1080
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1081
  }
H
hzcheng 已提交
1082

1083
  SCMDropUserMsg *pDropMsg = (SCMDropUserMsg*)pCmd->payload;
1084
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1085
  tstrncpy(pDropMsg->user, pTableMetaInfo->name, sizeof(pDropMsg->user));
H
hzcheng 已提交
1086

1087
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1088 1089
}

S
[TD-16]  
slguan 已提交
1090 1091 1092 1093 1094 1095 1096
int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCMDropUserMsg);
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_ACCT;

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1097
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
[TD-16]  
slguan 已提交
1098 1099 1100 1101
  }

  SCMDropUserMsg *pDropMsg = (SCMDropUserMsg*)pCmd->payload;
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1102
  tstrncpy(pDropMsg->user, pTableMetaInfo->name, sizeof(pDropMsg->user));
S
[TD-16]  
slguan 已提交
1103 1104 1105 1106

  return TSDB_CODE_SUCCESS;
}

1107 1108
int32_t tscBuildUseDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
1109
  pCmd->payloadLen = sizeof(SCMUseDbMsg);
H
hzcheng 已提交
1110

S
slguan 已提交
1111 1112
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1113
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1114
  }
1115

1116
  SCMUseDbMsg *pUseDbMsg = (SCMUseDbMsg*)pCmd->payload;
1117
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1118
  strcpy(pUseDbMsg->db, pTableMetaInfo->name);
S
slguan 已提交
1119
  pCmd->msgType = TSDB_MSG_TYPE_CM_USE_DB;
H
hzcheng 已提交
1120

1121
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1122 1123
}

1124
int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1125
  STscObj *pObj = pSql->pTscObj;
S
slguan 已提交
1126
  SSqlCmd *pCmd = &pSql->cmd;
S
slguan 已提交
1127
  pCmd->msgType = TSDB_MSG_TYPE_CM_SHOW;
1128
  pCmd->payloadLen = sizeof(SCMShowMsg) + 100;
H
hzcheng 已提交
1129

S
slguan 已提交
1130 1131
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1132
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1133
  }
H
hzcheng 已提交
1134

1135
  SCMShowMsg *pShowMsg = (SCMShowMsg*)pCmd->payload;
S
slguan 已提交
1136

1137
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1138
  size_t nameLen = strlen(pTableMetaInfo->name);
S
slguan 已提交
1139
  if (nameLen > 0) {
B
Bomin Zhang 已提交
1140
    tstrncpy(pShowMsg->db, pTableMetaInfo->name, sizeof(pShowMsg->db));  // prefix is set here
H
hzcheng 已提交
1141
  } else {
B
Bomin Zhang 已提交
1142
    tstrncpy(pShowMsg->db, pObj->db, sizeof(pShowMsg->db));
H
hzcheng 已提交
1143 1144
  }

1145 1146 1147 1148
  SShowInfo *pShowInfo = &pInfo->pDCLInfo->showOpt;
  pShowMsg->type = pShowInfo->showType;

  if (pShowInfo->showType != TSDB_MGMT_TABLE_VNODES) {
H
Haojun Liao 已提交
1149
    SStrToken *pPattern = &pShowInfo->pattern;
1150 1151 1152 1153 1154
    if (pPattern->type > 0) {  // only show tables support wildcard query
      strncpy(pShowMsg->payload, pPattern->z, pPattern->n);
      pShowMsg->payloadLen = htons(pPattern->n);
    }
  } else {
H
Haojun Liao 已提交
1155
    SStrToken *pEpAddr = &pShowInfo->prefix;
dengyihao's avatar
dengyihao 已提交
1156
    assert(pEpAddr->n > 0 && pEpAddr->type > 0);
H
hzcheng 已提交
1157

dengyihao's avatar
dengyihao 已提交
1158 1159
    strncpy(pShowMsg->payload, pEpAddr->z, pEpAddr->n);
    pShowMsg->payloadLen = htons(pEpAddr->n);
1160 1161
  }

1162
  pCmd->payloadLen = sizeof(SCMShowMsg) + pShowMsg->payloadLen;
1163
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1164 1165
}

1166
int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1167
  SSqlCmd *pCmd = &pSql->cmd;
1168
  pCmd->payloadLen = sizeof(SCMKillQueryMsg);
H
hzcheng 已提交
1169

1170 1171
  switch (pCmd->command) {
    case TSDB_SQL_KILL_QUERY:
S
slguan 已提交
1172
      pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_QUERY;
1173 1174
      break;
    case TSDB_SQL_KILL_CONNECTION:
S
slguan 已提交
1175
      pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_CONN;
1176 1177
      break;
    case TSDB_SQL_KILL_STREAM:
S
slguan 已提交
1178
      pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_STREAM;
1179 1180 1181
      break;
  }
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1182 1183
}

1184
int tscEstimateCreateTableMsgLength(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1185 1186
  SSqlCmd *pCmd = &(pSql->cmd);

guanshengliang's avatar
guanshengliang 已提交
1187
  int32_t size = minMsgSize() + sizeof(SCMCreateTableMsg);
H
hzcheng 已提交
1188

1189
  SCreateTableSQL *pCreateTableInfo = pInfo->pCreateTableInfo;
1190
  if (pCreateTableInfo->type == TSQL_CREATE_TABLE_FROM_STABLE) {
H
hzcheng 已提交
1191 1192
    size += sizeof(STagData);
  } else {
S
slguan 已提交
1193
    size += sizeof(SSchema) * (pCmd->numOfCols + pCmd->count);
H
hzcheng 已提交
1194
  }
1195

1196 1197 1198
  if (pCreateTableInfo->pSelect != NULL) {
    size += (pCreateTableInfo->pSelect->selectToken.n + 1);
  }
H
hzcheng 已提交
1199 1200 1201 1202

  return size + TSDB_EXTRA_PAYLOAD_SIZE;
}

1203
int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1204
  int              msgLen = 0;
S
slguan 已提交
1205
  SSchema *        pSchema;
H
hzcheng 已提交
1206
  int              size = 0;
1207 1208 1209
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
H
hjxilinx 已提交
1210
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
1211 1212

  // Reallocate the payload size
1213
  size = tscEstimateCreateTableMsgLength(pSql, pInfo);
S
slguan 已提交
1214 1215
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for create table msg", pSql);
1216
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1217
  }
H
hzcheng 已提交
1218 1219


1220
  SCMCreateTableMsg *pCreateTableMsg = (SCMCreateTableMsg *)pCmd->payload;
H
hjxilinx 已提交
1221
  strcpy(pCreateTableMsg->tableId, pTableMetaInfo->name);
1222 1223

  // use dbinfo from table id without modifying current db info
H
Haojun Liao 已提交
1224
  tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pCreateTableMsg->db);
H
hzcheng 已提交
1225

1226 1227 1228
  SCreateTableSQL *pCreateTable = pInfo->pCreateTableInfo;

  pCreateTableMsg->igExists = pCreateTable->existCheck ? 1 : 0;
H
hzcheng 已提交
1229 1230 1231 1232
  pCreateTableMsg->numOfColumns = htons(pCmd->numOfCols);
  pCreateTableMsg->numOfTags = htons(pCmd->count);

  pCreateTableMsg->sqlLen = 0;
S
slguan 已提交
1233
  char *pMsg = (char *)pCreateTableMsg->schema;
H
hzcheng 已提交
1234

1235 1236
  int8_t type = pInfo->pCreateTableInfo->type;
  if (type == TSQL_CREATE_TABLE_FROM_STABLE) {  // create by using super table, tags value
1237 1238 1239 1240 1241 1242 1243
    STagData* pTag = &pInfo->pCreateTableInfo->usingInfo.tagdata;
    *(int32_t*)pMsg = htonl(pTag->dataLen);
    pMsg += sizeof(int32_t);
    memcpy(pMsg, pTag->name, sizeof(pTag->name));
    pMsg += sizeof(pTag->name);
    memcpy(pMsg, pTag->data, pTag->dataLen);
    pMsg += pTag->dataLen;
1244
  } else {  // create (super) table
1245
    pSchema = (SSchema *)pCreateTableMsg->schema;
1246

H
hzcheng 已提交
1247
    for (int i = 0; i < pCmd->numOfCols + pCmd->count; ++i) {
H
hjxilinx 已提交
1248
      TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
H
hzcheng 已提交
1249 1250 1251 1252

      pSchema->type = pField->type;
      strcpy(pSchema->name, pField->name);
      pSchema->bytes = htons(pField->bytes);
1253

H
hzcheng 已提交
1254 1255 1256 1257
      pSchema++;
    }

    pMsg = (char *)pSchema;
1258 1259
    if (type == TSQL_CREATE_STREAM) {  // check if it is a stream sql
      SQuerySQL *pQuerySql = pInfo->pCreateTableInfo->pSelect;
H
hzcheng 已提交
1260

1261 1262 1263
      strncpy(pMsg, pQuerySql->selectToken.z, pQuerySql->selectToken.n + 1);
      pCreateTableMsg->sqlLen = htons(pQuerySql->selectToken.n + 1);
      pMsg += pQuerySql->selectToken.n + 1;
H
hzcheng 已提交
1264 1265 1266
    }
  }

H
hjxilinx 已提交
1267
  tscFieldInfoClear(&pQueryInfo->fieldsInfo);
H
hzcheng 已提交
1268

S
TD-1057  
Shengliang Guan 已提交
1269
  msgLen = (int32_t)(pMsg - (char*)pCreateTableMsg);
S
slguan 已提交
1270
  pCreateTableMsg->contLen = htonl(msgLen);
H
hzcheng 已提交
1271
  pCmd->payloadLen = msgLen;
S
slguan 已提交
1272
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE;
H
hzcheng 已提交
1273 1274

  assert(msgLen + minMsgSize() <= size);
1275
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1276 1277 1278
}

int tscEstimateAlterTableMsgLength(SSqlCmd *pCmd) {
1279
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
guanshengliang's avatar
guanshengliang 已提交
1280
  return minMsgSize() + sizeof(SCMAlterTableMsg) + sizeof(SSchema) * tscNumOfFields(pQueryInfo) +
H
hzcheng 已提交
1281 1282 1283
         TSDB_EXTRA_PAYLOAD_SIZE;
}

1284
int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
1285 1286
  char *pMsg;
  int   msgLen = 0;
H
hzcheng 已提交
1287

1288
  SSqlCmd    *pCmd = &pSql->cmd;
1289 1290
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

H
hjxilinx 已提交
1291
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
1292 1293 1294
  
  SAlterTableSQL *pAlterInfo = pInfo->pAlterInfo;
  int size = tscEstimateAlterTableMsgLength(pCmd);
S
slguan 已提交
1295 1296
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for alter table msg", pSql);
H
Haojun Liao 已提交
1297
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1298
  }
1299 1300
  
  SCMAlterTableMsg *pAlterTableMsg = (SCMAlterTableMsg *)pCmd->payload;
H
Haojun Liao 已提交
1301
  tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pAlterTableMsg->db);
H
hzcheng 已提交
1302

H
hjxilinx 已提交
1303
  strcpy(pAlterTableMsg->tableId, pTableMetaInfo->name);
1304
  pAlterTableMsg->type = htons(pAlterInfo->type);
1305

1306
  pAlterTableMsg->numOfCols = htons(tscNumOfFields(pQueryInfo));
S
slguan 已提交
1307
  SSchema *pSchema = pAlterTableMsg->schema;
1308
  for (int i = 0; i < tscNumOfFields(pQueryInfo); ++i) {
H
hjxilinx 已提交
1309
    TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
1310
  
H
hzcheng 已提交
1311 1312 1313 1314 1315 1316 1317
    pSchema->type = pField->type;
    strcpy(pSchema->name, pField->name);
    pSchema->bytes = htons(pField->bytes);
    pSchema++;
  }

  pMsg = (char *)pSchema;
1318 1319 1320
  pAlterTableMsg->tagValLen = htonl(pAlterInfo->tagData.dataLen);
  memcpy(pMsg, pAlterInfo->tagData.data, pAlterInfo->tagData.dataLen);
  pMsg += pAlterInfo->tagData.dataLen;
H
hzcheng 已提交
1321

S
TD-1057  
Shengliang Guan 已提交
1322
  msgLen = (int32_t)(pMsg - (char*)pAlterTableMsg);
1323

H
hzcheng 已提交
1324
  pCmd->payloadLen = msgLen;
S
slguan 已提交
1325
  pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_TABLE;
H
hzcheng 已提交
1326 1327

  assert(msgLen + minMsgSize() <= size);
1328

1329
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1330 1331
}

1332 1333 1334 1335
int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
  SSqlCmd* pCmd = &pSql->cmd;
  pCmd->msgType = TSDB_MSG_TYPE_UPDATE_TAG_VAL;
  
1336
  SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) pCmd->payload;
1337
  pCmd->payloadLen = htonl(pUpdateMsg->head.contLen);
1338

1339 1340
  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
1341

dengyihao's avatar
dengyihao 已提交
1342
  tscDumpEpSetFromVgroupInfo(&pTableMetaInfo->pTableMeta->corVgroupInfo, &pSql->epSet);
1343

1344 1345 1346
  return TSDB_CODE_SUCCESS;
}

1347
int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1348
  SSqlCmd *pCmd = &pSql->cmd;
1349
  pCmd->payloadLen = sizeof(SCMAlterDbMsg);
S
slguan 已提交
1350
  pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_DB;
H
hzcheng 已提交
1351

1352
  SCMAlterDbMsg *pAlterDbMsg = (SCMAlterDbMsg*)pCmd->payload;
1353
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1354
  tstrncpy(pAlterDbMsg->db, pTableMetaInfo->name, sizeof(pAlterDbMsg->db));
H
hzcheng 已提交
1355

1356
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1357 1358
}

1359
int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1360
  SSqlCmd *pCmd = &pSql->cmd;
J
jtao1735 已提交
1361
  pCmd->msgType = TSDB_MSG_TYPE_CM_RETRIEVE;
S
slguan 已提交
1362
  pCmd->payloadLen = sizeof(SRetrieveTableMsg);
S
slguan 已提交
1363

S
slguan 已提交
1364 1365
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1366
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
H
hzcheng 已提交
1367
  }
S
slguan 已提交
1368

S
slguan 已提交
1369 1370 1371 1372
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg*)pCmd->payload;
  pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
  pRetrieveMsg->free = htons(pQueryInfo->type);
H
hzcheng 已提交
1373

1374
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1375 1376
}

1377
static int tscSetResultPointer(SQueryInfo *pQueryInfo, SSqlRes *pRes) {
1378
  if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
H
hzcheng 已提交
1379 1380 1381
    return pRes->code;
  }

H
hjxilinx 已提交
1382
  for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
H
hjxilinx 已提交
1383
    int16_t offset = tscFieldInfoGetOffset(pQueryInfo, i);
1384
    pRes->tsrow[i] = (unsigned char*)((char*) pRes->data + offset * pRes->numOfRows);
H
hzcheng 已提交
1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398
  }

  return 0;
}

/*
 * this function can only be called once.
 * by using pRes->rspType to denote its status
 *
 * if pRes->rspType is 1, no more result
 */
static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
1399

1400
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
1401

H
hzcheng 已提交
1402 1403 1404 1405 1406 1407
  pRes->code = TSDB_CODE_SUCCESS;
  if (pRes->rspType == 0) {
    pRes->numOfRows = numOfRes;
    pRes->row = 0;
    pRes->rspType = 1;

1408
    tscSetResultPointer(pQueryInfo, pRes);
H
hzcheng 已提交
1409
  } else {
S
slguan 已提交
1410
    tscResetForNextRetrieve(pRes);
H
hzcheng 已提交
1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425
  }

  uint8_t code = pSql->res.code;
  if (pSql->fp) {
    if (code == TSDB_CODE_SUCCESS) {
      (*pSql->fp)(pSql->param, pSql, pSql->res.numOfRows);
    } else {
      tscQueueAsyncRes(pSql);
    }
  }

  return code;
}

int tscProcessDescribeTableRsp(SSqlObj *pSql) {
S
slguan 已提交
1426
  SSqlCmd *       pCmd = &pSql->cmd;
1427
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hzcheng 已提交
1428

H
hjxilinx 已提交
1429
  STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
H
hjxilinx 已提交
1430 1431
  
  int32_t numOfRes = tinfo.numOfColumns + tinfo.numOfTags;
H
hzcheng 已提交
1432 1433 1434
  return tscLocalResultCommonBuilder(pSql, numOfRes);
}

H
Haojun Liao 已提交
1435 1436 1437
int tscProcessLocalRetrieveRsp(SSqlObj *pSql) {
  int32_t numOfRes = 1;
  pSql->res.completed = true;
H
hzcheng 已提交
1438 1439 1440
  return tscLocalResultCommonBuilder(pSql, numOfRes);
}

H
hjxilinx 已提交
1441
int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
H
hzcheng 已提交
1442
  SSqlRes *pRes = &pSql->res;
H
Haojun Liao 已提交
1443
  SSqlCmd* pCmd = &pSql->cmd;
H
hzcheng 已提交
1444

H
Haojun Liao 已提交
1445 1446 1447 1448 1449 1450
  int32_t code = pRes->code;
  if (pRes->code != TSDB_CODE_SUCCESS) {
    tscQueueAsyncRes(pSql);
    return code;
  }

1451
  pRes->code = tscDoLocalMerge(pSql);
H
hjxilinx 已提交
1452
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hzcheng 已提交
1453 1454

  if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) {
1455
    tscCreateResPointerInfo(pRes, pQueryInfo);
H
hzcheng 已提交
1456 1457 1458
  }

  pRes->row = 0;
1459
  pRes->completed = (pRes->numOfRows == 0);
H
hzcheng 已提交
1460

H
Haojun Liao 已提交
1461
  code = pRes->code;
H
hjxilinx 已提交
1462 1463 1464 1465
  if (pRes->code == TSDB_CODE_SUCCESS) {
    (*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
  } else {
    tscQueueAsyncRes(pSql);
H
hzcheng 已提交
1466 1467 1468 1469 1470
  }

  return code;
}

S
slguan 已提交
1471
int tscProcessEmptyResultRsp(SSqlObj *pSql) { return tscLocalResultCommonBuilder(pSql, 0); }
H
hzcheng 已提交
1472

1473
int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1474
  STscObj *pObj = pSql->pTscObj;
S
slguan 已提交
1475
  SSqlCmd *pCmd = &pSql->cmd;
S
slguan 已提交
1476
  pCmd->msgType = TSDB_MSG_TYPE_CM_CONNECT;
1477
  pCmd->payloadLen = sizeof(SCMConnectMsg);
H
hzcheng 已提交
1478

S
slguan 已提交
1479 1480
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1481
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1482 1483
  }

1484
  SCMConnectMsg *pConnect = (SCMConnectMsg*)pCmd->payload;
H
hzcheng 已提交
1485

H
Haojun Liao 已提交
1486
  // TODO refactor full_name
H
hzcheng 已提交
1487 1488 1489
  char *db;  // ugly code to move the space
  db = strstr(pObj->db, TS_PATH_DELIMITER);
  db = (db == NULL) ? pObj->db : db + 1;
B
Bomin Zhang 已提交
1490 1491 1492
  tstrncpy(pConnect->db, db, sizeof(pConnect->db));
  tstrncpy(pConnect->clientVersion, version, sizeof(pConnect->clientVersion));
  tstrncpy(pConnect->msgVersion, "", sizeof(pConnect->msgVersion));
H
hzcheng 已提交
1493

H
Haojun Liao 已提交
1494 1495 1496
  pConnect->pid = htonl(taosGetPId());
  taosGetCurrentAPPName(pConnect->appName, NULL);

1497
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1498 1499
}

H
hjxilinx 已提交
1500
int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
1501 1502 1503
  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);

H
hjxilinx 已提交
1504
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
S
slguan 已提交
1505

B
Bomin Zhang 已提交
1506
  SCMTableInfoMsg* pInfoMsg = (SCMTableInfoMsg *)pCmd->payload;
H
hjxilinx 已提交
1507
  strcpy(pInfoMsg->tableId, pTableMetaInfo->name);
H
hjxilinx 已提交
1508
  pInfoMsg->createFlag = htons(pSql->cmd.autoCreated ? 1 : 0);
S
slguan 已提交
1509

B
Bomin Zhang 已提交
1510
  char* pMsg = (char*)pInfoMsg + sizeof(SCMTableInfoMsg);
H
hzcheng 已提交
1511

B
Bomin Zhang 已提交
1512 1513 1514 1515 1516 1517 1518
  size_t len = htonl(pCmd->tagData.dataLen);
  if (pSql->cmd.autoCreated) {
    if (len > 0) {
      len += sizeof(pCmd->tagData.name) + sizeof(pCmd->tagData.dataLen);
      memcpy(pInfoMsg->tags, &pCmd->tagData, len);
      pMsg += len;
    }
H
hzcheng 已提交
1519 1520
  }

S
TD-1057  
Shengliang Guan 已提交
1521
  pCmd->payloadLen = (int32_t)(pMsg - (char*)pInfoMsg);
S
slguan 已提交
1522
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLE_META;
H
hzcheng 已提交
1523

1524
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1525 1526
}

S
slguan 已提交
1527
/**
1528
 *  multi table meta req pkg format:
1529
 *  | SMgmtHead | SCMMultiTableInfoMsg | tableId0 | tableId1 | tableId2 | ......
S
slguan 已提交
1530 1531
 *      no used         4B
 **/
1532
int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
guanshengliang's avatar
guanshengliang 已提交
1533
#if 0
S
slguan 已提交
1534 1535 1536 1537 1538
  SSqlCmd *pCmd = &pSql->cmd;

  // copy payload content to temp buff
  char *tmpData = 0;
  if (pCmd->payloadLen > 0) {
H
Haojun Liao 已提交
1539
    if ((tmpData = calloc(1, pCmd->payloadLen + 1)) == NULL) return -1;
S
slguan 已提交
1540 1541 1542 1543 1544
    memcpy(tmpData, pCmd->payload, pCmd->payloadLen);
  }

  // fill head info
  SMgmtHead *pMgmt = (SMgmtHead *)(pCmd->payload + tsRpcHeadSize);
H
Haojun Liao 已提交
1545
  memset(pMgmt->db, 0, TSDB_TABLE_FNAME_LEN);  // server don't need the db
S
slguan 已提交
1546

1547
  SCMMultiTableInfoMsg *pInfoMsg = (SCMMultiTableInfoMsg *)(pCmd->payload + tsRpcHeadSize + sizeof(SMgmtHead));
S
slguan 已提交
1548
  pInfoMsg->numOfTables = htonl((int32_t)pCmd->count);
S
slguan 已提交
1549 1550

  if (pCmd->payloadLen > 0) {
S
slguan 已提交
1551
    memcpy(pInfoMsg->tableIds, tmpData, pCmd->payloadLen);
S
slguan 已提交
1552 1553
  }

S
Shengliang Guan 已提交
1554
  taosTFree(tmpData);
S
slguan 已提交
1555

1556
  pCmd->payloadLen += sizeof(SMgmtHead) + sizeof(SCMMultiTableInfoMsg);
S
slguan 已提交
1557
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLES_META;
S
slguan 已提交
1558 1559 1560

  assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize);

1561
  tscDebug("%p build load multi-metermeta msg completed, numOfTables:%d, msg size:%d", pSql, pCmd->count,
S
slguan 已提交
1562 1563 1564
           pCmd->payloadLen);

  return pCmd->payloadLen;
guanshengliang's avatar
guanshengliang 已提交
1565 1566
#endif
  return 0;  
S
slguan 已提交
1567 1568
}

H
hjxilinx 已提交
1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585
//static UNUSED_FUNC int32_t tscEstimateMetricMetaMsgSize(SSqlCmd *pCmd) {
////  const int32_t defaultSize =
////      minMsgSize() + sizeof(SSuperTableMetaMsg) + sizeof(SMgmtHead) + sizeof(int16_t) * TSDB_MAX_TAGS;
////  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
////
////  int32_t n = 0;
////  size_t size = taosArrayGetSize(pQueryInfo->tagCond.pCond);
////  for (int32_t i = 0; i < size; ++i) {
////    assert(0);
//////    n += strlen(pQueryInfo->tagCond.cond[i].cond);
////  }
////
////  int32_t tagLen = n * TSDB_NCHAR_SIZE;
////  if (pQueryInfo->tagCond.tbnameCond.cond != NULL) {
////    tagLen += strlen(pQueryInfo->tagCond.tbnameCond.cond) * TSDB_NCHAR_SIZE;
////  }
////
H
Haojun Liao 已提交
1586
////  int32_t joinCondLen = (TSDB_TABLE_FNAME_LEN + sizeof(int16_t)) * 2;
H
hjxilinx 已提交
1587 1588 1589 1590 1591 1592 1593 1594
////  int32_t elemSize = sizeof(SSuperTableMetaElemMsg) * pQueryInfo->numOfTables;
////
////  int32_t colSize = pQueryInfo->groupbyExpr.numOfGroupCols*sizeof(SColIndex);
////
////  int32_t len = tagLen + joinCondLen + elemSize + colSize + defaultSize;
////
////  return MAX(len, TSDB_DEFAULT_PAYLOAD_SIZE);
//}
H
hzcheng 已提交
1595

H
hjxilinx 已提交
1596 1597
int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
H
hjxilinx 已提交
1598 1599 1600 1601 1602 1603 1604 1605 1606 1607
  
  char* pMsg = pCmd->payload;
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  
  SCMSTableVgroupMsg *pStableVgroupMsg = (SCMSTableVgroupMsg *) pMsg;
  pStableVgroupMsg->numOfTables = htonl(pQueryInfo->numOfTables);
  pMsg += sizeof(SCMSTableVgroupMsg);
  
  for(int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
    STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i);
1608 1609 1610
    size_t size = sizeof(pTableMetaInfo->name);
    tstrncpy(pMsg, pTableMetaInfo->name, size);
    pMsg += size;
H
hjxilinx 已提交
1611
  }
H
hjxilinx 已提交
1612 1613

  pCmd->msgType = TSDB_MSG_TYPE_CM_STABLE_VGROUP;
S
TD-1057  
Shengliang Guan 已提交
1614
  pCmd->payloadLen = (int32_t)(pMsg - pCmd->payload);
H
hjxilinx 已提交
1615

1616
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1617 1618
}

1619 1620
int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
H
hzcheng 已提交
1621 1622
  STscObj *pObj = pSql->pTscObj;

1623
  pthread_mutex_lock(&pObj->mutex);
H
hzcheng 已提交
1624

S
Shengliang Guan 已提交
1625
  int32_t numOfQueries = 2;
H
hzcheng 已提交
1626 1627 1628
  SSqlObj *tpSql = pObj->sqlList;
  while (tpSql) {
    tpSql = tpSql->next;
1629
    numOfQueries++;
H
hzcheng 已提交
1630 1631
  }

S
Shengliang Guan 已提交
1632
  int32_t numOfStreams = 2;
H
hzcheng 已提交
1633 1634 1635
  SSqlStream *pStream = pObj->streamList;
  while (pStream) {
    pStream = pStream->next;
1636
    numOfStreams++;
H
hzcheng 已提交
1637 1638
  }

1639
  int size = numOfQueries * sizeof(SQueryDesc) + numOfStreams * sizeof(SStreamDesc) + sizeof(SCMHeartBeatMsg) + 100;
S
slguan 已提交
1640
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
1641
    pthread_mutex_unlock(&pObj->mutex);
S
slguan 已提交
1642
    tscError("%p failed to malloc for heartbeat msg", pSql);
H
Haojun Liao 已提交
1643
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1644
  }
H
hzcheng 已提交
1645

1646
  SCMHeartBeatMsg *pHeartbeat = (SCMHeartBeatMsg *)pCmd->payload;
1647 1648
  pHeartbeat->numOfQueries = numOfQueries;
  pHeartbeat->numOfStreams = numOfStreams;
H
Haojun Liao 已提交
1649 1650 1651 1652

  pHeartbeat->pid = htonl(taosGetPId());
  taosGetCurrentAPPName(pHeartbeat->appName, NULL);

1653
  int msgLen = tscBuildQueryStreamDesc(pHeartbeat, pObj);
H
hzcheng 已提交
1654 1655 1656 1657

  pthread_mutex_unlock(&pObj->mutex);

  pCmd->payloadLen = msgLen;
S
slguan 已提交
1658
  pCmd->msgType = TSDB_MSG_TYPE_CM_HEARTBEAT;
H
hzcheng 已提交
1659

1660
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1661 1662
}

1663 1664
int tscProcessTableMetaRsp(SSqlObj *pSql) {
  STableMetaMsg *pMetaMsg = (STableMetaMsg *)pSql->res.pRsp;
H
hzcheng 已提交
1665

H
Haojun Liao 已提交
1666
  pMetaMsg->tid = htonl(pMetaMsg->tid);
1667
  pMetaMsg->sversion = htons(pMetaMsg->sversion);
1668
  pMetaMsg->tversion = htons(pMetaMsg->tversion);
H
hjxilinx 已提交
1669 1670
  pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId);
  
1671 1672
  pMetaMsg->uid = htobe64(pMetaMsg->uid);
  pMetaMsg->contLen = htons(pMetaMsg->contLen);
1673
  pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns);
H
hzcheng 已提交
1674

H
Haojun Liao 已提交
1675
  if ((pMetaMsg->tableType != TSDB_SUPER_TABLE) &&
H
Haojun Liao 已提交
1676
      (pMetaMsg->tid <= 0 || pMetaMsg->vgroup.vgId < 2 || pMetaMsg->vgroup.numOfEps <= 0)) {
H
Haojun Liao 已提交
1677
    tscError("invalid value in table numOfEps:%d, vgId:%d tid:%d, name:%s", pMetaMsg->vgroup.numOfEps, pMetaMsg->vgroup.vgId,
H
Haojun Liao 已提交
1678
             pMetaMsg->tid, pMetaMsg->tableId);
1679
    return TSDB_CODE_TSC_INVALID_VALUE;
H
hzcheng 已提交
1680 1681
  }

B
Bomin Zhang 已提交
1682
  if (pMetaMsg->numOfTags > TSDB_MAX_TAGS) {
1683
    tscError("invalid numOfTags:%d", pMetaMsg->numOfTags);
1684
    return TSDB_CODE_TSC_INVALID_VALUE;
H
hzcheng 已提交
1685 1686
  }

1687 1688
  if (pMetaMsg->numOfColumns > TSDB_MAX_COLUMNS || pMetaMsg->numOfColumns <= 0) {
    tscError("invalid numOfColumns:%d", pMetaMsg->numOfColumns);
1689
    return TSDB_CODE_TSC_INVALID_VALUE;
H
hzcheng 已提交
1690 1691
  }

1692 1693
  for (int i = 0; i < pMetaMsg->vgroup.numOfEps; ++i) {
    pMetaMsg->vgroup.epAddr[i].port = htons(pMetaMsg->vgroup.epAddr[i].port);
H
hzcheng 已提交
1694 1695
  }

1696
  SSchema* pSchema = pMetaMsg->schema;
H
hzcheng 已提交
1697

1698
  int32_t numOfTotalCols = pMetaMsg->numOfColumns + pMetaMsg->numOfTags;
H
hzcheng 已提交
1699 1700 1701
  for (int i = 0; i < numOfTotalCols; ++i) {
    pSchema->bytes = htons(pSchema->bytes);
    pSchema->colId = htons(pSchema->colId);
1702 1703 1704 1705 1706

    if (pSchema->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
      assert(i == 0);
    }

H
hjxilinx 已提交
1707
    assert(pSchema->type >= TSDB_DATA_TYPE_BOOL && pSchema->type <= TSDB_DATA_TYPE_NCHAR);
H
hzcheng 已提交
1708 1709 1710
    pSchema++;
  }

1711 1712
  size_t size = 0;
  STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
1713
  
H
hzcheng 已提交
1714
  // todo add one more function: taosAddDataIfNotExists();
1715
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
H
hjxilinx 已提交
1716
  assert(pTableMetaInfo->pTableMeta == NULL);
H
hzcheng 已提交
1717

H
Haojun Liao 已提交
1718
  pTableMetaInfo->pTableMeta = (STableMeta *) taosCachePut(tscMetaCache, pTableMetaInfo->name,
1719
      strlen(pTableMetaInfo->name), pTableMeta, size, tsTableMetaKeepTimer * 1000);
H
hjxilinx 已提交
1720
  
1721
  // todo handle out of memory case
1722
  if (pTableMetaInfo->pTableMeta == NULL) {
B
Bomin Zhang 已提交
1723
    free(pTableMeta);
1724
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
1725
  }
H
hzcheng 已提交
1726

1727
  tscDebug("%p recv table meta, uid:%"PRId64 ", tid:%d, name:%s", pSql, pTableMeta->id.uid, pTableMeta->id.tid, pTableMetaInfo->name);
1728
  free(pTableMeta);
1729
  
H
hjxilinx 已提交
1730
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1731 1732
}

S
slguan 已提交
1733
/**
1734
 *  multi table meta rsp pkg format:
1735
 *  | STaosRsp | ieType | SCMMultiTableInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | SMeterMeta2 | SSchema2
S
slguan 已提交
1736 1737 1738
 *  |...... 1B        1B            4B
 **/
int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
1739
#if 0
S
slguan 已提交
1740 1741 1742 1743 1744
  char *rsp = pSql->res.pRsp;

  ieType = *rsp;
  if (ieType != TSDB_IE_TYPE_META) {
    tscError("invalid ie type:%d", ieType);
1745
    pSql->res.code = TSDB_CODE_TSC_INVALID_IE;
S
slguan 已提交
1746
    pSql->res.numOfTotal = 0;
1747
    return TSDB_CODE_TSC_APP_ERROR;
S
slguan 已提交
1748 1749 1750 1751
  }

  rsp++;

1752
  SCMMultiTableInfoMsg *pInfo = (SCMMultiTableInfoMsg *)rsp;
S
slguan 已提交
1753
  totalNum = htonl(pInfo->numOfTables);
1754
  rsp += sizeof(SCMMultiTableInfoMsg);
S
slguan 已提交
1755 1756

  for (i = 0; i < totalNum; i++) {
S
slguan 已提交
1757
    SMultiTableMeta *pMultiMeta = (SMultiTableMeta *)rsp;
1758
    STableMeta *     pMeta = pMultiMeta->metas;
S
slguan 已提交
1759 1760 1761

    pMeta->sid = htonl(pMeta->sid);
    pMeta->sversion = htons(pMeta->sversion);
H
hjxilinx 已提交
1762
    pMeta->vgId = htonl(pMeta->vgId);
S
slguan 已提交
1763 1764
    pMeta->uid = htobe64(pMeta->uid);

H
hjxilinx 已提交
1765 1766
    if (pMeta->sid <= 0 || pMeta->vgId < 0) {
      tscError("invalid meter vgId:%d, sid%d", pMeta->vgId, pMeta->sid);
1767
      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
S
slguan 已提交
1768
      pSql->res.numOfTotal = i;
1769
      return TSDB_CODE_TSC_APP_ERROR;
S
slguan 已提交
1770 1771
    }

H
hjxilinx 已提交
1772 1773 1774 1775
    //    pMeta->numOfColumns = htons(pMeta->numOfColumns);
    //
    //    if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
    //      tscError("invalid tag value count:%d", pMeta->numOfTags);
1776
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
H
hjxilinx 已提交
1777
    //      pSql->res.numOfTotal = i;
1778
    //      return TSDB_CODE_TSC_APP_ERROR;
H
hjxilinx 已提交
1779 1780 1781 1782
    //    }
    //
    //    if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
    //      tscError("invalid numOfTags:%d", pMeta->numOfTags);
1783
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
H
hjxilinx 已提交
1784
    //      pSql->res.numOfTotal = i;
1785
    //      return TSDB_CODE_TSC_APP_ERROR;
H
hjxilinx 已提交
1786 1787 1788 1789
    //    }
    //
    //    if (pMeta->numOfColumns > TSDB_MAX_COLUMNS || pMeta->numOfColumns < 0) {
    //      tscError("invalid numOfColumns:%d", pMeta->numOfColumns);
1790
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
H
hjxilinx 已提交
1791
    //      pSql->res.numOfTotal = i;
1792
    //      return TSDB_CODE_TSC_APP_ERROR;
H
hjxilinx 已提交
1793 1794
    //    }
    //
H
hjxilinx 已提交
1795
    //    for (int j = 0; j < TSDB_REPLICA_MAX_NUM; ++j) {
H
hjxilinx 已提交
1796 1797 1798 1799 1800 1801 1802 1803 1804 1805 1806 1807 1808 1809 1810 1811 1812 1813 1814 1815 1816 1817 1818 1819 1820 1821 1822 1823 1824 1825 1826 1827 1828 1829
    //      pMeta->vpeerDesc[j].vnode = htonl(pMeta->vpeerDesc[j].vnode);
    //    }
    //
    //    pMeta->rowSize = 0;
    //    rsp += sizeof(SMultiTableMeta);
    //    pSchema = (SSchema *)rsp;
    //
    //    int32_t numOfTotalCols = pMeta->numOfColumns + pMeta->numOfTags;
    //    for (int j = 0; j < numOfTotalCols; ++j) {
    //      pSchema->bytes = htons(pSchema->bytes);
    //      pSchema->colId = htons(pSchema->colId);
    //
    //      // ignore the tags length
    //      if (j < pMeta->numOfColumns) {
    //        pMeta->rowSize += pSchema->bytes;
    //      }
    //      pSchema++;
    //    }
    //
    //    rsp += numOfTotalCols * sizeof(SSchema);
    //
    //    int32_t  tagLen = 0;
    //    SSchema *pTagsSchema = tscGetTableTagSchema(pMeta);
    //
    //    if (pMeta->tableType == TSDB_CHILD_TABLE) {
    //      for (int32_t j = 0; j < pMeta->numOfTags; ++j) {
    //        tagLen += pTagsSchema[j].bytes;
    //      }
    //    }
    //
    //    rsp += tagLen;
    //    int32_t size = (int32_t)(rsp - ((char *)pMeta));  // Consistent with STableMeta in cache
    //
    //    pMeta->index = 0;
H
Haojun Liao 已提交
1830
    //    (void)taosCachePut(tscMetaCache, pMeta->tableId, (char *)pMeta, size, tsTableMetaKeepTimer);
H
hjxilinx 已提交
1831
    //  }
S
slguan 已提交
1832
  }
H
hjxilinx 已提交
1833
  
S
slguan 已提交
1834 1835
  pSql->res.code = TSDB_CODE_SUCCESS;
  pSql->res.numOfTotal = i;
1836
  tscDebug("%p load multi-metermeta resp from complete num:%d", pSql, pSql->res.numOfTotal);
1837 1838
#endif
  
S
slguan 已提交
1839 1840 1841
  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
1842
int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
1843
  SSqlRes* pRes = &pSql->res;
H
hjxilinx 已提交
1844
  
H
hjxilinx 已提交
1845
  // NOTE: the order of several table must be preserved.
1846
  SCMSTableVgroupRspMsg *pStableVgroup = (SCMSTableVgroupRspMsg *)pRes->pRsp;
H
hjxilinx 已提交
1847 1848
  pStableVgroup->numOfTables = htonl(pStableVgroup->numOfTables);
  char* pMsg = pRes->pRsp + sizeof(SCMSTableVgroupRspMsg);
H
hjxilinx 已提交
1849
  
1850 1851 1852
  // master sqlObj locates in param
  SSqlObj* parent = pSql->param;
  assert(parent != NULL);
H
hjxilinx 已提交
1853
  
1854
  SSqlCmd* pCmd = &parent->cmd;
H
hjxilinx 已提交
1855 1856 1857
  for(int32_t i = 0; i < pStableVgroup->numOfTables; ++i) {
    STableMetaInfo *pInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i);

H
Haojun Liao 已提交
1858 1859 1860
    SVgroupsMsg *  pVgroupMsg = (SVgroupsMsg *) pMsg;
    pVgroupMsg->numOfVgroups = htonl(pVgroupMsg->numOfVgroups);

H
Haojun Liao 已提交
1861 1862 1863 1864
    size_t size = sizeof(SCMVgroupMsg) * pVgroupMsg->numOfVgroups + sizeof(SVgroupsMsg);

    size_t vgroupsz = sizeof(SCMVgroupInfo) * pVgroupMsg->numOfVgroups + sizeof(SVgroupsInfo);
    pInfo->vgroupList = calloc(1, vgroupsz);
H
hjxilinx 已提交
1865 1866
    assert(pInfo->vgroupList != NULL);

H
Haojun Liao 已提交
1867
    pInfo->vgroupList->numOfVgroups = pVgroupMsg->numOfVgroups;
H
hjxilinx 已提交
1868
    for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) {
dengyihao's avatar
dengyihao 已提交
1869
      //just init, no need to lock
H
hjxilinx 已提交
1870
      SCMVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j];
H
Haojun Liao 已提交
1871

H
Haojun Liao 已提交
1872 1873 1874
      SCMVgroupMsg *vmsg = &pVgroupMsg->vgroups[j];
      pVgroups->vgId = htonl(vmsg->vgId);
      pVgroups->numOfEps = vmsg->numOfEps;
H
Haojun Liao 已提交
1875

H
Haojun Liao 已提交
1876
      assert(pVgroups->numOfEps >= 1 && pVgroups->vgId >= 1);
H
hjxilinx 已提交
1877

1878
      for (int32_t k = 0; k < pVgroups->numOfEps; ++k) {
H
Haojun Liao 已提交
1879 1880
        pVgroups->epAddr[k].port = htons(vmsg->epAddr[k].port);
        pVgroups->epAddr[k].fqdn = strndup(vmsg->epAddr[k].fqdn, tListLen(vmsg->epAddr[k].fqdn));
H
hjxilinx 已提交
1881
      }
1882
    }
1883 1884

    pMsg += size;
H
hjxilinx 已提交
1885 1886
  }
  
S
slguan 已提交
1887
  return pSql->res.code;
H
hzcheng 已提交
1888 1889 1890 1891 1892 1893
}

/*
 * current process do not use the cache at all
 */
int tscProcessShowRsp(SSqlObj *pSql) {
H
hjxilinx 已提交
1894
  STableMetaMsg * pMetaMsg;
1895
  SCMShowRsp *pShow;
S
slguan 已提交
1896
  SSchema *    pSchema;
H
hzcheng 已提交
1897 1898
  char         key[20];

1899 1900 1901
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

H
hjxilinx 已提交
1902
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
1903

H
hjxilinx 已提交
1904
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
1905

1906
  pShow = (SCMShowRsp *)pRes->pRsp;
S
slguan 已提交
1907
  pShow->qhandle = htobe64(pShow->qhandle);
H
hzcheng 已提交
1908 1909
  pRes->qhandle = pShow->qhandle;

S
slguan 已提交
1910
  tscResetForNextRetrieve(pRes);
H
hjxilinx 已提交
1911
  pMetaMsg = &(pShow->tableMeta);
H
hzcheng 已提交
1912

H
hjxilinx 已提交
1913
  pMetaMsg->numOfColumns = ntohs(pMetaMsg->numOfColumns);
H
hzcheng 已提交
1914

H
hjxilinx 已提交
1915
  pSchema = pMetaMsg->schema;
H
Haojun Liao 已提交
1916
  pMetaMsg->tid = ntohs(pMetaMsg->tid);
H
hjxilinx 已提交
1917
  for (int i = 0; i < pMetaMsg->numOfColumns; ++i) {
H
hzcheng 已提交
1918 1919 1920 1921
    pSchema->bytes = htons(pSchema->bytes);
    pSchema++;
  }

1922
  key[0] = pCmd->msgType + 'a';
H
hzcheng 已提交
1923 1924
  strcpy(key + 1, "showlist");

H
Haojun Liao 已提交
1925 1926 1927 1928
  if (pTableMetaInfo->pTableMeta != NULL) {
    taosCacheRelease(tscMetaCache, (void *)&(pTableMetaInfo->pTableMeta), false);
  }

H
hjxilinx 已提交
1929 1930 1931
  size_t size = 0;
  STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
  
H
Haojun Liao 已提交
1932
  pTableMetaInfo->pTableMeta = taosCachePut(tscMetaCache, key, strlen(key), (char *)pTableMeta, size,
1933
      tsTableMetaKeepTimer * 1000);
H
hjxilinx 已提交
1934
  SSchema *pTableSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
H
hzcheng 已提交
1935

1936 1937 1938 1939
  if (pQueryInfo->colList == NULL) {
    pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
  }
  
H
hjxilinx 已提交
1940 1941
  SFieldInfo* pFieldInfo = &pQueryInfo->fieldsInfo;
  
S
slguan 已提交
1942
  SColumnIndex index = {0};
H
hjxilinx 已提交
1943 1944 1945
  pSchema = pMetaMsg->schema;
  
  for (int16_t i = 0; i < pMetaMsg->numOfColumns; ++i, ++pSchema) {
S
slguan 已提交
1946
    index.columnIndex = i;
1947 1948
    tscColumnListInsert(pQueryInfo->colList, &index);
    
H
hjxilinx 已提交
1949
    TAOS_FIELD f = tscCreateField(pSchema->type, pSchema->name, pSchema->bytes);
H
Haojun Liao 已提交
1950
    SInternalField* pInfo = tscFieldInfoAppend(pFieldInfo, &f);
H
hjxilinx 已提交
1951
    
H
hjxilinx 已提交
1952
    pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index,
1953
                     pTableSchema[i].type, pTableSchema[i].bytes, pTableSchema[i].bytes, false);
H
hzcheng 已提交
1954
  }
H
hjxilinx 已提交
1955 1956
  
  pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
H
hjxilinx 已提交
1957
  tscFieldInfoUpdateOffset(pQueryInfo);
H
hjxilinx 已提交
1958
  
S
Shengliang Guan 已提交
1959
  taosTFree(pTableMeta);
H
hzcheng 已提交
1960 1961 1962
  return 0;
}

1963
// TODO multithread problem
1964 1965 1966 1967 1968 1969 1970 1971 1972 1973
static void createHBObj(STscObj* pObj) {
  if (pObj->pHb != NULL) {
    return;
  }

  SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
  if (NULL == pSql) return;

  pSql->fp = tscProcessHeartBeatRsp;

1974 1975 1976 1977 1978 1979
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetailSafely(&pSql->cmd, 0);
  if (pQueryInfo == NULL) {
    pSql->res.code = terrno;
    return;
  }

1980 1981 1982 1983 1984 1985 1986 1987 1988 1989 1990 1991
  pQueryInfo->command = TSDB_SQL_HB;

  pSql->cmd.command = pQueryInfo->command;
  if (TSDB_CODE_SUCCESS != tscAllocPayload(&(pSql->cmd), TSDB_DEFAULT_PAYLOAD_SIZE)) {
    taosTFree(pSql);
    return;
  }

  pSql->param = pObj;
  pSql->pTscObj = pObj;
  pSql->signature = pSql;

1992 1993
  registerSqlObj(pSql);
  tscDebug("%p HB is allocated, pObj:%p", pSql, pObj);
1994

1995
  pObj->pHb = pSql;
1996 1997
}

H
hzcheng 已提交
1998
int tscProcessConnectRsp(SSqlObj *pSql) {
H
Haojun Liao 已提交
1999
  char temp[TSDB_TABLE_FNAME_LEN * 2];
H
hzcheng 已提交
2000 2001 2002
  STscObj *pObj = pSql->pTscObj;
  SSqlRes *pRes = &pSql->res;

2003
  SCMConnectRsp *pConnect = (SCMConnectRsp *)pRes->pRsp;
B
Bomin Zhang 已提交
2004
  tstrncpy(pObj->acctId, pConnect->acctId, sizeof(pObj->acctId));  // copy acctId from response
2005 2006
  int32_t len = sprintf(temp, "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db);

B
Bomin Zhang 已提交
2007 2008
  assert(len <= sizeof(pObj->db));
  tstrncpy(pObj->db, temp, sizeof(pObj->db));
H
hjxilinx 已提交
2009
  
dengyihao's avatar
dengyihao 已提交
2010 2011 2012
  if (pConnect->epSet.numOfEps > 0) {
    tscEpSetHtons(&pConnect->epSet);
    tscUpdateMgmtEpSet(&pConnect->epSet);
dengyihao's avatar
dengyihao 已提交
2013
  } 
H
hzcheng 已提交
2014

S
slguan 已提交
2015
  strcpy(pObj->sversion, pConnect->serverVersion);
H
hzcheng 已提交
2016 2017
  pObj->writeAuth = pConnect->writeAuth;
  pObj->superAuth = pConnect->superAuth;
S
Shengliang Guan 已提交
2018
  pObj->connId = htonl(pConnect->connId);
2019 2020

  createHBObj(pObj);
2021
  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
H
hzcheng 已提交
2022 2023 2024 2025 2026

  return 0;
}

int tscProcessUseDbRsp(SSqlObj *pSql) {
S
slguan 已提交
2027
  STscObj *       pObj = pSql->pTscObj;
2028
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
S
slguan 已提交
2029

B
Bomin Zhang 已提交
2030
  tstrncpy(pObj->db, pTableMetaInfo->name, sizeof(pObj->db));
H
hzcheng 已提交
2031 2032 2033
  return 0;
}

Y
TD-1039  
yihaoDeng 已提交
2034 2035
int tscProcessDropDbRsp(SSqlObj *pSql) {
  pSql->pTscObj->db[0] = 0;
H
Haojun Liao 已提交
2036
  taosCacheEmpty(tscMetaCache);
H
hzcheng 已提交
2037 2038 2039 2040
  return 0;
}

int tscProcessDropTableRsp(SSqlObj *pSql) {
2041
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
S
slguan 已提交
2042

H
Haojun Liao 已提交
2043
  STableMeta *pTableMeta = taosCacheAcquireByKey(tscMetaCache, pTableMetaInfo->name, strlen(pTableMetaInfo->name));
H
Haojun Liao 已提交
2044
  if (pTableMeta == NULL) { /* not in cache, abort */
H
hzcheng 已提交
2045 2046 2047 2048 2049 2050 2051
    return 0;
  }

  /*
   * 1. if a user drops one table, which is the only table in a vnode, remove operation will incur vnode to be removed.
   * 2. Then, a user creates a new metric followed by a table with identical name of removed table but different schema,
   * here the table will reside in a new vnode.
S
slguan 已提交
2052 2053
   * The cached information is expired, however, we may have lost the ref of original meter. So, clear whole cache
   * instead.
H
hzcheng 已提交
2054
   */
2055
  tscDebug("%p force release table meta after drop table:%s", pSql, pTableMetaInfo->name);
H
Haojun Liao 已提交
2056
  taosCacheRelease(tscMetaCache, (void **)&pTableMeta, true);
H
hzcheng 已提交
2057

H
hjxilinx 已提交
2058
  if (pTableMetaInfo->pTableMeta) {
H
Haojun Liao 已提交
2059
    taosCacheRelease(tscMetaCache, (void **)&(pTableMetaInfo->pTableMeta), true);
H
hzcheng 已提交
2060 2061 2062 2063 2064 2065
  }

  return 0;
}

int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
2066
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
S
slguan 已提交
2067

H
Haojun Liao 已提交
2068
  STableMeta *pTableMeta = taosCacheAcquireByKey(tscMetaCache, pTableMetaInfo->name, strlen(pTableMetaInfo->name));
H
hjxilinx 已提交
2069
  if (pTableMeta == NULL) { /* not in cache, abort */
H
hzcheng 已提交
2070 2071 2072
    return 0;
  }

2073
  tscDebug("%p force release metermeta in cache after alter-table: %s", pSql, pTableMetaInfo->name);
H
Haojun Liao 已提交
2074
  taosCacheRelease(tscMetaCache, (void **)&pTableMeta, true);
H
hzcheng 已提交
2075

H
hjxilinx 已提交
2076
  if (pTableMetaInfo->pTableMeta) {
weixin_48148422's avatar
weixin_48148422 已提交
2077
    bool isSuperTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo);
H
Haojun Liao 已提交
2078
    taosCacheRelease(tscMetaCache, (void **)&(pTableMetaInfo->pTableMeta), true);
H
hzcheng 已提交
2079

2080
    if (isSuperTable) {  // if it is a super table, reset whole query cache
2081
      tscDebug("%p reset query cache since table:%s is stable", pSql, pTableMetaInfo->name);
H
Haojun Liao 已提交
2082
      taosCacheEmpty(tscMetaCache);
H
hzcheng 已提交
2083 2084 2085 2086 2087 2088 2089 2090 2091 2092
    }
  }

  return 0;
}

int tscProcessAlterDbMsgRsp(SSqlObj *pSql) {
  UNUSED(pSql);
  return 0;
}
2093 2094 2095
int tscProcessShowCreateRsp(SSqlObj *pSql) {
  return tscLocalResultCommonBuilder(pSql, 1);
}
H
hzcheng 已提交
2096 2097 2098 2099

int tscProcessQueryRsp(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;

2100
  SQueryTableRsp *pQuery = (SQueryTableRsp *)pRes->pRsp;
S
slguan 已提交
2101 2102 2103
  pQuery->qhandle = htobe64(pQuery->qhandle);
  pRes->qhandle = pQuery->qhandle;

H
hzcheng 已提交
2104
  pRes->data = NULL;
S
slguan 已提交
2105
  tscResetForNextRetrieve(pRes);
H
hzcheng 已提交
2106 2107 2108
  return 0;
}

H
hjxilinx 已提交
2109
int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
S
slguan 已提交
2110 2111 2112
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

S
slguan 已提交
2113
  SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *)pRes->pRsp;
2114 2115 2116 2117
  if (pRetrieve == NULL) {
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return pRes->code;
  }
H
hzcheng 已提交
2118 2119 2120

  pRes->numOfRows = htonl(pRetrieve->numOfRows);
  pRes->precision = htons(pRetrieve->precision);
2121 2122
  pRes->offset    = htobe64(pRetrieve->offset);
  pRes->useconds  = htobe64(pRetrieve->useconds);
H
hjxilinx 已提交
2123
  pRes->completed = (pRetrieve->completed == 1);
2124
  pRes->data      = pRetrieve->data;
H
hjxilinx 已提交
2125
  
2126
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
2127 2128 2129 2130
  if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
    return pRes->code;
  }
  
weixin_48148422's avatar
weixin_48148422 已提交
2131
  if (pSql->pSubscription != NULL) {
H
hjxilinx 已提交
2132
    int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
H
hjxilinx 已提交
2133
    
H
hjxilinx 已提交
2134
    TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, numOfCols - 1);
H
hjxilinx 已提交
2135 2136
    int16_t     offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1);
    
weixin_48148422's avatar
weixin_48148422 已提交
2137 2138
    char* p = pRes->data + (pField->bytes + offset) * pRes->numOfRows;

S
slguan 已提交
2139
    int32_t numOfTables = htonl(*(int32_t*)p);
weixin_48148422's avatar
weixin_48148422 已提交
2140
    p += sizeof(int32_t);
S
slguan 已提交
2141
    for (int i = 0; i < numOfTables; i++) {
weixin_48148422's avatar
weixin_48148422 已提交
2142 2143
      int64_t uid = htobe64(*(int64_t*)p);
      p += sizeof(int64_t);
weixin_48148422's avatar
weixin_48148422 已提交
2144
      p += sizeof(int32_t); // skip tid
weixin_48148422's avatar
weixin_48148422 已提交
2145 2146
      TSKEY key = htobe64(*(TSKEY*)p);
      p += sizeof(TSKEY);
2147
      tscUpdateSubscriptionProgress(pSql->pSubscription, uid, key);
weixin_48148422's avatar
weixin_48148422 已提交
2148
    }
2149 2150
  }

H
hzcheng 已提交
2151
  pRes->row = 0;
2152
  tscDebug("%p numOfRows:%" PRId64 ", offset:%" PRId64 ", complete:%d", pSql, pRes->numOfRows, pRes->offset, pRes->completed);
H
hzcheng 已提交
2153 2154 2155 2156

  return 0;
}

H
hjxilinx 已提交
2157
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code);
H
hzcheng 已提交
2158

2159
static int32_t getTableMetaFromMgmt(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
S
slguan 已提交
2160 2161
  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
  if (NULL == pNew) {
2162
    tscError("%p malloc failed for new sqlobj to get table meta", pSql);
2163
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
2164
  }
2165

H
hzcheng 已提交
2166 2167 2168
  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;
  pNew->cmd.command = TSDB_SQL_META;
S
slguan 已提交
2169

2170
  registerSqlObj(pNew);
H
Haojun Liao 已提交
2171

2172
  tscAddSubqueryInfo(&pNew->cmd);
2173

2174
  SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetailSafely(&pNew->cmd, 0);
2175

H
hjxilinx 已提交
2176
  pNew->cmd.autoCreated = pSql->cmd.autoCreated;  // create table if not exists
B
Bomin Zhang 已提交
2177
  if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE + pSql->cmd.payloadLen)) {
2178
    tscError("%p malloc failed for payload to get table meta", pSql);
S
slguan 已提交
2179
    free(pNew);
2180

2181
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
2182 2183
  }

H
hjxilinx 已提交
2184
  STableMetaInfo *pNewMeterMetaInfo = tscAddEmptyMetaInfo(pNewQueryInfo);
2185
  assert(pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);
H
hzcheng 已提交
2186

B
Bomin Zhang 已提交
2187
  tstrncpy(pNewMeterMetaInfo->name, pTableMetaInfo->name, sizeof(pNewMeterMetaInfo->name));
B
Bomin Zhang 已提交
2188
  memcpy(&pNew->cmd.tagData, &pSql->cmd.tagData, sizeof(pSql->cmd.tagData));
2189
  tscDebug("%p new pSqlObj:%p to get tableMeta, auto create:%d", pSql, pNew, pNew->cmd.autoCreated);
H
hzcheng 已提交
2190

H
hjxilinx 已提交
2191 2192
  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;
H
hzcheng 已提交
2193

H
hjxilinx 已提交
2194 2195
  int32_t code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
2196
    code = TSDB_CODE_TSC_ACTION_IN_PROGRESS;  // notify upper application that current process need to be terminated
H
hzcheng 已提交
2197 2198 2199 2200 2201
  }

  return code;
}

H
hjxilinx 已提交
2202
int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
H
hjxilinx 已提交
2203
  assert(strlen(pTableMetaInfo->name) != 0);
S
slguan 已提交
2204

2205
  // If this STableMetaInfo owns a table meta, release it first
H
hjxilinx 已提交
2206
  if (pTableMetaInfo->pTableMeta != NULL) {
H
Haojun Liao 已提交
2207
    taosCacheRelease(tscMetaCache, (void **)&(pTableMetaInfo->pTableMeta), false);
2208 2209
  }
  
H
Haojun Liao 已提交
2210
  pTableMetaInfo->pTableMeta = (STableMeta *)taosCacheAcquireByKey(tscMetaCache, pTableMetaInfo->name, strlen(pTableMetaInfo->name));
H
hjxilinx 已提交
2211
  if (pTableMetaInfo->pTableMeta != NULL) {
H
hjxilinx 已提交
2212
    STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
2213
    tscDebug("%p retrieve table Meta from cache, the number of columns:%d, numOfTags:%d, %p", pSql, tinfo.numOfColumns,
2214
             tinfo.numOfTags, pTableMetaInfo->pTableMeta);
H
hzcheng 已提交
2215 2216 2217

    return TSDB_CODE_SUCCESS;
  }
2218 2219
  
  return getTableMetaFromMgmt(pSql, pTableMetaInfo);
H
hzcheng 已提交
2220 2221
}

H
hjxilinx 已提交
2222
int tscGetMeterMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool createIfNotExists) {
H
hjxilinx 已提交
2223
  pSql->cmd.autoCreated = createIfNotExists;
H
hjxilinx 已提交
2224
  return tscGetTableMeta(pSql, pTableMetaInfo);
H
hzcheng 已提交
2225 2226 2227
}

/**
H
Haojun Liao 已提交
2228
 * retrieve table meta from mnode, and update the local table meta cache.
H
hzcheng 已提交
2229
 * @param pSql          sql object
B
Bomin Zhang 已提交
2230
 * @param tableIndex    table index
H
hzcheng 已提交
2231 2232
 * @return              status code
 */
B
Bomin Zhang 已提交
2233
int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) {
H
hzcheng 已提交
2234
  SSqlCmd *pCmd = &pSql->cmd;
2235 2236

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
B
Bomin Zhang 已提交
2237
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex);
H
hzcheng 已提交
2238

H
Haojun Liao 已提交
2239 2240
  STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
  if (pTableMetaInfo->pTableMeta) {
2241
    tscDebug("%p update table meta, old meta numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
2242
             tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->id.uid, pTableMeta);
H
hzcheng 已提交
2243 2244
  }

H
Haojun Liao 已提交
2245
  taosCacheRelease(tscMetaCache, (void **)&(pTableMetaInfo->pTableMeta), true);
H
Haojun Liao 已提交
2246
  return getTableMetaFromMgmt(pSql, pTableMetaInfo);
H
hzcheng 已提交
2247 2248
}

H
hjxilinx 已提交
2249
static bool allVgroupInfoRetrieved(SSqlCmd* pCmd, int32_t clauseIndex) {
2250
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
2251
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
H
hjxilinx 已提交
2252
    STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
H
hjxilinx 已提交
2253 2254
    if (pTableMetaInfo->vgroupList == NULL) {
      return false;
S
slguan 已提交
2255 2256
    }
  }
H
hjxilinx 已提交
2257 2258 2259 2260
  
  // all super tables vgroupinfo are retrieved, no need to retrieve vgroup info anymore
  return true;
}
H
hzcheng 已提交
2261

H
hjxilinx 已提交
2262
int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
2263
  int      code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
H
hjxilinx 已提交
2264 2265 2266
  SSqlCmd *pCmd = &pSql->cmd;
  
  if (allVgroupInfoRetrieved(pCmd, clauseIndex)) {
H
hzcheng 已提交
2267 2268
    return TSDB_CODE_SUCCESS;
  }
H
hjxilinx 已提交
2269

S
slguan 已提交
2270
  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
H
hzcheng 已提交
2271 2272 2273
  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;

H
hjxilinx 已提交
2274
  pNew->cmd.command = TSDB_SQL_STABLEVGROUP;
H
Haojun Liao 已提交
2275 2276

  // TODO TEST IT
2277 2278
  SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetailSafely(&pNew->cmd, 0);
  if (pNewQueryInfo == NULL) {
B
Bomin Zhang 已提交
2279
    tscFreeSqlObj(pNew);
2280 2281
    return code;
  }
2282
  
H
hjxilinx 已提交
2283
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
2284
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
H
hjxilinx 已提交
2285
    STableMetaInfo *pMInfo = tscGetMetaInfo(pQueryInfo, i);
H
Haojun Liao 已提交
2286
    STableMeta *pTableMeta = taosCacheAcquireByData(tscMetaCache, pMInfo->pTableMeta);
H
Haojun Liao 已提交
2287
    tscAddTableMetaInfo(pNewQueryInfo, pMInfo->name, pTableMeta, NULL, pMInfo->tagColList, pMInfo->pVgroupTables);
S
slguan 已提交
2288 2289 2290 2291 2292 2293
  }

  if ((code = tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
    tscFreeSqlObj(pNew);
    return code;
  }
H
hzcheng 已提交
2294

2295
  pNewQueryInfo->numOfTables = pQueryInfo->numOfTables;
2296
  registerSqlObj(pNew);
H
Haojun Liao 已提交
2297

2298
  tscDebug("%p new sqlObj:%p to get vgroupInfo, numOfTables:%d", pSql, pNew, pNewQueryInfo->numOfTables);
H
hzcheng 已提交
2299

2300 2301 2302 2303
  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;
  code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
2304
    code = TSDB_CODE_TSC_ACTION_IN_PROGRESS;
H
hzcheng 已提交
2305 2306 2307 2308 2309
  }

  return code;
}

2310
void tscInitMsgsFp() {
S
slguan 已提交
2311 2312
  tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
  tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg;
J
jtao1735 已提交
2313
  tscBuildMsg[TSDB_SQL_FETCH] = tscBuildFetchMsg;
H
hzcheng 已提交
2314 2315

  tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg;
2316
  tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg;
H
hzcheng 已提交
2317

2318 2319
  tscBuildMsg[TSDB_SQL_CREATE_ACCT] = tscBuildAcctMsg;
  tscBuildMsg[TSDB_SQL_ALTER_ACCT] = tscBuildAcctMsg;
H
hzcheng 已提交
2320 2321

  tscBuildMsg[TSDB_SQL_CREATE_TABLE] = tscBuildCreateTableMsg;
S
[TD-16]  
slguan 已提交
2322
  tscBuildMsg[TSDB_SQL_DROP_USER] = tscBuildDropUserMsg;
H
hzcheng 已提交
2323 2324 2325
  tscBuildMsg[TSDB_SQL_DROP_ACCT] = tscBuildDropAcctMsg;
  tscBuildMsg[TSDB_SQL_DROP_DB] = tscBuildDropDbMsg;
  tscBuildMsg[TSDB_SQL_DROP_TABLE] = tscBuildDropTableMsg;
2326
  tscBuildMsg[TSDB_SQL_ALTER_USER] = tscBuildUserMsg;
S
slguan 已提交
2327 2328 2329
  tscBuildMsg[TSDB_SQL_CREATE_DNODE] = tscBuildCreateDnodeMsg;
  tscBuildMsg[TSDB_SQL_DROP_DNODE] = tscBuildDropDnodeMsg;
  tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg;
H
hzcheng 已提交
2330
  tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg;
2331
  tscBuildMsg[TSDB_SQL_UPDATE_TAGS_VAL] = tscBuildUpdateTagMsg;
H
hzcheng 已提交
2332 2333 2334 2335
  tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg;

  tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg;
  tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg;
H
hjxilinx 已提交
2336
  tscBuildMsg[TSDB_SQL_META] = tscBuildTableMetaMsg;
H
hjxilinx 已提交
2337
  tscBuildMsg[TSDB_SQL_STABLEVGROUP] = tscBuildSTableVgroupMsg;
S
slguan 已提交
2338
  tscBuildMsg[TSDB_SQL_MULTI_META] = tscBuildMultiMeterMetaMsg;
H
hzcheng 已提交
2339 2340 2341 2342

  tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg;
  tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg;
  tscBuildMsg[TSDB_SQL_RETRIEVE] = tscBuildRetrieveFromMgmtMsg;
2343 2344 2345
  tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg;
H
hzcheng 已提交
2346 2347

  tscProcessMsgRsp[TSDB_SQL_SELECT] = tscProcessQueryRsp;
H
hjxilinx 已提交
2348
  tscProcessMsgRsp[TSDB_SQL_FETCH] = tscProcessRetrieveRspFromNode;
H
hzcheng 已提交
2349 2350 2351 2352 2353

  tscProcessMsgRsp[TSDB_SQL_DROP_DB] = tscProcessDropDbRsp;
  tscProcessMsgRsp[TSDB_SQL_DROP_TABLE] = tscProcessDropTableRsp;
  tscProcessMsgRsp[TSDB_SQL_CONNECT] = tscProcessConnectRsp;
  tscProcessMsgRsp[TSDB_SQL_USE_DB] = tscProcessUseDbRsp;
2354
  tscProcessMsgRsp[TSDB_SQL_META] = tscProcessTableMetaRsp;
H
hjxilinx 已提交
2355
  tscProcessMsgRsp[TSDB_SQL_STABLEVGROUP] = tscProcessSTableVgroupRsp;
S
slguan 已提交
2356
  tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiMeterMetaRsp;
H
hzcheng 已提交
2357 2358

  tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp;
H
hjxilinx 已提交
2359
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromNode;  // rsp handled by same function.
H
hzcheng 已提交
2360
  tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp;
2361

H
Haojun Liao 已提交
2362 2363 2364 2365 2366
  tscProcessMsgRsp[TSDB_SQL_CURRENT_DB]   = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CURRENT_USER] = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_VERSION] = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CLI_VERSION]  = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_STATUS]  = tscProcessLocalRetrieveRsp;
2367

H
hzcheng 已提交
2368 2369
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp;

H
hjxilinx 已提交
2370
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_LOCALMERGE] = tscProcessRetrieveLocalMergeRsp;
H
hzcheng 已提交
2371 2372 2373 2374

  tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp;
  tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp;

2375 2376 2377 2378
  tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_TABLE] = tscProcessShowCreateRsp;
  tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_DATABASE] = tscProcessShowCreateRsp;
  

H
hzcheng 已提交
2379 2380 2381 2382 2383 2384
  tscKeepConn[TSDB_SQL_SHOW] = 1;
  tscKeepConn[TSDB_SQL_RETRIEVE] = 1;
  tscKeepConn[TSDB_SQL_SELECT] = 1;
  tscKeepConn[TSDB_SQL_FETCH] = 1;
  tscKeepConn[TSDB_SQL_HB] = 1;
}