tscSubquery.c 77.4 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
H
Hui Li 已提交
15 16
#define _GNU_SOURCE
 
H
Haojun Liao 已提交
17
#include "os.h"
H
hjxilinx 已提交
18

H
Haojun Liao 已提交
19 20
#include "qAst.h"
#include "qTsbuf.h"
H
Haojun Liao 已提交
21
#include "tcompare.h"
S
slguan 已提交
22
#include "tscLog.h"
H
Haojun Liao 已提交
23 24
#include "tscSubquery.h"
#include "tschemautil.h"
25
#include "tsclient.h"
H
hjxilinx 已提交
26 27 28 29

/*
 * Bookkeeping attached to one insert subquery (the users of this struct are outside this part of the file).
 */
typedef struct SInsertSupporter SInsertSupporter;

struct SInsertSupporter {
  SSubqueryState* pState;  // subquery state object -- NOTE(review): presumably shared by sibling subqueries; confirm
  SSqlObj*        pSql;    // associated SQL object
  int32_t         index;   // index of this subquery
};

H
hjxilinx 已提交
33
// Forward declarations of file-local helpers defined further down in this file.
static bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql);
static void freeJoinSubqueryObj(SSqlObj* pSql);
H
hjxilinx 已提交
35

36
static bool tsCompare(int32_t order, int64_t left, int64_t right) {
37
  if (order == TSDB_ORDER_ASC) {
H
hjxilinx 已提交
38 39 40 41 42 43
    return left < right;
  } else {
    return left > right;
  }
}

H
Haojun Liao 已提交
44 45
/*
 * Intersect the two timestamp streams (ts-comp buffers) produced by the first stage of a two-table join.
 *
 * Both inputs are walked in lock-step, ordered by tag value and then by timestamp in the query's order. Every
 * <tag, ts> pair present in both inputs is appended to a fresh output buffer per side. The two output buffers are
 * installed as the tsBuf of pSql->pSubs[0] and pSql->pSubs[1] before any input is examined.
 * For a non-stable query without interval, matches are skipped (and the parent's limit offset decremented) while
 * the offset is positive.
 *
 * @param pSql         parent join query
 * @param pSupporter1  join supporter of the first subquery; its pTSBuf is destroyed when the walk completes
 * @param pSupporter2  join supporter of the second subquery; its pTSBuf is destroyed when the walk completes
 * @param win          out: [skey, ekey] spanning the appended timestamps; stays [INT64_MAX, INT64_MIN] if none
 * @return output1->numOfTotal after the walk, or 0 when either input is missing or empty
 */
static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJoinSupporter* pSupporter2, STimeWindow * win) {
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);

  STSBuf* output1 = tsBufCreate(true, pQueryInfo->order.order);
  STSBuf* output2 = tsBufCreate(true, pQueryInfo->order.order);

  win->skey = INT64_MAX;
  win->ekey = INT64_MIN;

  SLimitVal* pLimit = &pQueryInfo->limit;
  int32_t    order = pQueryInfo->order.order;

  SQueryInfo* pSubQueryInfo1 = tscGetQueryInfoDetail(&pSql->pSubs[0]->cmd, 0);
  SQueryInfo* pSubQueryInfo2 = tscGetQueryInfoDetail(&pSql->pSubs[1]->cmd, 0);

  // the output buffers are attached to the subqueries up front, whatever the outcome below
  pSubQueryInfo1->tsBuf = output1;
  pSubQueryInfo2->tsBuf = output2;

  // no result generated, return directly
  if (pSupporter1->pTSBuf == NULL || pSupporter2->pTSBuf == NULL) {
    // flush the (empty) outputs, exactly like the other empty-input exits below
    tsBufFlush(output1);
    tsBufFlush(output2);

    tscDebug("%p at least one ts-comp is empty, 0 for secondary query after ts blocks intersecting", pSql);
    return 0;
  }

  tsBufResetPos(pSupporter1->pTSBuf);
  tsBufResetPos(pSupporter2->pTSBuf);

  if (!tsBufNextPos(pSupporter1->pTSBuf)) {
    tsBufFlush(output1);
    tsBufFlush(output2);

    tscDebug("%p input1 is empty, 0 for secondary query after ts blocks intersecting", pSql);
    return 0;
  }

  if (!tsBufNextPos(pSupporter2->pTSBuf)) {
    tsBufFlush(output1);
    tsBufFlush(output2);

    tscDebug("%p input2 is empty, 0 for secondary query after ts blocks intersecting", pSql);
    return 0;
  }

  int64_t numOfInput1 = 1;
  int64_t numOfInput2 = 1;

  while (1) {
    STSElem elem1 = tsBufGetElem(pSupporter1->pTSBuf);
    STSElem elem2 = tsBufGetElem(pSupporter2->pTSBuf);

#ifdef _DEBUG_VIEW
    tscInfo("%" PRId64 ", tags:%d \t %" PRId64 ", tags:%d", elem1.ts, elem1.tag, elem2.ts, elem2.tag);
#endif

    // only the sign of the tag comparison is relied upon: any negative value means "input1 first",
    // mirroring the `res > 0` test of the next branch
    int32_t res = tVariantCompare(&elem1.tag, &elem2.tag);
    if (res < 0 || (res == 0 && tsCompare(order, elem1.ts, elem2.ts))) {
      // the current element of input1 comes first: advance input1
      if (!tsBufNextPos(pSupporter1->pTSBuf)) {
        break;
      }

      numOfInput1++;
    } else if ((res > 0) || (res == 0 && tsCompare(order, elem2.ts, elem1.ts))) {
      // the current element of input2 comes first: advance input2
      if (!tsBufNextPos(pSupporter2->pTSBuf)) {
        break;
      }

      numOfInput2++;
    } else {
      /*
       * in case of stable query, limit/offset is not applied here. the limit/offset is applied to the
       * final results which is acquired after the secondary merge in the client.
       */
      if (pLimit->offset == 0 || pQueryInfo->intervalTime > 0 || QUERY_IS_STABLE_QUERY(pQueryInfo->type)) {
        if (win->skey > elem1.ts) {
          win->skey = elem1.ts;
        }

        if (win->ekey < elem1.ts) {
          win->ekey = elem1.ts;
        }

        tsBufAppend(output1, elem1.vnode, &elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts));
        tsBufAppend(output2, elem2.vnode, &elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts));
      } else {
        pLimit->offset -= 1;
      }

      if (!tsBufNextPos(pSupporter1->pTSBuf)) {
        break;
      }

      numOfInput1++;

      if (!tsBufNextPos(pSupporter2->pTSBuf)) {
        break;
      }

      numOfInput2++;
    }
  }

  /*
   * failed to set the correct ts order yet in two cases:
   * 1. only one element
   * 2. only one element for each tag.
   */
  if (output1->tsOrder == -1) {
    output1->tsOrder = TSDB_ORDER_ASC;
    output2->tsOrder = TSDB_ORDER_ASC;
  }

  tsBufFlush(output1);
  tsBufFlush(output2);

  // the inputs are consumed; do not leave dangling pointers behind in the supporters
  tsBufDestroy(pSupporter1->pTSBuf);
  tsBufDestroy(pSupporter2->pTSBuf);
  pSupporter1->pTSBuf = NULL;
  pSupporter2->pTSBuf = NULL;

  tscDebug("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " for secondary query after ts blocks "
           "intersecting, skey:%" PRId64 ", ekey:%" PRId64, pSql, numOfInput1, numOfInput2, output1->numOfTotal,
           win->skey, win->ekey);

  return output1->numOfTotal;
}

// todo handle failed to create sub query
169 170
SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, int32_t index) {
  SJoinSupporter* pSupporter = calloc(1, sizeof(SJoinSupporter));
H
hjxilinx 已提交
171 172 173 174 175 176 177 178 179 180
  if (pSupporter == NULL) {
    return NULL;
  }

  pSupporter->pObj = pSql;
  pSupporter->pState = pState;

  pSupporter->subqueryIndex = index;
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
  
H
Haojun Liao 已提交
181 182
  pSupporter->intervalTime = pQueryInfo->intervalTime;
  pSupporter->slidingTime = pQueryInfo->slidingTime;
H
hjxilinx 已提交
183 184 185
  pSupporter->limit = pQueryInfo->limit;

  STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, index);
186
  pSupporter->uid = pTableMetaInfo->pTableMeta->id.uid;
H
hjxilinx 已提交
187 188
  assert (pSupporter->uid != 0);

S
slguan 已提交
189
  taosGetTmpfilePath("join-", pSupporter->path);
H
hjxilinx 已提交
190 191
  pSupporter->f = fopen(pSupporter->path, "w");

H
Haojun Liao 已提交
192
  // todo handle error
H
hjxilinx 已提交
193 194 195 196 197 198 199
  if (pSupporter->f == NULL) {
    tscError("%p failed to create tmp file:%s, reason:%s", pSql, pSupporter->path, strerror(errno));
  }

  return pSupporter;
}

H
Haojun Liao 已提交
200
/*
 * Release a join supporter together with everything it still owns: the expression list, column list, field info,
 * the temporary ts-comp file (closed and unlinked only if it is still open), the <tid, tags> buffer and the tag
 * condition. A NULL argument is ignored.
 */
static void tscDestroyJoinSupporter(SJoinSupporter* pSupporter) {
  if (!pSupporter) {
    return;
  }

  // query-definition pieces that were not moved into a second-stage subquery
  if (pSupporter->exprList) {
    tscSqlExprInfoDestroy(pSupporter->exprList);
  }
  if (pSupporter->colList) {
    tscColumnListDestroy(pSupporter->colList);
  }
  tscFieldInfoClear(&pSupporter->fieldsInfo);

  // temporary ts-comp file: removed here only while the stream is still open
  FILE* fp = pSupporter->f;
  if (fp) {
    fclose(fp);
    unlink(pSupporter->path);
    pSupporter->f = NULL;
  }

  taosTFree(pSupporter->pIdTagList);
  tscTagCondRelease(&pSupporter->tagCond);
  free(pSupporter);
}

/*
 * need the secondary query process
 * In case of count(ts)/count(*)/spread(ts) query, that are only applied to
 * primary timestamp column , the secondary query is not necessary
 *
 */
H
Haojun Liao 已提交
232
static UNUSED_FUNC bool needSecondaryQuery(SQueryInfo* pQueryInfo) {
233 234 235
  size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
  
  for (int32_t i = 0; i < numOfCols; ++i) {
236 237
    SColumn* base = taosArrayGet(pQueryInfo->colList, i);
    if (base->colIndex.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
H
hjxilinx 已提交
238 239 240 241 242 243 244 245 246 247
      return true;
    }
  }

  return false;
}

/*
 * launch secondary stage query to fetch the result that contains timestamp in set
 */
H
Haojun Liao 已提交
248
static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
H
hjxilinx 已提交
249
  int32_t         numOfSub = 0;
250
  SJoinSupporter* pSupporter = NULL;
H
hjxilinx 已提交
251
  
H
Haojun Liao 已提交
252
  //If the columns are not involved in the final select clause, the corresponding query will not be issued.
H
hjxilinx 已提交
253 254
  for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
    pSupporter = pSql->pSubs[i]->param;
H
hjxilinx 已提交
255
    if (taosArrayGetSize(pSupporter->exprList) > 0) {
H
hjxilinx 已提交
256 257 258 259 260 261 262
      ++numOfSub;
    }
  }
  
  assert(numOfSub > 0);
  
  // scan all subquery, if one sub query has only ts, ignore it
263
  tscDebug("%p start to launch secondary subqueries, total:%d, only:%d needs to query", pSql, pSql->numOfSubs, numOfSub);
H
hjxilinx 已提交
264

H
hjxilinx 已提交
265
  //the subqueries that do not actually launch the secondary query to virtual node is set as completed.
266
  SSubqueryState* pState = pSupporter->pState;
H
hjxilinx 已提交
267
  pState->numOfTotal = pSql->numOfSubs;
H
Haojun Liao 已提交
268
  pState->numOfRemain = numOfSub;
H
hjxilinx 已提交
269 270 271 272 273 274 275 276 277
  
  bool success = true;
  
  for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
    SSqlObj *pPrevSub = pSql->pSubs[i];
    pSql->pSubs[i] = NULL;
    
    pSupporter = pPrevSub->param;
  
H
hjxilinx 已提交
278
    if (taosArrayGetSize(pSupporter->exprList) == 0) {
279
      tscDebug("%p subIndex: %d, no need to launch query, ignore it", pSql, i);
H
hjxilinx 已提交
280 281 282 283 284 285 286 287 288 289 290 291 292
    
      tscDestroyJoinSupporter(pSupporter);
      tscFreeSqlObj(pPrevSub);
    
      pSql->pSubs[i] = NULL;
      continue;
    }
  
    SQueryInfo *pSubQueryInfo = tscGetQueryInfoDetail(&pPrevSub->cmd, 0);
    STSBuf *pTSBuf = pSubQueryInfo->tsBuf;
    pSubQueryInfo->tsBuf = NULL;
  
    // free result for async object will also free sqlObj
H
hjxilinx 已提交
293
    assert(tscSqlExprNumOfExprs(pSubQueryInfo) == 1); // ts_comp query only requires one resutl columns
H
hjxilinx 已提交
294 295
    taos_free_result(pPrevSub);
  
296
    SSqlObj *pNew = createSubqueryObj(pSql, (int16_t) i, tscJoinQueryCallback, pSupporter, TSDB_SQL_SELECT, NULL);
H
hjxilinx 已提交
297 298 299 300 301 302 303 304 305 306 307 308 309
    if (pNew == NULL) {
      tscDestroyJoinSupporter(pSupporter);
      success = false;
      break;
    }
  
    tscClearSubqueryInfo(&pNew->cmd);
    pSql->pSubs[i] = pNew;
  
    SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
    pQueryInfo->tsBuf = pTSBuf;  // transfer the ownership of timestamp comp-z data to the new created object
  
    // set the second stage sub query for join process
H
hjxilinx 已提交
310
    TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE);
H
Haojun Liao 已提交
311

H
Haojun Liao 已提交
312 313
    pQueryInfo->intervalTime = pSupporter->intervalTime;
    pQueryInfo->slidingTime = pSupporter->slidingTime;
H
hjxilinx 已提交
314
    pQueryInfo->groupbyExpr = pSupporter->groupbyExpr;
H
hjxilinx 已提交
315
    
H
hjxilinx 已提交
316 317
    tscTagCondCopy(&pQueryInfo->tagCond, &pSupporter->tagCond);
  
H
hjxilinx 已提交
318 319 320
    pQueryInfo->colList = pSupporter->colList;
    pQueryInfo->exprList = pSupporter->exprList;
    pQueryInfo->fieldsInfo = pSupporter->fieldsInfo;
H
hjxilinx 已提交
321
    
H
hjxilinx 已提交
322 323 324
    pSupporter->exprList = NULL;
    pSupporter->colList = NULL;
    memset(&pSupporter->fieldsInfo, 0, sizeof(SFieldInfo));
H
hjxilinx 已提交
325 326 327 328
  
    SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
    assert(pNew->numOfSubs == 0 && pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);
  
H
hjxilinx 已提交
329
    tscFieldInfoUpdateOffset(pNewQueryInfo);
H
hjxilinx 已提交
330 331 332 333 334 335 336 337 338
  
    STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0);
  
    /*
     * When handling the projection query, the offset value will be modified for table-table join, which is changed
     * during the timestamp intersection.
     */
    pSupporter->limit = pQueryInfo->limit;
    pNewQueryInfo->limit = pSupporter->limit;
H
Haojun Liao 已提交
339 340 341 342 343

    SColumnIndex index = {.tableIndex = 0, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
    SSchema* s = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, 0);

    SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, 0);
H
Haojun Liao 已提交
344 345
    int16_t funcId = pExpr->functionId;

H
Haojun Liao 已提交
346
    if ((pExpr->colInfo.colId != PRIMARYKEY_TIMESTAMP_COL_INDEX) ||
H
Haojun Liao 已提交
347 348 349 350 351
        (funcId != TSDB_FUNC_TS && funcId != TSDB_FUNC_TS_DUMMY && funcId != TSDB_FUNC_PRJ)) {

      int16_t functionId = tscIsProjectionQuery(pQueryInfo)? TSDB_FUNC_PRJ : TSDB_FUNC_TS;

      tscAddSpecialColumnForSelect(pQueryInfo, 0, functionId, &index, s, TSDB_COL_NORMAL);
H
Haojun Liao 已提交
352 353
      tscPrintSelectClause(pNew, 0);
      tscFieldInfoUpdateOffset(pNewQueryInfo);
H
Haojun Liao 已提交
354 355 356 357

      pExpr = tscSqlExprGet(pQueryInfo, 0);
    }

358
    // set the join condition tag column info, todo extract method
H
Haojun Liao 已提交
359 360
    if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
      assert(pQueryInfo->tagCond.joinInfo.hasJoin);
361
      int16_t colId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid);
H
Haojun Liao 已提交
362

363
      // set the tag column id for executor to extract correct tag value
H
Haojun Liao 已提交
364 365
      pExpr->param[0].i64Key = colId;
      pExpr->numOfParams = 1;
H
Haojun Liao 已提交
366 367
    }

368
    size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList);
S
TD-1057  
Shengliang Guan 已提交
369
    tscDebug("%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, name:%s",
H
Haojun Liao 已提交
370 371
             pSql, pNew, 0, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, taosArrayGetSize(pNewQueryInfo->exprList),
             numOfCols, pNewQueryInfo->fieldsInfo.numOfOutput, pTableMetaInfo->name);
H
hjxilinx 已提交
372 373 374 375
  }
  
  //prepare the subqueries object failed, abort
  if (!success) {
376
    pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
H
hjxilinx 已提交
377 378
    tscError("%p failed to prepare subqueries objs for secondary phase query, numOfSub:%d, code:%d", pSql,
        pSql->numOfSubs, pSql->res.code);
H
hjxilinx 已提交
379
    freeJoinSubqueryObj(pSql);
H
hjxilinx 已提交
380 381 382 383 384
    
    return pSql->res.code;
  }
  
  for(int32_t i = 0; i < pSql->numOfSubs; ++i) {
H
Haojun Liao 已提交
385
    if (pSql->pSubs[i] == NULL) {
H
hjxilinx 已提交
386 387
      continue;
    }
H
Haojun Liao 已提交
388

H
Haojun Liao 已提交
389
    tscDoQuery(pSql->pSubs[i]);
H
hjxilinx 已提交
390 391 392 393 394
  }

  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
395
/*
 * Tear down all join subqueries of pSql.
 *
 * For every non-NULL subquery, its join supporter is destroyed, and its result is freed when the subquery
 * completed successfully. Afterwards the subquery state taken from the last supporter seen is freed, and
 * numOfSubs is reset to 0.
 * NOTE(review): assumes every supporter points at the same state object; confirm.
 */
void freeJoinSubqueryObj(SSqlObj* pSql) {
  SSubqueryState* pState = NULL;

  for (int32_t idx = 0; idx < pSql->numOfSubs; ++idx) {
    SSqlObj* pSub = pSql->pSubs[idx];
    if (pSub != NULL) {
      SJoinSupporter* pSupporter = pSub->param;
      pState = pSupporter->pState;  // remember it before the supporter is destroyed

      tscDestroyJoinSupporter(pSupporter);

      if (pSub->res.code == TSDB_CODE_SUCCESS) {
        taos_free_result(pSub);
      }
    }
  }

  taosTFree(pState);
  pSql->numOfSubs = 0;
}

418
/*
 * Account for one failed subquery. The shared remaining-subquery counter is decremented atomically. Only when it
 * drops to zero (this was the last outstanding subquery) is the parent's error logged and all join subquery
 * objects of pSqlObj freed.
 */
static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSupporter* pSupporter) {
  SSubqueryState* pState = pSupporter->pState;
  assert(pState->numOfRemain > 0);

  int32_t remain = atomic_sub_fetch_32(&pState->numOfRemain, 1);
  if (remain > 0) {
    return;  // other subqueries are still running
  }

  tscError("%p all subquery return and query failed, global code:%d", pSqlObj, pSqlObj->res.code);
  freeJoinSubqueryObj(pSqlObj);
}

// update the query time range according to the join results on timestamp
H
Haojun Liao 已提交
428 429 430
static void updateQueryTimeRange(SQueryInfo* pQueryInfo, STimeWindow* win) {
  assert(pQueryInfo->window.skey <= win->skey && pQueryInfo->window.ekey >= win->ekey);
  pQueryInfo->window = *win;
H
hjxilinx 已提交
431 432
}

weixin_48148422's avatar
weixin_48148422 已提交
433
int32_t tscCompareTidTags(const void* p1, const void* p2) {
H
Haojun Liao 已提交
434 435
  const STidTags* t1 = (const STidTags*) varDataVal(p1);
  const STidTags* t2 = (const STidTags*) varDataVal(p2);
436 437
  
  if (t1->vgId != t2->vgId) {
weixin_48148422's avatar
weixin_48148422 已提交
438 439
    return (t1->vgId > t2->vgId) ? 1 : -1;
  }
440

weixin_48148422's avatar
weixin_48148422 已提交
441 442
  if (t1->tid != t2->tid) {
    return (t1->tid > t2->tid) ? 1 : -1;
443
  }
weixin_48148422's avatar
weixin_48148422 已提交
444
  return 0;
445 446
}

H
Haojun Liao 已提交
447
/*
 * Group the STidTags entries of `tables` by vgroup and store the groups in pTableMetaInfo->pVgroupTables.
 *
 * A new SVgroupTableInfo entry starts whenever the vgroup id differs from the previous entry's, so `tables` is
 * expected to be ordered by vgroup id; a vgroup appearing in two separate runs would yield two entries.
 * Each entry holds:
 *   - a copy of the matching vgroup from pTableMetaInfo->vgroupList (asserted to exist);
 *   - an array of STableIdInfo {uid, tid, key = INT64_MIN}, one per table of that vgroup.
 * vgroupIndex is reset to 0.
 */
void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables) {
  SArray*   result = taosArrayInit(4, sizeof(SVgroupTableInfo));
  SArray*   vgTables = NULL;
  STidTags* prev = NULL;

  size_t numOfTables = taosArrayGetSize(tables);
  for (size_t i = 0; i < numOfTables; i++) {
    STidTags* tt = taosArrayGet(tables, i);

    // start a new group whenever the vgroup changes
    if (prev == NULL || tt->vgId != prev->vgId) {
      SVgroupsInfo* pvg = pTableMetaInfo->vgroupList;

      SVgroupTableInfo info = {{0}};
      for (int32_t m = 0; m < pvg->numOfVgroups; ++m) {
        if (tt->vgId == pvg->vgroups[m].vgId) {
          info.vgInfo = pvg->vgroups[m];
          break;
        }
      }
      assert(info.vgInfo.numOfEps != 0);

      // the pushed copy of `info` shares this array, so later pushes to vgTables land in this group
      vgTables = taosArrayInit(4, sizeof(STableIdInfo));
      info.itemList = vgTables;
      taosArrayPush(result, &info);
    }

    tscDebug("%p tid:%d, uid:%"PRIu64",vgId:%d added for vnode query", pSql, tt->tid, tt->uid, tt->vgId);
    STableIdInfo item = {.uid = tt->uid, .tid = tt->tid, .key = INT64_MIN};
    taosArrayPush(vgTables, &item);

    prev = tt;
  }

  pTableMetaInfo->pVgroupTables = result;
  pTableMetaInfo->vgroupIndex = 0;
}

/*
 * Reuse the subquery object pSql to issue the ts_comp query for one side of the join.
 *
 * The subquery's info is reset to a multi-table SELECT whose single output is TS_COMP on the primary timestamp
 * column, with tscJoinQueryCallback as the callback.
 * For a super table, the join tag column id is passed as the function's parameter. Columns of the supporter that
 * carry filters are cloned into the query's column list.
 *
 * @param pSql       subquery object to reuse
 * @param pSupporter join supporter of that subquery (tag condition and column list are read)
 * @param pParent    parent join query, used for logging only
 */
static void issueTSCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj* pParent) {
  SSqlCmd* pCmd = &pSql->cmd;
  tscClearSubqueryInfo(pCmd);
  tscFreeSqlResult(pSql);

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  tscInitQueryInfo(pQueryInfo);

  TSDB_QUERY_CLEAR_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY);
  TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_MULTITABLE_QUERY);

  pCmd->command = TSDB_SQL_SELECT;
  pSql->fp = tscJoinQueryCallback;

  SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1};

  // designated initializer, consistent with the SColumnIndex built in tscLaunchRealSubqueries
  SColumnIndex index = {.tableIndex = 0, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
  tscAddSpecialColumnForSelect(pQueryInfo, 0, TSDB_FUNC_TS_COMP, &index, &colSchema, TSDB_COL_NORMAL);

  // set the tags value for ts_comp function
  if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
    SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, 0);
    int16_t tagColId = tscGetJoinTagColIdByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->id.uid);
    pExpr->param[0].i64Key = tagColId;
    pExpr->numOfParams = 1;
  }

  // add the filter tag column
  if (pSupporter->colList != NULL) {
    size_t s = taosArrayGetSize(pSupporter->colList);

    // size_t index: no signed/unsigned comparison against the array size
    for (size_t i = 0; i < s; ++i) {
      SColumn *pCol = taosArrayGetP(pSupporter->colList, i);

      if (pCol->numOfFilters > 0) {  // copy to the pNew->cmd.colList if it is filtered.
        SColumn *p = tscColumnClone(pCol);
        taosArrayPush(pQueryInfo->colList, &p);
      }
    }
  }

  size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);

  tscDebug(
      "%p subquery:%p tableIndex:%d, vgroupIndex:%d, numOfVgroups:%d, type:%d, ts_comp query to retrieve timestamps, "
      "numOfExpr:%" PRIzu ", colList:%" PRIzu ", numOfOutputFields:%d, name:%s",
      pParent, pSql, 0, pTableMetaInfo->vgroupIndex, pTableMetaInfo->vgroupList->numOfVgroups, pQueryInfo->type,
      tscSqlExprNumOfExprs(pQueryInfo), numOfCols, pQueryInfo->fieldsInfo.numOfOutput, pTableMetaInfo->name);

  tscProcessSql(pSql);
}

H
Haojun Liao 已提交
537
/*
 * Check that no two neighbouring <tid, tags> entries of p1->pIdTagList carry the same tag value.
 *
 * The tag type and size come from the first column of the table's tagColList. On a duplicate, this logs an
 * error, sets pPSqlObj->res.code to TSDB_CODE_QRY_DUP_JOIN_KEY and returns false; otherwise it returns true.
 *
 * NOTE(review): only adjacent entries are compared, so every duplicate is found only if the list is ordered by tag
 * value; in this file it is sorted with tscCompareTidTags, i.e. by (vgId, tid) -- confirm this is sufficient.
 */
static bool checkForDuplicateTagVal(SQueryInfo* pQueryInfo, SJoinSupporter* p1, SSqlObj* pPSqlObj) {
  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  SSchema* pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);// todo: tags mismatch, tags not completed
  SColumn* pCol = taosArrayGetP(pTableMetaInfo->tagColList, 0);
  SSchema* pColSchema = &pSchema[pCol->colIndex.columnIndex];

  for (int32_t k = 1; k < p1->num; ++k) {
    STidTags* pLast = (STidTags*) varDataVal(p1->pIdTagList + (k - 1) * p1->tagSize);
    STidTags* pCur  = (STidTags*) varDataVal(p1->pIdTagList + k * p1->tagSize);
    assert(pLast->vgId >= 1 && pCur->vgId >= 1);

    if (doCompare(pLast->tag, pCur->tag, pColSchema->type, pColSchema->bytes) != 0) {
      continue;
    }

    tscError("%p join tags have same value for different table, free all sub SqlObj and quit", pPSqlObj);
    pPSqlObj->res.code = TSDB_CODE_QRY_DUP_JOIN_KEY;
    return false;
  }

  return true;
}

559
/*
 * Match the <tid, tags> tuples collected by the two join subqueries on the join tag value.
 *
 * Both raw lists are sorted with tscCompareTidTags and checked for duplicated tag values, then merged. Every pair
 * with equal tags appends the STidTags payload from sub 0 to *s1 and the one from sub 1 to *s2.
 * *s1 and *s2 are always allocated and must be destroyed by the caller, also when an error code is returned.
 *
 * NOTE(review): the merge advances by tag comparison while the lists are sorted by (vgId, tid); it only finds every
 * match when both orders agree -- confirm.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_DUP_JOIN_KEY when either side has duplicated tag values.
 */
static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pParentSql, SArray** s1, SArray** s2) {
  tscDebug("%p all subqueries retrieve <tid, tags> complete, do tags match", pParentSql);

  SJoinSupporter* p1 = pParentSql->pSubs[0]->param;
  SJoinSupporter* p2 = pParentSql->pSubs[1]->param;

  qsort(p1->pIdTagList, p1->num, p1->tagSize, tscCompareTidTags);
  qsort(p2->pIdTagList, p2->num, p2->tagSize, tscCompareTidTags);

  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  int16_t tagColId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid);

  SSchema* pColSchema = tscGetTableColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId);

  // Each raw entry is tagSize bytes: a var-data header followed by the STidTags payload that varDataVal() points at.
  // Only that payload is pushed below, so the element size must exclude the header; the full tagSize would copy
  // header-size bytes past the entry, i.e. past the end of the buffer for the last entry.
  *s1 = taosArrayInit(p1->num, p1->tagSize - VARSTR_HEADER_SIZE);
  *s2 = taosArrayInit(p2->num, p2->tagSize - VARSTR_HEADER_SIZE);

  if (!(checkForDuplicateTagVal(pQueryInfo, p1, pParentSql) && checkForDuplicateTagVal(pQueryInfo, p2, pParentSql))) {
    return TSDB_CODE_QRY_DUP_JOIN_KEY;
  }

  int32_t i = 0, j = 0;
  while(i < p1->num && j < p2->num) {
    STidTags* pp1 = (STidTags*) varDataVal(p1->pIdTagList + i * p1->tagSize);
    STidTags* pp2 = (STidTags*) varDataVal(p2->pIdTagList + j * p2->tagSize);
    assert(pp1->tid != 0 && pp2->tid != 0);

    int32_t ret = doCompare(pp1->tag, pp2->tag, pColSchema->type, pColSchema->bytes);
    if (ret == 0) {
      tscDebug("%p tag matched, vgId:%d, val:%d, tid:%d, uid:%"PRIu64", tid:%d, uid:%"PRIu64, pParentSql, pp1->vgId,
               *(int*) pp1->tag, pp1->tid, pp1->uid, pp2->tid, pp2->uid);

      taosArrayPush(*s1, pp1);
      taosArrayPush(*s2, pp2);
      j++;
      i++;
    } else if (ret > 0) {
      j++;
    } else {
      i++;
    }
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
605
/*
 * Retrieve callback of the first join stage, which collects the <tid, tags> tuples of one subquery.
 *
 * Each fetched block is appended to pSupporter->pIdTagList, and fetching continues until the current vnode is
 * exhausted; then the next vgroup is queried. When this subquery has no vgroups left and it is the last
 * outstanding subquery, the tuple sets of both subqueries are intersected on the tag value. Then either:
 *   - the parent is completed with an empty result, or
 *   - ts_comp queries are issued for both sides.
 *
 * On any error the code is stored in the parent's res.code and the parent's result is queued via tscQueueAsyncRes.
 */
static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRows) {
  SJoinSupporter* pSupporter = (SJoinSupporter*)param;

  SSqlObj* pParentSql = pSupporter->pObj;

  SSqlObj* pSql = (SSqlObj*)tres;
  SSqlCmd* pCmd = &pSql->cmd;
  SSqlRes* pRes = &pSql->res;

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  assert(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY));

  // check for the error code firstly
  if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
    // todo retry if other subqueries are not failed

    assert(numOfRows < 0 && numOfRows == taos_errno(pSql));
    tscError("%p sub query failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex);

    pParentSql->res.code = numOfRows;
    quitAllSubquery(pParentSql, pSupporter);

    tscQueueAsyncRes(pParentSql);
    return;
  }

  // keep the results in memory
  if (numOfRows > 0) {
    size_t validLen = (size_t)(pSupporter->tagSize * pRes->numOfRows);
    size_t length = pSupporter->totalLen + validLen;

    char* tmp = realloc(pSupporter->pIdTagList, length);
    if (tmp == NULL) {
      // capture errno before logging: the log call may overwrite it
      int sysErrno = errno;
      tscError("%p failed to malloc memory", pSql);

      pParentSql->res.code = TAOS_SYSTEM_ERROR(sysErrno);
      quitAllSubquery(pParentSql, pSupporter);

      tscQueueAsyncRes(pParentSql);
      return;
    }

    pSupporter->pIdTagList = tmp;

    memcpy(pSupporter->pIdTagList + pSupporter->totalLen, pRes->data, validLen);
    pSupporter->totalLen += (int32_t)validLen;
    pSupporter->num += (int32_t)pRes->numOfRows;

    // query not completed, continue to retrieve tid + tag tuples
    if (!pRes->completed) {
      taos_fetch_rows_a(tres, tidTagRetrieveCallback, param);
      return;
    }
  }

  // data in current vnode has all returned to client, try next vnode if exits
  // <tid + tag> tuples have been retrieved to client, try <tid + tag> tuples from the next vnode
  if (hasMoreVnodesToTry(pSql)) {
    STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

    int32_t totalVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
    pTableMetaInfo->vgroupIndex += 1;
    assert(pTableMetaInfo->vgroupIndex < totalVgroups);

    tscDebug("%p tid_tag from vgroup index:%d completed, try next vgroup:%d. total vgroups:%d. current numOfRes:%d",
             pSql, pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVgroups, pSupporter->num);

    pCmd->command = TSDB_SQL_SELECT;
    tscResetForNextRetrieve(&pSql->res);

    // set the callback function
    pSql->fp = tscJoinQueryCallback;
    tscProcessSql(pSql);
    return;
  }

  // no data exists in next vnode, mark the <tid, tags> query completed
  // only when there is no subquery exits any more, proceeds to get the intersect of the <tid, tags> tuple sets.
  // NOTE(review): a sibling that failed already decremented numOfRemain in quitAllSubquery, so reaching zero here
  // does not imply that every sibling succeeded -- confirm the parent's error state is handled.
  if (atomic_sub_fetch_32(&pSupporter->pState->numOfRemain, 1) > 0) {
    return;
  }

  SArray *s1 = NULL, *s2 = NULL;
  int32_t code = getIntersectionOfTableTuple(pQueryInfo, pParentSql, &s1, &s2);
  if (code != TSDB_CODE_SUCCESS) {
    freeJoinSubqueryObj(pParentSql);
    pParentSql->res.code = code;
    tscQueueAsyncRes(pParentSql);

    taosArrayDestroy(s1);
    taosArrayDestroy(s2);
    return;
  }

  if (taosArrayGetSize(s1) == 0 || taosArrayGetSize(s2) == 0) {  // no results,return.
    tscDebug("%p tag intersect does not generated qualified tables for join, free all sub SqlObj and quit", pParentSql);
    freeJoinSubqueryObj(pParentSql);

    // set no result command
    pParentSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
    (*pParentSql->fp)(pParentSql->param, pParentSql, 0);
  } else {
    // proceed to for ts_comp query
    SSqlCmd* pSubCmd1 = &pParentSql->pSubs[0]->cmd;
    SSqlCmd* pSubCmd2 = &pParentSql->pSubs[1]->cmd;

    SQueryInfo*     pQueryInfo1 = tscGetQueryInfoDetail(pSubCmd1, 0);
    STableMetaInfo* pTableMetaInfo1 = tscGetMetaInfo(pQueryInfo1, 0);
    tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo1, s1);

    SQueryInfo*     pQueryInfo2 = tscGetQueryInfoDetail(pSubCmd2, 0);
    STableMetaInfo* pTableMetaInfo2 = tscGetMetaInfo(pQueryInfo2, 0);
    tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo2, s2);

    pSupporter->pState->numOfTotal = 2;
    pSupporter->pState->numOfRemain = pSupporter->pState->numOfTotal;

    for (int32_t m = 0; m < pParentSql->numOfSubs; ++m) {
      SSqlObj* sub = pParentSql->pSubs[m];
      issueTSCompQuery(sub, sub->param, pParentSql);
    }
  }

  taosArrayDestroy(s1);
  taosArrayDestroy(s2);
}
H
Haojun Liao 已提交
732

H
Haojun Liao 已提交
733 734
static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRows) {
  SJoinSupporter* pSupporter = (SJoinSupporter*)param;
H
Haojun Liao 已提交
735

H
Haojun Liao 已提交
736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754
  SSqlObj* pParentSql = pSupporter->pObj;

  SSqlObj* pSql = (SSqlObj*)tres;
  SSqlCmd* pCmd = &pSql->cmd;
  SSqlRes* pRes = &pSql->res;

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  assert(!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE));

  // check for the error code firstly
  if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
    // todo retry if other subqueries are not failed yet 
    assert(numOfRows < 0 && numOfRows == taos_errno(pSql));
    tscError("%p sub query failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex);

    pParentSql->res.code = numOfRows;
    quitAllSubquery(pParentSql, pSupporter);

    tscQueueAsyncRes(pParentSql);
755 756
    return;
  }
H
Haojun Liao 已提交
757

H
Haojun Liao 已提交
758
  if (numOfRows > 0) {  // write the compressed timestamp to disk file
759
    fwrite(pRes->data, (size_t)pRes->numOfRows, 1, pSupporter->f);
H
Haojun Liao 已提交
760 761 762 763 764 765 766 767 768 769
    fclose(pSupporter->f);
    pSupporter->f = NULL;

    STSBuf* pBuf = tsBufCreateFromFile(pSupporter->path, true);
    if (pBuf == NULL) {  // in error process, close the fd
      tscError("%p invalid ts comp file from vnode, abort subquery, file size:%d", pSql, numOfRows);

      pParentSql->res.code = TAOS_SYSTEM_ERROR(errno);
      tscQueueAsyncRes(pParentSql);

H
hjxilinx 已提交
770 771
      return;
    }
772

H
Haojun Liao 已提交
773
    if (pSupporter->pTSBuf == NULL) {
774
      tscDebug("%p create tmp file for ts block:%s, size:%d bytes", pSql, pBuf->path, numOfRows);
H
Haojun Liao 已提交
775 776 777 778
      pSupporter->pTSBuf = pBuf;
    } else {
      assert(pQueryInfo->numOfTables == 1);  // for subquery, only one
      STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hjxilinx 已提交
779

H
Haojun Liao 已提交
780
      tsBufMerge(pSupporter->pTSBuf, pBuf, pTableMetaInfo->vgroupIndex);
H
Haojun Liao 已提交
781
      tsBufDestroy(pBuf);
H
Haojun Liao 已提交
782
    }
H
hjxilinx 已提交
783

H
Haojun Liao 已提交
784 785
    // continue to retrieve ts-comp data from vnode
    if (!pRes->completed) {
S
slguan 已提交
786
      taosGetTmpfilePath("ts-join", pSupporter->path);
H
Haojun Liao 已提交
787
      pSupporter->f = fopen(pSupporter->path, "w");
S
Shengliang Guan 已提交
788
      pRes->row = (int32_t)pRes->numOfRows;
H
hjxilinx 已提交
789

H
Haojun Liao 已提交
790 791
      taos_fetch_rows_a(tres, tsCompRetrieveCallback, param);
      return;
H
hjxilinx 已提交
792
    }
H
Haojun Liao 已提交
793
  }
H
Haojun Liao 已提交
794

H
Haojun Liao 已提交
795 796
  if (hasMoreVnodesToTry(pSql)) {
    STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
Haojun Liao 已提交
797

H
Haojun Liao 已提交
798 799 800
    int32_t totalVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
    pTableMetaInfo->vgroupIndex += 1;
    assert(pTableMetaInfo->vgroupIndex < totalVgroups);
H
Haojun Liao 已提交
801

802
    tscDebug("%p results from vgroup index:%d completed, try next vgroup:%d. total vgroups:%d. current numOfRes:%" PRId64,
H
Haojun Liao 已提交
803 804
             pSql, pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVgroups,
             pRes->numOfClauseTotal);
H
Haojun Liao 已提交
805

H
Haojun Liao 已提交
806 807
    pCmd->command = TSDB_SQL_SELECT;
    tscResetForNextRetrieve(&pSql->res);
H
Haojun Liao 已提交
808

H
Haojun Liao 已提交
809
    assert(pSupporter->f == NULL);
S
slguan 已提交
810
    taosGetTmpfilePath("ts-join", pSupporter->path);
H
Haojun Liao 已提交
811 812 813
    
    // TODO check for failure
    pSupporter->f = fopen(pSupporter->path, "w");
S
Shengliang Guan 已提交
814
    pRes->row = (int32_t)pRes->numOfRows;
H
Haojun Liao 已提交
815

H
Haojun Liao 已提交
816 817 818 819 820
    // set the callback function
    pSql->fp = tscJoinQueryCallback;
    tscProcessSql(pSql);
    return;
  }
H
Haojun Liao 已提交
821

H
Haojun Liao 已提交
822 823 824
  if (atomic_sub_fetch_32(&pSupporter->pState->numOfRemain, 1) > 0) {
    return;
  }
H
hjxilinx 已提交
825

826
  tscDebug("%p all subquery retrieve ts complete, do ts block intersect", pParentSql);
H
hjxilinx 已提交
827

H
Haojun Liao 已提交
828 829 830
  // proceeds to launched secondary query to retrieve final data
  SJoinSupporter* p1 = pParentSql->pSubs[0]->param;
  SJoinSupporter* p2 = pParentSql->pSubs[1]->param;
H
Haojun Liao 已提交
831

H
Haojun Liao 已提交
832 833 834
  STimeWindow win = TSWINDOW_INITIALIZER;
  int64_t num = doTSBlockIntersect(pParentSql, p1, p2, &win);
  if (num <= 0) {  // no result during ts intersect
835
    tscDebug("%p no results generated in ts intersection, free all sub SqlObj and quit", pParentSql);
H
Haojun Liao 已提交
836
    freeJoinSubqueryObj(pParentSql);
837 838 839

    // set no result command
    pParentSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
H
Haojun Liao 已提交
840
    (*pParentSql->fp)(pParentSql->param, pParentSql, 0);
H
Haojun Liao 已提交
841 842 843 844 845 846
    return;
  }

  // launch the query the retrieve actual results from vnode along with the filtered timestamp
  SQueryInfo* pPQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, pParentSql->cmd.clauseIndex);
  updateQueryTimeRange(pPQueryInfo, &win);
H
Haojun Liao 已提交
847
  tscLaunchRealSubqueries(pParentSql);
H
Haojun Liao 已提交
848
}
H
Haojun Liao 已提交
849

H
Haojun Liao 已提交
850 851
/*
 * Fetch callback of a secondary-stage join subquery.
 *
 * A failed fetch records the error on the parent and queues it. For a non-ordered projection
 * over a super table, an empty fetch moves the subquery on to its next vgroup (re-issued with
 * tscJoinQueryCallback). Otherwise the shared remain-counter is decremented. The last subquery
 * to arrive updates the per-subquery clause totals and builds the joined result on the parent.
 */
static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfRows) {
  SJoinSupporter* pJoinSup  = (SJoinSupporter*)param;
  SSqlObj*        pParent   = pJoinSup->pObj;
  SSubqueryState* pSubState = pJoinSup->pState;

  SSqlObj*    pSub      = (SSqlObj*)tres;
  SSqlRes*    pSubRes   = &pSub->res;
  SQueryInfo* pSubQuery = tscGetQueryInfoDetail(&pSub->cmd, pSub->cmd.clauseIndex);

  int32_t subCode = taos_errno(pSub);
  if (subCode != TSDB_CODE_SUCCESS) {
    assert(numOfRows == subCode);

    pParent->res.code = numOfRows;
    tscError("%p retrieve failed, index:%d, code:%s", pSub, pJoinSup->subqueryIndex, tstrerror(numOfRows));

    tscQueueAsyncRes(pParent);
    return;
  }

  if (numOfRows >= 0) {
    pSubRes->numOfTotal += pSubRes->numOfRows;
  }

  if (tscNonOrderedProjectionQueryOnSTable(pSubQuery, 0) && numOfRows == 0) {
    STableMetaInfo* pMetaInfo = tscGetMetaInfo(pSubQuery, 0);
    assert(pSubQuery->numOfTables == 1);

    // current vnode exhausted: a projection query continues with the next vnode, if any
    pMetaInfo->vgroupIndex += 1;
    if (pMetaInfo->vgroupIndex < pMetaInfo->vgroupList->numOfVgroups) {
      pSubState->numOfRemain = 1;
      pSubState->numOfTotal  = 1;

      pSub->cmd.command = TSDB_SQL_SELECT;
      pSub->fp          = tscJoinQueryCallback;
      tscProcessSql(pSub);
      return;
    }
  }

  // only the last subquery to respond goes on to build the final result
  if (atomic_sub_fetch_32(&pSubState->numOfRemain, 1) > 0) {
    tscDebug("%p sub:%p completed, remain:%d, total:%d", pParent, tres, pSubState->numOfRemain, pSubState->numOfTotal);
    return;
  }

  tscDebug("%p all %d secondary subqueries retrieval completed, code:%d", tres, pSubState->numOfTotal, pParent->res.code);

  if (pParent->res.code != TSDB_CODE_SUCCESS) {
    freeJoinSubqueryObj(pParent);
    pParent->res.completed = true;
  }

  // accumulate the number of rows fetched by each remaining subquery
  for (int32_t idx = 0; idx < pParent->numOfSubs; ++idx) {
    SSqlObj* pChild = pParent->pSubs[idx];
    if (pChild != NULL) {
      pChild->res.numOfClauseTotal += pChild->res.numOfRows;
    }
  }

  // data has retrieved to client, build the join results
  tscBuildResFromSubqueries(pParent);
}

918
/*
 * Reset the subquery state shared by the join subqueries of pSql before a new round of fetches:
 * numOfTotal becomes pSql->numOfSubs and numOfRemain becomes numOfFetch, the number of
 * fetch requests about to be issued.
 *
 * Returns the join supporter of the last non-NULL subquery (NULL if every slot is NULL).
 */
static SJoinSupporter* tscUpdateSubqueryStatus(SSqlObj* pSql, int32_t numOfFetch) {
  SJoinSupporter* pSupporter = NULL;
  SSubqueryState* pState = NULL;

  // the supporters of one join are created with the same SSubqueryState (see tscHandleMasterJoinQuery)
  for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
    if (pSql->pSubs[i] != NULL) {
      pSupporter = (SJoinSupporter*)pSql->pSubs[i]->param;
      pState = pSupporter->pState;
    }
  }

  assert(pState != NULL);
  if (pState != NULL) {  // still guarded for builds with assertions disabled
    pState->numOfTotal = pSql->numOfSubs;
    pState->numOfRemain = numOfFetch;
  }

  return pSupporter;
}

/*
 * Drive the next step of a join result retrieval.
 *
 * - If every live subquery still has buffered rows, the joined result is built right away.
 * - If some subquery has no buffered rows and either it reached its limit, or no incomplete
 *   subquery is left to fetch from, the join is finished: the parent is marked completed and
 *   its callback (or the queued error) is invoked.
 * - Otherwise a fetch is issued for every subquery whose buffer is exhausted and that is not
 *   completed yet. Exactly numOfFetch requests are sent, matching the remain-counter set by
 *   tscUpdateSubqueryStatus, so joinRetrieveFinalResCallback builds the result exactly once.
 */
void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
  assert(pSql->numOfSubs >= 1);

  int32_t numOfFetch = 0;
  bool    hasData    = true;
  bool    reachLimit = false;

  for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
    // if the subquery is NULL, it does not involved in the final result generation
    SSqlObj* pSub = pSql->pSubs[i];
    if (pSub == NULL) {
      continue;
    }

    SSqlRes*    pRes = &pSub->res;
    SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSub->cmd, 0);

    if (!tscHasReachLimitation(pQueryInfo, pRes)) {
      if (pRes->row >= pRes->numOfRows) {
        hasData = false;

        // a completed subquery has nothing left to fetch
        if (!pRes->completed) {
          numOfFetch++;
        }
      }
    } else {  // has reach the limitation, no data anymore
      if (pRes->row >= pRes->numOfRows) {
        hasData = false;
        reachLimit = true;
        break;
      }
    }
  }

  // has data remains in client side, and continue to return data to app
  if (hasData) {
    tscBuildResFromSubqueries(pSql);
    return;
  }

  // a subquery that hit its limit with an empty buffer can contribute no more joined rows
  if (reachLimit || numOfFetch <= 0) {
    pSql->res.completed = true;
    freeJoinSubqueryObj(pSql);

    if (pSql->res.code == TSDB_CODE_SUCCESS) {
      (*pSql->fp)(pSql->param, pSql, 0);
    } else {
      tscQueueAsyncRes(pSql);
    }

    return;
  }

  // TODO multi-vnode retrieve for projection query with limitation has bugs, since the global limiation is not handled
  tscDebug("%p retrieve data from %d subqueries", pSql, numOfFetch);
  tscUpdateSubqueryStatus(pSql, numOfFetch);

  for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
    SSqlObj* pSql1 = pSql->pSubs[i];
    if (pSql1 == NULL) {
      continue;
    }

    SSqlRes* pRes1 = &pSql1->res;
    SSqlCmd* pCmd1 = &pSql1->cmd;

    SJoinSupporter* pSupporter = (SJoinSupporter*)pSql1->param;

    // wait for all subqueries completed
    SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd1, 0);
    assert(pRes1->numOfRows >= 0 && pQueryInfo->numOfTables == 1);

    STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

    // must select exactly the subqueries counted in numOfFetch above
    if (pRes1->row >= pRes1->numOfRows && !pRes1->completed) {
      tscDebug("%p subquery:%p retrieve data from vnode, subquery:%d, vgroupIndex:%d", pSql, pSql1,
               pSupporter->subqueryIndex, pTableMetaInfo->vgroupIndex);

      tscResetForNextRetrieve(pRes1);
      pSql1->fp = joinRetrieveFinalResCallback;

      if (pCmd1->command < TSDB_SQL_LOCAL) {
        pCmd1->command = (pCmd1->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
      }

      tscProcessSql(pSql1);
    }
  }
}

/*
 * Called once all join subqueries have responded. Builds pRes->pColumnIndex, which maps every
 * output expression of the parent query to the subquery that produces it (tableIndex, also the
 * index into pSql->pSubs) and the position of the matching expression inside that subquery
 * (columnIndex). Expressions without a match keep the zero-initialized entry.
 *
 * The map is built only once; later calls return immediately. On allocation failure
 * pRes->code is set to TSDB_CODE_TSC_OUT_OF_MEMORY and nothing else is changed.
 */
void tscSetupOutputColumnIndex(SSqlObj* pSql) {
  SSqlCmd* pCmd = &pSql->cmd;
  SSqlRes* pRes = &pSql->res;

  tscDebug("%p all subquery response, retrieve data for subclause:%d", pSql, pCmd->clauseIndex);

  // the column transfer support struct has been built
  if (pRes->pColumnIndex != NULL) {
    return;
  }

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  int32_t numOfExprs = (int32_t)tscSqlExprNumOfExprs(pQueryInfo);
  pRes->pColumnIndex = calloc((size_t)numOfExprs, sizeof(SColumnIndex));
  if (pRes->pColumnIndex == NULL && numOfExprs > 0) {
    tscError("%p failed to allocate output column index, numOfExprs:%d", pSql, numOfExprs);
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return;
  }

  for (int32_t i = 0; i < numOfExprs; ++i) {
    SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);

    // locate the table (and hence the subquery) the expression belongs to
    int32_t tableIndexOfSub = -1;
    for (int32_t j = 0; j < pQueryInfo->numOfTables; ++j) {
      STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, j);
      if (pTableMetaInfo->pTableMeta->id.uid == pExpr->uid) {
        tableIndexOfSub = j;
        break;
      }
    }

    assert(tableIndexOfSub >= 0 && tableIndexOfSub < pQueryInfo->numOfTables);

    SSqlCmd* pSubCmd = &pSql->pSubs[tableIndexOfSub]->cmd;
    SQueryInfo* pSubQueryInfo = tscGetQueryInfoDetail(pSubCmd, 0);

    // find the same function on the same column within that subquery
    size_t numOfSubExpr = taosArrayGetSize(pSubQueryInfo->exprList);
    for (int32_t k = 0; k < numOfSubExpr; ++k) {
      SSqlExpr* pSubExpr = tscSqlExprGet(pSubQueryInfo, k);
      if (pExpr->functionId == pSubExpr->functionId && pExpr->colInfo.colId == pSubExpr->colInfo.colId) {
        pRes->pColumnIndex[i] = (SColumnIndex){.tableIndex = tableIndexOfSub, .columnIndex = k};
        break;
      }
    }
  }

  // restore the offset value for super table query in case of final result.
  tscRestoreSQLFuncForSTableQuery(pQueryInfo);
  tscFieldInfoUpdateOffset(pQueryInfo);
}

/*
 * Completion callback of every join subquery (installed by tscLaunchJoinSubquery).
 *
 * Any failure, either one already recorded on the parent or one of this subquery, aborts the
 * whole join. Otherwise, depending on the stage flags of the subquery:
 *  - tag filter stage:  start fetching <tid, tag> tuples with tidTagRetrieveCallback;
 *  - timestamp stage:   start fetching ts_comp data with tsCompRetrieveCallback;
 *  - secondary stage:   wait for all subqueries, set up the output column index, then either
 *                       keep fetching (projection query continued on a later vnode) or hand
 *                       control back to the parent's callback.
 */
void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
  SSqlObj*        pSub     = (SSqlObj*)tres;
  SJoinSupporter* pJoinSup = (SJoinSupporter*)param;
  SSqlObj*        pParent  = pJoinSup->pObj;

  // every join subquery has exactly one clause over exactly one table
  SQueryInfo* pSubQuery = tscGetQueryInfoDetail(&pSub->cmd, 0);
  assert(pSubQuery->numOfTables == 1 && pSub->cmd.numOfClause == 1);

  // a sibling subquery has already failed: stop this one as well
  if (pParent->res.code != TSDB_CODE_SUCCESS) {
    tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSub, code, pParent->res.code);
    quitAllSubquery(pParent, pJoinSup);
    tscQueueAsyncRes(pParent);
    return;
  }

  // TODO here retry is required, not directly returns to client
  if (taos_errno(pSub) != TSDB_CODE_SUCCESS) {
    assert(taos_errno(pSub) == code);

    tscError("%p abort query, code:%s, global code:%s", pSub, tstrerror(code), tstrerror(pParent->res.code));
    pParent->res.code = code;

    quitAllSubquery(pParent, pJoinSup);
    tscQueueAsyncRes(pParent);
    return;
  }

  // first stages: pick the callback that consumes the fetched data, then fetch
  bool isFirstStage = true;
  if (TSDB_QUERY_HAS_TYPE(pSubQuery->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) {
    pSub->fp = tidTagRetrieveCallback;  // <tid, tag> tuples
  } else if (!TSDB_QUERY_HAS_TYPE(pSubQuery->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
    pSub->fp = tsCompRetrieveCallback;  // ts_comp info
  } else {
    isFirstStage = false;
  }

  if (isFirstStage) {
    pSub->cmd.command = TSDB_SQL_FETCH;
    tscProcessSql(pSub);
    return;
  }

  // secondary stage: only the last subquery to respond continues
  if (atomic_sub_fetch_32(&pJoinSup->pState->numOfRemain, 1) > 0) {
    return;
  }

  tscSetupOutputColumnIndex(pParent);

  STableMetaInfo* pMetaInfo = tscGetMetaInfo(pSubQuery, 0);
  bool continuedOnNextVnode = (pMetaInfo->vgroupIndex > 0) && tscNonOrderedProjectionQueryOnSTable(pSubQuery, 0);

  if (!continuedOnNextVnode) {
    // first retrieve from vnode during the secondary stage sub-query: return to the invoker
    if (pParent->res.code != TSDB_CODE_SUCCESS) {
      tscQueueAsyncRes(pParent);
    } else {
      (*pParent->fp)(pParent->param, pParent, 0);
    }
    return;
  }

  // projection query moved to a later vnode: retrieve its data instead of returning to the invoker
  pJoinSup->pState->numOfRemain = pJoinSup->pState->numOfTotal;  // reset the record value

  pSub->fp = joinRetrieveFinalResCallback;
  pSub->cmd.command = TSDB_SQL_FETCH;
  tscProcessSql(pSub);
}

/////////////////////////////////////////////////////////////////////////////////////////
// Forward declaration of a file-local callback; its definition is elsewhere in this file.
static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code);

1154
// Forward declaration: used by tscHandleMasterSTableQuery to create one subquery per vgroup.
static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj);
H
hjxilinx 已提交
1155

1156
/*
 * Create the subquery of pSql for table `tableIndex` of a join, append it to pSql->pSubs and
 * launch it with tscJoinQueryCallback as completion callback.
 *
 * The column list, expressions, output fields, tag condition, limit and group-by info of the
 * new subquery are moved into pSupporter (they are needed by the secondary stage). The subquery
 * itself is then rewritten:
 *  - super table: into a tid_tag query retrieving (tableId, join tag) pairs;
 *  - otherwise:   into a ts_comp query retrieving the compressed timestamps, keeping only
 *                 the columns that carry filters.
 *
 * Returns TSDB_CODE_TSC_OUT_OF_MEMORY if the subquery array or object cannot be allocated,
 * otherwise the result of tscProcessSql on the new subquery.
 */
int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter *pSupporter) {
  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  pSql->res.qhandle = 0x1;
  assert(pSql->res.numOfRows == 0);

  if (pSql->pSubs == NULL) {
    pSql->pSubs = calloc(pSupporter->pState->numOfTotal, POINTER_BYTES);
    if (pSql->pSubs == NULL) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
  }

  SSqlObj *pNew = createSubqueryObj(pSql, tableIndex, tscJoinQueryCallback, pSupporter, TSDB_SQL_SELECT, NULL);
  if (pNew == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  pSql->pSubs[pSql->numOfSubs++] = pNew;
  assert(pSql->numOfSubs <= pSupporter->pState->numOfTotal);

  if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) {
    addGroupInfoForSubquery(pSql, pNew, 0, tableIndex);

    // refactor as one method
    SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
    assert(pNewQueryInfo != NULL);

    // the subquery holds a single table, so every column refers to table index 0
    size_t num = taosArrayGetSize(pNewQueryInfo->colList);
    for (int32_t i = 0; i < num; ++i) {
      SColumn* pCol = taosArrayGetP(pNewQueryInfo->colList, i);
      pCol->colIndex.tableIndex = 0;
    }

    // move the query description into the supporter for the secondary stage
    pSupporter->colList = pNewQueryInfo->colList;
    pNewQueryInfo->colList = NULL;

    pSupporter->exprList = pNewQueryInfo->exprList;
    pNewQueryInfo->exprList = NULL;

    pSupporter->fieldsInfo = pNewQueryInfo->fieldsInfo;

    // this data needs to be transfer to support struct
    memset(&pNewQueryInfo->fieldsInfo, 0, sizeof(SFieldInfo));
    tscTagCondCopy(&pSupporter->tagCond, &pNewQueryInfo->tagCond);

    pNew->cmd.numOfCols = 0;
    pNewQueryInfo->intervalTime = 0;
    pSupporter->limit = pNewQueryInfo->limit;

    pNewQueryInfo->limit.limit = -1;
    pNewQueryInfo->limit.offset = 0;

    // backup the data and clear it in the sqlcmd object
    pSupporter->groupbyExpr = pNewQueryInfo->groupbyExpr;
    memset(&pNewQueryInfo->groupbyExpr, 0, sizeof(SSqlGroupbyExpr));

    tscInitQueryInfo(pNewQueryInfo);
    STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0);

    if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { // return the tableId & tag
      SColumnIndex index = {0};

      STagCond* pTagCond = &pSupporter->tagCond;
      assert(pTagCond->joinInfo.hasJoin);

      int32_t tagColId = tscGetJoinTagColIdByUid(pTagCond, pTableMetaInfo->pTableMeta->id.uid);
      SSchema* s = tscGetTableColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId);

      // get the tag colId column index
      int32_t numOfTags = tscGetNumOfTags(pTableMetaInfo->pTableMeta);
      SSchema* pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);
      for (int32_t i = 0; i < numOfTags; ++i) {
        if (pSchema[i].colId == tagColId) {
          index.columnIndex = i;
          break;
        }
      }

      int16_t bytes = 0;
      int16_t type  = 0;
      int32_t inter = 0;

      getResultDataInfo(s->type, s->bytes, TSDB_FUNC_TID_TAG, 0, &type, &bytes, &inter, 0, 0);

      SSchema s1 = {.colId = s->colId, .type = (uint8_t)type, .bytes = bytes};
      pSupporter->tagSize = s1.bytes;
      assert(isValidDataType(s1.type) && s1.bytes > 0);

      // set get tags query type
      TSDB_QUERY_SET_TYPE(pNewQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY);
      tscAddSpecialColumnForSelect(pNewQueryInfo, 0, TSDB_FUNC_TID_TAG, &index, &s1, TSDB_COL_TAG);
      size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList);

      tscDebug(
          "%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, transfer to tid_tag query to retrieve (tableId, tags), "
          "exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, tagIndex:%d, name:%s",
          pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscSqlExprNumOfExprs(pNewQueryInfo),
          numOfCols, pNewQueryInfo->fieldsInfo.numOfOutput, index.columnIndex, pNewQueryInfo->pTableMetaInfo[0]->name);
    } else {
      // not a super table: no join tag column id is attached to the ts_comp expression
      SSchema      colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1};
      SColumnIndex index = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX};
      tscAddSpecialColumnForSelect(pNewQueryInfo, 0, TSDB_FUNC_TS_COMP, &index, &colSchema, TSDB_COL_NORMAL);

      // copy the columns that carry filters into the ts_comp subquery
      if (pSupporter->colList != NULL) {
        size_t s = taosArrayGetSize(pSupporter->colList);

        for (int32_t i = 0; i < s; ++i) {
          SColumn *pCol = taosArrayGetP(pSupporter->colList, i);

          if (pCol->numOfFilters > 0) {  // copy to the pNew->cmd.colList if it is filtered.
            SColumn *p = tscColumnClone(pCol);
            taosArrayPush(pNewQueryInfo->colList, &p);
          }
        }
      }

      size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList);

      tscDebug(
          "%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%u, transfer to ts_comp query to retrieve timestamps, "
          "exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, name:%s",
          pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscSqlExprNumOfExprs(pNewQueryInfo),
          numOfCols, pNewQueryInfo->fieldsInfo.numOfOutput, pNewQueryInfo->pTableMetaInfo[0]->name);
    }
  } else {
    assert(0);
    SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
    pNewQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY;
  }

  return tscProcessSql(pNew);
}

/*
 * Entry point of a join query: create one join supporter and launch one subquery per table of
 * the current clause. All supporters share one SSubqueryState.
 *
 * If a supporter cannot be allocated, the error is returned (and stored in pSql->res.code).
 * If launching a subquery fails, the actual failure code is stored in pSql->res.code and the
 * loop stops, but TSDB_CODE_SUCCESS is still returned. Subqueries that are already running
 * see the failure via the parent's res.code in tscJoinQueryCallback (assuming their
 * supporter's pObj is pSql; verify in tscCreateJoinSupporter).
 */
int32_t tscHandleMasterJoinQuery(SSqlObj* pSql) {
  SSqlCmd* pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  assert((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0);

  // todo add test
  SSubqueryState *pState = calloc(1, sizeof(SSubqueryState));
  if (pState == NULL) {
    pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return pSql->res.code;
  }

  pState->numOfTotal = pQueryInfo->numOfTables;
  pState->numOfRemain = pState->numOfTotal;

  tscDebug("%p start subquery, total:%d", pSql, pQueryInfo->numOfTables);
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
    SJoinSupporter *pSupporter = tscCreateJoinSupporter(pSql, pState, i);

    if (pSupporter == NULL) {  // failed to create support struct, abort current query
      tscError("%p tableIndex:%d, failed to allocate join support object, abort further query", pSql, i);
      pState->numOfRemain = i;
      pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      if (0 == i) {
        taosTFree(pState);  // no earlier supporter references the state yet
      }
      return pSql->res.code;
    }

    int32_t code = tscLaunchJoinSubquery(pSql, i, pSupporter);
    if (code != TSDB_CODE_SUCCESS) {  // failed to create or launch the subquery, quit query
      tscError("%p tableIndex:%d, failed to launch join subquery, code:%s", pSql, i, tstrerror(code));
      tscDestroyJoinSupporter(pSupporter);

      // keep the real reason: the failure may come from tscProcessSql, not only from allocation
      pSql->res.code = code;
      if (0 == i) {
        taosTFree(pState);
      }
      break;
    }
  }

  pSql->cmd.command = (pSql->numOfSubs <= 0)? TSDB_SQL_RETRIEVE_EMPTY_RESULT:TSDB_SQL_TABLE_JOIN_RETRIEVE;

  return TSDB_CODE_SUCCESS;
}

/*
 * Release the first numOfSubs super-table subqueries of pSql together with their
 * SRetrieveSupport (stored as the subquery param) and its local buffer, then free the shared
 * subquery state.
 */
static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs, SSubqueryState* pState) {
  assert(numOfSubs >= 0 && numOfSubs <= pSql->numOfSubs && pState != NULL);

  for (int32_t idx = 0; idx < numOfSubs; idx++) {
    SSqlObj *pChild = pSql->pSubs[idx];
    assert(pChild != NULL);

    SRetrieveSupport *pRetrieve = pChild->param;
    taosTFree(pRetrieve->localBuffer);
    taosTFree(pRetrieve);

    tscFreeSqlObj(pChild);
  }

  free(pState);
}

/*
 * Launch a super table query as one subquery per vgroup of the queried super table. Results
 * are collected through the local reducer environment (external memory buffers, order
 * descriptor and final column model) created here.
 *
 * Returns TSDB_CODE_SUCCESS once every subquery has been issued. On failure (query cancelled,
 * allocation failure) the error code is stored in pSql->res.code and returned, and the
 * resources allocated here are released.
 */
int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  // pRes->code check only serves in launching metric sub-queries
  if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
    pCmd->command = TSDB_SQL_RETRIEVE_LOCALMERGE;  // enable the abort of kill super table function.
    return pRes->code;
  }

  tExtMemBuffer **  pMemoryBuf = NULL;
  tOrderDescriptor *pDesc = NULL;
  SColumnModel *    pModel = NULL;

  pRes->qhandle = 0x1;  // hack the qhandle check

  const uint32_t nBufferSize = (1u << 16);  // 64KB

  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  pSql->numOfSubs = pTableMetaInfo->vgroupList->numOfVgroups;
  assert(pSql->numOfSubs > 0);

  int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, nBufferSize);
  if (ret != 0) {
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    tscQueueAsyncRes(pSql);
    taosTFree(pMemoryBuf);
    return ret;
  }

  pSql->pSubs = calloc(pSql->numOfSubs, POINTER_BYTES);
  if (pSql->pSubs == NULL) {
    tscError("%p failed to allocate subquery array, numOfSubs:%d, reason:%s", pSql, pSql->numOfSubs, strerror(errno));
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pSql->numOfSubs);
    return pRes->code;
  }

  tscDebug("%p retrieved query data from %d vnode(s)", pSql, pSql->numOfSubs);
  SSubqueryState *pState = calloc(1, sizeof(SSubqueryState));
  if (pState == NULL) {
    tscError("%p failed to allocate subquery state, reason:%s", pSql, strerror(errno));
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pSql->numOfSubs);
    taosTFree(pSql->pSubs);
    return pRes->code;
  }

  pState->numOfTotal = pSql->numOfSubs;
  pState->numOfRemain = pSql->numOfSubs;

  pRes->code = TSDB_CODE_SUCCESS;

  int32_t i = 0;
  for (; i < pSql->numOfSubs; ++i) {
    SRetrieveSupport *trs = (SRetrieveSupport *)calloc(1, sizeof(SRetrieveSupport));
    if (trs == NULL) {
      tscError("%p failed to malloc buffer for SRetrieveSupport, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
      break;
    }

    trs->pExtMemBuffer = pMemoryBuf;
    trs->pOrderDescriptor = pDesc;
    trs->pState = pState;

    trs->localBuffer = (tFilePage *)calloc(1, nBufferSize + sizeof(tFilePage));
    if (trs->localBuffer == NULL) {
      tscError("%p failed to malloc buffer for local buffer, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
      taosTFree(trs);
      break;
    }

    trs->subqueryIndex = i;
    trs->pParentSql = pSql;
    trs->pFinalColModel = pModel;

    SSqlObj *pNew = tscCreateSTableSubquery(pSql, trs, NULL);
    if (pNew == NULL) {
      tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
      taosTFree(trs->localBuffer);
      taosTFree(trs);
      break;
    }

    // todo handle multi-vnode situation
    if (pQueryInfo->tsBuf) {
      SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
      pNewQueryInfo->tsBuf = tsBufClone(pQueryInfo->tsBuf);
      assert(pNewQueryInfo->tsBuf != NULL);
    }

    tscDebug("%p sub:%p create subquery success. orderOfSub:%d", pSql, pNew, trs->subqueryIndex);
  }

  if (i < pSql->numOfSubs) {
    tscError("%p failed to prepare subquery structure and launch subqueries", pSql);
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;

    tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pSql->numOfSubs);
    doCleanupSubqueries(pSql, i, pState);
    return pRes->code;   // free all allocated resource
  }

  if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
    tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pSql->numOfSubs);
    doCleanupSubqueries(pSql, i, pState);
    return pRes->code;
  }

  for(int32_t j = 0; j < pSql->numOfSubs; ++j) {
    SSqlObj* pSub = pSql->pSubs[j];
    SRetrieveSupport* pSupport = pSub->param;

    tscDebug("%p sub:%p launch subquery, orderOfSub:%d.", pSql, pSub, pSupport->subqueryIndex);
    tscProcessSql(pSub);
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Release one super-table subquery: detach it from its parent's pSubs slot,
 * free the subquery object, then free the per-subquery retrieve support
 * together with its local buffer. trsupport must not be used afterwards.
 */
static void tscFreeSubSqlObj(SRetrieveSupport *trsupport, SSqlObj *pSql) {
  tscDebug("%p start to free subquery obj", pSql);

  SSqlObj **ppSlot = &trsupport->pParentSql->pSubs[trsupport->subqueryIndex];

  // the parent slot owned by this subquery must still point at it
  assert(pSql == *ppSlot);
  *ppSlot = NULL;

  taos_free_result(pSql);

  taosTFree(trsupport->localBuffer);
  taosTFree(trsupport);
}

1487
static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfRows);
1488
static void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows);
H
hjxilinx 已提交
1489

H
Haojun Liao 已提交
1490
static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES *tres, int32_t code) {
H
hjxilinx 已提交
1491
// set no disk space error info
1492
  tscError("sub:%p failed to flush data to disk, reason:%s", tres, tstrerror(code));
H
Haojun Liao 已提交
1493
  SSqlObj* pParentSql = trsupport->pParentSql;
H
Haojun Liao 已提交
1494 1495

  pParentSql->res.code = code;
H
hjxilinx 已提交
1496
  trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
H
Haojun Liao 已提交
1497
  tscHandleSubqueryError(trsupport, tres, pParentSql->res.code);
H
hjxilinx 已提交
1498 1499
}

H
Haojun Liao 已提交
1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514
/*
 * Retry a failed subquery. Whatever it has buffered so far is discarded, a
 * fresh subquery object is built for the same slot, and that object is launched.
 *
 * On success the failed object pSql is released and TSDB_CODE_SUCCESS is
 * returned. On failure pSql is left alive for the caller's error path:
 *  - if the new object cannot be created, the parent's result code is set to
 *    terrno and further retries are disabled;
 *  - if launching the new object fails, tscProcessSql's code is returned as is.
 * NOTE(review): in the second case tscCreateSTableSubquery has already stored the
 * new object in the parent's pSubs slot, while the caller then releases pSql.
 * Confirm that tscFreeSubSqlObj's slot assertion holds on that path.
 */
static int32_t tscReissueSubquery(SRetrieveSupport *trsupport, SSqlObj *pSql, int32_t code) {
  SSqlObj *pParent = trsupport->pParentSql;
  int32_t  subIdx  = trsupport->subqueryIndex;

  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
  SCMVgroupInfo  *pVgroup   = &pMetaInfo->vgroupList->vgroups[0];

  // throw away the rows retrieved by the failed attempt, both buffered externally and locally
  tExtMemBufferClear(trsupport->pExtMemBuffer[subIdx]);
  trsupport->localBuffer->num = 0;

  tscError("%p sub:%p retrieve/query failed, code:%s, orderOfSub:%d, retry:%d", trsupport->pParentSql, pSql,
           tstrerror(code), subIdx, trsupport->numOfRetry);

  SSqlObj *pRetry = tscCreateSTableSubquery(trsupport->pParentSql, trsupport, pSql);
  if (pRetry == NULL) {
    tscError("%p sub:%p failed to create new subquery due to error:%s, abort retry, vgId:%d, orderOfSub:%d",
             trsupport->pParentSql, pSql, tstrerror(terrno), pVgroup->vgId, trsupport->subqueryIndex);

    pParent->res.code = terrno;
    trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
    return pParent->res.code;
  }

  int32_t ret = tscProcessSql(pRetry);
  if (ret == TSDB_CODE_SUCCESS) {
    // the replacement is in flight, so the failed attempt is no longer needed
    taos_free_result(pSql);
  }

  // on failure pSql is kept so that the caller's error handling can deal with it
  return ret;
}

1540
/*
 * Common failure/abort path for one super-table subquery.
 *
 * numOfRows is either non-negative or a negative error code:
 *  - Non-negative: this subquery itself is fine, but the parent query already
 *    failed or was cancelled.
 *  - Negative: this subquery failed. It is re-issued while retries remain and
 *    the parent has no error. Otherwise the error is recorded as the parent's
 *    global code, but only if no code has been recorded yet.
 *
 * Every subquery that is not successfully re-issued is released here. The last
 * subquery to get here also destroys the shared local-reducer environment,
 * frees the shared SSubqueryState and reports back to the parent query.
 */
void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows) {
  SSqlObj        *pParent = trsupport->pParentSql;
  int32_t         subIdx  = trsupport->subqueryIndex;
  SSubqueryState *pState  = trsupport->pState;

  assert(pSql != NULL);
  assert(pState->numOfRemain <= pState->numOfTotal && pState->numOfRemain >= 0 && pParent->numOfSubs == pState->numOfTotal);

  // the subquery itself succeeded, but the parent query was cancelled/failed during the retrieve phase
  if (taos_errno(pSql) == TSDB_CODE_SUCCESS && pParent->res.code != TSDB_CODE_SUCCESS) {
    // kill this subquery's retrieval: drop its rows and disable further retries
    pSql->res.numOfRows = 0;
    trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
    tscDebug("%p query is cancelled, sub:%p, orderOfSub:%d abort retrieve, code:%s", pParent, pSql,
             subIdx, tstrerror(pParent->res.code));
  }

  if (numOfRows >= 0) {
    // current query is successful, but another subquery failed: still abort the current one
    tscDebug("%p sub:%p retrieve numOfRows:%d,orderOfSub:%d", pParent, pSql, numOfRows, subIdx);
    tscError("%p sub:%p abort further retrieval due to other queries failure,orderOfSub:%d,code:%s", pParent, pSql,
             subIdx, tstrerror(pParent->res.code));
  } else if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && pParent->res.code == TSDB_CODE_SUCCESS) {
    // retries left and the parent is still healthy: try this subquery again
    if (tscReissueSubquery(trsupport, pSql, numOfRows) == TSDB_CODE_SUCCESS) {
      return;  // a replacement is running and tscReissueSubquery has released pSql
    }
    // re-issue failed: release this subquery below
  } else {
    // maximum retry count reached (or the parent already failed): give up, first error wins
    atomic_val_compare_exchange_32(&pParent->res.code, TSDB_CODE_SUCCESS, numOfRows);
    tscError("%p sub:%p retrieve failed,code:%s,orderOfSub:%d failed.no more retry,set global code:%s", pParent, pSql,
             tstrerror(numOfRows), subIdx, tstrerror(pParent->res.code));
  }

  int32_t remain = atomic_sub_fetch_32(&pState->numOfRemain, 1);
  if (remain > 0) {
    // other subqueries are still outstanding; only this one is released
    tscDebug("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pParent, pSql, trsupport->subqueryIndex,
             pState->numOfTotal - remain);

    tscFreeSubSqlObj(trsupport, pSql);
    return;
  }

  // this was the last outstanding subquery and the query as a whole has failed
  tscError("%p retrieve from %d vnode(s) completed,code:%s.FAILED.", pParent, pState->numOfTotal,
           tstrerror(pParent->res.code));

  // release the resources shared by all subqueries
  tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, trsupport->pFinalColModel,
                            pState->numOfTotal);

  // the shared state is freed once, here; trsupport itself is freed by tscFreeSubSqlObj
  taosTFree(trsupport->pState);
  tscFreeSubSqlObj(trsupport, pSql);

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pParent->cmd, 0);
  if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
    // second stage of a join subquery: only an error is queued as an async result
    // NOTE(review): an older comment described the opposite mapping; confirm against the join caller
    if (pParent->res.code != TSDB_CODE_SUCCESS) {
      tscQueueAsyncRes(pParent);
    }
  } else {
    // plain super table query: invoke the parent's callback directly with the global code
    (*pParent->fp)(pParent->param, pParent, pParent->res.code);
  }
}

1609 1610
/*
 * Called when one subquery has delivered all of its rows.
 *
 * The rows still held in trsupport->localBuffer are compacted and flushed into
 * this subquery's external memory buffer as one ordered run. Every subquery that
 * finishes before the last one is simply released. The last one:
 *  - builds the loser-tree local reducer over all subqueries' buffers;
 *  - switches the parent command to TSDB_SQL_RETRIEVE_LOCALMERGE;
 *  - notifies the parent, directly on success or via tscQueueAsyncRes on error.
 * Temp-disk exhaustion and flush failures abort via tscAbortFurtherRetryRetrieval.
 */
static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* pSql) {
  int32_t           idx = trsupport->subqueryIndex;
  SSqlObj *         pParentSql = trsupport->pParentSql;
  tOrderDescriptor *pDesc = trsupport->pOrderDescriptor;

  SSubqueryState* pState = trsupport->pState;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);

  STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[0];

  // rows from the current vnode are held partly in the external (cache/disk) buffer, partly in localBuffer
  uint32_t numOfRowsFromSubquery = (uint32_t)(trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->num);
  tscDebug("%p sub:%p all data retrieved from ep:%s, vgId:%d, numOfRows:%u, orderOfSub:%d", pParentSql, pSql,
           pTableMetaInfo->vgroupList->vgroups[0].epAddr[0].fqdn, pTableMetaInfo->vgroupList->vgroups[0].vgId,
           numOfRowsFromSubquery, idx);

  tColModelCompact(pDesc->pColumnModel, trsupport->localBuffer, pDesc->pColumnModel->capacity);

#ifdef _DEBUG_VIEW
  printf("%" PRIu64 " rows data flushed to disk:\n", trsupport->localBuffer->num);
  SSrcColumnInfo colInfo[256] = {0};
  tscGetSrcColumnInfo(colInfo, pQueryInfo);
  tColModelDisplayEx(pDesc->pColumnModel, trsupport->localBuffer->data, trsupport->localBuffer->num,
                     trsupport->localBuffer->num, colInfo);
#endif

  // no disk space left for the temp directory
  if (tsTotalTmpDirGB != 0 && tsAvailTmpDirectorySpace < tsReservedTmpDirectorySpace) {
    tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pParentSql, pSql,
             tsAvailTmpDirectorySpace, tsReservedTmpDirectorySpace);
    tscAbortFurtherRetryRetrieval(trsupport, pSql, TSDB_CODE_TSC_NO_DISKSPACE);
    return;
  }

  // each vnode's result is flushed as an independently ordered list,
  // which then serves as one input of the loser tree for the disk-based merge
  int32_t code = tscFlushTmpBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer, pQueryInfo->groupbyExpr.orderType);
  if (code != 0) {  // set no disk space error info, and abort retry
    tscAbortFurtherRetryRetrieval(trsupport, pSql, code);
    return;
  }

  int32_t remain = atomic_sub_fetch_32(&pState->numOfRemain, 1);
  if (remain > 0) {
    // other subqueries are still running; only release this one
    tscDebug("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pParentSql, pSql, trsupport->subqueryIndex,
             pState->numOfTotal - remain);

    tscFreeSubSqlObj(trsupport, pSql);
    return;
  }

  // all sub-queries have returned: start the local merge process
  pDesc->pColumnModel->capacity = trsupport->pExtMemBuffer[idx]->numOfElemsPerPage;

  tscDebug("%p retrieve from %d vnodes completed.final NumOfRows:%" PRId64 ",start to build loser tree", pParentSql,
           pState->numOfTotal, pState->numOfRetrievedRows);

  SQueryInfo *pPQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, 0);
  tscClearInterpInfo(pPQueryInfo);

  tscCreateLocalReducer(trsupport->pExtMemBuffer, pState->numOfTotal, pDesc, trsupport->pFinalColModel, pParentSql);
  tscDebug("%p build loser tree completed", pParentSql);

  pParentSql->res.precision = pSql->res.precision;
  pParentSql->res.numOfRows = 0;
  pParentSql->res.row = 0;

  // the shared state is freed only once, by the last subquery to finish
  taosTFree(trsupport->pState);
  tscFreeSubSqlObj(trsupport, pSql);

  // switch the parent to local-merge retrieval only after the reducer and result state above are in place
  pParentSql->cmd.command = TSDB_SQL_RETRIEVE_LOCALMERGE;
  if (pParentSql->res.code == TSDB_CODE_SUCCESS) {
    (*pParentSql->fp)(pParentSql->param, pParentSql, 0);
  } else {
    tscQueueAsyncRes(pParentSql);
  }
}

/*
 * Fetch callback of a super-table subquery (registered via taos_fetch_rows_a).
 *
 * numOfRows takes one of three forms:
 *  - positive: the number of rows in the fetched block;
 *  - 0: no (more) rows;
 *  - negative: an error code.
 * Fetched rows are appended to this subquery's local/external buffers, and
 * fetching continues until the block is marked completed. After that,
 * tscAllDataRetrievedFromDnode finishes the subquery.
 *
 * Failures are retried or routed to tscHandleSubqueryError. Non-retryable
 * conditions go through tscAbortFurtherRetryRetrieval: too many ordered rows,
 * no temp disk space, or a buffer save failure.
 */
static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
  SRetrieveSupport *trsupport = (SRetrieveSupport *)param;
  tOrderDescriptor *pDesc = trsupport->pOrderDescriptor;
  int32_t           idx   = trsupport->subqueryIndex;
  SSqlObj *         pParentSql = trsupport->pParentSql;

  assert(tres != NULL);
  SSqlObj *pSql = (SSqlObj *)tres;

  SSubqueryState* pState = trsupport->pState;
  assert(pState->numOfRemain <= pState->numOfTotal && pState->numOfRemain >= 0 && pParentSql->numOfSubs == pState->numOfTotal);

  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
  SCMVgroupInfo  *pVgroup = &pTableMetaInfo->vgroupList->vgroups[0];

  // the super table query was cancelled or another subquery failed: stop without retrying
  if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
    trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
    tscDebug("%p query cancelled/failed, sub:%p, vgId:%d, orderOfSub:%d, code:%s, global code:%s",
             pParentSql, pSql, pVgroup->vgId, trsupport->subqueryIndex, tstrerror(numOfRows), tstrerror(pParentSql->res.code));

    tscHandleSubqueryError(param, tres, numOfRows);
    return;
  }

  // this fetch failed; numOfRows carries the error code
  if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
    assert(numOfRows == taos_errno(pSql));

    if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY) {
      tscError("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(numOfRows), trsupport->numOfRetry);

      if (tscReissueSubquery(trsupport, pSql, numOfRows) == TSDB_CODE_SUCCESS) {
        return;
      }
    } else {
      tscDebug("%p sub:%p reach the max retry times, set global code:%s", pParentSql, pSql, tstrerror(numOfRows));
      atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, numOfRows);  // set global code and abort
    }

    tscHandleSubqueryError(param, tres, numOfRows);
    return;
  }

  SSqlRes *   pRes = &pSql->res;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);

  if (numOfRows > 0) {
    assert(pRes->numOfRows == numOfRows);
    int64_t num = atomic_add_fetch_64(&pState->numOfRetrievedRows, numOfRows);

    // log the atomically obtained total instead of re-reading the shared counter
    tscDebug("%p sub:%p retrieve numOfRows:%" PRId64 " totalNumOfRows:%" PRId64 " from ep:%s, orderOfSub:%d", pParentSql, pSql,
             pRes->numOfRows, num, pSql->epSet.fqdn[pSql->epSet.inUse], idx);

    if (num > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pQueryInfo, 0)) {
      tscError("%p sub:%p num of OrderedRes is too many, max allowed:%" PRId32 " , current:%" PRId64,
               pParentSql, pSql, tsMaxNumOfOrderedResults, num);
      tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_TSC_SORTED_RES_TOO_MANY);
      return;
    }

#ifdef _DEBUG_VIEW
    printf("received data from vnode: %"PRIu64" rows\n", pRes->numOfRows);
    SSrcColumnInfo colInfo[256] = {0};

    tscGetSrcColumnInfo(colInfo, pQueryInfo);
    tColModelDisplayEx(pDesc->pColumnModel, pRes->data, pRes->numOfRows, pRes->numOfRows, colInfo);
#endif

    // no disk space for tmp directory
    if (tsTotalTmpDirGB != 0 && tsAvailTmpDirectorySpace < tsReservedTmpDirectorySpace) {
      tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pParentSql, pSql,
               tsAvailTmpDirectorySpace, tsReservedTmpDirectorySpace);
      tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_TSC_NO_DISKSPACE);
      return;
    }

    int32_t ret = saveToBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer, pRes->data,
                               (int32_t)pRes->numOfRows, pQueryInfo->groupbyExpr.orderType);
    if (ret != 0) {  // set no disk space error info, and abort retry
      tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_TSC_NO_DISKSPACE);
    } else if (pRes->completed) {
      tscAllDataRetrievedFromDnode(trsupport, pSql);
    } else {  // continue fetch data from dnode
      taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param);
    }
  } else {  // all data has been retrieved to client
    tscAllDataRetrievedFromDnode(trsupport, pSql);
  }
}

1782
/*
 * Create the subquery object for one slot of a two-stage super-table query.
 *
 * The slot index trsupport->subqueryIndex is used as the vgroup index of the
 * new object, and the object is stored in the parent's pSubs at that index.
 * prevSqlObj is the failed object being replaced when a subquery is re-issued,
 * and NULL on the first launch.
 *
 * Returns NULL, leaving the parent untouched, if createSubqueryObj fails.
 */
static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj) {
  const int32_t tableIndex = 0;

  SSqlObj *pNew = createSubqueryObj(pSql, tableIndex, tscRetrieveDataRes, trsupport, TSDB_SQL_SELECT, prevSqlObj);
  if (pNew == NULL) {
    return NULL;
  }

  // mark it as the sub query of a two-stage super table query
  SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
  pNewQueryInfo->type |= TSDB_QUERY_TYPE_STABLE_SUBQUERY;
  assert(pNewQueryInfo->numOfTables == 1 && pNew->cmd.numOfClause == 1 && trsupport->subqueryIndex < pSql->numOfSubs);

  // one subquery is launched per vnode, so the subquery index doubles as the vgroup index
  tscGetMetaInfo(pNewQueryInfo, tableIndex)->vgroupIndex = trsupport->subqueryIndex;

  pSql->pSubs[trsupport->subqueryIndex] = pNew;
  return pNew;
}

/*
 * Query-phase callback of a super-table subquery (installed by tscCreateSTableSubquery).
 *
 * On success it starts fetching this vgroup's rows through
 * tscRetrieveFromDnodeCallBack, or calls that callback directly with 0 rows
 * when no query handle was returned. On failure it either re-issues the
 * subquery or hands it to tscHandleSubqueryError.
 *
 * @param param  the subquery's SRetrieveSupport
 * @param tres   the subquery object
 * @param code   result code of the query phase
 */
void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
  SRetrieveSupport *trsupport = (SRetrieveSupport *) param;

  SSqlObj*  pParentSql = trsupport->pParentSql;
  SSqlObj*  pSql = (SSqlObj *) tres;

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
  assert(pSql->cmd.numOfClause == 1 && pQueryInfo->numOfTables == 1);

  // NOTE(review): tscReissueSubquery and tscRetrieveFromDnodeCallBack read vgroups[0] of the
  // subquery instead; confirm which index matches the subquery's vgroup list. pVgroup is only
  // used for logging here.
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
  SCMVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[trsupport->subqueryIndex];

  // stable query killed or other subquery failed, all query stopped
  if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
    trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
    tscError("%p query cancelled or failed, sub:%p, vgId:%d, orderOfSub:%d, code:%s, global code:%s",
             pParentSql, pSql, pVgroup->vgId, trsupport->subqueryIndex, tstrerror(code), tstrerror(pParentSql->res.code));

    tscHandleSubqueryError(param, tres, code);
    return;
  }

  /*
   * if a subquery on a vnode failed, all retrieve operations from vnode that occurs later
   * than this one are actually not necessary, we simply call the tscRetrieveFromDnodeCallBack
   * function to abort current and remain retrieve process.
   *
   * NOTE: thread safe is required.
   */
  if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
    assert(code == taos_errno(pSql));

    if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY) {
      tscError("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(code), trsupport->numOfRetry);
      if (tscReissueSubquery(trsupport, pSql, code) == TSDB_CODE_SUCCESS) {
        return;
      }
    } else {
      tscError("%p sub:%p reach the max retry times, set global code:%s", pParentSql, pSql, tstrerror(code));
      atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, code);  // set global code and abort
    }

    // Pass this subquery's own error code, as tscRetrieveFromDnodeCallBack does. If the
    // re-issue above failed without recording a global code, the global code is still
    // TSDB_CODE_SUCCESS; passing that would make the failure look like a successful
    // subquery and the error would be lost.
    tscHandleSubqueryError(param, tres, code);
    return;
  }

  tscDebug("%p sub:%p query complete, ep:%s, vgId:%d, orderOfSub:%d, retrieve data", trsupport->pParentSql, pSql,
           pVgroup->epAddr[0].fqdn, pVgroup->vgId, trsupport->subqueryIndex);

  if (pSql->res.qhandle == 0) {  // qhandle is NULL, code is TSDB_CODE_SUCCESS means no results generated from this vnode
    tscRetrieveFromDnodeCallBack(param, pSql, 0);
  } else {
    taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param);
  }
}

H
Haojun Liao 已提交
1858
/*
 * Completion callback of one sub-insertion created by tscHandleMultivnodeInsert.
 *
 * Each call does the following:
 *  - adds the inserted row count to the parent, or records the first error;
 *  - releases the sub-insertion object and its supporter.
 * The last sub-insertion to finish also:
 *  - frees the shared SSubqueryState;
 *  - restores the user callback from fetchFp;
 *  - invokes that callback with either the error code or the total row count.
 */
static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) {
  SInsertSupporter *pSupporter = (SInsertSupporter *)param;
  SSqlObj* pParentObj = pSupporter->pSql;
  SSubqueryState* pState = pSupporter->pState;

  // Record the total inserted rows. Completions of different sub-insertions may run
  // concurrently (numOfRemain below is already decremented atomically), so the shared
  // parent fields are updated atomically as well.
  if (numOfRows > 0) {
    atomic_add_fetch_64(&pParentObj->res.numOfRows, numOfRows);
  }

  if (taos_errno(tres) != TSDB_CODE_SUCCESS) {
    SSqlObj* pSql = (SSqlObj*) tres;
    assert(pSql != NULL && pSql->res.code == numOfRows);

    // keep the first error reported by any sub-insertion
    atomic_val_compare_exchange_32(&pParentObj->res.code, TSDB_CODE_SUCCESS, pSql->res.code);
  }

  taos_free_result(tres);
  taosTFree(pSupporter);

  if (atomic_sub_fetch_32(&pState->numOfRemain, 1) > 0) {
    return;
  }

  tscDebug("%p Async insertion completed, total inserted:%" PRId64, pParentObj, pParentObj->res.numOfRows);

  // release the state shared by all sub-insertions
  taosTFree(pState);

  // restore user defined fp
  pParentObj->fp = pParentObj->fetchFp;

  // todo remove this parameter in async callback function definition.
  // all data has been sent to vnode, call user function
  int32_t v = (pParentObj->res.code != TSDB_CODE_SUCCESS) ? pParentObj->res.code : (int32_t)pParentObj->res.numOfRows;
  (*pParentObj->fp)(pParentObj->param, pParentObj, v);
}

/**
 * Re-send one sub-insertion. The object is a subquery, so after the sql string
 * has been parsed again, its own submit block is copied into its payload and
 * the object is processed.
 * @param pSql  the sub-insertion object; its param is the SInsertSupporter
 * @return the tscProcessSql result, or the payload-copy error code. In the latter
 *         case the error is also queued as an async result, and pSql may have been
 *         released by the time this returns.
 */
int32_t tscHandleInsertRetry(SSqlObj* pSql) {
  assert(pSql != NULL && pSql->param != NULL);

  SInsertSupporter* pSupporter = (SInsertSupporter*) pSql->param;
  assert(pSupporter->index < pSupporter->pState->numOfTotal);

  STableDataBlocks* pBlock = taosArrayGetP(pSql->cmd.pDataBlocks, pSupporter->index);

  int32_t code = tscCopyDataBlockToPayload(pSql, pBlock);
  pSql->res.code = code;

  if (code == TSDB_CODE_SUCCESS) {
    return tscProcessSql(pSql);
  }

  tscQueueAsyncRes(pSql);
  return code;  // here the pSql may have been released already.
}

/*
 * Split an insertion whose data spans several vnodes into one sub-insertion per
 * data block in pCmd->pDataBlocks, then launch them all. Each sub-insertion
 * reports to multiVnodeInsertFinalize, which invokes the user callback once the
 * last one has completed.
 *
 * @return TSDB_CODE_SUCCESS once all sub-insertions have been launched. Otherwise
 *         the error code, which is also stored in pSql->res.code. On failure, every
 *         sub-insertion created so far is released and none is launched.
 */
int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  // initialized before the first goto so that the error path never frees an indeterminate pointer
  SSubqueryState *pState = NULL;

  // the number of already initialized subqueries
  int32_t numOfSub = 0;

  size_t size = taosArrayGetSize(pCmd->pDataBlocks);
  assert(size > 0);

  pSql->numOfSubs = (uint16_t)size;
  pSql->pSubs = calloc(size, POINTER_BYTES);
  if (pSql->pSubs == NULL) {
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto _error;
  }

  tscDebug("%p submit data to %" PRIzu " vnode(s)", pSql, size);

  pState = calloc(1, sizeof(SSubqueryState));
  if (pState == NULL) {
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto _error;
  }

  pState->numOfTotal  = pSql->numOfSubs;
  pState->numOfRemain = pSql->numOfSubs;

  pRes->code = TSDB_CODE_SUCCESS;

  while (numOfSub < pSql->numOfSubs) {
    SInsertSupporter* pSupporter = calloc(1, sizeof(SInsertSupporter));
    if (pSupporter == NULL) {
      pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      goto _error;
    }

    pSupporter->pSql   = pSql;
    pSupporter->pState = pState;
    pSupporter->index  = numOfSub;

    SSqlObj *pNew = createSimpleSubObj(pSql, multiVnodeInsertFinalize, pSupporter, TSDB_SQL_INSERT);
    if (pNew == NULL) {
      tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, numOfSub, strerror(errno));
      free(pSupporter);  // not yet owned by any subquery object
      pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      goto _error;
    }

    /*
     * assign the callback function to fetchFp to make sure that the error process function can restore
     * the callback function (multiVnodeInsertFinalize) correctly.
     */
    pNew->fetchFp = pNew->fp;

    // store the object before preparing its payload so that the error path releases it as well
    pSql->pSubs[numOfSub] = pNew;

    STableDataBlocks* pTableDataBlock = taosArrayGetP(pCmd->pDataBlocks, numOfSub);
    pRes->code = tscCopyDataBlockToPayload(pNew, pTableDataBlock);
    if (pRes->code != TSDB_CODE_SUCCESS) {
      tscDebug("%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%" PRIzu ", code:%s", pSql, numOfSub,
               size, tstrerror(pRes->code));
      goto _error;
    }

    tscDebug("%p sub:%p create subObj success. orderOfSub:%d", pSql, pNew, numOfSub);
    numOfSub++;
  }

  pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);

  // use the local variable
  for (int32_t j = 0; j < numOfSub; ++j) {
    SSqlObj *pSub = pSql->pSubs[j];
    tscDebug("%p sub:%p launch sub insert, orderOfSub:%d", pSql, pSub, j);
    tscProcessSql(pSub);
  }

  return TSDB_CODE_SUCCESS;

_error:
  // release every sub-insertion created so far, including one whose payload could not be
  // prepared, and clear the slots so that no dangling pointer is left in pSubs
  if (pSql->pSubs != NULL) {
    for (int32_t j = 0; j < pSql->numOfSubs; ++j) {
      SSqlObj *pSub = pSql->pSubs[j];
      if (pSub == NULL) {
        continue;
      }

      taosTFree(pSub->param);
      taos_free_result(pSub);
      pSql->pSubs[j] = NULL;
    }
  }

  taosTFree(pState);
  return pRes->code;
}
H
hjxilinx 已提交
2005

H
Haojun Liao 已提交
2006 2007 2008 2009 2010 2011 2012 2013 2014 2015 2016 2017 2018 2019 2020 2021 2022 2023 2024 2025 2026 2027 2028
/*
 * Locate the data of result column columnIndex inside a subquery's result block.
 * The returned address is pRes->data + offset * numOfRows, i.e. the block is
 * addressed column by column. The column's per-row width (resBytes) is written
 * to *bytes.
 */
static char* getResultBlockPosition(SSqlCmd* pCmd, SSqlRes* pRes, int32_t columnIndex, int16_t* bytes) {
  SQueryInfo*    pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  SFieldSupInfo* pSupInfo   = (SFieldSupInfo*) TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.pSupportInfo, columnIndex);

  assert(pSupInfo->pSqlExpr != NULL);

  *bytes = pSupInfo->pSqlExpr->resBytes;
  return pRes->data + pSupInfo->pSqlExpr->offset * pRes->numOfRows;
}

/*
 * Assemble one result block for the parent from its subqueries' current result blocks.
 *
 * Only as many rows as the smallest live subquery block holds are taken. For
 * every output expression, the matching column of the subquery named by
 * pRes->pColumnIndex is copied, column after column, into the parent's
 * pRsp/data buffer. Each source subquery's row cursor is set to the number of
 * rows consumed.
 *
 * If no subquery is live, no rows are produced. If the buffer cannot be grown,
 * pRes->code is set to TSDB_CODE_TSC_OUT_OF_MEMORY, no rows are produced, and
 * the previous buffer is kept (not leaked).
 */
static void doBuildResFromSubqueries(SSqlObj* pSql) {
  SSqlRes* pRes = &pSql->res;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);

  int32_t numOfRes = INT32_MAX;
  for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
    if (pSql->pSubs[i] == NULL) {
      continue;
    }

    numOfRes = (int32_t)(MIN(numOfRes, pSql->pSubs[i]->res.numOfRows));
  }

  // no live subquery at all: never size the buffer for INT32_MAX rows
  if (numOfRes == INT32_MAX) {
    pRes->numOfRows = 0;
    return;
  }

  int32_t totalSize = tscGetResRowLength(pQueryInfo->exprList);

  // compute the size in size_t to avoid int overflow, and keep the old buffer if realloc fails
  size_t bufLen = (size_t)numOfRes * (size_t)totalSize;
  void*  pNewBuf = realloc(pRes->pRsp, bufLen);
  if (pNewBuf == NULL && bufLen > 0) {
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    pRes->numOfRows = 0;
    return;
  }

  pRes->pRsp = pNewBuf;
  pRes->data = pRes->pRsp;

  char* data = pRes->data;
  int16_t bytes = 0;

  size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
  for(int32_t i = 0; i < numOfExprs; ++i) {
    SColumnIndex* pIndex = &pRes->pColumnIndex[i];
    SSqlRes *pRes1 = &pSql->pSubs[pIndex->tableIndex]->res;
    SSqlCmd *pCmd1 = &pSql->pSubs[pIndex->tableIndex]->cmd;

    char* pData = getResultBlockPosition(pCmd1, pRes1, pIndex->columnIndex, &bytes);
    memcpy(data, pData, bytes * numOfRes);

    data += bytes * numOfRes;
    pRes1->row = numOfRes;
  }

  pRes->numOfRows = numOfRes;
  pRes->numOfClauseTotal += numOfRes;
}

H
hjxilinx 已提交
2056
/*
 * Produce the next result block of a query whose rows come from its subqueries.
 *
 * If the parent already carries an error, that error is delivered through
 * tscQueueAsyncRes. Otherwise the following happens:
 *  - On the first call (no tsrow yet), the per-column row view buffers are
 *    allocated and tscRestoreSQLFuncForSTableQuery is applied.
 *  - One block is assembled by doBuildResFromSubqueries.
 *  - pSql->rspSem is posted.
 */
void tscBuildResFromSubqueries(SSqlObj *pSql) {
  SSqlRes* pRes = &pSql->res;

  if (pRes->code != TSDB_CODE_SUCCESS) {
    tscQueueAsyncRes(pSql);
    return;
  }

  if (pRes->tsrow == NULL) {
    // first invocation: one slot per output expression
    SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
    size_t      numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);

    pRes->tsrow  = calloc(numOfExprs, POINTER_BYTES);
    pRes->buffer = calloc(numOfExprs, POINTER_BYTES);
    pRes->length = calloc(numOfExprs, sizeof(int32_t));

    tscRestoreSQLFuncForSTableQuery(pQueryInfo);
  }

  // the previous block must have been fully consumed before a new one is built
  assert(pRes->row >= pRes->numOfRows);

  doBuildResFromSubqueries(pSql);
  tsem_post(&pSql->rspSem);
}

/*
 * Convert the NCHAR value of result column columnIndex in the current row from
 * its unicode form into the client charset, using taosUcs4ToMbs.
 *
 * The converted text goes into pRes->buffer[columnIndex]. That buffer is
 * allocated on first use with room for pField->bytes + TSDB_NCHAR_SIZE and is
 * reused afterwards; tsrow/length are then redirected to it.
 *
 * The column is reported as NULL with length 0 in two cases: the buffer cannot
 * be allocated, or the conversion fails. Non-NCHAR or NULL values are left untouched.
 */
static void transferNcharData(SSqlObj *pSql, int32_t columnIndex, TAOS_FIELD *pField) {
  SSqlRes *pRes = &pSql->res;

  if (pRes->tsrow[columnIndex] == NULL || pField->type != TSDB_DATA_TYPE_NCHAR) {
    return;
  }

  // convert unicode to native code in a temporary buffer, with extra room for the terminating symbol
  if (pRes->buffer[columnIndex] == NULL) {
    pRes->buffer[columnIndex] = malloc(pField->bytes + TSDB_NCHAR_SIZE);
    if (pRes->buffer[columnIndex] == NULL) {
      tscError("%p failed to allocate buffer for nchar column:%d", pSql, columnIndex);
      pRes->tsrow[columnIndex] = NULL;
      pRes->length[columnIndex] = 0;
      return;
    }
  }

  /* string terminated char for binary data*/
  memset(pRes->buffer[columnIndex], 0, pField->bytes + TSDB_NCHAR_SIZE);

  int32_t length = taosUcs4ToMbs(pRes->tsrow[columnIndex], pRes->length[columnIndex], pRes->buffer[columnIndex]);
  if (length >= 0) {
    pRes->tsrow[columnIndex] = pRes->buffer[columnIndex];
    pRes->length[columnIndex] = length;
  } else {
    tscError("%p charset:%s to %s. val:%s convert failed.", pSql, DEFAULT_UNICODE_ENCODEC, tsCharset, (char*)pRes->tsrow[columnIndex]);
    pRes->tsrow[columnIndex] = NULL;
    pRes->length[columnIndex] = 0;
  }
}

2108 2109 2110 2111 2112 2113 2114 2115
static char *getArithemicInputSrc(void *param, const char *name, int32_t colId) {
  SArithmeticSupport *pSupport = (SArithmeticSupport *) param;

  int32_t index = -1;
  SSqlExpr* pExpr = NULL;
  
  for (int32_t i = 0; i < pSupport->numOfCols; ++i) {
    pExpr = taosArrayGetP(pSupport->exprList, i);
B
Bomin Zhang 已提交
2116
    if (strncmp(name, pExpr->aliasName, sizeof(pExpr->aliasName) - 1) == 0) {
2117 2118 2119 2120 2121 2122 2123 2124 2125
      index = i;
      break;
    }
  }

  assert(index >= 0 && index < pSupport->numOfCols);
  return pSupport->data[index] + pSupport->offset * pExpr->resBytes;
}

H
hjxilinx 已提交
2126 2127 2128
/*
 * Fill pRes->tsrow with pointers to the values of the current row (pRes->row),
 * then advance pRes->row by one.
 *
 * For every output field:
 *  - if the field is backed by an SSqlExpr, tscGetResultColumnChr sets up the raw value;
 *  - NCHAR values are converted to the client charset (transferNcharData);
 *  - arithmetic fields are computed on the client from the other result columns
 *    into pRes->buffer[i].
 * Column 0 of an interval query skips the NCHAR/arithmetic post-processing.
 *
 * Once every row has been returned, pRes->tsrow is released via taosTFree and
 * returned (presumably NULL after the free; confirm against taosTFree).
 * `finalResult` is not referenced here.
 */
void **doSetResultRowData(SSqlObj *pSql, bool finalResult) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  assert(pRes->row >= 0 && pRes->row <= pRes->numOfRows);
  if (pRes->row >= pRes->numOfRows) {  // all the results has returned to invoker
    taosTFree(pRes->tsrow);
    return pRes->tsrow;
  }

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  int32_t numOfFields = (int32_t)tscNumOfFields(pQueryInfo);
  for (int32_t i = 0; i < numOfFields; ++i) {
    SFieldSupInfo* pSup = TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.pSupportInfo, i);
    if (pSup->pSqlExpr != NULL) {
      tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, i);
    }

    // primary key column cannot be null in interval query, no need to check
    if (i == 0 && pQueryInfo->intervalTime > 0) {
      continue;
    }

    TAOS_FIELD *pField = TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.pFields, i);
    if (pRes->tsrow[i] != NULL && pField->type == TSDB_DATA_TYPE_NCHAR) {
      transferNcharData(pSql, i, pField);
    }

    // calculate the result from several other columns
    if (pSup->pArithExprInfo != NULL) {
      if (pRes->pArithSup == NULL) {
        SArithmeticSupport *sas = (SArithmeticSupport *) calloc(1, sizeof(SArithmeticSupport));
        sas->offset     = 0;
        sas->numOfCols  = (int32_t)tscSqlExprNumOfExprs(pQueryInfo);
        sas->exprList   = pQueryInfo->exprList;
        sas->data       = calloc(sas->numOfCols, POINTER_BYTES);

        pRes->pArithSup = sas;
      }

      // pArithSup is shared by every arithmetic column, so it must be rebound to this
      // column's expression; binding it only once made every arithmetic column be
      // evaluated with the expression of the first one encountered.
      pRes->pArithSup->pArithExpr = pSup->pArithExprInfo;

      if (pRes->buffer[i] == NULL) {
        TAOS_FIELD* field = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
        pRes->buffer[i] = malloc(field->bytes);
      }

      // point each expression input at its value in the current row
      for (int32_t k = 0; k < pRes->pArithSup->numOfCols; ++k) {
        SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, k);
        pRes->pArithSup->data[k] = (pRes->data + pRes->numOfRows * pExpr->offset) + pRes->row * pExpr->resBytes;
      }

      tExprTreeCalcTraverse(pRes->pArithSup->pArithExpr->pExpr, 1, pRes->buffer[i], pRes->pArithSup,
          TSDB_ORDER_ASC, getArithemicInputSrc);
      pRes->tsrow[i] = pRes->buffer[i];
    }
  }

  pRes->row++;  // index increase one-step
  return pRes->tsrow;
}

H
Haojun Liao 已提交
2188
/*
 * Decide whether the subqueries of pSql may still produce rows.
 *
 * Non-ordered projection on a super table: rows remain as long as at least one
 * non-NULL subquery still has vgroups to visit, has unread rows in its current
 * result, and has not reached its limit. With no such subquery the result is false.
 *
 * Otherwise (e.g. inner join): the query is finished as soon as any non-NULL
 * subquery returned no rows at all, or has consumed its rows, reached its limit
 * and is a projection query.
 */
static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) {
  SSqlCmd    *pCmd       = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  if (!tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
    // join-like query: one exhausted subquery ends the whole query
    for (int32_t idx = 0; idx < pSql->numOfSubs; ++idx) {
      SSqlObj *pSub = pSql->pSubs[idx];
      if (pSub == NULL) {
        continue;
      }

      SSqlRes    *pSubRes   = &pSub->res;
      SQueryInfo *pSubQuery = tscGetQueryInfoDetail(&pSub->cmd, 0);

      bool drained = pSubRes->row >= pSubRes->numOfRows && tscHasReachLimitation(pSubQuery, pSubRes) &&
                     tscIsProjectionQuery(pSubQuery);
      if (drained || pSubRes->numOfRows == 0) {
        return false;
      }
    }

    return true;
  }

  // projection over a super table: any subquery that can still deliver rows keeps the query alive
  for (int32_t idx = 0; idx < pSql->numOfSubs; ++idx) {
    SSqlObj *pSub = pSql->pSubs[idx];
    if (pSub == NULL) {
      continue;
    }

    SSqlRes *pSubRes = &pSub->res;
    SSqlCmd *pSubCmd = &pSub->cmd;

    SQueryInfo *pSubQuery = tscGetQueryInfoDetail(pSubCmd, pSubCmd->clauseIndex);
    assert(pSubQuery->numOfTables == 1);

    STableMetaInfo *pMeta = tscGetMetaInfo(pSubQuery, 0);

    // more vgroups to visit, unread rows in hand, and the limit not yet reached
    if (pMeta->vgroupIndex < pMeta->vgroupList->numOfVgroups && pSubRes->row < pSubRes->numOfRows &&
        !tscHasReachLimitation(pSubQuery, pSubRes)) {
      return true;
    }
  }

  return false;
}