tscServer.c 82.1 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#include "os.h"
H
hzcheng 已提交
17
#include "tcache.h"
H
Haojun Liao 已提交
18
#include "tcmdtype.h"
H
hzcheng 已提交
19
#include "trpc.h"
20 21
#include "tscLocalMerge.h"
#include "tscLog.h"
H
hzcheng 已提交
22 23 24 25 26
#include "tscProfile.h"
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
#include "ttimer.h"
dengyihao's avatar
dengyihao 已提交
27
#include "tlockfree.h"
H
hzcheng 已提交
28

29
// Management-node (mnode) endpoint set shared by this client. Code in this file reads and
// writes it only between the taosCorBeginRead/taosCorBeginWrite guards on its `version` field.
SRpcCorEpSet  tscMgmtEpSet;
S
slguan 已提交
30

31 32
// Per-command request builders, indexed by SQL command id; doProcessSql() calls the entry
// for the current command with a NULL pInfo.
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};

H
hzcheng 已提交
33 34 35
// Per-command response handlers, indexed by SQL command id; an entry may be NULL
// (the response path checks before calling).
int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql);
// Timer callback that sends the heartbeat request for a connection object (defined below).
void tscProcessActivityTimer(void *handle, void *tmrId);
// Per-command flags. NOTE(review): presumably marks commands that keep the connection open;
// not referenced in this part of the file -- confirm against its users.
int tscKeepConn[TSDB_SQL_MAX] = {0};
36

B
Bomin Zhang 已提交
37
// Defined elsewhere. The returned key is serialized as the per-table `key` of a query message.
// NOTE(review): `dflt` is presumably returned when no progress is recorded for `uid` -- confirm.
TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid, TSKEY dflt);
38 39
// Defined elsewhere. NOTE(review): presumably record (update) and persist (save) the per-table
// subscription progress; neither is called in this part of the file -- confirm.
void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts);
void tscSaveSubscriptionProgress(void* sub);
H
hzcheng 已提交
40

S
slguan 已提交
41
/* Lower bound used when sizing a message buffer: the RPC header size plus 100 extra bytes. */
static int32_t minMsgSize() {
  const int32_t extra = 100;
  return tsRpcHeadSize + extra;
}
H
Haojun Liao 已提交
42 43 44 45 46 47 48 49
/*
 * Back-off delay, in milliseconds, to wait before a retry.
 *
 * count <= 1 yields 0 (retry immediately); count == 2 yields 200 ms and every
 * further retry doubles the delay.  The exponent is clamped so that neither the
 * shift nor the multiplication can overflow int32_t (both would be undefined
 * behaviour); beyond the clamp the delay stays at its largest representable step.
 *
 * @param count  ordinal of the retry about to be made
 * @return       delay in milliseconds, always >= 0
 */
static int32_t getWaitingTimeInterval(int32_t count) {
  const int32_t initial  = 100;  // 100 ms by default
  const int32_t maxShift = 23;   // 100 * (2 << 23) = 1677721600 still fits in int32_t

  if (count <= 1) {
    return 0;
  }

  int32_t shift = count - 2;
  if (shift > maxShift) {
    shift = maxShift;
  }

  return initial * (2 << shift);
}
H
hzcheng 已提交
50

51
/*
 * Fill pSql->epSet from the endpoints of one vgroup and pick the endpoint to use.
 *
 * @param pSql         SQL object whose epSet is overwritten; must not be NULL
 * @param pVgroupInfo  vgroup providing the endpoint list; must hold at least one endpoint
 *
 * Asserts that at least one copied endpoint has a non-empty FQDN.
 */
static void tscSetDnodeEpSet(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) {
  assert(pSql != NULL && pVgroupInfo != NULL && pVgroupInfo->numOfEps > 0);

  SRpcEpSet* pEpSet = &pSql->epSet;

  // Issue the query to one of the vnode among a vgroup randomly.
  // change the inUse property would not affect the isUse attribute of STableMeta
  pEpSet->inUse = rand() % pVgroupInfo->numOfEps;

  bool hasFqdn = false;

  pEpSet->numOfEps = pVgroupInfo->numOfEps;
  for (int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) {
    // Bounded copy: the destination is a fixed-size slot, so never write past it.
    // Same helper and size expression as tscDumpEpSetFromVgroupInfo().
    tstrncpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn, sizeof(pEpSet->fqdn[i]));
    pEpSet->port[i] = pVgroupInfo->epAddr[i].port;

    if (!hasFqdn) {
      hasFqdn = (strlen(pEpSet->fqdn[i]) > 0);
    }
  }

  assert(hasFqdn);
}
75

dengyihao's avatar
dengyihao 已提交
76 77 78 79
/* Copy the shared management endpoint set into *epSet inside a read section. */
static void tscDumpMgmtEpSet(SRpcEpSet *epSet) {
  SRpcCorEpSet *shared = &tscMgmtEpSet;

  taosCorBeginRead(&shared->version);
  *epSet = shared->epSet;
  taosCorEndRead(&shared->version);
}
dengyihao's avatar
dengyihao 已提交
81 82
/* Apply htons() in place to every port of the endpoint set. */
static void tscEpSetHtons(SRpcEpSet *s) {
  int32_t idx = 0;
  while (idx < s->numOfEps) {
    s->port[idx] = htons(s->port[idx]);
    ++idx;
  }
}
dengyihao's avatar
dengyihao 已提交
86 87
/*
 * Compare two endpoint sets.
 *
 * @return true when both sets have the same endpoint count, the same in-use index and,
 *         for every endpoint, the same port and the same FQDN (compared over at most
 *         TSDB_FQDN_LEN characters); false otherwise.
 */
bool tscEpSetIsEqual(SRpcEpSet *s1, SRpcEpSet *s2) {
  if (s1->numOfEps != s2->numOfEps) {
    return false;
  }

  if (s1->inUse != s2->inUse) {
    return false;
  }

  for (int32_t k = 0; k < s1->numOfEps; ++k) {
    if (s1->port[k] != s2->port[k]) {
      return false;
    }

    if (strncmp(s1->fqdn[k], s2->fqdn[k], TSDB_FQDN_LEN) != 0) {
      return false;
    }
  }

  return true;
}
dengyihao's avatar
dengyihao 已提交
97
/*
 * Overwrite the shared management endpoint set with *pEpSet inside a write section.
 * The copy is unconditional: this function does not compare the old and new sets.
 */
void tscUpdateMgmtEpSet(SRpcEpSet *pEpSet) {
  SRpcCorEpSet *shared = &tscMgmtEpSet;

  taosCorBeginWrite(&shared->version);
  shared->epSet = *pEpSet;
  taosCorEndWrite(&shared->version);
}
dengyihao's avatar
dengyihao 已提交
103
/*
 * Copy the endpoints cached in a vgroup record into *pEpSet inside a read section.
 * Does nothing when pVgroupInfo is NULL.  An in-use index outside
 * [0, TSDB_MAX_REPLICA) is replaced by 0.
 */
static void tscDumpEpSetFromVgroupInfo(SCMCorVgroupInfo *pVgroupInfo, SRpcEpSet *pEpSet) {
  if (NULL == pVgroupInfo) {
    return;
  }

  taosCorBeginRead(&pVgroupInfo->version);

  int8_t used = pVgroupInfo->inUse;
  if (used < 0 || used >= TSDB_MAX_REPLICA) {
    used = 0;
  }
  pEpSet->inUse = used;

  pEpSet->numOfEps = pVgroupInfo->numOfEps;
  for (int32_t ep = 0; ep < pVgroupInfo->numOfEps; ++ep) {
    pEpSet->port[ep] = pVgroupInfo->epAddr[ep].port;
    tstrncpy(pEpSet->fqdn[ep], pVgroupInfo->epAddr[ep].fqdn, sizeof(pEpSet->fqdn[ep]));
  }

  taosCorEndRead(&pVgroupInfo->version);
}

dengyihao's avatar
dengyihao 已提交
116
/*
 * Store the endpoint set reported for a request into the vgroup record of the table
 * meta attached to the current clause of pObj.  Does nothing when no table meta is
 * available.  Each stored FQDN is released and replaced by a fresh heap copy.
 */
static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) {
  SSqlCmd        *pCmd  = &pObj->cmd;
  STableMetaInfo *pInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  if (pInfo == NULL) {
    return;
  }

  if (pInfo->pTableMeta == NULL) {
    return;
  }

  SCMCorVgroupInfo *pCached = &pInfo->pTableMeta->corVgroupInfo;

  taosCorBeginWrite(&pCached->version);
  tscDebug("before: Endpoint in use: %d", pCached->inUse);

  pCached->inUse    = pEpSet->inUse;
  pCached->numOfEps = pEpSet->numOfEps;

  for (int32_t ep = 0; ep < pCached->numOfEps; ep++) {
    taosTFree(pCached->epAddr[ep].fqdn);
    pCached->epAddr[ep].fqdn = strndup(pEpSet->fqdn[ep], tListLen(pEpSet->fqdn[ep]));
    pCached->epAddr[ep].port = pEpSet->port[ep];
  }

  tscDebug("after: EndPoint in use: %d", pCached->inUse);
  taosCorEndWrite(&pCached->version);
}
H
Haojun Liao 已提交
135

136 137 138 139
void tscPrintMgmtEp() {
  SRpcEpSet dump;
  tscDumpMgmtEpSet(&dump);
  if (dump.numOfEps <= 0) {
dengyihao's avatar
dengyihao 已提交
140
    tscError("invalid mnode EP list:%d", dump.numOfEps);
S
slguan 已提交
141
  } else {
142
    for (int i = 0; i < dump.numOfEps; ++i) {
143
      tscDebug("mnode index:%d %s:%d", i, dump.fqdn[i], dump.port[i]);
S
slguan 已提交
144
    }
S
slguan 已提交
145
  }
S
slguan 已提交
146 147
}

H
hzcheng 已提交
148 149 150
/*
 * Completion callback of the heartbeat request.
 *
 * @param param  the STscObj that owns the heartbeat; ignored when NULL or when its
 *               signature does not match
 * @param tres   the heartbeat SSqlObj
 * @param code   0 on success, otherwise an error code
 *
 * On success the response is applied: a non-empty endpoint set replaces the management
 * endpoint set, the connection id is stored, and kill requests for the connection,
 * a query or a stream are forwarded.  Unless the connection was killed, the heartbeat
 * timer is re-armed while pObj->pHb is still set.
 */
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
  STscObj *pObj = (STscObj *)param;
  if (pObj == NULL) return;

  if (pObj != pObj->signature) {
    tscError("heartbeat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature);
    return;
  }

  SSqlObj *pSql = tres;
  SSqlRes *pRes = &pSql->res;

  if (code == 0) {
    SCMHeartBeatRsp *pRsp = (SCMHeartBeatRsp *)pRes->pRsp;
    if (pRsp == NULL) {
      // The response handler leaves pRsp NULL when the reply has no body; there is
      // nothing to apply in that case and it must not be dereferenced.
      tscError("%p heartbeat rsp is empty, ignore it", pSql);
    } else {
      SRpcEpSet *epSet = &pRsp->epSet;
      if (epSet->numOfEps > 0) {
        tscEpSetHtons(epSet);
        tscUpdateMgmtEpSet(epSet);
      }

      pSql->pTscObj->connId = htonl(pRsp->connId);

      if (pRsp->killConnection) {
        tscKillConnection(pObj);
        return;  // connection is going away: do not re-arm the timer
      } else {
        if (pRsp->queryId) tscKillQuery(pObj, htonl(pRsp->queryId));
        if (pRsp->streamId) tscKillStream(pObj, htonl(pRsp->streamId));
      }
    }
  } else {
    tscDebug("%p heartbeat failed, code:%s", pObj->pHb, tstrerror(code));
  }

  if (pObj->pHb != NULL) {
    int32_t waitingDuring = tsShellActivityTimer * 500;
    tscDebug("%p send heartbeat in %dms", pSql, waitingDuring);

    taosTmrReset(tscProcessActivityTimer, waitingDuring, pObj, tscTmr, &pObj->pTimer);
  } else {
    tscDebug("%p start to close tscObj:%p, not send heartbeat again", pSql, pObj);
  }
}

void tscProcessActivityTimer(void *handle, void *tmrId) {
  STscObj *pObj = (STscObj *)handle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
193 194 195 196

  int ret = taosAcquireRef(tscRefId, pObj);
  if (ret < 0) {
    tscTrace("%p failed to acquire TSC obj, reason:%s", pObj, tstrerror(ret));
197 198
    return;
  }
H
hzcheng 已提交
199

200
  SSqlObj* pHB = pObj->pHb;
H
hzcheng 已提交
201

H
Haojun Liao 已提交
202
  void** p = taosCacheAcquireByKey(tscObjCache, &pHB, sizeof(TSDB_CACHE_PTR_TYPE));
H
Haojun Liao 已提交
203 204
  if (p == NULL) {
    tscWarn("%p HB object has been released already", pHB);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
205
    taosReleaseRef(tscRefId, pObj);
H
Haojun Liao 已提交
206 207 208 209 210
    return;
  }

  assert(*pHB->self == pHB);

211
  pHB->retry = 0;
H
Haojun Liao 已提交
212 213 214 215 216
  int32_t code = tscProcessSql(pHB);
  taosCacheRelease(tscObjCache, (void**) &p, false);

  if (code != TSDB_CODE_SUCCESS) {
    tscError("%p failed to sent HB to server, reason:%s", pHB, tstrerror(code));
H
hzcheng 已提交
217
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
218 219

  taosReleaseRef(tscRefId, pObj);
H
hzcheng 已提交
220 221 222
}

/*
 * Copy the command payload into an RPC buffer and send it to the server.
 *
 * @return TSDB_CODE_SUCCESS once the request has been handed to the RPC layer,
 *         TSDB_CODE_TSC_OUT_OF_MEMORY when the RPC buffer cannot be allocated.
 */
int tscSendMsgToServer(SSqlObj *pSql) {
  SSqlCmd* pCmd = &pSql->cmd;
  STscObj* pObj = pSql->pTscObj;

  char *cont = rpcMallocCont(pCmd->payloadLen);
  if (cont == NULL) {
    tscError("%p msg:%s malloc failed", pSql, taosMsg[pCmd->msgType]);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  // management commands always go to the current management endpoint set
  if (pCmd->command >= TSDB_SQL_MGMT) {
    tscDumpMgmtEpSet(&pSql->epSet);
  }

  memcpy(cont, pCmd->payload, pCmd->payloadLen);

  SRpcMsg rpcMsg = {0};
  rpcMsg.msgType = pCmd->msgType;
  rpcMsg.pCont   = cont;
  rpcMsg.contLen = pCmd->payloadLen;
  rpcMsg.ahandle = pSql;
  rpcMsg.handle  = &pSql->pRpcCtx;
  rpcMsg.code    = 0;

  // NOTE: the rpc context should be acquired before sending data to server.
  // Otherwise, the pSql object may have been released already during the response function, which is
  // processMsgFromServer function. In the meanwhile, the assignment of the rpc context to sql object will absolutely
  // cause crash.
  rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg);
  return TSDB_CODE_SUCCESS;
}

256
/*
 * RPC response callback: locate the SQL object the response belongs to, store the
 * response in it and invoke the per-command handler and the user callback.
 *
 * @param rpcMsg  the response; rpcMsg->ahandle identifies the SQL object and
 *                rpcMsg->pCont is released with rpcFreeCont() on every path
 * @param pEpSet  endpoint set reported with the response, or NULL
 *
 * For select/fetch/insert/update-tag commands that fail with one of the listed
 * "stale meta" codes, the table meta is renewed and the request retried until
 * pSql->maxRetry is exceeded.
 */
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
  TSDB_CACHE_PTR_TYPE handle = (TSDB_CACHE_PTR_TYPE) rpcMsg->ahandle;
  void** p = taosCacheAcquireByKey(tscObjCache, &handle, sizeof(TSDB_CACHE_PTR_TYPE));
  if (p == NULL) {
    rpcFreeCont(rpcMsg->pCont);
    return;
  }

  SSqlObj* pSql = *p;
  assert(pSql != NULL);

  STscObj *pObj = pSql->pTscObj;
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  assert(*pSql->self == pSql);
  pSql->pRpcCtx = NULL;

  if (pObj->signature != pObj) {
    tscDebug("%p DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, pObj, pObj->signature);

    taosCacheRelease(tscObjCache, (void**) &p, true);
    rpcFreeCont(rpcMsg->pCont);
    return;
  }

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) {
    tscDebug("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p",
        pSql, pCmd->command, pQueryInfo->type, pObj, pObj->signature);

    void** p1 = p;
    taosCacheRelease(tscObjCache, (void**) &p1, false);

    taosCacheRelease(tscObjCache, (void**) &p, true);
    rpcFreeCont(rpcMsg->pCont);
    return;
  }

  if (pEpSet) {
    if (!tscEpSetIsEqual(&pSql->epSet, pEpSet)) {
      if (pCmd->command < TSDB_SQL_MGMT) {
        tscUpdateVgroupInfo(pSql, pEpSet);
      } else {
        tscUpdateMgmtEpSet(pEpSet);
      }
    }
  }

  int32_t cmd = pCmd->command;
  if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_FETCH || cmd == TSDB_SQL_INSERT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) &&
      (rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID ||
       rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID ||
       rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL ||
       rpcMsg->code == TSDB_CODE_APP_NOT_READY ||
       rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE)) {
    // Count the retry outside of the log call: a logging macro may skip evaluating its
    // arguments when the level is disabled, which would freeze the retry counter.
    ++pSql->retry;
    tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), pSql->retry);

    // set the flag to denote that sql string needs to be re-parsed and build submit block with table schema
    if (rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
      pSql->cmd.submitSchema = 1;
    }

    pSql->res.code = rpcMsg->code;  // keep the previous error code
    if (pSql->retry > pSql->maxRetry) {
      tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry);
    } else {
      // wait for a little bit moment and then retry, todo do not sleep in rpc callback thread
      if (rpcMsg->code == TSDB_CODE_APP_NOT_READY || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
        int32_t duration = getWaitingTimeInterval(pSql->retry);
        taosMsleep(duration);
      }

      rpcMsg->code = tscRenewTableMeta(pSql, 0);

      // if there is an error occurring, proceed to the following error handling procedure.
      if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
        taosCacheRelease(tscObjCache, (void**) &p, false);
        rpcFreeCont(rpcMsg->pCont);
        return;
      }
    }
  }

  pRes->rspLen = 0;

  if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
    tscDebug("%p query is cancelled, code:%s", pSql, tstrerror(pRes->code));
  } else {
    pRes->code = rpcMsg->code;
  }

  if (pRes->code == TSDB_CODE_SUCCESS) {
    tscDebug("%p reset retry counter to be 0 due to success rsp, old:%d", pSql, pSql->retry);
    pSql->retry = 0;
  }

  if (pRes->code != TSDB_CODE_TSC_QUERY_CANCELLED) {
    assert(rpcMsg->msgType == pCmd->msgType + 1);
    pRes->code    = rpcMsg->code;
    pRes->rspType = rpcMsg->msgType;
    pRes->rspLen  = rpcMsg->contLen;

    if (pRes->rspLen > 0 && rpcMsg->pCont) {
      char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
      if (tmp == NULL) {
        pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      } else {
        pRes->pRsp = tmp;
        memcpy(pRes->pRsp, rpcMsg->pCont, pRes->rspLen);
      }
    } else {
      // Empty response: release the buffer kept from a previous response instead of
      // just dropping the pointer, which would leak it.
      taosTFree(pRes->pRsp);
      pRes->pRsp = NULL;
    }

    /*
     * There is not response callback function for submit response.
     * The actual inserted number of points is the first number.
     */
    if (rpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP && pRes->pRsp != NULL) {
      SShellSubmitRspMsg *pMsg = (SShellSubmitRspMsg*)pRes->pRsp;
      pMsg->code = htonl(pMsg->code);
      pMsg->numOfRows = htonl(pMsg->numOfRows);
      pMsg->affectedRows = htonl(pMsg->affectedRows);
      pMsg->failedRows = htonl(pMsg->failedRows);
      pMsg->numOfFailedBlocks = htonl(pMsg->numOfFailedBlocks);

      pRes->numOfRows += pMsg->affectedRows;
      tscDebug("%p SQL cmd:%s, code:%s inserted rows:%d rspLen:%d", pSql, sqlCmd[pCmd->command], 
          tstrerror(pRes->code), pMsg->affectedRows, pRes->rspLen);
    } else {
      tscDebug("%p SQL cmd:%s, code:%s rspLen:%d", pSql, sqlCmd[pCmd->command], tstrerror(pRes->code), pRes->rspLen);
    }
  }

  if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) {
    rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
  }

  // decide before the user callback runs whether the object is to be freed afterwards
  bool shouldFree = tscShouldBeFreed(pSql);
  if (rpcMsg->code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
    rpcMsg->code = (pRes->code == TSDB_CODE_SUCCESS) ? (int32_t)pRes->numOfRows : pRes->code;
    (*pSql->fp)(pSql->param, pSql, rpcMsg->code);
  }

  void** p1 = p;
  taosCacheRelease(tscObjCache, (void**) &p1, false);

  if (shouldFree) { // in case of table-meta/vgrouplist query, automatically free it
    taosCacheRelease(tscObjCache, (void **)&p, true);
    tscDebug("%p sqlObj is automatically freed", pSql);
  }

  rpcFreeCont(rpcMsg->pCont);
}

S
slguan 已提交
412 413 414
/*
 * Build the request message for the commands that need one, then send it.
 * Any failure is stored in pSql->res.code, queued for asynchronous delivery and returned.
 *
 * @return TSDB_CODE_SUCCESS when the request was handed to the RPC layer, else the error code
 */
int doProcessSql(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  switch (pCmd->command) {
    case TSDB_SQL_SELECT:
    case TSDB_SQL_FETCH:
    case TSDB_SQL_RETRIEVE:
    case TSDB_SQL_INSERT:
    case TSDB_SQL_CONNECT:
    case TSDB_SQL_HB:
    case TSDB_SQL_META:
    case TSDB_SQL_STABLEVGROUP:
      pRes->code = tscBuildMsg[pCmd->command](pSql, NULL);
      break;
    default:
      break;
  }

  if (pRes->code != TSDB_CODE_SUCCESS) {
    tscQueueAsyncRes(pSql);
    return pRes->code;
  }

  int32_t sendCode = tscSendMsgToServer(pSql);

  // NOTE: if sendCode is TSDB_CODE_SUCCESS, pSql may have been released here already by other threads.
  if (sendCode == TSDB_CODE_SUCCESS) {
    return TSDB_CODE_SUCCESS;
  }

  pRes->code = sendCode;
  tscQueueAsyncRes(pSql);
  return sendCode;
}

/*
 * Entry point for executing a prepared SQL object.
 *
 * Commands below TSDB_SQL_MGMT require table meta and fail with
 * TSDB_CODE_TSC_APP_ERROR without it; commands below TSDB_SQL_LOCAL are sent to the
 * server as well; all remaining commands are handled locally through tscProcessMsgRsp.
 *
 * @return the result of the local handler or of doProcessSql(), or the error code
 */
int tscProcessSql(SSqlObj *pSql) {
  char    *name = NULL;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo     *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  STableMetaInfo *pTableMetaInfo = NULL;
  uint32_t        type = 0;

  if (pQueryInfo != NULL) {
    pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
    name = (pTableMetaInfo != NULL)? pTableMetaInfo->name:NULL;
    type = pQueryInfo->type;

    // while numOfTables equals to 0, it must be Heartbeat
    assert((pQueryInfo->numOfTables == 0 && pQueryInfo->command == TSDB_SQL_HB) || pQueryInfo->numOfTables > 0);
  }

  // name stays NULL when there is no table meta (e.g. heartbeat); passing NULL to %s is
  // undefined behaviour, so substitute a printable placeholder.
  tscDebug("%p SQL cmd:%s will be processed, name:%s, type:%d", pSql, sqlCmd[pCmd->command],
      (name != NULL) ? name : "(null)", type);
  if (pCmd->command < TSDB_SQL_MGMT) { // the pTableMetaInfo cannot be NULL
    if (pTableMetaInfo == NULL) {
      pSql->res.code = TSDB_CODE_TSC_APP_ERROR;
      return pSql->res.code;
    }
  } else if (pCmd->command < TSDB_SQL_LOCAL) {
    // nothing to prepare here: the management endpoint set is filled in when the message is sent
  } else {  // local handler
    return (*tscProcessMsgRsp[pCmd->command])(pSql);
  }

  return doProcessSql(pSql);
}
H
hzcheng 已提交
475

J
jtao1735 已提交
476
/*
 * Build the fetch (retrieve) request in pSql->cmd.payload.
 *
 * The target vgroup id is taken from the vgroup list or the per-vgroup table list for
 * a super table, and from the table meta otherwise.
 *
 * @param pInfo  unused
 * @return TSDB_CODE_SUCCESS
 */
int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *) pSql->cmd.payload;
  pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
  pRetrieveMsg->free = htons(pQueryInfo->type);

  // todo valid the vgroupId at the client side
  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
    int32_t vgIndex = pTableMetaInfo->vgroupIndex;
    if (pTableMetaInfo->pVgroupTables == NULL) {
      SVgroupsInfo *pVgroupInfo = pTableMetaInfo->vgroupList;
      // validate the index before it is used to read the vgroup entry
      assert(vgIndex >= 0 && vgIndex < pVgroupInfo->numOfVgroups && pVgroupInfo->vgroups[vgIndex].vgId > 0);

      pRetrieveMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId);
      tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d", pSql, pVgroupInfo->vgroups[vgIndex].vgId, vgIndex);
    } else {
      int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
      assert(vgIndex >= 0 && vgIndex < numOfVgroups);

      SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex);

      pRetrieveMsg->header.vgId = htonl(pTableIdList->vgInfo.vgId);
      tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d", pSql, pTableIdList->vgInfo.vgId, vgIndex);
    }
  } else {
    STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
    pRetrieveMsg->header.vgId = htonl(pTableMeta->vgroupInfo.vgId);
    tscDebug("%p build fetch msg from only one vgroup, vgId:%d", pSql, pTableMeta->vgroupInfo.vgId);
  }

  pSql->cmd.payloadLen = sizeof(SRetrieveTableMsg);
  pSql->cmd.msgType = TSDB_MSG_TYPE_FETCH;

  pRetrieveMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg));

  return TSDB_CODE_SUCCESS;
}

517
/*
 * Fill in the headers of a submit (insert) request whose data already sits in
 * pSql->cmd.payload, and select the endpoint set of the table's vgroup.
 *
 * @param pInfo  unused
 * @return TSDB_CODE_SUCCESS
 */
int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
  STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;

  int32_t vgId = pTableMeta->vgroupInfo.vgId;

  // NOTE: shell message size should not include SMsgDesc
  int32_t size = pSql->cmd.payloadLen - sizeof(SMsgDesc);

  // the payload starts with the descriptor, immediately followed by the submit message
  char*       cursor = pSql->cmd.payload;
  SMsgDesc*   pDesc  = (SMsgDesc*) cursor;
  SSubmitMsg* pBody  = (SSubmitMsg *)(cursor + sizeof(SMsgDesc));

  pDesc->numOfVnodes = htonl(1); // always one vnode

  pBody->header.vgId = htonl(vgId);
  pBody->header.contLen = htonl(size);      // the length not includes the size of SMsgDesc
  pBody->length = pBody->header.contLen;

  pBody->numOfBlocks = htonl(pSql->cmd.numOfTablesInSubmit);  // number of tables to be inserted

  // pSql->cmd.payloadLen is set during copying data into payload
  pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
  tscDumpEpSetFromVgroupInfo(&pTableMeta->corVgroupInfo, &pSql->epSet);

  tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfEP:%d", pSql, vgId, pSql->cmd.numOfTablesInSubmit,
      pSql->epSet.numOfEps);
  return TSDB_CODE_SUCCESS;
}

/*
549
 * for table query, simply return the size <= 1k
H
hzcheng 已提交
550
 */
551
static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
H
hzcheng 已提交
552
  const static int32_t MIN_QUERY_MSG_PKT_SIZE = TSDB_MAX_BYTES_PER_ROW * 5;
553
  SQueryInfo *         pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
H
hzcheng 已提交
554

S
TD-1057  
Shengliang Guan 已提交
555
  int32_t srcColListSize = (int32_t)(taosArrayGetSize(pQueryInfo->colList) * sizeof(SColumnInfo));
H
Haojun Liao 已提交
556 557

  size_t  numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
S
TD-1057  
Shengliang Guan 已提交
558
  int32_t exprSize = (int32_t)(sizeof(SSqlFuncMsg) * numOfExprs);
H
Haojun Liao 已提交
559

H
Haojun Liao 已提交
560 561 562 563 564 565 566 567 568 569 570 571 572 573 574
  int32_t tsBufSize = (pQueryInfo->tsBuf != NULL) ? pQueryInfo->tsBuf->fileSize : 0;

  int32_t tableSerialize = 0;
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  if (pTableMetaInfo->pVgroupTables != NULL) {
    size_t numOfGroups = taosArrayGetSize(pTableMetaInfo->pVgroupTables);

    int32_t totalTables = 0;
    for (int32_t i = 0; i < numOfGroups; ++i) {
      SVgroupTableInfo *pTableInfo = taosArrayGet(pTableMetaInfo->pVgroupTables, i);
      totalTables += taosArrayGetSize(pTableInfo->itemList);
    }

    tableSerialize = totalTables * sizeof(STableIdInfo);
  }
H
Haojun Liao 已提交
575

H
Haojun Liao 已提交
576 577
  return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + exprSize + tsBufSize +
         tableSerialize + 4096;
H
hzcheng 已提交
578 579
}

580
/*
 * Serialize the table id records of a query into the message buffer and select the
 * endpoint set / vgroup id the query is sent to.
 *
 * A normal table, or a super table without per-vgroup table lists, produces exactly
 * one record built from the table meta; otherwise one record is written for every
 * table of the vgroup selected by vgroupIndex.
 *
 * @param pQueryMsg  query message header; head.vgId and numOfTables are filled in
 * @param pSql       the SQL object; its epSet is updated
 * @param pMsg       write position inside the message buffer
 * @return           the write position after the last record
 */
static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char *pMsg) {
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
  TSKEY           dfltKey = htobe64(pQueryMsg->window.skey);
  STableMeta     *pTableMeta = pTableMetaInfo->pTableMeta;

  if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) || pTableMetaInfo->pVgroupTables == NULL) {
    SCMVgroupInfo* pTarget = NULL;

    if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
      int32_t vgIdx = pTableMetaInfo->vgroupIndex;
      assert(vgIdx >= 0);

      if (pTableMetaInfo->vgroupList->numOfVgroups > 0) {
        assert(vgIdx < pTableMetaInfo->vgroupList->numOfVgroups);
        pTarget = &pTableMetaInfo->vgroupList->vgroups[vgIdx];
      }

      tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, vgIdx, pTableMetaInfo->vgroupList->numOfVgroups);
    } else {
      pTarget = &pTableMeta->vgroupInfo;
    }

    assert(pTarget != NULL);

    tscSetDnodeEpSet(pSql, pTarget);
    pQueryMsg->head.vgId = htonl(pTarget->vgId);

    // single record taken from the table meta
    STableIdInfo *pOut = (STableIdInfo *)pMsg;
    pOut->tid = htonl(pTableMeta->id.tid);
    pOut->uid = htobe64(pTableMeta->id.uid);
    pOut->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->id.uid, dfltKey));

    pQueryMsg->numOfTables = htonl(1);  // set the number of tables
    pMsg += sizeof(STableIdInfo);
  } else { // it is a subquery of the super table query, this EP info is acquired from vgroupInfo
    int32_t vgIdx = pTableMetaInfo->vgroupIndex;
    int32_t vgCount = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
    assert(vgIdx >= 0 && vgIdx < vgCount);

    tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, vgIdx, vgCount);

    SVgroupTableInfo* pGroup = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIdx);

    // set the vgroup info
    tscSetDnodeEpSet(pSql, &pGroup->vgInfo);
    pQueryMsg->head.vgId = htonl(pGroup->vgInfo.vgId);

    int32_t tableCount = (int32_t)taosArrayGetSize(pGroup->itemList);
    pQueryMsg->numOfTables = htonl(tableCount);  // set the number of tables

    // serialize each table id info
    for (int32_t t = 0; t < tableCount; ++t) {
      STableIdInfo* pSrc = taosArrayGet(pGroup->itemList, t);
      STableIdInfo* pOut = (STableIdInfo *)pMsg;

      pOut->tid = htonl(pSrc->tid);
      pOut->uid = htobe64(pSrc->uid);
      pOut->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pSrc->uid, dfltKey));

      pMsg += sizeof(STableIdInfo);
    }
  }

  tscDebug("%p vgId:%d, query on table:%s, tid:%d, uid:%" PRIu64, pSql, htonl(pQueryMsg->head.vgId), pTableMetaInfo->name,
      pTableMeta->id.tid, pTableMeta->id.uid);

  return pMsg;
}

647
/*
 * Serialize the current query clause of pSql into pSql->cmd.payload as a
 * SQueryTableMsg followed by its variable-length sections.
 *
 * Sections are appended in this order:
 *   1. fixed header fields (time window, order, limit/offset, interval, counters)
 *   2. one SColumnInfo per queried column, followed by the column filters
 *   3. one SSqlFuncMsg per output expression (binary arguments are appended inline)
 *   4. table id list (written by doSerializeTableInfo)
 *   5. group-by column descriptors
 *   6. fill values, one int64 per output field
 *   7. one SColumnInfo per tag column
 *   8. tag query condition and the tbname condition string
 *   9. compressed timestamp block read from pQueryInfo->tsBuf, if present
 *
 * pInfo is not used by this function.
 *
 * Returns TSDB_CODE_SUCCESS on success, TSDB_CODE_TSC_OUT_OF_MEMORY if the
 * payload cannot be allocated, TSDB_CODE_TSC_INVALID_SQL if the parsed query is
 * inconsistent with the table schema, or a system error code if the timestamp
 * buffer file cannot be read.
 */
int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  int32_t size = tscEstimateQueryMsgSize(pCmd, pCmd->clauseIndex);

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for query msg", pSql);
    // report the allocation failure with the same code as the other message builders
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SQueryInfo     *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  STableMeta     *pTableMeta = pTableMetaInfo->pTableMeta;

  // ---- sanity checks on the parsed query ----
  size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
  if (numOfCols <= 0 && !tscQueryTags(pQueryInfo)) {
    tscError("%p illegal value of numOfCols in query msg: %" PRIu64 ", table cols:%d", pSql, (uint64_t)numOfCols,
        tscGetNumOfColumns(pTableMeta));

    return TSDB_CODE_TSC_INVALID_SQL;
  }

  if (pQueryInfo->interval.interval < 0) {
    tscError("%p illegal value of aggregation time interval in query msg: %" PRId64, pSql, (int64_t)pQueryInfo->interval.interval);
    return TSDB_CODE_TSC_INVALID_SQL;
  }

  if (pQueryInfo->groupbyExpr.numOfGroupCols < 0) {
    tscError("%p illegal value of numOfGroupCols in query msg: %d", pSql, pQueryInfo->groupbyExpr.numOfGroupCols);
    return TSDB_CODE_TSC_INVALID_SQL;
  }

  // ---- fixed header ----
  SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload;

  int32_t numOfTags = (int32_t)taosArrayGetSize(pTableMetaInfo->tagColList);

  // for a descending query the window boundaries are swapped in the message
  if (pQueryInfo->order.order == TSDB_ORDER_ASC) {
    pQueryMsg->window.skey = htobe64(pQueryInfo->window.skey);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->window.ekey);
  } else {
    pQueryMsg->window.skey = htobe64(pQueryInfo->window.ekey);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->window.skey);
  }

  pQueryMsg->order          = htons(pQueryInfo->order.order);
  pQueryMsg->orderColId     = htons(pQueryInfo->order.orderColId);
  pQueryMsg->fillType       = htons(pQueryInfo->fillType);
  pQueryMsg->limit          = htobe64(pQueryInfo->limit.limit);
  pQueryMsg->offset         = htobe64(pQueryInfo->limit.offset);
  pQueryMsg->numOfCols      = htons((int16_t)numOfCols);
  pQueryMsg->interval.interval = htobe64(pQueryInfo->interval.interval);
  pQueryMsg->interval.sliding  = htobe64(pQueryInfo->interval.sliding);
  pQueryMsg->interval.offset   = htobe64(pQueryInfo->interval.offset);
  pQueryMsg->interval.intervalUnit = pQueryInfo->interval.intervalUnit;
  pQueryMsg->interval.slidingUnit  = pQueryInfo->interval.slidingUnit;
  pQueryMsg->interval.offsetUnit   = pQueryInfo->interval.offsetUnit;
  pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols);
  pQueryMsg->numOfTags      = htonl(numOfTags);
  pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType);
  pQueryMsg->queryType      = htonl(pQueryInfo->type);

  size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo);
  pQueryMsg->numOfOutput = htons((int16_t)numOfOutput);

  // ---- column list; the variable-length part starts right after the SColumnInfo array ----
  char *pMsg = (char *)(pQueryMsg->colList) + numOfCols * sizeof(SColumnInfo);
  SSchema *pSchema = tscGetTableSchema(pTableMeta);

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumn *pCol = taosArrayGetP(pQueryInfo->colList, i);
    SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];

    if (pCol->colIndex.columnIndex >= tscGetNumOfColumns(pTableMeta) || pColSchema->type < TSDB_DATA_TYPE_BOOL ||
        pColSchema->type > TSDB_DATA_TYPE_NCHAR) {
      tscError("%p tid:%d uid:%" PRIu64" id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s",
          pSql, pTableMeta->id.tid, pTableMeta->id.uid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta), pCol->colIndex.columnIndex,
               pColSchema->name);

      return TSDB_CODE_TSC_INVALID_SQL;
    }

    pQueryMsg->colList[i].colId = htons(pColSchema->colId);
    pQueryMsg->colList[i].bytes = htons(pColSchema->bytes);
    pQueryMsg->colList[i].type  = htons(pColSchema->type);
    pQueryMsg->colList[i].numOfFilters = htons(pCol->numOfFilters);

    // append the filter information after the basic column information
    for (int32_t f = 0; f < pCol->numOfFilters; ++f) {
      SColumnFilterInfo *pColFilter = &pCol->filterInfo[f];

      SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)pMsg;
      pFilterMsg->filterstr = htons(pColFilter->filterstr);

      pMsg += sizeof(SColumnFilterInfo);

      if (pColFilter->filterstr) {
        pFilterMsg->len = htobe64(pColFilter->len);
        memcpy(pMsg, (void *)pColFilter->pz, (size_t)(pColFilter->len + 1));
        pMsg += (pColFilter->len + 1);  // append the additional filter binary info
      } else {
        pFilterMsg->lowerBndi = htobe64(pColFilter->lowerBndi);
        pFilterMsg->upperBndi = htobe64(pColFilter->upperBndi);
      }

      pFilterMsg->lowerRelOptr = htons(pColFilter->lowerRelOptr);
      pFilterMsg->upperRelOptr = htons(pColFilter->upperRelOptr);

      if (pColFilter->lowerRelOptr == TSDB_RELATION_INVALID && pColFilter->upperRelOptr == TSDB_RELATION_INVALID) {
        tscError("invalid filter info");
        return TSDB_CODE_TSC_INVALID_SQL;
      }
    }
  }

  // ---- output expressions ----
  SSqlFuncMsg *pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
  for (int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) {
    SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);

    if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId, pExpr->numOfParams)) {
      tscError("%p table schema is not matched with parsed sql", pSql);
      return TSDB_CODE_TSC_INVALID_SQL;
    }

    pSqlFuncExpr->colInfo.colId    = htons(pExpr->colInfo.colId);
    pSqlFuncExpr->colInfo.colIndex = htons(pExpr->colInfo.colIndex);
    pSqlFuncExpr->colInfo.flag     = htons(pExpr->colInfo.flag);

    pSqlFuncExpr->functionId  = htons(pExpr->functionId);
    pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams);
    pMsg += sizeof(SSqlFuncMsg);

    for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
      // todo add log
      pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType);
      pSqlFuncExpr->arg[j].argBytes = htons(pExpr->param[j].nLen);

      if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) {
        // binary arguments are stored inline, right after the SSqlFuncMsg
        memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen);
        pMsg += pExpr->param[j].nLen;
      } else {
        pSqlFuncExpr->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64Key);
      }
    }

    // the next expression starts where the inline arguments of this one end
    pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
  }

  // serialize the table info (sid, uid, tags)
  pMsg = doSerializeTableInfo(pQueryMsg, pSql, pMsg);

  // ---- group-by columns ----
  SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr;
  if (pGroupbyExpr->numOfGroupCols > 0) {
    pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex);
    pQueryMsg->orderType = htons(pGroupbyExpr->orderType);

    for (int32_t j = 0; j < pGroupbyExpr->numOfGroupCols; ++j) {
      SColIndex* pCol = taosArrayGet(pGroupbyExpr->columnInfo, j);

      *((int16_t *)pMsg) = pCol->colId;
      pMsg += sizeof(pCol->colId);

      // plain assignment: the slot must not depend on what the buffer held before
      *((int16_t *)pMsg) = pCol->colIndex;
      pMsg += sizeof(pCol->colIndex);

      *((int16_t *)pMsg) = pCol->flag;
      pMsg += sizeof(pCol->flag);

      memcpy(pMsg, pCol->name, tListLen(pCol->name));
      pMsg += tListLen(pCol->name);
    }
  }

  // ---- fill values ----
  if (pQueryInfo->fillType != TSDB_FILL_NONE) {
    for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
      *((int64_t *)pMsg) = htobe64(pQueryInfo->fillVal[i]);
      pMsg += sizeof(pQueryInfo->fillVal[0]);
    }
  }

  // ---- tag columns ----
  if (numOfTags != 0) {
    int32_t numOfColumns = tscGetNumOfColumns(pTableMeta);
    int32_t numOfTagColumns = tscGetNumOfTags(pTableMeta);
    int32_t total = numOfTagColumns + numOfColumns;

    pSchema = tscGetTableTagSchema(pTableMeta);

    for (int32_t i = 0; i < numOfTags; ++i) {
      SColumn *pCol = taosArrayGetP(pTableMetaInfo->tagColList, i);
      SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];

      if ((pCol->colIndex.columnIndex >= numOfTagColumns || pCol->colIndex.columnIndex < -1) ||
          (pColSchema->type < TSDB_DATA_TYPE_BOOL || pColSchema->type > TSDB_DATA_TYPE_NCHAR)) {
        tscError("%p tid:%d uid:%" PRIu64 " id:%s, tag index out of range, totalCols:%d, numOfTags:%d, index:%d, column name:%s",
                 pSql, pTableMeta->id.tid, pTableMeta->id.uid, pTableMetaInfo->name, total, numOfTagColumns,
                 pCol->colIndex.columnIndex, pColSchema->name);

        return TSDB_CODE_TSC_INVALID_SQL;
      }

      SColumnInfo* pTagCol = (SColumnInfo*) pMsg;

      pTagCol->colId = htons(pColSchema->colId);
      pTagCol->bytes = htons(pColSchema->bytes);
      pTagCol->type  = htons(pColSchema->type);
      pTagCol->numOfFilters = 0;

      pMsg += sizeof(SColumnInfo);
    }
  }

  // serialize tag column query condition
  if (pQueryInfo->tagCond.pCond != NULL && taosArrayGetSize(pQueryInfo->tagCond.pCond) > 0) {
    STagCond* pTagCond = &pQueryInfo->tagCond;

    SCond *pCond = tsGetSTableQueryCond(pTagCond, pTableMeta->id.uid);
    if (pCond != NULL && pCond->cond != NULL) {
      pQueryMsg->tagCondLen = htons(pCond->len);
      memcpy(pMsg, pCond->cond, pCond->len);

      pMsg += pCond->len;
    }
  }

  // tbname condition: a NUL-terminated string, empty when there is no condition
  if (pQueryInfo->tagCond.tbnameCond.cond == NULL) {
    *pMsg = 0;
    pMsg++;
  } else {
    strcpy(pMsg, pQueryInfo->tagCond.tbnameCond.cond);
    pMsg += strlen(pQueryInfo->tagCond.tbnameCond.cond) + 1;
  }

  // compressed ts block
  pQueryMsg->tsOffset = htonl((int32_t)(pMsg - pCmd->payload));
  int32_t tsLen = 0;
  int32_t numOfBlocks = 0;

  if (pQueryInfo->tsBuf != NULL) {
    // head.vgId is stored in network byte order; convert it back to look up the block
    int32_t vnodeId = htonl(pQueryMsg->head.vgId);
    STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pQueryInfo->tsBuf, vnodeId);
    assert(QUERY_IS_JOIN_QUERY(pQueryInfo->type) && pBlockInfo != NULL);  // this query should not be sent

    // todo refactor
    if (fseek(pQueryInfo->tsBuf->f, pBlockInfo->offset, SEEK_SET) != 0) {
      int code = TAOS_SYSTEM_ERROR(ferror(pQueryInfo->tsBuf->f));
      tscError("%p: fseek failed: %s", pSql, tstrerror(code));
      return code;
    }

    size_t s = fread(pMsg, 1, pBlockInfo->compLen, pQueryInfo->tsBuf->f);
    if (s != pBlockInfo->compLen) {
      int code = TAOS_SYSTEM_ERROR(ferror(pQueryInfo->tsBuf->f));
      tscError("%p: fread didn't return expected data: %s", pSql, tstrerror(code));
      return code;
    }

    pMsg += pBlockInfo->compLen;
    tsLen = pBlockInfo->compLen;
    numOfBlocks = pBlockInfo->numOfBlocks;
  }

  pQueryMsg->tsLen = htonl(tsLen);
  pQueryMsg->tsNumOfBlocks = htonl(numOfBlocks);
  if (pQueryInfo->tsBuf != NULL) {
    pQueryMsg->tsOrder = htonl(pQueryInfo->tsBuf->tsOrder);
  }

  // ---- finalize the message length ----
  int32_t msgLen = (int32_t)(pMsg - pCmd->payload);

  tscDebug("%p msg built success,len:%d bytes", pSql, msgLen);
  pCmd->payloadLen = msgLen;
  pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY;

  pQueryMsg->head.contLen = htonl(msgLen);
  assert(msgLen + minMsgSize() <= (int32_t)pCmd->allocSize);

  return TSDB_CODE_SUCCESS;
}

926 927
/*
 * Finish the CREATE DATABASE message held in pSql->cmd.payload: set the message
 * type and length and copy the database name into the message.
 *
 * The payload is not allocated here; it is used as it already exists in pCmd.
 * pInfo is not used. Always returns TSDB_CODE_SUCCESS.
 */
int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SCMCreateDbMsg);
  pCmd->msgType    = TSDB_MSG_TYPE_CM_CREATE_DB;

  SCMCreateDbMsg *pMsg = (SCMCreateDbMsg *)pCmd->payload;

  assert(pCmd->numOfClause == 1);

  // the database name is carried in the name slot of the first table meta info
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  tstrncpy(pMsg->db, pMetaInfo->name, sizeof(pMsg->db));

  return TSDB_CODE_SUCCESS;
}

940 941
/*
 * Build the CREATE DNODE message: allocate the payload and copy the end point
 * given by the first DCL token into it.
 *
 * Returns TSDB_CODE_SUCCESS on success, TSDB_CODE_TSC_OUT_OF_MEMORY if the
 * payload cannot be allocated, or TSDB_CODE_TSC_INVALID_SQL if the end point
 * token does not fit into the message field.
 */
int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCMCreateDnodeMsg);
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMCreateDnodeMsg *pCreate = (SCMCreateDnodeMsg *)pCmd->payload;

  // The token is a slice of the SQL text: it carries its own length and is not
  // NUL-terminated, so the copy has to be bounded and terminated explicitly.
  size_t epLen = pInfo->pDCLInfo->a[0].n;
  if (epLen >= sizeof(pCreate->ep)) {
    tscError("%p dnode end point is too long", pSql);
    return TSDB_CODE_TSC_INVALID_SQL;
  }

  strncpy(pCreate->ep, pInfo->pDCLInfo->a[0].z, epLen);
  pCreate->ep[epLen] = '\0';

  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DNODE;

  return TSDB_CODE_SUCCESS;
}

956 957
/*
 * Build the CREATE ACCOUNT message from the parsed DCL info: account name,
 * password, resource limits (converted to network byte order) and access state.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if the payload
 * cannot be allocated.
 */
int32_t tscBuildAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SCMCreateAcctMsg);
  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMCreateAcctMsg *pMsg = (SCMCreateAcctMsg *)pCmd->payload;

  // account name and password are copied by token length
  SStrToken *pUserToken = &pInfo->pDCLInfo->user.user;
  SStrToken *pPassToken = &pInfo->pDCLInfo->user.passwd;

  strncpy(pMsg->user, pUserToken->z, pUserToken->n);
  strncpy(pMsg->pass, pPassToken->z, pPassToken->n);

  // resource limits
  SCreateAcctSQL *pOpt = &pInfo->pDCLInfo->acctOpt;

  pMsg->cfg.maxUsers           = htonl(pOpt->maxUsers);
  pMsg->cfg.maxDbs             = htonl(pOpt->maxDbs);
  pMsg->cfg.maxTimeSeries      = htonl(pOpt->maxTimeSeries);
  pMsg->cfg.maxStreams         = htonl(pOpt->maxStreams);
  pMsg->cfg.maxPointsPerSecond = htonl(pOpt->maxPointsPerSecond);
  pMsg->cfg.maxStorage         = htobe64(pOpt->maxStorage);
  pMsg->cfg.maxQueryTime       = htobe64(pOpt->maxQueryTime);
  pMsg->cfg.maxConnections     = htonl(pOpt->maxConnections);

  // access state: -1 when no state token was given; an unrecognized token
  // leaves the field as it is in the freshly allocated payload
  if (pOpt->stat.n == 0) {
    pMsg->cfg.accessState = -1;
  } else if (pOpt->stat.z[0] == 'r' && pOpt->stat.n == 1) {
    pMsg->cfg.accessState = TSDB_VN_READ_ACCCESS;
  } else if (pOpt->stat.z[0] == 'w' && pOpt->stat.n == 1) {
    pMsg->cfg.accessState = TSDB_VN_WRITE_ACCCESS;
  } else if (strncmp(pOpt->stat.z, "all", 3) == 0 && pOpt->stat.n == 3) {
    pMsg->cfg.accessState = TSDB_VN_ALL_ACCCESS;
  } else if (strncmp(pOpt->stat.z, "no", 2) == 0 && pOpt->stat.n == 2) {
    pMsg->cfg.accessState = 0;
  }

  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_ACCT;
  return TSDB_CODE_SUCCESS;
}

1001 1002
/*
 * Build the CREATE USER / ALTER USER message from the parsed user info.
 *
 * The message type is TSDB_MSG_TYPE_CM_ALTER_USER when the statement alters a
 * password or the privileges, and TSDB_MSG_TYPE_CM_CREATE_USER otherwise.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if the payload
 * cannot be allocated.
 */
int32_t tscBuildUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCMCreateUserMsg);

  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMCreateUserMsg *pMsg = (SCMCreateUserMsg *)pCmd->payload;
  SUserInfo        *pUserInfo = &pInfo->pDCLInfo->user;

  strncpy(pMsg->user, pUserInfo->user.z, pUserInfo->user.n);
  pMsg->flag = (int8_t)pUserInfo->type;

  if (pUserInfo->type == TSDB_ALTER_USER_PRIVILEGES) {
    pMsg->privilege = (char)pCmd->count;
  } else {
    // both "alter password" and "create user" carry the password token
    strncpy(pMsg->pass, pUserInfo->passwd.z, pUserInfo->passwd.n);
  }

  int32_t isAlter = (pUserInfo->type == TSDB_ALTER_USER_PASSWD || pUserInfo->type == TSDB_ALTER_USER_PRIVILEGES);
  pCmd->msgType = isAlter ? TSDB_MSG_TYPE_CM_ALTER_USER : TSDB_MSG_TYPE_CM_CREATE_USER;

  return TSDB_CODE_SUCCESS;
}

1033 1034
/*
 * Set the message type and length for a dnode configuration request. The
 * payload content itself is not touched here. pInfo is not used.
 */
int32_t tscBuildCfgDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SCMCfgDnodeMsg);
  pCmd->msgType    = TSDB_MSG_TYPE_CM_CONFIG_DNODE;

  return TSDB_CODE_SUCCESS;
}
S
slguan 已提交
1039

1040 1041
/*
 * Build the DROP DATABASE message: database name plus the "ignore if it does
 * not exist" flag taken from the parsed statement.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if the payload
 * cannot be allocated.
 */
int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCMDropDbMsg);

  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMDropDbMsg   *pMsg = (SCMDropDbMsg *)pCmd->payload;
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);

  tstrncpy(pMsg->db, pMetaInfo->name, sizeof(pMsg->db));
  pMsg->ignoreNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;

  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DB;
  return TSDB_CODE_SUCCESS;
}

1059 1060
/*
 * Build the DROP TABLE message: table id plus the "ignore if it does not exist"
 * flag taken from the parsed statement.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if the payload
 * cannot be allocated.
 */
int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCMDropTableMsg);

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMDropTableMsg *pDropTableMsg = (SCMDropTableMsg*)pCmd->payload;
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);

  // bounded copy, consistent with the other drop message builders
  tstrncpy(pDropTableMsg->tableId, pTableMetaInfo->name, sizeof(pDropTableMsg->tableId));
  pDropTableMsg->igNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;

  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_TABLE;
  return TSDB_CODE_SUCCESS;
}

1077
/*
 * Build the DROP DNODE message; the end point is read from the name slot of
 * the first table meta info. pInfo is not used.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if the payload
 * cannot be allocated.
 */
int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SCMDropDnodeMsg);
  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMDropDnodeMsg *pMsg = (SCMDropDnodeMsg *)pCmd->payload;
  STableMetaInfo  *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);

  tstrncpy(pMsg->ep, pMetaInfo->name, sizeof(pMsg->ep));
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DNODE;

  return TSDB_CODE_SUCCESS;
}

S
[TD-16]  
slguan 已提交
1093
/*
 * Build the DROP USER message; the user name is read from the name slot of the
 * first table meta info. pInfo is not used.
 *
 * The message type is set before the allocation, so it is set even when the
 * allocation fails.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if the payload
 * cannot be allocated.
 */
int32_t tscBuildDropUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SCMDropUserMsg);
  pCmd->msgType    = TSDB_MSG_TYPE_CM_DROP_USER;

  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMDropUserMsg *pMsg = (SCMDropUserMsg *)pCmd->payload;
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  tstrncpy(pMsg->user, pMetaInfo->name, sizeof(pMsg->user));

  return TSDB_CODE_SUCCESS;
}

S
[TD-16]  
slguan 已提交
1110 1111 1112 1113 1114 1115 1116
/*
 * Build the DROP ACCOUNT message. It uses the SCMDropUserMsg layout, with the
 * account name read from the name slot of the first table meta info.
 * pInfo is not used.
 *
 * The message type is set before the allocation, so it is set even when the
 * allocation fails.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if the payload
 * cannot be allocated.
 */
int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->payloadLen = sizeof(SCMDropUserMsg);
  pCmd->msgType    = TSDB_MSG_TYPE_CM_DROP_ACCT;

  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMDropUserMsg *pMsg = (SCMDropUserMsg *)pCmd->payload;
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  tstrncpy(pMsg->user, pMetaInfo->name, sizeof(pMsg->user));

  return TSDB_CODE_SUCCESS;
}

1127 1128
/*
 * Build the USE DATABASE message; the database name is read from the name slot
 * of the first table meta info. pInfo is not used.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if the payload
 * cannot be allocated.
 */
int32_t tscBuildUseDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCMUseDbMsg);

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMUseDbMsg *pUseDbMsg = (SCMUseDbMsg*)pCmd->payload;
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);

  // bounded copy, consistent with the other name-carrying message builders
  tstrncpy(pUseDbMsg->db, pTableMetaInfo->name, sizeof(pUseDbMsg->db));
  pCmd->msgType = TSDB_MSG_TYPE_CM_USE_DB;

  return TSDB_CODE_SUCCESS;
}

1144
/*
 * Build the SHOW message: database name, show type and an optional trailing
 * string (a wildcard pattern, or the dnode end point for "show vnodes").
 *
 * The trailing string length is written to the message in network byte order,
 * while pCmd->payloadLen is computed from the host-order length. The payload
 * reserves at least 100 bytes after the fixed header, and more when the
 * trailing string needs it.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_OUT_OF_MEMORY if the payload cannot
 * be allocated, or TSDB_CODE_TSC_INVALID_SQL if the trailing string is too long
 * to be described by the 16-bit length field.
 */
int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  STscObj *pObj = pSql->pTscObj;
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->msgType = TSDB_MSG_TYPE_CM_SHOW;

  // pick the token (if any) that is appended after the fixed header
  SShowInfo *pShowInfo = &pInfo->pDCLInfo->showOpt;
  SStrToken *pExtra = NULL;

  if (pShowInfo->showType != TSDB_MGMT_TABLE_VNODES) {
    if (pShowInfo->pattern.type > 0) {  // only show tables support wildcard query
      pExtra = &pShowInfo->pattern;
    }
  } else {
    pExtra = &pShowInfo->prefix;
    assert(pExtra->n > 0 && pExtra->type > 0);
  }

  size_t extraLen = (pExtra != NULL) ? (size_t)pExtra->n : 0;
  if (extraLen > UINT16_MAX) {
    tscError("%p show pattern is too long", pSql);
    return TSDB_CODE_TSC_INVALID_SQL;
  }

  // keep the historical 100-byte reserve, but never less than string + terminator
  size_t reserve = (extraLen + 1 > 100) ? (extraLen + 1) : 100;
  pCmd->payloadLen = sizeof(SCMShowMsg) + reserve;

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMShowMsg *pShowMsg = (SCMShowMsg*)pCmd->payload;

  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  size_t nameLen = strlen(pTableMetaInfo->name);
  if (nameLen > 0) {
    tstrncpy(pShowMsg->db, pTableMetaInfo->name, sizeof(pShowMsg->db));  // prefix is set here
  } else {
    tstrncpy(pShowMsg->db, pObj->db, sizeof(pShowMsg->db));
  }

  pShowMsg->type = pShowInfo->showType;

  if (pExtra != NULL) {
    // the token is a slice of the SQL text and is not NUL-terminated
    strncpy(pShowMsg->payload, pExtra->z, extraLen);
    pShowMsg->payload[extraLen] = '\0';
    pShowMsg->payloadLen = htons((uint16_t)extraLen);
  }

  // use the host-order length; pShowMsg->payloadLen is already in network order
  pCmd->payloadLen = sizeof(SCMShowMsg) + extraLen;
  return TSDB_CODE_SUCCESS;
}

1186
/*
 * Set the message length and type for a KILL QUERY / KILL CONNECTION /
 * KILL STREAM request according to pCmd->command. For any other command the
 * message type is left unchanged. pInfo is not used.
 */
int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCMKillQueryMsg);

  if (pCmd->command == TSDB_SQL_KILL_QUERY) {
    pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_QUERY;
  } else if (pCmd->command == TSDB_SQL_KILL_CONNECTION) {
    pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_CONN;
  } else if (pCmd->command == TSDB_SQL_KILL_STREAM) {
    pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_STREAM;
  }

  return TSDB_CODE_SUCCESS;
}

1204
/*
 * Estimate the payload size needed for a CREATE TABLE message.
 *
 * The estimate is the minimal message size plus the fixed header, plus either
 * one STagData (table created from a super table) or one SSchema per column
 * and tag, plus the select statement text when one is attached, plus
 * TSDB_EXTRA_PAYLOAD_SIZE.
 */
int tscEstimateCreateTableMsgLength(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd         *pCmd = &(pSql->cmd);
  SCreateTableSQL *pCreate = pInfo->pCreateTableInfo;

  int32_t size = minMsgSize() + sizeof(SCMCreateTableMsg);

  size += (pCreate->type == TSQL_CREATE_TABLE_FROM_STABLE)
              ? sizeof(STagData)
              : sizeof(SSchema) * (pCmd->numOfCols + pCmd->count);

  if (pCreate->pSelect != NULL) {
    // select text plus one extra byte
    size += (pCreate->pSelect->selectToken.n + 1);
  }

  return size + TSDB_EXTRA_PAYLOAD_SIZE;
}

1223
/*
 * Build the CREATE TABLE message.
 *
 * After the fixed header the message carries either
 *   - the tag data (length, name, values) when the table is created from a
 *     super table, or
 *   - one SSchema per column and tag, followed by the select statement text
 *     when the statement creates a stream.
 *
 * The field info of the query is cleared once the schema has been serialized.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if the payload
 * cannot be allocated.
 */
int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd        *pCmd = &pSql->cmd;
  SQueryInfo     *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo *pMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  // Reallocate the payload size
  int size = tscEstimateCreateTableMsgLength(pSql, pInfo);
  if (tscAllocPayload(pCmd, size) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for create table msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMCreateTableMsg *pCreateMsg = (SCMCreateTableMsg *)pCmd->payload;
  strcpy(pCreateMsg->tableId, pMetaInfo->name);

  // use dbinfo from table id without modifying current db info
  tscGetDBInfoFromTableFullName(pMetaInfo->name, pCreateMsg->db);

  SCreateTableSQL *pCreateInfo = pInfo->pCreateTableInfo;

  pCreateMsg->igExists     = pCreateInfo->existCheck ? 1 : 0;
  pCreateMsg->numOfColumns = htons(pCmd->numOfCols);
  pCreateMsg->numOfTags    = htons(pCmd->count);
  pCreateMsg->sqlLen       = 0;

  // write cursor into the variable-length part of the message
  char  *pCursor = (char *)pCreateMsg->schema;
  int8_t createType = pCreateInfo->type;

  if (createType == TSQL_CREATE_TABLE_FROM_STABLE) {  // create by using super table, tags value
    STagData *pTagData = &pCreateInfo->usingInfo.tagdata;

    *(int32_t *)pCursor = htonl(pTagData->dataLen);
    pCursor += sizeof(int32_t);

    memcpy(pCursor, pTagData->name, sizeof(pTagData->name));
    pCursor += sizeof(pTagData->name);

    memcpy(pCursor, pTagData->data, pTagData->dataLen);
    pCursor += pTagData->dataLen;
  } else {  // create (super) table
    SSchema *pSchema = (SSchema *)pCreateMsg->schema;

    // columns first, then tags; both come from the field info of the query
    for (int i = 0; i < pCmd->numOfCols + pCmd->count; ++i, ++pSchema) {
      TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);

      pSchema->type = pField->type;
      strcpy(pSchema->name, pField->name);
      pSchema->bytes = htons(pField->bytes);
    }

    pCursor = (char *)pSchema;

    if (createType == TSQL_CREATE_STREAM) {  // check if it is a stream sql
      SQuerySQL *pSelect = pCreateInfo->pSelect;
      size_t     sqlBytes = pSelect->selectToken.n + 1;

      strncpy(pCursor, pSelect->selectToken.z, sqlBytes);
      pCreateMsg->sqlLen = htons(sqlBytes);
      pCursor += sqlBytes;
    }
  }

  tscFieldInfoClear(&pQueryInfo->fieldsInfo);

  int msgLen = (int32_t)(pCursor - (char *)pCreateMsg);
  pCreateMsg->contLen = htonl(msgLen);
  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE;

  assert(msgLen + minMsgSize() <= size);
  return TSDB_CODE_SUCCESS;
}

/*
 * Estimate the payload size needed for an ALTER TABLE message: minimal message
 * size, fixed header, one SSchema per field of the first query info, plus
 * TSDB_EXTRA_PAYLOAD_SIZE.
 */
int tscEstimateAlterTableMsgLength(SSqlCmd *pCmd) {
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

  return minMsgSize() + sizeof(SCMAlterTableMsg) + sizeof(SSchema) * tscNumOfFields(pQueryInfo) +
         TSDB_EXTRA_PAYLOAD_SIZE;
}

1304
/*
 * Serialize an ALTER TABLE request into pCmd->payload.
 *
 * Written layout: SCMAlterTableMsg header, one SSchema entry per field of the
 * first query info, followed by the raw tag value bytes taken from
 * pInfo->pAlterInfo->tagData.
 *
 * Returns TSDB_CODE_TSC_OUT_OF_MEMORY when the payload cannot be allocated,
 * TSDB_CODE_SUCCESS otherwise.
 */
int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd        *pCmd = &pSql->cmd;
  SQueryInfo     *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  SAlterTableSQL *pAlterInfo = pInfo->pAlterInfo;

  int size = tscEstimateAlterTableMsgLength(pCmd);
  if (tscAllocPayload(pCmd, size) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for alter table msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  // fixed header
  SCMAlterTableMsg *pAlterTableMsg = (SCMAlterTableMsg *)pCmd->payload;
  tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pAlterTableMsg->db);
  strcpy(pAlterTableMsg->tableId, pTableMetaInfo->name);
  pAlterTableMsg->type = htons(pAlterInfo->type);
  pAlterTableMsg->numOfCols = htons(tscNumOfFields(pQueryInfo));

  // one schema entry per field; 16-bit members go out in network byte order
  SSchema *pSchema = pAlterTableMsg->schema;
  int      idx = 0;
  while (idx < tscNumOfFields(pQueryInfo)) {
    TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, idx);

    pSchema->type = pField->type;
    strcpy(pSchema->name, pField->name);
    pSchema->bytes = htons(pField->bytes);

    ++pSchema;
    ++idx;
  }

  // tag value bytes start right behind the last schema entry
  char *pMsg = (char *)pSchema;
  pAlterTableMsg->tagValLen = htonl(pAlterInfo->tagData.dataLen);
  memcpy(pMsg, pAlterInfo->tagData.data, pAlterInfo->tagData.dataLen);
  pMsg += pAlterInfo->tagData.dataLen;

  int msgLen = (int32_t)(pMsg - (char *)pAlterTableMsg);
  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_TABLE;

  // sanity check: the message must fit inside the estimated allocation
  assert(msgLen + minMsgSize() <= size);

  return TSDB_CODE_SUCCESS;
}

1352 1353 1354 1355
/*
 * Finalize an "update tag value" request. The message body is expected to be
 * in pCmd->payload already; this routine only fills in the message type, the
 * payload length and the endpoint set to send the request to.
 */
int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
  SSqlCmd*               pCmd = &pSql->cmd;
  SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) pCmd->payload;

  pCmd->msgType = TSDB_MSG_TYPE_UPDATE_TAG_VAL;

  // byte-swap contLen from the message head (presumably stored in network order)
  pCmd->payloadLen = htonl(pUpdateMsg->head.contLen);

  SQueryInfo     *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  // address the request using the endpoints recorded in the table meta
  tscDumpEpSetFromVgroupInfo(&pTableMetaInfo->pTableMeta->corVgroupInfo, &pSql->epSet);

  return TSDB_CODE_SUCCESS;
}

1367
/*
 * Finalize an ALTER DATABASE request: set message type and length, and copy
 * the database name (kept in the first table meta info's name field) into
 * the SCMAlterDbMsg located in pCmd->payload.
 */
int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_DB;
  pCmd->payloadLen = sizeof(SCMAlterDbMsg);

  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  SCMAlterDbMsg  *pAlterDbMsg = (SCMAlterDbMsg *)pCmd->payload;

  tstrncpy(pAlterDbMsg->db, pTableMetaInfo->name, sizeof(pAlterDbMsg->db));

  return TSDB_CODE_SUCCESS;
}

1379
/*
 * Build a retrieve request (TSDB_MSG_TYPE_CM_RETRIEVE) carrying the query
 * handle stored in pSql->res and the type of the first query info.
 *
 * Returns TSDB_CODE_TSC_OUT_OF_MEMORY if the payload cannot be allocated.
 */
int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->msgType = TSDB_MSG_TYPE_CM_RETRIEVE;
  pCmd->payloadLen = sizeof(SRetrieveTableMsg);

  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SQueryInfo        *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *)pCmd->payload;

  // both fields are sent in big-endian byte order
  pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
  pRetrieveMsg->free = htons(pQueryInfo->type);

  return TSDB_CODE_SUCCESS;
}

1397
/*
 * Point every pRes->tsrow[i] at the start of output column i inside
 * pRes->data. The column start is computed as (field offset * numOfRows).
 *
 * Returns pRes->code when the result pointer arrays cannot be created,
 * 0 otherwise.
 */
static int tscSetResultPointer(SQueryInfo *pQueryInfo, SSqlRes *pRes) {
  if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
    return pRes->code;
  }

  int col = 0;
  while (col < pQueryInfo->fieldsInfo.numOfOutput) {
    int16_t offset = tscFieldInfoGetOffset(pQueryInfo, col);
    char   *colStart = (char *)pRes->data + offset * pRes->numOfRows;

    pRes->tsrow[col] = (unsigned char *)colStart;
    ++col;
  }

  return 0;
}

/*
 * Common tail for commands whose result set is produced locally.
 *
 * pRes->rspType records whether the result has already been handed out:
 *   - rspType == 0: first call; publish numOfRes rows, set up the per-column
 *     row pointers and flip rspType to 1.
 *   - otherwise:    there is no more result; reset the result for the next
 *     retrieve.
 *
 * If an async callback (pSql->fp) is registered it is invoked directly on
 * success, or the error is queued through tscQueueAsyncRes on failure.
 *
 * Returns the result code stored in pSql->res.code.
 */
static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  pRes->code = TSDB_CODE_SUCCESS;
  if (pRes->rspType == 0) {
    pRes->numOfRows = numOfRes;
    pRes->row = 0;
    pRes->rspType = 1;

    tscSetResultPointer(pQueryInfo, pRes);
  } else {
    tscResetForNextRetrieve(pRes);
  }

  // Keep the full 32-bit code: narrowing it to uint8_t would drop the upper
  // bits, so an error whose low byte is zero would be mistaken for success.
  // It is captured before the callback runs because the callback may release pSql.
  int32_t code = pSql->res.code;
  if (pSql->fp) {
    if (code == TSDB_CODE_SUCCESS) {
      (*pSql->fp)(pSql->param, pSql, pSql->res.numOfRows);
    } else {
      tscQueueAsyncRes(pSql);
    }
  }

  return code;
}

/*
 * DESCRIBE <table>: the local result holds one row per column plus one row
 * per tag of the table referenced by the current clause.
 */
int tscProcessDescribeTableRsp(SSqlObj *pSql) {
  SSqlCmd        *pCmd = &pSql->cmd;
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  STableComInfo   tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);

  return tscLocalResultCommonBuilder(pSql, tinfo.numOfColumns + tinfo.numOfTags);
}

H
Haojun Liao 已提交
1455 1456 1457
/*
 * Local retrieve: mark the result as completed and publish a single row.
 */
int tscProcessLocalRetrieveRsp(SSqlObj *pSql) {
  pSql->res.completed = true;

  const int32_t numOfRes = 1;
  return tscLocalResultCommonBuilder(pSql, numOfRes);
}

H
hjxilinx 已提交
1461
/*
 * Run the client-side merge and deliver the merged rows to the async
 * callback.
 *
 * If pRes->code already carries an error, the error is queued and returned
 * without merging. Otherwise the merge result becomes pRes->code, the row
 * pointers are rebuilt when rows are available, and the result is flagged
 * completed once the merge yields no rows.
 */
int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  // an earlier failure short-circuits the merge
  int32_t code = pRes->code;
  if (code != TSDB_CODE_SUCCESS) {
    tscQueueAsyncRes(pSql);
    return code;
  }

  pRes->code = tscDoLocalMerge(pSql);
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  if (pRes->numOfRows > 0 && pRes->code == TSDB_CODE_SUCCESS) {
    tscCreateResPointerInfo(pRes, pQueryInfo);
  }

  pRes->row = 0;
  pRes->completed = (pRes->numOfRows == 0);

  // remember the code before control is handed to the callback / async queue
  code = pRes->code;
  if (code != TSDB_CODE_SUCCESS) {
    tscQueueAsyncRes(pSql);
  } else {
    (*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
  }

  return code;
}

S
slguan 已提交
1491
/* Publish an empty (zero-row) local result. */
int tscProcessEmptyResultRsp(SSqlObj *pSql) {
  const int32_t numOfRes = 0;
  return tscLocalResultCommonBuilder(pSql, numOfRes);
}
H
hzcheng 已提交
1492

1493
/*
 * Build the connect request: database name (without the part up to the first
 * path delimiter), client version, an empty message version, the client
 * process id and the application name.
 *
 * Returns TSDB_CODE_TSC_OUT_OF_MEMORY if the payload cannot be allocated.
 */
int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  STscObj *pObj = pSql->pTscObj;
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->msgType = TSDB_MSG_TYPE_CM_CONNECT;
  pCmd->payloadLen = sizeof(SCMConnectMsg);

  if (tscAllocPayload(pCmd, pCmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMConnectMsg *pConnect = (SCMConnectMsg *)pCmd->payload;

  // TODO refactor full_name
  // pObj->db may hold "<prefix><delimiter><db>"; send only the part after the
  // first delimiter, or the whole string when no delimiter is present.
  char *delim = strstr(pObj->db, TS_PATH_DELIMITER);
  char *db = pObj->db;
  if (delim != NULL) {
    db = delim + 1;
  }

  tstrncpy(pConnect->db, db, sizeof(pConnect->db));
  tstrncpy(pConnect->clientVersion, version, sizeof(pConnect->clientVersion));
  tstrncpy(pConnect->msgVersion, "", sizeof(pConnect->msgVersion));

  pConnect->pid = htonl(taosGetPId());
  taosGetCurrentAPPName(pConnect->appName, NULL);

  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
1520
/*
 * Build a table-meta request in the existing pCmd->payload: the table id,
 * the auto-create flag and, for auto-created tables that carry tag data,
 * the tag data block copied from pCmd->tagData.
 */
int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd        *pCmd = &pSql->cmd;
  SQueryInfo     *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  SCMTableInfoMsg *pInfoMsg = (SCMTableInfoMsg *)pCmd->payload;
  strcpy(pInfoMsg->tableId, pTableMetaInfo->name);
  pInfoMsg->createFlag = htons(pSql->cmd.autoCreated ? 1 : 0);

  // end-of-message cursor; starts right behind the fixed header
  char *pMsg = (char *)pInfoMsg + sizeof(SCMTableInfoMsg);

  // dataLen is byte-swapped here (presumably it is stored in network order)
  size_t len = htonl(pCmd->tagData.dataLen);
  if (pSql->cmd.autoCreated && len > 0) {
    // copy the name and dataLen members together with the tag bytes
    len += sizeof(pCmd->tagData.name) + sizeof(pCmd->tagData.dataLen);
    memcpy(pInfoMsg->tags, &pCmd->tagData, len);
    pMsg += len;
  }

  pCmd->payloadLen = (int32_t)(pMsg - (char *)pInfoMsg);
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLE_META;

  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
1547
/**
1548
 *  multi table meta req pkg format:
1549
 *  | SMgmtHead | SCMMultiTableInfoMsg | tableId0 | tableId1 | tableId2 | ......
S
slguan 已提交
1550 1551
 *      no used         4B
 **/
1552
int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
guanshengliang's avatar
guanshengliang 已提交
1553
#if 0
S
slguan 已提交
1554 1555 1556 1557 1558
  SSqlCmd *pCmd = &pSql->cmd;

  // copy payload content to temp buff
  char *tmpData = 0;
  if (pCmd->payloadLen > 0) {
H
Haojun Liao 已提交
1559
    if ((tmpData = calloc(1, pCmd->payloadLen + 1)) == NULL) return -1;
S
slguan 已提交
1560 1561 1562 1563 1564
    memcpy(tmpData, pCmd->payload, pCmd->payloadLen);
  }

  // fill head info
  SMgmtHead *pMgmt = (SMgmtHead *)(pCmd->payload + tsRpcHeadSize);
H
Haojun Liao 已提交
1565
  memset(pMgmt->db, 0, TSDB_TABLE_FNAME_LEN);  // server don't need the db
S
slguan 已提交
1566

1567
  SCMMultiTableInfoMsg *pInfoMsg = (SCMMultiTableInfoMsg *)(pCmd->payload + tsRpcHeadSize + sizeof(SMgmtHead));
S
slguan 已提交
1568
  pInfoMsg->numOfTables = htonl((int32_t)pCmd->count);
S
slguan 已提交
1569 1570

  if (pCmd->payloadLen > 0) {
S
slguan 已提交
1571
    memcpy(pInfoMsg->tableIds, tmpData, pCmd->payloadLen);
S
slguan 已提交
1572 1573
  }

S
Shengliang Guan 已提交
1574
  taosTFree(tmpData);
S
slguan 已提交
1575

1576
  pCmd->payloadLen += sizeof(SMgmtHead) + sizeof(SCMMultiTableInfoMsg);
S
slguan 已提交
1577
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLES_META;
S
slguan 已提交
1578 1579 1580

  assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize);

1581
  tscDebug("%p build load multi-metermeta msg completed, numOfTables:%d, msg size:%d", pSql, pCmd->count,
S
slguan 已提交
1582 1583 1584
           pCmd->payloadLen);

  return pCmd->payloadLen;
guanshengliang's avatar
guanshengliang 已提交
1585 1586
#endif
  return 0;  
S
slguan 已提交
1587 1588
}

H
hjxilinx 已提交
1589 1590 1591 1592 1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605
//static UNUSED_FUNC int32_t tscEstimateMetricMetaMsgSize(SSqlCmd *pCmd) {
////  const int32_t defaultSize =
////      minMsgSize() + sizeof(SSuperTableMetaMsg) + sizeof(SMgmtHead) + sizeof(int16_t) * TSDB_MAX_TAGS;
////  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
////
////  int32_t n = 0;
////  size_t size = taosArrayGetSize(pQueryInfo->tagCond.pCond);
////  for (int32_t i = 0; i < size; ++i) {
////    assert(0);
//////    n += strlen(pQueryInfo->tagCond.cond[i].cond);
////  }
////
////  int32_t tagLen = n * TSDB_NCHAR_SIZE;
////  if (pQueryInfo->tagCond.tbnameCond.cond != NULL) {
////    tagLen += strlen(pQueryInfo->tagCond.tbnameCond.cond) * TSDB_NCHAR_SIZE;
////  }
////
H
Haojun Liao 已提交
1606
////  int32_t joinCondLen = (TSDB_TABLE_FNAME_LEN + sizeof(int16_t)) * 2;
H
hjxilinx 已提交
1607 1608 1609 1610 1611 1612 1613 1614
////  int32_t elemSize = sizeof(SSuperTableMetaElemMsg) * pQueryInfo->numOfTables;
////
////  int32_t colSize = pQueryInfo->groupbyExpr.numOfGroupCols*sizeof(SColIndex);
////
////  int32_t len = tagLen + joinCondLen + elemSize + colSize + defaultSize;
////
////  return MAX(len, TSDB_DEFAULT_PAYLOAD_SIZE);
//}
H
hzcheng 已提交
1615

H
hjxilinx 已提交
1616 1617
/*
 * Build a super-table vgroup request in the existing pCmd->payload: a
 * SCMSTableVgroupMsg header followed by one fixed-width name slot
 * (sizeof(STableMetaInfo.name) bytes) per table of the first query info.
 */
int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd    *pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

  SCMSTableVgroupMsg *pStableVgroupMsg = (SCMSTableVgroupMsg *)pCmd->payload;
  pStableVgroupMsg->numOfTables = htonl(pQueryInfo->numOfTables);

  // table names are appended right behind the header
  char *pMsg = pCmd->payload + sizeof(SCMSTableVgroupMsg);

  int32_t tableIdx = 0;
  while (tableIdx < pQueryInfo->numOfTables) {
    STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, tableIdx);

    size_t slot = sizeof(pTableMetaInfo->name);
    tstrncpy(pMsg, pTableMetaInfo->name, slot);
    pMsg += slot;

    ++tableIdx;
  }

  pCmd->msgType = TSDB_MSG_TYPE_CM_STABLE_VGROUP;
  pCmd->payloadLen = (int32_t)(pMsg - pCmd->payload);

  return TSDB_CODE_SUCCESS;
}

1639 1640
/*
 * Build the heartbeat request for the connection owning pSql.
 *
 * While holding pObj->mutex, the query list and stream list are walked to
 * size the payload (each count starts at 2 rather than 0), the payload is
 * allocated and tscBuildQueryStreamDesc fills in the descriptors.
 *
 * Returns TSDB_CODE_TSC_OUT_OF_MEMORY if the payload cannot be allocated.
 */
int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;

  pthread_mutex_lock(&pObj->mutex);

  int32_t numOfQueries = 2;
  for (SSqlObj *tpSql = pObj->sqlList; tpSql != NULL; tpSql = tpSql->next) {
    numOfQueries++;
  }

  int32_t numOfStreams = 2;
  for (SSqlStream *pStream = pObj->streamList; pStream != NULL; pStream = pStream->next) {
    numOfStreams++;
  }

  int size = numOfQueries * sizeof(SQueryDesc) + numOfStreams * sizeof(SStreamDesc) + sizeof(SCMHeartBeatMsg) + 100;
  if (tscAllocPayload(pCmd, size) != TSDB_CODE_SUCCESS) {
    pthread_mutex_unlock(&pObj->mutex);
    tscError("%p failed to create heartbeat msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  // TODO the expired hb and client can not be identified by server till now.
  SCMHeartBeatMsg *pHeartbeat = (SCMHeartBeatMsg *)pCmd->payload;
  tstrncpy(pHeartbeat->clientVer, version, tListLen(pHeartbeat->clientVer));

  pHeartbeat->numOfQueries = numOfQueries;
  pHeartbeat->numOfStreams = numOfStreams;

  pHeartbeat->pid = htonl(taosGetPId());
  taosGetCurrentAPPName(pHeartbeat->appName, NULL);

  int msgLen = tscBuildQueryStreamDesc(pHeartbeat, pObj);

  pthread_mutex_unlock(&pObj->mutex);

  pCmd->msgType = TSDB_MSG_TYPE_CM_HEARTBEAT;
  pCmd->payloadLen = msgLen;

  return TSDB_CODE_SUCCESS;
}

1686 1687
/*
 * Process a table-meta response.
 *
 * The message in pSql->res.pRsp is converted to host byte order in place,
 * validated, turned into a STableMeta and stored in tscMetaCache under the
 * table name. The cached copy is attached to the first table meta info of
 * the command; the temporary STableMeta is released before returning.
 *
 * Returns:
 *   TSDB_CODE_SUCCESS            on success
 *   TSDB_CODE_TSC_INVALID_VALUE  if the ids or column/tag counts are out of range
 *   TSDB_CODE_TSC_OUT_OF_MEMORY  if the table meta cannot be created or cached
 */
int tscProcessTableMetaRsp(SSqlObj *pSql) {
  STableMetaMsg *pMetaMsg = (STableMetaMsg *)pSql->res.pRsp;

  pMetaMsg->tid = htonl(pMetaMsg->tid);
  pMetaMsg->sversion = htons(pMetaMsg->sversion);
  pMetaMsg->tversion = htons(pMetaMsg->tversion);
  pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId);

  pMetaMsg->uid = htobe64(pMetaMsg->uid);
  pMetaMsg->contLen = htons(pMetaMsg->contLen);
  pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns);

  // the id / vgroup checks are skipped for super tables
  if ((pMetaMsg->tableType != TSDB_SUPER_TABLE) &&
      (pMetaMsg->tid <= 0 || pMetaMsg->vgroup.vgId < 2 || pMetaMsg->vgroup.numOfEps <= 0)) {
    tscError("invalid value in table numOfEps:%d, vgId:%d tid:%d, name:%s", pMetaMsg->vgroup.numOfEps, pMetaMsg->vgroup.vgId,
             pMetaMsg->tid, pMetaMsg->tableId);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  if (pMetaMsg->numOfTags > TSDB_MAX_TAGS) {
    tscError("invalid numOfTags:%d", pMetaMsg->numOfTags);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  if (pMetaMsg->numOfColumns > TSDB_MAX_COLUMNS || pMetaMsg->numOfColumns <= 0) {
    tscError("invalid numOfColumns:%d", pMetaMsg->numOfColumns);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  // NOTE(review): numOfEps has no upper-bound check before indexing epAddr -- confirm
  // the server can never send more entries than the array holds.
  for (int i = 0; i < pMetaMsg->vgroup.numOfEps; ++i) {
    pMetaMsg->vgroup.epAddr[i].port = htons(pMetaMsg->vgroup.epAddr[i].port);
  }

  SSchema* pSchema = pMetaMsg->schema;

  // schema entries of the columns are followed by those of the tags
  int32_t numOfTotalCols = pMetaMsg->numOfColumns + pMetaMsg->numOfTags;
  for (int i = 0; i < numOfTotalCols; ++i) {
    pSchema->bytes = htons(pSchema->bytes);
    pSchema->colId = htons(pSchema->colId);

    if (pSchema->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
      assert(i == 0);
    }

    assert(pSchema->type >= TSDB_DATA_TYPE_BOOL && pSchema->type <= TSDB_DATA_TYPE_NCHAR);
    pSchema++;
  }

  size_t size = 0;
  STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
  if (pTableMeta == NULL) {
    // do not hand a NULL buffer to the cache or dereference it in the log below
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  // todo add one more function: taosAddDataIfNotExists();
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
  assert(pTableMetaInfo->pTableMeta == NULL);

  pTableMetaInfo->pTableMeta = (STableMeta *) taosCachePut(tscMetaCache, pTableMetaInfo->name,
      strlen(pTableMetaInfo->name), pTableMeta, size, tsTableMetaKeepTimer * 1000);

  if (pTableMetaInfo->pTableMeta == NULL) {
    free(pTableMeta);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  tscDebug("%p recv table meta, uid:%"PRId64 ", tid:%d, name:%s", pSql, pTableMeta->id.uid, pTableMeta->id.tid, pTableMetaInfo->name);
  free(pTableMeta);

  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
1756
/**
 * Multi table meta response handler.
 *
 * The implementation is compiled out (#if 0); the function currently only
 * returns TSDB_CODE_SUCCESS.
 *
 * Response layout expected by the disabled code:
 *  | STaosRsp | ieType | SCMMultiTableInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | SMeterMeta2 | SSchema2
 *  |...... 1B        1B            4B
 **/
int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
#if 0
  char *rsp = pSql->res.pRsp;

  ieType = *rsp;
  if (ieType != TSDB_IE_TYPE_META) {
    tscError("invalid ie type:%d", ieType);
    pSql->res.code = TSDB_CODE_TSC_INVALID_IE;
    pSql->res.numOfTotal = 0;
    return TSDB_CODE_TSC_APP_ERROR;
  }

  rsp++;

  SCMMultiTableInfoMsg *pInfo = (SCMMultiTableInfoMsg *)rsp;
  totalNum = htonl(pInfo->numOfTables);
  rsp += sizeof(SCMMultiTableInfoMsg);

  for (i = 0; i < totalNum; i++) {
    SMultiTableMeta *pMultiMeta = (SMultiTableMeta *)rsp;
    STableMeta *     pMeta = pMultiMeta->metas;

    pMeta->sid = htonl(pMeta->sid);
    pMeta->sversion = htons(pMeta->sversion);
    pMeta->vgId = htonl(pMeta->vgId);
    pMeta->uid = htobe64(pMeta->uid);

    if (pMeta->sid <= 0 || pMeta->vgId < 0) {
      tscError("invalid meter vgId:%d, sid%d", pMeta->vgId, pMeta->sid);
      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
      pSql->res.numOfTotal = i;
      return TSDB_CODE_TSC_APP_ERROR;
    }

    //    pMeta->numOfColumns = htons(pMeta->numOfColumns);
    //
    //    if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
    //      tscError("invalid tag value count:%d", pMeta->numOfTags);
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
    //      pSql->res.numOfTotal = i;
    //      return TSDB_CODE_TSC_APP_ERROR;
    //    }
    //
    //    if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
    //      tscError("invalid numOfTags:%d", pMeta->numOfTags);
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
    //      pSql->res.numOfTotal = i;
    //      return TSDB_CODE_TSC_APP_ERROR;
    //    }
    //
    //    if (pMeta->numOfColumns > TSDB_MAX_COLUMNS || pMeta->numOfColumns < 0) {
    //      tscError("invalid numOfColumns:%d", pMeta->numOfColumns);
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
    //      pSql->res.numOfTotal = i;
    //      return TSDB_CODE_TSC_APP_ERROR;
    //    }
    //
    //    for (int j = 0; j < TSDB_REPLICA_MAX_NUM; ++j) {
    //      pMeta->vpeerDesc[j].vnode = htonl(pMeta->vpeerDesc[j].vnode);
    //    }
    //
    //    pMeta->rowSize = 0;
    //    rsp += sizeof(SMultiTableMeta);
    //    pSchema = (SSchema *)rsp;
    //
    //    int32_t numOfTotalCols = pMeta->numOfColumns + pMeta->numOfTags;
    //    for (int j = 0; j < numOfTotalCols; ++j) {
    //      pSchema->bytes = htons(pSchema->bytes);
    //      pSchema->colId = htons(pSchema->colId);
    //
    //      // ignore the tags length
    //      if (j < pMeta->numOfColumns) {
    //        pMeta->rowSize += pSchema->bytes;
    //      }
    //      pSchema++;
    //    }
    //
    //    rsp += numOfTotalCols * sizeof(SSchema);
    //
    //    int32_t  tagLen = 0;
    //    SSchema *pTagsSchema = tscGetTableTagSchema(pMeta);
    //
    //    if (pMeta->tableType == TSDB_CHILD_TABLE) {
    //      for (int32_t j = 0; j < pMeta->numOfTags; ++j) {
    //        tagLen += pTagsSchema[j].bytes;
    //      }
    //    }
    //
    //    rsp += tagLen;
    //    int32_t size = (int32_t)(rsp - ((char *)pMeta));  // Consistent with STableMeta in cache
    //
    //    pMeta->index = 0;
    //    (void)taosCachePut(tscMetaCache, pMeta->tableId, (char *)pMeta, size, tsTableMetaKeepTimer);
    //  }
  }

  pSql->res.code = TSDB_CODE_SUCCESS;
  pSql->res.numOfTotal = i;
  tscDebug("%p load multi-metermeta resp from complete num:%d", pSql, pSql->res.numOfTotal);
#endif

  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
1865
/*
 * Process a super-table vgroup response.
 *
 * The response holds one SVgroupsMsg per table, in the same order as the
 * tables of the parent SQL object (taken from pSql->param). For every table a
 * SVgroupsInfo list is allocated and filled with the vgroup id, endpoint
 * count, ports and duplicated FQDN strings.
 *
 * Returns pSql->res.code on success, or TSDB_CODE_TSC_OUT_OF_MEMORY if a
 * vgroup list cannot be allocated.
 */
int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
  SSqlRes* pRes = &pSql->res;

  // NOTE: the order of several table must be preserved.
  SCMSTableVgroupRspMsg *pStableVgroup = (SCMSTableVgroupRspMsg *)pRes->pRsp;
  pStableVgroup->numOfTables = htonl(pStableVgroup->numOfTables);
  char* pMsg = pRes->pRsp + sizeof(SCMSTableVgroupRspMsg);

  // master sqlObj locates in param
  SSqlObj* parent = pSql->param;
  assert(parent != NULL);

  SSqlCmd* pCmd = &parent->cmd;
  for(int32_t i = 0; i < pStableVgroup->numOfTables; ++i) {
    STableMetaInfo *pInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i);

    SVgroupsMsg *  pVgroupMsg = (SVgroupsMsg *) pMsg;
    pVgroupMsg->numOfVgroups = htonl(pVgroupMsg->numOfVgroups);

    // bytes consumed by this table's entry in the response
    size_t size = sizeof(SCMVgroupMsg) * pVgroupMsg->numOfVgroups + sizeof(SVgroupsMsg);

    size_t vgroupsz = sizeof(SCMVgroupInfo) * pVgroupMsg->numOfVgroups + sizeof(SVgroupsInfo);
    pInfo->vgroupList = calloc(1, vgroupsz);
    if (pInfo->vgroupList == NULL) {
      // an assert would vanish in release builds and leave a NULL dereference
      tscError("%p failed to malloc vgroup list", pSql);
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }

    pInfo->vgroupList->numOfVgroups = pVgroupMsg->numOfVgroups;
    for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) {
      //just init, no need to lock
      SCMVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j];

      SCMVgroupMsg *vmsg = &pVgroupMsg->vgroups[j];
      pVgroups->vgId = htonl(vmsg->vgId);
      pVgroups->numOfEps = vmsg->numOfEps;

      assert(pVgroups->numOfEps >= 1 && pVgroups->vgId >= 1);

      for (int32_t k = 0; k < pVgroups->numOfEps; ++k) {
        pVgroups->epAddr[k].port = htons(vmsg->epAddr[k].port);
        pVgroups->epAddr[k].fqdn = strndup(vmsg->epAddr[k].fqdn, tListLen(vmsg->epAddr[k].fqdn));
      }
    }

    pMsg += size;
  }

  return pSql->res.code;
}

/*
 * Process the response of a SHOW command.
 *
 * The response carries a query handle and a table-meta message describing
 * the result columns. The meta is converted to host byte order in place,
 * turned into a STableMeta and stored in tscMetaCache under a synthetic key
 * ("<msgType + 'a'>showlist"). One output field and one TSDB_FUNC_TS_DUMMY
 * expression are then appended to the query info per column.
 *
 * Returns 0 on success, or TSDB_CODE_TSC_OUT_OF_MEMORY if the table meta
 * cannot be created or cached.
 */
int tscProcessShowRsp(SSqlObj *pSql) {
  STableMetaMsg * pMetaMsg;
  SCMShowRsp *pShow;
  SSchema *    pSchema;
  char         key[20];

  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  pShow = (SCMShowRsp *)pRes->pRsp;
  pShow->qhandle = htobe64(pShow->qhandle);
  pRes->qhandle = pShow->qhandle;

  tscResetForNextRetrieve(pRes);
  pMetaMsg = &(pShow->tableMeta);

  pMetaMsg->numOfColumns = ntohs(pMetaMsg->numOfColumns);

  pSchema = pMetaMsg->schema;
  // NOTE(review): tid is swapped with ntohs here but with htonl in
  // tscProcessTableMetaRsp -- confirm which width the server uses for SHOW.
  pMetaMsg->tid = ntohs(pMetaMsg->tid);
  for (int i = 0; i < pMetaMsg->numOfColumns; ++i) {
    pSchema->bytes = htons(pSchema->bytes);
    pSchema++;
  }

  // cache key: one character derived from the message type + "showlist"
  key[0] = pCmd->msgType + 'a';
  strcpy(key + 1, "showlist");

  // drop the reference to any previously attached meta
  if (pTableMetaInfo->pTableMeta != NULL) {
    taosCacheRelease(tscMetaCache, (void *)&(pTableMetaInfo->pTableMeta), false);
  }

  size_t size = 0;
  STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
  if (pTableMeta == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  pTableMetaInfo->pTableMeta = taosCachePut(tscMetaCache, key, strlen(key), (char *)pTableMeta, size,
      tsTableMetaKeepTimer * 1000);
  if (pTableMetaInfo->pTableMeta == NULL) {
    // the cached copy is dereferenced below through tscGetTableSchema
    taosTFree(pTableMeta);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SSchema *pTableSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);

  if (pQueryInfo->colList == NULL) {
    pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
  }

  SFieldInfo* pFieldInfo = &pQueryInfo->fieldsInfo;

  SColumnIndex index = {0};
  pSchema = pMetaMsg->schema;

  // one output field + dummy expression per column of the SHOW result
  for (int16_t i = 0; i < pMetaMsg->numOfColumns; ++i, ++pSchema) {
    index.columnIndex = i;
    tscColumnListInsert(pQueryInfo->colList, &index);

    TAOS_FIELD f = tscCreateField(pSchema->type, pSchema->name, pSchema->bytes);
    SInternalField* pInfo = tscFieldInfoAppend(pFieldInfo, &f);

    pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index,
                     pTableSchema[i].type, pTableSchema[i].bytes, pTableSchema[i].bytes, false);
  }

  pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
  tscFieldInfoUpdateOffset(pQueryInfo);

  // the temporary meta built from the message is no longer needed
  taosTFree(pTableMeta);
  return 0;
}

1986
// TODO multithread problem
1987 1988 1989 1990 1991 1992 1993 1994 1995 1996
static void createHBObj(STscObj* pObj) {
  if (pObj->pHb != NULL) {
    return;
  }

  SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
  if (NULL == pSql) return;

  pSql->fp = tscProcessHeartBeatRsp;

1997 1998 1999 2000 2001 2002
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetailSafely(&pSql->cmd, 0);
  if (pQueryInfo == NULL) {
    pSql->res.code = terrno;
    return;
  }

2003 2004 2005 2006 2007 2008 2009 2010 2011 2012 2013 2014
  pQueryInfo->command = TSDB_SQL_HB;

  pSql->cmd.command = pQueryInfo->command;
  if (TSDB_CODE_SUCCESS != tscAllocPayload(&(pSql->cmd), TSDB_DEFAULT_PAYLOAD_SIZE)) {
    taosTFree(pSql);
    return;
  }

  pSql->param = pObj;
  pSql->pTscObj = pObj;
  pSql->signature = pSql;

2015 2016
  registerSqlObj(pSql);
  tscDebug("%p HB is allocated, pObj:%p", pSql, pObj);
2017

2018
  pObj->pHb = pSql;
2019 2020
}

H
hzcheng 已提交
2021 2022 2023 2024
/*
 * Process the connect response.
 *
 * Copies the account id, server version, auth flags and connection id from
 * the response into the connection object, prefixes pObj->db with
 * "<acctId><delimiter>", updates the management endpoint set when the
 * response carries one, creates the heartbeat object and (re)starts the
 * activity timer. Always returns 0.
 */
int tscProcessConnectRsp(SSqlObj *pSql) {
  STscObj *pObj = pSql->pTscObj;
  SSqlRes *pRes = &pSql->res;

  char temp[TSDB_TABLE_FNAME_LEN * 2] = {0};

  SCMConnectRsp *pConnect = (SCMConnectRsp *)pRes->pRsp;
  tstrncpy(pObj->acctId, pConnect->acctId, sizeof(pObj->acctId));  // copy acctId from response

  // bounded formatting: never write past temp, whatever the inputs contain
  int32_t len = snprintf(temp, sizeof(temp), "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db);

  assert(len <= sizeof(pObj->db));
  tstrncpy(pObj->db, temp, sizeof(pObj->db));

  if (pConnect->epSet.numOfEps > 0) {
    tscEpSetHtons(&pConnect->epSet);
    tscUpdateMgmtEpSet(&pConnect->epSet);
  }

  // The version string comes from the server, so bound the copy.
  // NOTE(review): assumes sversion is a char array member like acctId/db -- confirm.
  tstrncpy(pObj->sversion, pConnect->serverVersion, sizeof(pObj->sversion));
  pObj->writeAuth = pConnect->writeAuth;
  pObj->superAuth = pConnect->superAuth;
  pObj->connId = htonl(pConnect->connId);

  createHBObj(pObj);

  //launch a timer to send heartbeat to maintain the connection and send status to mnode
  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);

  return 0;
}

/*
 * USE <db> succeeded: remember the database name (held in the first table
 * meta info's name field) as the connection's current database.
 */
int tscProcessUseDbRsp(SSqlObj *pSql) {
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
  STscObj        *pObj = pSql->pTscObj;

  tstrncpy(pObj->db, pTableMetaInfo->name, sizeof(pObj->db));

  return 0;
}

Y
TD-1039  
yihaoDeng 已提交
2060 2061
/*
 * Handle the response of a "drop database" command: clear the current database
 * name of the connection and empty the table meta cache.
 *
 * @param pSql  sql object of the finished command
 * @return      always 0
 */
int tscProcessDropDbRsp(SSqlObj *pSql) {
  STscObj *pConn = pSql->pTscObj;

  pConn->db[0] = 0;
  taosCacheEmpty(tscMetaCache);

  return 0;
}

/*
 * Handle the response of a "drop table" command by force-releasing the cached
 * meta of the dropped table.
 *
 * @param pSql  sql object of the finished command
 * @return      always 0
 */
int tscProcessDropTableRsp(SSqlObj *pSql) {
  STableMetaInfo *pInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);

  STableMeta *pCached = taosCacheAcquireByKey(tscMetaCache, pInfo->name, strlen(pInfo->name));
  if (pCached != NULL) {
    /*
     * A table with the same name but a different schema may be created right after
     * this drop and may live in another vnode, so the cached meta must not survive:
     * release both the reference acquired above and the one held by the command.
     */
    tscDebug("%p force release table meta after drop table:%s", pSql, pInfo->name);
    taosCacheRelease(tscMetaCache, (void **)&pCached, true);

    if (pInfo->pTableMeta != NULL) {
      taosCacheRelease(tscMetaCache, (void **)&(pInfo->pTableMeta), true);
    }
  }

  /* when the table is not in the cache there is nothing to release */
  return 0;
}

/*
 * Handle the response of an "alter table" command: force-release the cached meta
 * of the altered table and, when the table is a super table, empty the whole
 * table meta cache.
 *
 * @param pSql  sql object of the finished command
 * @return      always 0
 */
int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
  STableMetaInfo *pInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);

  STableMeta *pCached = taosCacheAcquireByKey(tscMetaCache, pInfo->name, strlen(pInfo->name));
  if (pCached == NULL) {
    return 0;  // not in cache, nothing to invalidate
  }

  tscDebug("%p force release metermeta in cache after alter-table: %s", pSql, pInfo->name);
  taosCacheRelease(tscMetaCache, (void **)&pCached, true);

  if (pInfo->pTableMeta == NULL) {
    return 0;
  }

  // evaluated before the release below, which is handed the address of pTableMeta
  bool isSuperTable = UTIL_TABLE_IS_SUPER_TABLE(pInfo);
  taosCacheRelease(tscMetaCache, (void **)&(pInfo->pTableMeta), true);

  if (!isSuperTable) {
    return 0;
  }

  // if it is a super table, reset whole query cache
  tscDebug("%p reset query cache since table:%s is stable", pSql, pInfo->name);
  taosCacheEmpty(tscMetaCache);

  return 0;
}

/*
 * Handle the response of an "alter database" command; no client-side state is
 * touched.
 *
 * @param pSql  sql object of the finished command (unused)
 * @return      always 0
 */
int tscProcessAlterDbMsgRsp(SSqlObj *pSql) {
  UNUSED(pSql);

  return 0;
}
2119 2120 2121
/*
 * Handle the response of "show create table/database" by delegating to
 * tscLocalResultCommonBuilder with a second argument of 1.
 *
 * @param pSql  sql object of the finished command
 * @return      the value returned by tscLocalResultCommonBuilder
 */
int tscProcessShowCreateRsp(SSqlObj *pSql) {
  int code = tscLocalResultCommonBuilder(pSql, 1);
  return code;
}
H
hzcheng 已提交
2122 2123 2124 2125

/*
 * Handle the response of a query request: store the query handle carried by the
 * response in the result object and reset the result for the next retrieve.
 *
 * @param pSql  sql object; pSql->res.pRsp is interpreted as an SQueryTableRsp
 * @return      always 0
 */
int tscProcessQueryRsp(SSqlObj *pSql) {
  SSqlRes        *pRes = &pSql->res;
  SQueryTableRsp *pRsp = (SQueryTableRsp *)pRes->pRsp;

  // the handle is byte-swapped in place, so the response buffer keeps the converted value
  pRsp->qhandle = htobe64(pRsp->qhandle);
  pRes->qhandle = pRsp->qhandle;

  pRes->data = NULL;
  tscResetForNextRetrieve(pRes);

  return 0;
}

H
hjxilinx 已提交
2135
/*
 * Handle the response of a retrieve/fetch request.
 *
 * Converts the header fields of the response to host byte order, points the
 * result object at the returned data and builds the per-column result pointers.
 * For a subscription, the progress records that follow the data are parsed and
 * forwarded to tscUpdateSubscriptionProgress.
 *
 * @param pSql  sql object; pSql->res.pRsp is interpreted as an SRetrieveTableRsp
 * @return      0 on success; TSDB_CODE_TSC_OUT_OF_MEMORY when there is no response
 *              buffer; pRes->code when the result pointers cannot be created
 */
int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *)pRes->pRsp;
  if (pRetrieve == NULL) {
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return pRes->code;
  }

  pRes->numOfRows = htonl(pRetrieve->numOfRows);
  pRes->precision = htons(pRetrieve->precision);
  pRes->offset    = htobe64(pRetrieve->offset);
  pRes->useconds  = htobe64(pRetrieve->useconds);
  pRes->completed = (pRetrieve->completed == 1);
  pRes->data      = pRetrieve->data;

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
    return pRes->code;
  }

  if (pSql->pSubscription != NULL) {
    int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput;

    TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, numOfCols - 1);
    int16_t     offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1);

    // (offset + width of the last output column) * rows: presumably the end of the row data,
    // where the progress records start -- TODO confirm against the server side encoding
    char* p = pRes->data + (pField->bytes + offset) * pRes->numOfRows;

    // p is a byte cursor with no alignment guarantee, so every field is copied out with
    // memcpy instead of being dereferenced through a casted pointer
    int32_t numOfTables = 0;
    memcpy(&numOfTables, p, sizeof(numOfTables));
    numOfTables = htonl(numOfTables);
    p += sizeof(int32_t);

    // one record per table: uid (int64), tid (int32, ignored), key (TSKEY)
    for (int i = 0; i < numOfTables; i++) {
      int64_t uid = 0;
      memcpy(&uid, p, sizeof(uid));
      uid = htobe64(uid);
      p += sizeof(int64_t);

      p += sizeof(int32_t); // skip tid

      TSKEY key = 0;
      memcpy(&key, p, sizeof(key));
      key = htobe64(key);
      p += sizeof(TSKEY);

      tscUpdateSubscriptionProgress(pSql->pSubscription, uid, key);
    }
  }

  pRes->row = 0;
  tscDebug("%p numOfRows:%" PRId64 ", offset:%" PRId64 ", complete:%d", pSql, pRes->numOfRows, pRes->offset, pRes->completed);

  return 0;
}

H
hjxilinx 已提交
2183
// Forward declaration: this function is installed in this file as the callback (fp) of the
// helper sql objects created to fetch table meta and super table vgroup info.
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code);
H
hzcheng 已提交
2184

2185
/*
 * Create a helper sql object that asks for the meta of one table and launch it.
 *
 * The helper carries the table name, the auto-create flag and the tag data of the
 * original command; tscTableMetaCallBack is installed as its callback with the
 * original sql object as parameter.
 *
 * The helper is registered only after every step that can fail, so the failure
 * paths release an unregistered object with tscFreeSqlObj (the same pattern used
 * by tscGetSTableVgroupInfo) instead of freeing a registered one with free().
 *
 * @param pSql            sql object that needs the table meta
 * @param pTableMetaInfo  meta-info holding the name of the table to look up
 * @return                TSDB_CODE_TSC_ACTION_IN_PROGRESS when the request was launched,
 *                        TSDB_CODE_TSC_OUT_OF_MEMORY on allocation failure, otherwise the
 *                        code returned by tscProcessSql
 */
static int32_t getTableMetaFromMgmt(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
  if (NULL == pNew) {
    tscError("%p malloc failed for new sqlobj to get table meta", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;
  pNew->cmd.command = TSDB_SQL_META;

  tscAddSubqueryInfo(&pNew->cmd);

  SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetailSafely(&pNew->cmd, 0);
  if (pNewQueryInfo == NULL) {
    tscError("%p malloc failed for query info to get table meta", pSql);
    tscFreeSqlObj(pNew);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  pNew->cmd.autoCreated = pSql->cmd.autoCreated;  // create table if not exists
  if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE + pSql->cmd.payloadLen)) {
    tscError("%p malloc failed for payload to get table meta", pSql);
    tscFreeSqlObj(pNew);  // also releases what was attached to pNew->cmd above

    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  // nothing below returns early, so the object can be registered now
  registerSqlObj(pNew);

  STableMetaInfo *pNewMeterMetaInfo = tscAddEmptyMetaInfo(pNewQueryInfo);
  assert(pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);

  tstrncpy(pNewMeterMetaInfo->name, pTableMetaInfo->name, sizeof(pNewMeterMetaInfo->name));
  memcpy(&pNew->cmd.tagData, &pSql->cmd.tagData, sizeof(pSql->cmd.tagData));
  tscDebug("%p new pSqlObj:%p to get tableMeta, auto create:%d", pSql, pNew, pNew->cmd.autoCreated);

  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;

  int32_t code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
    code = TSDB_CODE_TSC_ACTION_IN_PROGRESS;  // notify upper application that current process need to be terminated
  }

  return code;
}

H
hjxilinx 已提交
2228
/*
 * Make pTableMetaInfo->pTableMeta point at the meta of the table named in
 * pTableMetaInfo, using the table meta cache first and falling back to a request
 * through getTableMetaFromMgmt on a cache miss.
 *
 * @param pSql            sql object that needs the table meta
 * @param pTableMetaInfo  meta-info with a non-empty table name
 * @return                TSDB_CODE_SUCCESS on a cache hit, otherwise the value returned
 *                        by getTableMetaFromMgmt
 */
int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
  assert(strlen(pTableMetaInfo->name) != 0);

  // give back the meta currently held by this STableMetaInfo before acquiring again
  if (pTableMetaInfo->pTableMeta != NULL) {
    taosCacheRelease(tscMetaCache, (void **)&(pTableMetaInfo->pTableMeta), false);
  }

  STableMeta *pCached = (STableMeta *)taosCacheAcquireByKey(tscMetaCache, pTableMetaInfo->name, strlen(pTableMetaInfo->name));
  pTableMetaInfo->pTableMeta = pCached;

  if (pCached == NULL) {
    // cache miss
    return getTableMetaFromMgmt(pSql, pTableMetaInfo);
  }

  STableComInfo tinfo = tscGetTableInfo(pCached);
  tscDebug("%p retrieve table Meta from cache, the number of columns:%d, numOfTags:%d, %p", pSql, tinfo.numOfColumns,
           tinfo.numOfTags, pCached);

  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
2248
/*
 * Same as tscGetTableMeta, but first records in the command whether the table
 * should be created when it does not exist.
 *
 * @param pSql               sql object that needs the table meta
 * @param pTableMetaInfo     meta-info holding the table name
 * @param createIfNotExists  value stored in pSql->cmd.autoCreated
 * @return                   the value returned by tscGetTableMeta
 */
int tscGetMeterMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool createIfNotExists) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->autoCreated = createIfNotExists;

  return tscGetTableMeta(pSql, pTableMetaInfo);
}

/**
H
Haojun Liao 已提交
2254
 * retrieve table meta from mnode, and update the local table meta cache.
H
hzcheng 已提交
2255
 * @param pSql          sql object
B
Bomin Zhang 已提交
2256
 * @param tableIndex    table index
H
hzcheng 已提交
2257 2258
 * @return              status code
 */
B
Bomin Zhang 已提交
2259
int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) {
H
hzcheng 已提交
2260
  SSqlCmd *pCmd = &pSql->cmd;
2261 2262

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
B
Bomin Zhang 已提交
2263
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex);
H
hzcheng 已提交
2264

H
Haojun Liao 已提交
2265 2266
  STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
  if (pTableMetaInfo->pTableMeta) {
2267
    tscDebug("%p update table meta, old meta numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
2268
             tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->id.uid, pTableMeta);
H
hzcheng 已提交
2269 2270
  }

H
Haojun Liao 已提交
2271
  taosCacheRelease(tscMetaCache, (void **)&(pTableMetaInfo->pTableMeta), true);
H
Haojun Liao 已提交
2272
  return getTableMetaFromMgmt(pSql, pTableMetaInfo);
H
hzcheng 已提交
2273 2274
}

H
hjxilinx 已提交
2275
/*
 * Tell whether every table of the given query clause already has a vgroup list.
 *
 * @param pCmd         command holding the query clauses
 * @param clauseIndex  index of the clause to inspect
 * @return             false as soon as one table has vgroupList == NULL, true otherwise
 *                     (also true when the clause has no table)
 */
static bool allVgroupInfoRetrieved(SSqlCmd* pCmd, int32_t clauseIndex) {
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);

  int32_t index = 0;
  while (index < pQueryInfo->numOfTables) {
    STableMetaInfo *pInfo = tscGetMetaInfo(pQueryInfo, index);
    if (pInfo->vgroupList == NULL) {
      return false;
    }

    index += 1;
  }

  // all super tables vgroupinfo are retrieved, no need to retrieve vgroup info anymore
  return true;
}
H
hzcheng 已提交
2287

H
hjxilinx 已提交
2288
/*
 * Create a helper sql object that asks for the vgroup info of every table of the
 * given query clause and launch it. Nothing is sent when all tables of the clause
 * already have a vgroup list.
 *
 * tscTableMetaCallBack is installed as the callback of the helper, with the
 * original sql object as parameter.
 *
 * @param pSql         sql object that needs the vgroup info
 * @param clauseIndex  index of the query clause to process
 * @return             TSDB_CODE_SUCCESS when nothing has to be retrieved,
 *                     TSDB_CODE_TSC_ACTION_IN_PROGRESS when the request was launched,
 *                     TSDB_CODE_TSC_OUT_OF_MEMORY when the helper cannot be allocated,
 *                     TSDB_CODE_RPC_NETWORK_UNAVAIL when its query info cannot be obtained,
 *                     otherwise the code of the failing step
 */
int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
  int      code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
  SSqlCmd *pCmd = &pSql->cmd;

  if (allVgroupInfoRetrieved(pCmd, clauseIndex)) {
    return TSDB_CODE_SUCCESS;
  }

  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
  if (pNew == NULL) {
    // the object is dereferenced right below, so an allocation failure must stop here
    tscError("%p malloc failed for new sqlobj to get vgroup info", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;

  pNew->cmd.command = TSDB_SQL_STABLEVGROUP;

  // TODO TEST IT
  SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetailSafely(&pNew->cmd, 0);
  if (pNewQueryInfo == NULL) {
    tscFreeSqlObj(pNew);
    return code;
  }

  // give the helper its own reference to the meta of every table of the clause
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
    STableMetaInfo *pMInfo = tscGetMetaInfo(pQueryInfo, i);
    STableMeta *pTableMeta = taosCacheAcquireByData(tscMetaCache, pMInfo->pTableMeta);
    tscAddTableMetaInfo(pNewQueryInfo, pMInfo->name, pTableMeta, NULL, pMInfo->tagColList, pMInfo->pVgroupTables);
  }

  if ((code = tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
    tscFreeSqlObj(pNew);
    return code;
  }

  pNewQueryInfo->numOfTables = pQueryInfo->numOfTables;
  registerSqlObj(pNew);

  tscDebug("%p new sqlObj:%p to get vgroupInfo, numOfTables:%d", pSql, pNew, pNewQueryInfo->numOfTables);

  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;
  code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
    code = TSDB_CODE_TSC_ACTION_IN_PROGRESS;
  }

  return code;
}

2336
void tscInitMsgsFp() {
S
slguan 已提交
2337 2338
  tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
  tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg;
J
jtao1735 已提交
2339
  tscBuildMsg[TSDB_SQL_FETCH] = tscBuildFetchMsg;
H
hzcheng 已提交
2340 2341

  tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg;
2342
  tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg;
H
hzcheng 已提交
2343

2344 2345
  tscBuildMsg[TSDB_SQL_CREATE_ACCT] = tscBuildAcctMsg;
  tscBuildMsg[TSDB_SQL_ALTER_ACCT] = tscBuildAcctMsg;
H
hzcheng 已提交
2346 2347

  tscBuildMsg[TSDB_SQL_CREATE_TABLE] = tscBuildCreateTableMsg;
S
[TD-16]  
slguan 已提交
2348
  tscBuildMsg[TSDB_SQL_DROP_USER] = tscBuildDropUserMsg;
H
hzcheng 已提交
2349 2350 2351
  tscBuildMsg[TSDB_SQL_DROP_ACCT] = tscBuildDropAcctMsg;
  tscBuildMsg[TSDB_SQL_DROP_DB] = tscBuildDropDbMsg;
  tscBuildMsg[TSDB_SQL_DROP_TABLE] = tscBuildDropTableMsg;
2352
  tscBuildMsg[TSDB_SQL_ALTER_USER] = tscBuildUserMsg;
S
slguan 已提交
2353 2354 2355
  tscBuildMsg[TSDB_SQL_CREATE_DNODE] = tscBuildCreateDnodeMsg;
  tscBuildMsg[TSDB_SQL_DROP_DNODE] = tscBuildDropDnodeMsg;
  tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg;
H
hzcheng 已提交
2356
  tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg;
2357
  tscBuildMsg[TSDB_SQL_UPDATE_TAGS_VAL] = tscBuildUpdateTagMsg;
H
hzcheng 已提交
2358 2359 2360 2361
  tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg;

  tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg;
  tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg;
H
hjxilinx 已提交
2362
  tscBuildMsg[TSDB_SQL_META] = tscBuildTableMetaMsg;
H
hjxilinx 已提交
2363
  tscBuildMsg[TSDB_SQL_STABLEVGROUP] = tscBuildSTableVgroupMsg;
S
slguan 已提交
2364
  tscBuildMsg[TSDB_SQL_MULTI_META] = tscBuildMultiMeterMetaMsg;
H
hzcheng 已提交
2365 2366 2367 2368

  tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg;
  tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg;
  tscBuildMsg[TSDB_SQL_RETRIEVE] = tscBuildRetrieveFromMgmtMsg;
2369 2370 2371
  tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg;
H
hzcheng 已提交
2372 2373

  tscProcessMsgRsp[TSDB_SQL_SELECT] = tscProcessQueryRsp;
H
hjxilinx 已提交
2374
  tscProcessMsgRsp[TSDB_SQL_FETCH] = tscProcessRetrieveRspFromNode;
H
hzcheng 已提交
2375 2376 2377 2378 2379

  tscProcessMsgRsp[TSDB_SQL_DROP_DB] = tscProcessDropDbRsp;
  tscProcessMsgRsp[TSDB_SQL_DROP_TABLE] = tscProcessDropTableRsp;
  tscProcessMsgRsp[TSDB_SQL_CONNECT] = tscProcessConnectRsp;
  tscProcessMsgRsp[TSDB_SQL_USE_DB] = tscProcessUseDbRsp;
2380
  tscProcessMsgRsp[TSDB_SQL_META] = tscProcessTableMetaRsp;
H
hjxilinx 已提交
2381
  tscProcessMsgRsp[TSDB_SQL_STABLEVGROUP] = tscProcessSTableVgroupRsp;
S
slguan 已提交
2382
  tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiMeterMetaRsp;
H
hzcheng 已提交
2383 2384

  tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp;
H
hjxilinx 已提交
2385
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromNode;  // rsp handled by same function.
H
hzcheng 已提交
2386
  tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp;
2387

H
Haojun Liao 已提交
2388 2389 2390 2391 2392
  tscProcessMsgRsp[TSDB_SQL_CURRENT_DB]   = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CURRENT_USER] = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_VERSION] = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CLI_VERSION]  = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_STATUS]  = tscProcessLocalRetrieveRsp;
2393

H
hzcheng 已提交
2394 2395
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp;

H
hjxilinx 已提交
2396
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_LOCALMERGE] = tscProcessRetrieveLocalMergeRsp;
H
hzcheng 已提交
2397 2398 2399 2400

  tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp;
  tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp;

2401 2402 2403 2404
  tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_TABLE] = tscProcessShowCreateRsp;
  tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_DATABASE] = tscProcessShowCreateRsp;
  

H
hzcheng 已提交
2405 2406 2407 2408 2409 2410
  tscKeepConn[TSDB_SQL_SHOW] = 1;
  tscKeepConn[TSDB_SQL_RETRIEVE] = 1;
  tscKeepConn[TSDB_SQL_SELECT] = 1;
  tscKeepConn[TSDB_SQL_FETCH] = 1;
  tscKeepConn[TSDB_SQL_HB] = 1;
}