tscUtil.c 64.9 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16 17
#include "tscUtil.h"
#include "hash.h"
18
#include "os.h"
H
hjxilinx 已提交
19
#include "qast.h"
H
hzcheng 已提交
20 21 22 23 24 25
#include "taosmsg.h"
#include "tcache.h"
#include "tkey.h"
#include "tmd5.h"
#include "tscProfile.h"
#include "tscSecondaryMerge.h"
H
hjxilinx 已提交
26
#include "tscSubquery.h"
H
hzcheng 已提交
27 28 29
#include "tschemautil.h"
#include "tsclient.h"
#include "ttimer.h"
30
#include "ttokendef.h"
S
slguan 已提交
31
#include "tscLog.h"
H
hzcheng 已提交
32

33 34 35 36
static void freeQueryInfoImpl(SQueryInfo* pQueryInfo);
static void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache);
  
  SCond* tsGetSTableQueryCond(STagCond* pTagCond, uint64_t uid) {
37 38 39 40 41 42 43 44 45 46
  if (pTagCond->pCond == NULL) {
    return NULL;
  }
  
  size_t size = taosArrayGetSize(pTagCond->pCond);
  for (int32_t i = 0; i < size; ++i) {
    SCond* pCond = taosArrayGet(pTagCond->pCond, i);
    
    if (uid == pCond->uid) {
      return pCond;
S
slguan 已提交
47 48 49 50 51 52
    }
  }

  return NULL;
}

53 54
void tsSetSTableQueryCond(STagCond* pTagCond, uint64_t uid, SBufferWriter* bw) {
  if (tbufTell(bw) == 0) {
S
slguan 已提交
55
    return;
H
hzcheng 已提交
56
  }
57 58 59
  
  SCond cond = {
    .uid = uid,
60
    .len = tbufTell(bw),
61 62 63
    .cond = NULL,
  };
  
64
  cond.cond = tbufGetData(bw, true);
65 66 67 68 69 70
  
  if (pTagCond->pCond == NULL) {
    pTagCond->pCond = taosArrayInit(3, sizeof(SCond));
  }
  
  taosArrayPush(pTagCond->pCond, &cond);
S
slguan 已提交
71 72
}

73
bool tscQueryOnSTable(SSqlCmd* pCmd) {
74 75 76
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

  return ((pQueryInfo->type & TSDB_QUERY_TYPE_STABLE_QUERY) == TSDB_QUERY_TYPE_STABLE_QUERY) &&
S
slguan 已提交
77
         (pCmd->msgType == TSDB_MSG_TYPE_QUERY);
S
slguan 已提交
78 79
}

H
hjxilinx 已提交
80
bool tscQueryTags(SQueryInfo* pQueryInfo) {
H
hjxilinx 已提交
81
  for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
H
Haojun Liao 已提交
82 83 84 85 86 87 88 89
    SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
    int32_t functId = pExpr->functionId;

    // "select count(tbname)" query
    if (functId == TSDB_FUNC_COUNT && pExpr->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) {
      continue;
    }

90
    if (functId != TSDB_FUNC_TAGPRJ && functId != TSDB_FUNC_TID_TAG) {
S
slguan 已提交
91 92 93
      return false;
    }
  }
H
hzcheng 已提交
94

S
slguan 已提交
95
  return true;
H
hzcheng 已提交
96 97
}

S
slguan 已提交
98 99 100 101
bool tscIsSelectivityWithTagQuery(SSqlCmd* pCmd) {
  bool    hasTags = false;
  int32_t numOfSelectivity = 0;

102 103
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

H
hjxilinx 已提交
104
  for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
105
    int32_t functId = tscSqlExprGet(pQueryInfo, i)->functionId;
S
slguan 已提交
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
    if (functId == TSDB_FUNC_TAG_DUMMY) {
      hasTags = true;
      continue;
    }

    if ((aAggs[functId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) != 0) {
      numOfSelectivity++;
    }
  }

  if (numOfSelectivity > 0 && hasTags) {
    return true;
  }

  return false;
}
H
hzcheng 已提交
122

S
slguan 已提交
123 124
void tscGetDBInfoFromMeterId(char* tableId, char* db) {
  char* st = strstr(tableId, TS_PATH_DELIMITER);
H
hzcheng 已提交
125 126 127
  if (st != NULL) {
    char* end = strstr(st + 1, TS_PATH_DELIMITER);
    if (end != NULL) {
S
slguan 已提交
128 129
      memcpy(db, tableId, (end - tableId));
      db[end - tableId] = 0;
H
hzcheng 已提交
130 131 132 133 134 135 136
      return;
    }
  }

  db[0] = 0;
}

137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
//STableIdInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx) {
//  if (pSidList == NULL) {
//    tscError("illegal sidlist");
//    return 0;
//  }
//
//  if (idx < 0 || idx >= pSidList->numOfSids) {
//    int32_t sidRange = (pSidList->numOfSids > 0) ? (pSidList->numOfSids - 1) : 0;
//
//    tscError("illegal sidIdx:%d, reset to 0, sidIdx range:%d-%d", idx, 0, sidRange);
//    idx = 0;
//  }
//
//  assert(pSidList->pSidExtInfoList[idx] >= 0);
//
//  return (STableIdInfo*)(pSidList->pSidExtInfoList[idx] + (char*)pSidList);
//}
H
hzcheng 已提交
154

H
hjxilinx 已提交
155
bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) {
156 157 158
  if (pQueryInfo == NULL) {
    return false;
  }
159

H
hjxilinx 已提交
160
  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex);
H
hjxilinx 已提交
161
  if (pTableMetaInfo == NULL) {
162 163 164
    return false;
  }
  
H
hjxilinx 已提交
165
  // for select query super table, the super table vgroup list can not be null in any cases.
weixin_48148422's avatar
weixin_48148422 已提交
166
  if (pQueryInfo->command == TSDB_SQL_SELECT && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
H
hjxilinx 已提交
167
    assert(pTableMetaInfo->vgroupList != NULL);
168 169
  }
  
H
hjxilinx 已提交
170 171
  if ((pQueryInfo->type & TSDB_QUERY_TYPE_FREE_RESOURCE) == TSDB_QUERY_TYPE_FREE_RESOURCE) {
    return false;
S
slguan 已提交
172 173
  }

174 175
  // for ordered projection query, iterate all qualified vnodes sequentially
  if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, tableIndex)) {
H
hzcheng 已提交
176 177 178
    return false;
  }

179
  if (((pQueryInfo->type & TSDB_QUERY_TYPE_STABLE_SUBQUERY) != TSDB_QUERY_TYPE_STABLE_SUBQUERY) &&
180
      pQueryInfo->command == TSDB_SQL_SELECT) {
weixin_48148422's avatar
weixin_48148422 已提交
181
    return UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo);
H
hzcheng 已提交
182 183 184 185 186
  }

  return false;
}

187
bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) {
H
hjxilinx 已提交
188
  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex);
189
  
H
hzcheng 已提交
190
  /*
191
   * In following cases, return false for non ordered project query on super table
H
Haojun Liao 已提交
192
   * 1. failed to get tableMeta from server; 2. not a super table; 3. limitation is 0;
193
   * 4. show queries, instead of a select query
H
hzcheng 已提交
194
   */
H
hjxilinx 已提交
195
  size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
weixin_48148422's avatar
weixin_48148422 已提交
196
  if (pTableMetaInfo == NULL || !UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo) ||
H
hjxilinx 已提交
197
      pQueryInfo->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT || numOfExprs == 0) {
H
hzcheng 已提交
198 199
    return false;
  }
200
  
H
hjxilinx 已提交
201
  for (int32_t i = 0; i < numOfExprs; ++i) {
202
    int32_t functionId = tscSqlExprGet(pQueryInfo, i)->functionId;
H
hjxilinx 已提交
203 204
    if (functionId != TSDB_FUNC_PRJ && functionId != TSDB_FUNC_TAGPRJ && functionId != TSDB_FUNC_TAG &&
        functionId != TSDB_FUNC_TS && functionId != TSDB_FUNC_ARITHM) {
S
slguan 已提交
205
      return false;
H
hzcheng 已提交
206 207
    }
  }
208
  
S
slguan 已提交
209
  return true;
H
hzcheng 已提交
210 211
}

H
Haojun Liao 已提交
212
// not order by timestamp projection query on super table
213 214 215 216 217
bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) {
  if (!tscIsProjectionQueryOnSTable(pQueryInfo, tableIndex)) {
    return false;
  }
  
218
  // order by columnIndex exists, not a non-ordered projection query
219 220 221 222 223 224 225 226
  return pQueryInfo->order.orderColId < 0;
}

bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) {
  if (!tscIsProjectionQueryOnSTable(pQueryInfo, tableIndex)) {
    return false;
  }
  
227
  // order by columnIndex exists, a non-ordered projection query
228 229 230
  return pQueryInfo->order.orderColId >= 0;
}

231
bool tscProjectionQueryOnTable(SQueryInfo* pQueryInfo) {
H
hjxilinx 已提交
232
  for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
233
    int32_t functionId = tscSqlExprGet(pQueryInfo, i)->functionId;
H
hjxilinx 已提交
234 235 236 237
    if (functionId != TSDB_FUNC_PRJ && functionId != TSDB_FUNC_TS) {
      return false;
    }
  }
H
hjxilinx 已提交
238

H
hjxilinx 已提交
239 240 241
  return true;
}

242
bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo) {
H
hjxilinx 已提交
243 244 245
  size_t size = tscSqlExprNumOfExprs(pQueryInfo);
  
  for (int32_t i = 0; i < size; ++i) {
246
    SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
H
hzcheng 已提交
247 248 249 250
    if (pExpr == NULL) {
      return false;
    }

S
slguan 已提交
251
    int32_t functionId = pExpr->functionId;
H
hzcheng 已提交
252 253 254 255 256 257 258 259 260 261 262
    if (functionId == TSDB_FUNC_TAG) {
      continue;
    }

    if (functionId != TSDB_FUNC_INTERP) {
      return false;
    }
  }
  return true;
}

263
bool tscIsTWAQuery(SQueryInfo* pQueryInfo) {
H
hjxilinx 已提交
264 265
  size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
  for (int32_t i = 0; i < numOfExprs; ++i) {
266
    SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
S
slguan 已提交
267 268 269 270 271 272 273 274 275 276 277
    if (pExpr == NULL) {
      continue;
    }

    int32_t functionId = pExpr->functionId;
    if (functionId == TSDB_FUNC_TWA) {
      return true;
    }
  }

  return false;
H
hzcheng 已提交
278 279
}

280 281
void tscClearInterpInfo(SQueryInfo* pQueryInfo) {
  if (!tscIsPointInterpQuery(pQueryInfo)) {
H
hzcheng 已提交
282 283 284
    return;
  }

285
  pQueryInfo->fillType = TSDB_FILL_NONE;
286
  tfree(pQueryInfo->fillVal);
H
hzcheng 已提交
287 288
}

289
int32_t tscCreateResPointerInfo(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
H
hzcheng 已提交
290
  if (pRes->tsrow == NULL) {
H
hjxilinx 已提交
291 292
    int32_t numOfOutput = pQueryInfo->fieldsInfo.numOfOutput;
    pRes->numOfCols = numOfOutput;
293
  
294 295 296
    pRes->tsrow  = calloc(numOfOutput, POINTER_BYTES);
    pRes->length = calloc(numOfOutput, sizeof(int32_t));  // todo refactor
    pRes->buffer = calloc(numOfOutput, POINTER_BYTES);
297 298
  
    // not enough memory
H
hjxilinx 已提交
299
    if (pRes->tsrow == NULL || (pRes->buffer == NULL && pRes->numOfCols > 0)) {
300 301
      tfree(pRes->tsrow);
      tfree(pRes->buffer);
302
      tfree(pRes->length);
303
    
304
      pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
H
hzcheng 已提交
305 306 307 308 309 310 311 312
      return pRes->code;
    }
  }

  return TSDB_CODE_SUCCESS;
}

void tscDestroyResPointerInfo(SSqlRes* pRes) {
H
hjxilinx 已提交
313
  if (pRes->buffer != NULL) { // free all buffers containing the multibyte string
H
hjxilinx 已提交
314
    for (int i = 0; i < pRes->numOfCols; i++) {
H
hzcheng 已提交
315 316
      tfree(pRes->buffer[i]);
    }
317
    
H
hjxilinx 已提交
318
    pRes->numOfCols = 0;
H
hzcheng 已提交
319
  }
320 321
  
  tfree(pRes->pRsp);
H
hzcheng 已提交
322
  tfree(pRes->tsrow);
323
  tfree(pRes->length);
324 325 326 327 328
  
  tfree(pRes->pGroupRec);
  tfree(pRes->pColumnIndex);
  tfree(pRes->buffer);
  
329 330 331 332 333
  if (pRes->pArithSup != NULL) {
    tfree(pRes->pArithSup->data);
    tfree(pRes->pArithSup);
  }
  
334
  pRes->data = NULL;  // pRes->data points to the buffer of pRsp, no need to free
H
hzcheng 已提交
335 336
}

337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354
static void tscFreeQueryInfo(SSqlCmd* pCmd) {
  if (pCmd == NULL || pCmd->numOfClause == 0) {
    return;
  }
  
  for (int32_t i = 0; i < pCmd->numOfClause; ++i) {
    char* addr = (char*)pCmd - offsetof(SSqlObj, cmd);
    SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, i);
    
    freeQueryInfoImpl(pQueryInfo);
    clearAllTableMetaInfo(pQueryInfo, (const char*)addr, false);
    tfree(pQueryInfo);
  }
  
  pCmd->numOfClause = 0;
  tfree(pCmd->pQueryInfo);
}

355 356
void tscResetSqlCmdObj(SSqlCmd* pCmd) {
  pCmd->command   = 0;
357
  pCmd->numOfCols = 0;
358 359 360 361
  pCmd->count     = 0;
  pCmd->curSql    = NULL;
  pCmd->msgType   = 0;
  pCmd->parseFinished = 0;
362
  pCmd->autoCreated = 0;
363 364
  
  taosHashCleanup(pCmd->pTableList);
365
  pCmd->pTableList = NULL;
366
  
S
slguan 已提交
367
  pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
368
  
369
  tscFreeQueryInfo(pCmd);
H
hzcheng 已提交
370 371
}

H
hjxilinx 已提交
372
void tscFreeSqlResult(SSqlObj* pSql) {
373 374
  tscDestroyLocalReducer(pSql);
  
H
hjxilinx 已提交
375
  SSqlRes* pRes = &pSql->res;
376
  tscDestroyResPointerInfo(pRes);
377
  
H
hjxilinx 已提交
378
  memset(&pSql->res, 0, sizeof(SSqlRes));
379 380
}

H
hjxilinx 已提交
381
void tscPartiallyFreeSqlObj(SSqlObj* pSql) {
S
slguan 已提交
382 383 384
  if (pSql == NULL || pSql->signature != pSql) {
    return;
  }
H
hzcheng 已提交
385 386 387 388 389

  SSqlCmd* pCmd = &pSql->cmd;
  STscObj* pObj = pSql->pTscObj;

  int32_t cmd = pCmd->command;
H
hjxilinx 已提交
390
  if (cmd < TSDB_SQL_INSERT || cmd == TSDB_SQL_RETRIEVE_LOCALMERGE || cmd == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
391
      cmd == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
H
hzcheng 已提交
392 393
    tscRemoveFromSqlList(pSql);
  }
394
  
H
hzcheng 已提交
395
  // pSql->sqlstr will be used by tscBuildQueryStreamDesc
L
lihui 已提交
396 397 398 399 400
  if (pObj->signature == pObj) {
    pthread_mutex_lock(&pObj->mutex);
    tfree(pSql->sqlstr);
    pthread_mutex_unlock(&pObj->mutex);
  }
401
  
weixin_48148422's avatar
weixin_48148422 已提交
402
  tscFreeSqlResult(pSql);
H
[TD-98]  
hjxilinx 已提交
403
  
404
  tfree(pSql->pSubs);
405
  pSql->freed = 0;
H
[TD-98]  
hjxilinx 已提交
406 407
  pSql->numOfSubs = 0;
  
408
  tscResetSqlCmdObj(pCmd);
H
hzcheng 已提交
409 410 411
}

void tscFreeSqlObj(SSqlObj* pSql) {
H
Haojun Liao 已提交
412 413 414 415
  if (pSql == NULL || pSql->signature != pSql) {
    return;
  }
  
H
hzcheng 已提交
416
  tscTrace("%p start to free sql object", pSql);
H
hjxilinx 已提交
417
  tscPartiallyFreeSqlObj(pSql);
H
hzcheng 已提交
418 419 420

  pSql->signature = NULL;
  pSql->fp = NULL;
421
  
H
hzcheng 已提交
422 423
  SSqlCmd* pCmd = &pSql->cmd;

S
slguan 已提交
424
  memset(pCmd->payload, 0, (size_t)pCmd->allocSize);
H
hzcheng 已提交
425 426
  tfree(pCmd->payload);
  pCmd->allocSize = 0;
427 428
  
  tfree(pSql->sqlstr);
H
Haojun Liao 已提交
429
  sem_destroy(&pSql->rspSem);
H
hzcheng 已提交
430 431 432
  free(pSql);
}

S
slguan 已提交
433 434
void tscDestroyDataBlock(STableDataBlocks* pDataBlock) {
  if (pDataBlock == NULL) {
H
hzcheng 已提交
435 436 437
    return;
  }

S
slguan 已提交
438
  tfree(pDataBlock->pData);
S
slguan 已提交
439
  tfree(pDataBlock->params);
H
hjxilinx 已提交
440

H
hjxilinx 已提交
441
  // free the refcount for metermeta
H
hjxilinx 已提交
442
  taosCacheRelease(tscCacheHandle, (void**)&(pDataBlock->pTableMeta), false);
S
slguan 已提交
443
  tfree(pDataBlock);
H
hzcheng 已提交
444 445
}

446 447
SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, short bytes,
                                   uint32_t offset) {
S
slguan 已提交
448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469
  uint32_t needed = pDataBlock->numOfParams + 1;
  if (needed > pDataBlock->numOfAllocedParams) {
    needed *= 2;
    void* tmp = realloc(pDataBlock->params, needed * sizeof(SParamInfo));
    if (tmp == NULL) {
      return NULL;
    }
    pDataBlock->params = (SParamInfo*)tmp;
    pDataBlock->numOfAllocedParams = needed;
  }

  SParamInfo* param = pDataBlock->params + pDataBlock->numOfParams;
  param->idx = -1;
  param->type = type;
  param->timePrec = timePrec;
  param->bytes = bytes;
  param->offset = offset;

  ++pDataBlock->numOfParams;
  return param;
}

H
hzcheng 已提交
470 471 472 473
SDataBlockList* tscCreateBlockArrayList() {
  const int32_t DEFAULT_INITIAL_NUM_OF_BLOCK = 16;

  SDataBlockList* pDataBlockArrayList = calloc(1, sizeof(SDataBlockList));
S
slguan 已提交
474 475 476
  if (pDataBlockArrayList == NULL) {
    return NULL;
  }
H
hjxilinx 已提交
477
  
H
hzcheng 已提交
478 479
  pDataBlockArrayList->nAlloc = DEFAULT_INITIAL_NUM_OF_BLOCK;
  pDataBlockArrayList->pData = calloc(1, POINTER_BYTES * pDataBlockArrayList->nAlloc);
S
slguan 已提交
480 481 482 483
  if (pDataBlockArrayList->pData == NULL) {
    free(pDataBlockArrayList);
    return NULL;
  }
H
hzcheng 已提交
484 485 486 487

  return pDataBlockArrayList;
}

488
void tscAppendDataBlock(SDataBlockList* pList, STableDataBlocks* pBlocks) {
S
slguan 已提交
489
  if (pList->nSize >= pList->nAlloc) {
H
hjxilinx 已提交
490 491
    pList->nAlloc = (pList->nAlloc) << 1U;
    pList->pData = realloc(pList->pData, POINTER_BYTES * (size_t)pList->nAlloc);
S
slguan 已提交
492 493

    // reset allocated memory
H
hjxilinx 已提交
494
    memset(pList->pData + pList->nSize, 0, POINTER_BYTES * (pList->nAlloc - pList->nSize));
S
slguan 已提交
495 496 497 498 499
  }

  pList->pData[pList->nSize++] = pBlocks;
}

S
slguan 已提交
500 501 502
void* tscDestroyBlockArrayList(SDataBlockList* pList) {
  if (pList == NULL) {
    return NULL;
H
hzcheng 已提交
503 504
  }

S
slguan 已提交
505 506
  for (int32_t i = 0; i < pList->nSize; i++) {
    tscDestroyDataBlock(pList->pData[i]);
H
hzcheng 已提交
507 508
  }

S
slguan 已提交
509 510 511 512
  tfree(pList->pData);
  tfree(pList);

  return NULL;
H
hzcheng 已提交
513 514
}

S
slguan 已提交
515
int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
H
hjxilinx 已提交
516
  SSqlCmd* pCmd = &pSql->cmd;
H
hjxilinx 已提交
517
  assert(pDataBlock->pTableMeta != NULL);
H
hjxilinx 已提交
518

S
slguan 已提交
519
  pCmd->numOfTablesInSubmit = pDataBlock->numOfTables;
520

521
  assert(pCmd->numOfClause == 1);
522
  STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
523

524
  // set the correct table meta object, the table meta has been locked in pDataBlocks, so it must be in the cache
H
hjxilinx 已提交
525 526 527
  if (pTableMetaInfo->pTableMeta != pDataBlock->pTableMeta) {
    strcpy(pTableMetaInfo->name, pDataBlock->tableId);
    taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pTableMeta), false);
528

H
hjxilinx 已提交
529
    pTableMetaInfo->pTableMeta = taosCacheTransfer(tscCacheHandle, (void**)&pDataBlock->pTableMeta);
H
hjxilinx 已提交
530
  } else {
H
hjxilinx 已提交
531
    assert(strncmp(pTableMetaInfo->name, pDataBlock->tableId, tListLen(pDataBlock->tableId)) == 0);
H
hjxilinx 已提交
532
  }
H
hjxilinx 已提交
533

534 535
  /*
   * the submit message consists of : [RPC header|message body|digest]
536
   * the dataBlock only includes the RPC Header buffer and actual submit message body, space for digest needs
537 538
   * additional space.
   */
S
slguan 已提交
539
  int ret = tscAllocPayload(pCmd, pDataBlock->nAllocSize + 100);
H
hjxilinx 已提交
540 541 542
  if (TSDB_CODE_SUCCESS != ret) {
    return ret;
  }
H
hjxilinx 已提交
543

H
hzcheng 已提交
544
  memcpy(pCmd->payload, pDataBlock->pData, pDataBlock->nAllocSize);
H
hjxilinx 已提交
545

546 547 548 549
  /*
   * the payloadLen should be actual message body size
   * the old value of payloadLen is the allocated payload size
   */
550
  pCmd->payloadLen = pDataBlock->nAllocSize - tsRpcHeadSize;
H
hjxilinx 已提交
551

H
hjxilinx 已提交
552
  assert(pCmd->allocSize >= pCmd->payloadLen + tsRpcHeadSize + 100 && pCmd->payloadLen > 0);
H
hjxilinx 已提交
553
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
554 555 556 557 558
}

void tscFreeUnusedDataBlocks(SDataBlockList* pList) {
  /* release additional memory consumption */
  for (int32_t i = 0; i < pList->nSize; ++i) {
S
slguan 已提交
559 560 561
    STableDataBlocks* pDataBlock = pList->pData[i];
    pDataBlock->pData = realloc(pDataBlock->pData, pDataBlock->size);
    pDataBlock->nAllocSize = (uint32_t)pDataBlock->size;
H
hzcheng 已提交
562 563 564
  }
}

H
hjxilinx 已提交
565 566 567 568 569 570
/**
 * create the in-memory buffer for each table to keep the submitted data block
 * @param initialSize
 * @param rowSize
 * @param startOffset
 * @param name
H
hjxilinx 已提交
571
 * @param dataBlocks
H
hjxilinx 已提交
572 573
 * @return
 */
H
hjxilinx 已提交
574
int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, const char* name,
H
hjxilinx 已提交
575
                           STableMeta* pTableMeta, STableDataBlocks** dataBlocks) {
H
hjxilinx 已提交
576
  STableDataBlocks* dataBuf = (STableDataBlocks*)calloc(1, sizeof(STableDataBlocks));
H
hjxilinx 已提交
577 578
  if (dataBuf == NULL) {
    tscError("failed to allocated memory, reason:%s", strerror(errno));
579
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
H
hjxilinx 已提交
580 581 582
  }

  dataBuf->nAllocSize = (uint32_t)initialSize;
L
[#1102]  
lihui 已提交
583
  dataBuf->headerSize = startOffset; // the header size will always be the startOffset value, reserved for the subumit block header
H
hjxilinx 已提交
584 585 586 587
  if (dataBuf->nAllocSize <= dataBuf->headerSize) {
    dataBuf->nAllocSize = dataBuf->headerSize*2;
  }
  
H
hjxilinx 已提交
588 589 590
  dataBuf->pData = calloc(1, dataBuf->nAllocSize);
  dataBuf->ordered = true;
  dataBuf->prevTS = INT64_MIN;
S
slguan 已提交
591 592 593

  dataBuf->rowSize = rowSize;
  dataBuf->size = startOffset;
S
slguan 已提交
594 595
  dataBuf->tsSource = -1;

B
Bomin Zhang 已提交
596
  tstrncpy(dataBuf->tableId, name, sizeof(dataBuf->tableId));
H
hjxilinx 已提交
597 598

  /*
599
   * The table meta may be released since the table meta cache are completed clean by other thread
600 601
   * due to operation such as drop database. So here we add the reference count directly instead of invoke
   * taosGetDataFromCache, which may return NULL value.
H
hjxilinx 已提交
602
   */
H
hjxilinx 已提交
603 604
  dataBuf->pTableMeta = taosCacheAcquireByData(tscCacheHandle, pTableMeta);
  assert(initialSize > 0 && pTableMeta != NULL && dataBuf->pTableMeta != NULL);
605

606 607
  *dataBlocks = dataBuf;
  return TSDB_CODE_SUCCESS;
S
slguan 已提交
608 609
}

H
hjxilinx 已提交
610
int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int64_t id, int32_t size,
H
hjxilinx 已提交
611
                                int32_t startOffset, int32_t rowSize, const char* tableId, STableMeta* pTableMeta,
H
hjxilinx 已提交
612 613
                                STableDataBlocks** dataBlocks) {
  *dataBlocks = NULL;
S
slguan 已提交
614

H
hjxilinx 已提交
615
  STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pHashList, (const char*)&id, sizeof(id));
S
slguan 已提交
616
  if (t1 != NULL) {
H
hjxilinx 已提交
617
    *dataBlocks = *t1;
S
slguan 已提交
618 619
  }

H
hjxilinx 已提交
620
  if (*dataBlocks == NULL) {
H
hjxilinx 已提交
621
    int32_t ret = tscCreateDataBlock((size_t)size, rowSize, startOffset, tableId, pTableMeta, dataBlocks);
H
hjxilinx 已提交
622 623 624 625
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }

H
hjxilinx 已提交
626
    taosHashPut(pHashList, (const char*)&id, sizeof(int64_t), (char*)dataBlocks, POINTER_BYTES);
H
hjxilinx 已提交
627
    tscAppendDataBlock(pDataBlockList, *dataBlocks);
S
slguan 已提交
628 629
  }

H
hjxilinx 已提交
630
  return TSDB_CODE_SUCCESS;
S
slguan 已提交
631 632
}

H
TD-100  
hzcheng 已提交
633
static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock) {
634
  // TODO: optimize this function, handle the case while binary is not presented
H
TD-100  
hzcheng 已提交
635
  int len = 0;
H
TD-166  
hzcheng 已提交
636 637

  STableMeta*   pTableMeta = pTableDataBlock->pTableMeta;
638
  STableComInfo tinfo = tscGetTableInfo(pTableMeta);
H
TD-166  
hzcheng 已提交
639 640 641
  SSchema*      pSchema = tscGetTableSchema(pTableMeta);

  SSubmitBlk* pBlock = pDataBlock;
642 643
  memcpy(pDataBlock, pTableDataBlock->pData, sizeof(SSubmitBlk));
  pDataBlock += sizeof(SSubmitBlk);
H
TD-166  
hzcheng 已提交
644

H
hjxilinx 已提交
645
  int32_t flen = 0;  // original total length of row
H
TD-166  
hzcheng 已提交
646 647
  for (int32_t i = 0; i < tinfo.numOfColumns; ++i) {
    flen += TYPE_BYTES[pSchema[i].type];
648
  }
H
TD-166  
hzcheng 已提交
649

650
  char* p = pTableDataBlock->pData + sizeof(SSubmitBlk);
H
TD-166  
hzcheng 已提交
651
  pBlock->len = 0;
H
hjxilinx 已提交
652 653 654
  int32_t numOfRows = htons(pBlock->numOfRows);
  
  for (int32_t i = 0; i < numOfRows; ++i) {
H
TD-166  
hzcheng 已提交
655 656
    SDataRow trow = (SDataRow)pDataBlock;
    dataRowSetLen(trow, TD_DATA_ROW_HEAD_SIZE + flen);
H
TD-90  
Hongze Cheng 已提交
657
    dataRowSetVersion(trow, pTableMeta->sversion);
H
TD-166  
hzcheng 已提交
658 659 660

    int toffset = 0;
    for (int32_t j = 0; j < tinfo.numOfColumns; j++) {
H
TD-166  
hzcheng 已提交
661
      tdAppendColVal(trow, p, pSchema[j].type, pSchema[j].bytes, toffset);
H
TD-166  
hzcheng 已提交
662 663 664 665 666 667 668
      toffset += TYPE_BYTES[pSchema[j].type];
      p += pSchema[j].bytes;
    }

    // p += pTableDataBlock->rowSize;
    pDataBlock += dataRowLen(trow);
    pBlock->len += dataRowLen(trow);
669
  }
H
TD-166  
hzcheng 已提交
670

H
TD-100  
hzcheng 已提交
671
  len = pBlock->len;
H
TD-166  
hzcheng 已提交
672
  pBlock->len = htonl(pBlock->len);
H
TD-100  
hzcheng 已提交
673
  return len;
674 675
}

S
slguan 已提交
676 677 678
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockList) {
  SSqlCmd* pCmd = &pSql->cmd;

H
hjxilinx 已提交
679
  void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
S
slguan 已提交
680 681 682 683
  SDataBlockList* pVnodeDataBlockList = tscCreateBlockArrayList();

  for (int32_t i = 0; i < pTableDataBlockList->nSize; ++i) {
    STableDataBlocks* pOneTableBlock = pTableDataBlockList->pData[i];
S
slguan 已提交
684

H
hjxilinx 已提交
685
    STableDataBlocks* dataBuf = NULL;
H
hjxilinx 已提交
686 687 688
    
    int32_t ret =
        tscGetDataBlockFromList(pVnodeDataBlockHashList, pVnodeDataBlockList, pOneTableBlock->vgId, TSDB_PAYLOAD_SIZE,
H
hjxilinx 已提交
689
                                tsInsertHeadSize, 0, pOneTableBlock->tableId, pOneTableBlock->pTableMeta, &dataBuf);
H
hjxilinx 已提交
690
    if (ret != TSDB_CODE_SUCCESS) {
691
      tscError("%p failed to prepare the data block buffer for merging table data, code:%d", pSql, ret);
H
hjxilinx 已提交
692
      taosHashCleanup(pVnodeDataBlockHashList);
693
      tscDestroyBlockArrayList(pVnodeDataBlockList);
H
hjxilinx 已提交
694 695
      return ret;
    }
S
slguan 已提交
696

697
    int64_t destSize = dataBuf->size + pOneTableBlock->size + pOneTableBlock->size*sizeof(int32_t)*2;
S
slguan 已提交
698 699 700 701 702 703 704 705 706
    if (dataBuf->nAllocSize < destSize) {
      while (dataBuf->nAllocSize < destSize) {
        dataBuf->nAllocSize = dataBuf->nAllocSize * 1.5;
      }

      char* tmp = realloc(dataBuf->pData, dataBuf->nAllocSize);
      if (tmp != NULL) {
        dataBuf->pData = tmp;
        memset(dataBuf->pData + dataBuf->size, 0, dataBuf->nAllocSize - dataBuf->size);
707
      } else {  // failed to allocate memory, free already allocated memory and return error code
S
slguan 已提交
708 709
        tscError("%p failed to allocate memory for merging submit block, size:%d", pSql, dataBuf->nAllocSize);

H
hjxilinx 已提交
710
        taosHashCleanup(pVnodeDataBlockHashList);
S
slguan 已提交
711
        tscDestroyBlockArrayList(pVnodeDataBlockList);
712
        tfree(dataBuf->pData);
S
slguan 已提交
713

714
        return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
715 716 717
      }
    }

718
    SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData;
H
hjxilinx 已提交
719
    tscSortRemoveDataBlockDupRows(pOneTableBlock);
S
slguan 已提交
720

721
    char* e = (char*)pBlocks->data + pOneTableBlock->rowSize*(pBlocks->numOfRows-1);
L
lihui 已提交
722
    
723 724
    tscTrace("%p tableId:%s, sid:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, pSql, pOneTableBlock->tableId,
        pBlocks->tid, pBlocks->numOfRows, pBlocks->sversion, GET_INT64_VAL(pBlocks->data), GET_INT64_VAL(e));
S
slguan 已提交
725

726 727 728
    int32_t len = pBlocks->numOfRows * (pOneTableBlock->rowSize + sizeof(int32_t) * 2);
    
    pBlocks->tid = htonl(pBlocks->tid);
S
slguan 已提交
729 730 731
    pBlocks->uid = htobe64(pBlocks->uid);
    pBlocks->sversion = htonl(pBlocks->sversion);
    pBlocks->numOfRows = htons(pBlocks->numOfRows);
732 733 734 735
    
    pBlocks->len = htonl(len);
    
    // erase the empty space reserved for binary data
H
TD-100  
hzcheng 已提交
736
    len = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock);
737
    dataBuf->size += (len + sizeof(SSubmitBlk));
S
slguan 已提交
738
    dataBuf->numOfTables += 1;
S
slguan 已提交
739 740 741 742 743 744 745 746
  }

  tscDestroyBlockArrayList(pTableDataBlockList);

  // free the table data blocks;
  pCmd->pDataBlocks = pVnodeDataBlockList;

  tscFreeUnusedDataBlocks(pCmd->pDataBlocks);
H
hjxilinx 已提交
747
  taosHashCleanup(pVnodeDataBlockHashList);
S
slguan 已提交
748 749

  return TSDB_CODE_SUCCESS;
S
slguan 已提交
750 751
}

H
hzcheng 已提交
752
void tscCloseTscObj(STscObj* pObj) {
H
hjxilinx 已提交
753 754
  assert(pObj != NULL);
  
H
hzcheng 已提交
755 756 757
  pObj->signature = NULL;
  taosTmrStopA(&(pObj->pTimer));
  pthread_mutex_destroy(&pObj->mutex);
758
  
759 760 761 762
  if (pObj->pDnodeConn != NULL) {
    rpcClose(pObj->pDnodeConn);
  }
  
763
  tscTrace("%p DB connection is closed, dnodeConn:%p", pObj, pObj->pDnodeConn);
H
hzcheng 已提交
764 765 766
  tfree(pObj);
}

H
hjxilinx 已提交
767
bool tscIsInsertData(char* sqlstr) {
768 769 770 771 772 773 774 775
  int32_t index = 0;

  do {
    SSQLToken t0 = tStrGetToken(sqlstr, &index, false, 0, NULL);
    if (t0.type != TK_LP) {
      return t0.type == TK_INSERT || t0.type == TK_IMPORT;
    }
  } while (1);
H
hzcheng 已提交
776 777
}

S
slguan 已提交
778
int tscAllocPayload(SSqlCmd* pCmd, int size) {
H
hzcheng 已提交
779 780 781 782 783
  assert(size > 0);

  if (pCmd->payload == NULL) {
    assert(pCmd->allocSize == 0);

784
    pCmd->payload = (char*)calloc(1, size);
785
    if (pCmd->payload == NULL) return TSDB_CODE_TSC_OUT_OF_MEMORY;
H
hzcheng 已提交
786 787 788
    pCmd->allocSize = size;
  } else {
    if (pCmd->allocSize < size) {
789
      char* b = realloc(pCmd->payload, size);
790
      if (b == NULL) return TSDB_CODE_TSC_OUT_OF_MEMORY;
S
slguan 已提交
791
      pCmd->payload = b;
H
hzcheng 已提交
792 793
      pCmd->allocSize = size;
    }
794
    
H
hjxilinx 已提交
795
    memset(pCmd->payload, 0, pCmd->allocSize);
H
hzcheng 已提交
796 797 798 799 800 801
  }

  assert(pCmd->allocSize >= size);
  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
802 803
TAOS_FIELD tscCreateField(int8_t type, const char* name, int16_t bytes) {
  TAOS_FIELD f = { .type = type, .bytes = bytes, };
B
Bomin Zhang 已提交
804
  tstrncpy(f.name, name, sizeof(f.name));
H
hjxilinx 已提交
805
  return f;
H
hzcheng 已提交
806 807
}

H
hjxilinx 已提交
808 809 810 811
SFieldSupInfo* tscFieldInfoAppend(SFieldInfo* pFieldInfo, TAOS_FIELD* pField) {
  assert(pFieldInfo != NULL);
  taosArrayPush(pFieldInfo->pFields, pField);
  pFieldInfo->numOfOutput++;
H
hjxilinx 已提交
812
  
H
hjxilinx 已提交
813 814 815 816 817 818 819
  struct SFieldSupInfo info = {
    .pSqlExpr = NULL,
    .pArithExprInfo = NULL,
    .visible = true,
  };
  
  return taosArrayPush(pFieldInfo->pSupportInfo, &info);
H
hzcheng 已提交
820 821
}

H
hjxilinx 已提交
822 823
SFieldSupInfo* tscFieldInfoGetSupp(SFieldInfo* pFieldInfo, int32_t index) {
  return taosArrayGet(pFieldInfo->pSupportInfo, index);
H
hjxilinx 已提交
824 825
}

H
hjxilinx 已提交
826 827 828 829 830 831 832 833 834 835 836
SFieldSupInfo* tscFieldInfoInsert(SFieldInfo* pFieldInfo, int32_t index, TAOS_FIELD* field) {
  taosArrayInsert(pFieldInfo->pFields, index, field);
  pFieldInfo->numOfOutput++;
  
  struct SFieldSupInfo info = {
      .pSqlExpr = NULL,
      .pArithExprInfo = NULL,
      .visible = true,
  };
  
  return taosArrayInsert(pFieldInfo->pSupportInfo, index, &info);
H
hjxilinx 已提交
837
}
H
hzcheng 已提交
838

H
hjxilinx 已提交
839
void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo) {
H
hjxilinx 已提交
840 841
  size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
  
H
hjxilinx 已提交
842
  SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
H
hjxilinx 已提交
843
  pExpr->offset = 0;
H
hjxilinx 已提交
844
  
H
hjxilinx 已提交
845
  for (int32_t i = 1; i < numOfExprs; ++i) {
H
hjxilinx 已提交
846 847
    SSqlExpr* prev = taosArrayGetP(pQueryInfo->exprList, i - 1);
    SSqlExpr* p = taosArrayGetP(pQueryInfo->exprList, i);
H
hjxilinx 已提交
848 849
  
    p->offset = prev->offset + prev->resBytes;
H
hzcheng 已提交
850 851 852
  }
}

853
void tscFieldInfoUpdateOffsetForInterResult(SQueryInfo* pQueryInfo) {
H
hjxilinx 已提交
854
  if (tscSqlExprNumOfExprs(pQueryInfo) == 0) {
H
hzcheng 已提交
855 856
    return;
  }
H
hjxilinx 已提交
857
  
H
hjxilinx 已提交
858
  SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
H
hjxilinx 已提交
859
  pExpr->offset = 0;
H
hjxilinx 已提交
860
  
H
hjxilinx 已提交
861 862
  size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
  for (int32_t i = 1; i < numOfExprs; ++i) {
H
hjxilinx 已提交
863 864
    SSqlExpr* prev = taosArrayGetP(pQueryInfo->exprList, i - 1);
    SSqlExpr* p = taosArrayGetP(pQueryInfo->exprList, i);
H
hjxilinx 已提交
865 866
    
    p->offset = prev->offset + prev->resBytes;
H
hzcheng 已提交
867 868 869
  }
}

H
hjxilinx 已提交
870
void tscFieldInfoCopy(SFieldInfo* dst, const SFieldInfo* src) {
H
hjxilinx 已提交
871
  dst->numOfOutput = src->numOfOutput;
H
hzcheng 已提交
872

H
hjxilinx 已提交
873 874 875 876 877 878 879 880 881 882 883
  if (dst->pFields == NULL) {
    dst->pFields = taosArrayClone(src->pFields);
  } else {
    taosArrayCopy(dst->pFields, src->pFields);
  }
  
  if (dst->pSupportInfo == NULL) {
    dst->pSupportInfo = taosArrayClone(src->pSupportInfo);
  } else {
    taosArrayCopy(dst->pSupportInfo, src->pSupportInfo);
  }
H
hzcheng 已提交
884 885
}

H
hjxilinx 已提交
886 887
TAOS_FIELD* tscFieldInfoGetField(SFieldInfo* pFieldInfo, int32_t index) {
  return taosArrayGet(pFieldInfo->pFields, index);
888 889
}

H
hjxilinx 已提交
890
int32_t tscNumOfFields(SQueryInfo* pQueryInfo) { return pQueryInfo->fieldsInfo.numOfOutput; }
H
hzcheng 已提交
891

892
int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index) {
H
hjxilinx 已提交
893 894
  SFieldSupInfo* pInfo = tscFieldInfoGetSupp(&pQueryInfo->fieldsInfo, index);
  assert(pInfo != NULL);
H
hzcheng 已提交
895

H
hjxilinx 已提交
896
  return pInfo->pSqlExpr->offset;
H
hzcheng 已提交
897 898
}

H
hjxilinx 已提交
899
int32_t tscFieldInfoCompare(const SFieldInfo* pFieldInfo1, const SFieldInfo* pFieldInfo2) {
900
  assert(pFieldInfo1 != NULL && pFieldInfo2 != NULL);
901

H
hjxilinx 已提交
902 903
  if (pFieldInfo1->numOfOutput != pFieldInfo2->numOfOutput) {
    return pFieldInfo1->numOfOutput - pFieldInfo2->numOfOutput;
904 905
  }

H
hjxilinx 已提交
906 907 908
  for (int32_t i = 0; i < pFieldInfo1->numOfOutput; ++i) {
    TAOS_FIELD* pField1 = tscFieldInfoGetField((SFieldInfo*) pFieldInfo1, i);
    TAOS_FIELD* pField2 = tscFieldInfoGetField((SFieldInfo*) pFieldInfo2, i);
909

H
hjxilinx 已提交
910 911
    if (pField1->type != pField2->type ||
        pField1->bytes != pField2->bytes ||
912 913 914 915 916 917 918 919
        strcasecmp(pField1->name, pField2->name) != 0) {
      return 1;
    }
  }

  return 0;
}

H
hjxilinx 已提交
920 921 922
int32_t tscGetResRowLength(SArray* pExprList) {
  size_t num = taosArrayGetSize(pExprList);
  if (num == 0) {
H
hzcheng 已提交
923 924
    return 0;
  }
H
hjxilinx 已提交
925 926
  
  int32_t size = 0;
H
hjxilinx 已提交
927 928 929
  for(int32_t i = 0; i < num; ++i) {
    SSqlExpr* pExpr = taosArrayGetP(pExprList, i);
    size += pExpr->resBytes;
H
hjxilinx 已提交
930 931 932
  }
  
  return size;
H
hzcheng 已提交
933 934
}

H
hjxilinx 已提交
935
void tscFieldInfoClear(SFieldInfo* pFieldInfo) {
S
slguan 已提交
936
  if (pFieldInfo == NULL) {
H
hzcheng 已提交
937 938 939
    return;
  }

H
hjxilinx 已提交
940
  taosArrayDestroy(pFieldInfo->pFields);
H
hjxilinx 已提交
941
  
H
hjxilinx 已提交
942 943 944 945
  for(int32_t i = 0; i < pFieldInfo->numOfOutput; ++i) {
    SFieldSupInfo* pInfo = taosArrayGet(pFieldInfo->pSupportInfo, i);
    
    if (pInfo->pArithExprInfo != NULL) {
946
      tExprTreeDestroy(&pInfo->pArithExprInfo->pExpr, NULL);
947
      tfree(pInfo->pArithExprInfo);
H
hjxilinx 已提交
948 949 950
    }
  }
  
H
hjxilinx 已提交
951
  taosArrayDestroy(pFieldInfo->pSupportInfo);
S
slguan 已提交
952
  memset(pFieldInfo, 0, sizeof(SFieldInfo));
H
hzcheng 已提交
953 954
}

H
hjxilinx 已提交
955
static SSqlExpr* doBuildSqlExpr(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
956
    int16_t size, int16_t interSize, bool isTagCol) {
H
hjxilinx 已提交
957
  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, pColIndex->tableIndex);
H
hjxilinx 已提交
958
  
H
hjxilinx 已提交
959
  SSqlExpr* pExpr = calloc(1, sizeof(SSqlExpr));
S
slguan 已提交
960
  pExpr->functionId = functionId;
961
  
962
  // set the correct columnIndex index
S
slguan 已提交
963 964 965
  if (pColIndex->columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
    pExpr->colInfo.colId = TSDB_TBNAME_COLUMN_INDEX;
  } else {
966 967 968
    if (isTagCol) {
      SSchema* pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);
      pExpr->colInfo.colId = pSchema[pColIndex->columnIndex].colId;
B
Bomin Zhang 已提交
969
      tstrncpy(pExpr->colInfo.name, pSchema[pColIndex->columnIndex].name, sizeof(pExpr->colInfo.name));
H
Haojun Liao 已提交
970 971
    } else if (pTableMetaInfo->pTableMeta != NULL) {
      // in handling select database/version/server_status(), the pTableMeta is NULL
972 973
      SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, pColIndex->columnIndex);
      pExpr->colInfo.colId = pSchema->colId;
B
Bomin Zhang 已提交
974
      tstrncpy(pExpr->colInfo.name, pSchema->name, sizeof(pExpr->colInfo.name));
S
slguan 已提交
975
    }
H
hzcheng 已提交
976
  }
H
hjxilinx 已提交
977
  
978 979
  pExpr->colInfo.flag = isTagCol? TSDB_COL_TAG:TSDB_COL_NORMAL;
  
980
  pExpr->colInfo.colIndex = pColIndex->columnIndex;
H
hjxilinx 已提交
981 982
  pExpr->resType       = type;
  pExpr->resBytes      = size;
H
Haojun Liao 已提交
983 984 985 986 987 988
  pExpr->interBytes    = interSize;
  
  if (pTableMetaInfo->pTableMeta) {
    pExpr->uid = pTableMetaInfo->pTableMeta->uid;
  }
  
H
hjxilinx 已提交
989 990 991 992
  return pExpr;
}

SSqlExpr* tscSqlExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
993
                           int16_t size, int16_t interSize, bool isTagCol) {
H
hjxilinx 已提交
994
  int32_t num = taosArrayGetSize(pQueryInfo->exprList);
H
hjxilinx 已提交
995
  if (index == num) {
996
    return tscSqlExprAppend(pQueryInfo, functionId, pColIndex, type, size, interSize, isTagCol);
H
hjxilinx 已提交
997 998
  }
  
999
  SSqlExpr* pExpr = doBuildSqlExpr(pQueryInfo, functionId, pColIndex, type, size, interSize, isTagCol);
H
hjxilinx 已提交
1000
  taosArrayInsert(pQueryInfo->exprList, index, &pExpr);
H
hjxilinx 已提交
1001 1002 1003 1004
  return pExpr;
}

SSqlExpr* tscSqlExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
1005 1006
    int16_t size, int16_t interSize, bool isTagCol) {
  SSqlExpr* pExpr = doBuildSqlExpr(pQueryInfo, functionId, pColIndex, type, size, interSize, isTagCol);
H
hjxilinx 已提交
1007
  taosArrayPush(pQueryInfo->exprList, &pExpr);
H
hzcheng 已提交
1008 1009 1010
  return pExpr;
}

1011 1012
SSqlExpr* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, int16_t srcColumnIndex,
                           int16_t type, int16_t size) {
H
hjxilinx 已提交
1013
  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hjxilinx 已提交
1014 1015
  SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, index);
  if (pExpr == NULL) {
H
hzcheng 已提交
1016 1017 1018
    return NULL;
  }

S
slguan 已提交
1019
  pExpr->functionId = functionId;
H
hzcheng 已提交
1020

1021
  pExpr->colInfo.colIndex = srcColumnIndex;
H
hjxilinx 已提交
1022
  pExpr->colInfo.colId = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, srcColumnIndex)->colId;
H
hzcheng 已提交
1023 1024 1025 1026 1027 1028 1029

  pExpr->resType = type;
  pExpr->resBytes = size;

  return pExpr;
}

1030
size_t tscSqlExprNumOfExprs(SQueryInfo* pQueryInfo) {
H
hjxilinx 已提交
1031
  return taosArrayGetSize(pQueryInfo->exprList);
H
hjxilinx 已提交
1032 1033
}

S
slguan 已提交
1034
void addExprParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes, int16_t tableIndex) {
H
hzcheng 已提交
1035 1036 1037 1038 1039 1040
  if (pExpr == NULL || argument == NULL || bytes == 0) {
    return;
  }

  // set parameter value
  // transfer to tVariant from byte data/no ascii data
S
slguan 已提交
1041
  tVariantCreateFromBinary(&pExpr->param[pExpr->numOfParams], argument, bytes, type);
H
hzcheng 已提交
1042 1043 1044 1045 1046

  pExpr->numOfParams += 1;
  assert(pExpr->numOfParams <= 3);
}

1047
SSqlExpr* tscSqlExprGet(SQueryInfo* pQueryInfo, int32_t index) {
H
hjxilinx 已提交
1048
  return taosArrayGetP(pQueryInfo->exprList, index);
H
hzcheng 已提交
1049 1050
}

H
hjxilinx 已提交
1051
void* sqlExprDestroy(SSqlExpr* pExpr) {
H
hjxilinx 已提交
1052 1053 1054 1055 1056 1057 1058 1059
  if (pExpr == NULL) {
    return NULL;
  }
  
  for(int32_t i = 0; i < tListLen(pExpr->param); ++i) {
    tVariantDestroy(&pExpr->param[i]);
  }
  
H
hjxilinx 已提交
1060 1061
  tfree(pExpr);
  
H
hjxilinx 已提交
1062
  return NULL;
H
hzcheng 已提交
1063 1064
}

H
hjxilinx 已提交
1065 1066 1067
/*
 * NOTE: Does not release SSqlExprInfo here.
 */
H
hjxilinx 已提交
1068 1069
void tscSqlExprInfoDestroy(SArray* pExprInfo) {
  size_t size = taosArrayGetSize(pExprInfo);
H
hjxilinx 已提交
1070
  
H
hjxilinx 已提交
1071 1072 1073
  for(int32_t i = 0; i < size; ++i) {
    SSqlExpr* pExpr = taosArrayGetP(pExprInfo, i);
    sqlExprDestroy(pExpr);
H
hjxilinx 已提交
1074 1075
  }
  
H
hjxilinx 已提交
1076
  taosArrayDestroy(pExprInfo);
H
hjxilinx 已提交
1077 1078
}

H
hjxilinx 已提交
1079 1080
void tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy) {
  assert(src != NULL && dst != NULL);
H
hjxilinx 已提交
1081
  
H
hjxilinx 已提交
1082 1083 1084 1085 1086
  size_t size = taosArrayGetSize(src);
  for (int32_t i = 0; i < size; ++i) {
    SSqlExpr* pExpr = taosArrayGetP(src, i);
    
    if (pExpr->uid == uid) {
H
hjxilinx 已提交
1087 1088
      
      if (deepcopy) {
H
hjxilinx 已提交
1089 1090 1091 1092 1093 1094 1095 1096
        SSqlExpr* p1 = calloc(1, sizeof(SSqlExpr));
        *p1 = *pExpr;
  
        for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
          tVariantAssign(&p1->param[j], &pExpr->param[j]);
        }
        
        taosArrayPush(dst, &p1);
H
hjxilinx 已提交
1097
      } else {
H
hjxilinx 已提交
1098
        taosArrayPush(dst, &pExpr);
H
hjxilinx 已提交
1099
      }
S
slguan 已提交
1100 1101
    }
  }
H
hzcheng 已提交
1102 1103
}

1104
SColumn* tscColumnListInsert(SArray* pColumnList, SColumnIndex* pColIndex) {
1105
  // ignore the tbname columnIndex to be inserted into source list
S
slguan 已提交
1106
  if (pColIndex->columnIndex < 0) {
H
hzcheng 已提交
1107 1108
    return NULL;
  }
1109 1110
  
  size_t numOfCols = taosArrayGetSize(pColumnList);
S
slguan 已提交
1111 1112
  int16_t col = pColIndex->columnIndex;

H
hzcheng 已提交
1113
  int32_t i = 0;
1114 1115 1116
  while (i < numOfCols) {
    SColumn* pCol = taosArrayGetP(pColumnList, i);
    if (pCol->colIndex.columnIndex < col) {
S
slguan 已提交
1117
      i++;
1118
    } else if (pCol->colIndex.tableIndex < pColIndex->tableIndex) {
S
slguan 已提交
1119 1120 1121 1122
      i++;
    } else {
      break;
    }
H
hzcheng 已提交
1123 1124
  }

1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138
  if (i >= numOfCols || numOfCols == 0) {
    SColumn* b = calloc(1, sizeof(SColumn));
    b->colIndex = *pColIndex;
    
    taosArrayInsert(pColumnList, i, &b);
  } else {
    SColumn* pCol = taosArrayGetP(pColumnList, i);
  
    if (i < numOfCols && (pCol->colIndex.columnIndex > col || pCol->colIndex.tableIndex != pColIndex->tableIndex)) {
      SColumn* b = calloc(1, sizeof(SColumn));
      b->colIndex = *pColIndex;
      
      taosArrayInsert(pColumnList, i, &b);
    }
H
hzcheng 已提交
1139 1140
  }

1141
  return taosArrayGetP(pColumnList, i);
H
hzcheng 已提交
1142 1143
}

1144
SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numOfFilters) {
1145
  if (numOfFilters == 0) {
1146 1147
    assert(src == NULL);
    return NULL;
S
slguan 已提交
1148
  }
1149
  
1150 1151
  SColumnFilterInfo* pFilter = calloc(1, numOfFilters * sizeof(SColumnFilterInfo));
  
1152 1153
  memcpy(pFilter, src, sizeof(SColumnFilterInfo) * numOfFilters);
  for (int32_t j = 0; j < numOfFilters; ++j) {
1154
    
1155
    if (pFilter[j].filterstr) {
1156
      size_t len = (size_t) pFilter[j].len + 1 * TSDB_NCHAR_SIZE;
1157
      pFilter[j].pz = (int64_t) calloc(1, len);
1158
      
1159
      memcpy((char*)pFilter[j].pz, (char*)src[j].pz, (size_t)len);
1160
    }
S
slguan 已提交
1161
  }
1162 1163 1164 1165 1166
  
  assert(src->filterstr == 0 || src->filterstr == 1);
  assert(!(src->lowerRelOptr == TSDB_RELATION_INVALID && src->upperRelOptr == TSDB_RELATION_INVALID));
  
  return pFilter;
S
slguan 已提交
1167 1168
}

1169 1170 1171 1172
static void destroyFilterInfo(SColumnFilterInfo* pFilterInfo, int32_t numOfFilters) {
  for(int32_t i = 0; i < numOfFilters; ++i) {
    if (pFilterInfo[i].filterstr) {
      tfree(pFilterInfo[i].pz);
S
slguan 已提交
1173 1174
    }
  }
1175 1176
  
  tfree(pFilterInfo);
S
slguan 已提交
1177 1178
}

1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195
SColumn* tscColumnClone(const SColumn* src) {
  assert(src != NULL);
  
  SColumn* dst = calloc(1, sizeof(SColumn));
  
  dst->colIndex     = src->colIndex;
  dst->numOfFilters = src->numOfFilters;
  dst->filterInfo   = tscFilterInfoClone(src->filterInfo, src->numOfFilters);
  
  return dst;
}

static void tscColumnDestroy(SColumn* pCol) {
  destroyFilterInfo(pCol->filterInfo, pCol->numOfFilters);
  free(pCol);
}

H
hjxilinx 已提交
1196
void tscColumnListCopy(SArray* dst, const SArray* src, int16_t tableIndex) {
H
hjxilinx 已提交
1197
  assert(src != NULL && dst != NULL);
1198 1199 1200 1201
  
  size_t num = taosArrayGetSize(src);
  for (int32_t i = 0; i < num; ++i) {
    SColumn* pCol = taosArrayGetP(src, i);
H
hzcheng 已提交
1202

1203 1204 1205
    if (pCol->colIndex.tableIndex == tableIndex || tableIndex < 0) {
      SColumn* p = tscColumnClone(pCol);
      taosArrayPush(dst, &p);
S
slguan 已提交
1206 1207
    }
  }
H
hzcheng 已提交
1208 1209
}

1210 1211
void tscColumnListDestroy(SArray* pColumnList) {
  if (pColumnList == NULL) {
S
slguan 已提交
1212 1213 1214
    return;
  }

1215
  size_t num = taosArrayGetSize(pColumnList);
1216
  for (int32_t i = 0; i < num; ++i) {
1217
    SColumn* pCol = taosArrayGetP(pColumnList, i);
1218
    tscColumnDestroy(pCol);
S
slguan 已提交
1219 1220
  }

1221
  taosArrayDestroy(pColumnList);
1222
}
H
hzcheng 已提交
1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238

/*
 * 1. normal name, not a keyword or number
 * 2. name with quote
 * 3. string with only one delimiter '.'.
 *
 * only_one_part
 * 'only_one_part'
 * first_part.second_part
 * first_part.'second_part'
 * 'first_part'.second_part
 * 'first_part'.'second_part'
 * 'first_part.second_part'
 *
 */
static int32_t validateQuoteToken(SSQLToken* pToken) {
H
Haojun Liao 已提交
1239 1240
  strdequote(pToken->z);
  pToken->n = strtrim(pToken->z);
H
hzcheng 已提交
1241 1242

  int32_t k = tSQLGetToken(pToken->z, &pToken->type);
1243

H
huili 已提交
1244 1245
  if (pToken->type == TK_STRING) {
    return tscValidateName(pToken);
S
slguan 已提交
1246
  }
H
hzcheng 已提交
1247

H
huili 已提交
1248
  if (k != pToken->n || pToken->type != TK_ID) {
1249
    return TSDB_CODE_TSC_INVALID_SQL;
H
huili 已提交
1250
  }
H
hzcheng 已提交
1251 1252 1253 1254 1255
  return TSDB_CODE_SUCCESS;
}

int32_t tscValidateName(SSQLToken* pToken) {
  if (pToken->type != TK_STRING && pToken->type != TK_ID) {
1256
    return TSDB_CODE_TSC_INVALID_SQL;
H
hzcheng 已提交
1257 1258
  }

S
slguan 已提交
1259
  char* sep = strnchr(pToken->z, TS_PATH_DELIMITER[0], pToken->n, true);
H
hzcheng 已提交
1260 1261
  if (sep == NULL) {  // single part
    if (pToken->type == TK_STRING) {
H
Haojun Liao 已提交
1262 1263
      strdequote(pToken->z);
      pToken->n = strtrim(pToken->z);
S
slguan 已提交
1264 1265 1266 1267

      int len = tSQLGetToken(pToken->z, &pToken->type);

      // single token, validate it
1268
      if (len == pToken->n) {
H
huili 已提交
1269
        return validateQuoteToken(pToken);
S
slguan 已提交
1270
      } else {
1271 1272
        sep = strnchr(pToken->z, TS_PATH_DELIMITER[0], pToken->n, true);
        if (sep == NULL) {
1273
          return TSDB_CODE_TSC_INVALID_SQL;
1274
        }
S
slguan 已提交
1275

H
huili 已提交
1276
        return tscValidateName(pToken);
1277
      }
H
hzcheng 已提交
1278 1279
    } else {
      if (isNumber(pToken)) {
1280
        return TSDB_CODE_TSC_INVALID_SQL;
H
hzcheng 已提交
1281 1282 1283 1284 1285 1286
      }
    }
  } else {  // two part
    int32_t oldLen = pToken->n;
    char*   pStr = pToken->z;

H
huili 已提交
1287
    if (pToken->type == TK_SPACE) {
H
Haojun Liao 已提交
1288
      pToken->n = strtrim(pToken->z);
H
huili 已提交
1289 1290
    }

H
hzcheng 已提交
1291 1292
    pToken->n = tSQLGetToken(pToken->z, &pToken->type);
    if (pToken->z[pToken->n] != TS_PATH_DELIMITER[0]) {
1293
      return TSDB_CODE_TSC_INVALID_SQL;
H
hzcheng 已提交
1294 1295 1296
    }

    if (pToken->type != TK_STRING && pToken->type != TK_ID) {
1297
      return TSDB_CODE_TSC_INVALID_SQL;
H
hzcheng 已提交
1298 1299 1300
    }

    if (pToken->type == TK_STRING && validateQuoteToken(pToken) != TSDB_CODE_SUCCESS) {
1301
      return TSDB_CODE_TSC_INVALID_SQL;
H
hzcheng 已提交
1302 1303 1304 1305 1306 1307 1308 1309
    }

    int32_t firstPartLen = pToken->n;

    pToken->z = sep + 1;
    pToken->n = oldLen - (sep - pStr) - 1;
    int32_t len = tSQLGetToken(pToken->z, &pToken->type);
    if (len != pToken->n || (pToken->type != TK_STRING && pToken->type != TK_ID)) {
1310
      return TSDB_CODE_TSC_INVALID_SQL;
H
hzcheng 已提交
1311 1312 1313
    }

    if (pToken->type == TK_STRING && validateQuoteToken(pToken) != TSDB_CODE_SUCCESS) {
1314
      return TSDB_CODE_TSC_INVALID_SQL;
H
hzcheng 已提交
1315 1316 1317 1318
    }

    // re-build the whole name string
    if (pStr[firstPartLen] == TS_PATH_DELIMITER[0]) {
H
hjxilinx 已提交
1319
      // first part do not have quote do nothing
H
hzcheng 已提交
1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340
    } else {
      pStr[firstPartLen] = TS_PATH_DELIMITER[0];
      memmove(&pStr[firstPartLen + 1], pToken->z, pToken->n);
      pStr[firstPartLen + sizeof(TS_PATH_DELIMITER[0]) + pToken->n] = 0;
    }
    pToken->n += (firstPartLen + sizeof(TS_PATH_DELIMITER[0]));
    pToken->z = pStr;
  }

  return TSDB_CODE_SUCCESS;
}

void tscIncStreamExecutionCount(void* pStream) {
  if (pStream == NULL) {
    return;
  }

  SSqlStream* ps = (SSqlStream*)pStream;
  ps->num += 1;
}

H
hjxilinx 已提交
1341 1342
bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId) {
  if (pTableMetaInfo->pTableMeta == NULL) {
H
hzcheng 已提交
1343 1344 1345
    return false;
  }

1346
  if (colId == TSDB_TBNAME_COLUMN_INDEX) {
H
hzcheng 已提交
1347 1348 1349
    return true;
  }

H
hjxilinx 已提交
1350
  SSchema* pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
H
hjxilinx 已提交
1351
  STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
H
hjxilinx 已提交
1352 1353
  
  int32_t  numOfTotal = tinfo.numOfTags + tinfo.numOfColumns;
H
hzcheng 已提交
1354 1355 1356 1357 1358 1359 1360 1361 1362 1363

  for (int32_t i = 0; i < numOfTotal; ++i) {
    if (pSchema[i].colId == colId) {
      return true;
    }
  }

  return false;
}

S
slguan 已提交
1364 1365
void tscTagCondCopy(STagCond* dest, const STagCond* src) {
  memset(dest, 0, sizeof(STagCond));
H
hjxilinx 已提交
1366

H
hjxilinx 已提交
1367 1368 1369
  if (src->tbnameCond.cond != NULL) {
    dest->tbnameCond.cond = strdup(src->tbnameCond.cond);
  }
S
slguan 已提交
1370 1371 1372 1373

  dest->tbnameCond.uid = src->tbnameCond.uid;

  memcpy(&dest->joinInfo, &src->joinInfo, sizeof(SJoinInfo));
1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393
  dest->relType = src->relType;
  
  if (src->pCond == NULL) {
    return;
  }
  
  size_t s = taosArrayGetSize(src->pCond);
  dest->pCond = taosArrayInit(s, sizeof(SCond));
  
  for (int32_t i = 0; i < s; ++i) {
    SCond* pCond = taosArrayGet(src->pCond, i);
    
    SCond c = {0};
    c.len = pCond->len;
    c.uid = pCond->uid;
    
    if (pCond->len > 0) {
      assert(pCond->cond != NULL);
      c.cond = malloc(c.len);
      memcpy(c.cond, pCond->cond, c.len);
H
hjxilinx 已提交
1394
    }
1395 1396
    
    taosArrayPush(dest->pCond, &c);
H
hzcheng 已提交
1397 1398 1399
  }
}

1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410
void tscTagCondRelease(STagCond* pTagCond) {
  free(pTagCond->tbnameCond.cond);
  
  if (pTagCond->pCond != NULL) {
    size_t s = taosArrayGetSize(pTagCond->pCond);
    for (int32_t i = 0; i < s; ++i) {
      SCond* p = taosArrayGet(pTagCond->pCond, i);
      tfree(p->cond);
    }
  
    taosArrayDestroy(pTagCond->pCond);
H
hzcheng 已提交
1411 1412
  }

1413
  memset(pTagCond, 0, sizeof(STagCond));
H
hzcheng 已提交
1414 1415
}

1416
void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SQueryInfo* pQueryInfo) {
H
hjxilinx 已提交
1417
  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hjxilinx 已提交
1418
  SSchema*        pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
H
hjxilinx 已提交
1419 1420 1421
  
  size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
  for (int32_t i = 0; i < numOfExprs; ++i) {
1422
    SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
S
slguan 已提交
1423
    pColInfo[i].functionId = pExpr->functionId;
H
hzcheng 已提交
1424

S
slguan 已提交
1425
    if (TSDB_COL_IS_TAG(pExpr->colInfo.flag)) {
H
hjxilinx 已提交
1426
      SSchema* pTagSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);
1427 1428 1429
      
      int16_t index = pExpr->colInfo.colIndex;
      pColInfo[i].type = (index != -1) ? pTagSchema[index].type : TSDB_DATA_TYPE_BINARY;
H
hzcheng 已提交
1430
    } else {
1431
      pColInfo[i].type = pSchema[pExpr->colInfo.colIndex].type;
H
hzcheng 已提交
1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443
    }
  }
}

void tscSetFreeHeatBeat(STscObj* pObj) {
  if (pObj == NULL || pObj->signature != pObj || pObj->pHb == NULL) {
    return;
  }

  SSqlObj* pHeatBeat = pObj->pHb;
  assert(pHeatBeat == pHeatBeat->signature);

S
slguan 已提交
1444
  // to denote the heart-beat timer close connection and free all allocated resources
1445 1446
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pHeatBeat->cmd, 0);
  pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE;
H
hzcheng 已提交
1447 1448 1449 1450
}

bool tscShouldFreeHeatBeat(SSqlObj* pHb) {
  assert(pHb == pHb->signature);
1451 1452 1453

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pHb->cmd, 0);
  return pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE;
H
hzcheng 已提交
1454 1455 1456
}

/*
weixin_48148422's avatar
weixin_48148422 已提交
1457
 * the following four kinds of SqlObj should not be freed
H
hzcheng 已提交
1458 1459 1460
 * 1. SqlObj for stream computing
 * 2. main SqlObj
 * 3. heartbeat SqlObj
weixin_48148422's avatar
weixin_48148422 已提交
1461
 * 4. SqlObj for subscription
H
hzcheng 已提交
1462 1463 1464 1465 1466 1467
 *
 * If res code is error and SqlObj does not belong to above types, it should be
 * automatically freed for async query, ignoring that connection should be kept.
 *
 * If connection need to be recycled, the SqlObj also should be freed.
 */
H
hjxilinx 已提交
1468
bool tscShouldBeFreed(SSqlObj* pSql) {
H
Haojun Liao 已提交
1469
  if (pSql == NULL || pSql->signature != pSql) {
H
hzcheng 已提交
1470 1471
    return false;
  }
H
Haojun Liao 已提交
1472 1473 1474
  
  assert(pSql->fp != NULL);
  
H
hzcheng 已提交
1475
  STscObj* pTscObj = pSql->pTscObj;
H
Haojun Liao 已提交
1476
  if (pSql->pStream != NULL || pTscObj->pHb == pSql || pSql->pSubscription != NULL) {
H
hzcheng 已提交
1477 1478 1479
    return false;
  }

H
Haojun Liao 已提交
1480
  // only the table meta and super table vgroup query will free resource automatically
H
hzcheng 已提交
1481
  int32_t command = pSql->cmd.command;
H
Haojun Liao 已提交
1482
  if (command == TSDB_SQL_META || command == TSDB_SQL_STABLEVGROUP) {
1483
    return true;
H
hzcheng 已提交
1484
  }
H
Haojun Liao 已提交
1485

H
Haojun Liao 已提交
1486
  return false;
H
hzcheng 已提交
1487 1488
}

1489 1490 1491
/**
 *
 * @param pCmd
1492
 * @param clauseIndex denote the index of the union sub clause, usually are 0, if no union query exists.
1493 1494 1495
 * @param tableIndex  denote the table index for join query, where more than one table exists
 * @return
 */
1496
STableMetaInfo* tscGetTableMetaInfoFromCmd(SSqlCmd* pCmd, int32_t clauseIndex, int32_t tableIndex) {
1497
  if (pCmd == NULL || pCmd->numOfClause == 0) {
S
slguan 已提交
1498 1499 1500
    return NULL;
  }

1501
  assert(clauseIndex >= 0 && clauseIndex < pCmd->numOfClause);
1502

1503
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
H
hjxilinx 已提交
1504
  return tscGetMetaInfo(pQueryInfo, tableIndex);
S
slguan 已提交
1505 1506
}

H
hjxilinx 已提交
1507
STableMetaInfo* tscGetMetaInfo(SQueryInfo* pQueryInfo, int32_t tableIndex) {
H
hjxilinx 已提交
1508
  assert(pQueryInfo != NULL);
1509

H
hjxilinx 已提交
1510
  if (pQueryInfo->pTableMetaInfo == NULL) {
1511 1512 1513 1514
    assert(pQueryInfo->numOfTables == 0);
    return NULL;
  }

H
hjxilinx 已提交
1515
  assert(tableIndex >= 0 && tableIndex <= pQueryInfo->numOfTables && pQueryInfo->pTableMetaInfo != NULL);
1516

H
hjxilinx 已提交
1517
  return pQueryInfo->pTableMetaInfo[tableIndex];
1518 1519 1520
}

SQueryInfo* tscGetQueryInfoDetail(SSqlCmd* pCmd, int32_t subClauseIndex) {
1521
  assert(pCmd != NULL && subClauseIndex >= 0 && subClauseIndex < TSDB_MAX_UNION_CLAUSE);
1522

1523
  if (pCmd->pQueryInfo == NULL || subClauseIndex >= pCmd->numOfClause) {
1524 1525 1526 1527 1528 1529
    return NULL;
  }

  return pCmd->pQueryInfo[subClauseIndex];
}

1530
int32_t tscGetQueryInfoDetailSafely(SSqlCmd* pCmd, int32_t subClauseIndex, SQueryInfo** pQueryInfo) {
1531
  int32_t ret = TSDB_CODE_SUCCESS;
1532

1533
  *pQueryInfo = tscGetQueryInfoDetail(pCmd, subClauseIndex);
1534

1535 1536 1537 1538
  while ((*pQueryInfo) == NULL) {
    if ((ret = tscAddSubqueryInfo(pCmd)) != TSDB_CODE_SUCCESS) {
      return ret;
    }
1539

1540 1541
    (*pQueryInfo) = tscGetQueryInfoDetail(pCmd, subClauseIndex);
  }
1542

1543 1544 1545
  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
1546
STableMetaInfo* tscGetTableMetaInfoByUid(SQueryInfo* pQueryInfo, uint64_t uid, int32_t* index) {
S
slguan 已提交
1547
  int32_t k = -1;
1548 1549

  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
H
hjxilinx 已提交
1550
    if (pQueryInfo->pTableMetaInfo[i]->pTableMeta->uid == uid) {
S
slguan 已提交
1551 1552 1553 1554 1555 1556 1557 1558 1559
      k = i;
      break;
    }
  }

  if (index != NULL) {
    *index = k;
  }

H
hjxilinx 已提交
1560
  assert(k != -1);
H
hjxilinx 已提交
1561
  return tscGetMetaInfo(pQueryInfo, k);
S
slguan 已提交
1562 1563
}

H
hjxilinx 已提交
1564 1565 1566 1567 1568 1569 1570 1571 1572
void tscInitQueryInfo(SQueryInfo* pQueryInfo) {
  assert(pQueryInfo->fieldsInfo.pFields == NULL);
  pQueryInfo->fieldsInfo.pFields = taosArrayInit(4, sizeof(TAOS_FIELD));
  
  assert(pQueryInfo->fieldsInfo.pSupportInfo == NULL);
  pQueryInfo->fieldsInfo.pSupportInfo = taosArrayInit(4, sizeof(SFieldSupInfo));
  
  assert(pQueryInfo->exprList == NULL);
  pQueryInfo->exprList = taosArrayInit(4, POINTER_BYTES);
1573
  pQueryInfo->colList  = taosArrayInit(4, POINTER_BYTES);
H
hjxilinx 已提交
1574 1575
}

1576
int32_t tscAddSubqueryInfo(SSqlCmd* pCmd) {
1577
  assert(pCmd != NULL);
1578

H
hjxilinx 已提交
1579
  // todo refactor: remove this structure
1580 1581
  size_t s = pCmd->numOfClause + 1;
  char*  tmp = realloc(pCmd->pQueryInfo, s * POINTER_BYTES);
1582
  if (tmp == NULL) {
1583
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
1584
  }
1585 1586 1587 1588

  pCmd->pQueryInfo = (SQueryInfo**)tmp;

  SQueryInfo* pQueryInfo = calloc(1, sizeof(SQueryInfo));
H
hjxilinx 已提交
1589
  tscInitQueryInfo(pQueryInfo);
H
hjxilinx 已提交
1590
  
1591 1592 1593
  pQueryInfo->msg = pCmd->payload;  // pointer to the parent error message buffer

  pCmd->pQueryInfo[pCmd->numOfClause++] = pQueryInfo;
1594 1595 1596
  return TSDB_CODE_SUCCESS;
}

1597
static void freeQueryInfoImpl(SQueryInfo* pQueryInfo) {
1598
  tscTagCondRelease(&pQueryInfo->tagCond);
H
hjxilinx 已提交
1599
  tscFieldInfoClear(&pQueryInfo->fieldsInfo);
1600

H
hjxilinx 已提交
1601 1602
  tscSqlExprInfoDestroy(pQueryInfo->exprList);
  pQueryInfo->exprList = NULL;
1603

1604
  tscColumnListDestroy(pQueryInfo->colList);
H
hjxilinx 已提交
1605
  pQueryInfo->colList = NULL;
1606

1607 1608 1609 1610 1611
  if (pQueryInfo->groupbyExpr.columnInfo != NULL) {
    taosArrayDestroy(pQueryInfo->groupbyExpr.columnInfo);
    pQueryInfo->groupbyExpr.columnInfo = NULL;
  }
  
1612
  pQueryInfo->tsBuf = tsBufDestory(pQueryInfo->tsBuf);
1613

1614
  tfree(pQueryInfo->fillVal);
1615
}
1616

1617
void tscClearSubqueryInfo(SSqlCmd* pCmd) {
1618
  for (int32_t i = 0; i < pCmd->numOfClause; ++i) {
1619
    SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, i);
1620
    freeQueryInfoImpl(pQueryInfo);
1621
  }
1622 1623
}

1624 1625 1626 1627 1628 1629 1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644
void doRemoveTableMetaInfo(SQueryInfo* pQueryInfo, int32_t index, bool removeFromCache) {
  if (index < 0 || index >= pQueryInfo->numOfTables) {
    return;
  }
  
  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index);
  
  tscClearTableMetaInfo(pTableMetaInfo, removeFromCache);
  free(pTableMetaInfo);
  
  int32_t after = pQueryInfo->numOfTables - index - 1;
  if (after > 0) {
    memmove(&pQueryInfo->pTableMetaInfo[index], &pQueryInfo->pTableMetaInfo[index + 1], after * POINTER_BYTES);
  }
  
  pQueryInfo->numOfTables -= 1;
}

void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache) {
  tscTrace("%p deref the table meta in cache, numOfTables:%d", address, pQueryInfo->numOfTables);
  
1645 1646 1647 1648 1649
  for(int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
    STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
  
    tscClearTableMetaInfo(pTableMetaInfo, removeFromCache);
    free(pTableMetaInfo);
1650 1651 1652 1653 1654
  }
  
  tfree(pQueryInfo->pTableMetaInfo);
}

1655
STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, STableMeta* pTableMeta,
1656
                                    SVgroupsInfo* vgroupList, SArray* pTagCols) {
H
hjxilinx 已提交
1657
  void* pAlloc = realloc(pQueryInfo->pTableMetaInfo, (pQueryInfo->numOfTables + 1) * POINTER_BYTES);
S
slguan 已提交
1658 1659 1660 1661
  if (pAlloc == NULL) {
    return NULL;
  }

H
hjxilinx 已提交
1662 1663
  pQueryInfo->pTableMetaInfo = pAlloc;
  pQueryInfo->pTableMetaInfo[pQueryInfo->numOfTables] = calloc(1, sizeof(STableMetaInfo));
S
slguan 已提交
1664

H
hjxilinx 已提交
1665
  STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[pQueryInfo->numOfTables];
H
hjxilinx 已提交
1666
  assert(pTableMetaInfo != NULL);
S
slguan 已提交
1667 1668

  if (name != NULL) {
B
Bomin Zhang 已提交
1669
    tstrncpy(pTableMetaInfo->name, name, sizeof(pTableMetaInfo->name));
S
slguan 已提交
1670 1671
  }

H
hjxilinx 已提交
1672
  pTableMetaInfo->pTableMeta = pTableMeta;
1673 1674
  
  if (vgroupList != NULL) {
H
hjxilinx 已提交
1675
    size_t size = sizeof(SVgroupsInfo) + sizeof(SCMVgroupInfo) * vgroupList->numOfVgroups;
1676 1677
    pTableMetaInfo->vgroupList = malloc(size);
    memcpy(pTableMetaInfo->vgroupList, vgroupList, size);
1678
  }
S
slguan 已提交
1679

1680 1681 1682
  pTableMetaInfo->tagColList = taosArrayInit(4, POINTER_BYTES);
  if (pTagCols != NULL) {
    tscColumnListCopy(pTableMetaInfo->tagColList, pTagCols, -1);
S
slguan 已提交
1683
  }
1684
  
1685
  pQueryInfo->numOfTables += 1;
H
hjxilinx 已提交
1686
  return pTableMetaInfo;
S
slguan 已提交
1687 1688
}

H
hjxilinx 已提交
1689
STableMetaInfo* tscAddEmptyMetaInfo(SQueryInfo* pQueryInfo) {
1690
  return tscAddTableMetaInfo(pQueryInfo, NULL, NULL, NULL, NULL);
1691
}
S
slguan 已提交
1692

1693
void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache) {
H
hjxilinx 已提交
1694
  if (pTableMetaInfo == NULL) {
S
slguan 已提交
1695 1696 1697
    return;
  }

H
hjxilinx 已提交
1698
  taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pTableMeta), removeFromCache);
1699
  tfree(pTableMetaInfo->vgroupList);
1700
  
1701 1702
  tscColumnListDestroy(pTableMetaInfo->tagColList);
  pTableMetaInfo->tagColList = NULL;
S
slguan 已提交
1703 1704 1705
}

void tscResetForNextRetrieve(SSqlRes* pRes) {
H
hjxilinx 已提交
1706 1707 1708
  if (pRes == NULL) {
    return;
  }
1709

S
slguan 已提交
1710 1711 1712 1713
  pRes->row = 0;
  pRes->numOfRows = 0;
}

1714
SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void* param, int32_t cmd, SSqlObj* pPrevSql) {
1715
  SSqlCmd* pCmd = &pSql->cmd;
S
slguan 已提交
1716 1717
  SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj));
  if (pNew == NULL) {
1718
    tscError("%p new subquery failed, tableIndex:%d", pSql, tableIndex);
S
slguan 已提交
1719 1720
    return NULL;
  }
1721 1722
  
  STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, tableIndex);
S
slguan 已提交
1723 1724 1725 1726 1727 1728

  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;

  pNew->sqlstr = strdup(pSql->sqlstr);
  if (pNew->sqlstr == NULL) {
H
hjxilinx 已提交
1729
    tscError("%p new subquery failed, tableIndex:%d, vgroupIndex:%d", pSql, tableIndex, pTableMetaInfo->vgroupIndex);
S
slguan 已提交
1730 1731 1732 1733 1734

    free(pNew);
    return NULL;
  }

1735 1736 1737 1738 1739 1740
  SSqlCmd* pnCmd = &pNew->cmd;
  memcpy(pnCmd, pCmd, sizeof(SSqlCmd));
  
  pnCmd->command = cmd;
  pnCmd->payload = NULL;
  pnCmd->allocSize = 0;
S
slguan 已提交
1741

1742 1743 1744 1745
  pnCmd->pQueryInfo = NULL;
  pnCmd->numOfClause = 0;
  pnCmd->clauseIndex = 0;
  pnCmd->pDataBlocks = NULL;
1746

1747
  if (tscAddSubqueryInfo(pnCmd) != TSDB_CODE_SUCCESS) {
1748 1749 1750
    tscFreeSqlObj(pNew);
    return NULL;
  }
1751

1752
  SQueryInfo* pNewQueryInfo = tscGetQueryInfoDetail(pnCmd, 0);
1753
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
1754

H
hjxilinx 已提交
1755 1756 1757
  pNewQueryInfo->command = pQueryInfo->command;
  pNewQueryInfo->slidingTimeUnit = pQueryInfo->slidingTimeUnit;
  pNewQueryInfo->intervalTime = pQueryInfo->intervalTime;
H
hjxilinx 已提交
1758 1759 1760 1761
  pNewQueryInfo->slidingTime  = pQueryInfo->slidingTime;
  pNewQueryInfo->type   = pQueryInfo->type;
  pNewQueryInfo->window = pQueryInfo->window;
  pNewQueryInfo->limit  = pQueryInfo->limit;
H
hjxilinx 已提交
1762
  pNewQueryInfo->slimit = pQueryInfo->slimit;
H
hjxilinx 已提交
1763
  pNewQueryInfo->order  = pQueryInfo->order;
H
Haojun Liao 已提交
1764 1765
  pNewQueryInfo->tsBuf  = NULL;
  pNewQueryInfo->fillType = pQueryInfo->fillType;
1766
  pNewQueryInfo->fillVal  = NULL;
H
Haojun Liao 已提交
1767
  pNewQueryInfo->clauseLimit = pQueryInfo->clauseLimit;
1768
  pNewQueryInfo->numOfTables = 0;
H
Haojun Liao 已提交
1769
  pNewQueryInfo->pTableMetaInfo = NULL;
H
hjxilinx 已提交
1770
  
H
hjxilinx 已提交
1771 1772 1773 1774 1775
  pNewQueryInfo->groupbyExpr = pQueryInfo->groupbyExpr;
  if (pQueryInfo->groupbyExpr.columnInfo != NULL) {
    pNewQueryInfo->groupbyExpr.columnInfo = taosArrayClone(pQueryInfo->groupbyExpr.columnInfo);
  }
  
1776
  tscTagCondCopy(&pNewQueryInfo->tagCond, &pQueryInfo->tagCond);
1777

1778
  if (pQueryInfo->fillType != TSDB_FILL_NONE) {
1779 1780
    pNewQueryInfo->fillVal = malloc(pQueryInfo->fieldsInfo.numOfOutput * sizeof(int64_t));
    memcpy(pNewQueryInfo->fillVal, pQueryInfo->fillVal, pQueryInfo->fieldsInfo.numOfOutput * sizeof(int64_t));
1781
  }
1782

1783
  if (tscAllocPayload(pnCmd, TSDB_DEFAULT_PAYLOAD_SIZE) != TSDB_CODE_SUCCESS) {
H
hjxilinx 已提交
1784
    tscError("%p new subquery failed, tableIndex:%d, vgroupIndex:%d", pSql, tableIndex, pTableMetaInfo->vgroupIndex);
S
slguan 已提交
1785 1786 1787
    tscFreeSqlObj(pNew);
    return NULL;
  }
1788
  
H
hjxilinx 已提交
1789
  tscColumnListCopy(pNewQueryInfo->colList, pQueryInfo->colList, (int16_t)tableIndex);
1790

S
slguan 已提交
1791 1792
  // set the correct query type
  if (pPrevSql != NULL) {
1793
    SQueryInfo* pPrevQueryInfo = tscGetQueryInfoDetail(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex);
1794
    pNewQueryInfo->type = pPrevQueryInfo->type;
S
slguan 已提交
1795
  } else {
1796
    pNewQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY;  // it must be the subquery
S
slguan 已提交
1797 1798
  }

H
hjxilinx 已提交
1799
  uint64_t uid = pTableMetaInfo->pTableMeta->uid;
H
hjxilinx 已提交
1800
  tscSqlExprCopy(pNewQueryInfo->exprList, pQueryInfo->exprList, uid, true);
S
slguan 已提交
1801

H
hjxilinx 已提交
1802
  int32_t numOfOutput = tscSqlExprNumOfExprs(pNewQueryInfo);
S
slguan 已提交
1803

H
hjxilinx 已提交
1804
  if (numOfOutput > 0) {  // todo refactor to extract method
H
hjxilinx 已提交
1805
    size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
H
hjxilinx 已提交
1806 1807 1808
    SFieldInfo* pFieldInfo = &pQueryInfo->fieldsInfo;
    
    for (int32_t i = 0; i < numOfExprs; ++i) {
1809
      SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
H
hjxilinx 已提交
1810
      
S
slguan 已提交
1811
      if (pExpr->uid == uid) {
H
hjxilinx 已提交
1812 1813 1814 1815 1816
        TAOS_FIELD* p = tscFieldInfoGetField(pFieldInfo, i);
        SFieldSupInfo* pInfo = tscFieldInfoGetSupp(pFieldInfo, i);
  
        SFieldSupInfo* pInfo1 = tscFieldInfoAppend(&pNewQueryInfo->fieldsInfo, p);
        *pInfo1 = *pInfo;
S
slguan 已提交
1817 1818 1819
      }
    }

H
hjxilinx 已提交
1820
    // make sure the the sqlExpr for each fields is correct
H
hjxilinx 已提交
1821
// todo handle the agg arithmetic expression
H
hjxilinx 已提交
1822 1823
    for(int32_t f = 0; f < pNewQueryInfo->fieldsInfo.numOfOutput; ++f) {
      TAOS_FIELD* field = tscFieldInfoGetField(&pNewQueryInfo->fieldsInfo, f);
H
hjxilinx 已提交
1824 1825
      numOfExprs = tscSqlExprNumOfExprs(pNewQueryInfo);
      
H
hjxilinx 已提交
1826
      for(int32_t k1 = 0; k1 < numOfExprs; ++k1) {
H
hjxilinx 已提交
1827
        SSqlExpr* pExpr1 = tscSqlExprGet(pNewQueryInfo, k1);
H
hjxilinx 已提交
1828 1829
        
        if (strcmp(field->name, pExpr1->aliasName) == 0) {  // eatablish link according to the result field name
H
hjxilinx 已提交
1830 1831
          SFieldSupInfo* pInfo = tscFieldInfoGetSupp(&pNewQueryInfo->fieldsInfo, f);
          pInfo->pSqlExpr = pExpr1;
H
hjxilinx 已提交
1832 1833 1834
        }
      }
    }
H
hjxilinx 已提交
1835 1836
  
    tscFieldInfoUpdateOffset(pNewQueryInfo);
S
slguan 已提交
1837 1838 1839 1840
  }

  pNew->fp = fp;
  pNew->param = param;
1841
  pNew->maxRetry = TSDB_MAX_REPLICA_NUM;
H
hjxilinx 已提交
1842

1843
  char* name = pTableMetaInfo->name;
H
hjxilinx 已提交
1844
  STableMetaInfo* pFinalInfo = NULL;
S
slguan 已提交
1845 1846

  if (pPrevSql == NULL) {
1847
    STableMeta* pTableMeta = taosCacheAcquireByData(tscCacheHandle, pTableMetaInfo->pTableMeta);  // get by name may failed due to the cache cleanup
1848
    assert(pTableMeta != NULL);
1849
    pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pTableMeta, pTableMetaInfo->vgroupList, pTableMetaInfo->tagColList);
1850 1851
  } else {  // transfer the ownership of pTableMeta to the newly create sql object.
    STableMetaInfo* pPrevInfo = tscGetTableMetaInfoFromCmd(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex, 0);
1852

1853
    STableMeta*  pPrevTableMeta = taosCacheTransfer(tscCacheHandle, (void**)&pPrevInfo->pTableMeta);
1854
    
1855 1856
    SVgroupsInfo* pVgroupsInfo = pPrevInfo->vgroupList;
    pPrevInfo->vgroupList = NULL;
1857
    pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pPrevTableMeta, pVgroupsInfo, pTableMetaInfo->tagColList);
S
slguan 已提交
1858 1859
  }

L
lihui 已提交
1860
  if (pFinalInfo->pTableMeta == NULL) {
H
Haojun Liao 已提交
1861
    tscError("%p new subquery failed for get tableMeta is NULL from cache", pSql);
L
lihui 已提交
1862 1863 1864 1865 1866 1867
    tscFreeSqlObj(pNew);
    return NULL;
  }
  
  assert(pNewQueryInfo->numOfTables == 1);
  
weixin_48148422's avatar
weixin_48148422 已提交
1868
  if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
1869
    assert(pFinalInfo->vgroupList != NULL);
S
slguan 已提交
1870
  }
H
hjxilinx 已提交
1871
  
1872
  if (cmd == TSDB_SQL_SELECT) {
1873 1874
    size_t size = taosArrayGetSize(pNewQueryInfo->colList);
    
1875
    tscTrace(
H
hjxilinx 已提交
1876
        "%p new subquery:%p, tableIndex:%d, vgroupIndex:%d, type:%d, exprInfo:%d, colList:%d,"
1877
        "fieldInfo:%d, name:%s, qrang:%" PRId64 " - %" PRId64 " order:%d, limit:%" PRId64,
H
hjxilinx 已提交
1878
        pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscSqlExprNumOfExprs(pNewQueryInfo),
H
hjxilinx 已提交
1879 1880
        size, pNewQueryInfo->fieldsInfo.numOfOutput, pFinalInfo->name, pNewQueryInfo->window.skey,
        pNewQueryInfo->window.ekey, pNewQueryInfo->order.order, pNewQueryInfo->limit.limit);
1881 1882 1883
    
    tscPrintSelectClause(pNew, 0);
  } else {
H
hjxilinx 已提交
1884
    tscTrace("%p new sub insertion: %p, vnodeIdx:%d", pSql, pNew, pTableMetaInfo->vgroupIndex);
1885
  }
1886

S
slguan 已提交
1887 1888 1889
  return pNew;
}

H
hjxilinx 已提交
1890 1891 1892 1893 1894
/**
 * To decide if current is a two-stage super table query, join query, or insert. And invoke different
 * procedure accordingly
 * @param pSql
 */
H
hzcheng 已提交
1895 1896
void tscDoQuery(SSqlObj* pSql) {
  SSqlCmd* pCmd = &pSql->cmd;
H
hjxilinx 已提交
1897
  SSqlRes* pRes = &pSql->res;
H
hjxilinx 已提交
1898
  
H
hjxilinx 已提交
1899
  pRes->code = TSDB_CODE_SUCCESS;
H
hjxilinx 已提交
1900
  
H
hzcheng 已提交
1901 1902
  if (pCmd->command > TSDB_SQL_LOCAL) {
    tscProcessLocalCmd(pSql);
H
hjxilinx 已提交
1903 1904 1905 1906 1907 1908 1909 1910 1911
    return;
  }
  
  if (pCmd->command == TSDB_SQL_SELECT) {
    tscAddIntoSqlList(pSql);
  }

  if (pCmd->dataSourceType == DATA_FROM_DATA_FILE) {
    tscProcessMultiVnodesInsertFromFile(pSql);
H
hzcheng 已提交
1912
  } else {
H
hjxilinx 已提交
1913 1914 1915 1916 1917 1918
    SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
    uint16_t type = pQueryInfo->type;
  
    if (pSql->fp == (void(*)())tscHandleMultivnodeInsert) {  // multi-vnodes insertion
      tscHandleMultivnodeInsert(pSql);
      return;
H
hzcheng 已提交
1919
    }
H
hjxilinx 已提交
1920 1921 1922 1923 1924 1925 1926 1927 1928 1929 1930 1931 1932 1933 1934
  
    if (QUERY_IS_JOIN_QUERY(type)) {
      if ((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0) {
        tscHandleMasterJoinQuery(pSql);
        return;
      } else {
        // for first stage sub query, iterate all vnodes to get all timestamp
        if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) != TSDB_QUERY_TYPE_JOIN_SEC_STAGE) {
//          doProcessSql(pSql);
          assert(0);
        }
      }
    } else if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) {  // super table query
      tscHandleMasterSTableQuery(pSql);
      return;
S
slguan 已提交
1935
    }
H
hjxilinx 已提交
1936 1937
    
    tscProcessSql(pSql);
H
hzcheng 已提交
1938 1939
  }
}
S
slguan 已提交
1940

H
hjxilinx 已提交
1941
int16_t tscGetJoinTagColIndexByUid(STagCond* pTagCond, uint64_t uid) {
S
slguan 已提交
1942 1943 1944 1945 1946 1947
  if (pTagCond->joinInfo.left.uid == uid) {
    return pTagCond->joinInfo.left.tagCol;
  } else {
    return pTagCond->joinInfo.right.tagCol;
  }
}
1948

H
Haojun Liao 已提交
1949 1950
bool tscIsUpdateQuery(SSqlObj* pSql) {
  if (pSql == NULL || pSql->signature != pSql) {
1951 1952
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return TSDB_CODE_TSC_DISCONNECTED;
1953 1954
  }

H
Haojun Liao 已提交
1955 1956
  SSqlCmd* pCmd = &pSql->cmd;
  return ((pCmd->command >= TSDB_SQL_INSERT && pCmd->command <= TSDB_SQL_DROP_DNODE) || TSDB_SQL_USE_DB == pCmd->command);
H
hjxilinx 已提交
1957
}
1958

H
hjxilinx 已提交
1959 1960 1961 1962 1963
int32_t tscInvalidSQLErrMsg(char* msg, const char* additionalInfo, const char* sql) {
  const char* msgFormat1 = "invalid SQL: %s";
  const char* msgFormat2 = "invalid SQL: syntax error near \"%s\" (%s)";
  const char* msgFormat3 = "invalid SQL: syntax error near \"%s\"";

H
hjxilinx 已提交
1964
  const int32_t BACKWARD_CHAR_STEP = 0;
H
hjxilinx 已提交
1965

H
hjxilinx 已提交
1966 1967 1968
  if (sql == NULL) {
    assert(additionalInfo != NULL);
    sprintf(msg, msgFormat1, additionalInfo);
1969
    return TSDB_CODE_TSC_INVALID_SQL;
H
hjxilinx 已提交
1970
  }
H
hjxilinx 已提交
1971 1972

  char buf[64] = {0};  // only extract part of sql string
H
hjxilinx 已提交
1973
  strncpy(buf, (sql - BACKWARD_CHAR_STEP), tListLen(buf) - 1);
H
hjxilinx 已提交
1974

H
hjxilinx 已提交
1975 1976 1977
  if (additionalInfo != NULL) {
    sprintf(msg, msgFormat2, buf, additionalInfo);
  } else {
H
hjxilinx 已提交
1978
    sprintf(msg, msgFormat3, buf);  // no additional information for invalid sql error
H
hjxilinx 已提交
1979
  }
H
hjxilinx 已提交
1980

1981
  return TSDB_CODE_TSC_INVALID_SQL;
1982
}
H
hjxilinx 已提交
1983

H
hjxilinx 已提交
1984 1985
bool tscHasReachLimitation(SQueryInfo* pQueryInfo, SSqlRes* pRes) {
  assert(pQueryInfo != NULL && pQueryInfo->clauseLimit != 0);
H
Haojun Liao 已提交
1986
  return (pQueryInfo->clauseLimit > 0 && pRes->numOfClauseTotal >= pQueryInfo->clauseLimit);
H
hjxilinx 已提交
1987
}
1988 1989

char* tscGetErrorMsgPayload(SSqlCmd* pCmd) { return pCmd->payload; }
1990 1991 1992

/**
 *  If current vnode query does not return results anymore (pRes->numOfRows == 0), try the next vnode if exists,
1993
 *  while multi-vnode super table projection query and the result does not reach the limitation.
1994
 */
1995
bool hasMoreVnodesToTry(SSqlObj* pSql) {
1996 1997 1998 1999 2000
  SSqlCmd* pCmd = &pSql->cmd;
  SSqlRes* pRes = &pSql->res;
  if (pCmd->command != TSDB_SQL_FETCH) {
    return false;
  }
2001

2002
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
2003
  
2004 2005 2006
  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  assert(pRes->completed);
  
2007
  // for normal table, no need to try any more if results are all retrieved from one vnode
H
hjLiao 已提交
2008
  if (!UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo) || (pTableMetaInfo->vgroupList == NULL)) {
2009
    return false;
2010
  }
2011
  
2012 2013 2014
  int32_t numOfVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
  return tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) &&
         (!tscHasReachLimitation(pQueryInfo, pRes)) && (pTableMetaInfo->vgroupIndex < numOfVgroups - 1);
2015 2016
}

2017 2018 2019 2020 2021 2022
void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
  SSqlCmd* pCmd = &pSql->cmd;
  SSqlRes* pRes = &pSql->res;

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

2023 2024 2025 2026
  /*
   * no result returned from the current virtual node anymore, try the next vnode if exists
   * if case of: multi-vnode super table projection query
   */
2027
  assert(pRes->numOfRows == 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && !tscHasReachLimitation(pQueryInfo, pRes));
2028

H
hjxilinx 已提交
2029
  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
2030 2031 2032
  
  int32_t totalVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
  while (++pTableMetaInfo->vgroupIndex < totalVgroups) {
2033
    tscTrace("%p results from vgroup index:%d completed, try next:%d. total vgroups:%d. current numOfRes:%d", pSql,
H
Haojun Liao 已提交
2034
             pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVgroups, pRes->numOfClauseTotal);
2035

2036 2037 2038 2039 2040 2041
    /*
     * update the limit and offset value for the query on the next vnode,
     * according to current retrieval results
     *
     * NOTE:
     * if the pRes->offset is larger than 0, the start returned position has not reached yet.
H
Haojun Liao 已提交
2042
     * Therefore, the pRes->numOfRows, as well as pRes->numOfClauseTotal, must be 0.
2043 2044 2045
     * The pRes->offset value will be updated by virtual node, during query execution.
     */
    if (pQueryInfo->clauseLimit >= 0) {
H
Haojun Liao 已提交
2046
      pQueryInfo->limit.limit = pQueryInfo->clauseLimit - pRes->numOfClauseTotal;
2047
    }
2048

2049 2050
    pQueryInfo->limit.offset = pRes->offset;
    assert((pRes->offset >= 0 && pRes->numOfRows == 0) || (pRes->offset == 0 && pRes->numOfRows >= 0));
2051 2052 2053
    
    tscTrace("%p new query to next vgroup, index:%d, limit:%" PRId64 ", offset:%" PRId64 ", glimit:%" PRId64,
        pSql, pTableMetaInfo->vgroupIndex, pQueryInfo->limit.limit, pQueryInfo->limit.offset, pQueryInfo->clauseLimit);
2054

2055 2056 2057 2058 2059 2060 2061 2062
    /*
     * For project query with super table join, the numOfSub is equalled to the number of all subqueries.
     * Therefore, we need to reset the value of numOfSubs to be 0.
     *
     * For super table join with projection query, if anyone of the subquery is exhausted, the query completed.
     */
    pSql->numOfSubs = 0;
    pCmd->command = TSDB_SQL_SELECT;
2063

2064
    tscResetForNextRetrieve(pRes);
2065

2066
    // set the callback function
2067
    pSql->fp = fp;
2068 2069
    int32_t ret = tscProcessSql(pSql);
    if (ret == TSDB_CODE_SUCCESS) {
2070
      return;
2071
    } else {// todo check for failure
2072 2073 2074
    }
  }
}
2075 2076 2077 2078 2079 2080 2081 2082

void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)()) {
  SSqlCmd* pCmd = &pSql->cmd;
  SSqlRes* pRes = &pSql->res;

  // current subclause is completed, try the next subclause
  assert(pCmd->clauseIndex < pCmd->numOfClause - 1);

H
hjxilinx 已提交
2083
  pCmd->clauseIndex++;
2084 2085 2086 2087
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  pSql->cmd.command = pQueryInfo->command;

2088
  //backup the total number of result first
H
Haojun Liao 已提交
2089
  int64_t num = pRes->numOfTotal + pRes->numOfClauseTotal;
H
hjxilinx 已提交
2090
  tscFreeSqlResult(pSql);
2091 2092 2093
  
  pRes->numOfTotal = num;
  
2094
  tfree(pSql->pSubs);
2095 2096
  pSql->numOfSubs = 0;
  
2097 2098 2099 2100 2101 2102 2103 2104 2105 2106 2107 2108
  if (pSql->fp != NULL) {
    pSql->fp = queryFp;
    assert(queryFp != NULL);
  }

  tscTrace("%p try data in the next subclause:%d, total subclause:%d", pSql, pCmd->clauseIndex, pCmd->numOfClause);
  if (pCmd->command > TSDB_SQL_LOCAL) {
    tscProcessLocalCmd(pSql);
  } else {
    tscProcessSql(pSql);
  }
}
H
hjxilinx 已提交
2109

2110
void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pFieldInfo, int32_t columnIndex) {
H
Haojun Liao 已提交
2111
  SFieldSupInfo* pInfo = taosArrayGet(pFieldInfo->pSupportInfo, columnIndex);//tscFieldInfoGetSupp(pFieldInfo, columnIndex);
2112 2113
  assert(pInfo->pSqlExpr != NULL);

H
hjxilinx 已提交
2114
  int32_t type = pInfo->pSqlExpr->resType;
2115 2116
  int32_t bytes = pInfo->pSqlExpr->resBytes;
  
2117
  char* pData = pRes->data + pInfo->pSqlExpr->offset * pRes->numOfRows + bytes * pRes->row;
H
hjxilinx 已提交
2118
  
H
hjxilinx 已提交
2119
  if (type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BINARY) {
2120
    int32_t realLen = varDataLen(pData);
H
hjxilinx 已提交
2121 2122
    assert(realLen <= bytes - VARSTR_HEADER_SIZE);
    
H
Haojun Liao 已提交
2123 2124 2125 2126 2127 2128
    if (isNull(pData, type)) {
      pRes->tsrow[columnIndex] = NULL;
    } else {
      pRes->tsrow[columnIndex] = pData + VARSTR_HEADER_SIZE;
    }
  
2129
    if (realLen < pInfo->pSqlExpr->resBytes - VARSTR_HEADER_SIZE) { // todo refactor
2130
      *(pData + realLen + VARSTR_HEADER_SIZE) = 0;
H
hjxilinx 已提交
2131 2132
    }
    
2133
    pRes->length[columnIndex] = realLen;
H
hjxilinx 已提交
2134
  } else {
2135 2136
    assert(bytes == tDataTypeDesc[type].nSize);
    
H
Haojun Liao 已提交
2137 2138 2139 2140 2141 2142
    if (isNull(pData, type)) {
      pRes->tsrow[columnIndex] = NULL;
    } else {
      pRes->tsrow[columnIndex] = pData;
    }
    
2143
    pRes->length[columnIndex] = bytes;
H
hjxilinx 已提交
2144
  }
H
hjxilinx 已提交
2145 2146
}

weixin_48148422's avatar
weixin_48148422 已提交
2147 2148 2149
void* malloc_throw(size_t size) {
  void* p = malloc(size);
  if (p == NULL) {
2150
    THROW(TSDB_CODE_TSC_OUT_OF_MEMORY);
weixin_48148422's avatar
weixin_48148422 已提交
2151 2152 2153 2154 2155
  }
  return p;
}

void* calloc_throw(size_t nmemb, size_t size) {
weixin_48148422's avatar
weixin_48148422 已提交
2156
  void* p = calloc(nmemb, size);
weixin_48148422's avatar
weixin_48148422 已提交
2157
  if (p == NULL) {
2158
    THROW(TSDB_CODE_TSC_OUT_OF_MEMORY);
weixin_48148422's avatar
weixin_48148422 已提交
2159 2160 2161 2162 2163 2164 2165
  }
  return p;
}

char* strdup_throw(const char* str) {
  char* p = strdup(str);
  if (p == NULL) {
2166
    THROW(TSDB_CODE_TSC_OUT_OF_MEMORY);
weixin_48148422's avatar
weixin_48148422 已提交
2167 2168 2169
  }
  return p;
}
2170 2171 2172 2173 2174 2175

int tscSetMgmtIpListFromCfg(const char *first, const char *second) {
  tscMgmtIpSet.numOfIps = 0;
  tscMgmtIpSet.inUse = 0;

  if (first && first[0] != 0) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
2176
    if (strlen(first) >= TSDB_EP_LEN) {
2177
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
2178 2179
      return -1;
    }
2180 2181 2182 2183 2184
    taosGetFqdnPortFromEp(first, tscMgmtIpSet.fqdn[tscMgmtIpSet.numOfIps], &tscMgmtIpSet.port[tscMgmtIpSet.numOfIps]);
    tscMgmtIpSet.numOfIps++;
  }

  if (second && second[0] != 0) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
2185
    if (strlen(second) >= TSDB_EP_LEN) {
2186
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
2187 2188
      return -1;
    }
2189 2190 2191 2192
    taosGetFqdnPortFromEp(second, tscMgmtIpSet.fqdn[tscMgmtIpSet.numOfIps], &tscMgmtIpSet.port[tscMgmtIpSet.numOfIps]);
    tscMgmtIpSet.numOfIps++;
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
2193
  if ( tscMgmtIpSet.numOfIps == 0) {
2194
    terrno = TSDB_CODE_TSC_INVALID_FQDN;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
2195 2196
    return -1;
  }
2197 2198 2199

  return 0;
}