tscUtil.c 64.5 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "os.h"
H
Haojun Liao 已提交
17 18
#include "hash.h"
#include "tscUtil.h"
H
hzcheng 已提交
19
#include "taosmsg.h"
H
Haojun Liao 已提交
20
#include "qast.h"
H
hzcheng 已提交
21 22 23
#include "tcache.h"
#include "tkey.h"
#include "tmd5.h"
24
#include "tscLocalMerge.h"
H
Haojun Liao 已提交
25 26
#include "tscLog.h"
#include "tscProfile.h"
H
hjxilinx 已提交
27
#include "tscSubquery.h"
H
hzcheng 已提交
28 29 30
#include "tschemautil.h"
#include "tsclient.h"
#include "ttimer.h"
31
#include "ttokendef.h"
H
hzcheng 已提交
32

33 34
static void freeQueryInfoImpl(SQueryInfo* pQueryInfo);
static void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache);
35

36
  SCond* tsGetSTableQueryCond(STagCond* pTagCond, uint64_t uid) {
  /*
   * Look up the tag condition recorded for the table identified by uid.
   * Returns the matching entry of pTagCond->pCond, or NULL when the list
   * has not been created or holds no entry with that uid.
   */
  if (pTagCond->pCond == NULL) {
    return NULL;
  }

  size_t numOfConds = taosArrayGetSize(pTagCond->pCond);
  for (int32_t idx = 0; idx < numOfConds; ++idx) {
    SCond* pItem = taosArrayGet(pTagCond->pCond, idx);
    if (pItem->uid != uid) {
      continue;
    }

    return pItem;
  }

  return NULL;
}

53 54
/*
 * Record the serialized condition held in bw as the tag condition of table uid.
 * Nothing is recorded when the writer is empty (tbufTell(bw) == 0).
 * The condition list is created on first use with an initial capacity of 3.
 */
void tsSetSTableQueryCond(STagCond* pTagCond, uint64_t uid, SBufferWriter* bw) {
  if (tbufTell(bw) == 0) {
    return;
  }

  SCond item = {0};
  item.uid  = uid;
  item.len  = tbufTell(bw);
  item.cond = tbufGetData(bw, true);

  if (pTagCond->pCond == NULL) {
    pTagCond->pCond = taosArrayInit(3, sizeof(SCond));
  }

  taosArrayPush(pTagCond->pCond, &item);
}

H
hjxilinx 已提交
73
/*
 * Return true when every output expression is either a tag projection
 * (TSDB_FUNC_TAGPRJ / TSDB_FUNC_TID_TAG) or a "select count(tbname)" expression.
 * An empty output list yields true.
 */
bool tscQueryTags(SQueryInfo* pQueryInfo) {
  for (int32_t col = 0; col < pQueryInfo->fieldsInfo.numOfOutput; ++col) {
    SSqlExpr* pItem = tscSqlExprGet(pQueryInfo, col);
    int32_t   fid = pItem->functionId;

    // "select count(tbname)" query
    bool countOnTbname = (fid == TSDB_FUNC_COUNT) && (pItem->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX);
    if (countOnTbname) {
      continue;
    }

    bool tagFunction = (fid == TSDB_FUNC_TAGPRJ) || (fid == TSDB_FUNC_TID_TAG);
    if (!tagFunction) {
      return false;
    }
  }

  return true;
}

H
Haojun Liao 已提交
91 92
// todo refactor, extract methods and move the common module
void tscGetDBInfoFromTableFullName(char* tableId, char* db) {
S
slguan 已提交
93
  char* st = strstr(tableId, TS_PATH_DELIMITER);
H
hzcheng 已提交
94 95 96
  if (st != NULL) {
    char* end = strstr(st + 1, TS_PATH_DELIMITER);
    if (end != NULL) {
S
slguan 已提交
97 98
      memcpy(db, tableId, (end - tableId));
      db[end - tableId] = 0;
H
hzcheng 已提交
99 100 101 102 103 104 105
      return;
    }
  }

  db[0] = 0;
}

H
hjxilinx 已提交
106
/*
 * Decide whether the query on table tableIndex is a two-stage super table query.
 *
 * Returns false when pQueryInfo or the table meta info is missing, when the
 * query type carries TSDB_QUERY_TYPE_FREE_RESOURCE, or when the query is a
 * projection on a super table without an order-by column. Otherwise returns
 * true only for a TSDB_SQL_SELECT on a super table that is not flagged
 * TSDB_QUERY_TYPE_STABLE_SUBQUERY.
 */
bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) {
  if (pQueryInfo == NULL) {
    return false;
  }

  STableMetaInfo* pMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex);
  if (pMetaInfo == NULL) {
    return false;
  }

  // for select query super table, the super table vgroup list can not be null in any cases.
  if (pQueryInfo->command == TSDB_SQL_SELECT && UTIL_TABLE_IS_SUPER_TABLE(pMetaInfo)) {
    assert(pMetaInfo->vgroupList != NULL);
  }

  if ((pQueryInfo->type & TSDB_QUERY_TYPE_FREE_RESOURCE) == TSDB_QUERY_TYPE_FREE_RESOURCE) {
    return false;
  }

  // a projection on a super table without an order-by column is not a two-stage query
  if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, tableIndex)) {
    return false;
  }

  return !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STABLE_SUBQUERY) &&
         pQueryInfo->command == TSDB_SQL_SELECT && UTIL_TABLE_IS_SUPER_TABLE(pMetaInfo);
}

137
/*
 * Return true when the query on table tableIndex is a pure projection on a super table.
 *
 * In following cases, return false:
 * 1. failed to get tableMeta from server; 2. not a super table;
 * 3. the command is TSDB_SQL_RETRIEVE_EMPTY_RESULT; 4. there is no expression at all;
 * 5. any expression uses a function other than PRJ, TAGPRJ, TAG, TS, ARITHM, TS_COMP or TID_TAG.
 */
bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) {
  STableMetaInfo* pMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex);
  size_t          exprCount = tscSqlExprNumOfExprs(pQueryInfo);

  if (pMetaInfo == NULL || !UTIL_TABLE_IS_SUPER_TABLE(pMetaInfo) ||
      pQueryInfo->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT || exprCount == 0) {
    return false;
  }

  for (int32_t idx = 0; idx < exprCount; ++idx) {
    int32_t fid = tscSqlExprGet(pQueryInfo, idx)->functionId;

    bool projectionLike = (fid == TSDB_FUNC_PRJ) ||
                          (fid == TSDB_FUNC_TAGPRJ) ||
                          (fid == TSDB_FUNC_TAG) ||
                          (fid == TSDB_FUNC_TS) ||
                          (fid == TSDB_FUNC_ARITHM) ||
                          (fid == TSDB_FUNC_TS_COMP) ||
                          (fid == TSDB_FUNC_TID_TAG);
    if (!projectionLike) {
      return false;
    }
  }

  return true;
}

H
Haojun Liao 已提交
168
// not order by timestamp projection query on super table
169 170 171 172 173
bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) {
  if (!tscIsProjectionQueryOnSTable(pQueryInfo, tableIndex)) {
    return false;
  }
  
174
  // order by columnIndex exists, not a non-ordered projection query
175 176 177 178 179 180 181 182
  return pQueryInfo->order.orderColId < 0;
}

// projection query on super table that carries an order-by column
bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) {
  // order by columnIndex exists (orderColId >= 0): an ordered projection query
  bool hasOrderColumn = (pQueryInfo->order.orderColId >= 0);

  return tscIsProjectionQueryOnSTable(pQueryInfo, tableIndex) ? hasOrderColumn : false;
}

H
Haojun Liao 已提交
187 188 189 190
/*
 * Return true when every expression of the query is one of the projection-type
 * functions PRJ, TAGPRJ, TAG, TS or ARITHM. A query without expressions yields true.
 */
bool tscIsProjectionQuery(SQueryInfo* pQueryInfo) {
  size_t exprCount = tscSqlExprNumOfExprs(pQueryInfo);

  for (int32_t idx = 0; idx < exprCount; ++idx) {
    int32_t fid = tscSqlExprGet(pQueryInfo, idx)->functionId;

    bool projectionLike = (fid == TSDB_FUNC_PRJ) || (fid == TSDB_FUNC_TAGPRJ) || (fid == TSDB_FUNC_TAG) ||
                          (fid == TSDB_FUNC_TS) || (fid == TSDB_FUNC_ARITHM);
    if (!projectionLike) {
      return false;
    }
  }

  return true;
}

202
/*
 * Return true when every expression of the query is either TSDB_FUNC_INTERP
 * or TSDB_FUNC_TAG. A query without expressions yields true.
 */
bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo) {
  size_t exprCount = tscSqlExprNumOfExprs(pQueryInfo);

  for (int32_t idx = 0; idx < exprCount; ++idx) {
    SSqlExpr* pItem = tscSqlExprGet(pQueryInfo, idx);
    assert(pItem != NULL);

    int32_t fid = pItem->functionId;
    if (fid != TSDB_FUNC_TAG && fid != TSDB_FUNC_INTERP) {
      return false;
    }
  }

  return true;
}

225
/*
 * Return true when at least one expression of the query uses TSDB_FUNC_TWA.
 * NULL expression slots are skipped.
 */
bool tscIsTWAQuery(SQueryInfo* pQueryInfo) {
  size_t exprCount = tscSqlExprNumOfExprs(pQueryInfo);

  for (int32_t idx = 0; idx < exprCount; ++idx) {
    SSqlExpr* pItem = tscSqlExprGet(pQueryInfo, idx);

    if (pItem != NULL && pItem->functionId == TSDB_FUNC_TWA) {
      return true;
    }
  }

  return false;
}

242 243
/*
 * For a point interpolation query (see tscIsPointInterpQuery), reset the fill
 * type to TSDB_FILL_NONE and release the fill values. Other queries are left untouched.
 */
void tscClearInterpInfo(SQueryInfo* pQueryInfo) {
  if (tscIsPointInterpQuery(pQueryInfo)) {
    pQueryInfo->fillType = TSDB_FILL_NONE;
    tfree(pQueryInfo->fillVal);
  }
}

251
/*
 * Allocate the per-column result arrays (tsrow, length, buffer) of pRes, sized by
 * the number of output fields of pQueryInfo. Nothing is done when pRes->tsrow is
 * already set.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY (also stored in
 * pRes->code) when an allocation fails; in that case all three arrays are released.
 */
int32_t tscCreateResPointerInfo(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
  if (pRes->tsrow != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t numOfOutput = pQueryInfo->fieldsInfo.numOfOutput;
  pRes->numOfCols = numOfOutput;

  pRes->tsrow  = calloc(numOfOutput, POINTER_BYTES);
  pRes->length = calloc(numOfOutput, sizeof(int32_t));  // todo refactor
  pRes->buffer = calloc(numOfOutput, POINTER_BYTES);

  // not enough memory: the length array must be checked as well, otherwise a
  // partially allocated result would be reported as success
  bool auxMissing = (pRes->numOfCols > 0) && (pRes->buffer == NULL || pRes->length == NULL);
  if (pRes->tsrow == NULL || auxMissing) {
    tfree(pRes->tsrow);
    tfree(pRes->buffer);
    tfree(pRes->length);

    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return pRes->code;
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Release every heap buffer referenced by pRes: the per-column buffers, the
 * response, the row/length arrays, the group record, the column index and the
 * arithmetic support data. pRes itself is not freed.
 */
void tscDestroyResPointerInfo(SSqlRes* pRes) {
  if (pRes->buffer != NULL) {  // free all buffers containing the multibyte string
    int col = 0;
    while (col < pRes->numOfCols) {
      tfree(pRes->buffer[col]);
      col++;
    }

    pRes->numOfCols = 0;
    tfree(pRes->buffer);
  }

  tfree(pRes->pRsp);
  tfree(pRes->tsrow);
  tfree(pRes->length);
  tfree(pRes->pGroupRec);
  tfree(pRes->pColumnIndex);

  if (pRes->pArithSup != NULL) {
    tfree(pRes->pArithSup->data);
    tfree(pRes->pArithSup);
  }

  pRes->data = NULL;  // pRes->data points to the buffer of pRsp, no need to free
}

299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316
/*
 * Destroy every query info object owned by pCmd (one per clause) and the array
 * that holds them, then reset numOfClause to 0. No-op for NULL or clause-less commands.
 */
static void tscFreeQueryInfo(SSqlCmd* pCmd) {
  if (pCmd == NULL || pCmd->numOfClause == 0) {
    return;
  }

  // address of the SSqlObj that embeds this command; handed to clearAllTableMetaInfo
  const char* owner = (const char*)((char*)pCmd - offsetof(SSqlObj, cmd));

  int32_t clause = 0;
  while (clause < pCmd->numOfClause) {
    SQueryInfo* pInfo = tscGetQueryInfoDetail(pCmd, clause);

    freeQueryInfoImpl(pInfo);
    clearAllTableMetaInfo(pInfo, owner, false);
    tfree(pInfo);

    ++clause;
  }

  pCmd->numOfClause = 0;
  tfree(pCmd->pQueryInfo);
}

317 318
/*
 * Bring pCmd back to its initial state: clear the scalar fields and release the
 * table hash, the data block list and all query info objects.
 */
void tscResetSqlCmdObj(SSqlCmd* pCmd) {
  // plain state
  pCmd->command       = 0;
  pCmd->msgType       = 0;
  pCmd->numOfCols     = 0;
  pCmd->count         = 0;
  pCmd->parseFinished = 0;
  pCmd->autoCreated   = 0;
  pCmd->curSql        = NULL;

  // owned resources
  taosHashCleanup(pCmd->pTableList);
  pCmd->pTableList = NULL;

  pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);

  tscFreeQueryInfo(pCmd);
}

H
hjxilinx 已提交
334
/*
 * Release the local reducer and all result buffers of pSql, then zero the whole
 * embedded SSqlRes structure.
 */
void tscFreeSqlResult(SSqlObj* pSql) {
  tscDestroyLocalReducer(pSql);

  SSqlRes* pResult = &pSql->res;
  tscDestroyResPointerInfo(pResult);

  memset(pResult, 0, sizeof(SSqlRes));
}

H
hjxilinx 已提交
343
/*
 * Release the resources held by pSql (sql string, result, sub-query array,
 * command state) while keeping the object itself and its payload buffer.
 * Ignored when pSql is NULL or its signature does not point back to itself.
 */
void tscPartiallyFreeSqlObj(SSqlObj* pSql) {
  if (pSql == NULL || pSql->signature != pSql) {
    return;
  }

  SSqlCmd* pCmd = &pSql->cmd;
  STscObj* pTscObj = pSql->pTscObj;

  int32_t command = pCmd->command;
  bool    inSqlList = (command < TSDB_SQL_INSERT) || (command == TSDB_SQL_RETRIEVE_LOCALMERGE) ||
                      (command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) || (command == TSDB_SQL_TABLE_JOIN_RETRIEVE);
  if (inSqlList) {
    tscRemoveFromSqlList(pSql);
  }

  // pSql->sqlstr will be used by tscBuildQueryStreamDesc, so release it under the connection mutex
  if (pTscObj->signature == pTscObj) {
    pthread_mutex_lock(&pTscObj->mutex);
    tfree(pSql->sqlstr);
    pthread_mutex_unlock(&pTscObj->mutex);
  }

  tscFreeSqlResult(pSql);

  tfree(pSql->pSubs);
  pSql->numOfSubs = 0;

  tscResetSqlCmdObj(pCmd);
}

/*
 * Completely destroy pSql: release everything tscPartiallyFreeSqlObj releases,
 * then invalidate the signature, wipe and free the payload buffer, free the sql
 * string, destroy the response semaphore and free the object itself.
 * Ignored when pSql is NULL or its signature does not point back to itself.
 */
void tscFreeSqlObj(SSqlObj* pSql) {
  if (pSql == NULL || pSql->signature != pSql) {
    return;
  }

  tscDebug("%p start to free sql object", pSql);
  tscPartiallyFreeSqlObj(pSql);

  pSql->signature = NULL;
  pSql->fp = NULL;

  SSqlCmd* pCmd = &pSql->cmd;

  // the payload may never have been allocated; passing NULL to memset is
  // undefined behaviour even when the length is 0
  if (pCmd->payload != NULL) {
    memset(pCmd->payload, 0, (size_t)pCmd->allocSize);
  }
  tfree(pCmd->payload);
  pCmd->allocSize = 0;

  tfree(pSql->sqlstr);
  sem_destroy(&pSql->rspSem);
  free(pSql);
}

S
slguan 已提交
394 395
/*
 * Free one table data block: its data buffer, its parameter array and the block
 * itself, and release the table meta reference it holds in tscCacheHandle.
 * NULL is accepted and ignored.
 */
void tscDestroyDataBlock(STableDataBlocks* pDataBlock) {
  if (pDataBlock != NULL) {
    tfree(pDataBlock->pData);
    tfree(pDataBlock->params);

    // free the refcount for metermeta
    taosCacheRelease(tscCacheHandle, (void**)&(pDataBlock->pTableMeta), false);
    tfree(pDataBlock);
  }
}

407 408
/*
 * Append one parameter descriptor to pDataBlock->params, growing the array to
 * twice the required count when it is full.
 *
 * Returns the new descriptor (idx preset to -1), or NULL when the array cannot
 * be grown; the existing array is left intact in that case.
 */
SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, short bytes,
                                   uint32_t offset) {
  uint32_t required = pDataBlock->numOfParams + 1;

  if (required > pDataBlock->numOfAllocedParams) {
    required *= 2;

    void* grown = realloc(pDataBlock->params, required * sizeof(SParamInfo));
    if (grown == NULL) {
      return NULL;
    }

    pDataBlock->params = (SParamInfo*)grown;
    pDataBlock->numOfAllocedParams = required;
  }

  SParamInfo* slot = &pDataBlock->params[pDataBlock->numOfParams];
  slot->idx      = -1;
  slot->type     = type;
  slot->timePrec = timePrec;
  slot->bytes    = bytes;
  slot->offset   = offset;

  pDataBlock->numOfParams += 1;
  return slot;
}

431 432
/*
 * Destroy every data block stored (as a pointer) in pDataBlockList and then the
 * array itself. Always returns NULL so that callers can reset their handle in
 * one statement; a NULL list is accepted.
 */
void*  tscDestroyBlockArrayList(SArray* pDataBlockList) {
  if (pDataBlockList != NULL) {
    size_t numOfBlocks = taosArrayGetSize(pDataBlockList);

    int32_t idx = 0;
    while (idx < numOfBlocks) {
      void* pBlock = taosArrayGetP(pDataBlockList, idx);
      tscDestroyDataBlock(pBlock);
      idx++;
    }

    taosArrayDestroy(pDataBlockList);
  }

  return NULL;
}

S
slguan 已提交
446
/*
 * Copy the submit data of pDataBlock into the payload buffer of pSql's command
 * and make the command's table meta info refer to the block's table.
 *
 * Returns TSDB_CODE_SUCCESS, or the error code of tscAllocPayload when the
 * payload cannot be allocated.
 */
int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
  // extra bytes requested on top of the block size (space for the digest, see below)
  const int reservedBytes = 100;

  SSqlCmd* pCmd = &pSql->cmd;
  assert(pDataBlock->pTableMeta != NULL);

  pCmd->numOfTablesInSubmit = pDataBlock->numOfTables;

  assert(pCmd->numOfClause == 1);
  STableMetaInfo* pMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);

  // set the correct table meta object, the table meta has been locked in pDataBlocks, so it must be in the cache
  if (pMetaInfo->pTableMeta == pDataBlock->pTableMeta) {
    assert(strncmp(pMetaInfo->name, pDataBlock->tableId, tListLen(pDataBlock->tableId)) == 0);
  } else {
    tstrncpy(pMetaInfo->name, pDataBlock->tableId, sizeof(pMetaInfo->name));
    taosCacheRelease(tscCacheHandle, (void**)&(pMetaInfo->pTableMeta), false);

    pMetaInfo->pTableMeta = taosCacheTransfer(tscCacheHandle, (void**)&pDataBlock->pTableMeta);
  }

  /*
   * the submit message consists of : [RPC header|message body|digest]
   * the dataBlock only includes the RPC Header buffer and actual submit message body, space for digest needs
   * additional space.
   */
  int code = tscAllocPayload(pCmd, pDataBlock->size + reservedBytes);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  assert(pDataBlock->size <= pDataBlock->nAllocSize);
  memcpy(pCmd->payload, pDataBlock->pData, pDataBlock->size);

  /*
   * the payloadLen should be actual message body size
   * the old value of payloadLen is the allocated payload size
   */
  pCmd->payloadLen = pDataBlock->size;

  assert(pCmd->allocSize >= pCmd->payloadLen + reservedBytes && pCmd->payloadLen > 0);
  return TSDB_CODE_SUCCESS;
}

488 489 490 491 492 493 494 495
//void tscFreeUnusedDataBlocks(SDataBlockList* pList) {
//  /* release additional memory consumption */
//  for (int32_t i = 0; i < pList->nSize; ++i) {
//    STableDataBlocks* pDataBlock = pList->pData[i];
//    pDataBlock->pData = realloc(pDataBlock->pData, pDataBlock->size);
//    pDataBlock->nAllocSize = (uint32_t)pDataBlock->size;
//  }
//}
H
hzcheng 已提交
496

H
hjxilinx 已提交
497 498 499 500 501 502
/**
 * create the in-memory buffer for each table to keep the submitted data block
 * @param initialSize
 * @param rowSize
 * @param startOffset
 * @param name
H
hjxilinx 已提交
503
 * @param dataBlocks
H
hjxilinx 已提交
504 505
 * @return
 */
H
hjxilinx 已提交
506
int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, const char* name,
H
hjxilinx 已提交
507
                           STableMeta* pTableMeta, STableDataBlocks** dataBlocks) {
H
hjxilinx 已提交
508
  STableDataBlocks* dataBuf = (STableDataBlocks*)calloc(1, sizeof(STableDataBlocks));
H
hjxilinx 已提交
509 510
  if (dataBuf == NULL) {
    tscError("failed to allocated memory, reason:%s", strerror(errno));
511
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
H
hjxilinx 已提交
512 513 514
  }

  dataBuf->nAllocSize = (uint32_t)initialSize;
L
[#1102]  
lihui 已提交
515
  dataBuf->headerSize = startOffset; // the header size will always be the startOffset value, reserved for the subumit block header
H
hjxilinx 已提交
516 517 518 519
  if (dataBuf->nAllocSize <= dataBuf->headerSize) {
    dataBuf->nAllocSize = dataBuf->headerSize*2;
  }
  
H
hjxilinx 已提交
520 521 522
  dataBuf->pData = calloc(1, dataBuf->nAllocSize);
  dataBuf->ordered = true;
  dataBuf->prevTS = INT64_MIN;
S
slguan 已提交
523 524 525

  dataBuf->rowSize = rowSize;
  dataBuf->size = startOffset;
S
slguan 已提交
526 527
  dataBuf->tsSource = -1;

B
Bomin Zhang 已提交
528
  tstrncpy(dataBuf->tableId, name, sizeof(dataBuf->tableId));
H
hjxilinx 已提交
529 530

  /*
531
   * The table meta may be released since the table meta cache are completed clean by other thread
532 533
   * due to operation such as drop database. So here we add the reference count directly instead of invoke
   * taosGetDataFromCache, which may return NULL value.
H
hjxilinx 已提交
534
   */
H
hjxilinx 已提交
535 536
  dataBuf->pTableMeta = taosCacheAcquireByData(tscCacheHandle, pTableMeta);
  assert(initialSize > 0 && pTableMeta != NULL && dataBuf->pTableMeta != NULL);
537

538 539
  *dataBlocks = dataBuf;
  return TSDB_CODE_SUCCESS;
S
slguan 已提交
540 541
}

542
int32_t tscGetDataBlockFromList(void* pHashList, SArray* pDataBlockList, int64_t id, int32_t size,
H
hjxilinx 已提交
543
                                int32_t startOffset, int32_t rowSize, const char* tableId, STableMeta* pTableMeta,
H
hjxilinx 已提交
544 545
                                STableDataBlocks** dataBlocks) {
  *dataBlocks = NULL;
S
slguan 已提交
546

H
hjxilinx 已提交
547
  STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pHashList, (const char*)&id, sizeof(id));
S
slguan 已提交
548
  if (t1 != NULL) {
H
hjxilinx 已提交
549
    *dataBlocks = *t1;
S
slguan 已提交
550 551
  }

H
hjxilinx 已提交
552
  if (*dataBlocks == NULL) {
H
hjxilinx 已提交
553
    int32_t ret = tscCreateDataBlock((size_t)size, rowSize, startOffset, tableId, pTableMeta, dataBlocks);
H
hjxilinx 已提交
554 555 556 557
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }

H
hjxilinx 已提交
558
    taosHashPut(pHashList, (const char*)&id, sizeof(int64_t), (char*)dataBlocks, POINTER_BYTES);
559
    taosArrayPush(pDataBlockList, dataBlocks);
S
slguan 已提交
560 561
  }

H
hjxilinx 已提交
562
  return TSDB_CODE_SUCCESS;
S
slguan 已提交
563 564
}

565
/*
 * Convert the rows of pTableDataBlock into SDataRow format and write them to
 * pDataBlock, laid out as: [SSubmitBlk][STColumn * numOfCols (optional)][rows].
 *
 * pDataBlock       destination buffer; the caller must have reserved enough room
 * pTableDataBlock  source block whose pData starts with an SSubmitBlk followed by the raw rows
 * includeSchema    when true, the column schema is emitted in front of the rows
 *
 * Returns dataLen + schemaLen in host byte order; the SSubmitBlk header itself is
 * not counted. Both fields are stored in the destination header via htonl.
 */
static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bool includeSchema) {
  // TODO: optimize this function, handle the case while binary is not presented
  STableMeta*   pTableMeta = pTableDataBlock->pTableMeta;
  STableComInfo tinfo = tscGetTableInfo(pTableMeta);
  SSchema*      pSchema = tscGetTableSchema(pTableMeta);

  // byte cursor into the destination; arithmetic on void* is not valid ISO C
  char* pDst = (char*)pDataBlock;

  SSubmitBlk* pBlock = (SSubmitBlk*)pDst;
  memcpy(pDst, pTableDataBlock->pData, sizeof(SSubmitBlk));
  pDst += sizeof(SSubmitBlk);

  int32_t flen = 0;  // original total length of row

  // schema needs to be included into the submit data block
  if (includeSchema) {
    int32_t numOfCols = tscGetNumOfColumns(pTableDataBlock->pTableMeta);
    for(int32_t j = 0; j < numOfCols; ++j) {
      STColumn* pCol = (STColumn*) pDst;
      pCol->colId = htons(pSchema[j].colId);
      pCol->type  = pSchema[j].type;
      pCol->bytes = htons(pSchema[j].bytes);
      pCol->offset = 0;

      pDst += sizeof(STColumn);
      flen += TYPE_BYTES[pSchema[j].type];
    }

    int32_t schemaSize = sizeof(STColumn) * numOfCols;
    pBlock->schemaLen = schemaSize;
  } else {
    for (int32_t j = 0; j < tinfo.numOfColumns; ++j) {
      flen += TYPE_BYTES[pSchema[j].type];
    }

    pBlock->schemaLen = 0;
  }

  char* p = pTableDataBlock->pData + sizeof(SSubmitBlk);
  pBlock->dataLen = 0;

  // the copied header stores numOfRows byte-swapped by the caller (tscMergeTableDataBlocks); swap it back
  int32_t numOfRows = htons(pBlock->numOfRows);

  for (int32_t i = 0; i < numOfRows; ++i) {
    SDataRow trow = (SDataRow) pDst;
    dataRowSetLen(trow, TD_DATA_ROW_HEAD_SIZE + flen);
    dataRowSetVersion(trow, pTableMeta->sversion);

    int toffset = 0;
    for (int32_t j = 0; j < tinfo.numOfColumns; j++) {
      tdAppendColVal(trow, p, pSchema[j].type, pSchema[j].bytes, toffset);
      toffset += TYPE_BYTES[pSchema[j].type];
      p += pSchema[j].bytes;
    }

    pDst += dataRowLen(trow);
    pBlock->dataLen += dataRowLen(trow);
  }

  int32_t len = pBlock->dataLen + pBlock->schemaLen;
  pBlock->dataLen = htonl(pBlock->dataLen);
  pBlock->schemaLen = htonl(pBlock->schemaLen);

  return len;
}

628 629 630 631 632 633 634 635 636 637 638 639
/*
 * Per-row size increase to budget for when a row is converted to SDataRow format:
 * the row head plus TYPE_BYTES[TSDB_DATA_TYPE_BINARY] for every variable-length column.
 */
static int32_t getRowExpandSize(STableMeta* pTableMeta) {
  int32_t  expand = TD_DATA_ROW_HEAD_SIZE;
  int32_t  numOfCols = tscGetNumOfColumns(pTableMeta);
  SSchema* pColSchema = tscGetTableSchema(pTableMeta);

  for (int32_t col = 0; col < numOfCols; col++) {
    if (!IS_VAR_DATA_TYPE(pColSchema[col].type)) {
      continue;
    }

    expand += TYPE_BYTES[TSDB_DATA_TYPE_BINARY];
  }

  return expand;
}

640
/*
 * Merge the per-table data blocks in pTableDataBlockList into one submit buffer
 * per vgId (the key used for the temporary hash table).
 *
 * For every table block the rows are sorted/de-duplicated, the SSubmitBlk header
 * is converted to network byte order and the rows are appended to the buffer of
 * the owning vnode via trimDataBlock.
 *
 * On success pTableDataBlockList is destroyed, pCmd->pDataBlocks is set to the list
 * of merged buffers and TSDB_CODE_SUCCESS is returned. On failure the merged
 * buffers built so far are destroyed and an error code is returned;
 * pTableDataBlockList is left to the caller in that case.
 *
 * NOTE(review): the list is assumed to be non-empty, element 0 is read unconditionally.
 */
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
  SSqlCmd* pCmd = &pSql->cmd;

  // the maximum expanded size in byte when a row-wise data is converted to SDataRow format
  STableDataBlocks* pOneTableBlock = taosArrayGetP(pTableDataBlockList, 0);
  int32_t expandSize = getRowExpandSize(pOneTableBlock->pTableMeta);

  void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
  SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES);

  size_t total = taosArrayGetSize(pTableDataBlockList);
  for (int32_t i = 0; i < total; ++i) {
    pOneTableBlock = taosArrayGetP(pTableDataBlockList, i);
    STableDataBlocks* dataBuf = NULL;

    int32_t ret =
        tscGetDataBlockFromList(pVnodeDataBlockHashList, pVnodeDataBlockList, pOneTableBlock->vgId, TSDB_PAYLOAD_SIZE,
                                tsInsertHeadSize, 0, pOneTableBlock->tableId, pOneTableBlock->pTableMeta, &dataBuf);
    if (ret != TSDB_CODE_SUCCESS) {
      tscError("%p failed to prepare the data block buffer for merging table data, code:%d", pSql, ret);
      taosHashCleanup(pVnodeDataBlockHashList);
      tscDestroyBlockArrayList(pVnodeDataBlockList);
      return ret;
    }

    SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData;

    // upper bound of the space needed once this table's rows (expanded) and schema are appended
    int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize + sizeof(STColumn) * tscGetNumOfColumns(pOneTableBlock->pTableMeta);

    if (dataBuf->nAllocSize < destSize) {
      while (dataBuf->nAllocSize < destSize) {
        dataBuf->nAllocSize = dataBuf->nAllocSize * 1.5;
      }

      char* tmp = realloc(dataBuf->pData, dataBuf->nAllocSize);
      if (tmp != NULL) {
        dataBuf->pData = tmp;
        memset(dataBuf->pData + dataBuf->size, 0, dataBuf->nAllocSize - dataBuf->size);
      } else {  // failed to allocate memory, free already allocated memory and return error code
        tscError("%p failed to allocate memory for merging submit block, size:%d", pSql, dataBuf->nAllocSize);

        taosHashCleanup(pVnodeDataBlockHashList);

        // dataBuf is an element of pVnodeDataBlockList, so this call already frees
        // dataBuf->pData and dataBuf itself; it must not be touched afterwards
        tscDestroyBlockArrayList(pVnodeDataBlockList);

        return TSDB_CODE_TSC_OUT_OF_MEMORY;
      }
    }

    tscSortRemoveDataBlockDupRows(pOneTableBlock);
    char* ekey = (char*)pBlocks->data + pOneTableBlock->rowSize*(pBlocks->numOfRows-1);

    tscDebug("%p tableId:%s, sid:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, pSql, pOneTableBlock->tableId,
        pBlocks->tid, pBlocks->numOfRows, pBlocks->sversion, GET_INT64_VAL(pBlocks->data), GET_INT64_VAL(ekey));

    int32_t len = pBlocks->numOfRows * (pOneTableBlock->rowSize + expandSize) + sizeof(STColumn) * tscGetNumOfColumns(pOneTableBlock->pTableMeta);

    // the header goes out in network byte order; trimDataBlock copies it into the merged buffer
    pBlocks->tid = htonl(pBlocks->tid);
    pBlocks->uid = htobe64(pBlocks->uid);
    pBlocks->sversion = htonl(pBlocks->sversion);
    pBlocks->numOfRows = htons(pBlocks->numOfRows);
    pBlocks->schemaLen = 0;

    // erase the empty space reserved for binary data
    int32_t finalLen = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, pCmd->submitSchema);
    assert(finalLen <= len);

    dataBuf->size += (finalLen + sizeof(SSubmitBlk));
    assert(dataBuf->size <= dataBuf->nAllocSize);

    // the length does not include the SSubmitBlk structure
    pBlocks->dataLen = htonl(finalLen);

    dataBuf->numOfTables += 1;
  }

  tscDestroyBlockArrayList(pTableDataBlockList);

  // free the table data blocks;
  pCmd->pDataBlocks = pVnodeDataBlockList;

  taosHashCleanup(pVnodeDataBlockHashList);

  return TSDB_CODE_SUCCESS;
}

H
hzcheng 已提交
726
/*
 * Tear down a connection object: invalidate its signature, stop its timer,
 * destroy its mutex, close the dnode RPC connection (if any) and free the object.
 * pObj must not be NULL.
 */
void tscCloseTscObj(STscObj* pObj) {
  assert(pObj != NULL);

  pObj->signature = NULL;
  taosTmrStopA(&(pObj->pTimer));
  pthread_mutex_destroy(&pObj->mutex);

  void* pConn = pObj->pDnodeConn;
  if (pConn != NULL) {
    rpcClose(pConn);
  }

  tscDebug("%p DB connection is closed, dnodeConn:%p", pObj, pConn);
  tfree(pObj);
}

H
hjxilinx 已提交
741
/*
 * Return true when the first token of sqlstr that is not a left parenthesis
 * (TK_LP) is TK_INSERT or TK_IMPORT.
 */
bool tscIsInsertData(char* sqlstr) {
  int32_t pos = 0;

  for (;;) {
    SSQLToken token = tStrGetToken(sqlstr, &pos, false, 0, NULL);
    if (token.type == TK_LP) {
      continue;  // skip leading '('
    }

    return token.type == TK_INSERT || token.type == TK_IMPORT;
  }
}

S
slguan 已提交
752
/*
 * Make sure pCmd->payload can hold at least size bytes and is zero-filled.
 * A missing payload is allocated; an existing one is enlarged only when it is
 * smaller than size, and is then cleared over its whole allocated length.
 *
 * size must be positive. Returns TSDB_CODE_SUCCESS or TSDB_CODE_TSC_OUT_OF_MEMORY;
 * on failure the existing payload is left unchanged.
 */
int tscAllocPayload(SSqlCmd* pCmd, int size) {
  assert(size > 0);

  if (pCmd->payload != NULL) {
    if (pCmd->allocSize < size) {
      char* grown = realloc(pCmd->payload, size);
      if (grown == NULL) {
        return TSDB_CODE_TSC_OUT_OF_MEMORY;
      }

      pCmd->payload = grown;
      pCmd->allocSize = size;
    }

    memset(pCmd->payload, 0, pCmd->allocSize);
  } else {
    assert(pCmd->allocSize == 0);

    // calloc already returns zeroed memory
    pCmd->payload = (char*)calloc(1, size);
    if (pCmd->payload == NULL) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }

    pCmd->allocSize = size;
  }

  assert(pCmd->allocSize >= size);
  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
776 777
TAOS_FIELD tscCreateField(int8_t type, const char* name, int16_t bytes) {
  TAOS_FIELD f = { .type = type, .bytes = bytes, };
B
Bomin Zhang 已提交
778
  tstrncpy(f.name, name, sizeof(f.name));
H
hjxilinx 已提交
779
  return f;
H
hzcheng 已提交
780 781
}

H
hjxilinx 已提交
782 783 784 785
/*
 * Append pField to the field list and add a matching support-info entry
 * (no expression attached, visible). numOfOutput is incremented.
 * Returns the result of pushing the support-info entry.
 */
SFieldSupInfo* tscFieldInfoAppend(SFieldInfo* pFieldInfo, TAOS_FIELD* pField) {
  assert(pFieldInfo != NULL);

  taosArrayPush(pFieldInfo->pFields, pField);
  pFieldInfo->numOfOutput++;

  struct SFieldSupInfo supp = {0};
  supp.pSqlExpr       = NULL;
  supp.pArithExprInfo = NULL;
  supp.visible        = true;

  return taosArrayPush(pFieldInfo->pSupportInfo, &supp);
}

H
hjxilinx 已提交
796
/* Return the support-info entry at position index of pFieldInfo->pSupportInfo. */
SFieldSupInfo* tscFieldInfoGetSupp(SFieldInfo* pFieldInfo, int32_t index) {
  SFieldSupInfo* pSupp = TARRAY_GET_ELEM(pFieldInfo->pSupportInfo, index);
  return pSupp;
}

H
hjxilinx 已提交
800 801 802 803 804 805 806 807 808 809 810
/*
 * Insert field at position index of the field list and add a matching
 * support-info entry (no expression attached, visible) at the same position.
 * numOfOutput is incremented. Returns the result of inserting the support-info entry.
 */
SFieldSupInfo* tscFieldInfoInsert(SFieldInfo* pFieldInfo, int32_t index, TAOS_FIELD* field) {
  taosArrayInsert(pFieldInfo->pFields, index, field);
  pFieldInfo->numOfOutput++;

  struct SFieldSupInfo supp = {0};
  supp.pSqlExpr       = NULL;
  supp.pArithExprInfo = NULL;
  supp.visible        = true;

  return taosArrayInsert(pFieldInfo->pSupportInfo, index, &supp);
}
H
hzcheng 已提交
812

H
hjxilinx 已提交
813
/*
 * Recompute the offset of every expression in pQueryInfo->exprList: the first
 * expression starts at 0 and each following one starts where the previous one
 * ends (previous offset + previous resBytes).
 *
 * A query without expressions is left untouched, matching
 * tscFieldInfoUpdateOffsetForInterResult.
 */
void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo) {
  size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
  if (numOfExprs == 0) {
    // there is no first element to initialise; reading index 0 would be invalid
    return;
  }

  SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
  pExpr->offset = 0;

  for (int32_t i = 1; i < numOfExprs; ++i) {
    SSqlExpr* prev = taosArrayGetP(pQueryInfo->exprList, i - 1);
    SSqlExpr* p = taosArrayGetP(pQueryInfo->exprList, i);

    p->offset = prev->offset + prev->resBytes;
  }
}

827
/*
 * Lay the expressions out back to back: the first one starts at offset 0 and
 * every following one starts where its predecessor ends (offset + resBytes).
 * Does nothing when the expression list is empty.
 */
void tscFieldInfoUpdateOffsetForInterResult(SQueryInfo* pQueryInfo) {
  size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
  if (numOfExprs == 0) {
    return;
  }

  SSqlExpr* prev = taosArrayGetP(pQueryInfo->exprList, 0);
  prev->offset = 0;

  for (int32_t i = 1; i < numOfExprs; ++i) {
    SSqlExpr* cur = taosArrayGetP(pQueryInfo->exprList, i);

    cur->offset = prev->offset + prev->resBytes;
    prev = cur;
  }
}

H
hjxilinx 已提交
844
/*
 * Copy the output count and both arrays from src into dst. An array that dst
 * already owns is overwritten with taosArrayCopy; a missing one is created
 * with taosArrayClone.
 */
void tscFieldInfoCopy(SFieldInfo* dst, const SFieldInfo* src) {
  dst->numOfOutput = src->numOfOutput;

  if (dst->pFields != NULL) {
    taosArrayCopy(dst->pFields, src->pFields);
  } else {
    dst->pFields = taosArrayClone(src->pFields);
  }

  if (dst->pSupportInfo != NULL) {
    taosArrayCopy(dst->pSupportInfo, src->pSupportInfo);
  } else {
    dst->pSupportInfo = taosArrayClone(src->pSupportInfo);
  }
}

H
hjxilinx 已提交
860
/* Return the TAOS_FIELD slot at `index`; no bounds check is done here. */
TAOS_FIELD* tscFieldInfoGetField(SFieldInfo* pFieldInfo, int32_t index) {
  SArray* pFieldList = pFieldInfo->pFields;
  return TARRAY_GET_ELEM(pFieldList, index);
}

864
/*
 * Return the offset stored in the SQL expression bound to output field
 * `index`. The support record's pSqlExpr is dereferenced without a check.
 */
int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index) {
  SFieldInfo*    pFieldInfo = &pQueryInfo->fieldsInfo;
  SFieldSupInfo* pInfo = tscFieldInfoGetSupp(pFieldInfo, index);
  assert(pInfo != NULL);

  return pInfo->pSqlExpr->offset;
}

H
hjxilinx 已提交
871
int32_t tscFieldInfoCompare(const SFieldInfo* pFieldInfo1, const SFieldInfo* pFieldInfo2) {
872
  assert(pFieldInfo1 != NULL && pFieldInfo2 != NULL);
873

H
hjxilinx 已提交
874 875
  if (pFieldInfo1->numOfOutput != pFieldInfo2->numOfOutput) {
    return pFieldInfo1->numOfOutput - pFieldInfo2->numOfOutput;
876 877
  }

H
hjxilinx 已提交
878 879 880
  for (int32_t i = 0; i < pFieldInfo1->numOfOutput; ++i) {
    TAOS_FIELD* pField1 = tscFieldInfoGetField((SFieldInfo*) pFieldInfo1, i);
    TAOS_FIELD* pField2 = tscFieldInfoGetField((SFieldInfo*) pFieldInfo2, i);
881

H
hjxilinx 已提交
882 883
    if (pField1->type != pField2->type ||
        pField1->bytes != pField2->bytes ||
884 885 886 887 888 889 890 891
        strcasecmp(pField1->name, pField2->name) != 0) {
      return 1;
    }
  }

  return 0;
}

H
hjxilinx 已提交
892 893 894
/* Sum of resBytes over all expressions in the list; 0 for an empty list. */
int32_t tscGetResRowLength(SArray* pExprList) {
  int32_t rowLength = 0;

  size_t numOfExprs = taosArrayGetSize(pExprList);
  for (int32_t i = 0; i < numOfExprs; ++i) {
    SSqlExpr* pExpr = taosArrayGetP(pExprList, i);
    rowLength += pExpr->resBytes;
  }

  return rowLength;
}

H
hjxilinx 已提交
907
/*
 * Release everything owned by pFieldInfo: the field array, every arithmetic
 * expression attached to a support record, and the support array itself.
 * The structure is zeroed afterwards. A NULL pFieldInfo is ignored.
 */
void tscFieldInfoClear(SFieldInfo* pFieldInfo) {
  if (pFieldInfo == NULL) {
    return;
  }

  taosArrayDestroy(pFieldInfo->pFields);

  for (int32_t i = 0; i < pFieldInfo->numOfOutput; ++i) {
    SFieldSupInfo* pInfo = taosArrayGet(pFieldInfo->pSupportInfo, i);
    if (pInfo->pArithExprInfo == NULL) {
      continue;
    }

    tExprTreeDestroy(&pInfo->pArithExprInfo->pExpr, NULL);
    tfree(pInfo->pArithExprInfo);
  }

  taosArrayDestroy(pFieldInfo->pSupportInfo);
  memset(pFieldInfo, 0, sizeof(SFieldInfo));
}

H
hjxilinx 已提交
927
/*
 * Allocate a zeroed SSqlExpr and fill it from the arguments.
 *
 * Column id/name resolution:
 *   - tbname pseudo column: only the column id is set;
 *   - tag column:           id and name come from the tag schema;
 *   - normal column:        id and name come from the column schema, but only
 *                           when table meta is available.
 * The table uid is recorded when table meta is available.
 */
static SSqlExpr* doBuildSqlExpr(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
    int16_t size, int16_t interSize, bool isTagCol) {
  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, pColIndex->tableIndex);

  SSqlExpr* pExpr = calloc(1, sizeof(SSqlExpr));
  pExpr->functionId = functionId;

  // resolve the column id (and name, where a schema entry exists)
  if (pColIndex->columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
    pExpr->colInfo.colId = TSDB_TBNAME_COLUMN_INDEX;
  } else if (isTagCol) {
    SSchema* pTagSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);
    SSchema* pEntry = &pTagSchema[pColIndex->columnIndex];

    pExpr->colInfo.colId = pEntry->colId;
    tstrncpy(pExpr->colInfo.name, pEntry->name, sizeof(pExpr->colInfo.name));
  } else if (pTableMetaInfo->pTableMeta != NULL) {
    // in handling select database/version/server_status(), the pTableMeta is NULL
    SSchema* pEntry = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, pColIndex->columnIndex);

    pExpr->colInfo.colId = pEntry->colId;
    tstrncpy(pExpr->colInfo.name, pEntry->name, sizeof(pExpr->colInfo.name));
  }

  if (isTagCol) {
    pExpr->colInfo.flag = TSDB_COL_TAG;
  } else {
    pExpr->colInfo.flag = TSDB_COL_NORMAL;
  }

  pExpr->colInfo.colIndex = pColIndex->columnIndex;
  pExpr->resType    = type;
  pExpr->resBytes   = size;
  pExpr->interBytes = interSize;

  if (pTableMetaInfo->pTableMeta) {
    pExpr->uid = pTableMetaInfo->pTableMeta->uid;
  }

  return pExpr;
}

/*
 * Build a new expression and place it at `index` in the expression list.
 * An index equal to the current list length is forwarded to tscSqlExprAppend.
 */
SSqlExpr* tscSqlExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
                           int16_t size, int16_t interSize, bool isTagCol) {
  int32_t numOfExprs = taosArrayGetSize(pQueryInfo->exprList);

  if (index != numOfExprs) {
    SSqlExpr* pNewExpr = doBuildSqlExpr(pQueryInfo, functionId, pColIndex, type, size, interSize, isTagCol);
    taosArrayInsert(pQueryInfo->exprList, index, &pNewExpr);
    return pNewExpr;
  }

  return tscSqlExprAppend(pQueryInfo, functionId, pColIndex, type, size, interSize, isTagCol);
}

/* Build a new expression and push it onto the end of the expression list. */
SSqlExpr* tscSqlExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
    int16_t size, int16_t interSize, bool isTagCol) {
  SSqlExpr* pNewExpr = doBuildSqlExpr(pQueryInfo, functionId, pColIndex, type, size, interSize, isTagCol);

  // the list stores pointers, so the address of the pointer is pushed
  taosArrayPush(pQueryInfo->exprList, &pNewExpr);

  return pNewExpr;
}

983 984
/*
 * Overwrite function id, source column and result type/size of the
 * expression at `index`. The column id is looked up in the schema of the
 * first table. Returns the updated expression, or NULL when none exists at
 * `index`.
 */
SSqlExpr* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, int16_t srcColumnIndex,
                           int16_t type, int16_t size) {
  // looked up before the expression, as before, so its assertions still run first
  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, index);
  if (pExpr != NULL) {
    pExpr->functionId = functionId;

    pExpr->colInfo.colIndex = srcColumnIndex;
    pExpr->colInfo.colId = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, srcColumnIndex)->colId;

    pExpr->resType = type;
    pExpr->resBytes = size;
  }

  return pExpr;
}

1002
/* Number of expressions currently held in the query's expression list. */
size_t tscSqlExprNumOfExprs(SQueryInfo* pQueryInfo) {
  size_t numOfExprs = taosArrayGetSize(pQueryInfo->exprList);
  return numOfExprs;
}

S
slguan 已提交
1006
/*
 * Append one parameter to pExpr, converting the raw bytes in `argument`
 * (`bytes` long, of data type `type`) into a tVariant.
 *
 * Nothing happens when pExpr or argument is NULL, or bytes is 0.
 * `tableIndex` is accepted for interface compatibility and is not used.
 *
 * The free slot is validated BEFORE it is written, so an overflowing call is
 * caught by the assertion instead of first corrupting memory past the
 * parameter array.
 */
void addExprParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes, int16_t tableIndex) {
  if (pExpr == NULL || argument == NULL || bytes == 0) {
    return;
  }

  // the next slot must lie inside the parameter array
  assert((size_t)pExpr->numOfParams < tListLen(pExpr->param));

  // set parameter value
  // transfer to tVariant from byte data/no ascii data
  tVariantCreateFromBinary(&pExpr->param[pExpr->numOfParams], argument, bytes, type);

  pExpr->numOfParams += 1;
  assert(pExpr->numOfParams <= 3);
}

1019
/* Return the expression pointer stored at `index` of the expression list. */
SSqlExpr* tscSqlExprGet(SQueryInfo* pQueryInfo, int32_t index) {
  SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, index);
  return pExpr;
}

H
hjxilinx 已提交
1023
/*
 * Destroy every parameter slot of pExpr and free the expression itself.
 * Always returns NULL; a NULL argument is accepted.
 */
void* sqlExprDestroy(SSqlExpr* pExpr) {
  if (pExpr != NULL) {
    // every slot is destroyed, not only the first numOfParams ones
    for (int32_t slot = 0; slot < tListLen(pExpr->param); ++slot) {
      tVariantDestroy(&pExpr->param[slot]);
    }

    tfree(pExpr);
  }

  return NULL;
}

H
hjxilinx 已提交
1037 1038 1039
/*
 * NOTE: Does not release SSqlExprInfo here.
 */
H
hjxilinx 已提交
1040 1041
/* Destroy every expression referenced by the list, then the list itself. */
void tscSqlExprInfoDestroy(SArray* pExprInfo) {
  size_t numOfExprs = taosArrayGetSize(pExprInfo);

  for (int32_t i = 0; i < numOfExprs; ++i) {
    sqlExprDestroy(taosArrayGetP(pExprInfo, i));
  }

  taosArrayDestroy(pExprInfo);
}

H
hjxilinx 已提交
1051 1052
/*
 * Copy into dst every expression of src whose uid equals `uid`.
 * With deepcopy the expression is duplicated (including its parameters);
 * otherwise dst receives the same pointer that src holds.
 */
void tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy) {
  assert(src != NULL && dst != NULL);

  size_t numOfExprs = taosArrayGetSize(src);
  for (int32_t i = 0; i < numOfExprs; ++i) {
    SSqlExpr* pExpr = taosArrayGetP(src, i);
    if (pExpr->uid != uid) {
      continue;
    }

    if (!deepcopy) {
      taosArrayPush(dst, &pExpr);
      continue;
    }

    SSqlExpr* pClone = calloc(1, sizeof(SSqlExpr));
    *pClone = *pExpr;

    // re-assign the used parameters after the plain struct copy
    for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
      tVariantAssign(&pClone->param[j], &pExpr->param[j]);
    }

    taosArrayPush(dst, &pClone);
  }
}

1076
/*
 * Make sure a column entry for pColIndex exists in pColumnList and return it.
 *
 * The list is scanned from the front, skipping entries whose column index is
 * smaller than the requested one or whose table index is smaller than the
 * requested one. A new zeroed SColumn is inserted at the stop position unless
 * the entry found there already has the same column index and table index.
 * A negative column index (e.g. tbname) is never inserted; NULL is returned.
 */
SColumn* tscColumnListInsert(SArray* pColumnList, SColumnIndex* pColIndex) {
  // ignore the tbname columnIndex to be inserted into source list
  if (pColIndex->columnIndex < 0) {
    return NULL;
  }

  size_t  numOfCols = taosArrayGetSize(pColumnList);
  int16_t col = pColIndex->columnIndex;

  // find the first entry that must not be skipped
  int32_t pos = 0;
  for (; pos < numOfCols; ++pos) {
    SColumn* pCur = taosArrayGetP(pColumnList, pos);

    bool skip = (pCur->colIndex.columnIndex < col) || (pCur->colIndex.tableIndex < pColIndex->tableIndex);
    if (!skip) {
      break;
    }
  }

  // past the end (or empty list): always insert; otherwise only when the entry differs
  bool needInsert = true;
  if (pos < numOfCols) {
    SColumn* pCur = taosArrayGetP(pColumnList, pos);
    needInsert = (pCur->colIndex.columnIndex > col) || (pCur->colIndex.tableIndex != pColIndex->tableIndex);
  }

  if (needInsert) {
    SColumn* b = calloc(1, sizeof(SColumn));
    b->colIndex = *pColIndex;

    taosArrayInsert(pColumnList, pos, &b);
  }

  return taosArrayGetP(pColumnList, pos);
}

1116 1117 1118 1119
/*
 * Free the payload of every filter flagged as `filterstr`, then the filter
 * array itself.
 */
static void destroyFilterInfo(SColumnFilterInfo* pFilterInfo, int32_t numOfFilters) {
  for (int32_t i = 0; i < numOfFilters; ++i) {
    if (!pFilterInfo[i].filterstr) {
      continue;
    }

    tfree(pFilterInfo[i].pz);
  }

  tfree(pFilterInfo);
}

1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142
/*
 * Allocate a new SColumn carrying src's column index, filter count and a
 * clone of its filter array (via tscFilterInfoClone).
 */
SColumn* tscColumnClone(const SColumn* src) {
  assert(src != NULL);

  SColumn* pClone = calloc(1, sizeof(SColumn));

  pClone->colIndex     = src->colIndex;
  pClone->numOfFilters = src->numOfFilters;
  pClone->filterInfo   = tscFilterInfoClone(src->filterInfo, src->numOfFilters);

  return pClone;
}

/* Release a column together with its filter array. */
static void tscColumnDestroy(SColumn* pCol) {
  SColumnFilterInfo* pFilters = pCol->filterInfo;

  destroyFilterInfo(pFilters, pCol->numOfFilters);
  free(pCol);
}

H
hjxilinx 已提交
1143
/*
 * Append clones of the columns in src to dst. A negative tableIndex copies
 * every column; otherwise only columns of that table are copied.
 */
void tscColumnListCopy(SArray* dst, const SArray* src, int16_t tableIndex) {
  assert(src != NULL && dst != NULL);

  size_t numOfCols = taosArrayGetSize(src);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumn* pCol = taosArrayGetP(src, i);

    bool selected = (pCol->colIndex.tableIndex == tableIndex) || (tableIndex < 0);
    if (!selected) {
      continue;
    }

    SColumn* pClone = tscColumnClone(pCol);
    taosArrayPush(dst, &pClone);
  }
}

1157 1158
/* Destroy every column in the list and the list itself; NULL is accepted. */
void tscColumnListDestroy(SArray* pColumnList) {
  if (pColumnList != NULL) {
    size_t numOfCols = taosArrayGetSize(pColumnList);

    for (int32_t i = 0; i < numOfCols; ++i) {
      tscColumnDestroy(taosArrayGetP(pColumnList, i));
    }

    taosArrayDestroy(pColumnList);
  }
}
H
hzcheng 已提交
1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185

/*
 * 1. normal name, not a keyword or number
 * 2. name with quote
 * 3. string with only one delimiter '.'.
 *
 * only_one_part
 * 'only_one_part'
 * first_part.second_part
 * first_part.'second_part'
 * 'first_part'.second_part
 * 'first_part'.'second_part'
 * 'first_part.second_part'
 *
 */
static int32_t validateQuoteToken(SSQLToken* pToken) {
  // strip the quotes and surrounding blanks in place, then re-tokenize the content
  strdequote(pToken->z);
  pToken->n = strtrim(pToken->z);

  int32_t tokenLen = tSQLGetToken(pToken->z, &pToken->type);

  // still a string after dequoting: run the full name validation on it
  if (pToken->type == TK_STRING) {
    return tscValidateName(pToken);
  }

  // otherwise the whole content must be exactly one identifier
  bool isSingleId = (tokenLen == pToken->n) && (pToken->type == TK_ID);
  return isSingleId ? TSDB_CODE_SUCCESS : TSDB_CODE_TSC_INVALID_SQL;
}

/*
 * Validate a (possibly quoted, possibly two-part) name token in place.
 *
 * Only TK_STRING and TK_ID tokens are accepted. A token without the path
 * delimiter is handled as a single part; one with the delimiter is split
 * into two parts that are validated separately and then re-joined inside the
 * original buffer, with pToken->z / pToken->n updated to cover the result.
 * Returns TSDB_CODE_SUCCESS or TSDB_CODE_TSC_INVALID_SQL.
 */
int32_t tscValidateName(SSQLToken* pToken) {
  if (pToken->type != TK_STRING && pToken->type != TK_ID) {
    return TSDB_CODE_TSC_INVALID_SQL;
  }

  char* sep = strnchr(pToken->z, TS_PATH_DELIMITER[0], pToken->n, true);

  if (sep == NULL) {  // single part
    if (pToken->type != TK_STRING) {
      // a bare token must not be a number
      return isNumber(pToken) ? TSDB_CODE_TSC_INVALID_SQL : TSDB_CODE_SUCCESS;
    }

    strdequote(pToken->z);
    pToken->n = strtrim(pToken->z);

    int len = tSQLGetToken(pToken->z, &pToken->type);

    // single token, validate it
    if (len == pToken->n) {
      return validateQuoteToken(pToken);
    }

    // more than one token: only acceptable when a delimiter shows up after dequoting
    sep = strnchr(pToken->z, TS_PATH_DELIMITER[0], pToken->n, true);
    if (sep == NULL) {
      return TSDB_CODE_TSC_INVALID_SQL;
    }

    return tscValidateName(pToken);
  }

  // two part
  int32_t fullLen = pToken->n;
  char*   pStart = pToken->z;

  if (pToken->type == TK_SPACE) {
    pToken->n = strtrim(pToken->z);
  }

  // first part: must be a string/identifier directly followed by the delimiter
  pToken->n = tSQLGetToken(pToken->z, &pToken->type);
  if (pToken->z[pToken->n] != TS_PATH_DELIMITER[0]) {
    return TSDB_CODE_TSC_INVALID_SQL;
  }

  if (pToken->type != TK_STRING && pToken->type != TK_ID) {
    return TSDB_CODE_TSC_INVALID_SQL;
  }

  if (pToken->type == TK_STRING && validateQuoteToken(pToken) != TSDB_CODE_SUCCESS) {
    return TSDB_CODE_TSC_INVALID_SQL;
  }

  int32_t firstPartLen = pToken->n;

  // second part: everything after the delimiter must be one string/identifier
  pToken->z = sep + 1;
  pToken->n = fullLen - (sep - pStart) - 1;

  int32_t len = tSQLGetToken(pToken->z, &pToken->type);
  if (len != pToken->n || (pToken->type != TK_STRING && pToken->type != TK_ID)) {
    return TSDB_CODE_TSC_INVALID_SQL;
  }

  if (pToken->type == TK_STRING && validateQuoteToken(pToken) != TSDB_CODE_SUCCESS) {
    return TSDB_CODE_TSC_INVALID_SQL;
  }

  // re-build the whole name string; nothing to move when the delimiter is
  // already right behind the first part (first part had no quote)
  if (pStart[firstPartLen] != TS_PATH_DELIMITER[0]) {
    pStart[firstPartLen] = TS_PATH_DELIMITER[0];
    memmove(&pStart[firstPartLen + 1], pToken->z, pToken->n);
    pStart[firstPartLen + sizeof(TS_PATH_DELIMITER[0]) + pToken->n] = 0;
  }

  pToken->n += (firstPartLen + sizeof(TS_PATH_DELIMITER[0]));
  pToken->z = pStart;

  return TSDB_CODE_SUCCESS;
}

/* Increment the `num` counter of the given stream; NULL is ignored. */
void tscIncStreamExecutionCount(void* pStream) {
  if (pStream != NULL) {
    ((SSqlStream*)pStream)->num += 1;
  }
}

H
hjxilinx 已提交
1288 1289
/*
 * Return true when colId is the tbname pseudo column or matches the id of
 * any column or tag in the table schema. Without table meta the answer is
 * always false.
 */
bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId) {
  if (pTableMetaInfo->pTableMeta == NULL) {
    return false;
  }

  if (colId == TSDB_TBNAME_COLUMN_INDEX) {
    return true;
  }

  SSchema*      pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
  STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);

  // columns and tags are scanned as one contiguous schema array
  int32_t numOfTotal = tinfo.numOfTags + tinfo.numOfColumns;

  bool found = false;
  for (int32_t i = 0; i < numOfTotal && !found; ++i) {
    found = (pSchema[i].colId == colId);
  }

  return found;
}

S
slguan 已提交
1311 1312
/*
 * Deep-copy src into dest. dest is zeroed first, so anything it referenced
 * before is NOT released here. The tbname condition string and every
 * per-table condition buffer are duplicated.
 */
void tscTagCondCopy(STagCond* dest, const STagCond* src) {
  memset(dest, 0, sizeof(STagCond));

  if (src->tbnameCond.cond != NULL) {
    dest->tbnameCond.cond = strdup(src->tbnameCond.cond);
  }

  dest->tbnameCond.uid = src->tbnameCond.uid;

  memcpy(&dest->joinInfo, &src->joinInfo, sizeof(SJoinInfo));
  dest->relType = src->relType;

  if (src->pCond == NULL) {
    return;
  }

  size_t numOfConds = taosArrayGetSize(src->pCond);
  dest->pCond = taosArrayInit(numOfConds, sizeof(SCond));

  for (int32_t i = 0; i < numOfConds; ++i) {
    SCond* pSrcCond = taosArrayGet(src->pCond, i);

    SCond copy = {0};
    copy.len = pSrcCond->len;
    copy.uid = pSrcCond->uid;

    // an empty condition keeps a NULL buffer
    if (pSrcCond->len > 0) {
      assert(pSrcCond->cond != NULL);
      copy.cond = malloc(copy.len);
      memcpy(copy.cond, pSrcCond->cond, copy.len);
    }

    taosArrayPush(dest->pCond, &copy);
  }
}

1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357
/*
 * Free the tbname condition string and every per-table condition buffer,
 * destroy the condition array and zero the structure.
 */
void tscTagCondRelease(STagCond* pTagCond) {
  free(pTagCond->tbnameCond.cond);

  SArray* pCondList = pTagCond->pCond;
  if (pCondList != NULL) {
    size_t numOfConds = taosArrayGetSize(pCondList);

    for (int32_t i = 0; i < numOfConds; ++i) {
      SCond* pCond = taosArrayGet(pCondList, i);
      tfree(pCond->cond);
    }

    taosArrayDestroy(pCondList);
  }

  memset(pTagCond, 0, sizeof(STagCond));
}

1363
/*
 * Fill pColInfo[i] with the function id and source data type of expression i.
 * Tag expressions read their type from the tag schema (a column index of -1
 * is reported as binary); all others read it from the table schema.
 * pColInfo must have room for every expression of the query.
 */
void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SQueryInfo* pQueryInfo) {
  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  SSchema*        pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);

  size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
  for (int32_t i = 0; i < numOfExprs; ++i) {
    SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
    pColInfo[i].functionId = pExpr->functionId;

    if (!TSDB_COL_IS_TAG(pExpr->colInfo.flag)) {
      pColInfo[i].type = pSchema[pExpr->colInfo.colIndex].type;
      continue;
    }

    SSchema* pTagSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);

    int16_t index = pExpr->colInfo.colIndex;
    if (index != -1) {
      pColInfo[i].type = pTagSchema[index].type;
    } else {
      pColInfo[i].type = TSDB_DATA_TYPE_BINARY;
    }
  }
}

/*
 * Mark the heartbeat SqlObj of the connection so that the heart-beat timer
 * closes the connection and frees all allocated resources. Invalid
 * connection objects and connections without a heartbeat are ignored.
 */
void tscSetFreeHeatBeat(STscObj* pObj) {
  bool usable = (pObj != NULL) && (pObj->signature == pObj) && (pObj->pHb != NULL);
  if (!usable) {
    return;
  }

  SSqlObj* pHeatBeat = pObj->pHb;
  assert(pHeatBeat == pHeatBeat->signature);

  // to denote the heart-beat timer close connection and free all allocated resources
  tscGetQueryInfoDetail(&pHeatBeat->cmd, 0)->type = TSDB_QUERY_TYPE_FREE_RESOURCE;
}

/* True when the heartbeat SqlObj has been marked for resource release. */
bool tscShouldFreeHeatBeat(SSqlObj* pHb) {
  assert(pHb == pHb->signature);

  SQueryInfo* pHbQueryInfo = tscGetQueryInfoDetail(&pHb->cmd, 0);
  if (pHbQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) {
    return true;
  }

  return false;
}

/*
 * The following four kinds of SqlObj should not be freed:
 * 1. SqlObj for stream computing
 * 2. main SqlObj
 * 3. heartbeat SqlObj
 * 4. SqlObj for subscription
 *
 * If the result code is an error and the SqlObj does not belong to the above
 * types, it should be freed automatically for async queries, regardless of
 * whether the connection should be kept.
 *
 * If the connection needs to be recycled, the SqlObj should also be freed.
 */
H
hjxilinx 已提交
1415
bool tscShouldBeFreed(SSqlObj* pSql) {
  // invalid or already released objects are never freed from here
  if (pSql == NULL || pSql->signature != pSql) {
    return false;
  }

  // stream, heartbeat and subscription objects are kept
  STscObj* pTscObj = pSql->pTscObj;
  if (pSql->pStream != NULL || pTscObj->pHb == pSql || pSql->pSubscription != NULL) {
    return false;
  }

  // only the table meta and super table vgroup query will free resource automatically
  int32_t command = pSql->cmd.command;
  return (command == TSDB_SQL_META) || (command == TSDB_SQL_STABLEVGROUP);
}

1434 1435 1436
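/**
 * Get the table meta info of one table in one sub clause of the command.
 *
 * @param pCmd        the command that holds the query info list
 * @param clauseIndex the index of the union sub clause; usually 0 when no union query exists
 * @param tableIndex  the table index for a join query, where more than one table exists
 * @return            the table meta info, or NULL when pCmd is NULL or holds no clause
 */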
1441
STableMetaInfo* tscGetTableMetaInfoFromCmd(SSqlCmd* pCmd, int32_t clauseIndex, int32_t tableIndex) {
  bool hasClause = (pCmd != NULL) && (pCmd->numOfClause != 0);
  if (!hasClause) {
    return NULL;
  }

  assert(clauseIndex >= 0 && clauseIndex < pCmd->numOfClause);

  return tscGetMetaInfo(tscGetQueryInfoDetail(pCmd, clauseIndex), tableIndex);
}

H
hjxilinx 已提交
1452
/*
 * Return the table meta info stored at tableIndex, or NULL when the query
 * holds no table meta info array at all.
 */
STableMetaInfo* tscGetMetaInfo(SQueryInfo* pQueryInfo, int32_t tableIndex) {
  assert(pQueryInfo != NULL);

  if (pQueryInfo->pTableMetaInfo != NULL) {
    // NOTE(review): the bound is `<=`, so tableIndex == numOfTables passes the
    // check and reads one slot past the counted entries -- confirm whether
    // any caller relies on this before tightening it to `<`.
    assert(tableIndex >= 0 && tableIndex <= pQueryInfo->numOfTables && pQueryInfo->pTableMetaInfo != NULL);

    return pQueryInfo->pTableMetaInfo[tableIndex];
  }

  assert(pQueryInfo->numOfTables == 0);
  return NULL;
}

1465
/*
 * Fetch the query info of sub clause subClauseIndex into *pQueryInfo, adding
 * new sub query infos to pCmd until that clause exists. Returns
 * TSDB_CODE_SUCCESS, or the error code of tscAddSubqueryInfo (in which case
 * *pQueryInfo is left NULL).
 */
int32_t tscGetQueryInfoDetailSafely(SSqlCmd* pCmd, int32_t subClauseIndex, SQueryInfo** pQueryInfo) {
  for (;;) {
    *pQueryInfo = tscGetQueryInfoDetail(pCmd, subClauseIndex);
    if ((*pQueryInfo) != NULL) {
      return TSDB_CODE_SUCCESS;
    }

    // the requested clause does not exist yet: grow the list and retry
    int32_t code = tscAddSubqueryInfo(pCmd);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }
}

H
hjxilinx 已提交
1481
/*
 * Find the first table whose meta uid equals `uid`. The position is stored in
 * *index when index is not NULL (-1 when nothing matched, before the
 * assertion fires). A missing table is treated as a programming error.
 */
STableMetaInfo* tscGetTableMetaInfoByUid(SQueryInfo* pQueryInfo, uint64_t uid, int32_t* index) {
  int32_t k = -1;

  for (int32_t i = 0; i < pQueryInfo->numOfTables && k == -1; ++i) {
    if (pQueryInfo->pTableMetaInfo[i]->pTableMeta->uid == uid) {
      k = i;
    }
  }

  if (index != NULL) {
    *index = k;
  }

  assert(k != -1);
  return tscGetMetaInfo(pQueryInfo, k);
}

H
hjxilinx 已提交
1499 1500 1501 1502 1503 1504 1505 1506 1507
/*
 * Create the containers of a fresh query info: field list, field support
 * list, expression list and column list. The first three must not exist yet.
 */
void tscInitQueryInfo(SQueryInfo* pQueryInfo) {
  enum { INITIAL_CAPACITY = 4 };

  // field descriptors and their support records are stored by value
  assert(pQueryInfo->fieldsInfo.pFields == NULL);
  pQueryInfo->fieldsInfo.pFields = taosArrayInit(INITIAL_CAPACITY, sizeof(TAOS_FIELD));

  assert(pQueryInfo->fieldsInfo.pSupportInfo == NULL);
  pQueryInfo->fieldsInfo.pSupportInfo = taosArrayInit(INITIAL_CAPACITY, sizeof(SFieldSupInfo));

  // expressions and columns are stored as pointers
  assert(pQueryInfo->exprList == NULL);
  pQueryInfo->exprList = taosArrayInit(INITIAL_CAPACITY, POINTER_BYTES);
  pQueryInfo->colList  = taosArrayInit(INITIAL_CAPACITY, POINTER_BYTES);
}

1511
/*
 * Append one freshly initialized SQueryInfo to pCmd->pQueryInfo and increase
 * pCmd->numOfClause.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when either the
 * pointer list cannot be grown or the new query info cannot be allocated.
 * On failure numOfClause is unchanged, so the (possibly grown) list never
 * exposes an unset slot.
 */
int32_t tscAddSubqueryInfo(SSqlCmd* pCmd) {
  assert(pCmd != NULL);

  // todo refactor: remove this structure
  size_t s = pCmd->numOfClause + 1;
  char*  tmp = realloc(pCmd->pQueryInfo, s * POINTER_BYTES);
  if (tmp == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  pCmd->pQueryInfo = (SQueryInfo**)tmp;

  SQueryInfo* pQueryInfo = calloc(1, sizeof(SQueryInfo));
  if (pQueryInfo == NULL) {
    // tscInitQueryInfo would dereference the NULL pointer
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  tscInitQueryInfo(pQueryInfo);

  pQueryInfo->msg = pCmd->payload;  // pointer to the parent error message buffer

  pCmd->pQueryInfo[pCmd->numOfClause++] = pQueryInfo;
  return TSDB_CODE_SUCCESS;
}

1532
/*
 * Release everything a query info owns except its table meta info: tag
 * conditions, field info, expression list, column list, group-by column
 * info, the ts buffer and the fill values.
 */
static void freeQueryInfoImpl(SQueryInfo* pQueryInfo) {
  tscTagCondRelease(&pQueryInfo->tagCond);
  tscFieldInfoClear(&pQueryInfo->fieldsInfo);

  // both lists are reset so later clean-ups see them as gone
  tscSqlExprInfoDestroy(pQueryInfo->exprList);
  pQueryInfo->exprList = NULL;

  tscColumnListDestroy(pQueryInfo->colList);
  pQueryInfo->colList = NULL;

  SArray* pGroupbyCols = pQueryInfo->groupbyExpr.columnInfo;
  if (pGroupbyCols != NULL) {
    taosArrayDestroy(pGroupbyCols);
    pQueryInfo->groupbyExpr.columnInfo = NULL;
  }

  pQueryInfo->tsBuf = tsBufDestory(pQueryInfo->tsBuf);

  tfree(pQueryInfo->fillVal);
}
1551

1552
/* Release the owned content of every sub clause's query info of the command. */
void tscClearSubqueryInfo(SSqlCmd* pCmd) {
  int32_t clause = 0;

  while (clause < pCmd->numOfClause) {
    freeQueryInfoImpl(tscGetQueryInfoDetail(pCmd, clause));
    ++clause;
  }
}

1559
/*
 * Clear and free every table meta info of the query, then free the pointer
 * array itself. `address` is only used for the debug log line.
 */
void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache) {
  tscDebug("%p deref the table meta in cache, numOfTables:%d", address, pQueryInfo->numOfTables);

  int32_t tableIndex = 0;
  while (tableIndex < pQueryInfo->numOfTables) {
    STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex);

    tscClearTableMetaInfo(pTableMetaInfo, removeFromCache);
    free(pTableMetaInfo);

    ++tableIndex;
  }

  tfree(pQueryInfo->pTableMetaInfo);
}

1572
STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, STableMeta* pTableMeta,
1573
                                    SVgroupsInfo* vgroupList, SArray* pTagCols) {
H
hjxilinx 已提交
1574
  void* pAlloc = realloc(pQueryInfo->pTableMetaInfo, (pQueryInfo->numOfTables + 1) * POINTER_BYTES);
S
slguan 已提交
1575 1576 1577 1578
  if (pAlloc == NULL) {
    return NULL;
  }

H
hjxilinx 已提交
1579 1580
  pQueryInfo->pTableMetaInfo = pAlloc;
  pQueryInfo->pTableMetaInfo[pQueryInfo->numOfTables] = calloc(1, sizeof(STableMetaInfo));
S
slguan 已提交
1581

H
hjxilinx 已提交
1582
  STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[pQueryInfo->numOfTables];
H
hjxilinx 已提交
1583
  assert(pTableMetaInfo != NULL);
S
slguan 已提交
1584 1585

  if (name != NULL) {
B
Bomin Zhang 已提交
1586
    tstrncpy(pTableMetaInfo->name, name, sizeof(pTableMetaInfo->name));
S
slguan 已提交
1587 1588
  }

H
hjxilinx 已提交
1589
  pTableMetaInfo->pTableMeta = pTableMeta;
1590 1591
  
  if (vgroupList != NULL) {
H
hjxilinx 已提交
1592
    size_t size = sizeof(SVgroupsInfo) + sizeof(SCMVgroupInfo) * vgroupList->numOfVgroups;
1593 1594
    pTableMetaInfo->vgroupList = malloc(size);
    memcpy(pTableMetaInfo->vgroupList, vgroupList, size);
1595
  }
S
slguan 已提交
1596

1597 1598 1599
  pTableMetaInfo->tagColList = taosArrayInit(4, POINTER_BYTES);
  if (pTagCols != NULL) {
    tscColumnListCopy(pTableMetaInfo->tagColList, pTagCols, -1);
S
slguan 已提交
1600
  }
1601
  
1602
  pQueryInfo->numOfTables += 1;
H
hjxilinx 已提交
1603
  return pTableMetaInfo;
S
slguan 已提交
1604 1605
}

H
hjxilinx 已提交
1606
/* Append a table meta info entry with no name, meta, vgroups or tag columns. */
STableMetaInfo* tscAddEmptyMetaInfo(SQueryInfo* pQueryInfo) {
  STableMetaInfo* pEmpty = tscAddTableMetaInfo(pQueryInfo, NULL, NULL, NULL, NULL);
  return pEmpty;
}
S
slguan 已提交
1609

1610
/*
 * Release what a table meta info references: the cached table meta (through
 * taosCacheRelease), the vgroup list and the tag column list. The structure
 * itself is not freed. NULL is accepted.
 */
void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache) {
  if (pTableMetaInfo != NULL) {
    taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pTableMeta), removeFromCache);
    tfree(pTableMetaInfo->vgroupList);

    tscColumnListDestroy(pTableMetaInfo->tagColList);
    pTableMetaInfo->tagColList = NULL;
  }
}

/**
 * Reset the row cursor and the row count of a result object to zero.
 *
 * @param pRes result object to reset; NULL is ignored
 */
void tscResetForNextRetrieve(SSqlRes* pRes) {
  if (pRes != NULL) {
    pRes->numOfRows = 0;
    pRes->row = 0;
  }
}

H
Haojun Liao 已提交
1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 1642
/**
 * Create a minimal sub-query object that shares the connection of pSql.
 *
 * The new object gets its own (single) query info, a private copy of the SQL
 * text of pSql, and one table-meta entry that carries only the name of the
 * first table of pSql's first clause (no table meta, vgroup list or tag
 * columns are attached).
 *
 * @param pSql   parent SQL object; its clauseIndex must be 0
 * @param fp     callback stored in the new object
 * @param param  callback argument stored in the new object
 * @param cmd    command code assigned to the new object's command
 * @return the new object, or NULL if any allocation fails
 */
SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cmd) {
  SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj));
  if (pNew == NULL) {
    tscError("%p new subquery failed, tableIndex:%d", pSql, 0);
    return NULL;
  }

  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;

  SSqlCmd* pCmd = &pNew->cmd;
  pCmd->command = cmd;
  pCmd->parseFinished = 1;

  if (tscAddSubqueryInfo(pCmd) != TSDB_CODE_SUCCESS) {
    tscFreeSqlObj(pNew);
    return NULL;
  }

  pNew->fp = fp;
  pNew->param = param;
  pNew->maxRetry = TSDB_MAX_REPLICA_NUM;

  pNew->sqlstr = strdup(pSql->sqlstr);
  if (pNew->sqlstr == NULL) {
    tscError("%p new subquery failed", pSql);

    // pNew already owns the query info attached by tscAddSubqueryInfo(), so a
    // plain free() would leak it; release the object the same way the failure
    // path above does.
    tscFreeSqlObj(pNew);
    return NULL;
  }

  SQueryInfo* pQueryInfo = NULL;
  tscGetQueryInfoDetailSafely(pCmd, 0, &pQueryInfo);

  assert(pSql->cmd.clauseIndex == 0);
  STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);

  // tscAddTableMetaInfo() returns NULL when the table array cannot be grown;
  // do not hand out an object without its table entry.
  if (tscAddTableMetaInfo(pQueryInfo, pMasterTableMetaInfo->name, NULL, NULL, NULL) == NULL) {
    tscError("%p new subquery failed, failed to add table meta info", pSql);
    tscFreeSqlObj(pNew);
    return NULL;
  }

  return pNew;
}

1672
/**
 * Build a sub-query object for one table of the current clause of pSql.
 *
 * The command of pSql is copied and then detached from the parent's payload,
 * query info and data blocks. A fresh query info is populated with the
 * parent's time window, limits, ordering, fill settings, group-by columns,
 * tag conditions, and the columns/expressions/result fields that belong to
 * the selected table. Exactly one table-meta entry is attached to the new
 * query info.
 *
 * @param pSql        parent SQL object
 * @param tableIndex  index of the table (within the current clause) to query
 * @param fp          callback stored in the new object
 * @param param       callback argument stored in the new object
 * @param cmd         command code assigned to the new object's command
 * @param pPrevSql    optional previous sub-query; when non-NULL the query type
 *                    is taken from it and the table meta of its first table is
 *                    transferred to the new object, otherwise the table meta is
 *                    re-acquired from the cache and the type is flagged as a
 *                    sub-query
 * @return the new object, or NULL on failure
 */
SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void* param, int32_t cmd, SSqlObj* pPrevSql) {
  SSqlCmd* pCmd = &pSql->cmd;
  SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj));
  if (pNew == NULL) {
    tscError("%p new subquery failed, tableIndex:%d", pSql, tableIndex);
    return NULL;
  }

  STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, tableIndex);

  pNew->pTscObj = pSql->pTscObj;
  pNew->signature = pNew;

  pNew->sqlstr = strdup(pSql->sqlstr);
  if (pNew->sqlstr == NULL) {
    tscError("%p new subquery failed, tableIndex:%d, vgroupIndex:%d", pSql, tableIndex, pTableMetaInfo->vgroupIndex);

    // nothing else has been attached to pNew yet, a plain free() is enough
    free(pNew);
    return NULL;
  }

  // start from a bitwise copy of the parent command, then drop every pointer
  // that must not be shared with the parent
  SSqlCmd* pnCmd = &pNew->cmd;
  memcpy(pnCmd, pCmd, sizeof(SSqlCmd));

  pnCmd->command = cmd;
  pnCmd->payload = NULL;
  pnCmd->allocSize = 0;

  pnCmd->pQueryInfo = NULL;
  pnCmd->numOfClause = 0;
  pnCmd->clauseIndex = 0;
  pnCmd->pDataBlocks = NULL;
  pnCmd->parseFinished = 1;

  if (tscAddSubqueryInfo(pnCmd) != TSDB_CODE_SUCCESS) {
    tscFreeSqlObj(pNew);
    return NULL;
  }

  SQueryInfo* pNewQueryInfo = tscGetQueryInfoDetail(pnCmd, 0);
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  pNewQueryInfo->command = pQueryInfo->command;
  pNewQueryInfo->slidingTimeUnit = pQueryInfo->slidingTimeUnit;
  pNewQueryInfo->intervalTime = pQueryInfo->intervalTime;
  pNewQueryInfo->slidingTime  = pQueryInfo->slidingTime;
  pNewQueryInfo->type   = pQueryInfo->type;
  pNewQueryInfo->window = pQueryInfo->window;
  pNewQueryInfo->limit  = pQueryInfo->limit;
  pNewQueryInfo->slimit = pQueryInfo->slimit;
  pNewQueryInfo->order  = pQueryInfo->order;
  pNewQueryInfo->tsBuf  = NULL;
  pNewQueryInfo->fillType = pQueryInfo->fillType;
  pNewQueryInfo->fillVal  = NULL;
  pNewQueryInfo->clauseLimit = pQueryInfo->clauseLimit;
  pNewQueryInfo->numOfTables = 0;
  pNewQueryInfo->pTableMetaInfo = NULL;

  pNewQueryInfo->groupbyExpr = pQueryInfo->groupbyExpr;
  if (pQueryInfo->groupbyExpr.columnInfo != NULL) {
    pNewQueryInfo->groupbyExpr.columnInfo = taosArrayClone(pQueryInfo->groupbyExpr.columnInfo);
  }

  tscTagCondCopy(&pNewQueryInfo->tagCond, &pQueryInfo->tagCond);

  if (pQueryInfo->fillType != TSDB_FILL_NONE) {
    size_t fillBytes = pQueryInfo->fieldsInfo.numOfOutput * sizeof(int64_t);

    pNewQueryInfo->fillVal = malloc(fillBytes);
    if (pNewQueryInfo->fillVal == NULL && fillBytes > 0) {
      // out of memory: never memcpy into a NULL destination
      tscError("%p new subquery failed, tableIndex:%d, vgroupIndex:%d", pSql, tableIndex, pTableMetaInfo->vgroupIndex);
      tscFreeSqlObj(pNew);
      return NULL;
    }

    if (pNewQueryInfo->fillVal != NULL) {
      memcpy(pNewQueryInfo->fillVal, pQueryInfo->fillVal, fillBytes);
    }
  }

  if (tscAllocPayload(pnCmd, TSDB_DEFAULT_PAYLOAD_SIZE) != TSDB_CODE_SUCCESS) {
    tscError("%p new subquery failed, tableIndex:%d, vgroupIndex:%d", pSql, tableIndex, pTableMetaInfo->vgroupIndex);
    tscFreeSqlObj(pNew);
    return NULL;
  }

  tscColumnListCopy(pNewQueryInfo->colList, pQueryInfo->colList, (int16_t)tableIndex);

  // set the correct query type
  if (pPrevSql != NULL) {
    SQueryInfo* pPrevQueryInfo = tscGetQueryInfoDetail(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex);
    pNewQueryInfo->type = pPrevQueryInfo->type;
  } else {
    TSDB_QUERY_SET_TYPE(pNewQueryInfo->type, TSDB_QUERY_TYPE_SUBQUERY);// it must be the subquery
  }

  // only the expressions that belong to the selected table are copied
  uint64_t uid = pTableMetaInfo->pTableMeta->uid;
  tscSqlExprCopy(pNewQueryInfo->exprList, pQueryInfo->exprList, uid, true);

  int32_t numOfOutput = tscSqlExprNumOfExprs(pNewQueryInfo);

  if (numOfOutput > 0) {  // todo refactor to extract method
    size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
    SFieldInfo* pFieldInfo = &pQueryInfo->fieldsInfo;

    // copy the result fields whose expression refers to the selected table
    for (int32_t i = 0; i < numOfExprs; ++i) {
      SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);

      if (pExpr->uid == uid) {
        TAOS_FIELD* p = tscFieldInfoGetField(pFieldInfo, i);
        SFieldSupInfo* pInfo = tscFieldInfoGetSupp(pFieldInfo, i);

        SFieldSupInfo* pInfo1 = tscFieldInfoAppend(&pNewQueryInfo->fieldsInfo, p);
        *pInfo1 = *pInfo;
      }
    }

    // make sure the sqlExpr for each field is correct
    // todo handle the agg arithmetic expression
    numOfExprs = tscSqlExprNumOfExprs(pNewQueryInfo);

    for(int32_t f = 0; f < pNewQueryInfo->fieldsInfo.numOfOutput; ++f) {
      TAOS_FIELD* field = tscFieldInfoGetField(&pNewQueryInfo->fieldsInfo, f);
      bool matched = false;

      for(int32_t k1 = 0; k1 < numOfExprs; ++k1) {
        SSqlExpr* pExpr1 = tscSqlExprGet(pNewQueryInfo, k1);

        if (strcmp(field->name, pExpr1->aliasName) == 0) {  // establish link according to the result field name
          SFieldSupInfo* pInfo = tscFieldInfoGetSupp(&pNewQueryInfo->fieldsInfo, f);
          pInfo->pSqlExpr = pExpr1;

          matched = true;
          break;
        }
      }

      assert(matched);
    }

    tscFieldInfoUpdateOffset(pNewQueryInfo);
  }

  pNew->fp = fp;
  pNew->param = param;
  pNew->maxRetry = TSDB_MAX_REPLICA_NUM;

  char* name = pTableMetaInfo->name;
  STableMetaInfo* pFinalInfo = NULL;

  if (pPrevSql == NULL) {
    STableMeta* pTableMeta = taosCacheAcquireByData(tscCacheHandle, pTableMetaInfo->pTableMeta);  // get by name may failed due to the cache cleanup
    assert(pTableMeta != NULL);

    pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pTableMeta, pTableMetaInfo->vgroupList, pTableMetaInfo->tagColList);
  } else {  // transfer the ownership of pTableMeta to the newly create sql object.
    STableMetaInfo* pPrevInfo = tscGetTableMetaInfoFromCmd(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex, 0);

    STableMeta*  pPrevTableMeta = taosCacheTransfer(tscCacheHandle, (void**)&pPrevInfo->pTableMeta);

    SVgroupsInfo* pVgroupsInfo = pPrevInfo->vgroupList;
    pPrevInfo->vgroupList = NULL;
    pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pPrevTableMeta, pVgroupsInfo, pTableMetaInfo->tagColList);

    // tscAddTableMetaInfo() stores its own copy of the vgroup list, and the
    // list was just detached from pPrevInfo, so this is the last reference.
    free(pVgroupsInfo);
  }

  // pFinalInfo itself is NULL when the table array could not be grown
  if (pFinalInfo == NULL || pFinalInfo->pTableMeta == NULL) {
    tscError("%p new subquery failed for get tableMeta is NULL from cache", pSql);
    tscFreeSqlObj(pNew);
    return NULL;
  }

  assert(pNewQueryInfo->numOfTables == 1);

  if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
    assert(pFinalInfo->vgroupList != NULL);
  }

  if (cmd == TSDB_SQL_SELECT) {
    size_t size = taosArrayGetSize(pNewQueryInfo->colList);

    tscDebug(
        "%p new subquery:%p, tableIndex:%d, vgroupIndex:%d, type:%d, exprInfo:%zu, colList:%zu,"
        "fieldInfo:%d, name:%s, qrang:%" PRId64 " - %" PRId64 " order:%d, limit:%" PRId64,
        pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscSqlExprNumOfExprs(pNewQueryInfo),
        size, pNewQueryInfo->fieldsInfo.numOfOutput, pFinalInfo->name, pNewQueryInfo->window.skey,
        pNewQueryInfo->window.ekey, pNewQueryInfo->order.order, pNewQueryInfo->limit.limit);

    tscPrintSelectClause(pNew, 0);
  } else {
    tscDebug("%p new sub insertion: %p, vnodeIdx:%d", pSql, pNew, pTableMetaInfo->vgroupIndex);
  }

  return pNew;
}

H
hjxilinx 已提交
1857 1858 1859 1860 1861
/**
 * To decide if current is a two-stage super table query, join query, or insert. And invoke different
 * procedure accordingly
 * @param pSql
 */
H
hzcheng 已提交
1862 1863
void tscDoQuery(SSqlObj* pSql) {
  SSqlCmd* pCmd = &pSql->cmd;
H
hjxilinx 已提交
1864
  SSqlRes* pRes = &pSql->res;
H
hjxilinx 已提交
1865
  
H
hjxilinx 已提交
1866
  pRes->code = TSDB_CODE_SUCCESS;
H
hjxilinx 已提交
1867
  
H
hzcheng 已提交
1868 1869
  if (pCmd->command > TSDB_SQL_LOCAL) {
    tscProcessLocalCmd(pSql);
H
hjxilinx 已提交
1870 1871 1872 1873 1874 1875 1876 1877
    return;
  }
  
  if (pCmd->command == TSDB_SQL_SELECT) {
    tscAddIntoSqlList(pSql);
  }

  if (pCmd->dataSourceType == DATA_FROM_DATA_FILE) {
H
Haojun Liao 已提交
1878
    tscProcessMultiVnodesImportFromFile(pSql);
H
hzcheng 已提交
1879
  } else {
H
hjxilinx 已提交
1880 1881 1882 1883 1884 1885
    SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
    uint16_t type = pQueryInfo->type;
  
    if (pSql->fp == (void(*)())tscHandleMultivnodeInsert) {  // multi-vnodes insertion
      tscHandleMultivnodeInsert(pSql);
      return;
H
hzcheng 已提交
1886
    }
H
hjxilinx 已提交
1887 1888
  
    if (QUERY_IS_JOIN_QUERY(type)) {
H
Haojun Liao 已提交
1889
      if (!TSDB_QUERY_HAS_TYPE(type, TSDB_QUERY_TYPE_SUBQUERY)) {
H
hjxilinx 已提交
1890
        tscHandleMasterJoinQuery(pSql);
H
Haojun Liao 已提交
1891 1892 1893 1894 1895 1896 1897 1898 1899
      } else { // for first stage sub query, iterate all vnodes to get all timestamp
        if (!TSDB_QUERY_HAS_TYPE(type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
          tscProcessSql(pSql);
        } else { // secondary stage join query.
          if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) {  // super table query
            tscHandleMasterSTableQuery(pSql);
          } else {
            tscProcessSql(pSql);
          }
H
hjxilinx 已提交
1900 1901
        }
      }
H
Haojun Liao 已提交
1902 1903

      return;
H
hjxilinx 已提交
1904 1905 1906
    } else if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) {  // super table query
      tscHandleMasterSTableQuery(pSql);
      return;
S
slguan 已提交
1907
    }
H
hjxilinx 已提交
1908 1909
    
    tscProcessSql(pSql);
H
hzcheng 已提交
1910 1911
  }
}
S
slguan 已提交
1912

H
Haojun Liao 已提交
1913
/**
 * Look up the join tag column id recorded for a table in the join info.
 *
 * @param pTagCond tag condition holding the left/right join descriptors
 * @param uid      table uid to look for
 * @return the tag column id of the side whose uid matches; if neither side
 *         matches the function asserts, and returns -1 when assertions are
 *         compiled out so the function never falls off its end
 */
int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid) {
  if (pTagCond->joinInfo.left.uid == uid) {
    return pTagCond->joinInfo.left.tagColId;
  }

  if (pTagCond->joinInfo.right.uid == uid) {
    return pTagCond->joinInfo.right.tagColId;
  }

  // the uid must belong to one side of the join
  assert(0);
  return -1;
}
1922

H
Haojun Liao 已提交
1923 1924
/**
 * Tell whether the statement held by pSql is an update-type command.
 *
 * A command counts as an update when its code lies in the range
 * [TSDB_SQL_INSERT, TSDB_SQL_DROP_DNODE] or equals TSDB_SQL_USE_DB.
 *
 * @param pSql SQL object to inspect
 * @return the classification result for a valid object. For a NULL object or
 *         one whose signature does not match, terrno is set to
 *         TSDB_CODE_TSC_DISCONNECTED and that code is returned.
 *         NOTE(review): the code is returned through a bool return type, so
 *         it reads as a true value -- confirm whether callers rely on this.
 */
bool tscIsUpdateQuery(SSqlObj* pSql) {
  if (pSql != NULL && pSql->signature == pSql) {
    SSqlCmd* pCmd = &pSql->cmd;
    bool inWriteRange = (pCmd->command >= TSDB_SQL_INSERT && pCmd->command <= TSDB_SQL_DROP_DNODE);

    return (inWriteRange || TSDB_SQL_USE_DB == pCmd->command);
  }

  terrno = TSDB_CODE_TSC_DISCONNECTED;
  return TSDB_CODE_TSC_DISCONNECTED;
}
1932

H
hjxilinx 已提交
1933 1934 1935 1936 1937
int32_t tscInvalidSQLErrMsg(char* msg, const char* additionalInfo, const char* sql) {
  const char* msgFormat1 = "invalid SQL: %s";
  const char* msgFormat2 = "invalid SQL: syntax error near \"%s\" (%s)";
  const char* msgFormat3 = "invalid SQL: syntax error near \"%s\"";

H
hjxilinx 已提交
1938
  const int32_t BACKWARD_CHAR_STEP = 0;
H
hjxilinx 已提交
1939

H
hjxilinx 已提交
1940 1941 1942
  if (sql == NULL) {
    assert(additionalInfo != NULL);
    sprintf(msg, msgFormat1, additionalInfo);
1943
    return TSDB_CODE_TSC_INVALID_SQL;
H
hjxilinx 已提交
1944
  }
H
hjxilinx 已提交
1945 1946

  char buf[64] = {0};  // only extract part of sql string
H
hjxilinx 已提交
1947
  strncpy(buf, (sql - BACKWARD_CHAR_STEP), tListLen(buf) - 1);
H
hjxilinx 已提交
1948

H
hjxilinx 已提交
1949 1950 1951
  if (additionalInfo != NULL) {
    sprintf(msg, msgFormat2, buf, additionalInfo);
  } else {
H
hjxilinx 已提交
1952
    sprintf(msg, msgFormat3, buf);  // no additional information for invalid sql error
H
hjxilinx 已提交
1953
  }
H
hjxilinx 已提交
1954

1955
  return TSDB_CODE_TSC_INVALID_SQL;
1956
}
H
hjxilinx 已提交
1957

H
hjxilinx 已提交
1958 1959
/**
 * Check whether the rows returned for the current clause have reached the
 * clause limit.
 *
 * @param pQueryInfo query info; must be non-NULL with a non-zero clauseLimit
 * @param pRes       result object holding numOfClauseTotal
 * @return true only when clauseLimit is positive and numOfClauseTotal has
 *         reached it
 */
bool tscHasReachLimitation(SQueryInfo* pQueryInfo, SSqlRes* pRes) {
  assert(pQueryInfo != NULL && pQueryInfo->clauseLimit != 0);

  // a non-positive limit never counts as reached
  if (pQueryInfo->clauseLimit <= 0) {
    return false;
  }

  return pRes->numOfClauseTotal >= pQueryInfo->clauseLimit;
}
1962

1963 1964 1965 1966 1967
/**
 * Report the completed flag of a result set.
 *
 * The handle is interpreted as an SSqlRes.
 * NOTE(review): confirm that callers pass an SSqlRes pointer here.
 *
 * @param result result handle
 * @return the completed field of the result
 */
bool tscResultsetFetchCompleted(TAOS_RES *result) {
  return ((SSqlRes*)result)->completed;
}

1968
/**
 * Return the payload pointer of a command.
 *
 * Judging by the name, callers use this buffer for error message text --
 * confirm against the call sites.
 *
 * @param pCmd command whose payload is returned
 * @return pCmd->payload
 */
char* tscGetErrorMsgPayload(SSqlCmd* pCmd) {
  char* pErrBuf = pCmd->payload;
  return pErrBuf;
}
1969 1970 1971

/**
 *  If current vnode query does not return results anymore (pRes->numOfRows == 0), try the next vnode if exists,
1972
 *  while multi-vnode super table projection query and the result does not reach the limitation.
1973
 */
1974
bool hasMoreVnodesToTry(SSqlObj* pSql) {
1975 1976 1977 1978 1979
  SSqlCmd* pCmd = &pSql->cmd;
  SSqlRes* pRes = &pSql->res;
  if (pCmd->command != TSDB_SQL_FETCH) {
    return false;
  }
1980

H
Haojun Liao 已提交
1981
  assert(pRes->completed);
1982 1983
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
Haojun Liao 已提交
1984

1985
  // for normal table, no need to try any more if results are all retrieved from one vnode
H
hjLiao 已提交
1986
  if (!UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo) || (pTableMetaInfo->vgroupList == NULL)) {
1987
    return false;
1988
  }
1989
  
1990 1991 1992
  int32_t numOfVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
  return tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) &&
         (!tscHasReachLimitation(pQueryInfo, pRes)) && (pTableMetaInfo->vgroupIndex < numOfVgroups - 1);
1993 1994
}

1995 1996 1997 1998 1999 2000
/**
 * Re-issue the query against the following vgroups until one is accepted.
 *
 * Used when the current virtual node returns no more rows for a multi-vnode
 * super table projection query. For every remaining vgroup the limit/offset
 * are adjusted to what has been retrieved so far, the object is turned back
 * into a SELECT and submitted through tscProcessSql(). The function returns
 * as soon as one submission succeeds.
 *
 * @param pSql SQL object to continue
 * @param fp   callback installed on pSql before each submission
 */
void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
  SSqlRes* pRes = &pSql->res;
  SSqlCmd* pCmd = &pSql->cmd;

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  /*
   * no result returned from the current virtual node anymore, try the next vnode if exists
   * in case of: multi-vnode super table projection query
   */
  assert(pRes->numOfRows == 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && !tscHasReachLimitation(pQueryInfo, pRes));
  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  int32_t totalVgroups = pTableMetaInfo->vgroupList->numOfVgroups;

  // the index is advanced before every bound check, including the one that ends the loop
  for (++pTableMetaInfo->vgroupIndex; pTableMetaInfo->vgroupIndex < totalVgroups; ++pTableMetaInfo->vgroupIndex) {
    tscDebug("%p results from vgroup index:%d completed, try next:%d. total vgroups:%d. current numOfRes:%" PRId64, pSql,
             pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVgroups, pRes->numOfClauseTotal);

    /*
     * update the limit and offset value for the query on the next vnode,
     * according to current retrieval results
     *
     * NOTE:
     * if the pRes->offset is larger than 0, the start returned position has not been reached yet.
     * Therefore, the pRes->numOfRows, as well as pRes->numOfClauseTotal, must be 0.
     * The pRes->offset value will be updated by virtual node, during query execution.
     */
    if (pQueryInfo->clauseLimit >= 0) {
      pQueryInfo->limit.limit = pQueryInfo->clauseLimit - pRes->numOfClauseTotal;
    }

    pQueryInfo->limit.offset = pRes->offset;
    assert((pRes->offset >= 0 && pRes->numOfRows == 0) || (pRes->offset == 0 && pRes->numOfRows >= 0));

    tscDebug("%p new query to next vgroup, index:%d, limit:%" PRId64 ", offset:%" PRId64 ", glimit:%" PRId64,
        pSql, pTableMetaInfo->vgroupIndex, pQueryInfo->limit.limit, pQueryInfo->limit.offset, pQueryInfo->clauseLimit);

    /*
     * For project query with super table join, the numOfSub is equal to the number of all subqueries.
     * Therefore, we need to reset the value of numOfSubs to be 0.
     *
     * For super table join with projection query, if any one of the subqueries is exhausted, the query completed.
     */
    pSql->numOfSubs = 0;
    pCmd->command = TSDB_SQL_SELECT;

    tscResetForNextRetrieve(pRes);

    // set the callback function
    pSql->fp = fp;

    if (tscProcessSql(pSql) == TSDB_CODE_SUCCESS) {
      return;
    }

    // todo check for failure
  }
}
2052 2053 2054 2055 2056 2057 2058 2059

/**
 * Advance to the next sub-clause of a multi-clause statement and execute it.
 *
 * The running total of returned rows is carried over, the previous result
 * and sub-query array are released, and the next clause is executed either
 * locally or through tscProcessSql().
 *
 * @param pSql     SQL object; its current clause must not be the last one
 * @param queryFp  callback installed when pSql already has a callback; must
 *                 be non-NULL in that case
 */
void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)()) {
  SSqlCmd* pCmd = &pSql->cmd;
  SSqlRes* pRes = &pSql->res;

  // current subclause is completed, try the next subclause
  assert(pCmd->clauseIndex < pCmd->numOfClause - 1);

  pCmd->clauseIndex += 1;
  SQueryInfo* pNextClause = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  pCmd->command = pNextClause->command;

  // back up the total number of results before the result is released,
  // then store it again afterwards
  int64_t accumulatedRows = pRes->numOfTotal + pRes->numOfClauseTotal;
  tscFreeSqlResult(pSql);
  pRes->numOfTotal = accumulatedRows;

  tfree(pSql->pSubs);
  pSql->numOfSubs = 0;

  if (pSql->fp != NULL) {
    pSql->fp = queryFp;
    assert(queryFp != NULL);
  }

  tscDebug("%p try data in the next subclause:%d, total subclause:%d", pSql, pCmd->clauseIndex, pCmd->numOfClause);

  if (pCmd->command <= TSDB_SQL_LOCAL) {
    tscProcessSql(pSql);
  } else {
    tscProcessLocalCmd(pSql);
  }
}
H
hjxilinx 已提交
2086

H
Haojun Liao 已提交
2087 2088 2089 2090 2091 2092 2093 2094 2095 2096 2097 2098 2099 2100 2101 2102 2103 2104 2105 2106 2107 2108 2109 2110 2111 2112 2113 2114 2115 2116 2117 2118 2119 2120 2121 2122
//void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pFieldInfo, int32_t columnIndex) {
//  SFieldSupInfo* pInfo = TARRAY_GET_ELEM(pFieldInfo->pSupportInfo, columnIndex);
//  assert(pInfo->pSqlExpr != NULL);
//
//  int32_t type = pInfo->pSqlExpr->resType;
//  int32_t bytes = pInfo->pSqlExpr->resBytes;
//
//  char* pData = pRes->data + pInfo->pSqlExpr->offset * pRes->numOfRows + bytes * pRes->row;
//
//  if (type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BINARY) {
//    int32_t realLen = varDataLen(pData);
//    assert(realLen <= bytes - VARSTR_HEADER_SIZE);
//
//    if (isNull(pData, type)) {
//      pRes->tsrow[columnIndex] = NULL;
//    } else {
//      pRes->tsrow[columnIndex] = ((tstr*)pData)->data;
//    }
//
//    if (realLen < pInfo->pSqlExpr->resBytes - VARSTR_HEADER_SIZE) { // todo refactor
//      *(pData + realLen + VARSTR_HEADER_SIZE) = 0;
//    }
//
//    pRes->length[columnIndex] = realLen;
//  } else {
//    assert(bytes == tDataTypeDesc[type].nSize);
//
//    if (isNull(pData, type)) {
//      pRes->tsrow[columnIndex] = NULL;
//    } else {
//      pRes->tsrow[columnIndex] = pData;
//    }
//
//    pRes->length[columnIndex] = bytes;
//  }
//}
H
hjxilinx 已提交
2123

weixin_48148422's avatar
weixin_48148422 已提交
2124 2125 2126
void* malloc_throw(size_t size) {
  void* p = malloc(size);
  if (p == NULL) {
2127
    THROW(TSDB_CODE_TSC_OUT_OF_MEMORY);
weixin_48148422's avatar
weixin_48148422 已提交
2128 2129 2130 2131 2132
  }
  return p;
}

void* calloc_throw(size_t nmemb, size_t size) {
weixin_48148422's avatar
weixin_48148422 已提交
2133
  void* p = calloc(nmemb, size);
weixin_48148422's avatar
weixin_48148422 已提交
2134
  if (p == NULL) {
2135
    THROW(TSDB_CODE_TSC_OUT_OF_MEMORY);
weixin_48148422's avatar
weixin_48148422 已提交
2136 2137 2138 2139 2140 2141 2142
  }
  return p;
}

char* strdup_throw(const char* str) {
  char* p = strdup(str);
  if (p == NULL) {
2143
    THROW(TSDB_CODE_TSC_OUT_OF_MEMORY);
weixin_48148422's avatar
weixin_48148422 已提交
2144 2145 2146
  }
  return p;
}
2147 2148 2149 2150 2151 2152

int tscSetMgmtIpListFromCfg(const char *first, const char *second) {
  tscMgmtIpSet.numOfIps = 0;
  tscMgmtIpSet.inUse = 0;

  if (first && first[0] != 0) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
2153
    if (strlen(first) >= TSDB_EP_LEN) {
2154
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
2155 2156
      return -1;
    }
2157 2158 2159 2160 2161
    taosGetFqdnPortFromEp(first, tscMgmtIpSet.fqdn[tscMgmtIpSet.numOfIps], &tscMgmtIpSet.port[tscMgmtIpSet.numOfIps]);
    tscMgmtIpSet.numOfIps++;
  }

  if (second && second[0] != 0) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
2162
    if (strlen(second) >= TSDB_EP_LEN) {
2163
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
2164 2165
      return -1;
    }
2166 2167 2168 2169
    taosGetFqdnPortFromEp(second, tscMgmtIpSet.fqdn[tscMgmtIpSet.numOfIps], &tscMgmtIpSet.port[tscMgmtIpSet.numOfIps]);
    tscMgmtIpSet.numOfIps++;
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
2170
  if ( tscMgmtIpSet.numOfIps == 0) {
2171
    terrno = TSDB_CODE_TSC_INVALID_FQDN;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
2172 2173
    return -1;
  }
2174 2175 2176

  return 0;
}