tscServer.c 86.5 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#include "os.h"
H
hzcheng 已提交
17
#include "tcache.h"
H
Haojun Liao 已提交
18
#include "tcmdtype.h"
H
hzcheng 已提交
19
#include "trpc.h"
20 21
#include "tscLocalMerge.h"
#include "tscLog.h"
H
hzcheng 已提交
22 23 24 25 26
#include "tscProfile.h"
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
#include "ttimer.h"
dengyihao's avatar
dengyihao 已提交
27
#include "tlockfree.h"
H
hzcheng 已提交
28

dengyihao's avatar
TD-2257  
dengyihao 已提交
29
///SRpcCorEpSet  tscMgmtEpSet;
S
slguan 已提交
30

31 32
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};

H
hzcheng 已提交
33 34 35
int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql);
void tscProcessActivityTimer(void *handle, void *tmrId);
int tscKeepConn[TSDB_SQL_MAX] = {0};
36

B
Bomin Zhang 已提交
37
TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid, TSKEY dflt);
38 39
void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts);
void tscSaveSubscriptionProgress(void* sub);
H
hzcheng 已提交
40

S
slguan 已提交
41
static int32_t minMsgSize() { return tsRpcHeadSize + 100; }
H
Haojun Liao 已提交
42 43 44 45 46 47 48 49
static int32_t getWaitingTimeInterval(int32_t count) {
  int32_t initial = 100; // 100 ms by default
  if (count <= 1) {
    return 0;
  }

  return initial * (2<<(count - 2));
}
H
hzcheng 已提交
50

S
TD-1732  
Shengliang Guan 已提交
51
static void tscSetDnodeEpSet(SSqlObj* pSql, SVgroupInfo* pVgroupInfo) {
52 53
  assert(pSql != NULL && pVgroupInfo != NULL && pVgroupInfo->numOfEps > 0);

54
  SRpcEpSet* pEpSet = &pSql->epSet;
H
Haojun Liao 已提交
55 56 57 58

  // Issue the query to one of the vnode among a vgroup randomly.
  // change the inUse property would not affect the isUse attribute of STableMeta
  pEpSet->inUse = rand() % pVgroupInfo->numOfEps;
59 60 61

  // apply the FQDN string length check here
  bool hasFqdn = false;
dengyihao's avatar
bugfix  
dengyihao 已提交
62

63 64
  pEpSet->numOfEps = pVgroupInfo->numOfEps;
  for(int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) {
H
Haojun Liao 已提交
65
    tstrncpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn, tListLen(pEpSet->fqdn[i]));
66
    pEpSet->port[i] = pVgroupInfo->epAddr[i].port;
67 68 69 70

    if (!hasFqdn) {
      hasFqdn = (strlen(pEpSet->fqdn[i]) > 0);
    }
71
  }
72 73

  assert(hasFqdn);
74
}
75

dengyihao's avatar
TD-2257  
dengyihao 已提交
76 77 78 79 80
static void tscDumpMgmtEpSet(SSqlObj *pSql) {
  SRpcCorEpSet *pCorEpSet = pSql->pTscObj->tscCorMgmtEpSet;
  taosCorBeginRead(&pCorEpSet->version);
  pSql->epSet = pCorEpSet->epSet;
  taosCorEndRead(&pCorEpSet->version);
81
}  
dengyihao's avatar
dengyihao 已提交
82 83
static void tscEpSetHtons(SRpcEpSet *s) {
   for (int32_t i = 0; i < s->numOfEps; i++) {
dengyihao's avatar
fixbug  
dengyihao 已提交
84 85 86
      s->port[i] = htons(s->port[i]);    
   }
} 
dengyihao's avatar
dengyihao 已提交
87 88
bool tscEpSetIsEqual(SRpcEpSet *s1, SRpcEpSet *s2) {
   if (s1->numOfEps != s2->numOfEps || s1->inUse != s2->inUse) {
89 90
     return false;
   } 
dengyihao's avatar
dengyihao 已提交
91
   for (int32_t i = 0; i < s1->numOfEps; i++) {
92 93 94 95 96
     if (s1->port[i] != s2->port[i] 
        || strncmp(s1->fqdn[i], s2->fqdn[i], TSDB_FQDN_LEN) != 0)
        return false;
   }
   return true;
S
slguan 已提交
97
}
dengyihao's avatar
TD-2257  
dengyihao 已提交
98
void tscUpdateMgmtEpSet(SSqlObj *pSql, SRpcEpSet *pEpSet) {
99
  // no need to update if equal
dengyihao's avatar
TD-2257  
dengyihao 已提交
100 101 102 103
  SRpcCorEpSet *pCorEpSet = pSql->pTscObj->tscCorMgmtEpSet;
  taosCorBeginWrite(&pCorEpSet->version);
  pCorEpSet->epSet = *pEpSet;
  taosCorEndWrite(&pCorEpSet->version);
104
}
S
Shengliang Guan 已提交
105
static void tscDumpEpSetFromVgroupInfo(SCorVgroupInfo *pVgroupInfo, SRpcEpSet *pEpSet) {
dengyihao's avatar
dengyihao 已提交
106
  if (pVgroupInfo == NULL) { return;}
dengyihao's avatar
bugfix  
dengyihao 已提交
107
  taosCorBeginRead(&pVgroupInfo->version);
dengyihao's avatar
dengyihao 已提交
108
  int8_t inUse = pVgroupInfo->inUse;
dengyihao's avatar
dengyihao 已提交
109 110 111
  pEpSet->inUse = (inUse >= 0 && inUse < TSDB_MAX_REPLICA) ? inUse: 0; 
  pEpSet->numOfEps = pVgroupInfo->numOfEps;  
  for (int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) {
Y
yihaoDeng 已提交
112
    tstrncpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn, sizeof(pEpSet->fqdn[i]));
dengyihao's avatar
dengyihao 已提交
113
    pEpSet->port[i] = pVgroupInfo->epAddr[i].port;
S
slguan 已提交
114
  }
dengyihao's avatar
bugfix  
dengyihao 已提交
115
  taosCorEndRead(&pVgroupInfo->version);
S
slguan 已提交
116 117
}

dengyihao's avatar
dengyihao 已提交
118
static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) {
dengyihao's avatar
dengyihao 已提交
119 120
  SSqlCmd *pCmd = &pObj->cmd;
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
dengyihao's avatar
dengyihao 已提交
121
  if (pTableMetaInfo == NULL || pTableMetaInfo->pTableMeta == NULL) { return;}
S
Shengliang Guan 已提交
122
  SCorVgroupInfo *pVgroupInfo = &pTableMetaInfo->pTableMeta->corVgroupInfo;
dengyihao's avatar
dengyihao 已提交
123

dengyihao's avatar
bugfix  
dengyihao 已提交
124
  taosCorBeginWrite(&pVgroupInfo->version);
dengyihao's avatar
dengyihao 已提交
125
  tscDebug("before: Endpoint in use: %d", pVgroupInfo->inUse);
126 127
  pVgroupInfo->inUse = pEpSet->inUse;
  pVgroupInfo->numOfEps = pEpSet->numOfEps;
dengyihao's avatar
dengyihao 已提交
128
  for (int32_t i = 0; i < pVgroupInfo->numOfEps; i++) {
S
TD-1848  
Shengliang Guan 已提交
129
    tfree(pVgroupInfo->epAddr[i].fqdn);
H
Haojun Liao 已提交
130
    pVgroupInfo->epAddr[i].fqdn = strndup(pEpSet->fqdn[i], tListLen(pEpSet->fqdn[i]));
131
    pVgroupInfo->epAddr[i].port = pEpSet->port[i];
dengyihao's avatar
dengyihao 已提交
132
  }
H
Haojun Liao 已提交
133

dengyihao's avatar
dengyihao 已提交
134
  tscDebug("after: EndPoint in use: %d", pVgroupInfo->inUse);
dengyihao's avatar
bugfix  
dengyihao 已提交
135
  taosCorEndWrite(&pVgroupInfo->version);
dengyihao's avatar
dengyihao 已提交
136
}
H
Haojun Liao 已提交
137

H
hzcheng 已提交
138 139 140
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
  STscObj *pObj = (STscObj *)param;
  if (pObj == NULL) return;
H
Haojun Liao 已提交
141

H
hzcheng 已提交
142
  if (pObj != pObj->signature) {
H
Haojun Liao 已提交
143
    tscError("heartbeat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature);
H
hzcheng 已提交
144 145 146
    return;
  }

H
Haojun Liao 已提交
147
  SSqlObj *pSql = tres;
H
hzcheng 已提交
148 149 150
  SSqlRes *pRes = &pSql->res;

  if (code == 0) {
S
TD-1732  
Shengliang Guan 已提交
151 152
    SHeartBeatRsp *pRsp = (SHeartBeatRsp *)pRes->pRsp;
    SRpcEpSet *    epSet = &pRsp->epSet;
dengyihao's avatar
dengyihao 已提交
153 154
    if (epSet->numOfEps > 0) {
      tscEpSetHtons(epSet);
155 156 157 158 159 160 161
      if (!tscEpSetIsEqual(&pSql->pTscObj->tscCorMgmtEpSet->epSet, epSet)) {
        tscTrace("%p updating epset: numOfEps: %d, inUse: %d", pSql, epSet->numOfEps, epSet->inUse);
        for (int8_t i = 0; i < epSet->numOfEps; i++) {
          tscTrace("endpoint %d: fqdn = %s, port=%d", i, epSet->fqdn[i], epSet->port[i]);
        }
        tscUpdateMgmtEpSet(pSql, epSet);
      }
S
TD-1732  
Shengliang Guan 已提交
162
    }
S
slguan 已提交
163

S
Shengliang Guan 已提交
164 165
    pSql->pTscObj->connId = htonl(pRsp->connId);

H
hzcheng 已提交
166 167
    if (pRsp->killConnection) {
      tscKillConnection(pObj);
168
      return;
H
hzcheng 已提交
169
    } else {
S
slguan 已提交
170 171
      if (pRsp->queryId) tscKillQuery(pObj, htonl(pRsp->queryId));
      if (pRsp->streamId) tscKillStream(pObj, htonl(pRsp->streamId));
H
hzcheng 已提交
172 173
    }
  } else {
174
    tscDebug("%" PRId64 " heartbeat failed, code:%s", pObj->hbrid, tstrerror(code));
H
hzcheng 已提交
175 176
  }

177
  if (pObj->hbrid != 0) {
178
    int32_t waitingDuring = tsShellActivityTimer * 500;
H
Haojun Liao 已提交
179
    tscDebug("%p send heartbeat in %dms", pSql, waitingDuring);
H
Haojun Liao 已提交
180

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
181
    taosTmrReset(tscProcessActivityTimer, waitingDuring, (void *)pObj->rid, tscTmr, &pObj->pTimer);
H
Haojun Liao 已提交
182 183 184
  } else {
    tscDebug("%p start to close tscObj:%p, not send heartbeat again", pSql, pObj);
  }
H
hzcheng 已提交
185 186 187
}

void tscProcessActivityTimer(void *handle, void *tmrId) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
188 189
  int64_t rid = (int64_t) handle;
  STscObj *pObj = taosAcquireRef(tscRefId, rid);
190 191 192
  if (pObj == NULL) {
    return;
  }
H
hzcheng 已提交
193

194
  SSqlObj* pHB = taosAcquireRef(tscObjRef, pObj->hbrid);
195 196 197 198 199
  if (pHB == NULL) {
    taosReleaseRef(tscRefId, rid);
    return;
  }

200
  assert(pHB->self == pObj->hbrid);
H
Haojun Liao 已提交
201

202
  pHB->retry = 0;
H
Haojun Liao 已提交
203
  int32_t code = tscProcessSql(pHB);
204
  taosReleaseRef(tscObjRef, pObj->hbrid);
H
Haojun Liao 已提交
205 206 207

  if (code != TSDB_CODE_SUCCESS) {
    tscError("%p failed to sent HB to server, reason:%s", pHB, tstrerror(code));
H
hzcheng 已提交
208
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
209

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
210
  taosReleaseRef(tscRefId, rid);
H
hzcheng 已提交
211 212 213
}

int tscSendMsgToServer(SSqlObj *pSql) {
214
  STscObj* pObj = pSql->pTscObj;
H
hjxilinx 已提交
215 216
  SSqlCmd* pCmd = &pSql->cmd;
  
217
  char *pMsg = rpcMallocCont(sizeof(SMsgVersion) + pCmd->payloadLen);
S
slguan 已提交
218
  if (NULL == pMsg) {
219
    tscError("%p msg:%s malloc failed", pSql, taosMsg[pSql->cmd.msgType]);
220
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
H
hzcheng 已提交
221 222
  }

223 224
  // set the mgmt ip list
  if (pSql->cmd.command >= TSDB_SQL_MGMT) {
dengyihao's avatar
TD-2257  
dengyihao 已提交
225
    tscDumpMgmtEpSet(pSql);
J
jtao1735 已提交
226 227
  }

228 229
  tstrncpy(pMsg, version, sizeof(SMsgVersion));
  memcpy(pMsg + sizeof(SMsgVersion), pSql->cmd.payload, pSql->cmd.payloadLen);
230

J
jtao1735 已提交
231
  SRpcMsg rpcMsg = {
232 233
      .msgType = pSql->cmd.msgType,
      .pCont   = pMsg,
234
      .contLen = pSql->cmd.payloadLen + sizeof(SMsgVersion),
235
      .ahandle = (void*)pSql->self,
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
236
      .handle  = NULL,
H
hjxilinx 已提交
237
      .code    = 0
J
jtao1735 已提交
238
  };
H
Haojun Liao 已提交
239

240
  rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg, &pSql->rpcRid);
H
Haojun Liao 已提交
241
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
242 243
}

244
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
S
TD-1530  
Shengliang Guan 已提交
245
  TSDB_CACHE_PTR_TYPE handle = (TSDB_CACHE_PTR_TYPE) rpcMsg->ahandle;
246 247
  SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle);
  if (pSql == NULL) {
H
Haojun Liao 已提交
248
    rpcFreeCont(rpcMsg->pCont);
249 250
    return;
  }
251
  assert(pSql->self == handle);
H
Haojun Liao 已提交
252

H
Haojun Liao 已提交
253
  STscObj *pObj = pSql->pTscObj;
S
slguan 已提交
254 255
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
H
hzcheng 已提交
256

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
257
  pSql->rpcRid = -1;
H
Haojun Liao 已提交
258

H
Haojun Liao 已提交
259
  if (pObj->signature != pObj) {
260
    tscDebug("%p DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, pObj, pObj->signature);
H
Haojun Liao 已提交
261

262 263
    taosRemoveRef(tscObjRef, pSql->self);
    taosReleaseRef(tscObjRef, pSql->self);
H
Haojun Liao 已提交
264 265 266 267 268 269
    rpcFreeCont(rpcMsg->pCont);
    return;
  }

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) {
270
    tscDebug("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p",
271
        pSql, pCmd->command, pQueryInfo->type, pObj, pObj->signature);
H
Haojun Liao 已提交
272

273 274
    taosRemoveRef(tscObjRef, pSql->self);
    taosReleaseRef(tscObjRef, pSql->self);
275
    rpcFreeCont(rpcMsg->pCont);
S
slguan 已提交
276
    return;
H
hzcheng 已提交
277 278
  }

H
Haojun Liao 已提交
279
  if (pEpSet) {
dengyihao's avatar
dengyihao 已提交
280
    if (!tscEpSetIsEqual(&pSql->epSet, pEpSet)) {
H
Haojun Liao 已提交
281 282
      if (pCmd->command < TSDB_SQL_MGMT) {
        tscUpdateVgroupInfo(pSql, pEpSet);
dengyihao's avatar
dengyihao 已提交
283
      } else {
dengyihao's avatar
TD-2257  
dengyihao 已提交
284
        tscUpdateMgmtEpSet(pSql, pEpSet);
H
Haojun Liao 已提交
285
      }
dengyihao's avatar
dengyihao 已提交
286
    }
J
jtao1735 已提交
287 288
  }

289
  int32_t cmd = pCmd->command;
290

291 292 293 294 295 296
  // set the flag to denote that sql string needs to be re-parsed and build submit block with table schema
  if (cmd == TSDB_SQL_INSERT && rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
    pSql->cmd.submitSchema = 1;
  }

  if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_FETCH || cmd == TSDB_SQL_UPDATE_TAGS_VAL) &&
297 298 299
      (rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID ||
       rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID ||
       rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL ||
300
       rpcMsg->code == TSDB_CODE_APP_NOT_READY)) {
301 302 303 304 305 306
    tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), ++pSql->retry);

    pSql->res.code = rpcMsg->code;  // keep the previous error code
    if (pSql->retry > pSql->maxRetry) {
      tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry);
    } else {
H
Haojun Liao 已提交
307
      // wait for a little bit moment and then retry, todo do not sleep in rpc callback thread
H
Haojun Liao 已提交
308 309 310 311 312
      if (rpcMsg->code == TSDB_CODE_APP_NOT_READY || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
        int32_t duration = getWaitingTimeInterval(pSql->retry);
        taosMsleep(duration);
      }

B
Bomin Zhang 已提交
313
      rpcMsg->code = tscRenewTableMeta(pSql, 0);
314 315 316

      // if there is an error occurring, proceed to the following error handling procedure.
      if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
317
        taosReleaseRef(tscObjRef, pSql->self);
318 319
        rpcFreeCont(rpcMsg->pCont);
        return;
H
hzcheng 已提交
320 321
      }
    }
S
slguan 已提交
322
  }
323

H
hzcheng 已提交
324
  pRes->rspLen = 0;
325
  
H
Haojun Liao 已提交
326
  if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
327
    tscDebug("%p query is cancelled, code:%s", pSql, tstrerror(pRes->code));
H
Haojun Liao 已提交
328 329
  } else {
    pRes->code = rpcMsg->code;
H
hzcheng 已提交
330 331
  }

S
slguan 已提交
332
  if (pRes->code == TSDB_CODE_SUCCESS) {
333
    tscDebug("%p reset retry counter to be 0 due to success rsp, old:%d", pSql, pSql->retry);
S
slguan 已提交
334 335 336
    pSql->retry = 0;
  }

337
  if (pRes->code != TSDB_CODE_TSC_QUERY_CANCELLED) {
338
    assert(rpcMsg->msgType == pCmd->msgType + 1);
339
    pRes->code    = rpcMsg->code;
340
    pRes->rspType = rpcMsg->msgType;
341
    pRes->rspLen  = rpcMsg->contLen;
H
hzcheng 已提交
342

343
    if (pRes->rspLen > 0 && rpcMsg->pCont) {
344 345
      char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
      if (tmp == NULL) {
346
        pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
347 348
      } else {
        pRes->pRsp = tmp;
349
        memcpy(pRes->pRsp, rpcMsg->pCont, pRes->rspLen);
S
slguan 已提交
350
      }
351
    } else {
H
Haojun Liao 已提交
352
      tfree(pRes->pRsp);
S
slguan 已提交
353 354
    }

H
hzcheng 已提交
355 356 357 358
    /*
     * There is not response callback function for submit response.
     * The actual inserted number of points is the first number.
     */
359
    if (rpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP && pRes->pRsp != NULL) {
360
      SShellSubmitRspMsg *pMsg = (SShellSubmitRspMsg*)pRes->pRsp;
S
slguan 已提交
361 362 363 364 365 366 367
      pMsg->code = htonl(pMsg->code);
      pMsg->numOfRows = htonl(pMsg->numOfRows);
      pMsg->affectedRows = htonl(pMsg->affectedRows);
      pMsg->failedRows = htonl(pMsg->failedRows);
      pMsg->numOfFailedBlocks = htonl(pMsg->numOfFailedBlocks);

      pRes->numOfRows += pMsg->affectedRows;
368
      tscDebug("%p SQL cmd:%s, code:%s inserted rows:%d rspLen:%d", pSql, sqlCmd[pCmd->command], 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
369
          tstrerror(pRes->code), pMsg->affectedRows, pRes->rspLen);
S
slguan 已提交
370
    } else {
371
      tscDebug("%p SQL cmd:%s, code:%s rspLen:%d", pSql, sqlCmd[pCmd->command], tstrerror(pRes->code), pRes->rspLen);
H
hzcheng 已提交
372 373
    }
  }
374
  
H
Haojun Liao 已提交
375
  if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) {
376
    rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
H
Haojun Liao 已提交
377
  }
S
Shengliang Guan 已提交
378

H
Haojun Liao 已提交
379
  bool shouldFree = tscShouldBeFreed(pSql);
380
  if (rpcMsg->code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
S
TD-1057  
Shengliang Guan 已提交
381
    rpcMsg->code = (pRes->code == TSDB_CODE_SUCCESS) ? (int32_t)pRes->numOfRows : pRes->code;
H
Haojun Liao 已提交
382
    (*pSql->fp)(pSql->param, pSql, rpcMsg->code);
H
hzcheng 已提交
383 384
  }

385
  taosReleaseRef(tscObjRef, pSql->self);
H
Haojun Liao 已提交
386

H
Haojun Liao 已提交
387
  if (shouldFree) { // in case of table-meta/vgrouplist query, automatically free it
388
    taosRemoveRef(tscObjRef, pSql->self);
H
Haojun Liao 已提交
389 390 391
    tscDebug("%p sqlObj is automatically freed", pSql);
  }

392
  rpcFreeCont(rpcMsg->pCont);
H
hzcheng 已提交
393 394
}

S
slguan 已提交
395 396 397
int doProcessSql(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;
398

H
hjxilinx 已提交
399 400 401 402 403 404 405
  if (pCmd->command == TSDB_SQL_SELECT ||
      pCmd->command == TSDB_SQL_FETCH ||
      pCmd->command == TSDB_SQL_RETRIEVE ||
      pCmd->command == TSDB_SQL_INSERT ||
      pCmd->command == TSDB_SQL_CONNECT ||
      pCmd->command == TSDB_SQL_HB ||
      pCmd->command == TSDB_SQL_META ||
406
      pCmd->command == TSDB_SQL_STABLEVGROUP) {
407 408 409 410 411 412
    pRes->code = tscBuildMsg[pCmd->command](pSql, NULL);
  }
  
  if (pRes->code != TSDB_CODE_SUCCESS) {
    tscQueueAsyncRes(pSql);
    return pRes->code;
S
slguan 已提交
413
  }
414

415 416 417
  int32_t code = tscSendMsgToServer(pSql);

  // NOTE: if code is TSDB_CODE_SUCCESS, pSql may have been released here already by other threads.
418
  if (code != TSDB_CODE_SUCCESS) {
S
slguan 已提交
419
    pRes->code = code;
H
hjxilinx 已提交
420
    tscQueueAsyncRes(pSql);
H
Haojun Liao 已提交
421
    return code;
S
slguan 已提交
422
  }
H
hjxilinx 已提交
423 424
  
  return TSDB_CODE_SUCCESS;
S
slguan 已提交
425 426 427
}

int tscProcessSql(SSqlObj *pSql) {
428
  char    *name = NULL;
429
  SSqlCmd *pCmd = &pSql->cmd;
430
  
431
  SQueryInfo     *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hjxilinx 已提交
432
  STableMetaInfo *pTableMetaInfo = NULL;
433
  uint32_t        type = 0;
434

435
  if (pQueryInfo != NULL) {
H
hjxilinx 已提交
436
    pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
437
    name = (pTableMetaInfo != NULL)? pTableMetaInfo->name:NULL;
438
    type = pQueryInfo->type;
439

H
hjxilinx 已提交
440
    // while numOfTables equals to 0, it must be Heartbeat
H
hjxilinx 已提交
441
    assert((pQueryInfo->numOfTables == 0 && pQueryInfo->command == TSDB_SQL_HB) || pQueryInfo->numOfTables > 0);
S
slguan 已提交
442
  }
443

444
  tscDebug("%p SQL cmd:%s will be processed, name:%s, type:%d", pSql, sqlCmd[pCmd->command], name, type);
H
hjxilinx 已提交
445
  if (pCmd->command < TSDB_SQL_MGMT) { // the pTableMetaInfo cannot be NULL
H
hjxilinx 已提交
446
    if (pTableMetaInfo == NULL) {
447
      pSql->res.code = TSDB_CODE_TSC_APP_ERROR;
H
hjxilinx 已提交
448 449
      return pSql->res.code;
    }
H
hjxilinx 已提交
450
  } else if (pCmd->command < TSDB_SQL_LOCAL) {
451
    //pSql->epSet = tscMgmtEpSet;
H
hzcheng 已提交
452 453 454
  } else {  // local handler
    return (*tscProcessMsgRsp[pCmd->command])(pSql);
  }
455
  
S
slguan 已提交
456 457
  return doProcessSql(pSql);
}
H
hzcheng 已提交
458

J
jtao1735 已提交
459
int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
460
  SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *) pSql->cmd.payload;
S
slguan 已提交
461

H
Haojun Liao 已提交
462
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
463 464
  pRetrieveMsg->free    = htons(pQueryInfo->type);
  pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
H
hzcheng 已提交
465

466
  // todo valid the vgroupId at the client side
H
hjxilinx 已提交
467 468
  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  
weixin_48148422's avatar
weixin_48148422 已提交
469
  if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
H
hjxilinx 已提交
470
    int32_t vgIndex = pTableMetaInfo->vgroupIndex;
H
Haojun Liao 已提交
471 472 473 474 475 476 477 478 479
    if (pTableMetaInfo->pVgroupTables == NULL) {
      SVgroupsInfo *pVgroupInfo = pTableMetaInfo->vgroupList;
      assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups);

      pRetrieveMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId);
      tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d", pSql, pVgroupInfo->vgroups[vgIndex].vgId, vgIndex);
    } else {
      int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
      assert(vgIndex >= 0 && vgIndex < numOfVgroups);
H
Haojun Liao 已提交
480

H
Haojun Liao 已提交
481 482 483 484 485
      SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex);

      pRetrieveMsg->header.vgId = htonl(pTableIdList->vgInfo.vgId);
      tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d", pSql, pTableIdList->vgInfo.vgId, vgIndex);
    }
486
  } else {
H
hjxilinx 已提交
487
    STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
H
hjxilinx 已提交
488
    pRetrieveMsg->header.vgId = htonl(pTableMeta->vgroupInfo.vgId);
H
Haojun Liao 已提交
489
    tscDebug("%p build fetch msg from only one vgroup, vgId:%d", pSql, pTableMeta->vgroupInfo.vgId);
490
  }
491 492

  pSql->cmd.payloadLen = sizeof(SRetrieveTableMsg);
J
jtao1735 已提交
493
  pSql->cmd.msgType = TSDB_MSG_TYPE_FETCH;
494 495 496

  pRetrieveMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg));

497
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
498 499
}

500
int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hjxilinx 已提交
501
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
H
hjxilinx 已提交
502
  STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
H
hjxilinx 已提交
503
  
504
  char* pMsg = pSql->cmd.payload;
505 506 507
  
  // NOTE: shell message size should not include SMsgDesc
  int32_t size = pSql->cmd.payloadLen - sizeof(SMsgDesc);
508 509
  int32_t vgId = pTableMeta->vgroupInfo.vgId;

510
  SMsgDesc* pMsgDesc = (SMsgDesc*) pMsg;
511 512
  pMsgDesc->numOfVnodes = htonl(1); // always one vnode

513
  pMsg += sizeof(SMsgDesc);
514
  SSubmitMsg *pShellMsg = (SSubmitMsg *)pMsg;
515

H
hjxilinx 已提交
516
  pShellMsg->header.vgId = htonl(vgId);
H
Haojun Liao 已提交
517
  pShellMsg->header.contLen = htonl(size);      // the length not includes the size of SMsgDesc
518
  pShellMsg->length = pShellMsg->header.contLen;
H
hjxilinx 已提交
519
  
H
Haojun Liao 已提交
520
  pShellMsg->numOfBlocks = htonl(pSql->cmd.numOfTablesInSubmit);  // number of tables to be inserted
H
hzcheng 已提交
521

H
hjxilinx 已提交
522
  // pSql->cmd.payloadLen is set during copying data into payload
S
slguan 已提交
523
  pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
dengyihao's avatar
dengyihao 已提交
524
  tscDumpEpSetFromVgroupInfo(&pTableMeta->corVgroupInfo, &pSql->epSet);
525

526 527
  tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfEP:%d", pSql, vgId, pSql->cmd.numOfTablesInSubmit,
      pSql->epSet.numOfEps);
528
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
529 530 531
}

/*
532
 * for table query, simply return the size <= 1k
H
hzcheng 已提交
533
 */
534
static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
H
hzcheng 已提交
535
  const static int32_t MIN_QUERY_MSG_PKT_SIZE = TSDB_MAX_BYTES_PER_ROW * 5;
536
  SQueryInfo *         pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
H
hzcheng 已提交
537

S
TD-1057  
Shengliang Guan 已提交
538
  int32_t srcColListSize = (int32_t)(taosArrayGetSize(pQueryInfo->colList) * sizeof(SColumnInfo));
H
Haojun Liao 已提交
539 540

  size_t  numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
H
Haojun Liao 已提交
541
  int32_t exprSize = (int32_t)(sizeof(SSqlFuncMsg) * numOfExprs * 2);
H
Haojun Liao 已提交
542

H
Haojun Liao 已提交
543 544 545 546 547 548 549 550 551 552
  int32_t tsBufSize = (pQueryInfo->tsBuf != NULL) ? pQueryInfo->tsBuf->fileSize : 0;

  int32_t tableSerialize = 0;
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  if (pTableMetaInfo->pVgroupTables != NULL) {
    size_t numOfGroups = taosArrayGetSize(pTableMetaInfo->pVgroupTables);

    int32_t totalTables = 0;
    for (int32_t i = 0; i < numOfGroups; ++i) {
      SVgroupTableInfo *pTableInfo = taosArrayGet(pTableMetaInfo->pVgroupTables, i);
H
Haojun Liao 已提交
553
      totalTables += (int32_t) taosArrayGetSize(pTableInfo->itemList);
H
Haojun Liao 已提交
554 555 556 557
    }

    tableSerialize = totalTables * sizeof(STableIdInfo);
  }
H
Haojun Liao 已提交
558

H
Haojun Liao 已提交
559 560
  return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + exprSize + tsBufSize +
         tableSerialize + 4096;
H
hzcheng 已提交
561 562
}

563
static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char *pMsg) {
564
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
B
Bomin Zhang 已提交
565
  TSKEY dfltKey = htobe64(pQueryMsg->window.skey);
566

H
hjxilinx 已提交
567
  STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
weixin_48148422's avatar
weixin_48148422 已提交
568
  if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) || pTableMetaInfo->pVgroupTables == NULL) {
569
    
S
TD-1732  
Shengliang Guan 已提交
570
    SVgroupInfo* pVgroupInfo = NULL;
weixin_48148422's avatar
weixin_48148422 已提交
571
    if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
572 573
      int32_t index = pTableMetaInfo->vgroupIndex;
      assert(index >= 0);
H
hjxilinx 已提交
574
  
B
Bomin Zhang 已提交
575
      if (pTableMetaInfo->vgroupList->numOfVgroups > 0) {
H
Haojun Liao 已提交
576
        assert(index < pTableMetaInfo->vgroupList->numOfVgroups);
B
Bomin Zhang 已提交
577 578
        pVgroupInfo = &pTableMetaInfo->vgroupList->vgroups[index];
      }
H
Haojun Liao 已提交
579
      tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, pTableMetaInfo->vgroupList->numOfVgroups);
H
hjxilinx 已提交
580 581
    } else {
      pVgroupInfo = &pTableMeta->vgroupInfo;
582
    }
weixin_48148422's avatar
weixin_48148422 已提交
583

584 585 586 587
    assert(pVgroupInfo != NULL);

    tscSetDnodeEpSet(pSql, pVgroupInfo);
    pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId);
weixin_48148422's avatar
weixin_48148422 已提交
588

589
    STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
590 591 592
    pTableIdInfo->tid = htonl(pTableMeta->id.tid);
    pTableIdInfo->uid = htobe64(pTableMeta->id.uid);
    pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->id.uid, dfltKey));
weixin_48148422's avatar
weixin_48148422 已提交
593

594 595
    pQueryMsg->numOfTables = htonl(1);  // set the number of tables
    pMsg += sizeof(STableIdInfo);
596
  } else { // it is a subquery of the super table query, this EP info is acquired from vgroupInfo
597
    int32_t index = pTableMetaInfo->vgroupIndex;
S
TD-1057  
Shengliang Guan 已提交
598
    int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
599
    assert(index >= 0 && index < numOfVgroups);
weixin_48148422's avatar
weixin_48148422 已提交
600

601
    tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, numOfVgroups);
weixin_48148422's avatar
weixin_48148422 已提交
602

603
    SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, index);
604

dengyihao's avatar
bugfix  
dengyihao 已提交
605
    // set the vgroup info 
606
    tscSetDnodeEpSet(pSql, &pTableIdList->vgInfo);
607 608
    pQueryMsg->head.vgId = htonl(pTableIdList->vgInfo.vgId);
    
S
TD-1057  
Shengliang Guan 已提交
609
    int32_t numOfTables = (int32_t)taosArrayGetSize(pTableIdList->itemList);
610 611 612 613 614 615 616 617 618
    pQueryMsg->numOfTables = htonl(numOfTables);  // set the number of tables
  
    // serialize each table id info
    for(int32_t i = 0; i < numOfTables; ++i) {
      STableIdInfo* pItem = taosArrayGet(pTableIdList->itemList, i);
      
      STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
      pTableIdInfo->tid = htonl(pItem->tid);
      pTableIdInfo->uid = htobe64(pItem->uid);
B
Bomin Zhang 已提交
619
      pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pItem->uid, dfltKey));
620 621 622 623
      pMsg += sizeof(STableIdInfo);
    }
  }
  
H
Haojun Liao 已提交
624 625
  tscDebug("%p vgId:%d, query on table:%s, tid:%d, uid:%" PRIu64, pSql, htonl(pQueryMsg->head.vgId), pTableMetaInfo->name,
      pTableMeta->id.tid, pTableMeta->id.uid);
H
hjxilinx 已提交
626
  
627 628 629
  return pMsg;
}

630
int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
631 632
  SSqlCmd *pCmd = &pSql->cmd;

633
  int32_t size = tscEstimateQueryMsgSize(pCmd, pCmd->clauseIndex);
H
hzcheng 已提交
634

S
slguan 已提交
635 636
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for query msg", pSql);
H
Haojun Liao 已提交
637
    return TSDB_CODE_TSC_INVALID_SQL;  // todo add test for this
S
slguan 已提交
638
  }
639
  
640
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hjxilinx 已提交
641
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hjxilinx 已提交
642
  STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
643 644 645

  size_t numOfSrcCols = taosArrayGetSize(pQueryInfo->colList);
  if (numOfSrcCols <= 0 && !tscQueryTags(pQueryInfo)) {
S
Shengliang Guan 已提交
646
    tscError("%p illegal value of numOfCols in query msg: %" PRIu64 ", table cols:%d", pSql, (uint64_t)numOfSrcCols,
647 648
        tscGetNumOfColumns(pTableMeta));

H
Haojun Liao 已提交
649
    return TSDB_CODE_TSC_INVALID_SQL;
650
  }
651
  
652
  if (pQueryInfo->interval.interval < 0) {
S
TD-1530  
Shengliang Guan 已提交
653
    tscError("%p illegal value of aggregation time interval in query msg: %" PRId64, pSql, (int64_t)pQueryInfo->interval.interval);
H
Haojun Liao 已提交
654
    return TSDB_CODE_TSC_INVALID_SQL;
655 656 657 658
  }
  
  if (pQueryInfo->groupbyExpr.numOfGroupCols < 0) {
    tscError("%p illegal value of numOfGroupCols in query msg: %d", pSql, pQueryInfo->groupbyExpr.numOfGroupCols);
H
Haojun Liao 已提交
659
    return TSDB_CODE_TSC_INVALID_SQL;
660
  }
661

662
  SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload;
663
  tstrncpy(pQueryMsg->version, version, tListLen(pQueryMsg->version));
H
hzcheng 已提交
664

S
TD-1057  
Shengliang Guan 已提交
665
  int32_t numOfTags = (int32_t)taosArrayGetSize(pTableMetaInfo->tagColList);
666
  
667
  if (pQueryInfo->order.order == TSDB_ORDER_ASC) {
H
hjxilinx 已提交
668 669
    pQueryMsg->window.skey = htobe64(pQueryInfo->window.skey);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->window.ekey);
H
hzcheng 已提交
670
  } else {
H
hjxilinx 已提交
671 672
    pQueryMsg->window.skey = htobe64(pQueryInfo->window.ekey);
    pQueryMsg->window.ekey = htobe64(pQueryInfo->window.skey);
H
hzcheng 已提交
673 674
  }

675 676
  pQueryMsg->order          = htons(pQueryInfo->order.order);
  pQueryMsg->orderColId     = htons(pQueryInfo->order.orderColId);
H
Haojun Liao 已提交
677
  pQueryMsg->fillType       = htons(pQueryInfo->fillType);
678 679
  pQueryMsg->limit          = htobe64(pQueryInfo->limit.limit);
  pQueryMsg->offset         = htobe64(pQueryInfo->limit.offset);
S
TD-1057  
Shengliang Guan 已提交
680
  pQueryMsg->numOfCols      = htons((int16_t)taosArrayGetSize(pQueryInfo->colList));
H
Haojun Liao 已提交
681 682
  pQueryMsg->interval.interval = htobe64(pQueryInfo->interval.interval);
  pQueryMsg->interval.sliding  = htobe64(pQueryInfo->interval.sliding);
683 684
  pQueryMsg->interval.offset   = htobe64(pQueryInfo->interval.offset);
  pQueryMsg->interval.intervalUnit = pQueryInfo->interval.intervalUnit;
H
Haojun Liao 已提交
685 686
  pQueryMsg->interval.slidingUnit  = pQueryInfo->interval.slidingUnit;
  pQueryMsg->interval.offsetUnit   = pQueryInfo->interval.offsetUnit;
687
  pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols);
weixin_48148422's avatar
weixin_48148422 已提交
688
  pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType);
689
  pQueryMsg->numOfTags      = htonl(numOfTags);
H
Haojun Liao 已提交
690
  pQueryMsg->queryType      = htonl(pQueryInfo->type);
H
Haojun Liao 已提交
691
  pQueryMsg->vgroupLimit     = htobe64(pQueryInfo->vgroupLimit);
H
hjxilinx 已提交
692 693
  
  size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo);
H
Haojun Liao 已提交
694
  pQueryMsg->numOfOutput = htons((int16_t)numOfOutput);  // this is the stage one output column number
H
hzcheng 已提交
695 696

  // set column list ids
697 698
  size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
  char *pMsg = (char *)(pQueryMsg->colList) + numOfCols * sizeof(SColumnInfo);
H
hjxilinx 已提交
699
  SSchema *pSchema = tscGetTableSchema(pTableMeta);
700
  
701 702 703
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumn *pCol = taosArrayGetP(pQueryInfo->colList, i);
    SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];
H
hzcheng 已提交
704

705
    if (pCol->colIndex.columnIndex >= tscGetNumOfColumns(pTableMeta) || pColSchema->type < TSDB_DATA_TYPE_BOOL ||
706
        pColSchema->type > TSDB_DATA_TYPE_NCHAR) {
707 708
      tscError("%p tid:%d uid:%" PRIu64" id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s",
          pSql, pTableMeta->id.tid, pTableMeta->id.uid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta), pCol->colIndex.columnIndex,
709 710
               pColSchema->name);

711
      return TSDB_CODE_TSC_INVALID_SQL;
712
    }
H
hzcheng 已提交
713 714 715

    pQueryMsg->colList[i].colId = htons(pColSchema->colId);
    pQueryMsg->colList[i].bytes = htons(pColSchema->bytes);
716
    pQueryMsg->colList[i].type  = htons(pColSchema->type);
S
slguan 已提交
717
    pQueryMsg->colList[i].numOfFilters = htons(pCol->numOfFilters);
H
hzcheng 已提交
718

S
slguan 已提交
719 720 721
    // append the filter information after the basic column information
    for (int32_t f = 0; f < pCol->numOfFilters; ++f) {
      SColumnFilterInfo *pColFilter = &pCol->filterInfo[f];
H
hzcheng 已提交
722

S
slguan 已提交
723
      SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)pMsg;
724
      pFilterMsg->filterstr = htons(pColFilter->filterstr);
S
slguan 已提交
725 726

      pMsg += sizeof(SColumnFilterInfo);
H
hzcheng 已提交
727

728
      if (pColFilter->filterstr) {
S
slguan 已提交
729
        pFilterMsg->len = htobe64(pColFilter->len);
730
        memcpy(pMsg, (void *)pColFilter->pz, (size_t)(pColFilter->len + 1));
S
slguan 已提交
731 732 733 734 735 736 737 738
        pMsg += (pColFilter->len + 1);  // append the additional filter binary info
      } else {
        pFilterMsg->lowerBndi = htobe64(pColFilter->lowerBndi);
        pFilterMsg->upperBndi = htobe64(pColFilter->upperBndi);
      }

      pFilterMsg->lowerRelOptr = htons(pColFilter->lowerRelOptr);
      pFilterMsg->upperRelOptr = htons(pColFilter->upperRelOptr);
H
hzcheng 已提交
739

S
slguan 已提交
740 741
      if (pColFilter->lowerRelOptr == TSDB_RELATION_INVALID && pColFilter->upperRelOptr == TSDB_RELATION_INVALID) {
        tscError("invalid filter info");
H
Haojun Liao 已提交
742
        return TSDB_CODE_TSC_INVALID_SQL;
S
slguan 已提交
743 744
      }
    }
H
hzcheng 已提交
745 746
  }

H
hjxilinx 已提交
747
  SSqlFuncMsg *pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
H
hjxilinx 已提交
748
  for (int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) {
749
    SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
H
hzcheng 已提交
750

751
    if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId, pExpr->numOfParams)) {
H
hzcheng 已提交
752
      tscError("%p table schema is not matched with parsed sql", pSql);
H
Haojun Liao 已提交
753
      return TSDB_CODE_TSC_INVALID_SQL;
H
hzcheng 已提交
754 755
    }

H
Haojun Liao 已提交
756 757
    assert(pExpr->resColId < 0);

758 759 760
    pSqlFuncExpr->colInfo.colId    = htons(pExpr->colInfo.colId);
    pSqlFuncExpr->colInfo.colIndex = htons(pExpr->colInfo.colIndex);
    pSqlFuncExpr->colInfo.flag     = htons(pExpr->colInfo.flag);
H
hzcheng 已提交
761

762
    pSqlFuncExpr->functionId  = htons(pExpr->functionId);
H
hzcheng 已提交
763
    pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams);
H
Haojun Liao 已提交
764
    pSqlFuncExpr->resColId    = htons(pExpr->resColId);
H
hjxilinx 已提交
765
    pMsg += sizeof(SSqlFuncMsg);
H
hzcheng 已提交
766 767

    for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
H
Haojun Liao 已提交
768
      // todo add log
H
hzcheng 已提交
769 770 771 772 773
      pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType);
      pSqlFuncExpr->arg[j].argBytes = htons(pExpr->param[j].nLen);

      if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) {
        memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen);
774
        pMsg += pExpr->param[j].nLen;
H
hzcheng 已提交
775 776 777 778 779
      } else {
        pSqlFuncExpr->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64Key);
      }
    }

H
hjxilinx 已提交
780
    pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
H
hzcheng 已提交
781
  }
H
Haojun Liao 已提交
782

H
Haojun Liao 已提交
783 784
  size_t output = tscNumOfFields(pQueryInfo);

H
Haojun Liao 已提交
785
  if (tscIsSecondStageQuery(pQueryInfo)) {
S
Shengliang Guan 已提交
786
    pQueryMsg->secondStageOutput = htonl((int32_t) output);
H
Haojun Liao 已提交
787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849

    SSqlFuncMsg *pSqlFuncExpr1 = (SSqlFuncMsg *)pMsg;

    for (int32_t i = 0; i < output; ++i) {
      SInternalField* pField = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i);
      SSqlExpr *pExpr = pField->pSqlExpr;
      if (pExpr != NULL) {
        if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId, pExpr->numOfParams)) {
          tscError("%p table schema is not matched with parsed sql", pSql);
          return TSDB_CODE_TSC_INVALID_SQL;
        }

        pSqlFuncExpr1->colInfo.colId    = htons(pExpr->colInfo.colId);
        pSqlFuncExpr1->colInfo.colIndex = htons(pExpr->colInfo.colIndex);
        pSqlFuncExpr1->colInfo.flag     = htons(pExpr->colInfo.flag);

        pSqlFuncExpr1->functionId  = htons(pExpr->functionId);
        pSqlFuncExpr1->numOfParams = htons(pExpr->numOfParams);
        pMsg += sizeof(SSqlFuncMsg);

        for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
          // todo add log
          pSqlFuncExpr1->arg[j].argType = htons((uint16_t)pExpr->param[j].nType);
          pSqlFuncExpr1->arg[j].argBytes = htons(pExpr->param[j].nLen);

          if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) {
            memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen);
            pMsg += pExpr->param[j].nLen;
          } else {
            pSqlFuncExpr1->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64Key);
          }
        }

        pSqlFuncExpr1 = (SSqlFuncMsg *)pMsg;
      } else {
        assert(pField->pArithExprInfo != NULL);
        SExprInfo* pExprInfo = pField->pArithExprInfo;

        pSqlFuncExpr1->colInfo.colId    = htons(pExprInfo->base.colInfo.colId);
        pSqlFuncExpr1->functionId  = htons(pExprInfo->base.functionId);
        pSqlFuncExpr1->numOfParams = htons(pExprInfo->base.numOfParams);
        pMsg += sizeof(SSqlFuncMsg);

        for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
          // todo add log
          pSqlFuncExpr1->arg[j].argType = htons((uint16_t)pExprInfo->base.arg[j].argType);
          pSqlFuncExpr1->arg[j].argBytes = htons(pExprInfo->base.arg[j].argBytes);

          if (pExprInfo->base.arg[j].argType == TSDB_DATA_TYPE_BINARY) {
            memcpy(pMsg, pExprInfo->base.arg[j].argValue.pz, pExprInfo->base.arg[j].argBytes);
            pMsg += pExprInfo->base.arg[j].argBytes;
          } else {
            pSqlFuncExpr1->arg[j].argValue.i64 = htobe64(pExprInfo->base.arg[j].argValue.i64);
          }
        }

        pSqlFuncExpr1 = (SSqlFuncMsg *)pMsg;
      }
    }
  } else {
    pQueryMsg->secondStageOutput = 0;
  }

850
  // serialize the table info (sid, uid, tags)
851 852
  pMsg = doSerializeTableInfo(pQueryMsg, pSql, pMsg);
  
853
  SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr;
854
  if (pGroupbyExpr->numOfGroupCols > 0) {
S
slguan 已提交
855
    pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex);
H
hzcheng 已提交
856 857
    pQueryMsg->orderType = htons(pGroupbyExpr->orderType);

S
slguan 已提交
858
    for (int32_t j = 0; j < pGroupbyExpr->numOfGroupCols; ++j) {
859 860
      SColIndex* pCol = taosArrayGet(pGroupbyExpr->columnInfo, j);
  
S
slguan 已提交
861 862 863
      *((int16_t *)pMsg) = pCol->colId;
      pMsg += sizeof(pCol->colId);

864 865
      *((int16_t *)pMsg) += pCol->colIndex;
      pMsg += sizeof(pCol->colIndex);
S
slguan 已提交
866 867 868

      *((int16_t *)pMsg) += pCol->flag;
      pMsg += sizeof(pCol->flag);
H
hjxilinx 已提交
869 870 871
      
      memcpy(pMsg, pCol->name, tListLen(pCol->name));
      pMsg += tListLen(pCol->name);
S
slguan 已提交
872 873 874
    }
  }

875
  if (pQueryInfo->fillType != TSDB_FILL_NONE) {
H
Haojun Liao 已提交
876
    for (int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) {
877 878
      *((int64_t *)pMsg) = htobe64(pQueryInfo->fillVal[i]);
      pMsg += sizeof(pQueryInfo->fillVal[0]);
S
slguan 已提交
879 880
    }
  }
881 882 883 884 885 886 887 888 889
  
  if (numOfTags != 0) {
    int32_t numOfColumns = tscGetNumOfColumns(pTableMeta);
    int32_t numOfTagColumns = tscGetNumOfTags(pTableMeta);
    int32_t total = numOfTagColumns + numOfColumns;
    
    pSchema = tscGetTableTagSchema(pTableMeta);
    
    for (int32_t i = 0; i < numOfTags; ++i) {
H
hjxilinx 已提交
890
      SColumn *pCol = taosArrayGetP(pTableMetaInfo->tagColList, i);
891 892 893 894
      SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];

      if ((pCol->colIndex.columnIndex >= numOfTagColumns || pCol->colIndex.columnIndex < -1) ||
          (pColSchema->type < TSDB_DATA_TYPE_BOOL || pColSchema->type > TSDB_DATA_TYPE_NCHAR)) {
895 896
        tscError("%p tid:%d uid:%" PRIu64 " id:%s, tag index out of range, totalCols:%d, numOfTags:%d, index:%d, column name:%s",
                 pSql, pTableMeta->id.tid, pTableMeta->id.uid, pTableMetaInfo->name, total, numOfTagColumns,
B
Bomin Zhang 已提交
897
                 pCol->colIndex.columnIndex, pColSchema->name);
898

899
        return TSDB_CODE_TSC_INVALID_SQL;
900 901 902 903 904 905 906 907 908 909 910 911
      }
  
      SColumnInfo* pTagCol = (SColumnInfo*) pMsg;
  
      pTagCol->colId = htons(pColSchema->colId);
      pTagCol->bytes = htons(pColSchema->bytes);
      pTagCol->type  = htons(pColSchema->type);
      pTagCol->numOfFilters = 0;
      
      pMsg += sizeof(SColumnInfo);
    }
  }
S
slguan 已提交
912

H
Haojun Liao 已提交
913 914 915 916
  // serialize tag column query condition
  if (pQueryInfo->tagCond.pCond != NULL && taosArrayGetSize(pQueryInfo->tagCond.pCond) > 0) {
    STagCond* pTagCond = &pQueryInfo->tagCond;
    
917
    SCond *pCond = tsGetSTableQueryCond(pTagCond, pTableMeta->id.uid);
H
Haojun Liao 已提交
918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933
    if (pCond != NULL && pCond->cond != NULL) {
      pQueryMsg->tagCondLen = htons(pCond->len);
      memcpy(pMsg, pCond->cond, pCond->len);
      
      pMsg += pCond->len;
    }
  }
  
  if (pQueryInfo->tagCond.tbnameCond.cond == NULL) {
    *pMsg = 0;
    pMsg++;
  } else {
    strcpy(pMsg, pQueryInfo->tagCond.tbnameCond.cond);
    pMsg += strlen(pQueryInfo->tagCond.tbnameCond.cond) + 1;
  }

S
slguan 已提交
934
  // compressed ts block
S
TD-1057  
Shengliang Guan 已提交
935
  pQueryMsg->tsOffset = htonl((int32_t)(pMsg - pCmd->payload));
S
slguan 已提交
936

937
  if (pQueryInfo->tsBuf != NULL) {
938 939
    // note: here used the index instead of actual vnode id.
    int32_t vnodeIndex = pTableMetaInfo->vgroupIndex;
H
Haojun Liao 已提交
940
    int32_t code = dumpFileBlockByGroupId(pQueryInfo->tsBuf, vnodeIndex, pMsg, &pQueryMsg->tsLen, &pQueryMsg->tsNumOfBlocks);
941
    if (code != TSDB_CODE_SUCCESS) {
B
Bomin Zhang 已提交
942 943
      return code;
    }
S
slguan 已提交
944

945
    pMsg += pQueryMsg->tsLen;
H
hzcheng 已提交
946

947
    pQueryMsg->tsOrder = htonl(pQueryInfo->tsBuf->tsOrder);
948 949
    pQueryMsg->tsLen   = htonl(pQueryMsg->tsLen);
    pQueryMsg->tsNumOfBlocks = htonl(pQueryMsg->tsNumOfBlocks);
H
hzcheng 已提交
950 951
  }

S
TD-1057  
Shengliang Guan 已提交
952
  int32_t msgLen = (int32_t)(pMsg - pCmd->payload);
H
hzcheng 已提交
953

H
Haojun Liao 已提交
954
  tscDebug("%p msg built success, len:%d bytes", pSql, msgLen);
H
hzcheng 已提交
955
  pCmd->payloadLen = msgLen;
S
slguan 已提交
956
  pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY;
H
hjxilinx 已提交
957
  
958
  pQueryMsg->head.contLen = htonl(msgLen);
S
TD-1057  
Shengliang Guan 已提交
959
  assert(msgLen + minMsgSize() <= (int32_t)pCmd->allocSize);
960 961

  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
962 963
}

964 965
int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
S
TD-1732  
Shengliang Guan 已提交
966
  pCmd->payloadLen = sizeof(SCreateDbMsg);
S
slguan 已提交
967
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DB;
H
hzcheng 已提交
968

S
TD-1732  
Shengliang Guan 已提交
969
  SCreateDbMsg *pCreateDbMsg = (SCreateDbMsg *)pCmd->payload;
970

971
  assert(pCmd->numOfClause == 1);
972
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
973
  tstrncpy(pCreateDbMsg->db, pTableMetaInfo->name, sizeof(pCreateDbMsg->db));
H
hzcheng 已提交
974

975
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
976 977
}

978 979
int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
S
TD-1732  
Shengliang Guan 已提交
980
  pCmd->payloadLen = sizeof(SCreateDnodeMsg);
S
slguan 已提交
981 982
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
983
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
984
  }
H
hzcheng 已提交
985

S
TD-1732  
Shengliang Guan 已提交
986
  SCreateDnodeMsg *pCreate = (SCreateDnodeMsg *)pCmd->payload;
J
jtao1735 已提交
987 988
  strncpy(pCreate->ep, pInfo->pDCLInfo->a[0].z, pInfo->pDCLInfo->a[0].n);
  
S
slguan 已提交
989
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DNODE;
H
hzcheng 已提交
990

991
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
992 993
}

994 995
int32_t tscBuildAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
S
TD-1762  
Shengliang Guan 已提交
996
  pCmd->payloadLen = sizeof(SCreateAcctMsg);
S
slguan 已提交
997 998
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
999
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1000
  }
H
hzcheng 已提交
1001

S
TD-1762  
Shengliang Guan 已提交
1002
  SCreateAcctMsg *pAlterMsg = (SCreateAcctMsg *)pCmd->payload;
H
hzcheng 已提交
1003

H
Haojun Liao 已提交
1004 1005
  SStrToken *pName = &pInfo->pDCLInfo->user.user;
  SStrToken *pPwd = &pInfo->pDCLInfo->user.passwd;
H
hzcheng 已提交
1006

1007 1008
  strncpy(pAlterMsg->user, pName->z, pName->n);
  strncpy(pAlterMsg->pass, pPwd->z, pPwd->n);
S
slguan 已提交
1009

1010
  SCreateAcctSQL *pAcctOpt = &pInfo->pDCLInfo->acctOpt;
H
hzcheng 已提交
1011

1012 1013 1014 1015 1016 1017 1018 1019
  pAlterMsg->cfg.maxUsers = htonl(pAcctOpt->maxUsers);
  pAlterMsg->cfg.maxDbs = htonl(pAcctOpt->maxDbs);
  pAlterMsg->cfg.maxTimeSeries = htonl(pAcctOpt->maxTimeSeries);
  pAlterMsg->cfg.maxStreams = htonl(pAcctOpt->maxStreams);
  pAlterMsg->cfg.maxPointsPerSecond = htonl(pAcctOpt->maxPointsPerSecond);
  pAlterMsg->cfg.maxStorage = htobe64(pAcctOpt->maxStorage);
  pAlterMsg->cfg.maxQueryTime = htobe64(pAcctOpt->maxQueryTime);
  pAlterMsg->cfg.maxConnections = htonl(pAcctOpt->maxConnections);
H
hzcheng 已提交
1020

1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033
  if (pAcctOpt->stat.n == 0) {
    pAlterMsg->cfg.accessState = -1;
  } else {
    if (pAcctOpt->stat.z[0] == 'r' && pAcctOpt->stat.n == 1) {
      pAlterMsg->cfg.accessState = TSDB_VN_READ_ACCCESS;
    } else if (pAcctOpt->stat.z[0] == 'w' && pAcctOpt->stat.n == 1) {
      pAlterMsg->cfg.accessState = TSDB_VN_WRITE_ACCCESS;
    } else if (strncmp(pAcctOpt->stat.z, "all", 3) == 0 && pAcctOpt->stat.n == 3) {
      pAlterMsg->cfg.accessState = TSDB_VN_ALL_ACCCESS;
    } else if (strncmp(pAcctOpt->stat.z, "no", 2) == 0 && pAcctOpt->stat.n == 2) {
      pAlterMsg->cfg.accessState = 0;
    }
  }
H
hzcheng 已提交
1034

S
slguan 已提交
1035
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_ACCT;
1036
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1037 1038
}

1039 1040
int32_t tscBuildUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
S
TD-1732  
Shengliang Guan 已提交
1041
  pCmd->payloadLen = sizeof(SCreateUserMsg);
S
slguan 已提交
1042

S
slguan 已提交
1043 1044
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1045
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1046 1047
  }

S
TD-1732  
Shengliang Guan 已提交
1048
  SCreateUserMsg *pAlterMsg = (SCreateUserMsg *)pCmd->payload;
H
hzcheng 已提交
1049

1050 1051
  SUserInfo *pUser = &pInfo->pDCLInfo->user;
  strncpy(pAlterMsg->user, pUser->user.z, pUser->user.n);
S
TD-1057  
Shengliang Guan 已提交
1052
  pAlterMsg->flag = (int8_t)pUser->type;
H
hzcheng 已提交
1053

1054 1055 1056 1057
  if (pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
    pAlterMsg->privilege = (char)pCmd->count;
  } else if (pUser->type == TSDB_ALTER_USER_PASSWD) {
    strncpy(pAlterMsg->pass, pUser->passwd.z, pUser->passwd.n);
H
hjxilinx 已提交
1058 1059
  } else { // create user password info
    strncpy(pAlterMsg->pass, pUser->passwd.z, pUser->passwd.n);
1060
  }
H
hzcheng 已提交
1061

1062
  if (pUser->type == TSDB_ALTER_USER_PASSWD || pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
S
slguan 已提交
1063
    pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_USER;
1064
  } else {
S
slguan 已提交
1065
    pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_USER;
1066
  }
H
hzcheng 已提交
1067

1068
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1069 1070
}

1071 1072
int32_t tscBuildCfgDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
S
TD-1732  
Shengliang Guan 已提交
1073
  pCmd->payloadLen = sizeof(SCfgDnodeMsg);
guanshengliang's avatar
guanshengliang 已提交
1074
  pCmd->msgType = TSDB_MSG_TYPE_CM_CONFIG_DNODE;
1075 1076
  return TSDB_CODE_SUCCESS;
}
S
slguan 已提交
1077

1078 1079
int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
S
TD-1732  
Shengliang Guan 已提交
1080
  pCmd->payloadLen = sizeof(SDropDbMsg);
H
hzcheng 已提交
1081

S
slguan 已提交
1082 1083
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1084
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1085 1086
  }

S
TD-1732  
Shengliang Guan 已提交
1087
  SDropDbMsg *pDropDbMsg = (SDropDbMsg*)pCmd->payload;
H
hzcheng 已提交
1088

1089
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1090
  tstrncpy(pDropDbMsg->db, pTableMetaInfo->name, sizeof(pDropDbMsg->db));
1091
  pDropDbMsg->ignoreNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;
H
hzcheng 已提交
1092

S
slguan 已提交
1093
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DB;
1094
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1095 1096
}

1097 1098
int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
1099
  pCmd->payloadLen = sizeof(SCMDropTableMsg);
H
hzcheng 已提交
1100

S
slguan 已提交
1101 1102
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1103
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1104
  }
H
hzcheng 已提交
1105

1106
  SCMDropTableMsg *pDropTableMsg = (SCMDropTableMsg*)pCmd->payload;
1107
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1108
  strcpy(pDropTableMsg->tableId, pTableMetaInfo->name);
S
slguan 已提交
1109
  pDropTableMsg->igNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;
H
hzcheng 已提交
1110

S
slguan 已提交
1111
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_TABLE;
1112
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1113 1114
}

1115
int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1116
  SSqlCmd *pCmd = &pSql->cmd;
S
TD-1732  
Shengliang Guan 已提交
1117
  pCmd->payloadLen = sizeof(SDropDnodeMsg);
S
slguan 已提交
1118 1119
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1120
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1121
  }
H
hzcheng 已提交
1122

S
TD-1732  
Shengliang Guan 已提交
1123
  SDropDnodeMsg * pDrop = (SDropDnodeMsg *)pCmd->payload;
1124
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1125
  tstrncpy(pDrop->ep, pTableMetaInfo->name, sizeof(pDrop->ep));
S
slguan 已提交
1126
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DNODE;
H
hzcheng 已提交
1127

1128
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1129 1130
}

S
[TD-16]  
slguan 已提交
1131
int32_t tscBuildDropUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
1132
  SSqlCmd *pCmd = &pSql->cmd;
S
TD-1732  
Shengliang Guan 已提交
1133
  pCmd->payloadLen = sizeof(SDropUserMsg);
S
slguan 已提交
1134
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_USER;
H
hzcheng 已提交
1135

S
slguan 已提交
1136 1137
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1138
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1139
  }
H
hzcheng 已提交
1140

S
TD-1732  
Shengliang Guan 已提交
1141
  SDropUserMsg *  pDropMsg = (SDropUserMsg *)pCmd->payload;
1142
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1143
  tstrncpy(pDropMsg->user, pTableMetaInfo->name, sizeof(pDropMsg->user));
H
hzcheng 已提交
1144

1145
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1146 1147
}

S
[TD-16]  
slguan 已提交
1148 1149
int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
S
TD-1732  
Shengliang Guan 已提交
1150
  pCmd->payloadLen = sizeof(SDropUserMsg);
S
[TD-16]  
slguan 已提交
1151 1152 1153 1154
  pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_ACCT;

  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1155
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
[TD-16]  
slguan 已提交
1156 1157
  }

S
TD-1732  
Shengliang Guan 已提交
1158
  SDropUserMsg *  pDropMsg = (SDropUserMsg *)pCmd->payload;
S
[TD-16]  
slguan 已提交
1159
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1160
  tstrncpy(pDropMsg->user, pTableMetaInfo->name, sizeof(pDropMsg->user));
S
[TD-16]  
slguan 已提交
1161 1162 1163 1164

  return TSDB_CODE_SUCCESS;
}

1165 1166
int32_t tscBuildUseDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
S
TD-1732  
Shengliang Guan 已提交
1167
  pCmd->payloadLen = sizeof(SUseDbMsg);
H
hzcheng 已提交
1168

S
slguan 已提交
1169 1170
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1171
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1172
  }
1173

S
TD-1732  
Shengliang Guan 已提交
1174
  SUseDbMsg *pUseDbMsg = (SUseDbMsg *)pCmd->payload;
1175
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1176
  strcpy(pUseDbMsg->db, pTableMetaInfo->name);
S
slguan 已提交
1177
  pCmd->msgType = TSDB_MSG_TYPE_CM_USE_DB;
H
hzcheng 已提交
1178

1179
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1180 1181
}

1182
int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1183
  STscObj *pObj = pSql->pTscObj;
S
slguan 已提交
1184
  SSqlCmd *pCmd = &pSql->cmd;
S
slguan 已提交
1185
  pCmd->msgType = TSDB_MSG_TYPE_CM_SHOW;
S
TD-1732  
Shengliang Guan 已提交
1186
  pCmd->payloadLen = sizeof(SShowMsg) + 100;
H
hzcheng 已提交
1187

S
slguan 已提交
1188 1189
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1190
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1191
  }
H
hzcheng 已提交
1192

S
TD-1732  
Shengliang Guan 已提交
1193
  SShowMsg *pShowMsg = (SShowMsg *)pCmd->payload;
S
slguan 已提交
1194

1195
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1196
  size_t nameLen = strlen(pTableMetaInfo->name);
S
slguan 已提交
1197
  if (nameLen > 0) {
B
Bomin Zhang 已提交
1198
    tstrncpy(pShowMsg->db, pTableMetaInfo->name, sizeof(pShowMsg->db));  // prefix is set here
H
hzcheng 已提交
1199
  } else {
B
Bomin Zhang 已提交
1200
    tstrncpy(pShowMsg->db, pObj->db, sizeof(pShowMsg->db));
H
hzcheng 已提交
1201 1202
  }

1203 1204 1205 1206
  SShowInfo *pShowInfo = &pInfo->pDCLInfo->showOpt;
  pShowMsg->type = pShowInfo->showType;

  if (pShowInfo->showType != TSDB_MGMT_TABLE_VNODES) {
H
Haojun Liao 已提交
1207
    SStrToken *pPattern = &pShowInfo->pattern;
1208 1209 1210 1211 1212
    if (pPattern->type > 0) {  // only show tables support wildcard query
      strncpy(pShowMsg->payload, pPattern->z, pPattern->n);
      pShowMsg->payloadLen = htons(pPattern->n);
    }
  } else {
H
Haojun Liao 已提交
1213
    SStrToken *pEpAddr = &pShowInfo->prefix;
dengyihao's avatar
dengyihao 已提交
1214
    assert(pEpAddr->n > 0 && pEpAddr->type > 0);
H
hzcheng 已提交
1215

dengyihao's avatar
dengyihao 已提交
1216 1217
    strncpy(pShowMsg->payload, pEpAddr->z, pEpAddr->n);
    pShowMsg->payloadLen = htons(pEpAddr->n);
1218 1219
  }

S
TD-1732  
Shengliang Guan 已提交
1220
  pCmd->payloadLen = sizeof(SShowMsg) + pShowMsg->payloadLen;
1221
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1222 1223
}

1224
int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1225
  SSqlCmd *pCmd = &pSql->cmd;
S
TD-1732  
Shengliang Guan 已提交
1226
  pCmd->payloadLen = sizeof(SKillQueryMsg);
H
hzcheng 已提交
1227

1228 1229
  switch (pCmd->command) {
    case TSDB_SQL_KILL_QUERY:
S
slguan 已提交
1230
      pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_QUERY;
1231 1232
      break;
    case TSDB_SQL_KILL_CONNECTION:
S
slguan 已提交
1233
      pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_CONN;
1234 1235
      break;
    case TSDB_SQL_KILL_STREAM:
S
slguan 已提交
1236
      pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_STREAM;
1237 1238 1239
      break;
  }
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1240 1241
}

1242
int tscEstimateCreateTableMsgLength(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1243 1244
  SSqlCmd *pCmd = &(pSql->cmd);

guanshengliang's avatar
guanshengliang 已提交
1245
  int32_t size = minMsgSize() + sizeof(SCMCreateTableMsg);
H
hzcheng 已提交
1246

1247
  SCreateTableSQL *pCreateTableInfo = pInfo->pCreateTableInfo;
1248
  if (pCreateTableInfo->type == TSQL_CREATE_TABLE_FROM_STABLE) {
H
hzcheng 已提交
1249 1250
    size += sizeof(STagData);
  } else {
S
slguan 已提交
1251
    size += sizeof(SSchema) * (pCmd->numOfCols + pCmd->count);
H
hzcheng 已提交
1252
  }
1253

1254 1255 1256
  if (pCreateTableInfo->pSelect != NULL) {
    size += (pCreateTableInfo->pSelect->selectToken.n + 1);
  }
H
hzcheng 已提交
1257 1258 1259 1260

  return size + TSDB_EXTRA_PAYLOAD_SIZE;
}

1261
int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1262
  int              msgLen = 0;
S
slguan 已提交
1263
  SSchema *        pSchema;
H
hzcheng 已提交
1264
  int              size = 0;
1265 1266 1267
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
H
hjxilinx 已提交
1268
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
1269 1270

  // Reallocate the payload size
1271
  size = tscEstimateCreateTableMsgLength(pSql, pInfo);
S
slguan 已提交
1272 1273
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for create table msg", pSql);
1274
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1275
  }
H
hzcheng 已提交
1276 1277


1278
  SCMCreateTableMsg *pCreateTableMsg = (SCMCreateTableMsg *)pCmd->payload;
H
hjxilinx 已提交
1279
  strcpy(pCreateTableMsg->tableId, pTableMetaInfo->name);
1280 1281

  // use dbinfo from table id without modifying current db info
H
Haojun Liao 已提交
1282
  tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pCreateTableMsg->db);
H
hzcheng 已提交
1283

1284 1285 1286
  SCreateTableSQL *pCreateTable = pInfo->pCreateTableInfo;

  pCreateTableMsg->igExists = pCreateTable->existCheck ? 1 : 0;
H
hzcheng 已提交
1287 1288 1289 1290
  pCreateTableMsg->numOfColumns = htons(pCmd->numOfCols);
  pCreateTableMsg->numOfTags = htons(pCmd->count);

  pCreateTableMsg->sqlLen = 0;
S
slguan 已提交
1291
  char *pMsg = (char *)pCreateTableMsg->schema;
H
hzcheng 已提交
1292

1293 1294
  int8_t type = pInfo->pCreateTableInfo->type;
  if (type == TSQL_CREATE_TABLE_FROM_STABLE) {  // create by using super table, tags value
1295 1296 1297 1298 1299 1300 1301
    STagData* pTag = &pInfo->pCreateTableInfo->usingInfo.tagdata;
    *(int32_t*)pMsg = htonl(pTag->dataLen);
    pMsg += sizeof(int32_t);
    memcpy(pMsg, pTag->name, sizeof(pTag->name));
    pMsg += sizeof(pTag->name);
    memcpy(pMsg, pTag->data, pTag->dataLen);
    pMsg += pTag->dataLen;
1302
  } else {  // create (super) table
1303
    pSchema = (SSchema *)pCreateTableMsg->schema;
1304

H
hzcheng 已提交
1305
    for (int i = 0; i < pCmd->numOfCols + pCmd->count; ++i) {
H
hjxilinx 已提交
1306
      TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
H
hzcheng 已提交
1307 1308 1309 1310

      pSchema->type = pField->type;
      strcpy(pSchema->name, pField->name);
      pSchema->bytes = htons(pField->bytes);
1311

H
hzcheng 已提交
1312 1313 1314 1315
      pSchema++;
    }

    pMsg = (char *)pSchema;
1316 1317
    if (type == TSQL_CREATE_STREAM) {  // check if it is a stream sql
      SQuerySQL *pQuerySql = pInfo->pCreateTableInfo->pSelect;
H
hzcheng 已提交
1318

1319 1320 1321
      strncpy(pMsg, pQuerySql->selectToken.z, pQuerySql->selectToken.n + 1);
      pCreateTableMsg->sqlLen = htons(pQuerySql->selectToken.n + 1);
      pMsg += pQuerySql->selectToken.n + 1;
H
hzcheng 已提交
1322 1323 1324
    }
  }

H
hjxilinx 已提交
1325
  tscFieldInfoClear(&pQueryInfo->fieldsInfo);
H
hzcheng 已提交
1326

S
TD-1057  
Shengliang Guan 已提交
1327
  msgLen = (int32_t)(pMsg - (char*)pCreateTableMsg);
S
slguan 已提交
1328
  pCreateTableMsg->contLen = htonl(msgLen);
H
hzcheng 已提交
1329
  pCmd->payloadLen = msgLen;
S
slguan 已提交
1330
  pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE;
H
hzcheng 已提交
1331 1332

  assert(msgLen + minMsgSize() <= size);
1333
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1334 1335 1336
}

int tscEstimateAlterTableMsgLength(SSqlCmd *pCmd) {
1337
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
S
TD-1732  
Shengliang Guan 已提交
1338
  return minMsgSize() + sizeof(SAlterTableMsg) + sizeof(SSchema) * tscNumOfFields(pQueryInfo) + TSDB_EXTRA_PAYLOAD_SIZE;
H
hzcheng 已提交
1339 1340
}

1341
int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
1342 1343
  char *pMsg;
  int   msgLen = 0;
H
hzcheng 已提交
1344

1345
  SSqlCmd    *pCmd = &pSql->cmd;
1346 1347
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

H
hjxilinx 已提交
1348
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
1349 1350 1351
  
  SAlterTableSQL *pAlterInfo = pInfo->pAlterInfo;
  int size = tscEstimateAlterTableMsgLength(pCmd);
S
slguan 已提交
1352 1353
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
    tscError("%p failed to malloc for alter table msg", pSql);
H
Haojun Liao 已提交
1354
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1355
  }
1356
  
S
TD-1732  
Shengliang Guan 已提交
1357
  SAlterTableMsg *pAlterTableMsg = (SAlterTableMsg *)pCmd->payload;
H
Haojun Liao 已提交
1358
  tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pAlterTableMsg->db);
H
hzcheng 已提交
1359

H
hjxilinx 已提交
1360
  strcpy(pAlterTableMsg->tableId, pTableMetaInfo->name);
1361
  pAlterTableMsg->type = htons(pAlterInfo->type);
1362

1363
  pAlterTableMsg->numOfCols = htons(tscNumOfFields(pQueryInfo));
S
slguan 已提交
1364
  SSchema *pSchema = pAlterTableMsg->schema;
1365
  for (int i = 0; i < tscNumOfFields(pQueryInfo); ++i) {
H
hjxilinx 已提交
1366
    TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
1367
  
H
hzcheng 已提交
1368 1369 1370 1371 1372 1373 1374
    pSchema->type = pField->type;
    strcpy(pSchema->name, pField->name);
    pSchema->bytes = htons(pField->bytes);
    pSchema++;
  }

  pMsg = (char *)pSchema;
1375 1376 1377
  pAlterTableMsg->tagValLen = htonl(pAlterInfo->tagData.dataLen);
  memcpy(pMsg, pAlterInfo->tagData.data, pAlterInfo->tagData.dataLen);
  pMsg += pAlterInfo->tagData.dataLen;
H
hzcheng 已提交
1378

S
TD-1057  
Shengliang Guan 已提交
1379
  msgLen = (int32_t)(pMsg - (char*)pAlterTableMsg);
1380

H
hzcheng 已提交
1381
  pCmd->payloadLen = msgLen;
S
slguan 已提交
1382
  pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_TABLE;
H
hzcheng 已提交
1383 1384

  assert(msgLen + minMsgSize() <= size);
1385

1386
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1387 1388
}

1389 1390 1391 1392
int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
  SSqlCmd* pCmd = &pSql->cmd;
  pCmd->msgType = TSDB_MSG_TYPE_UPDATE_TAG_VAL;
  
1393
  SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) pCmd->payload;
1394
  pCmd->payloadLen = htonl(pUpdateMsg->head.contLen);
1395

1396 1397
  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
1398

dengyihao's avatar
dengyihao 已提交
1399
  tscDumpEpSetFromVgroupInfo(&pTableMetaInfo->pTableMeta->corVgroupInfo, &pSql->epSet);
1400

1401 1402 1403
  return TSDB_CODE_SUCCESS;
}

1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439
//int tscBuildCancelQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
//  SCancelQueryMsg *pCancelMsg = (SCancelQueryMsg*) pSql->cmd.payload;
//  pCancelMsg->qhandle = htobe64(pSql->res.qhandle);
//
//  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
//  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
//
//  if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
//    int32_t vgIndex = pTableMetaInfo->vgroupIndex;
//    if (pTableMetaInfo->pVgroupTables == NULL) {
//      SVgroupsInfo *pVgroupInfo = pTableMetaInfo->vgroupList;
//      assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups);
//
//      pCancelMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId);
//      tscDebug("%p build cancel query msg from vgId:%d, vgIndex:%d", pSql, pVgroupInfo->vgroups[vgIndex].vgId, vgIndex);
//    } else {
//      int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
//      assert(vgIndex >= 0 && vgIndex < numOfVgroups);
//
//      SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex);
//
//      pCancelMsg->header.vgId = htonl(pTableIdList->vgInfo.vgId);
//      tscDebug("%p build cancel query msg from vgId:%d, vgIndex:%d", pSql, pTableIdList->vgInfo.vgId, vgIndex);
//    }
//  } else {
//    STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
//    pCancelMsg->header.vgId = htonl(pTableMeta->vgroupInfo.vgId);
//    tscDebug("%p build cancel query msg from only one vgroup, vgId:%d", pSql, pTableMeta->vgroupInfo.vgId);
//  }
//
//  pSql->cmd.payloadLen = sizeof(SCancelQueryMsg);
//  pSql->cmd.msgType = TSDB_MSG_TYPE_CANCEL_QUERY;
//
//  pCancelMsg->header.contLen = htonl(sizeof(SCancelQueryMsg));
//  return TSDB_CODE_SUCCESS;
//}
1440

1441
int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1442
  SSqlCmd *pCmd = &pSql->cmd;
S
TD-1732  
Shengliang Guan 已提交
1443
  pCmd->payloadLen = sizeof(SAlterDbMsg);
S
slguan 已提交
1444
  pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_DB;
H
hzcheng 已提交
1445

S
TD-1732  
Shengliang Guan 已提交
1446
  SAlterDbMsg *pAlterDbMsg = (SAlterDbMsg* )pCmd->payload;
1447
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
B
Bomin Zhang 已提交
1448
  tstrncpy(pAlterDbMsg->db, pTableMetaInfo->name, sizeof(pAlterDbMsg->db));
H
hzcheng 已提交
1449

1450
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1451 1452
}

1453
int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
S
slguan 已提交
1454
  SSqlCmd *pCmd = &pSql->cmd;
J
jtao1735 已提交
1455
  pCmd->msgType = TSDB_MSG_TYPE_CM_RETRIEVE;
S
slguan 已提交
1456
  pCmd->payloadLen = sizeof(SRetrieveTableMsg);
S
slguan 已提交
1457

S
slguan 已提交
1458 1459
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1460
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
H
hzcheng 已提交
1461
  }
S
slguan 已提交
1462

S
slguan 已提交
1463 1464 1465 1466
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg*)pCmd->payload;
  pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
  pRetrieveMsg->free = htons(pQueryInfo->type);
H
hzcheng 已提交
1467

1468
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479
}

/*
 * this function can only be called once.
 * by using pRes->rspType to denote its status
 *
 * if pRes->rspType is 1, no more result
 */
static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
1480

H
hzcheng 已提交
1481 1482 1483 1484 1485 1486
  pRes->code = TSDB_CODE_SUCCESS;
  if (pRes->rspType == 0) {
    pRes->numOfRows = numOfRes;
    pRes->row = 0;
    pRes->rspType = 1;

1487 1488 1489 1490 1491 1492
    SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
    if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
      return pRes->code;
    }

    tscSetResRawPtr(pRes, pQueryInfo);
H
hzcheng 已提交
1493
  } else {
S
slguan 已提交
1494
    tscResetForNextRetrieve(pRes);
H
hzcheng 已提交
1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509
  }

  uint8_t code = pSql->res.code;
  if (pSql->fp) {
    if (code == TSDB_CODE_SUCCESS) {
      (*pSql->fp)(pSql->param, pSql, pSql->res.numOfRows);
    } else {
      tscQueueAsyncRes(pSql);
    }
  }

  return code;
}

int tscProcessDescribeTableRsp(SSqlObj *pSql) {
S
slguan 已提交
1510
  SSqlCmd *       pCmd = &pSql->cmd;
1511
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hzcheng 已提交
1512

H
hjxilinx 已提交
1513
  STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
H
hjxilinx 已提交
1514 1515
  
  int32_t numOfRes = tinfo.numOfColumns + tinfo.numOfTags;
H
hzcheng 已提交
1516 1517 1518
  return tscLocalResultCommonBuilder(pSql, numOfRes);
}

H
Haojun Liao 已提交
1519 1520 1521
int tscProcessLocalRetrieveRsp(SSqlObj *pSql) {
  int32_t numOfRes = 1;
  pSql->res.completed = true;
H
hzcheng 已提交
1522 1523 1524
  return tscLocalResultCommonBuilder(pSql, numOfRes);
}

H
hjxilinx 已提交
1525
int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
H
hzcheng 已提交
1526
  SSqlRes *pRes = &pSql->res;
H
Haojun Liao 已提交
1527
  SSqlCmd* pCmd = &pSql->cmd;
H
hzcheng 已提交
1528

H
Haojun Liao 已提交
1529 1530 1531 1532 1533 1534
  int32_t code = pRes->code;
  if (pRes->code != TSDB_CODE_SUCCESS) {
    tscQueueAsyncRes(pSql);
    return code;
  }

1535
  pRes->code = tscDoLocalMerge(pSql);
H
hzcheng 已提交
1536 1537

  if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) {
1538
    SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
1539
    tscCreateResPointerInfo(pRes, pQueryInfo);
1540
    tscSetResRawPtr(pRes, pQueryInfo);
H
hzcheng 已提交
1541 1542 1543
  }

  pRes->row = 0;
1544
  pRes->completed = (pRes->numOfRows == 0);
H
hzcheng 已提交
1545

H
Haojun Liao 已提交
1546
  code = pRes->code;
H
hjxilinx 已提交
1547 1548 1549 1550
  if (pRes->code == TSDB_CODE_SUCCESS) {
    (*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
  } else {
    tscQueueAsyncRes(pSql);
H
hzcheng 已提交
1551 1552 1553 1554 1555
  }

  return code;
}

S
slguan 已提交
1556
int tscProcessEmptyResultRsp(SSqlObj *pSql) { return tscLocalResultCommonBuilder(pSql, 0); }
H
hzcheng 已提交
1557

1558
int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
H
hzcheng 已提交
1559
  STscObj *pObj = pSql->pTscObj;
S
slguan 已提交
1560
  SSqlCmd *pCmd = &pSql->cmd;
S
slguan 已提交
1561
  pCmd->msgType = TSDB_MSG_TYPE_CM_CONNECT;
S
TD-1762  
Shengliang Guan 已提交
1562
  pCmd->payloadLen = sizeof(SConnectMsg);
H
hzcheng 已提交
1563

S
slguan 已提交
1564 1565
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
    tscError("%p failed to malloc for query msg", pSql);
1566
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1567 1568
  }

S
TD-1762  
Shengliang Guan 已提交
1569
  SConnectMsg *pConnect = (SConnectMsg*)pCmd->payload;
H
hzcheng 已提交
1570

H
Haojun Liao 已提交
1571
  // TODO refactor full_name
H
hzcheng 已提交
1572 1573 1574
  char *db;  // ugly code to move the space
  db = strstr(pObj->db, TS_PATH_DELIMITER);
  db = (db == NULL) ? pObj->db : db + 1;
B
Bomin Zhang 已提交
1575 1576 1577
  tstrncpy(pConnect->db, db, sizeof(pConnect->db));
  tstrncpy(pConnect->clientVersion, version, sizeof(pConnect->clientVersion));
  tstrncpy(pConnect->msgVersion, "", sizeof(pConnect->msgVersion));
H
hzcheng 已提交
1578

H
Haojun Liao 已提交
1579 1580 1581
  pConnect->pid = htonl(taosGetPId());
  taosGetCurrentAPPName(pConnect->appName, NULL);

1582
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1583 1584
}

H
hjxilinx 已提交
1585
int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
1586 1587 1588
  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);

H
hjxilinx 已提交
1589
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
S
slguan 已提交
1590

S
TD-1732  
Shengliang Guan 已提交
1591
  STableInfoMsg *pInfoMsg = (STableInfoMsg *)pCmd->payload;
H
hjxilinx 已提交
1592
  strcpy(pInfoMsg->tableId, pTableMetaInfo->name);
H
hjxilinx 已提交
1593
  pInfoMsg->createFlag = htons(pSql->cmd.autoCreated ? 1 : 0);
S
slguan 已提交
1594

S
TD-1732  
Shengliang Guan 已提交
1595
  char *pMsg = (char *)pInfoMsg + sizeof(STableInfoMsg);
H
hzcheng 已提交
1596

1597 1598
  if (pCmd->autoCreated && pCmd->pTagData != NULL) {
    int len = htonl(pCmd->pTagData->dataLen);
B
Bomin Zhang 已提交
1599
    if (len > 0) {
1600 1601
      len += sizeof(pCmd->pTagData->name) + sizeof(pCmd->pTagData->dataLen);
      memcpy(pInfoMsg->tags, pCmd->pTagData, len);
B
Bomin Zhang 已提交
1602 1603
      pMsg += len;
    }
H
hzcheng 已提交
1604 1605
  }

S
TD-1057  
Shengliang Guan 已提交
1606
  pCmd->payloadLen = (int32_t)(pMsg - (char*)pInfoMsg);
S
slguan 已提交
1607
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLE_META;
H
hzcheng 已提交
1608

1609
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1610 1611
}

S
slguan 已提交
1612
/**
1613
 *  multi table meta req pkg format:
S
TD-1732  
Shengliang Guan 已提交
1614
 *  | SMgmtHead | SMultiTableInfoMsg | tableId0 | tableId1 | tableId2 | ......
S
slguan 已提交
1615 1616
 *      no used         4B
 **/
1617
int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
guanshengliang's avatar
guanshengliang 已提交
1618
#if 0
S
slguan 已提交
1619 1620 1621 1622 1623
  SSqlCmd *pCmd = &pSql->cmd;

  // copy payload content to temp buff
  char *tmpData = 0;
  if (pCmd->payloadLen > 0) {
H
Haojun Liao 已提交
1624
    if ((tmpData = calloc(1, pCmd->payloadLen + 1)) == NULL) return -1;
S
slguan 已提交
1625 1626 1627 1628 1629
    memcpy(tmpData, pCmd->payload, pCmd->payloadLen);
  }

  // fill head info
  SMgmtHead *pMgmt = (SMgmtHead *)(pCmd->payload + tsRpcHeadSize);
H
Haojun Liao 已提交
1630
  memset(pMgmt->db, 0, TSDB_TABLE_FNAME_LEN);  // server don't need the db
S
slguan 已提交
1631

S
TD-1732  
Shengliang Guan 已提交
1632
  SMultiTableInfoMsg *pInfoMsg = (SMultiTableInfoMsg *)(pCmd->payload + tsRpcHeadSize + sizeof(SMgmtHead));
S
slguan 已提交
1633
  pInfoMsg->numOfTables = htonl((int32_t)pCmd->count);
S
slguan 已提交
1634 1635

  if (pCmd->payloadLen > 0) {
S
slguan 已提交
1636
    memcpy(pInfoMsg->tableIds, tmpData, pCmd->payloadLen);
S
slguan 已提交
1637 1638
  }

S
TD-1848  
Shengliang Guan 已提交
1639
  tfree(tmpData);
S
slguan 已提交
1640

S
TD-1732  
Shengliang Guan 已提交
1641
  pCmd->payloadLen += sizeof(SMgmtHead) + sizeof(SMultiTableInfoMsg);
S
slguan 已提交
1642
  pCmd->msgType = TSDB_MSG_TYPE_CM_TABLES_META;
S
slguan 已提交
1643 1644 1645

  assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize);

1646
  tscDebug("%p build load multi-metermeta msg completed, numOfTables:%d, msg size:%d", pSql, pCmd->count,
S
slguan 已提交
1647 1648 1649
           pCmd->payloadLen);

  return pCmd->payloadLen;
guanshengliang's avatar
guanshengliang 已提交
1650 1651
#endif
  return 0;  
S
slguan 已提交
1652 1653
}

H
hjxilinx 已提交
1654 1655 1656 1657 1658 1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669 1670
//static UNUSED_FUNC int32_t tscEstimateMetricMetaMsgSize(SSqlCmd *pCmd) {
////  const int32_t defaultSize =
////      minMsgSize() + sizeof(SSuperTableMetaMsg) + sizeof(SMgmtHead) + sizeof(int16_t) * TSDB_MAX_TAGS;
////  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
////
////  int32_t n = 0;
////  size_t size = taosArrayGetSize(pQueryInfo->tagCond.pCond);
////  for (int32_t i = 0; i < size; ++i) {
////    assert(0);
//////    n += strlen(pQueryInfo->tagCond.cond[i].cond);
////  }
////
////  int32_t tagLen = n * TSDB_NCHAR_SIZE;
////  if (pQueryInfo->tagCond.tbnameCond.cond != NULL) {
////    tagLen += strlen(pQueryInfo->tagCond.tbnameCond.cond) * TSDB_NCHAR_SIZE;
////  }
////
H
Haojun Liao 已提交
1671
////  int32_t joinCondLen = (TSDB_TABLE_FNAME_LEN + sizeof(int16_t)) * 2;
H
hjxilinx 已提交
1672 1673 1674 1675 1676 1677 1678 1679
////  int32_t elemSize = sizeof(SSuperTableMetaElemMsg) * pQueryInfo->numOfTables;
////
////  int32_t colSize = pQueryInfo->groupbyExpr.numOfGroupCols*sizeof(SColIndex);
////
////  int32_t len = tagLen + joinCondLen + elemSize + colSize + defaultSize;
////
////  return MAX(len, TSDB_DEFAULT_PAYLOAD_SIZE);
//}
H
hzcheng 已提交
1680

H
hjxilinx 已提交
1681 1682
int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
H
hjxilinx 已提交
1683 1684 1685
  
  char* pMsg = pCmd->payload;
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
S
TD-1732  
Shengliang Guan 已提交
1686 1687

  SSTableVgroupMsg *pStableVgroupMsg = (SSTableVgroupMsg *)pMsg;
H
hjxilinx 已提交
1688
  pStableVgroupMsg->numOfTables = htonl(pQueryInfo->numOfTables);
S
TD-1732  
Shengliang Guan 已提交
1689 1690 1691
  pMsg += sizeof(SSTableVgroupMsg);

  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
H
hjxilinx 已提交
1692
    STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i);
1693 1694 1695
    size_t size = sizeof(pTableMetaInfo->name);
    tstrncpy(pMsg, pTableMetaInfo->name, size);
    pMsg += size;
H
hjxilinx 已提交
1696
  }
H
hjxilinx 已提交
1697 1698

  pCmd->msgType = TSDB_MSG_TYPE_CM_STABLE_VGROUP;
S
TD-1057  
Shengliang Guan 已提交
1699
  pCmd->payloadLen = (int32_t)(pMsg - pCmd->payload);
H
hjxilinx 已提交
1700

1701
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1702 1703
}

1704 1705
int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
  SSqlCmd *pCmd = &pSql->cmd;
H
hzcheng 已提交
1706 1707
  STscObj *pObj = pSql->pTscObj;

1708
  pthread_mutex_lock(&pObj->mutex);
H
hzcheng 已提交
1709

S
Shengliang Guan 已提交
1710
  int32_t numOfQueries = 2;
H
hzcheng 已提交
1711 1712 1713
  SSqlObj *tpSql = pObj->sqlList;
  while (tpSql) {
    tpSql = tpSql->next;
1714
    numOfQueries++;
H
hzcheng 已提交
1715 1716
  }

S
Shengliang Guan 已提交
1717
  int32_t numOfStreams = 2;
H
hzcheng 已提交
1718 1719 1720
  SSqlStream *pStream = pObj->streamList;
  while (pStream) {
    pStream = pStream->next;
1721
    numOfStreams++;
H
hzcheng 已提交
1722 1723
  }

S
TD-1732  
Shengliang Guan 已提交
1724
  int size = numOfQueries * sizeof(SQueryDesc) + numOfStreams * sizeof(SStreamDesc) + sizeof(SHeartBeatMsg) + 100;
S
slguan 已提交
1725
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
1726
    pthread_mutex_unlock(&pObj->mutex);
H
Haojun Liao 已提交
1727
    tscError("%p failed to create heartbeat msg", pSql);
H
Haojun Liao 已提交
1728
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
1729
  }
H
hzcheng 已提交
1730

H
Haojun Liao 已提交
1731
  // TODO the expired hb and client can not be identified by server till now.
S
TD-1732  
Shengliang Guan 已提交
1732
  SHeartBeatMsg *pHeartbeat = (SHeartBeatMsg *)pCmd->payload;
H
Haojun Liao 已提交
1733 1734
  tstrncpy(pHeartbeat->clientVer, version, tListLen(pHeartbeat->clientVer));

1735 1736
  pHeartbeat->numOfQueries = numOfQueries;
  pHeartbeat->numOfStreams = numOfStreams;
H
Haojun Liao 已提交
1737 1738 1739 1740

  pHeartbeat->pid = htonl(taosGetPId());
  taosGetCurrentAPPName(pHeartbeat->appName, NULL);

1741
  int msgLen = tscBuildQueryStreamDesc(pHeartbeat, pObj);
H
hzcheng 已提交
1742 1743 1744 1745

  pthread_mutex_unlock(&pObj->mutex);

  pCmd->payloadLen = msgLen;
S
slguan 已提交
1746
  pCmd->msgType = TSDB_MSG_TYPE_CM_HEARTBEAT;
H
hzcheng 已提交
1747

1748
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1749 1750
}

1751 1752
int tscProcessTableMetaRsp(SSqlObj *pSql) {
  STableMetaMsg *pMetaMsg = (STableMetaMsg *)pSql->res.pRsp;
H
hzcheng 已提交
1753

H
Haojun Liao 已提交
1754
  pMetaMsg->tid = htonl(pMetaMsg->tid);
1755
  pMetaMsg->sversion = htons(pMetaMsg->sversion);
1756
  pMetaMsg->tversion = htons(pMetaMsg->tversion);
H
hjxilinx 已提交
1757 1758
  pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId);
  
1759 1760
  pMetaMsg->uid = htobe64(pMetaMsg->uid);
  pMetaMsg->contLen = htons(pMetaMsg->contLen);
1761
  pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns);
H
hzcheng 已提交
1762

H
Haojun Liao 已提交
1763
  if ((pMetaMsg->tableType != TSDB_SUPER_TABLE) &&
H
Haojun Liao 已提交
1764
      (pMetaMsg->tid <= 0 || pMetaMsg->vgroup.vgId < 2 || pMetaMsg->vgroup.numOfEps <= 0)) {
H
Haojun Liao 已提交
1765
    tscError("invalid value in table numOfEps:%d, vgId:%d tid:%d, name:%s", pMetaMsg->vgroup.numOfEps, pMetaMsg->vgroup.vgId,
H
Haojun Liao 已提交
1766
             pMetaMsg->tid, pMetaMsg->tableId);
1767
    return TSDB_CODE_TSC_INVALID_VALUE;
H
hzcheng 已提交
1768 1769
  }

B
Bomin Zhang 已提交
1770
  if (pMetaMsg->numOfTags > TSDB_MAX_TAGS) {
1771
    tscError("invalid numOfTags:%d", pMetaMsg->numOfTags);
1772
    return TSDB_CODE_TSC_INVALID_VALUE;
H
hzcheng 已提交
1773 1774
  }

1775 1776
  if (pMetaMsg->numOfColumns > TSDB_MAX_COLUMNS || pMetaMsg->numOfColumns <= 0) {
    tscError("invalid numOfColumns:%d", pMetaMsg->numOfColumns);
1777
    return TSDB_CODE_TSC_INVALID_VALUE;
H
hzcheng 已提交
1778 1779
  }

1780 1781
  for (int i = 0; i < pMetaMsg->vgroup.numOfEps; ++i) {
    pMetaMsg->vgroup.epAddr[i].port = htons(pMetaMsg->vgroup.epAddr[i].port);
H
hzcheng 已提交
1782 1783
  }

1784
  SSchema* pSchema = pMetaMsg->schema;
H
hzcheng 已提交
1785

1786
  int32_t numOfTotalCols = pMetaMsg->numOfColumns + pMetaMsg->numOfTags;
H
hzcheng 已提交
1787 1788 1789
  for (int i = 0; i < numOfTotalCols; ++i) {
    pSchema->bytes = htons(pSchema->bytes);
    pSchema->colId = htons(pSchema->colId);
1790 1791 1792 1793 1794

    if (pSchema->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
      assert(i == 0);
    }

H
hjxilinx 已提交
1795
    assert(pSchema->type >= TSDB_DATA_TYPE_BOOL && pSchema->type <= TSDB_DATA_TYPE_NCHAR);
H
hzcheng 已提交
1796 1797 1798
    pSchema++;
  }

1799 1800
  size_t size = 0;
  STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
1801
  
H
hzcheng 已提交
1802
  // todo add one more function: taosAddDataIfNotExists();
1803
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
H
hjxilinx 已提交
1804
  assert(pTableMetaInfo->pTableMeta == NULL);
H
hzcheng 已提交
1805

H
Haojun Liao 已提交
1806
  pTableMetaInfo->pTableMeta = (STableMeta *) taosCachePut(tscMetaCache, pTableMetaInfo->name,
1807
      strlen(pTableMetaInfo->name), pTableMeta, size, tsTableMetaKeepTimer * 1000);
H
Haojun Liao 已提交
1808

1809
  if (pTableMetaInfo->pTableMeta == NULL) {
B
Bomin Zhang 已提交
1810
    free(pTableMeta);
1811
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
1812
  }
H
hzcheng 已提交
1813

1814
  tscDebug("%p recv table meta, uid:%"PRId64 ", tid:%d, name:%s", pSql, pTableMeta->id.uid, pTableMeta->id.tid, pTableMetaInfo->name);
1815
  free(pTableMeta);
1816
  
H
hjxilinx 已提交
1817
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1818 1819
}

S
slguan 已提交
1820
/**
1821
 *  multi table meta rsp pkg format:
S
TD-1732  
Shengliang Guan 已提交
1822
 *  | STaosRsp | ieType | SMultiTableInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | SMeterMeta2 | SSchema2
S
slguan 已提交
1823 1824 1825
 *  |...... 1B        1B            4B
 **/
int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
1826
#if 0
S
slguan 已提交
1827 1828 1829 1830 1831
  char *rsp = pSql->res.pRsp;

  ieType = *rsp;
  if (ieType != TSDB_IE_TYPE_META) {
    tscError("invalid ie type:%d", ieType);
1832
    pSql->res.code = TSDB_CODE_TSC_INVALID_IE;
S
slguan 已提交
1833
    pSql->res.numOfTotal = 0;
1834
    return TSDB_CODE_TSC_APP_ERROR;
S
slguan 已提交
1835 1836 1837 1838
  }

  rsp++;

S
TD-1732  
Shengliang Guan 已提交
1839
  SMultiTableInfoMsg *pInfo = (SMultiTableInfoMsg *)rsp;
S
slguan 已提交
1840
  totalNum = htonl(pInfo->numOfTables);
S
TD-1732  
Shengliang Guan 已提交
1841
  rsp += sizeof(SMultiTableInfoMsg);
S
slguan 已提交
1842 1843

  for (i = 0; i < totalNum; i++) {
S
slguan 已提交
1844
    SMultiTableMeta *pMultiMeta = (SMultiTableMeta *)rsp;
1845
    STableMeta *     pMeta = pMultiMeta->metas;
S
slguan 已提交
1846 1847 1848

    pMeta->sid = htonl(pMeta->sid);
    pMeta->sversion = htons(pMeta->sversion);
H
hjxilinx 已提交
1849
    pMeta->vgId = htonl(pMeta->vgId);
S
slguan 已提交
1850 1851
    pMeta->uid = htobe64(pMeta->uid);

H
hjxilinx 已提交
1852 1853
    if (pMeta->sid <= 0 || pMeta->vgId < 0) {
      tscError("invalid meter vgId:%d, sid%d", pMeta->vgId, pMeta->sid);
1854
      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
S
slguan 已提交
1855
      pSql->res.numOfTotal = i;
1856
      return TSDB_CODE_TSC_APP_ERROR;
S
slguan 已提交
1857 1858
    }

H
hjxilinx 已提交
1859 1860 1861 1862
    //    pMeta->numOfColumns = htons(pMeta->numOfColumns);
    //
    //    if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
    //      tscError("invalid tag value count:%d", pMeta->numOfTags);
1863
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
H
hjxilinx 已提交
1864
    //      pSql->res.numOfTotal = i;
1865
    //      return TSDB_CODE_TSC_APP_ERROR;
H
hjxilinx 已提交
1866 1867 1868 1869
    //    }
    //
    //    if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
    //      tscError("invalid numOfTags:%d", pMeta->numOfTags);
1870
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
H
hjxilinx 已提交
1871
    //      pSql->res.numOfTotal = i;
1872
    //      return TSDB_CODE_TSC_APP_ERROR;
H
hjxilinx 已提交
1873 1874 1875 1876
    //    }
    //
    //    if (pMeta->numOfColumns > TSDB_MAX_COLUMNS || pMeta->numOfColumns < 0) {
    //      tscError("invalid numOfColumns:%d", pMeta->numOfColumns);
1877
    //      pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
H
hjxilinx 已提交
1878
    //      pSql->res.numOfTotal = i;
1879
    //      return TSDB_CODE_TSC_APP_ERROR;
H
hjxilinx 已提交
1880 1881
    //    }
    //
H
hjxilinx 已提交
1882
    //    for (int j = 0; j < TSDB_REPLICA_MAX_NUM; ++j) {
H
hjxilinx 已提交
1883 1884 1885 1886 1887 1888 1889 1890 1891 1892 1893 1894 1895 1896 1897 1898 1899 1900 1901 1902 1903 1904 1905 1906 1907 1908 1909 1910 1911 1912 1913 1914 1915 1916
    //      pMeta->vpeerDesc[j].vnode = htonl(pMeta->vpeerDesc[j].vnode);
    //    }
    //
    //    pMeta->rowSize = 0;
    //    rsp += sizeof(SMultiTableMeta);
    //    pSchema = (SSchema *)rsp;
    //
    //    int32_t numOfTotalCols = pMeta->numOfColumns + pMeta->numOfTags;
    //    for (int j = 0; j < numOfTotalCols; ++j) {
    //      pSchema->bytes = htons(pSchema->bytes);
    //      pSchema->colId = htons(pSchema->colId);
    //
    //      // ignore the tags length
    //      if (j < pMeta->numOfColumns) {
    //        pMeta->rowSize += pSchema->bytes;
    //      }
    //      pSchema++;
    //    }
    //
    //    rsp += numOfTotalCols * sizeof(SSchema);
    //
    //    int32_t  tagLen = 0;
    //    SSchema *pTagsSchema = tscGetTableTagSchema(pMeta);
    //
    //    if (pMeta->tableType == TSDB_CHILD_TABLE) {
    //      for (int32_t j = 0; j < pMeta->numOfTags; ++j) {
    //        tagLen += pTagsSchema[j].bytes;
    //      }
    //    }
    //
    //    rsp += tagLen;
    //    int32_t size = (int32_t)(rsp - ((char *)pMeta));  // Consistent with STableMeta in cache
    //
    //    pMeta->index = 0;
H
Haojun Liao 已提交
1917
    //    (void)taosCachePut(tscMetaCache, pMeta->tableId, (char *)pMeta, size, tsTableMetaKeepTimer);
H
hjxilinx 已提交
1918
    //  }
S
slguan 已提交
1919
  }
H
hjxilinx 已提交
1920
  
S
slguan 已提交
1921 1922
  pSql->res.code = TSDB_CODE_SUCCESS;
  pSql->res.numOfTotal = i;
1923
  tscDebug("%p load multi-metermeta resp from complete num:%d", pSql, pSql->res.numOfTotal);
1924 1925
#endif
  
S
slguan 已提交
1926 1927 1928
  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
1929
int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
1930
  SSqlRes* pRes = &pSql->res;
H
hjxilinx 已提交
1931
  
H
hjxilinx 已提交
1932
  // NOTE: the order of several table must be preserved.
S
TD-1732  
Shengliang Guan 已提交
1933
  SSTableVgroupRspMsg *pStableVgroup = (SSTableVgroupRspMsg *)pRes->pRsp;
H
hjxilinx 已提交
1934
  pStableVgroup->numOfTables = htonl(pStableVgroup->numOfTables);
S
TD-1732  
Shengliang Guan 已提交
1935 1936
  char *pMsg = pRes->pRsp + sizeof(SSTableVgroupRspMsg);

1937 1938 1939
  // master sqlObj locates in param
  SSqlObj* parent = pSql->param;
  assert(parent != NULL);
H
hjxilinx 已提交
1940
  
1941
  SSqlCmd* pCmd = &parent->cmd;
H
hjxilinx 已提交
1942 1943 1944
  for(int32_t i = 0; i < pStableVgroup->numOfTables; ++i) {
    STableMetaInfo *pInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i);

H
Haojun Liao 已提交
1945 1946 1947
    SVgroupsMsg *  pVgroupMsg = (SVgroupsMsg *) pMsg;
    pVgroupMsg->numOfVgroups = htonl(pVgroupMsg->numOfVgroups);

S
TD-1732  
Shengliang Guan 已提交
1948
    size_t size = sizeof(SVgroupMsg) * pVgroupMsg->numOfVgroups + sizeof(SVgroupsMsg);
H
Haojun Liao 已提交
1949

S
TD-1732  
Shengliang Guan 已提交
1950
    size_t vgroupsz = sizeof(SVgroupInfo) * pVgroupMsg->numOfVgroups + sizeof(SVgroupsInfo);
H
Haojun Liao 已提交
1951
    pInfo->vgroupList = calloc(1, vgroupsz);
H
hjxilinx 已提交
1952 1953
    assert(pInfo->vgroupList != NULL);

H
Haojun Liao 已提交
1954
    pInfo->vgroupList->numOfVgroups = pVgroupMsg->numOfVgroups;
H
hjxilinx 已提交
1955
    for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) {
dengyihao's avatar
dengyihao 已提交
1956
      //just init, no need to lock
S
TD-1732  
Shengliang Guan 已提交
1957
      SVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j];
H
Haojun Liao 已提交
1958

S
TD-1732  
Shengliang Guan 已提交
1959
      SVgroupMsg *vmsg = &pVgroupMsg->vgroups[j];
H
Haojun Liao 已提交
1960 1961
      pVgroups->vgId = htonl(vmsg->vgId);
      pVgroups->numOfEps = vmsg->numOfEps;
H
Haojun Liao 已提交
1962

H
Haojun Liao 已提交
1963
      assert(pVgroups->numOfEps >= 1 && pVgroups->vgId >= 1);
H
hjxilinx 已提交
1964

1965
      for (int32_t k = 0; k < pVgroups->numOfEps; ++k) {
H
Haojun Liao 已提交
1966 1967
        pVgroups->epAddr[k].port = htons(vmsg->epAddr[k].port);
        pVgroups->epAddr[k].fqdn = strndup(vmsg->epAddr[k].fqdn, tListLen(vmsg->epAddr[k].fqdn));
H
hjxilinx 已提交
1968
      }
1969
    }
1970 1971

    pMsg += size;
H
hjxilinx 已提交
1972 1973
  }
  
S
slguan 已提交
1974
  return pSql->res.code;
H
hzcheng 已提交
1975 1976 1977 1978 1979 1980
}

/*
 * current process do not use the cache at all
 */
int tscProcessShowRsp(SSqlObj *pSql) {
S
TD-1732  
Shengliang Guan 已提交
1981 1982 1983 1984
  STableMetaMsg *pMetaMsg;
  SShowRsp *     pShow;
  SSchema *      pSchema;
  char           key[20];
H
hzcheng 已提交
1985

1986 1987 1988
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

H
hjxilinx 已提交
1989
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
1990

H
hjxilinx 已提交
1991
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hzcheng 已提交
1992

S
TD-1732  
Shengliang Guan 已提交
1993
  pShow = (SShowRsp *)pRes->pRsp;
S
slguan 已提交
1994
  pShow->qhandle = htobe64(pShow->qhandle);
H
hzcheng 已提交
1995 1996
  pRes->qhandle = pShow->qhandle;

S
slguan 已提交
1997
  tscResetForNextRetrieve(pRes);
H
hjxilinx 已提交
1998
  pMetaMsg = &(pShow->tableMeta);
H
hzcheng 已提交
1999

H
hjxilinx 已提交
2000
  pMetaMsg->numOfColumns = ntohs(pMetaMsg->numOfColumns);
H
hzcheng 已提交
2001

H
hjxilinx 已提交
2002
  pSchema = pMetaMsg->schema;
H
Haojun Liao 已提交
2003
  pMetaMsg->tid = ntohs(pMetaMsg->tid);
H
hjxilinx 已提交
2004
  for (int i = 0; i < pMetaMsg->numOfColumns; ++i) {
H
hzcheng 已提交
2005 2006 2007 2008
    pSchema->bytes = htons(pSchema->bytes);
    pSchema++;
  }

2009
  key[0] = pCmd->msgType + 'a';
H
hzcheng 已提交
2010 2011
  strcpy(key + 1, "showlist");

H
Haojun Liao 已提交
2012 2013 2014 2015
  if (pTableMetaInfo->pTableMeta != NULL) {
    taosCacheRelease(tscMetaCache, (void *)&(pTableMetaInfo->pTableMeta), false);
  }

H
hjxilinx 已提交
2016 2017 2018
  size_t size = 0;
  STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
  
H
Haojun Liao 已提交
2019
  pTableMetaInfo->pTableMeta = taosCachePut(tscMetaCache, key, strlen(key), (char *)pTableMeta, size,
2020
      tsTableMetaKeepTimer * 1000);
H
hjxilinx 已提交
2021
  SSchema *pTableSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
H
hzcheng 已提交
2022

2023 2024 2025 2026
  if (pQueryInfo->colList == NULL) {
    pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
  }
  
H
hjxilinx 已提交
2027 2028
  SFieldInfo* pFieldInfo = &pQueryInfo->fieldsInfo;
  
S
slguan 已提交
2029
  SColumnIndex index = {0};
H
hjxilinx 已提交
2030 2031 2032
  pSchema = pMetaMsg->schema;
  
  for (int16_t i = 0; i < pMetaMsg->numOfColumns; ++i, ++pSchema) {
S
slguan 已提交
2033
    index.columnIndex = i;
2034 2035
    tscColumnListInsert(pQueryInfo->colList, &index);
    
H
hjxilinx 已提交
2036
    TAOS_FIELD f = tscCreateField(pSchema->type, pSchema->name, pSchema->bytes);
H
Haojun Liao 已提交
2037
    SInternalField* pInfo = tscFieldInfoAppend(pFieldInfo, &f);
H
hjxilinx 已提交
2038
    
H
hjxilinx 已提交
2039
    pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index,
H
Haojun Liao 已提交
2040
                     pTableSchema[i].type, pTableSchema[i].bytes, getNewResColId(pQueryInfo), pTableSchema[i].bytes, false);
H
hzcheng 已提交
2041
  }
H
hjxilinx 已提交
2042 2043
  
  pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
H
hjxilinx 已提交
2044
  tscFieldInfoUpdateOffset(pQueryInfo);
H
hjxilinx 已提交
2045
  
S
TD-1848  
Shengliang Guan 已提交
2046
  tfree(pTableMeta);
H
hzcheng 已提交
2047 2048 2049
  return 0;
}

2050
// TODO multithread problem
2051
static void createHBObj(STscObj* pObj) {
2052
  if (pObj->hbrid != 0) {
2053 2054 2055 2056 2057 2058 2059 2060
    return;
  }

  SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
  if (NULL == pSql) return;

  pSql->fp = tscProcessHeartBeatRsp;

2061 2062
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetailSafely(&pSql->cmd, 0);
  if (pQueryInfo == NULL) {
H
Haojun Liao 已提交
2063 2064
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    tfree(pSql);
2065 2066 2067
    return;
  }

2068 2069 2070 2071
  pQueryInfo->command = TSDB_SQL_HB;

  pSql->cmd.command = pQueryInfo->command;
  if (TSDB_CODE_SUCCESS != tscAllocPayload(&(pSql->cmd), TSDB_DEFAULT_PAYLOAD_SIZE)) {
H
Haojun Liao 已提交
2072
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
S
TD-1848  
Shengliang Guan 已提交
2073
    tfree(pSql);
2074 2075 2076 2077 2078 2079 2080
    return;
  }

  pSql->param = pObj;
  pSql->pTscObj = pObj;
  pSql->signature = pSql;

2081 2082
  registerSqlObj(pSql);
  tscDebug("%p HB is allocated, pObj:%p", pSql, pObj);
2083

2084
  pObj->hbrid = pSql->self;
2085 2086
}

H
hzcheng 已提交
2087 2088 2089 2090
int tscProcessConnectRsp(SSqlObj *pSql) {
  STscObj *pObj = pSql->pTscObj;
  SSqlRes *pRes = &pSql->res;

H
Haojun Liao 已提交
2091 2092
  char temp[TSDB_TABLE_FNAME_LEN * 2] = {0};

S
TD-1762  
Shengliang Guan 已提交
2093
  SConnectRsp *pConnect = (SConnectRsp *)pRes->pRsp;
B
Bomin Zhang 已提交
2094
  tstrncpy(pObj->acctId, pConnect->acctId, sizeof(pObj->acctId));  // copy acctId from response
2095 2096
  int32_t len = sprintf(temp, "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db);

B
Bomin Zhang 已提交
2097 2098
  assert(len <= sizeof(pObj->db));
  tstrncpy(pObj->db, temp, sizeof(pObj->db));
H
hjxilinx 已提交
2099
  
dengyihao's avatar
dengyihao 已提交
2100 2101
  if (pConnect->epSet.numOfEps > 0) {
    tscEpSetHtons(&pConnect->epSet);
dengyihao's avatar
TD-2257  
dengyihao 已提交
2102
    tscUpdateMgmtEpSet(pSql, &pConnect->epSet);
dengyihao's avatar
dengyihao 已提交
2103
  } 
H
hzcheng 已提交
2104

S
slguan 已提交
2105
  strcpy(pObj->sversion, pConnect->serverVersion);
H
hzcheng 已提交
2106 2107
  pObj->writeAuth = pConnect->writeAuth;
  pObj->superAuth = pConnect->superAuth;
S
Shengliang Guan 已提交
2108
  pObj->connId = htonl(pConnect->connId);
2109 2110

  createHBObj(pObj);
H
Haojun Liao 已提交
2111 2112

  //launch a timer to send heartbeat to maintain the connection and send status to mnode
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
2113
  taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, (void *)pObj->rid, tscTmr, &pObj->pTimer);
H
hzcheng 已提交
2114 2115 2116 2117 2118

  return 0;
}

int tscProcessUseDbRsp(SSqlObj *pSql) {
S
slguan 已提交
2119
  STscObj *       pObj = pSql->pTscObj;
2120
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
S
slguan 已提交
2121

B
Bomin Zhang 已提交
2122
  tstrncpy(pObj->db, pTableMetaInfo->name, sizeof(pObj->db));
H
hzcheng 已提交
2123 2124 2125
  return 0;
}

Y
TD-1039  
yihaoDeng 已提交
2126 2127
int tscProcessDropDbRsp(SSqlObj *pSql) {
  pSql->pTscObj->db[0] = 0;
H
Haojun Liao 已提交
2128
  taosCacheEmpty(tscMetaCache);
H
hzcheng 已提交
2129 2130 2131 2132
  return 0;
}

int tscProcessDropTableRsp(SSqlObj *pSql) {
2133
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
S
slguan 已提交
2134

H
Haojun Liao 已提交
2135
  STableMeta *pTableMeta = taosCacheAcquireByKey(tscMetaCache, pTableMetaInfo->name, strlen(pTableMetaInfo->name));
H
Haojun Liao 已提交
2136
  if (pTableMeta == NULL) { /* not in cache, abort */
H
hzcheng 已提交
2137 2138 2139 2140 2141 2142 2143
    return 0;
  }

  /*
   * 1. if a user drops one table, which is the only table in a vnode, remove operation will incur vnode to be removed.
   * 2. Then, a user creates a new metric followed by a table with identical name of removed table but different schema,
   * here the table will reside in a new vnode.
S
slguan 已提交
2144 2145
   * The cached information is expired, however, we may have lost the ref of original meter. So, clear whole cache
   * instead.
H
hzcheng 已提交
2146
   */
2147
  tscDebug("%p force release table meta after drop table:%s", pSql, pTableMetaInfo->name);
H
Haojun Liao 已提交
2148
  taosCacheRelease(tscMetaCache, (void **)&pTableMeta, true);
H
hzcheng 已提交
2149

H
hjxilinx 已提交
2150
  if (pTableMetaInfo->pTableMeta) {
H
Haojun Liao 已提交
2151
    taosCacheRelease(tscMetaCache, (void **)&(pTableMetaInfo->pTableMeta), true);
H
hzcheng 已提交
2152 2153 2154 2155 2156 2157
  }

  return 0;
}

int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
2158
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
S
slguan 已提交
2159

H
Haojun Liao 已提交
2160
  STableMeta *pTableMeta = taosCacheAcquireByKey(tscMetaCache, pTableMetaInfo->name, strlen(pTableMetaInfo->name));
H
hjxilinx 已提交
2161
  if (pTableMeta == NULL) { /* not in cache, abort */
H
hzcheng 已提交
2162 2163 2164
    return 0;
  }

2165
  tscDebug("%p force release metermeta in cache after alter-table: %s", pSql, pTableMetaInfo->name);
H
Haojun Liao 已提交
2166
  taosCacheRelease(tscMetaCache, (void **)&pTableMeta, true);
H
hzcheng 已提交
2167

H
hjxilinx 已提交
2168
  if (pTableMetaInfo->pTableMeta) {
weixin_48148422's avatar
weixin_48148422 已提交
2169
    bool isSuperTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo);
H
Haojun Liao 已提交
2170
    taosCacheRelease(tscMetaCache, (void **)&(pTableMetaInfo->pTableMeta), true);
H
hzcheng 已提交
2171

2172
    if (isSuperTable) {  // if it is a super table, reset whole query cache
2173
      tscDebug("%p reset query cache since table:%s is stable", pSql, pTableMetaInfo->name);
H
Haojun Liao 已提交
2174
      taosCacheEmpty(tscMetaCache);
H
hzcheng 已提交
2175 2176 2177 2178 2179 2180 2181 2182 2183 2184
    }
  }

  return 0;
}

int tscProcessAlterDbMsgRsp(SSqlObj *pSql) {
  UNUSED(pSql);
  return 0;
}
2185 2186 2187
int tscProcessShowCreateRsp(SSqlObj *pSql) {
  return tscLocalResultCommonBuilder(pSql, 1);
}
H
hzcheng 已提交
2188 2189 2190 2191

int tscProcessQueryRsp(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;

2192
  SQueryTableRsp *pQuery = (SQueryTableRsp *)pRes->pRsp;
S
slguan 已提交
2193 2194 2195
  pQuery->qhandle = htobe64(pQuery->qhandle);
  pRes->qhandle = pQuery->qhandle;

H
hzcheng 已提交
2196
  pRes->data = NULL;
S
slguan 已提交
2197
  tscResetForNextRetrieve(pRes);
H
hzcheng 已提交
2198 2199 2200
  return 0;
}

H
hjxilinx 已提交
2201
int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
S
slguan 已提交
2202 2203 2204
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

S
slguan 已提交
2205
  SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *)pRes->pRsp;
2206 2207 2208 2209
  if (pRetrieve == NULL) {
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return pRes->code;
  }
H
hzcheng 已提交
2210 2211 2212

  pRes->numOfRows = htonl(pRetrieve->numOfRows);
  pRes->precision = htons(pRetrieve->precision);
2213 2214
  pRes->offset    = htobe64(pRetrieve->offset);
  pRes->useconds  = htobe64(pRetrieve->useconds);
H
hjxilinx 已提交
2215
  pRes->completed = (pRetrieve->completed == 1);
2216
  pRes->data      = pRetrieve->data;
H
hjxilinx 已提交
2217
  
2218
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
2219 2220 2221
  if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
    return pRes->code;
  }
2222

H
Haojun Liao 已提交
2223 2224 2225 2226 2227 2228
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  if (pCmd->command == TSDB_SQL_RETRIEVE) {
    tscSetResRawPtr(pRes, pQueryInfo);
  } else if ((UTIL_TABLE_IS_CHILD_TABLE(pTableMetaInfo) || UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) && !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_SUBQUERY)) {
    tscSetResRawPtr(pRes, pQueryInfo);
  } else if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_QUERY) && !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
2229 2230 2231
    tscSetResRawPtr(pRes, pQueryInfo);
  }

weixin_48148422's avatar
weixin_48148422 已提交
2232
  if (pSql->pSubscription != NULL) {
H
hjxilinx 已提交
2233
    int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
H
hjxilinx 已提交
2234
    
H
hjxilinx 已提交
2235
    TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, numOfCols - 1);
H
hjxilinx 已提交
2236 2237
    int16_t     offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1);
    
weixin_48148422's avatar
weixin_48148422 已提交
2238 2239
    char* p = pRes->data + (pField->bytes + offset) * pRes->numOfRows;

S
slguan 已提交
2240
    int32_t numOfTables = htonl(*(int32_t*)p);
weixin_48148422's avatar
weixin_48148422 已提交
2241
    p += sizeof(int32_t);
S
slguan 已提交
2242
    for (int i = 0; i < numOfTables; i++) {
weixin_48148422's avatar
weixin_48148422 已提交
2243 2244
      int64_t uid = htobe64(*(int64_t*)p);
      p += sizeof(int64_t);
weixin_48148422's avatar
weixin_48148422 已提交
2245
      p += sizeof(int32_t); // skip tid
weixin_48148422's avatar
weixin_48148422 已提交
2246 2247
      TSKEY key = htobe64(*(TSKEY*)p);
      p += sizeof(TSKEY);
2248
      tscUpdateSubscriptionProgress(pSql->pSubscription, uid, key);
weixin_48148422's avatar
weixin_48148422 已提交
2249
    }
2250 2251
  }

H
hzcheng 已提交
2252
  pRes->row = 0;
H
Haojun Liao 已提交
2253
  tscDebug("%p numOfRows:%d, offset:%" PRId64 ", complete:%d", pSql, pRes->numOfRows, pRes->offset, pRes->completed);
H
hzcheng 已提交
2254 2255 2256 2257

  return 0;
}

H
hjxilinx 已提交
2258
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code);
H
hzcheng 已提交
2259

2260
static int32_t getTableMetaFromMgmt(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
S
slguan 已提交
2261 2262
  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
  if (NULL == pNew) {
2263
    tscError("%p malloc failed for new sqlobj to get table meta", pSql);
2264
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
2265
  }
2266

H
hzcheng 已提交
2267 2268 2269
  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;
  pNew->cmd.command = TSDB_SQL_META;
S
slguan 已提交
2270

2271
  tscAddSubqueryInfo(&pNew->cmd);
2272

2273
  SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetailSafely(&pNew->cmd, 0);
2274

H
hjxilinx 已提交
2275
  pNew->cmd.autoCreated = pSql->cmd.autoCreated;  // create table if not exists
B
Bomin Zhang 已提交
2276
  if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE + pSql->cmd.payloadLen)) {
2277
    tscError("%p malloc failed for payload to get table meta", pSql);
2278
    tscFreeSqlObj(pNew);
2279
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
2280 2281
  }

H
hjxilinx 已提交
2282
  STableMetaInfo *pNewMeterMetaInfo = tscAddEmptyMetaInfo(pNewQueryInfo);
2283
  assert(pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);
H
hzcheng 已提交
2284

B
Bomin Zhang 已提交
2285
  tstrncpy(pNewMeterMetaInfo->name, pTableMetaInfo->name, sizeof(pNewMeterMetaInfo->name));
2286 2287 2288 2289 2290 2291 2292 2293 2294 2295 2296 2297

  if (pSql->cmd.pTagData != NULL) {
    int size = offsetof(STagData, data) + htonl(pSql->cmd.pTagData->dataLen);
    pNew->cmd.pTagData = calloc(1, size);
    if (pNew->cmd.pTagData == NULL) {
      tscError("%p malloc failed for new tag data to get table meta", pSql);
      tscFreeSqlObj(pNew);
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    memcpy(pNew->cmd.pTagData, pSql->cmd.pTagData, size);
  }

2298
  tscDebug("%p new pSqlObj:%p to get tableMeta, auto create:%d", pSql, pNew, pNew->cmd.autoCreated);
H
hzcheng 已提交
2299

H
hjxilinx 已提交
2300 2301
  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;
H
hzcheng 已提交
2302

2303 2304
  registerSqlObj(pNew);

H
hjxilinx 已提交
2305 2306
  int32_t code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
2307
    code = TSDB_CODE_TSC_ACTION_IN_PROGRESS;  // notify upper application that current process need to be terminated
H
hzcheng 已提交
2308 2309 2310 2311 2312
  }

  return code;
}

H
hjxilinx 已提交
2313
int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
H
hjxilinx 已提交
2314
  assert(strlen(pTableMetaInfo->name) != 0);
S
slguan 已提交
2315

2316
  // If this STableMetaInfo owns a table meta, release it first
H
hjxilinx 已提交
2317
  if (pTableMetaInfo->pTableMeta != NULL) {
H
Haojun Liao 已提交
2318
    taosCacheRelease(tscMetaCache, (void **)&(pTableMetaInfo->pTableMeta), false);
2319 2320
  }
  
H
Haojun Liao 已提交
2321
  pTableMetaInfo->pTableMeta = (STableMeta *)taosCacheAcquireByKey(tscMetaCache, pTableMetaInfo->name, strlen(pTableMetaInfo->name));
H
hjxilinx 已提交
2322
  if (pTableMetaInfo->pTableMeta != NULL) {
H
hjxilinx 已提交
2323
    STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
2324
    tscDebug("%p retrieve table Meta from cache, the number of columns:%d, numOfTags:%d, %p", pSql, tinfo.numOfColumns,
2325
             tinfo.numOfTags, pTableMetaInfo->pTableMeta);
H
hzcheng 已提交
2326 2327 2328

    return TSDB_CODE_SUCCESS;
  }
2329 2330
  
  return getTableMetaFromMgmt(pSql, pTableMetaInfo);
H
hzcheng 已提交
2331 2332
}

H
hjxilinx 已提交
2333
int tscGetMeterMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool createIfNotExists) {
H
hjxilinx 已提交
2334
  pSql->cmd.autoCreated = createIfNotExists;
H
hjxilinx 已提交
2335
  return tscGetTableMeta(pSql, pTableMetaInfo);
H
hzcheng 已提交
2336 2337 2338
}

/**
H
Haojun Liao 已提交
2339
 * retrieve table meta from mnode, and update the local table meta cache.
H
hzcheng 已提交
2340
 * @param pSql          sql object
B
Bomin Zhang 已提交
2341
 * @param tableIndex    table index
H
hzcheng 已提交
2342 2343
 * @return              status code
 */
B
Bomin Zhang 已提交
2344
int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) {
H
hzcheng 已提交
2345
  SSqlCmd *pCmd = &pSql->cmd;
2346 2347

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
B
Bomin Zhang 已提交
2348
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex);
H
hzcheng 已提交
2349

H
Haojun Liao 已提交
2350 2351
  STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
  if (pTableMetaInfo->pTableMeta) {
2352
    tscDebug("%p update table meta, old meta numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
2353
             tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->id.uid, pTableMeta);
H
hzcheng 已提交
2354 2355
  }

H
Haojun Liao 已提交
2356
  taosCacheRelease(tscMetaCache, (void **)&(pTableMetaInfo->pTableMeta), true);
H
Haojun Liao 已提交
2357
  return getTableMetaFromMgmt(pSql, pTableMetaInfo);
H
hzcheng 已提交
2358 2359
}

H
hjxilinx 已提交
2360
static bool allVgroupInfoRetrieved(SSqlCmd* pCmd, int32_t clauseIndex) {
2361
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
2362
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
H
hjxilinx 已提交
2363
    STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
H
hjxilinx 已提交
2364 2365
    if (pTableMetaInfo->vgroupList == NULL) {
      return false;
S
slguan 已提交
2366 2367
    }
  }
H
hjxilinx 已提交
2368 2369 2370 2371
  
  // all super tables vgroupinfo are retrieved, no need to retrieve vgroup info anymore
  return true;
}
H
hzcheng 已提交
2372

H
hjxilinx 已提交
2373
int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
2374
  int      code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
H
hjxilinx 已提交
2375 2376 2377
  SSqlCmd *pCmd = &pSql->cmd;
  
  if (allVgroupInfoRetrieved(pCmd, clauseIndex)) {
H
hzcheng 已提交
2378 2379
    return TSDB_CODE_SUCCESS;
  }
H
hjxilinx 已提交
2380

S
slguan 已提交
2381
  SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
H
hzcheng 已提交
2382 2383 2384
  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;

H
hjxilinx 已提交
2385
  pNew->cmd.command = TSDB_SQL_STABLEVGROUP;
H
Haojun Liao 已提交
2386 2387

  // TODO TEST IT
2388 2389
  SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetailSafely(&pNew->cmd, 0);
  if (pNewQueryInfo == NULL) {
B
Bomin Zhang 已提交
2390
    tscFreeSqlObj(pNew);
2391 2392
    return code;
  }
2393
  
H
hjxilinx 已提交
2394
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
2395
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
H
hjxilinx 已提交
2396
    STableMetaInfo *pMInfo = tscGetMetaInfo(pQueryInfo, i);
H
Haojun Liao 已提交
2397
    STableMeta *pTableMeta = taosCacheAcquireByData(tscMetaCache, pMInfo->pTableMeta);
H
Haojun Liao 已提交
2398
    tscAddTableMetaInfo(pNewQueryInfo, pMInfo->name, pTableMeta, NULL, pMInfo->tagColList, pMInfo->pVgroupTables);
S
slguan 已提交
2399 2400 2401 2402 2403 2404
  }

  if ((code = tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
    tscFreeSqlObj(pNew);
    return code;
  }
H
hzcheng 已提交
2405

2406
  pNewQueryInfo->numOfTables = pQueryInfo->numOfTables;
2407
  registerSqlObj(pNew);
H
Haojun Liao 已提交
2408

2409
  tscDebug("%p new sqlObj:%p to get vgroupInfo, numOfTables:%d", pSql, pNew, pNewQueryInfo->numOfTables);
H
hzcheng 已提交
2410

2411 2412 2413 2414
  pNew->fp = tscTableMetaCallBack;
  pNew->param = pSql;
  code = tscProcessSql(pNew);
  if (code == TSDB_CODE_SUCCESS) {
2415
    code = TSDB_CODE_TSC_ACTION_IN_PROGRESS;
H
hzcheng 已提交
2416 2417 2418 2419 2420
  }

  return code;
}

2421
void tscInitMsgsFp() {
S
slguan 已提交
2422 2423
  tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
  tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg;
J
jtao1735 已提交
2424
  tscBuildMsg[TSDB_SQL_FETCH] = tscBuildFetchMsg;
H
hzcheng 已提交
2425 2426

  tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg;
2427
  tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg;
H
hzcheng 已提交
2428

2429 2430
  tscBuildMsg[TSDB_SQL_CREATE_ACCT] = tscBuildAcctMsg;
  tscBuildMsg[TSDB_SQL_ALTER_ACCT] = tscBuildAcctMsg;
H
hzcheng 已提交
2431 2432

  tscBuildMsg[TSDB_SQL_CREATE_TABLE] = tscBuildCreateTableMsg;
S
[TD-16]  
slguan 已提交
2433
  tscBuildMsg[TSDB_SQL_DROP_USER] = tscBuildDropUserMsg;
H
hzcheng 已提交
2434 2435 2436
  tscBuildMsg[TSDB_SQL_DROP_ACCT] = tscBuildDropAcctMsg;
  tscBuildMsg[TSDB_SQL_DROP_DB] = tscBuildDropDbMsg;
  tscBuildMsg[TSDB_SQL_DROP_TABLE] = tscBuildDropTableMsg;
2437
  tscBuildMsg[TSDB_SQL_ALTER_USER] = tscBuildUserMsg;
S
slguan 已提交
2438 2439 2440
  tscBuildMsg[TSDB_SQL_CREATE_DNODE] = tscBuildCreateDnodeMsg;
  tscBuildMsg[TSDB_SQL_DROP_DNODE] = tscBuildDropDnodeMsg;
  tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg;
H
hzcheng 已提交
2441
  tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg;
2442
  tscBuildMsg[TSDB_SQL_UPDATE_TAGS_VAL] = tscBuildUpdateTagMsg;
H
hzcheng 已提交
2443 2444 2445 2446
  tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg;

  tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg;
  tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg;
H
hjxilinx 已提交
2447
  tscBuildMsg[TSDB_SQL_META] = tscBuildTableMetaMsg;
H
hjxilinx 已提交
2448
  tscBuildMsg[TSDB_SQL_STABLEVGROUP] = tscBuildSTableVgroupMsg;
S
slguan 已提交
2449
  tscBuildMsg[TSDB_SQL_MULTI_META] = tscBuildMultiMeterMetaMsg;
H
hzcheng 已提交
2450 2451 2452 2453

  tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg;
  tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg;
  tscBuildMsg[TSDB_SQL_RETRIEVE] = tscBuildRetrieveFromMgmtMsg;
2454 2455 2456
  tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg;
H
hzcheng 已提交
2457 2458

  tscProcessMsgRsp[TSDB_SQL_SELECT] = tscProcessQueryRsp;
H
hjxilinx 已提交
2459
  tscProcessMsgRsp[TSDB_SQL_FETCH] = tscProcessRetrieveRspFromNode;
H
hzcheng 已提交
2460 2461 2462 2463 2464

  tscProcessMsgRsp[TSDB_SQL_DROP_DB] = tscProcessDropDbRsp;
  tscProcessMsgRsp[TSDB_SQL_DROP_TABLE] = tscProcessDropTableRsp;
  tscProcessMsgRsp[TSDB_SQL_CONNECT] = tscProcessConnectRsp;
  tscProcessMsgRsp[TSDB_SQL_USE_DB] = tscProcessUseDbRsp;
2465
  tscProcessMsgRsp[TSDB_SQL_META] = tscProcessTableMetaRsp;
H
hjxilinx 已提交
2466
  tscProcessMsgRsp[TSDB_SQL_STABLEVGROUP] = tscProcessSTableVgroupRsp;
S
slguan 已提交
2467
  tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiMeterMetaRsp;
H
hzcheng 已提交
2468 2469

  tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp;
H
hjxilinx 已提交
2470
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromNode;  // rsp handled by same function.
H
hzcheng 已提交
2471
  tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp;
2472

H
Haojun Liao 已提交
2473 2474 2475 2476 2477
  tscProcessMsgRsp[TSDB_SQL_CURRENT_DB]   = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CURRENT_USER] = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_VERSION] = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CLI_VERSION]  = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_STATUS]  = tscProcessLocalRetrieveRsp;
2478

H
hzcheng 已提交
2479 2480
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp;

H
hjxilinx 已提交
2481
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_LOCALMERGE] = tscProcessRetrieveLocalMergeRsp;
H
hzcheng 已提交
2482 2483 2484 2485

  tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp;
  tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp;

2486 2487 2488 2489
  tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_TABLE] = tscProcessShowCreateRsp;
  tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_DATABASE] = tscProcessShowCreateRsp;
  

H
hzcheng 已提交
2490 2491 2492 2493 2494 2495
  tscKeepConn[TSDB_SQL_SHOW] = 1;
  tscKeepConn[TSDB_SQL_RETRIEVE] = 1;
  tscKeepConn[TSDB_SQL_SELECT] = 1;
  tscKeepConn[TSDB_SQL_FETCH] = 1;
  tscKeepConn[TSDB_SQL_HB] = 1;
}