tscServer.c 84.7 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#include "os.h"
H
hzcheng 已提交
17
#include "tcache.h"
H
Haojun Liao 已提交
18
#include "tcmdtype.h"
H
hzcheng 已提交
19
#include "trpc.h"
20 21
#include "tscLocalMerge.h"
#include "tscLog.h"
H
hzcheng 已提交
22 23 24 25 26
#include "tscProfile.h"
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
#include "ttimer.h"
dengyihao's avatar
dengyihao 已提交
27
#include "tlockfree.h"
H
hzcheng 已提交
28

29
// Management-node endpoint set shared by this file: tscDumpMgmtEpSet() copies it out and
// tscUpdateMgmtEpSet() overwrites it, each bracketed by the taosCor read/write calls on its version field.
SRpcCorEpSet  tscMgmtEpSet;
S
slguan 已提交
30

31 32
// Request builders indexed by SQL command; doProcessSql() calls the entry of the current command.
// Zero-initialized here -- presumably the entries are registered elsewhere (not visible in this part of the file).
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};

H
hzcheng 已提交
33 34 35
// Response handlers indexed by SQL command. tscProcessMsgFromServer() calls the entry (when set) after a
// successful reply, and tscProcessSql() calls it directly for commands at or above TSDB_SQL_LOCAL.
int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql);
// Forward declaration: tscProcessHeartBeatRsp() hands this function to taosTmrReset() before its definition below.
void tscProcessActivityTimer(void *handle, void *tmrId);
// Per-command table, zero-initialized here; it is not referenced in the part of the file visible to this review.
int tscKeepConn[TSDB_SQL_MAX] = {0};
36

B
Bomin Zhang 已提交
37
// Declared here, defined elsewhere. doSerializeTableInfo() calls it with the query start key as dflt and stores
// the result in each serialized table's key field.
// NOTE(review): presumably returns dflt when no progress is recorded for uid -- confirm against the definition.
TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid, TSKEY dflt);
38 39
// Subscription progress helpers declared for use in this file; their definitions and call sites are not in the
// part of the file visible to this review.
void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts);
void tscSaveSubscriptionProgress(void* sub);
H
hzcheng 已提交
40

S
slguan 已提交
41
/* Minimum message size used by the size estimators: the RPC head size plus a fixed 100. */
static int32_t minMsgSize() {
  const int32_t extra = 100;
  return tsRpcHeadSize + extra;
}
H
Haojun Liao 已提交
42 43 44 45 46 47 48 49
/*
 * Back-off delay, in milliseconds, to wait before the count-th retry.
 *
 * The first attempt (count <= 1) does not wait; afterwards the delay doubles
 * with every retry: 200ms, 400ms, 800ms, ...
 *
 * The exponent is capped so that neither the shift nor the multiplication can
 * overflow int32_t (both are undefined behaviour for signed integers). Every
 * count beyond the cap yields the same, largest representable delay; counts
 * below the cap return exactly what the uncapped formula returns.
 */
static int32_t getWaitingTimeInterval(int32_t count) {
  const int32_t initial  = 100;  // 100 ms by default
  const int32_t maxShift = 23;   // 100 * (2 << 23) = 1677721600 still fits in int32_t

  if (count <= 1) {
    return 0;
  }

  int32_t shift = count - 2;
  if (shift > maxShift) {
    shift = maxShift;
  }

  return initial * (2 << shift);
}
H
hzcheng 已提交
50

S
TD-1732  
Shengliang Guan 已提交
51
/*
 * Fill pSql->epSet with the endpoints of the given vgroup and pick one of them
 * at random as the endpoint in use.
 *
 * Both arguments must be non-NULL, the vgroup must list at least one endpoint,
 * and at least one copied FQDN must be non-empty (all enforced by assert).
 */
static void tscSetDnodeEpSet(SSqlObj* pSql, SVgroupInfo* pVgroupInfo) {
  assert(pSql != NULL && pVgroupInfo != NULL && pVgroupInfo->numOfEps > 0);

  SRpcEpSet* pTarget = &pSql->epSet;

  // Issue the query to a randomly chosen vnode of the vgroup.
  // Changing inUse here does not affect the inUse attribute kept in STableMeta.
  pTarget->inUse    = rand() % pVgroupInfo->numOfEps;
  pTarget->numOfEps = pVgroupInfo->numOfEps;

  // becomes true once any copied endpoint has a non-empty FQDN
  bool fqdnSeen = false;

  for (int32_t ep = 0; ep < pVgroupInfo->numOfEps; ++ep) {
    tstrncpy(pTarget->fqdn[ep], pVgroupInfo->epAddr[ep].fqdn, tListLen(pTarget->fqdn[ep]));
    pTarget->port[ep] = pVgroupInfo->epAddr[ep].port;

    fqdnSeen = fqdnSeen || (strlen(pTarget->fqdn[ep]) > 0);
  }

  assert(fqdnSeen);
}
75

dengyihao's avatar
dengyihao 已提交
76 77 78 79
/* Copy the shared management endpoint set into *epSet between the taosCor begin/end read calls. */
static void tscDumpMgmtEpSet(SRpcEpSet *epSet) {
  SRpcCorEpSet *pShared = &tscMgmtEpSet;

  taosCorBeginRead(&pShared->version);
  *epSet = pShared->epSet;
  taosCorEndRead(&pShared->version);
}
dengyihao's avatar
dengyihao 已提交
81 82
/* Apply htons() in place to every port of the endpoint set. */
static void tscEpSetHtons(SRpcEpSet *s) {
  int32_t i = 0;
  while (i < s->numOfEps) {
    s->port[i] = htons(s->port[i]);
    ++i;
  }
}
dengyihao's avatar
dengyihao 已提交
86 87
/*
 * Compare two endpoint sets.
 *
 * Returns true only when both have the same number of endpoints, the same
 * in-use index, and every endpoint matches in port and FQDN (the FQDN is
 * compared over at most TSDB_FQDN_LEN characters).
 */
bool tscEpSetIsEqual(SRpcEpSet *s1, SRpcEpSet *s2) {
  if (s1->numOfEps != s2->numOfEps) {
    return false;
  }

  if (s1->inUse != s2->inUse) {
    return false;
  }

  for (int32_t ep = 0; ep < s1->numOfEps; ep++) {
    if (s1->port[ep] != s2->port[ep]) {
      return false;
    }

    if (strncmp(s1->fqdn[ep], s2->fqdn[ep], TSDB_FQDN_LEN) != 0) {
      return false;
    }
  }

  return true;
}
dengyihao's avatar
dengyihao 已提交
97
/*
 * Overwrite the shared management endpoint set with *pEpSet between the
 * taosCor begin/end write calls. The copy is unconditional: no equality check
 * is made here, callers that want one use tscEpSetIsEqual() first.
 */
void tscUpdateMgmtEpSet(SRpcEpSet *pEpSet) {
  SRpcCorEpSet *pShared = &tscMgmtEpSet;

  taosCorBeginWrite(&pShared->version);
  pShared->epSet = *pEpSet;
  taosCorEndWrite(&pShared->version);
}
S
Shengliang Guan 已提交
103
/*
 * Copy the endpoints recorded in pVgroupInfo into *pEpSet, between the taosCor
 * begin/end read calls on the vgroup's version field.
 *
 * A NULL pVgroupInfo is a no-op and leaves *pEpSet untouched. An in-use index
 * outside [0, TSDB_MAX_REPLICA) is replaced by 0.
 */
static void tscDumpEpSetFromVgroupInfo(SCorVgroupInfo *pVgroupInfo, SRpcEpSet *pEpSet) {
  if (pVgroupInfo == NULL) {
    return;
  }

  taosCorBeginRead(&pVgroupInfo->version);

  int8_t current = pVgroupInfo->inUse;
  if (current >= 0 && current < TSDB_MAX_REPLICA) {
    pEpSet->inUse = current;
  } else {
    pEpSet->inUse = 0;
  }

  pEpSet->numOfEps = pVgroupInfo->numOfEps;

  for (int32_t ep = 0; ep < pVgroupInfo->numOfEps; ++ep) {
    tstrncpy(pEpSet->fqdn[ep], pVgroupInfo->epAddr[ep].fqdn, sizeof(pEpSet->fqdn[ep]));
    pEpSet->port[ep] = pVgroupInfo->epAddr[ep].port;
  }

  taosCorEndRead(&pVgroupInfo->version);
}

dengyihao's avatar
dengyihao 已提交
116
/*
 * Store the endpoint set reported for a request into the vgroup info of the
 * table meta referenced by the first table of the current clause.
 *
 * Does nothing when that table meta info (or its table meta) is missing. The
 * update happens between the taosCor begin/end write calls; each stored FQDN
 * is freed and replaced by a strndup() copy of the new one.
 */
static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) {
  SSqlCmd        *pCmd = &pObj->cmd;
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);

  if (pMetaInfo == NULL) {
    return;
  }

  if (pMetaInfo->pTableMeta == NULL) {
    return;
  }

  SCorVgroupInfo *pVgroupInfo = &pMetaInfo->pTableMeta->corVgroupInfo;

  taosCorBeginWrite(&pVgroupInfo->version);
  tscDebug("before: Endpoint in use: %d", pVgroupInfo->inUse);

  pVgroupInfo->inUse    = pEpSet->inUse;
  pVgroupInfo->numOfEps = pEpSet->numOfEps;

  int32_t ep = 0;
  while (ep < pVgroupInfo->numOfEps) {
    tfree(pVgroupInfo->epAddr[ep].fqdn);
    pVgroupInfo->epAddr[ep].fqdn = strndup(pEpSet->fqdn[ep], tListLen(pEpSet->fqdn[ep]));
    pVgroupInfo->epAddr[ep].port = pEpSet->port[ep];
    ep++;
  }

  tscDebug("after: EndPoint in use: %d", pVgroupInfo->inUse);
  taosCorEndWrite(&pVgroupInfo->version);
}
H
Haojun Liao 已提交
135

136 137 138 139
void tscPrintMgmtEp() {
  SRpcEpSet dump;
  tscDumpMgmtEpSet(&dump);
  if (dump.numOfEps <= 0) {
dengyihao's avatar
dengyihao 已提交
140
    tscError("invalid mnode EP list:%d", dump.numOfEps);
S
slguan 已提交
141
  } else {
142
    for (int i = 0; i < dump.numOfEps; ++i) {
143
      tscDebug("mnode index:%d %s:%d", i, dump.fqdn[i], dump.port[i]);
S
slguan 已提交
144
    }
S
slguan 已提交
145
  }
S
slguan 已提交
146 147
}

H
hzcheng 已提交
148 149 150
/*
 * Callback invoked with the outcome of a heartbeat request.
 *
 * param  the STscObj the heartbeat belongs to; ignored when NULL or when its
 *        signature does not point back at itself
 * tres   the heartbeat SSqlObj
 * code   0 on success, otherwise an error code (only logged)
 *
 * On success the response is applied: a non-empty endpoint set replaces the
 * management endpoint set, the connection id is stored, and the connection /
 * query / stream named by the server is killed. Unless the connection was
 * killed, the activity timer is re-armed as long as the heartbeat object still
 * exists.
 *
 * A successful reply may arrive without a body: tscProcessMsgFromServer frees
 * pRes->pRsp when the content length is 0. Such a reply is skipped instead of
 * being dereferenced.
 */
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
  STscObj *pObj = (STscObj *)param;
  if (pObj == NULL) return;

  if (pObj != pObj->signature) {
    tscError("heartbeat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature);
    return;
  }

  SSqlObj *pSql = tres;
  SSqlRes *pRes = &pSql->res;

  if (code != 0) {
    tscDebug("%p heartbeat failed, code:%s", pObj->pHb, tstrerror(code));
  } else if (pRes->pRsp == NULL) {
    tscWarn("%p heartbeat rsp carries no payload, ignored", pSql);
  } else {
    SHeartBeatRsp *pRsp = (SHeartBeatRsp *)pRes->pRsp;
    SRpcEpSet *    epSet = &pRsp->epSet;
    if (epSet->numOfEps > 0) {
      // ports are byte-swapped in place before the set is published
      tscEpSetHtons(epSet);
      tscUpdateMgmtEpSet(epSet);
    }

    pSql->pTscObj->connId = htonl(pRsp->connId);

    if (pRsp->killConnection) {
      // the connection is being torn down: do not schedule another heartbeat
      tscKillConnection(pObj);
      return;
    }

    if (pRsp->queryId) tscKillQuery(pObj, htonl(pRsp->queryId));
    if (pRsp->streamId) tscKillStream(pObj, htonl(pRsp->streamId));
  }

  if (pObj->pHb != NULL) {
    int32_t waitingDuring = tsShellActivityTimer * 500;
    tscDebug("%p send heartbeat in %dms", pSql, waitingDuring);

    taosTmrReset(tscProcessActivityTimer, waitingDuring, (void *)pObj->rid, tscTmr, &pObj->pTimer);
  } else {
    tscDebug("%p start to close tscObj:%p, not send heartbeat again", pSql, pObj);
  }
}

void tscProcessActivityTimer(void *handle, void *tmrId) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
192 193 194
  int64_t rid = (int64_t) handle;
  STscObj *pObj = taosAcquireRef(tscRefId, rid);
  if (pObj == NULL) return; 
H
hzcheng 已提交
195

196
  SSqlObj* pHB = pObj->pHb;
H
hzcheng 已提交
197

H
Haojun Liao 已提交
198
  void** p = taosCacheAcquireByKey(tscObjCache, &pHB, sizeof(TSDB_CACHE_PTR_TYPE));
H
Haojun Liao 已提交
199 200
  if (p == NULL) {
    tscWarn("%p HB object has been released already", pHB);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
201
    taosReleaseRef(tscRefId, pObj->rid);
H
Haojun Liao 已提交
202 203 204 205 206
    return;
  }

  assert(*pHB->self == pHB);

207
  pHB->retry = 0;
H
Haojun Liao 已提交
208 209 210 211 212
  int32_t code = tscProcessSql(pHB);
  taosCacheRelease(tscObjCache, (void**) &p, false);

  if (code != TSDB_CODE_SUCCESS) {
    tscError("%p failed to sent HB to server, reason:%s", pHB, tstrerror(code));
H
hzcheng 已提交
213
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
214

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
215
  taosReleaseRef(tscRefId, rid);
H
hzcheng 已提交
216 217 218
}

/*
 * Copy the command payload into an RPC buffer and hand it to rpcSendRequest.
 *
 * For commands at or above TSDB_SQL_MGMT the request's endpoint set is first
 * replaced by the current management endpoint set.
 *
 * Returns TSDB_CODE_TSC_OUT_OF_MEMORY when the RPC buffer cannot be allocated,
 * TSDB_CODE_SUCCESS otherwise.
 */
int tscSendMsgToServer(SSqlObj *pSql) {
  STscObj *pObj = pSql->pTscObj;
  SSqlCmd *pCmd = &pSql->cmd;

  char *pCont = rpcMallocCont(pCmd->payloadLen);
  if (pCont == NULL) {
    tscError("%p msg:%s malloc failed", pSql, taosMsg[pCmd->msgType]);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  // set the mgmt ip list
  if (pCmd->command >= TSDB_SQL_MGMT) {
    tscDumpMgmtEpSet(&pSql->epSet);
  }

  memcpy(pCont, pCmd->payload, pCmd->payloadLen);

  SRpcMsg rpcMsg = {0};
  rpcMsg.msgType = pCmd->msgType;
  rpcMsg.pCont   = pCont;
  rpcMsg.contLen = pCmd->payloadLen;
  rpcMsg.ahandle = pSql;
  rpcMsg.handle  = NULL;
  rpcMsg.code    = 0;

  rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg, &pSql->rpcRid);
  return TSDB_CODE_SUCCESS;
}

248
/*
 * RPC callback: process a response (or transport error) for a request sent by
 * tscSendMsgToServer.
 *
 * rpcMsg  the response; rpcMsg->ahandle identifies the SSqlObj in the object
 *         cache, rpcMsg->pCont is always released with rpcFreeCont before
 *         returning
 * pEpSet  endpoint set reported with the response, may be NULL; when it
 *         differs from the one used for the request it is stored as the new
 *         vgroup or management endpoint set
 *
 * Steps, in order:
 *   1. look the SSqlObj up in the cache; drop the message when it is gone,
 *      when the connection signature is invalid, or when the query is marked
 *      TSDB_QUERY_TYPE_FREE_RESOURCE;
 *   2. for select/fetch/insert/update-tag commands failing with one of the
 *      "stale meta" codes, bump the retry counter and renew the table meta
 *      (returning early while that is in progress);
 *   3. copy the response into pRes, decode a submit response if present;
 *   4. run the per-command response handler and the user callback;
 *   5. release the cache reference(s).
 *
 * Differences from the previous revision:
 *   - the retry counter is incremented in its own statement instead of inside
 *     the tscWarn() argument list, so the retry limit no longer depends on
 *     whether the logging call evaluates its arguments;
 *   - a submit response is decoded only when it was actually copied from this
 *     message; after a failed realloc the previous buffer is left alone
 *     instead of being byte-swapped and counted a second time.
 */
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
  TSDB_CACHE_PTR_TYPE handle = (TSDB_CACHE_PTR_TYPE) rpcMsg->ahandle;
  void** p = taosCacheAcquireByKey(tscObjCache, &handle, sizeof(TSDB_CACHE_PTR_TYPE));
  if (p == NULL) {
    rpcFreeCont(rpcMsg->pCont);
    return;
  }

  SSqlObj* pSql = *p;
  assert(pSql != NULL);

  STscObj *pObj = pSql->pTscObj;
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  assert(*pSql->self == pSql);
  pSql->rpcRid = -1;

  if (pObj->signature != pObj) {
    tscDebug("%p DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, pObj, pObj->signature);

    taosCacheRelease(tscObjCache, (void**) &p, true);
    rpcFreeCont(rpcMsg->pCont);
    return;
  }

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) {
    tscDebug("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p",
        pSql, pCmd->command, pQueryInfo->type, pObj, pObj->signature);

    // release through a copy first: taosCacheRelease takes the address of the pointer it is given
    void** p1 = p;
    taosCacheRelease(tscObjCache, (void**) &p1, false);

    taosCacheRelease(tscObjCache, (void**) &p, true);
    rpcFreeCont(rpcMsg->pCont);
    return;
  }

  if (pEpSet) {
    if (!tscEpSetIsEqual(&pSql->epSet, pEpSet)) {
      if (pCmd->command < TSDB_SQL_MGMT) {
        tscUpdateVgroupInfo(pSql, pEpSet);
      } else {
        tscUpdateMgmtEpSet(pEpSet);
      }
    }
  }

  int32_t cmd = pCmd->command;
  if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_FETCH || cmd == TSDB_SQL_INSERT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) &&
      (rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID ||
       rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID ||
       rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL ||
       rpcMsg->code == TSDB_CODE_APP_NOT_READY ||
       rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE)) {
    // count the retry unconditionally; it must not be a side effect of the logging call
    ++pSql->retry;
    tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), pSql->retry);

    // set the flag to denote that sql string needs to be re-parsed and build submit block with table schema
    if (rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
      pSql->cmd.submitSchema = 1;
    }

    pSql->res.code = rpcMsg->code;  // keep the previous error code
    if (pSql->retry > pSql->maxRetry) {
      tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry);
    } else {
      // wait for a little bit moment and then retry, todo do not sleep in rpc callback thread
      if (rpcMsg->code == TSDB_CODE_APP_NOT_READY || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
        int32_t duration = getWaitingTimeInterval(pSql->retry);
        taosMsleep(duration);
      }

      rpcMsg->code = tscRenewTableMeta(pSql, 0);

      // if there is an error occurring, proceed to the following error handling procedure.
      if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
        taosCacheRelease(tscObjCache, (void**) &p, false);
        rpcFreeCont(rpcMsg->pCont);
        return;
      }
    }
  }

  pRes->rspLen = 0;

  if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
    tscDebug("%p query is cancelled, code:%s", pSql, tstrerror(pRes->code));
  } else {
    pRes->code = rpcMsg->code;
  }

  if (pRes->code == TSDB_CODE_SUCCESS) {
    tscDebug("%p reset retry counter to be 0 due to success rsp, old:%d", pSql, pSql->retry);
    pSql->retry = 0;
  }

  if (pRes->code != TSDB_CODE_TSC_QUERY_CANCELLED) {
    assert(rpcMsg->msgType == pCmd->msgType + 1);
    pRes->code    = rpcMsg->code;
    pRes->rspType = rpcMsg->msgType;
    pRes->rspLen  = rpcMsg->contLen;

    // true only when pRes->pRsp holds a copy of THIS message's content
    bool rspCopied = false;

    if (pRes->rspLen > 0 && rpcMsg->pCont) {
      char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
      if (tmp == NULL) {
        pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      } else {
        pRes->pRsp = tmp;
        memcpy(pRes->pRsp, rpcMsg->pCont, pRes->rspLen);
        rspCopied = true;
      }
    } else {
      tfree(pRes->pRsp);
    }

    /*
     * There is not response callback function for submit response.
     * The actual inserted number of points is the first number.
     */
    if (rpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP && rspCopied) {
      SShellSubmitRspMsg *pMsg = (SShellSubmitRspMsg*)pRes->pRsp;
      pMsg->code = htonl(pMsg->code);
      pMsg->numOfRows = htonl(pMsg->numOfRows);
      pMsg->affectedRows = htonl(pMsg->affectedRows);
      pMsg->failedRows = htonl(pMsg->failedRows);
      pMsg->numOfFailedBlocks = htonl(pMsg->numOfFailedBlocks);

      pRes->numOfRows += pMsg->affectedRows;
      tscDebug("%p SQL cmd:%s, code:%s inserted rows:%d rspLen:%d", pSql, sqlCmd[pCmd->command],
          tstrerror(pRes->code), pMsg->affectedRows, pRes->rspLen);
    } else {
      tscDebug("%p SQL cmd:%s, code:%s rspLen:%d", pSql, sqlCmd[pCmd->command], tstrerror(pRes->code), pRes->rspLen);
    }
  }

  if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) {
    rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
  }

  // evaluated before the user callback runs
  bool shouldFree = tscShouldBeFreed(pSql);
  if (rpcMsg->code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
    rpcMsg->code = (pRes->code == TSDB_CODE_SUCCESS) ? (int32_t)pRes->numOfRows : pRes->code;
    (*pSql->fp)(pSql->param, pSql, rpcMsg->code);
  }

  void** p1 = p;
  taosCacheRelease(tscObjCache, (void**) &p1, false);

  if (shouldFree) { // in case of table-meta/vgrouplist query, automatically free it
    taosCacheRelease(tscObjCache, (void **)&p, true);
    tscDebug("%p sqlObj is automatically freed", pSql);
  }

  rpcFreeCont(rpcMsg->pCont);
}

S
slguan 已提交
404 405 406
/*
 * Build (for the commands listed below) and send the request of pSql.
 *
 * Whenever pRes->code is not TSDB_CODE_SUCCESS after the build step, or the
 * send fails, the error is queued through tscQueueAsyncRes and returned.
 * Returns TSDB_CODE_SUCCESS once the message has been handed to the RPC layer.
 */
int doProcessSql(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  const int32_t cmd = pCmd->command;
  const bool buildMsgHere = (cmd == TSDB_SQL_SELECT)   ||
                            (cmd == TSDB_SQL_FETCH)    ||
                            (cmd == TSDB_SQL_RETRIEVE) ||
                            (cmd == TSDB_SQL_INSERT)   ||
                            (cmd == TSDB_SQL_CONNECT)  ||
                            (cmd == TSDB_SQL_HB)       ||
                            (cmd == TSDB_SQL_META)     ||
                            (cmd == TSDB_SQL_STABLEVGROUP);

  if (buildMsgHere) {
    pRes->code = tscBuildMsg[pCmd->command](pSql, NULL);
  }

  if (pRes->code != TSDB_CODE_SUCCESS) {
    tscQueueAsyncRes(pSql);
    return pRes->code;
  }

  int32_t code = tscSendMsgToServer(pSql);
  if (code == TSDB_CODE_SUCCESS) {
    // NOTE: pSql may have been released here already by other threads, do not touch it
    return TSDB_CODE_SUCCESS;
  }

  pRes->code = code;
  tscQueueAsyncRes(pSql);
  return code;
}

/*
 * Entry point for executing the command held by pSql.
 *
 *   - commands below TSDB_SQL_MGMT need table meta info; without it
 *     TSDB_CODE_TSC_APP_ERROR is stored in pSql->res.code and returned;
 *   - commands at or above TSDB_SQL_LOCAL are handled locally by their
 *     tscProcessMsgRsp entry, whose result is returned;
 *   - everything else (and the first group, once validated) goes through
 *     doProcessSql(). Management commands need no preparation here:
 *     tscSendMsgToServer() installs the management endpoint set itself.
 *
 * The table name is optional; the debug line prints "(null)" for a missing
 * name instead of passing a NULL pointer to a %s conversion.
 */
int tscProcessSql(SSqlObj *pSql) {
  char    *name = NULL;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo     *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  STableMetaInfo *pTableMetaInfo = NULL;
  uint32_t        type = 0;

  if (pQueryInfo != NULL) {
    pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
    name = (pTableMetaInfo != NULL)? pTableMetaInfo->name:NULL;
    type = pQueryInfo->type;

    // while numOfTables equals to 0, it must be Heartbeat
    assert((pQueryInfo->numOfTables == 0 && pQueryInfo->command == TSDB_SQL_HB) || pQueryInfo->numOfTables > 0);
  }

  tscDebug("%p SQL cmd:%s will be processed, name:%s, type:%d", pSql, sqlCmd[pCmd->command],
      (name != NULL) ? name : "(null)", type);

  if (pCmd->command < TSDB_SQL_MGMT) { // the pTableMetaInfo cannot be NULL
    if (pTableMetaInfo == NULL) {
      pSql->res.code = TSDB_CODE_TSC_APP_ERROR;
      return pSql->res.code;
    }
  } else if (pCmd->command >= TSDB_SQL_LOCAL) {  // local handler
    return (*tscProcessMsgRsp[pCmd->command])(pSql);
  }

  return doProcessSql(pSql);
}
H
hzcheng 已提交
467

J
jtao1735 已提交
468
/*
 * Build a fetch (retrieve) request in pSql->cmd.payload.
 *
 * The message carries the query handle, the query type (in the 'free' field)
 * and the id of the vgroup to fetch from:
 *   - super table without per-vgroup table lists: vgroupList[vgroupIndex];
 *   - super table with per-vgroup table lists:    pVgroupTables[vgroupIndex];
 *   - any other table:                            the table meta's own vgroup.
 *
 * Multi-byte fields are converted with htonl/htons/htobe64. pInfo is unused.
 * Always returns TSDB_CODE_SUCCESS.
 */
int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *) pSql->cmd.payload;
  pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
  pRetrieveMsg->free = htons(pQueryInfo->type);

  // todo valid the vgroupId at the client side
  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
    int32_t vgIndex = pTableMetaInfo->vgroupIndex;
    if (pTableMetaInfo->pVgroupTables == NULL) {
      SVgroupsInfo *pVgroupInfo = pTableMetaInfo->vgroupList;

      // check the index range first; only then is it safe to look at the indexed vgroup
      assert(vgIndex >= 0 && vgIndex < pVgroupInfo->numOfVgroups && pVgroupInfo->vgroups[vgIndex].vgId > 0);

      pRetrieveMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId);
      tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d", pSql, pVgroupInfo->vgroups[vgIndex].vgId, vgIndex);
    } else {
      int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
      assert(vgIndex >= 0 && vgIndex < numOfVgroups);

      SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex);

      pRetrieveMsg->header.vgId = htonl(pTableIdList->vgInfo.vgId);
      tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d", pSql, pTableIdList->vgInfo.vgId, vgIndex);
    }
  } else {
    STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
    pRetrieveMsg->header.vgId = htonl(pTableMeta->vgroupInfo.vgId);
    tscDebug("%p build fetch msg from only one vgroup, vgId:%d", pSql, pTableMeta->vgroupInfo.vgId);
  }

  pSql->cmd.payloadLen = sizeof(SRetrieveTableMsg);
  pSql->cmd.msgType = TSDB_MSG_TYPE_FETCH;

  pRetrieveMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg));

  return TSDB_CODE_SUCCESS;
}

509
/*
 * Finish the submit (insert) request already laid out in pSql->cmd.payload.
 *
 * The payload starts with an SMsgDesc followed by the SSubmitMsg. This fills
 * in the descriptor (always one vnode), the submit header (vgroup id, content
 * length excluding the descriptor, number of blocks), sets the message type
 * and loads the request's endpoint set from the table meta's vgroup info.
 * cmd.payloadLen itself is set while the data is copied into the payload.
 *
 * pInfo is unused. Always returns TSDB_CODE_SUCCESS.
 */
int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
  STableMeta *pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;

  char *pBase = pSql->cmd.payload;

  // NOTE: shell message size should not include SMsgDesc
  int32_t contLen = pSql->cmd.payloadLen - sizeof(SMsgDesc);
  int32_t vgId = pTableMeta->vgroupInfo.vgId;

  SMsgDesc *pDesc = (SMsgDesc *) pBase;
  pDesc->numOfVnodes = htonl(1);  // always one vnode

  SSubmitMsg *pSubmit = (SSubmitMsg *)(pBase + sizeof(SMsgDesc));

  pSubmit->header.vgId    = htonl(vgId);
  pSubmit->header.contLen = htonl(contLen);  // the length not includes the size of SMsgDesc
  pSubmit->length         = pSubmit->header.contLen;

  pSubmit->numOfBlocks = htonl(pSql->cmd.numOfTablesInSubmit);  // number of tables to be inserted

  pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
  tscDumpEpSetFromVgroupInfo(&pTableMeta->corVgroupInfo, &pSql->epSet);

  tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfEP:%d", pSql, vgId, pSql->cmd.numOfTablesInSubmit,
      pSql->epSet.numOfEps);
  return TSDB_CODE_SUCCESS;
}

/*
541
 * for table query, simply return the size <= 1k
H
hzcheng 已提交
542
 */
543
static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
H
hzcheng 已提交
544
  const static int32_t MIN_QUERY_MSG_PKT_SIZE = TSDB_MAX_BYTES_PER_ROW * 5;
545
  SQueryInfo *         pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
H
hzcheng 已提交
546

S
TD-1057  
Shengliang Guan 已提交
547
  int32_t srcColListSize = (int32_t)(taosArrayGetSize(pQueryInfo->colList) * sizeof(SColumnInfo));
H
Haojun Liao 已提交
548 549

  size_t  numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
H
Haojun Liao 已提交
550
  int32_t exprSize = (int32_t)(sizeof(SSqlFuncMsg) * numOfExprs * 2);
H
Haojun Liao 已提交
551

H
Haojun Liao 已提交
552 553 554 555 556 557 558 559 560 561
  int32_t tsBufSize = (pQueryInfo->tsBuf != NULL) ? pQueryInfo->tsBuf->fileSize : 0;

  int32_t tableSerialize = 0;
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  if (pTableMetaInfo->pVgroupTables != NULL) {
    size_t numOfGroups = taosArrayGetSize(pTableMetaInfo->pVgroupTables);

    int32_t totalTables = 0;
    for (int32_t i = 0; i < numOfGroups; ++i) {
      SVgroupTableInfo *pTableInfo = taosArrayGet(pTableMetaInfo->pVgroupTables, i);
H
Haojun Liao 已提交
562
      totalTables += (int32_t) taosArrayGetSize(pTableInfo->itemList);
H
Haojun Liao 已提交
563 564 565 566
    }

    tableSerialize = totalTables * sizeof(STableIdInfo);
  }
H
Haojun Liao 已提交
567

H
Haojun Liao 已提交
568 569
  return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + exprSize + tsBufSize +
         tableSerialize + 4096;
H
hzcheng 已提交
570 571
}

572
/*
 * Serialize the table id list of a query into the message buffer at pMsg and
 * select the endpoint set / vgroup the query is sent to.
 *
 * Two layouts are produced:
 *   - per-vgroup table lists present on a non-normal table: one STableIdInfo
 *     per table of pVgroupTables[vgroupIndex];
 *   - otherwise: a single STableIdInfo for the table meta itself, sent to the
 *     super table's vgroupList[vgroupIndex] or to the table's own vgroup.
 *
 * Each entry's key is the subscription progress of that table, with the
 * query's start key as the default. pQueryMsg->head.vgId and
 * pQueryMsg->numOfTables are filled in along the way.
 *
 * Returns the position in the buffer right after the last entry written.
 */
static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char *pMsg) {
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
  TSKEY           defaultKey = htobe64(pQueryMsg->window.skey);
  STableMeta     *pTableMeta = pTableMetaInfo->pTableMeta;

  bool singleEntry = UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) || pTableMetaInfo->pVgroupTables == NULL;

  if (!singleEntry) {  // it is a subquery of the super table query, this EP info is acquired from vgroupInfo
    int32_t vgIndex = pTableMetaInfo->vgroupIndex;
    int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
    assert(vgIndex >= 0 && vgIndex < numOfVgroups);

    tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, vgIndex, numOfVgroups);

    SVgroupTableInfo* pGroup = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex);

    // set the vgroup info
    tscSetDnodeEpSet(pSql, &pGroup->vgInfo);
    pQueryMsg->head.vgId = htonl(pGroup->vgInfo.vgId);

    int32_t numOfTables = (int32_t)taosArrayGetSize(pGroup->itemList);
    pQueryMsg->numOfTables = htonl(numOfTables);  // set the number of tables

    // serialize each table id info
    for (int32_t t = 0; t < numOfTables; ++t) {
      STableIdInfo *pSrc = taosArrayGet(pGroup->itemList, t);
      STableIdInfo *pDst = (STableIdInfo *)pMsg;

      pDst->tid = htonl(pSrc->tid);
      pDst->uid = htobe64(pSrc->uid);
      pDst->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pSrc->uid, defaultKey));

      pMsg += sizeof(STableIdInfo);
    }
  } else {
    SVgroupInfo* pVgroupInfo = NULL;

    if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
      int32_t vgIndex = pTableMetaInfo->vgroupIndex;
      assert(vgIndex >= 0);

      if (pTableMetaInfo->vgroupList->numOfVgroups > 0) {
        assert(vgIndex < pTableMetaInfo->vgroupList->numOfVgroups);
        pVgroupInfo = &pTableMetaInfo->vgroupList->vgroups[vgIndex];
      }

      tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, vgIndex, pTableMetaInfo->vgroupList->numOfVgroups);
    } else {
      pVgroupInfo = &pTableMeta->vgroupInfo;
    }

    assert(pVgroupInfo != NULL);

    tscSetDnodeEpSet(pSql, pVgroupInfo);
    pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId);

    STableIdInfo *pDst = (STableIdInfo *)pMsg;
    pDst->tid = htonl(pTableMeta->id.tid);
    pDst->uid = htobe64(pTableMeta->id.uid);
    pDst->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->id.uid, defaultKey));

    pQueryMsg->numOfTables = htonl(1);  // set the number of tables
    pMsg += sizeof(STableIdInfo);
  }

  tscDebug("%p vgId:%d, query on table:%s, tid:%d, uid:%" PRIu64, pSql, htonl(pQueryMsg->head.vgId), pTableMetaInfo->name,
      pTableMeta->id.tid, pTableMeta->id.uid);

  return pMsg;
}

639
/*
 * Serialize the query described by the current clause of pSql->cmd into
 * pCmd->payload as a SQueryTableMsg followed by its variable-length sections.
 *
 * Sections are appended in this order:
 *   1. fixed SQueryTableMsg header and the column list (colList)
 *   2. per-column filter descriptors (string filters are followed by their bytes)
 *   3. first-stage expressions (SSqlFuncMsg, binary arguments appended inline)
 *   4. second-stage expressions, only when tscIsSecondStageQuery() is true
 *   5. table id info written by doSerializeTableInfo()
 *   6. group-by columns, fill values, tag column descriptors
 *   7. tag condition bytes, the tbname condition string (or a single 0 byte)
 *   8. the ts block dumped by dumpFileBlockByGroupId(), if a tsBuf exists
 *
 * On success pCmd->payloadLen, pCmd->msgType and head.contLen are set.
 *
 * @param pSql   sql object whose command is serialized
 * @param pInfo  not used by this function
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_TSC_OUT_OF_MEMORY when the payload cannot
 *         be allocated, TSDB_CODE_TSC_INVALID_SQL when the parsed query fails
 *         validation, or the error code returned by dumpFileBlockByGroupId().
 */
int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  int32_t size = tscEstimateQueryMsgSize(pCmd, pCmd->clauseIndex);

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for query msg", pSql);
    // report the allocation failure the same way the other message builders do
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SQueryInfo     *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  STableMeta     *pTableMeta = pTableMetaInfo->pTableMeta;

  // a query must reference at least one source column unless it only queries tags
  size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
  if (numOfCols == 0 && !tscQueryTags(pQueryInfo)) {
    tscError("%p illegal value of numOfCols in query msg: %" PRIu64 ", table cols:%d", pSql, (uint64_t)numOfCols,
        tscGetNumOfColumns(pTableMeta));

    return TSDB_CODE_TSC_INVALID_SQL;
  }

  if (pQueryInfo->interval.interval < 0) {
    tscError("%p illegal value of aggregation time interval in query msg: %" PRId64, pSql, (int64_t)pQueryInfo->interval.interval);
    return TSDB_CODE_TSC_INVALID_SQL;
  }

  if (pQueryInfo->groupbyExpr.numOfGroupCols < 0) {
    tscError("%p illegal value of numOfGroupCols in query msg: %d", pSql, pQueryInfo->groupbyExpr.numOfGroupCols);
    return TSDB_CODE_TSC_INVALID_SQL;
  }

  SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload;
  tstrncpy(pQueryMsg->version, version, tListLen(pQueryMsg->version));

  int32_t numOfTags = (int32_t)taosArrayGetSize(pTableMetaInfo->tagColList);

  // for a descending scan the window boundaries are swapped in the message
  if (pQueryInfo->order.order == TSDB_ORDER_ASC) {
    pQueryMsg->window.skey = htobe64(pQueryInfo->window.skey);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->window.ekey);
  } else {
    pQueryMsg->window.skey = htobe64(pQueryInfo->window.ekey);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->window.skey);
  }

  pQueryMsg->order          = htons(pQueryInfo->order.order);
  pQueryMsg->orderColId     = htons(pQueryInfo->order.orderColId);
  pQueryMsg->fillType       = htons(pQueryInfo->fillType);
  pQueryMsg->limit          = htobe64(pQueryInfo->limit.limit);
  pQueryMsg->offset         = htobe64(pQueryInfo->limit.offset);
  pQueryMsg->numOfCols      = htons((int16_t)numOfCols);
  pQueryMsg->interval.interval = htobe64(pQueryInfo->interval.interval);
  pQueryMsg->interval.sliding  = htobe64(pQueryInfo->interval.sliding);
  pQueryMsg->interval.offset   = htobe64(pQueryInfo->interval.offset);
  pQueryMsg->interval.intervalUnit = pQueryInfo->interval.intervalUnit;
  pQueryMsg->interval.slidingUnit  = pQueryInfo->interval.slidingUnit;
  pQueryMsg->interval.offsetUnit   = pQueryInfo->interval.offsetUnit;
  pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols);
  pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType);
  pQueryMsg->numOfTags      = htonl(numOfTags);
  pQueryMsg->queryType      = htonl(pQueryInfo->type);

  size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo);
  pQueryMsg->numOfOutput = htons((int16_t)numOfOutput);  // this is the stage one output column number

  // set column list ids; the variable-length part starts right after the column list
  char    *pMsg = (char *)(pQueryMsg->colList) + numOfCols * sizeof(SColumnInfo);
  SSchema *pSchema = tscGetTableSchema(pTableMeta);
  int32_t  numOfTableCols = tscGetNumOfColumns(pTableMeta);

  for (int32_t i = 0; i < (int32_t)numOfCols; ++i) {
    SColumn *pCol = taosArrayGetP(pQueryInfo->colList, i);
    int32_t  colIndex = pCol->colIndex.columnIndex;

    // only resolve the schema entry for an in-range index, so that neither the
    // type check nor the error log below reads outside of the schema array
    SSchema *pColSchema = (colIndex >= 0 && colIndex < numOfTableCols) ? &pSchema[colIndex] : NULL;

    if (pColSchema == NULL || pColSchema->type < TSDB_DATA_TYPE_BOOL || pColSchema->type > TSDB_DATA_TYPE_NCHAR) {
      tscError("%p tid:%d uid:%" PRIu64" id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s",
          pSql, pTableMeta->id.tid, pTableMeta->id.uid, pTableMetaInfo->name, numOfTableCols, colIndex,
               (pColSchema != NULL) ? pColSchema->name : "n/a");

      return TSDB_CODE_TSC_INVALID_SQL;
    }

    pQueryMsg->colList[i].colId = htons(pColSchema->colId);
    pQueryMsg->colList[i].bytes = htons(pColSchema->bytes);
    pQueryMsg->colList[i].type  = htons(pColSchema->type);
    pQueryMsg->colList[i].numOfFilters = htons(pCol->numOfFilters);

    // append the filter information after the basic column information
    for (int32_t f = 0; f < pCol->numOfFilters; ++f) {
      SColumnFilterInfo *pColFilter = &pCol->filterInfo[f];

      SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)pMsg;
      pFilterMsg->filterstr = htons(pColFilter->filterstr);

      pMsg += sizeof(SColumnFilterInfo);

      if (pColFilter->filterstr) {
        // string filter: len + 1 bytes of the pattern follow the descriptor
        pFilterMsg->len = htobe64(pColFilter->len);
        memcpy(pMsg, (void *)pColFilter->pz, (size_t)(pColFilter->len + 1));
        pMsg += (pColFilter->len + 1);  // append the additional filter binary info
      } else {
        pFilterMsg->lowerBndi = htobe64(pColFilter->lowerBndi);
        pFilterMsg->upperBndi = htobe64(pColFilter->upperBndi);
      }

      pFilterMsg->lowerRelOptr = htons(pColFilter->lowerRelOptr);
      pFilterMsg->upperRelOptr = htons(pColFilter->upperRelOptr);

      if (pColFilter->lowerRelOptr == TSDB_RELATION_INVALID && pColFilter->upperRelOptr == TSDB_RELATION_INVALID) {
        tscError("invalid filter info");
        return TSDB_CODE_TSC_INVALID_SQL;
      }
    }
  }

  // first-stage expressions: one SSqlFuncMsg each, binary arguments appended inline
  SSqlFuncMsg *pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
  for (int32_t i = 0; i < (int32_t)numOfOutput; ++i) {
    SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);

    if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId, pExpr->numOfParams)) {
      tscError("%p table schema is not matched with parsed sql", pSql);
      return TSDB_CODE_TSC_INVALID_SQL;
    }

    assert(pExpr->resColId < 0);

    pSqlFuncExpr->colInfo.colId    = htons(pExpr->colInfo.colId);
    pSqlFuncExpr->colInfo.colIndex = htons(pExpr->colInfo.colIndex);
    pSqlFuncExpr->colInfo.flag     = htons(pExpr->colInfo.flag);

    pSqlFuncExpr->functionId  = htons(pExpr->functionId);
    pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams);
    pSqlFuncExpr->resColId    = htons(pExpr->resColId);
    pMsg += sizeof(SSqlFuncMsg);

    for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
      // todo add log
      pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType);
      pSqlFuncExpr->arg[j].argBytes = htons(pExpr->param[j].nLen);

      if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) {
        memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen);
        pMsg += pExpr->param[j].nLen;
      } else {
        pSqlFuncExpr->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64Key);
      }
    }

    // the next expression starts after the inline binary arguments
    pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
  }

  size_t output = tscNumOfFields(pQueryInfo);

  if (tscIsSecondStageQuery(pQueryInfo)) {
    pQueryMsg->secondStageOutput = htonl((int32_t) output);

    SSqlFuncMsg *pSqlFuncExpr1 = (SSqlFuncMsg *)pMsg;

    for (int32_t i = 0; i < (int32_t)output; ++i) {
      SInternalField* pField = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i);
      SSqlExpr *pExpr = pField->pSqlExpr;
      if (pExpr != NULL) {
        // the output field is backed by a plain sql expression
        if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId, pExpr->numOfParams)) {
          tscError("%p table schema is not matched with parsed sql", pSql);
          return TSDB_CODE_TSC_INVALID_SQL;
        }

        pSqlFuncExpr1->colInfo.colId    = htons(pExpr->colInfo.colId);
        pSqlFuncExpr1->colInfo.colIndex = htons(pExpr->colInfo.colIndex);
        pSqlFuncExpr1->colInfo.flag     = htons(pExpr->colInfo.flag);

        pSqlFuncExpr1->functionId  = htons(pExpr->functionId);
        pSqlFuncExpr1->numOfParams = htons(pExpr->numOfParams);
        pMsg += sizeof(SSqlFuncMsg);

        for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
          // todo add log
          pSqlFuncExpr1->arg[j].argType = htons((uint16_t)pExpr->param[j].nType);
          pSqlFuncExpr1->arg[j].argBytes = htons(pExpr->param[j].nLen);

          if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) {
            memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen);
            pMsg += pExpr->param[j].nLen;
          } else {
            pSqlFuncExpr1->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64Key);
          }
        }

        pSqlFuncExpr1 = (SSqlFuncMsg *)pMsg;
      } else {
        // otherwise the output field must be backed by an arithmetic expression
        assert(pField->pArithExprInfo != NULL);
        SExprInfo* pExprInfo = pField->pArithExprInfo;

        pSqlFuncExpr1->colInfo.colId    = htons(pExprInfo->base.colInfo.colId);
        pSqlFuncExpr1->functionId  = htons(pExprInfo->base.functionId);
        pSqlFuncExpr1->numOfParams = htons(pExprInfo->base.numOfParams);
        pMsg += sizeof(SSqlFuncMsg);

        for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
          // todo add log
          pSqlFuncExpr1->arg[j].argType = htons((uint16_t)pExprInfo->base.arg[j].argType);
          pSqlFuncExpr1->arg[j].argBytes = htons(pExprInfo->base.arg[j].argBytes);

          if (pExprInfo->base.arg[j].argType == TSDB_DATA_TYPE_BINARY) {
            memcpy(pMsg, pExprInfo->base.arg[j].argValue.pz, pExprInfo->base.arg[j].argBytes);
            pMsg += pExprInfo->base.arg[j].argBytes;
          } else {
            pSqlFuncExpr1->arg[j].argValue.i64 = htobe64(pExprInfo->base.arg[j].argValue.i64);
          }
        }

        pSqlFuncExpr1 = (SSqlFuncMsg *)pMsg;
      }
    }
  } else {
    pQueryMsg->secondStageOutput = 0;
  }

  // serialize the table info (sid, uid, tags)
  pMsg = doSerializeTableInfo(pQueryMsg, pSql, pMsg);

  SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr;
  if (pGroupbyExpr->numOfGroupCols > 0) {
    pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex);
    pQueryMsg->orderType = htons(pGroupbyExpr->orderType);

    // NOTE(review): colId/colIndex/flag are written without byte-order conversion,
    // unlike the other numeric fields of this message -- confirm against the receiver.
    for (int32_t j = 0; j < pGroupbyExpr->numOfGroupCols; ++j) {
      SColIndex* pCol = taosArrayGet(pGroupbyExpr->columnInfo, j);

      // plain stores (not "+="): the result must not depend on the previous
      // contents of the payload buffer
      *((int16_t *)pMsg) = pCol->colId;
      pMsg += sizeof(pCol->colId);

      *((int16_t *)pMsg) = pCol->colIndex;
      pMsg += sizeof(pCol->colIndex);

      *((int16_t *)pMsg) = pCol->flag;
      pMsg += sizeof(pCol->flag);

      memcpy(pMsg, pCol->name, tListLen(pCol->name));
      pMsg += tListLen(pCol->name);
    }
  }

  // one fill value per first-stage output expression
  if (pQueryInfo->fillType != TSDB_FILL_NONE) {
    for (int32_t i = 0; i < (int32_t)numOfOutput; ++i) {
      *((int64_t *)pMsg) = htobe64(pQueryInfo->fillVal[i]);
      pMsg += sizeof(pQueryInfo->fillVal[0]);
    }
  }

  if (numOfTags != 0) {
    int32_t numOfColumns = tscGetNumOfColumns(pTableMeta);
    int32_t numOfTagColumns = tscGetNumOfTags(pTableMeta);
    int32_t total = numOfTagColumns + numOfColumns;

    pSchema = tscGetTableTagSchema(pTableMeta);

    for (int32_t i = 0; i < numOfTags; ++i) {
      SColumn *pCol = taosArrayGetP(pTableMetaInfo->tagColList, i);
      int32_t  tagIndex = pCol->colIndex.columnIndex;

      // NOTE(review): index -1 is accepted here (presumably the tbname pseudo column)
      // and resolves to the entry located just before the tag schema -- confirm.
      int32_t  inRange = (tagIndex >= -1 && tagIndex < numOfTagColumns);
      SSchema *pColSchema = inRange ? &pSchema[tagIndex] : NULL;

      if (!inRange || pColSchema->type < TSDB_DATA_TYPE_BOOL || pColSchema->type > TSDB_DATA_TYPE_NCHAR) {
        tscError("%p tid:%d uid:%" PRIu64 " id:%s, tag index out of range, totalCols:%d, numOfTags:%d, index:%d, column name:%s",
                 pSql, pTableMeta->id.tid, pTableMeta->id.uid, pTableMetaInfo->name, total, numOfTagColumns,
                 tagIndex, inRange ? pColSchema->name : "n/a");

        return TSDB_CODE_TSC_INVALID_SQL;
      }

      SColumnInfo* pTagCol = (SColumnInfo*) pMsg;

      pTagCol->colId = htons(pColSchema->colId);
      pTagCol->bytes = htons(pColSchema->bytes);
      pTagCol->type  = htons(pColSchema->type);
      pTagCol->numOfFilters = 0;

      pMsg += sizeof(SColumnInfo);
    }
  }

  // serialize tag column query condition
  if (pQueryInfo->tagCond.pCond != NULL && taosArrayGetSize(pQueryInfo->tagCond.pCond) > 0) {
    STagCond* pTagCond = &pQueryInfo->tagCond;

    SCond *pCond = tsGetSTableQueryCond(pTagCond, pTableMeta->id.uid);
    if (pCond != NULL && pCond->cond != NULL) {
      pQueryMsg->tagCondLen = htons(pCond->len);
      memcpy(pMsg, pCond->cond, pCond->len);

      pMsg += pCond->len;
    }
  }

  // the tbname condition is a NUL-terminated string; an absent one is a single 0 byte
  if (pQueryInfo->tagCond.tbnameCond.cond == NULL) {
    *pMsg = 0;
    pMsg++;
  } else {
    strcpy(pMsg, pQueryInfo->tagCond.tbnameCond.cond);
    pMsg += strlen(pQueryInfo->tagCond.tbnameCond.cond) + 1;
  }

  // compressed ts block
  pQueryMsg->tsOffset = htonl((int32_t)(pMsg - pCmd->payload));

  if (pQueryInfo->tsBuf != NULL) {
    // note: here used the index instead of actual vnode id.
    int32_t vnodeIndex = pTableMetaInfo->vgroupIndex;
    int32_t code = dumpFileBlockByGroupId(pQueryInfo->tsBuf, vnodeIndex, pMsg, &pQueryMsg->tsLen, &pQueryMsg->tsNumOfBlocks);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // tsLen is still in host byte order here; it is converted only after being used
    pMsg += pQueryMsg->tsLen;

    pQueryMsg->tsOrder = htonl(pQueryInfo->tsBuf->tsOrder);
    pQueryMsg->tsLen   = htonl(pQueryMsg->tsLen);
    pQueryMsg->tsNumOfBlocks = htonl(pQueryMsg->tsNumOfBlocks);
  }

  int32_t msgLen = (int32_t)(pMsg - pCmd->payload);

  tscDebug("%p msg built success, len:%d bytes", pSql, msgLen);
  pCmd->payloadLen = msgLen;
  pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY;

  pQueryMsg->head.contLen = htonl(msgLen);
  assert(msgLen + minMsgSize() <= (int32_t)pCmd->allocSize);

  return TSDB_CODE_SUCCESS;
}

972 973
/*
 * Finish the CREATE DATABASE message: record the message type and length on the
 * command and copy the database name into the SCreateDbMsg located at the start
 * of the command payload. No payload allocation is performed here.
 *
 * @param pSql   sql object owning the command
 * @param pInfo  not used by this function
 * @return always TSDB_CODE_SUCCESS
 */
int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;

  cmd->msgType    = TSDB_MSG_TYPE_CM_CREATE_DB;
  cmd->payloadLen = sizeof(SCreateDbMsg);

  // the command is expected to hold exactly one clause
  assert(cmd->numOfClause == 1);

  SCreateDbMsg   *msg = (SCreateDbMsg *)cmd->payload;
  STableMetaInfo *metaInfo = tscGetTableMetaInfoFromCmd(cmd, cmd->clauseIndex, 0);

  // the database name is carried in the name field of the first meta info
  tstrncpy(msg->db, metaInfo->name, sizeof(msg->db));

  return TSDB_CODE_SUCCESS;
}

986 987
/*
 * Build the CREATE DNODE message: allocate the payload and copy the end point
 * given as the first DCL token into SCreateDnodeMsg.ep.
 *
 * @param pSql   sql object owning the command
 * @param pInfo  parsed statement; pDCLInfo->a[0] is the end point token
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload
 *         cannot be allocated
 */
int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCreateDnodeMsg);
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCreateDnodeMsg *pCreate = (SCreateDnodeMsg *)pCmd->payload;

  // The token is a (pointer, length) pair, so it is copied by length. The length
  // is clamped to the destination field and the result is always terminated,
  // which gives the same truncation semantics as tstrncpy() used by the sibling
  // builders.
  size_t epLen = pInfo->pDCLInfo->a[0].n;
  if (epLen >= sizeof(pCreate->ep)) {
    epLen = sizeof(pCreate->ep) - 1;
  }

  if (epLen > 0) {
    strncpy(pCreate->ep, pInfo->pDCLInfo->a[0].z, epLen);
  }
  pCreate->ep[epLen] = 0;

  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DNODE;

  return TSDB_CODE_SUCCESS;
}

1002 1003
/*
 * Build the account message (type TSDB_MSG_TYPE_CM_CREATE_ACCT): allocate the
 * payload, copy the user name and password tokens and serialize the account
 * options in network byte order.
 *
 * The access state is derived from the "stat" option token:
 *   empty -> -1, "r" -> read, "w" -> write, "all" -> all, "no" -> 0.
 * Any other value leaves cfg.accessState as it is in the freshly allocated payload.
 *
 * @param pSql   sql object owning the command
 * @param pInfo  parsed statement; pDCLInfo->user and pDCLInfo->acctOpt are read
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload
 *         cannot be allocated
 */
int32_t tscBuildAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCreateAcctMsg);
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCreateAcctMsg *pAlterMsg = (SCreateAcctMsg *)pCmd->payload;

  SStrToken *pName = &pInfo->pDCLInfo->user.user;
  SStrToken *pPwd = &pInfo->pDCLInfo->user.passwd;

  // Tokens are (pointer, length) pairs: copy by length, but never beyond the
  // fixed-size destination field, and always leave the field terminated.
  if (pName->n > 0) {
    size_t userLen = pName->n;
    if (userLen >= sizeof(pAlterMsg->user)) {
      userLen = sizeof(pAlterMsg->user) - 1;
    }

    strncpy(pAlterMsg->user, pName->z, userLen);
    pAlterMsg->user[userLen] = 0;
  }

  if (pPwd->n > 0) {
    size_t passLen = pPwd->n;
    if (passLen >= sizeof(pAlterMsg->pass)) {
      passLen = sizeof(pAlterMsg->pass) - 1;
    }

    strncpy(pAlterMsg->pass, pPwd->z, passLen);
    pAlterMsg->pass[passLen] = 0;
  }

  SCreateAcctSQL *pAcctOpt = &pInfo->pDCLInfo->acctOpt;

  pAlterMsg->cfg.maxUsers = htonl(pAcctOpt->maxUsers);
  pAlterMsg->cfg.maxDbs = htonl(pAcctOpt->maxDbs);
  pAlterMsg->cfg.maxTimeSeries = htonl(pAcctOpt->maxTimeSeries);
  pAlterMsg->cfg.maxStreams = htonl(pAcctOpt->maxStreams);
  pAlterMsg->cfg.maxPointsPerSecond = htonl(pAcctOpt->maxPointsPerSecond);
  pAlterMsg->cfg.maxStorage = htobe64(pAcctOpt->maxStorage);
  pAlterMsg->cfg.maxQueryTime = htobe64(pAcctOpt->maxQueryTime);
  pAlterMsg->cfg.maxConnections = htonl(pAcctOpt->maxConnections);

  if (pAcctOpt->stat.n == 0) {
    // no access state given in the statement
    pAlterMsg->cfg.accessState = -1;
  } else {
    if (pAcctOpt->stat.z[0] == 'r' && pAcctOpt->stat.n == 1) {
      pAlterMsg->cfg.accessState = TSDB_VN_READ_ACCCESS;
    } else if (pAcctOpt->stat.z[0] == 'w' && pAcctOpt->stat.n == 1) {
      pAlterMsg->cfg.accessState = TSDB_VN_WRITE_ACCCESS;
    } else if (strncmp(pAcctOpt->stat.z, "all", 3) == 0 && pAcctOpt->stat.n == 3) {
      pAlterMsg->cfg.accessState = TSDB_VN_ALL_ACCCESS;
    } else if (strncmp(pAcctOpt->stat.z, "no", 2) == 0 && pAcctOpt->stat.n == 2) {
      pAlterMsg->cfg.accessState = 0;
    }
  }

  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_ACCT;
  return TSDB_CODE_SUCCESS;
}

1047 1048
/*
 * Build the CREATE USER / ALTER USER message.
 *
 * The user name is always copied. For TSDB_ALTER_USER_PRIVILEGES the privilege
 * is taken from pCmd->count; for every other type (alter password, create
 * user) the password token is copied. The message type is ALTER_USER for the
 * two alter types and CREATE_USER otherwise.
 *
 * @param pSql   sql object owning the command
 * @param pInfo  parsed statement; pDCLInfo->user is read
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload
 *         cannot be allocated
 */
int32_t tscBuildUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCreateUserMsg);

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCreateUserMsg *pAlterMsg = (SCreateUserMsg *)pCmd->payload;

  SUserInfo *pUser = &pInfo->pDCLInfo->user;

  // Tokens are (pointer, length) pairs: copy by length, but never beyond the
  // fixed-size destination field, and always leave the field terminated.
  if (pUser->user.n > 0) {
    size_t userLen = pUser->user.n;
    if (userLen >= sizeof(pAlterMsg->user)) {
      userLen = sizeof(pAlterMsg->user) - 1;
    }

    strncpy(pAlterMsg->user, pUser->user.z, userLen);
    pAlterMsg->user[userLen] = 0;
  }

  pAlterMsg->flag = (int8_t)pUser->type;

  if (pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
    pAlterMsg->privilege = (char)pCmd->count;
  } else if (pUser->passwd.n > 0) {
    // alter password and create user both carry the password
    size_t passLen = pUser->passwd.n;
    if (passLen >= sizeof(pAlterMsg->pass)) {
      passLen = sizeof(pAlterMsg->pass) - 1;
    }

    strncpy(pAlterMsg->pass, pUser->passwd.z, passLen);
    pAlterMsg->pass[passLen] = 0;
  }

  if (pUser->type == TSDB_ALTER_USER_PASSWD || pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
    pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_USER;
  } else {
    pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_USER;
  }

  return TSDB_CODE_SUCCESS;
}

1079 1080
/*
 * Prepare the CONFIG DNODE command: only the message type and the payload
 * length (size of SCfgDnodeMsg) are recorded; the payload is not touched here.
 *
 * @param pSql   sql object owning the command
 * @param pInfo  not used by this function
 * @return always TSDB_CODE_SUCCESS
 */
int32_t tscBuildCfgDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  pSql->cmd.msgType    = TSDB_MSG_TYPE_CM_CONFIG_DNODE;
  pSql->cmd.payloadLen = sizeof(SCfgDnodeMsg);

  return TSDB_CODE_SUCCESS;
}
S
slguan 已提交
1085

1086 1087
/*
 * Build the DROP DATABASE message: allocate the payload, copy the database
 * name and translate the statement's exists-check option into the
 * ignoreNotExists flag (1 or 0).
 *
 * @param pSql   sql object owning the command
 * @param pInfo  parsed statement; pDCLInfo->existsCheck is read
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload
 *         cannot be allocated
 */
int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;

  cmd->payloadLen = sizeof(SDropDbMsg);
  if (tscAllocPayload(cmd, cmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  STableMetaInfo *metaInfo = tscGetTableMetaInfoFromCmd(cmd, cmd->clauseIndex, 0);
  SDropDbMsg     *msg = (SDropDbMsg *)cmd->payload;

  // the database name is carried in the name field of the first meta info
  tstrncpy(msg->db, metaInfo->name, sizeof(msg->db));

  if (pInfo->pDCLInfo->existsCheck) {
    msg->ignoreNotExists = 1;
  } else {
    msg->ignoreNotExists = 0;
  }

  cmd->msgType = TSDB_MSG_TYPE_CM_DROP_DB;
  return TSDB_CODE_SUCCESS;
}

1105 1106
/*
 * Build the DROP TABLE message: allocate the payload, copy the table name and
 * translate the statement's exists-check option into the igNotExists flag.
 *
 * @param pSql   sql object owning the command
 * @param pInfo  parsed statement; pDCLInfo->existsCheck is read
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload
 *         cannot be allocated
 */
int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SCMDropTableMsg);

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMDropTableMsg *pDropTableMsg = (SCMDropTableMsg*)pCmd->payload;
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);

  // bounded copy, like the other drop/use builders, so the name can never run
  // past the fixed-size tableId field
  tstrncpy(pDropTableMsg->tableId, pTableMetaInfo->name, sizeof(pDropTableMsg->tableId));
  pDropTableMsg->igNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;

  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_TABLE;
  return TSDB_CODE_SUCCESS;
}

1123
/*
 * Build the DROP DNODE message: allocate the payload and copy the end point,
 * which is carried in the name field of the first meta info, into
 * SDropDnodeMsg.ep.
 *
 * @param pSql   sql object owning the command
 * @param pInfo  not used by this function
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload
 *         cannot be allocated
 */
int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;

  cmd->payloadLen = sizeof(SDropDnodeMsg);
  if (tscAllocPayload(cmd, cmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  STableMetaInfo *metaInfo = tscGetTableMetaInfoFromCmd(cmd, cmd->clauseIndex, 0);
  SDropDnodeMsg  *msg = (SDropDnodeMsg *)cmd->payload;

  tstrncpy(msg->ep, metaInfo->name, sizeof(msg->ep));
  cmd->msgType = TSDB_MSG_TYPE_CM_DROP_DNODE;

  return TSDB_CODE_SUCCESS;
}

S
[TD-16]  
slguan 已提交
1139
/*
 * Build the DROP USER message. The message type and length are recorded before
 * the payload is allocated; the user name, carried in the name field of the
 * first meta info, is then copied into SDropUserMsg.user.
 *
 * @param pSql   sql object owning the command
 * @param pInfo  not used by this function
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload
 *         cannot be allocated
 */
int32_t tscBuildDropUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;

  cmd->msgType    = TSDB_MSG_TYPE_CM_DROP_USER;
  cmd->payloadLen = sizeof(SDropUserMsg);

  if (tscAllocPayload(cmd, cmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  STableMetaInfo *metaInfo = tscGetTableMetaInfoFromCmd(cmd, cmd->clauseIndex, 0);
  SDropUserMsg   *msg = (SDropUserMsg *)cmd->payload;
  tstrncpy(msg->user, metaInfo->name, sizeof(msg->user));

  return TSDB_CODE_SUCCESS;
}

S
[TD-16]  
slguan 已提交
1156 1157
/*
 * Build the DROP ACCOUNT message. It uses the SDropUserMsg layout: the account
 * name, carried in the name field of the first meta info, is copied into the
 * user field. The message type and length are recorded before the payload is
 * allocated.
 *
 * @param pSql   sql object owning the command
 * @param pInfo  not used by this function
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload
 *         cannot be allocated
 */
int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;

  cmd->msgType    = TSDB_MSG_TYPE_CM_DROP_ACCT;
  cmd->payloadLen = sizeof(SDropUserMsg);

  if (tscAllocPayload(cmd, cmd->payloadLen) != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  STableMetaInfo *metaInfo = tscGetTableMetaInfoFromCmd(cmd, cmd->clauseIndex, 0);
  SDropUserMsg   *msg = (SDropUserMsg *)cmd->payload;
  tstrncpy(msg->user, metaInfo->name, sizeof(msg->user));

  return TSDB_CODE_SUCCESS;
}

1173 1174
/*
 * Build the USE DATABASE message: allocate the payload and copy the database
 * name, carried in the name field of the first meta info, into SUseDbMsg.db.
 *
 * @param pSql   sql object owning the command
 * @param pInfo  not used by this function
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload
 *         cannot be allocated
 */
int32_t tscBuildUseDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->payloadLen = sizeof(SUseDbMsg);

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SUseDbMsg *pUseDbMsg = (SUseDbMsg *)pCmd->payload;
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);

  // bounded copy, like the other database/user builders, so the name can never
  // run past the fixed-size db field
  tstrncpy(pUseDbMsg->db, pTableMetaInfo->name, sizeof(pUseDbMsg->db));
  pCmd->msgType = TSDB_MSG_TYPE_CM_USE_DB;

  return TSDB_CODE_SUCCESS;
}

1190
/*
 * Build the SHOW message.
 *
 * The message carries the database name (taken from the first meta info when it
 * is not empty, otherwise from the connection's current db), the show type and
 * an optional text argument appended after the fixed SShowMsg part:
 *   - for TSDB_MGMT_TABLE_VNODES the "prefix" token (asserted to be present);
 *   - for every other type the "pattern" token, when one was given.
 *
 * SShowMsg.payloadLen is sent in network byte order, while pCmd->payloadLen is
 * the host-order size of the whole message.
 *
 * @param pSql   sql object owning the command
 * @param pInfo  parsed statement; pDCLInfo->showOpt is read
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_TSC_OUT_OF_MEMORY when the payload cannot
 *         be allocated, or TSDB_CODE_TSC_INVALID_SQL when the text argument does
 *         not fit into the 16-bit length field
 */
int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  STscObj *pObj = pSql->pTscObj;
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->msgType = TSDB_MSG_TYPE_CM_SHOW;

  // pick the text argument first, so that the payload can be sized for it
  SShowInfo *pShowInfo = &pInfo->pDCLInfo->showOpt;
  SStrToken *pText = NULL;

  if (pShowInfo->showType != TSDB_MGMT_TABLE_VNODES) {
    SStrToken *pPattern = &pShowInfo->pattern;
    if (pPattern->type > 0) {  // only show tables support wildcard query
      pText = pPattern;
    }
  } else {
    SStrToken *pEpAddr = &pShowInfo->prefix;
    assert(pEpAddr->n > 0 && pEpAddr->type > 0);
    pText = pEpAddr;
  }

  size_t textLen = (pText != NULL) ? pText->n : 0;
  if (textLen > UINT16_MAX) {
    // the length is transmitted through htons(), i.e. as a 16-bit value
    tscError("%p show msg argument is too long: %" PRIu64, pSql, (uint64_t)textLen);
    return TSDB_CODE_TSC_INVALID_SQL;
  }

  // reserve at least 100 extra bytes as before, and more if the text needs it
  // (one spare byte for the terminator written below)
  size_t extra = (textLen + 1 > 100) ? (textLen + 1) : 100;
  pCmd->payloadLen = sizeof(SShowMsg) + extra;

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SShowMsg *pShowMsg = (SShowMsg *)pCmd->payload;

  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  size_t nameLen = strlen(pTableMetaInfo->name);
  if (nameLen > 0) {
    tstrncpy(pShowMsg->db, pTableMetaInfo->name, sizeof(pShowMsg->db));  // prefix is set here
  } else {
    tstrncpy(pShowMsg->db, pObj->db, sizeof(pShowMsg->db));
  }

  pShowMsg->type = pShowInfo->showType;

  if (textLen > 0) {
    strncpy(pShowMsg->payload, pText->z, textLen);
  }
  pShowMsg->payload[textLen] = 0;
  pShowMsg->payloadLen = htons((uint16_t)textLen);

  // use the host-order length here: pShowMsg->payloadLen has already been
  // converted to network byte order and must not be used for arithmetic
  pCmd->payloadLen = sizeof(SShowMsg) + textLen;
  return TSDB_CODE_SUCCESS;
}

1232
/*
 * Prepare a KILL command: record the payload length (size of SKillQueryMsg) and
 * map the sql command to the matching message type. For any command other than
 * the three kill commands the message type is left unchanged.
 *
 * @param pSql   sql object owning the command
 * @param pInfo  not used by this function
 * @return always TSDB_CODE_SUCCESS
 */
int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *cmd = &pSql->cmd;
  cmd->payloadLen = sizeof(SKillQueryMsg);

  if (cmd->command == TSDB_SQL_KILL_QUERY) {
    cmd->msgType = TSDB_MSG_TYPE_CM_KILL_QUERY;
  } else if (cmd->command == TSDB_SQL_KILL_CONNECTION) {
    cmd->msgType = TSDB_MSG_TYPE_CM_KILL_CONN;
  } else if (cmd->command == TSDB_SQL_KILL_STREAM) {
    cmd->msgType = TSDB_MSG_TYPE_CM_KILL_STREAM;
  }

  return TSDB_CODE_SUCCESS;
}

1250
/*
 * Estimate the payload size needed for a CREATE TABLE message.
 *
 * The estimate is the sum of:
 *   - minMsgSize() and the fixed SCMCreateTableMsg part;
 *   - one STagData when the table is created from a super table, otherwise one
 *     SSchema per column and per tag (pCmd->numOfCols + pCmd->count);
 *   - the length of the select statement plus one byte, when a select clause
 *     is attached;
 *   - TSDB_EXTRA_PAYLOAD_SIZE.
 *
 * @param pSql   sql object owning the command
 * @param pInfo  parsed statement; pCreateTableInfo is read
 * @return the estimated size in bytes
 */
int tscEstimateCreateTableMsgLength(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd         *cmd = &(pSql->cmd);
  SCreateTableSQL *createInfo = pInfo->pCreateTableInfo;

  int32_t total = minMsgSize() + sizeof(SCMCreateTableMsg);

  // schema section: a tag data block for "create ... using", column/tag schemas otherwise
  total += (createInfo->type == TSQL_CREATE_TABLE_FROM_STABLE)
               ? sizeof(STagData)
               : sizeof(SSchema) * (cmd->numOfCols + cmd->count);

  if (createInfo->pSelect != NULL) {
    total += (createInfo->pSelect->selectToken.n + 1);
  }

  return total + TSDB_EXTRA_PAYLOAD_SIZE;
}

1269
/*
 * Build the create-table request (TSDB_MSG_TYPE_CM_CREATE_TABLE) in
 * pCmd->payload.
 *
 * Layout written after the fixed SCMCreateTableMsg header (at ->schema):
 *   - table created from a super table:
 *       int32 dataLen (network order) | tag name | tag data (dataLen bytes)
 *   - normal/super table or stream:
 *       one SSchema per column and tag, followed (stream only) by the
 *       NUL-terminated select statement.
 *
 * The field list of the query info is cleared once the message is built.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload
 * buffer cannot be allocated.
 */
int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd        *pCmd = &pSql->cmd;
  SQueryInfo     *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  // Reallocate the payload according to the estimated message size
  int size = tscEstimateCreateTableMsgLength(pSql, pInfo);
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for create table msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SCMCreateTableMsg *pCreateTableMsg = (SCMCreateTableMsg *)pCmd->payload;
  strcpy(pCreateTableMsg->tableId, pTableMetaInfo->name);

  // use dbinfo from table id without modifying current db info
  tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pCreateTableMsg->db);

  SCreateTableSQL *pCreateTable = pInfo->pCreateTableInfo;

  pCreateTableMsg->igExists = pCreateTable->existCheck ? 1 : 0;
  pCreateTableMsg->numOfColumns = htons(pCmd->numOfCols);
  pCreateTableMsg->numOfTags = htons(pCmd->count);
  pCreateTableMsg->sqlLen = 0;

  char  *pMsg = (char *)pCreateTableMsg->schema;
  int8_t type = pCreateTable->type;

  if (type == TSQL_CREATE_TABLE_FROM_STABLE) {  // create by using super table, tags value
    STagData *pTag = &pCreateTable->usingInfo.tagdata;

    *(int32_t *)pMsg = htonl(pTag->dataLen);
    pMsg += sizeof(int32_t);

    memcpy(pMsg, pTag->name, sizeof(pTag->name));
    pMsg += sizeof(pTag->name);

    memcpy(pMsg, pTag->data, pTag->dataLen);
    pMsg += pTag->dataLen;
  } else {  // create (super) table
    SSchema *pSchema = (SSchema *)pCreateTableMsg->schema;

    for (int i = 0; i < pCmd->numOfCols + pCmd->count; ++i) {
      TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);

      pSchema->type = pField->type;
      strcpy(pSchema->name, pField->name);
      pSchema->bytes = htons(pField->bytes);

      pSchema++;
    }

    pMsg = (char *)pSchema;
    if (type == TSQL_CREATE_STREAM) {  // check if it is a stream sql
      SQuerySQL *pQuerySql = pCreateTable->pSelect;

      // Copy exactly the token text and terminate it explicitly: the token is
      // a (pointer, length) pair, so the byte following it is not guaranteed
      // to be NUL, yet sqlLen advertises n + 1 bytes including a terminator.
      strncpy(pMsg, pQuerySql->selectToken.z, pQuerySql->selectToken.n);
      pMsg[pQuerySql->selectToken.n] = '\0';

      pCreateTableMsg->sqlLen = htons(pQuerySql->selectToken.n + 1);
      pMsg += pQuerySql->selectToken.n + 1;
    }
  }

  tscFieldInfoClear(&pQueryInfo->fieldsInfo);

  int msgLen = (int32_t)(pMsg - (char *)pCreateTableMsg);
  pCreateTableMsg->contLen = htonl(msgLen);
  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE;

  // the estimate made above must cover everything that was written
  assert(msgLen + minMsgSize() <= size);
  return TSDB_CODE_SUCCESS;
}

/*
 * Estimate the buffer size needed for an alter-table request: the minimum
 * message size, the fixed SAlterTableMsg header, one SSchema per field of
 * the first query info, and TSDB_EXTRA_PAYLOAD_SIZE.
 */
int tscEstimateAlterTableMsgLength(SSqlCmd *pCmd) {
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

  return minMsgSize() + sizeof(SAlterTableMsg) + sizeof(SSchema) * tscNumOfFields(pQueryInfo) +
         TSDB_EXTRA_PAYLOAD_SIZE;
}

1349
/*
 * Build the alter-table request (TSDB_MSG_TYPE_CM_ALTER_TABLE) in
 * pCmd->payload.
 *
 * After the fixed SAlterTableMsg header the message carries one SSchema per
 * field of the query info, followed by the raw tag data
 * (pAlterInfo->tagData.dataLen bytes).
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload
 * buffer cannot be allocated.
 */
int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd        *pCmd = &pSql->cmd;
  SQueryInfo     *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  SAlterTableSQL *pAlterInfo = pInfo->pAlterInfo;

  int size = tscEstimateAlterTableMsgLength(pCmd);
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for alter table msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SAlterTableMsg *pAlterTableMsg = (SAlterTableMsg *)pCmd->payload;
  tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pAlterTableMsg->db);
  strcpy(pAlterTableMsg->tableId, pTableMetaInfo->name);

  pAlterTableMsg->type = htons(pAlterInfo->type);
  pAlterTableMsg->numOfCols = htons(tscNumOfFields(pQueryInfo));

  // serialize the schema of every field right after the header
  SSchema *pSchema = pAlterTableMsg->schema;
  int      i = 0;
  while (i < tscNumOfFields(pQueryInfo)) {
    TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);

    pSchema->type = pField->type;
    strcpy(pSchema->name, pField->name);
    pSchema->bytes = htons(pField->bytes);

    ++pSchema;
    ++i;
  }

  // the tag value follows the last schema entry
  char *pMsg = (char *)pSchema;
  pAlterTableMsg->tagValLen = htonl(pAlterInfo->tagData.dataLen);
  memcpy(pMsg, pAlterInfo->tagData.data, pAlterInfo->tagData.dataLen);
  pMsg += pAlterInfo->tagData.dataLen;

  int msgLen = (int32_t)(pMsg - (char *)pAlterTableMsg);

  pCmd->payloadLen = msgLen;
  pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_TABLE;

  assert(msgLen + minMsgSize() <= size);

  return TSDB_CODE_SUCCESS;
}

1397 1398 1399 1400
/*
 * Finalize an update-tag-value request.
 *
 * The message body is expected to be present in pCmd->payload already; this
 * function only sets the message type, derives the payload length from the
 * byte-swapped head.contLen field, and copies the endpoint set of the
 * table's vgroup into pSql->epSet.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  SUpdateTableTagValMsg *pUpdateMsg = (SUpdateTableTagValMsg *)pCmd->payload;

  pCmd->msgType = TSDB_MSG_TYPE_UPDATE_TAG_VAL;
  pCmd->payloadLen = htonl(pUpdateMsg->head.contLen);

  SQueryInfo     *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  tscDumpEpSetFromVgroupInfo(&pTableMetaInfo->pTableMeta->corVgroupInfo, &pSql->epSet);

  return TSDB_CODE_SUCCESS;
}

1412
/*
 * Prepare an alter-database request: fixed-size SAlterDbMsg payload whose
 * db field is filled from the name stored in the first table meta info of
 * the current clause.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_DB;
  pCmd->payloadLen = sizeof(SAlterDbMsg);

  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  SAlterDbMsg    *pAlterDbMsg = (SAlterDbMsg *)pCmd->payload;
  tstrncpy(pAlterDbMsg->db, pTableMetaInfo->name, sizeof(pAlterDbMsg->db));

  return TSDB_CODE_SUCCESS;
}

1424
/*
 * Build a retrieve request (TSDB_MSG_TYPE_CM_RETRIEVE).
 *
 * The payload is a single SRetrieveTableMsg carrying the query handle of the
 * previous response (big-endian) and the query type in the `free` field.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload
 * buffer cannot be allocated.
 */
int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->msgType = TSDB_MSG_TYPE_CM_RETRIEVE;
  pCmd->payloadLen = sizeof(SRetrieveTableMsg);

  int32_t allocCode = tscAllocPayload(pCmd, pCmd->payloadLen);
  if (allocCode != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SQueryInfo        *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *)pCmd->payload;

  pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
  pRetrieveMsg->free = htons(pQueryInfo->type);

  return TSDB_CODE_SUCCESS;
}

/*
 * Common builder for results that are produced locally.
 *
 * pRes->rspType denotes the state of the result:
 *   - 0: first call; the result is initialised with numOfRes rows, the
 *        result pointer arrays are created and rspType is switched to 1;
 *   - otherwise: no more result, the result is reset for the next retrieve.
 *
 * When a user callback (pSql->fp) is installed it is invoked directly on
 * success; on failure the error is queued through tscQueueAsyncRes().
 *
 * Returns the result code stored in pSql->res.code.
 */
static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  pRes->code = TSDB_CODE_SUCCESS;
  if (pRes->rspType == 0) {
    pRes->numOfRows = numOfRes;
    pRes->row = 0;
    pRes->rspType = 1;

    SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
    if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
      return pRes->code;
    }

    tscSetResRawPtr(pRes, pQueryInfo);
  } else {
    tscResetForNextRetrieve(pRes);
  }

  // Keep the full width of the result code: narrowing it to 8 bits would
  // turn any non-zero code whose low byte is zero into "success".
  int32_t code = pSql->res.code;
  if (pSql->fp) {
    if (code == TSDB_CODE_SUCCESS) {
      (*pSql->fp)(pSql->param, pSql, pSql->res.numOfRows);
    } else {
      tscQueueAsyncRes(pSql);
    }
  }

  return code;
}

/*
 * Produce the local result of a describe-table command: one row per column
 * and per tag of the table referenced by the current clause.
 */
int tscProcessDescribeTableRsp(SSqlObj *pSql) {
  SSqlCmd        *pCmd = &pSql->cmd;
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
  STableComInfo   tableInfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);

  int32_t rowCount = tableInfo.numOfColumns + tableInfo.numOfTags;
  return tscLocalResultCommonBuilder(pSql, rowCount);
}

H
Haojun Liao 已提交
1490 1491 1492
/*
 * Produce a single-row local result and mark the result set as completed.
 */
int tscProcessLocalRetrieveRsp(SSqlObj *pSql) {
  pSql->res.completed = true;
  return tscLocalResultCommonBuilder(pSql, 1);
}

H
hjxilinx 已提交
1496
/*
 * Run the client-side merge and hand the merged rows to the user callback.
 *
 * If the result already carries an error, or the merge (or the creation of
 * the result pointer arrays) fails, the error is reported through
 * tscQueueAsyncRes(); otherwise pSql->fp is invoked with the number of rows.
 * The result is flagged as completed when the merge yields no rows.
 *
 * Returns the final value of pRes->code.
 */
int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  int32_t code = pRes->code;
  if (pRes->code != TSDB_CODE_SUCCESS) {
    tscQueueAsyncRes(pSql);
    return code;
  }

  pRes->code = tscDoLocalMerge(pSql);

  if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) {
    SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

    // Do not touch the raw pointers when the pointer arrays could not be
    // created; record the failure so that it is reported below instead.
    pRes->code = tscCreateResPointerInfo(pRes, pQueryInfo);
    if (pRes->code == TSDB_CODE_SUCCESS) {
      tscSetResRawPtr(pRes, pQueryInfo);
    }
  }

  pRes->row = 0;
  pRes->completed = (pRes->numOfRows == 0);

  code = pRes->code;
  if (pRes->code == TSDB_CODE_SUCCESS) {
    (*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
  } else {
    tscQueueAsyncRes(pSql);
  }

  return code;
}

S
slguan 已提交
1527
/* Produce an empty (zero-row) local result. */
int tscProcessEmptyResultRsp(SSqlObj *pSql) {
  const int32_t numOfRes = 0;
  return tscLocalResultCommonBuilder(pSql, numOfRes);
}
H
hzcheng 已提交
1528

1529
/*
 * Build the connect request (TSDB_MSG_TYPE_CM_CONNECT).
 *
 * The payload is a single SConnectMsg holding the database name (the part of
 * pObj->db after the first TS_PATH_DELIMITER, or the whole string when there
 * is no delimiter), the client version, an empty message version, the process
 * id in network byte order and the application name.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload
 * buffer cannot be allocated.
 */
int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  STscObj *pObj = pSql->pTscObj;
  SSqlCmd *pCmd = &pSql->cmd;

  pCmd->msgType = TSDB_MSG_TYPE_CM_CONNECT;
  pCmd->payloadLen = sizeof(SConnectMsg);

  int32_t allocCode = tscAllocPayload(pCmd, pCmd->payloadLen);
  if (allocCode != TSDB_CODE_SUCCESS) {
    tscError("%p failed to malloc for query msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SConnectMsg *pConnect = (SConnectMsg *)pCmd->payload;

  // TODO refactor full_name
  // skip everything up to and including the first path delimiter, if any
  char *dbName = pObj->db;
  char *delim = strstr(pObj->db, TS_PATH_DELIMITER);
  if (delim != NULL) {
    dbName = delim + 1;
  }

  tstrncpy(pConnect->db, dbName, sizeof(pConnect->db));
  tstrncpy(pConnect->clientVersion, version, sizeof(pConnect->clientVersion));
  tstrncpy(pConnect->msgVersion, "", sizeof(pConnect->msgVersion));

  pConnect->pid = htonl(taosGetPId());
  taosGetCurrentAPPName(pConnect->appName, NULL);

  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
1556
/*
 * Build the table-meta request (TSDB_MSG_TYPE_CM_TABLE_META) in the payload
 * buffer already held by pCmd.
 *
 * The message is an STableInfoMsg with the table name and the create flag.
 * For auto-created tables with tag data, the tag block (dataLen field, name
 * and data) is appended when the byte-swapped dataLen is positive.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd        *pCmd = &pSql->cmd;
  SQueryInfo     *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  STableInfoMsg *pInfoMsg = (STableInfoMsg *)pCmd->payload;
  strcpy(pInfoMsg->tableId, pTableMetaInfo->name);
  pInfoMsg->createFlag = htons(pSql->cmd.autoCreated ? 1 : 0);

  // end of the message so far; advanced when tag data is appended
  char *pEnd = (char *)pInfoMsg + sizeof(STableInfoMsg);

  if (pCmd->autoCreated && pCmd->pTagData != NULL) {
    int tagLen = htonl(pCmd->pTagData->dataLen);
    if (tagLen > 0) {
      tagLen += sizeof(pCmd->pTagData->name) + sizeof(pCmd->pTagData->dataLen);
      memcpy(pInfoMsg->tags, pCmd->pTagData, tagLen);
      pEnd += tagLen;
    }
  }

  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLE_META;
  pCmd->payloadLen = (int32_t)(pEnd - (char *)pInfoMsg);

  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
1583
/**
1584
 *  multi table meta req pkg format:
S
TD-1732  
Shengliang Guan 已提交
1585
 *  | SMgmtHead | SMultiTableInfoMsg | tableId0 | tableId1 | tableId2 | ......
S
slguan 已提交
1586 1587
 *      no used         4B
 **/
1588
int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
guanshengliang's avatar
guanshengliang 已提交
1589
#if 0
S
slguan 已提交
1590 1591 1592 1593 1594
  SSqlCmd *pCmd = &pSql->cmd;

  // copy payload content to temp buff
  char *tmpData = 0;
  if (pCmd->payloadLen > 0) {
H
Haojun Liao 已提交
1595
    if ((tmpData = calloc(1, pCmd->payloadLen + 1)) == NULL) return -1;
S
slguan 已提交
1596 1597 1598 1599 1600
    memcpy(tmpData, pCmd->payload, pCmd->payloadLen);
  }

  // fill head info
  SMgmtHead *pMgmt = (SMgmtHead *)(pCmd->payload + tsRpcHeadSize);
H
Haojun Liao 已提交
1601
  memset(pMgmt->db, 0, TSDB_TABLE_FNAME_LEN);  // server don't need the db
S
slguan 已提交
1602

S
TD-1732  
Shengliang Guan 已提交
1603
  SMultiTableInfoMsg *pInfoMsg = (SMultiTableInfoMsg *)(pCmd->payload + tsRpcHeadSize + sizeof(SMgmtHead));
S
slguan 已提交
1604
  pInfoMsg->numOfTables = htonl((int32_t)pCmd->count);
S
slguan 已提交
1605 1606

  if (pCmd->payloadLen > 0) {
S
slguan 已提交
1607
    memcpy(pInfoMsg->tableIds, tmpData, pCmd->payloadLen);
S
slguan 已提交
1608 1609
  }

S
TD-1848  
Shengliang Guan 已提交
1610
  tfree(tmpData);
S
slguan 已提交
1611

S
TD-1732  
Shengliang Guan 已提交
1612
  pCmd->payloadLen += sizeof(SMgmtHead) + sizeof(SMultiTableInfoMsg);
S
slguan 已提交
1613
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLES_META;
S
slguan 已提交
1614 1615 1616

  assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize);

1617
  tscDebug("%p build load multi-metermeta msg completed, numOfTables:%d, msg size:%d", pSql, pCmd->count,
S
slguan 已提交
1618 1619 1620
           pCmd->payloadLen);

  return pCmd->payloadLen;
guanshengliang's avatar
guanshengliang 已提交
1621 1622
#endif
  return 0;  
S
slguan 已提交
1623 1624
}

H
hjxilinx 已提交
1625 1626 1627 1628 1629 1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641
//static UNUSED_FUNC int32_t tscEstimateMetricMetaMsgSize(SSqlCmd *pCmd) {
////  const int32_t defaultSize =
////      minMsgSize() + sizeof(SSuperTableMetaMsg) + sizeof(SMgmtHead) + sizeof(int16_t) * TSDB_MAX_TAGS;
////  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
////
////  int32_t n = 0;
////  size_t size = taosArrayGetSize(pQueryInfo->tagCond.pCond);
////  for (int32_t i = 0; i < size; ++i) {
////    assert(0);
//////    n += strlen(pQueryInfo->tagCond.cond[i].cond);
////  }
////
////  int32_t tagLen = n * TSDB_NCHAR_SIZE;
////  if (pQueryInfo->tagCond.tbnameCond.cond != NULL) {
////    tagLen += strlen(pQueryInfo->tagCond.tbnameCond.cond) * TSDB_NCHAR_SIZE;
////  }
////
H
Haojun Liao 已提交
1642
////  int32_t joinCondLen = (TSDB_TABLE_FNAME_LEN + sizeof(int16_t)) * 2;
H
hjxilinx 已提交
1643 1644 1645 1646 1647 1648 1649 1650
////  int32_t elemSize = sizeof(SSuperTableMetaElemMsg) * pQueryInfo->numOfTables;
////
////  int32_t colSize = pQueryInfo->groupbyExpr.numOfGroupCols*sizeof(SColIndex);
////
////  int32_t len = tagLen + joinCondLen + elemSize + colSize + defaultSize;
////
////  return MAX(len, TSDB_DEFAULT_PAYLOAD_SIZE);
//}
H
hzcheng 已提交
1651

H
hjxilinx 已提交
1652 1653
/*
 * Build the super-table vgroup request (TSDB_MSG_TYPE_CM_STABLE_VGROUP) in
 * the payload buffer already held by pCmd.
 *
 * The message is an SSTableVgroupMsg header carrying the number of tables,
 * followed by one fixed-size name slot (sizeof(pTableMetaInfo->name) bytes)
 * per table of the current clause.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd    *pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

  SSTableVgroupMsg *pStableVgroupMsg = (SSTableVgroupMsg *)pCmd->payload;
  pStableVgroupMsg->numOfTables = htonl(pQueryInfo->numOfTables);

  // table names are written back to back right after the header
  char *pCursor = pCmd->payload + sizeof(SSTableVgroupMsg);

  int32_t tableIdx = 0;
  while (tableIdx < pQueryInfo->numOfTables) {
    STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, tableIdx);

    size_t slotSize = sizeof(pTableMetaInfo->name);
    tstrncpy(pCursor, pTableMetaInfo->name, slotSize);
    pCursor += slotSize;

    ++tableIdx;
  }

  pCmd->msgType = TSDB_MSG_TYPE_CM_STABLE_VGROUP;
  pCmd->payloadLen = (int32_t)(pCursor - pCmd->payload);

  return TSDB_CODE_SUCCESS;
}

1675 1676
/*
 * Build the heartbeat request (TSDB_MSG_TYPE_CM_HEARTBEAT).
 *
 * While holding pObj->mutex the function counts the entries of the
 * connection's sql and stream lists (each count starts at 2), sizes the
 * payload from those counts, fills the SHeartBeatMsg header and lets
 * tscBuildQueryStreamDesc() serialize the descriptions; its return value
 * becomes the payload length.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the payload
 * buffer cannot be allocated.
 */
int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
  STscObj *pObj = pSql->pTscObj;

  pthread_mutex_lock(&pObj->mutex);

  int32_t numOfQueries = 2;
  for (SSqlObj *pIter = pObj->sqlList; pIter != NULL; pIter = pIter->next) {
    numOfQueries++;
  }

  int32_t numOfStreams = 2;
  for (SSqlStream *pIter = pObj->streamList; pIter != NULL; pIter = pIter->next) {
    numOfStreams++;
  }

  int size = numOfQueries * sizeof(SQueryDesc) + numOfStreams * sizeof(SStreamDesc) + sizeof(SHeartBeatMsg) + 100;
  if (tscAllocPayload(pCmd, size) != TSDB_CODE_SUCCESS) {
    pthread_mutex_unlock(&pObj->mutex);
    tscError("%p failed to create heartbeat msg", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  // TODO the expired hb and client can not be identified by server till now.
  SHeartBeatMsg *pHeartbeat = (SHeartBeatMsg *)pCmd->payload;
  tstrncpy(pHeartbeat->clientVer, version, tListLen(pHeartbeat->clientVer));

  pHeartbeat->numOfQueries = numOfQueries;
  pHeartbeat->numOfStreams = numOfStreams;

  pHeartbeat->pid = htonl(taosGetPId());
  taosGetCurrentAPPName(pHeartbeat->appName, NULL);

  int msgLen = tscBuildQueryStreamDesc(pHeartbeat, pObj);

  pthread_mutex_unlock(&pObj->mutex);

  pCmd->msgType = TSDB_MSG_TYPE_CM_HEARTBEAT;
  pCmd->payloadLen = msgLen;

  return TSDB_CODE_SUCCESS;
}

1722 1723
/*
 * Process a table-meta response.
 *
 * The multi-byte fields of the received STableMetaMsg are byte-swapped in
 * place, the message is validated, converted into an STableMeta and stored
 * in tscMetaCache under the table name; the cached object is attached to the
 * first table meta info of the command.
 *
 * Returns TSDB_CODE_SUCCESS on success, TSDB_CODE_TSC_INVALID_VALUE when the
 * message fails validation, or TSDB_CODE_TSC_OUT_OF_MEMORY when the meta
 * cannot be put into the cache.
 */
int tscProcessTableMetaRsp(SSqlObj *pSql) {
  STableMetaMsg *pMetaMsg = (STableMetaMsg *)pSql->res.pRsp;

  // convert the header fields in place
  pMetaMsg->tid = htonl(pMetaMsg->tid);
  pMetaMsg->sversion = htons(pMetaMsg->sversion);
  pMetaMsg->tversion = htons(pMetaMsg->tversion);
  pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId);

  pMetaMsg->uid = htobe64(pMetaMsg->uid);
  pMetaMsg->contLen = htons(pMetaMsg->contLen);
  pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns);

  // only tables other than super tables carry a table id and a vgroup
  if (pMetaMsg->tableType != TSDB_SUPER_TABLE) {
    if (pMetaMsg->tid <= 0 || pMetaMsg->vgroup.vgId < 2 || pMetaMsg->vgroup.numOfEps <= 0) {
      tscError("invalid value in table numOfEps:%d, vgId:%d tid:%d, name:%s", pMetaMsg->vgroup.numOfEps, pMetaMsg->vgroup.vgId,
               pMetaMsg->tid, pMetaMsg->tableId);
      return TSDB_CODE_TSC_INVALID_VALUE;
    }
  }

  if (pMetaMsg->numOfTags > TSDB_MAX_TAGS) {
    tscError("invalid numOfTags:%d", pMetaMsg->numOfTags);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  if (pMetaMsg->numOfColumns <= 0 || pMetaMsg->numOfColumns > TSDB_MAX_COLUMNS) {
    tscError("invalid numOfColumns:%d", pMetaMsg->numOfColumns);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  for (int ep = 0; ep < pMetaMsg->vgroup.numOfEps; ++ep) {
    pMetaMsg->vgroup.epAddr[ep].port = htons(pMetaMsg->vgroup.epAddr[ep].port);
  }

  // convert the schema of every column and tag
  int32_t  numOfTotalCols = pMetaMsg->numOfColumns + pMetaMsg->numOfTags;
  SSchema *pSchemas = pMetaMsg->schema;
  for (int i = 0; i < numOfTotalCols; ++i) {
    SSchema *pCol = &pSchemas[i];

    pCol->bytes = htons(pCol->bytes);
    pCol->colId = htons(pCol->colId);

    // the primary timestamp column id may only appear in the first slot
    if (pCol->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
      assert(i == 0);
    }

    assert(pCol->type >= TSDB_DATA_TYPE_BOOL && pCol->type <= TSDB_DATA_TYPE_NCHAR);
  }

  size_t      size = 0;
  STableMeta *pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);

  // todo add one more function: taosAddDataIfNotExists();
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
  assert(pTableMetaInfo->pTableMeta == NULL);

  pTableMetaInfo->pTableMeta = (STableMeta *) taosCachePut(tscMetaCache, pTableMetaInfo->name,
      strlen(pTableMetaInfo->name), pTableMeta, size, tsTableMetaKeepTimer * 1000);

  int code = TSDB_CODE_TSC_OUT_OF_MEMORY;
  if (pTableMetaInfo->pTableMeta != NULL) {
    tscDebug("%p recv table meta, uid:%"PRId64 ", tid:%d, name:%s", pSql, pTableMeta->id.uid, pTableMeta->id.tid, pTableMetaInfo->name);
    code = TSDB_CODE_SUCCESS;
  }

  // the temporary object is released on both paths
  free(pTableMeta);
  return code;
}

S
slguan 已提交
1791
/**
 *  multi table meta rsp pkg format:
 *  | STaosRsp | ieType | SMultiTableInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | SMeterMeta2 | SSchema2
 *  |...... 1B        1B            4B
 *
 *  The response parser below is currently compiled out (#if 0); the function
 *  does nothing and returns TSDB_CODE_SUCCESS.
 **/
int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
#if 0
  char *rsp = pSql->res.pRsp;

  ieType = *rsp;
  if (ieType != TSDB_IE_TYPE_META) {
    tscError("invalid ie type:%d", ieType);
    pSql->res.code = TSDB_CODE_TSC_INVALID_IE;
    pSql->res.numOfTotal = 0;
    return TSDB_CODE_TSC_APP_ERROR;
  }

  rsp++;

  SMultiTableInfoMsg *pInfo = (SMultiTableInfoMsg *)rsp;
  totalNum = htonl(pInfo->numOfTables);
  rsp += sizeof(SMultiTableInfoMsg);

  for (i = 0; i < totalNum; i++) {
    SMultiTableMeta *pMultiMeta = (SMultiTableMeta *)rsp;
    STableMeta *     pMeta = pMultiMeta->metas;

    pMeta->sid = htonl(pMeta->sid);
    pMeta->sversion = htons(pMeta->sversion);
    pMeta->vgId = htonl(pMeta->vgId);
    pMeta->uid = htobe64(pMeta->uid);

    if (pMeta->sid <= 0 || pMeta->vgId < 0) {
      tscError("invalid meter vgId:%d, sid%d", pMeta->vgId, pMeta->sid);
      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
      pSql->res.numOfTotal = i;
      return TSDB_CODE_TSC_APP_ERROR;
    }

    //    pMeta->numOfColumns = htons(pMeta->numOfColumns);
    //
    //    if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
    //      tscError("invalid tag value count:%d", pMeta->numOfTags);
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
    //      pSql->res.numOfTotal = i;
    //      return TSDB_CODE_TSC_APP_ERROR;
    //    }
    //
    //    if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
    //      tscError("invalid numOfTags:%d", pMeta->numOfTags);
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
    //      pSql->res.numOfTotal = i;
    //      return TSDB_CODE_TSC_APP_ERROR;
    //    }
    //
    //    if (pMeta->numOfColumns > TSDB_MAX_COLUMNS || pMeta->numOfColumns < 0) {
    //      tscError("invalid numOfColumns:%d", pMeta->numOfColumns);
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
    //      pSql->res.numOfTotal = i;
    //      return TSDB_CODE_TSC_APP_ERROR;
    //    }
    //
    //    for (int j = 0; j < TSDB_REPLICA_MAX_NUM; ++j) {
    //      pMeta->vpeerDesc[j].vnode = htonl(pMeta->vpeerDesc[j].vnode);
    //    }
    //
    //    pMeta->rowSize = 0;
    //    rsp += sizeof(SMultiTableMeta);
    //    pSchema = (SSchema *)rsp;
    //
    //    int32_t numOfTotalCols = pMeta->numOfColumns + pMeta->numOfTags;
    //    for (int j = 0; j < numOfTotalCols; ++j) {
    //      pSchema->bytes = htons(pSchema->bytes);
    //      pSchema->colId = htons(pSchema->colId);
    //
    //      // ignore the tags length
    //      if (j < pMeta->numOfColumns) {
    //        pMeta->rowSize += pSchema->bytes;
    //      }
    //      pSchema++;
    //    }
    //
    //    rsp += numOfTotalCols * sizeof(SSchema);
    //
    //    int32_t  tagLen = 0;
    //    SSchema *pTagsSchema = tscGetTableTagSchema(pMeta);
    //
    //    if (pMeta->tableType == TSDB_CHILD_TABLE) {
    //      for (int32_t j = 0; j < pMeta->numOfTags; ++j) {
    //        tagLen += pTagsSchema[j].bytes;
    //      }
    //    }
    //
    //    rsp += tagLen;
    //    int32_t size = (int32_t)(rsp - ((char *)pMeta));  // Consistent with STableMeta in cache
    //
    //    pMeta->index = 0;
    //    (void)taosCachePut(tscMetaCache, pMeta->tableId, (char *)pMeta, size, tsTableMetaKeepTimer);
    //  }
  }

  pSql->res.code = TSDB_CODE_SUCCESS;
  pSql->res.numOfTotal = i;
  tscDebug("%p load multi-metermeta resp from complete num:%d", pSql, pSql->res.numOfTotal);
#endif

  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
1900
/*
 * Process a super-table vgroup response.
 *
 * The response is an SSTableVgroupRspMsg header followed by one SVgroupsMsg
 * per table, in the same order as the tables of the parent sql object
 * (stored in pSql->param). For every table a vgroup list is allocated and
 * filled from the message; the fqdn of each endpoint is duplicated on the
 * heap.
 *
 * Returns pSql->res.code, or TSDB_CODE_TSC_OUT_OF_MEMORY when a vgroup list
 * cannot be allocated.
 */
int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
  SSqlRes* pRes = &pSql->res;

  // NOTE: the order of several table must be preserved.
  SSTableVgroupRspMsg *pStableVgroup = (SSTableVgroupRspMsg *)pRes->pRsp;
  pStableVgroup->numOfTables = htonl(pStableVgroup->numOfTables);
  char *pMsg = pRes->pRsp + sizeof(SSTableVgroupRspMsg);

  // master sqlObj locates in param
  SSqlObj* parent = pSql->param;
  assert(parent != NULL);

  SSqlCmd* pCmd = &parent->cmd;
  for (int32_t i = 0; i < pStableVgroup->numOfTables; ++i) {
    STableMetaInfo *pInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i);

    SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *) pMsg;
    pVgroupMsg->numOfVgroups = htonl(pVgroupMsg->numOfVgroups);

    // bytes occupied by this table's entry in the message
    size_t size = sizeof(SVgroupMsg) * pVgroupMsg->numOfVgroups + sizeof(SVgroupsMsg);

    size_t vgroupsz = sizeof(SVgroupInfo) * pVgroupMsg->numOfVgroups + sizeof(SVgroupsInfo);
    pInfo->vgroupList = calloc(1, vgroupsz);
    if (pInfo->vgroupList == NULL) {
      // an assert would vanish in release builds and lead to a NULL dereference
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }

    pInfo->vgroupList->numOfVgroups = pVgroupMsg->numOfVgroups;
    for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) {
      //just init, no need to lock
      SVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j];

      SVgroupMsg *vmsg = &pVgroupMsg->vgroups[j];
      pVgroups->vgId = htonl(vmsg->vgId);
      pVgroups->numOfEps = vmsg->numOfEps;

      assert(pVgroups->numOfEps >= 1 && pVgroups->vgId >= 1);

      for (int32_t k = 0; k < pVgroups->numOfEps; ++k) {
        pVgroups->epAddr[k].port = htons(vmsg->epAddr[k].port);
        pVgroups->epAddr[k].fqdn = strndup(vmsg->epAddr[k].fqdn, tListLen(vmsg->epAddr[k].fqdn));
      }
    }

    pMsg += size;
  }

  return pSql->res.code;
}

/*
 * current process do not use the cache at all
 *
 * Process the response of a show command.
 *
 * The query handle and the embedded table meta message are byte-swapped in
 * place, a table meta object is created from the message and stored in
 * tscMetaCache under a key derived from the message type, and one output
 * field plus one TSDB_FUNC_TS_DUMMY expression is registered per column.
 *
 * Returns 0 on success, or TSDB_CODE_TSC_OUT_OF_MEMORY when the table meta
 * cannot be put into the cache.
 */
int tscProcessShowRsp(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo     *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  SShowRsp *pShow = (SShowRsp *)pRes->pRsp;
  pShow->qhandle = htobe64(pShow->qhandle);
  pRes->qhandle = pShow->qhandle;

  tscResetForNextRetrieve(pRes);

  STableMetaMsg *pMetaMsg = &(pShow->tableMeta);
  pMetaMsg->numOfColumns = ntohs(pMetaMsg->numOfColumns);

  SSchema *pSchema = pMetaMsg->schema;
  // NOTE(review): tscProcessTableMetaRsp converts this field with htonl();
  // the 16-bit conversion is kept here as is - confirm against the sender.
  pMetaMsg->tid = ntohs(pMetaMsg->tid);
  for (int i = 0; i < pMetaMsg->numOfColumns; ++i) {
    pSchema->bytes = htons(pSchema->bytes);
    pSchema++;
  }

  // cache key: one character derived from the message type + "showlist"
  char key[20];
  key[0] = pCmd->msgType + 'a';
  strcpy(key + 1, "showlist");

  if (pTableMetaInfo->pTableMeta != NULL) {
    taosCacheRelease(tscMetaCache, (void *)&(pTableMetaInfo->pTableMeta), false);
  }

  size_t size = 0;
  STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);

  pTableMetaInfo->pTableMeta = taosCachePut(tscMetaCache, key, strlen(key), (char *)pTableMeta, size,
      tsTableMetaKeepTimer * 1000);
  if (pTableMetaInfo->pTableMeta == NULL) {
    // same handling as tscProcessTableMetaRsp: the schema below must not be
    // read through a NULL table meta
    tfree(pTableMeta);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SSchema *pTableSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);

  if (pQueryInfo->colList == NULL) {
    pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
  }

  SFieldInfo* pFieldInfo = &pQueryInfo->fieldsInfo;

  SColumnIndex index = {0};
  pSchema = pMetaMsg->schema;

  for (int16_t i = 0; i < pMetaMsg->numOfColumns; ++i, ++pSchema) {
    index.columnIndex = i;
    tscColumnListInsert(pQueryInfo->colList, &index);

    TAOS_FIELD f = tscCreateField(pSchema->type, pSchema->name, pSchema->bytes);
    SInternalField* pInfo = tscFieldInfoAppend(pFieldInfo, &f);

    pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index,
                     pTableSchema[i].type, pTableSchema[i].bytes, getNewResColId(pQueryInfo), pTableSchema[i].bytes, false);
  }

  pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
  tscFieldInfoUpdateOffset(pQueryInfo);

  tfree(pTableMeta);
  return 0;
}

2021
// TODO multithread problem
2022 2023 2024 2025 2026 2027 2028 2029 2030 2031
static void createHBObj(STscObj* pObj) {
  if (pObj->pHb != NULL) {
    return;
  }

  SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
  if (NULL == pSql) return;

  pSql->fp = tscProcessHeartBeatRsp;

2032 2033
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetailSafely(&pSql->cmd, 0);
  if (pQueryInfo == NULL) {
H
Haojun Liao 已提交
2034 2035
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    tfree(pSql);
2036 2037 2038
    return;
  }

2039 2040 2041 2042
  pQueryInfo->command = TSDB_SQL_HB;

  pSql->cmd.command = pQueryInfo->command;
  if (TSDB_CODE_SUCCESS != tscAllocPayload(&(pSql->cmd), TSDB_DEFAULT_PAYLOAD_SIZE)) {
H
Haojun Liao 已提交
2043
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
S
TD-1848  
Shengliang Guan 已提交
2044
    tfree(pSql);
2045 2046 2047 2048 2049 2050 2051
    return;
  }

  pSql->param = pObj;
  pSql->pTscObj = pObj;
  pSql->signature = pSql;

2052 2053
  registerSqlObj(pSql);
  tscDebug("%p HB is allocated, pObj:%p", pSql, pObj);
2054

2055
  pObj->pHb = pSql;
2056 2057
}

H
hzcheng 已提交
2058 2059 2060 2061
/*
 * Process the response of a connect request.
 *
 * Copies the account id and server version from the response into the
 * connection object, prefixes the current database name with the account id,
 * refreshes the management end-point set when the server supplied one, stores
 * the authorization flags and connection id, creates the heartbeat object and
 * (re)arms the activity timer.
 *
 * @param pSql  sql object whose res.pRsp holds an SConnectRsp
 * @return      always 0
 */
int tscProcessConnectRsp(SSqlObj *pSql) {
  STscObj *pObj = pSql->pTscObj;
  SSqlRes *pRes = &pSql->res;

  char temp[TSDB_TABLE_FNAME_LEN * 2] = {0};

  SConnectRsp *pConnect = (SConnectRsp *)pRes->pRsp;
  tstrncpy(pObj->acctId, pConnect->acctId, sizeof(pObj->acctId));  // copy acctId from response

  // bounded formatting: never write past the end of temp
  int32_t len = snprintf(temp, sizeof(temp), "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db);

  assert(len >= 0 && (size_t)len <= sizeof(pObj->db));
  tstrncpy(pObj->db, temp, sizeof(pObj->db));

  if (pConnect->epSet.numOfEps > 0) {
    tscEpSetHtons(&pConnect->epSet);
    tscUpdateMgmtEpSet(&pConnect->epSet);
  }

  // the version string comes from the server: copy it with an explicit bound
  tstrncpy(pObj->sversion, pConnect->serverVersion, sizeof(pObj->sversion));
  pObj->writeAuth = pConnect->writeAuth;
  pObj->superAuth = pConnect->superAuth;
  pObj->connId = htonl(pConnect->connId);

  createHBObj(pObj);

  //launch a timer to send heartbeat to maintain the connection and send status to mnode
  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, (void *)pObj->rid, tscTmr, &pObj->pTimer);

  return 0;
}

/*
 * Process the response of a "use db" request: the name held by the first
 * table meta info of the command becomes the connection's current database.
 *
 * @return always 0
 */
int tscProcessUseDbRsp(SSqlObj *pSql) {
  STscObj        *pConn = pSql->pTscObj;
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);

  tstrncpy(pConn->db, pMetaInfo->name, sizeof(pConn->db));

  return 0;
}

Y
TD-1039  
yihaoDeng 已提交
2097 2098
/*
 * Process the response of a "drop database" request: clear the connection's
 * current database name and empty the table meta cache.
 *
 * @return always 0
 */
int tscProcessDropDbRsp(SSqlObj *pSql) {
  STscObj *pConn = pSql->pTscObj;

  pConn->db[0] = 0;  // no current database any more
  taosCacheEmpty(tscMetaCache);

  return 0;
}

/*
 * Process the response of a "drop table" request by evicting the dropped
 * table's meta from the client cache.
 *
 * Why the release is forced: if a user drops the only table of a vnode, the
 * vnode itself may be removed. A table created afterwards with the same name
 * but a different schema will live in a new vnode, so the cached meta would
 * be stale.
 *
 * @return always 0
 */
int tscProcessDropTableRsp(SSqlObj *pSql) {
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);

  STableMeta *pCachedMeta = taosCacheAcquireByKey(tscMetaCache, pMetaInfo->name, strlen(pMetaInfo->name));
  if (NULL == pCachedMeta) {
    return 0;  // not in cache, nothing to evict
  }

  tscDebug("%p force release table meta after drop table:%s", pSql, pMetaInfo->name);

  // drop the reference acquired above, forcing removal from the cache
  taosCacheRelease(tscMetaCache, (void **)&pCachedMeta, true);

  // also drop the reference owned by the command's table meta info, if any
  if (pMetaInfo->pTableMeta != NULL) {
    taosCacheRelease(tscMetaCache, (void **)&(pMetaInfo->pTableMeta), true);
  }

  return 0;
}

/*
 * Process the response of an "alter table" request: force the altered table's
 * meta out of the cache, and empty the whole cache when the table referenced
 * by the command is a super table.
 *
 * @return always 0
 */
int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);

  STableMeta *pCachedMeta = taosCacheAcquireByKey(tscMetaCache, pMetaInfo->name, strlen(pMetaInfo->name));
  if (NULL == pCachedMeta) {
    return 0;  // not in cache, abort
  }

  tscDebug("%p force release metermeta in cache after alter-table: %s", pSql, pMetaInfo->name);
  taosCacheRelease(tscMetaCache, (void **)&pCachedMeta, true);

  if (pMetaInfo->pTableMeta == NULL) {
    return 0;  // the command holds no reference of its own
  }

  // the table kind must be read before the reference is released
  bool isSuperTable = UTIL_TABLE_IS_SUPER_TABLE(pMetaInfo);
  taosCacheRelease(tscMetaCache, (void **)&(pMetaInfo->pTableMeta), true);

  if (isSuperTable) {  // if it is a super table, reset whole query cache
    tscDebug("%p reset query cache since table:%s is stable", pSql, pMetaInfo->name);
    taosCacheEmpty(tscMetaCache);
  }

  return 0;
}

/*
 * Process the response of an "alter database" request. Nothing needs to be
 * updated on the client side, so the sql object is ignored.
 *
 * @return always 0
 */
int tscProcessAlterDbMsgRsp(SSqlObj *pSql) {
  UNUSED(pSql);

  return 0;
}
2156 2157 2158
/*
 * Process the response of "show create table/database" by delegating to the
 * common local result builder with a row count of 1.
 *
 * @return the value returned by tscLocalResultCommonBuilder()
 */
int tscProcessShowCreateRsp(SSqlObj *pSql) {
  int code = tscLocalResultCommonBuilder(pSql, 1);
  return code;
}
H
hzcheng 已提交
2159 2160 2161 2162

/*
 * Process the response of a query (select) request: convert the query handle
 * to host byte order, remember it in the result object and reset the result
 * state for the first retrieve.
 *
 * @param pSql  sql object whose res.pRsp holds an SQueryTableRsp
 * @return      0 on success; TSDB_CODE_TSC_OUT_OF_MEMORY (also stored in
 *              pRes->code) when no response body is available, mirroring
 *              tscProcessRetrieveRspFromNode()
 */
int tscProcessQueryRsp(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;

  SQueryTableRsp *pQuery = (SQueryTableRsp *)pRes->pRsp;
  if (pQuery == NULL) {
    // no response body: report it instead of dereferencing NULL
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return pRes->code;
  }

  pQuery->qhandle = htobe64(pQuery->qhandle);
  pRes->qhandle = pQuery->qhandle;

  pRes->data = NULL;
  tscResetForNextRetrieve(pRes);
  return 0;
}

H
hjxilinx 已提交
2172
/*
 * Process the response of a retrieve/fetch request.
 *
 * Converts the header fields of the SRetrieveTableRsp to host byte order,
 * points pRes->data at the returned rows, prepares the result pointers and,
 * for subscriptions, consumes the progress trailer that follows the row data.
 *
 * Trailer layout as read below: int32 numOfTables, then per table
 * int64 uid, int32 tid (skipped), TSKEY key.
 *
 * @param pSql  sql object whose res.pRsp holds an SRetrieveTableRsp
 * @return      0 on success, otherwise pRes->code
 */
int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *)pRes->pRsp;
  if (pRetrieve == NULL) {
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return pRes->code;
  }

  pRes->numOfRows = htonl(pRetrieve->numOfRows);
  pRes->precision = htons(pRetrieve->precision);
  pRes->offset    = htobe64(pRetrieve->offset);
  pRes->useconds  = htobe64(pRetrieve->useconds);
  pRes->completed = (pRetrieve->completed == 1);
  pRes->data      = pRetrieve->data;

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
    return pRes->code;
  }

  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  if (pCmd->command == TSDB_SQL_RETRIEVE) {
    tscSetResRawPtr(pRes, pQueryInfo);
  } else if ((UTIL_TABLE_IS_CHILD_TABLE(pTableMetaInfo) || UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) && !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_SUBQUERY)) {
    tscSetResRawPtr(pRes, pQueryInfo);
  } else if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_QUERY) && !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
    tscSetResRawPtr(pRes, pQueryInfo);
  }

  if (pSql->pSubscription != NULL) {
    int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput;

    TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, numOfCols - 1);
    int16_t     offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1);

    // the trailer starts right after the last column of the last row
    char* p = pRes->data + (pField->bytes + offset) * pRes->numOfRows;

    // The trailer has no alignment guarantee (it follows row data of arbitrary
    // size), so every field is copied out with memcpy instead of being read
    // through a casted pointer.
    int32_t numOfTables = 0;
    memcpy(&numOfTables, p, sizeof(numOfTables));
    numOfTables = htonl(numOfTables);
    p += sizeof(int32_t);

    for (int i = 0; i < numOfTables; i++) {
      int64_t uid = 0;
      memcpy(&uid, p, sizeof(uid));
      uid = htobe64(uid);
      p += sizeof(int64_t);

      p += sizeof(int32_t); // skip tid

      TSKEY key = 0;
      memcpy(&key, p, sizeof(key));
      key = htobe64(key);
      p += sizeof(TSKEY);

      tscUpdateSubscriptionProgress(pSql->pSubscription, uid, key);
    }
  }

  pRes->row = 0;
  tscDebug("%p numOfRows:%d, offset:%" PRId64 ", complete:%d", pSql, pRes->numOfRows, pRes->offset, pRes->completed);

  return 0;
}

H
hjxilinx 已提交
2229
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code);
H
hzcheng 已提交
2230

2231
/*
 * Launch a TSDB_SQL_META request for the table named in pTableMetaInfo.
 *
 * A new sql object is created whose callback is tscTableMetaCallBack and
 * whose param is the original pSql. The auto-create flag and the tag data of
 * the original command are copied to the new one.
 *
 * @param pSql            originating sql object
 * @param pTableMetaInfo  supplies the table name to look up
 * @return TSDB_CODE_TSC_ACTION_IN_PROGRESS when the request was issued,
 *         TSDB_CODE_TSC_OUT_OF_MEMORY on allocation failure, otherwise the
 *         error returned by tscProcessSql()
 */
static int32_t getTableMetaFromMgmt(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
  if (NULL == pNew) {
    tscError("%p malloc failed for new sqlobj to get table meta", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;
  pNew->cmd.command = TSDB_SQL_META;

  tscAddSubqueryInfo(&pNew->cmd);

  SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetailSafely(&pNew->cmd, 0);
  if (pNewQueryInfo == NULL) {
    // query info could not be allocated: bail out instead of dereferencing NULL below
    tscError("%p malloc failed for query info to get table meta", pSql);
    tscFreeSqlObj(pNew);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  pNew->cmd.autoCreated = pSql->cmd.autoCreated;  // create table if not exists
  if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE + pSql->cmd.payloadLen)) {
    tscError("%p malloc failed for payload to get table meta", pSql);
    tscFreeSqlObj(pNew);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  STableMetaInfo *pNewMeterMetaInfo = tscAddEmptyMetaInfo(pNewQueryInfo);
  if (pNewMeterMetaInfo == NULL) {
    tscError("%p malloc failed for table meta info to get table meta", pSql);
    tscFreeSqlObj(pNew);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  assert(pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);

  tstrncpy(pNewMeterMetaInfo->name, pTableMetaInfo->name, sizeof(pNewMeterMetaInfo->name));

  if (pSql->cmd.pTagData != NULL) {
    // dataLen is converted with htonl before being used as a host-side length
    int size = offsetof(STagData, data) + htonl(pSql->cmd.pTagData->dataLen);
    pNew->cmd.pTagData = calloc(1, size);
    if (pNew->cmd.pTagData == NULL) {
      tscError("%p malloc failed for new tag data to get table meta", pSql);
      tscFreeSqlObj(pNew);
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    memcpy(pNew->cmd.pTagData, pSql->cmd.pTagData, size);
  }

  tscDebug("%p new pSqlObj:%p to get tableMeta, auto create:%d", pSql, pNew, pNew->cmd.autoCreated);

  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;

  registerSqlObj(pNew);

  int32_t code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
    code = TSDB_CODE_TSC_ACTION_IN_PROGRESS;  // notify upper application that current process need to be terminated
  }

  return code;
}

H
hjxilinx 已提交
2284
/*
 * Make pTableMetaInfo->pTableMeta refer to the meta of the table named in
 * pTableMetaInfo->name.
 *
 * Any meta currently held is released first. The cache is then consulted; on
 * a miss the lookup is handed to getTableMetaFromMgmt().
 *
 * @return TSDB_CODE_SUCCESS on a cache hit, otherwise the value returned by
 *         getTableMetaFromMgmt()
 */
int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
  const char *tableName = pTableMetaInfo->name;
  assert(strlen(tableName) != 0);

  // If this STableMetaInfo owns a table meta, release it first
  if (pTableMetaInfo->pTableMeta != NULL) {
    taosCacheRelease(tscMetaCache, (void **)&(pTableMetaInfo->pTableMeta), false);
  }

  pTableMetaInfo->pTableMeta = (STableMeta *)taosCacheAcquireByKey(tscMetaCache, tableName, strlen(tableName));
  if (pTableMetaInfo->pTableMeta == NULL) {
    // cache miss
    return getTableMetaFromMgmt(pSql, pTableMetaInfo);
  }

  STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
  tscDebug("%p retrieve table Meta from cache, the number of columns:%d, numOfTags:%d, %p", pSql, tinfo.numOfColumns,
           tinfo.numOfTags, pTableMetaInfo->pTableMeta);

  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
2304
/*
 * Same as tscGetTableMeta(), but first records in the command whether the
 * table should be created when it does not exist.
 *
 * @return the value returned by tscGetTableMeta()
 */
int tscGetMeterMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool createIfNotExists) {
  SSqlCmd *pCmd = &pSql->cmd;
  pCmd->autoCreated = createIfNotExists;

  return tscGetTableMeta(pSql, pTableMetaInfo);
}

/**
H
Haojun Liao 已提交
2310
 * retrieve table meta from mnode, and update the local table meta cache.
H
hzcheng 已提交
2311
 * @param pSql          sql object
B
Bomin Zhang 已提交
2312
 * @param tableIndex    table index
H
hzcheng 已提交
2313 2314
 * @return              status code
 */
B
Bomin Zhang 已提交
2315
int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) {
H
hzcheng 已提交
2316
  SSqlCmd *pCmd = &pSql->cmd;
2317 2318

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
B
Bomin Zhang 已提交
2319
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex);
H
hzcheng 已提交
2320

H
Haojun Liao 已提交
2321 2322
  STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
  if (pTableMetaInfo->pTableMeta) {
2323
    tscDebug("%p update table meta, old meta numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
2324
             tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->id.uid, pTableMeta);
H
hzcheng 已提交
2325 2326
  }

H
Haojun Liao 已提交
2327
  taosCacheRelease(tscMetaCache, (void **)&(pTableMetaInfo->pTableMeta), true);
H
Haojun Liao 已提交
2328
  return getTableMetaFromMgmt(pSql, pTableMetaInfo);
H
hzcheng 已提交
2329 2330
}

H
hjxilinx 已提交
2331
/*
 * Tell whether every table of the given query clause already has a vgroup
 * list attached.
 *
 * @return false as soon as one table has vgroupList == NULL, true otherwise
 *         (including when the clause has no tables)
 */
static bool allVgroupInfoRetrieved(SSqlCmd* pCmd, int32_t clauseIndex) {
  SQueryInfo *pInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);

  int32_t tableIdx = 0;
  while (tableIdx < pInfo->numOfTables) {
    STableMetaInfo *pMetaInfo = tscGetMetaInfo(pInfo, tableIdx);
    if (NULL == pMetaInfo->vgroupList) {
      return false;
    }

    ++tableIdx;
  }

  // all super tables vgroupinfo are retrieved, no need to retrieve vgroup info anymore
  return true;
}
H
hzcheng 已提交
2343

H
hjxilinx 已提交
2344
/*
 * Launch a TSDB_SQL_STABLEVGROUP request for the tables of one query clause,
 * unless every table already has its vgroup list.
 *
 * A new sql object is created whose callback is tscTableMetaCallBack and
 * whose param is the original pSql; each table of the clause is added to it.
 *
 * @param pSql         originating sql object
 * @param clauseIndex  index of the query clause to inspect
 * @return TSDB_CODE_SUCCESS when nothing has to be fetched,
 *         TSDB_CODE_TSC_ACTION_IN_PROGRESS when the request was issued,
 *         TSDB_CODE_TSC_OUT_OF_MEMORY when the sql object cannot be allocated,
 *         TSDB_CODE_RPC_NETWORK_UNAVAIL when no query info could be obtained
 *         (kept from the original behaviour), otherwise the error returned by
 *         tscAllocPayload() or tscProcessSql()
 */
int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
  int      code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
  SSqlCmd *pCmd = &pSql->cmd;

  if (allVgroupInfoRetrieved(pCmd, clauseIndex)) {
    return TSDB_CODE_SUCCESS;
  }

  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
  if (pNew == NULL) {
    // without this check the assignments below would dereference NULL
    tscError("%p malloc failed for new sqlobj to get vgroup info", pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;

  pNew->cmd.command = TSDB_SQL_STABLEVGROUP;

  // TODO TEST IT
  SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetailSafely(&pNew->cmd, 0);
  if (pNewQueryInfo == NULL) {
    tscFreeSqlObj(pNew);
    return code;
  }

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
    STableMetaInfo *pMInfo = tscGetMetaInfo(pQueryInfo, i);
    // take an extra cache reference for the copy handed to the new sql object
    STableMeta *pTableMeta = taosCacheAcquireByData(tscMetaCache, pMInfo->pTableMeta);
    tscAddTableMetaInfo(pNewQueryInfo, pMInfo->name, pTableMeta, NULL, pMInfo->tagColList, pMInfo->pVgroupTables);
  }

  if ((code = tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
    tscFreeSqlObj(pNew);
    return code;
  }

  pNewQueryInfo->numOfTables = pQueryInfo->numOfTables;
  registerSqlObj(pNew);

  tscDebug("%p new sqlObj:%p to get vgroupInfo, numOfTables:%d", pSql, pNew, pNewQueryInfo->numOfTables);

  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;
  code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
    code = TSDB_CODE_TSC_ACTION_IN_PROGRESS;
  }

  return code;
}

2392
void tscInitMsgsFp() {
S
slguan 已提交
2393 2394
  tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
  tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg;
J
jtao1735 已提交
2395
  tscBuildMsg[TSDB_SQL_FETCH] = tscBuildFetchMsg;
H
hzcheng 已提交
2396 2397

  tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg;
2398
  tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg;
H
hzcheng 已提交
2399

2400 2401
  tscBuildMsg[TSDB_SQL_CREATE_ACCT] = tscBuildAcctMsg;
  tscBuildMsg[TSDB_SQL_ALTER_ACCT] = tscBuildAcctMsg;
H
hzcheng 已提交
2402 2403

  tscBuildMsg[TSDB_SQL_CREATE_TABLE] = tscBuildCreateTableMsg;
S
[TD-16]  
slguan 已提交
2404
  tscBuildMsg[TSDB_SQL_DROP_USER] = tscBuildDropUserMsg;
H
hzcheng 已提交
2405 2406 2407
  tscBuildMsg[TSDB_SQL_DROP_ACCT] = tscBuildDropAcctMsg;
  tscBuildMsg[TSDB_SQL_DROP_DB] = tscBuildDropDbMsg;
  tscBuildMsg[TSDB_SQL_DROP_TABLE] = tscBuildDropTableMsg;
2408
  tscBuildMsg[TSDB_SQL_ALTER_USER] = tscBuildUserMsg;
S
slguan 已提交
2409 2410 2411
  tscBuildMsg[TSDB_SQL_CREATE_DNODE] = tscBuildCreateDnodeMsg;
  tscBuildMsg[TSDB_SQL_DROP_DNODE] = tscBuildDropDnodeMsg;
  tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg;
H
hzcheng 已提交
2412
  tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg;
2413
  tscBuildMsg[TSDB_SQL_UPDATE_TAGS_VAL] = tscBuildUpdateTagMsg;
H
hzcheng 已提交
2414 2415 2416 2417
  tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg;

  tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg;
  tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg;
H
hjxilinx 已提交
2418
  tscBuildMsg[TSDB_SQL_META] = tscBuildTableMetaMsg;
H
hjxilinx 已提交
2419
  tscBuildMsg[TSDB_SQL_STABLEVGROUP] = tscBuildSTableVgroupMsg;
S
slguan 已提交
2420
  tscBuildMsg[TSDB_SQL_MULTI_META] = tscBuildMultiMeterMetaMsg;
H
hzcheng 已提交
2421 2422 2423 2424

  tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg;
  tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg;
  tscBuildMsg[TSDB_SQL_RETRIEVE] = tscBuildRetrieveFromMgmtMsg;
2425 2426 2427
  tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg;
H
hzcheng 已提交
2428 2429

  tscProcessMsgRsp[TSDB_SQL_SELECT] = tscProcessQueryRsp;
H
hjxilinx 已提交
2430
  tscProcessMsgRsp[TSDB_SQL_FETCH] = tscProcessRetrieveRspFromNode;
H
hzcheng 已提交
2431 2432 2433 2434 2435

  tscProcessMsgRsp[TSDB_SQL_DROP_DB] = tscProcessDropDbRsp;
  tscProcessMsgRsp[TSDB_SQL_DROP_TABLE] = tscProcessDropTableRsp;
  tscProcessMsgRsp[TSDB_SQL_CONNECT] = tscProcessConnectRsp;
  tscProcessMsgRsp[TSDB_SQL_USE_DB] = tscProcessUseDbRsp;
2436
  tscProcessMsgRsp[TSDB_SQL_META] = tscProcessTableMetaRsp;
H
hjxilinx 已提交
2437
  tscProcessMsgRsp[TSDB_SQL_STABLEVGROUP] = tscProcessSTableVgroupRsp;
S
slguan 已提交
2438
  tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiMeterMetaRsp;
H
hzcheng 已提交
2439 2440

  tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp;
H
hjxilinx 已提交
2441
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromNode;  // rsp handled by same function.
H
hzcheng 已提交
2442
  tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp;
2443

H
Haojun Liao 已提交
2444 2445 2446 2447 2448
  tscProcessMsgRsp[TSDB_SQL_CURRENT_DB]   = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CURRENT_USER] = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_VERSION] = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CLI_VERSION]  = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_STATUS]  = tscProcessLocalRetrieveRsp;
2449

H
hzcheng 已提交
2450 2451
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp;

H
hjxilinx 已提交
2452
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_LOCALMERGE] = tscProcessRetrieveLocalMergeRsp;
H
hzcheng 已提交
2453 2454 2455 2456

  tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp;
  tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp;

2457 2458 2459 2460
  tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_TABLE] = tscProcessShowCreateRsp;
  tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_DATABASE] = tscProcessShowCreateRsp;
  

H
hzcheng 已提交
2461 2462 2463 2464 2465 2466
  tscKeepConn[TSDB_SQL_SHOW] = 1;
  tscKeepConn[TSDB_SQL_RETRIEVE] = 1;
  tscKeepConn[TSDB_SQL_SELECT] = 1;
  tscKeepConn[TSDB_SQL_FETCH] = 1;
  tscKeepConn[TSDB_SQL_HB] = 1;
}