tscSubquery.c 95.3 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
H
Hui Li 已提交
15 16
#define _GNU_SOURCE
 
H
Haojun Liao 已提交
17
#include "os.h"
H
hjxilinx 已提交
18

H
Haojun Liao 已提交
19 20
#include "qAst.h"
#include "qTsbuf.h"
H
Haojun Liao 已提交
21
#include "tcompare.h"
S
slguan 已提交
22
#include "tscLog.h"
H
Haojun Liao 已提交
23 24
#include "tscSubquery.h"
#include "tschemautil.h"
25
#include "tsclient.h"
H
hjxilinx 已提交
26 27 28

/* Pairs an SQL object with an index (presumably a subquery slot — confirm at use sites). */
typedef struct SInsertSupporter {
  SSqlObj* pSql;   // SQL object this supporter refers to
  int32_t  index;  // index associated with pSql
} SInsertSupporter;

/* Forward declarations for functions used before their definitions. */
static void freeJoinSubqueryObj(SSqlObj* pSql);
static bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql);
H
hjxilinx 已提交
34

H
Haojun Liao 已提交
35 36 37 38 39
static int32_t tsCompare(int32_t order, int64_t left, int64_t right) {
  if (left == right) {
    return 0;
  }

40
  if (order == TSDB_ORDER_ASC) {
H
Haojun Liao 已提交
41
    return left < right? -1:1;
H
hjxilinx 已提交
42
  } else {
H
Haojun Liao 已提交
43
    return left > right? -1:1;
H
hjxilinx 已提交
44 45 46
  }
}

47 48 49 50 51 52 53 54 55 56 57
/*
 * Advance pTSBuf past the remaining elements whose tag equals tag1.
 *
 * On return, the buffer is positioned on the first element carrying a different
 * tag, or the buffer is exhausted.
 */
static void skipRemainValue(STSBuf* pTSBuf, tVariant* tag1) {
  bool hasNext = tsBufNextPos(pTSBuf);

  while (hasNext) {
    STSElem cur = tsBufGetElem(pTSBuf);
    if (tVariantCompare(cur.tag, tag1) != 0) {
      break;  // reached a record with a new tag
    }

    hasNext = tsBufNextPos(pTSBuf);
  }
}

58 59 60 61
/*
 * Store `state` into slot `idx` of subState->states while holding subState->mutex.
 * pSql is only used in the debug log line.
 */
static void subquerySetState(SSqlObj *pSql, SSubqueryState *subState, int idx, int8_t state) {
  assert(idx < subState->numOfSub);
  assert(subState->states);

  pthread_mutex_t *lock = &subState->mutex;
  pthread_mutex_lock(lock);

  tscDebug("subquery:%p,%d state set to %d", pSql, idx, state);
  subState->states[idx] = state;

  pthread_mutex_unlock(lock);
}

D
fix bug  
dapan1121 已提交
71
/*
 * Return true when every subquery slot of pParentSql has a non-zero state.
 *
 * Scanning stops at the first unfinished slot. Each slot inspected is logged.
 * The caller is responsible for holding pParentSql->subState.mutex.
 */
static bool allSubqueryDone(SSqlObj *pParentSql) {
  SSubqueryState *subState = &pParentSql->subState;

  for (int i = 0; i < subState->numOfSub; i++) {
    if (subState->states[i] == 0) {
      tscDebug("%p subquery:%p,%d is NOT finished, total:%d", pParentSql, pParentSql->pSubs[i], i, subState->numOfSub);
      return false;
    }

    tscDebug("%p subquery:%p,%d is finished, total:%d", pParentSql, pParentSql->pSubs[i], i, subState->numOfSub);
  }

  return true;
}

D
fix bug  
dapan1121 已提交
90 91 92
/*
 * Mark subquery `idx` of pParentSql as finished (state 1) and report whether all
 * subqueries are now finished.
 *
 * The update and the check run under the same lock hold of
 * pParentSql->subState.mutex. pSql is only used in the debug log line.
 */
static bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) {
  SSubqueryState *subState = &pParentSql->subState;
  assert(idx < subState->numOfSub);

  pthread_mutex_lock(&subState->mutex);

  tscDebug("%p subquery:%p,%d state set to 1", pParentSql, pSql, idx);
  subState->states[idx] = 1;
  bool allDone = allSubqueryDone(pParentSql);

  pthread_mutex_unlock(&subState->mutex);

  return allDone;
}



H
Haojun Liao 已提交
110 111
/*
 * Intersect the timestamp buffers produced by the two ts_comp subqueries of pSql.
 *
 * How it works:
 *  - Two new output buffers are created and installed as the tsBuf of
 *    pSql->pSubs[0] and pSql->pSubs[1].
 *  - For each tag value found in pSupporter1->pTSBuf, the run with the same tag is
 *    located in pSupporter2->pTSBuf.
 *  - The two runs are merged by timestamp in the query's order. Timestamps present
 *    in both runs are appended to the output buffers, and `win` is widened to cover
 *    them.
 *
 * When an offset is set and the query is neither an interval query nor a super-table
 * query, matching timestamps consume the offset instead of being appended.
 *
 * On the normal path, both input buffers are destroyed and reset to NULL.
 *
 * Returns the number of timestamps written to the first output buffer. Returns 0
 * when either input buffer is missing or empty.
 */
static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJoinSupporter* pSupporter2, STimeWindow * win) {
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);

  STSBuf* output1 = tsBufCreate(true, pQueryInfo->order.order);
  STSBuf* output2 = tsBufCreate(true, pQueryInfo->order.order);

  win->skey = INT64_MAX;
  win->ekey = INT64_MIN;

  SLimitVal* pLimit = &pQueryInfo->limit;
  int32_t    order = pQueryInfo->order.order;

  SQueryInfo* pSubQueryInfo1 = tscGetQueryInfoDetail(&pSql->pSubs[0]->cmd, 0);
  SQueryInfo* pSubQueryInfo2 = tscGetQueryInfoDetail(&pSql->pSubs[1]->cmd, 0);

  // the output buffers are attached to the subqueries before any early return
  pSubQueryInfo1->tsBuf = output1;
  pSubQueryInfo2->tsBuf = output2;

  TSKEY st = taosGetTimestampUs();

  // no result generated, return directly
  if (pSupporter1->pTSBuf == NULL || pSupporter2->pTSBuf == NULL) {
    tscDebug("%p at least one ts-comp is empty, 0 for secondary query after ts blocks intersecting", pSql);
    return 0;
  }

  tsBufResetPos(pSupporter1->pTSBuf);
  tsBufResetPos(pSupporter2->pTSBuf);

  if (!tsBufNextPos(pSupporter1->pTSBuf)) {
    tsBufFlush(output1);
    tsBufFlush(output2);

    tscDebug("%p input1 is empty, 0 for secondary query after ts blocks intersecting", pSql);
    return 0;
  }

  if (!tsBufNextPos(pSupporter2->pTSBuf)) {
    tsBufFlush(output1);
    tsBufFlush(output2);

    tscDebug("%p input2 is empty, 0 for secondary query after ts blocks intersecting", pSql);
    return 0;
  }

  int64_t numOfInput1 = 1;
  int64_t numOfInput2 = 1;

  while(1) {
    STSElem elem = tsBufGetElem(pSupporter1->pTSBuf);

    // no data in pSupporter1 anymore, jump out of loop
    if (!tsBufIsValidElem(&elem)) {
      break;
    }

    // find the data in supporter2 with the same tag value
    STSElem e2 = tsBufFindElemStartPosByTag(pSupporter2->pTSBuf, elem.tag);

    // private copy of the current tag; released at the end of this iteration
    tVariant tag1 = {0};
    tVariantAssign(&tag1, elem.tag);

    if (tsBufIsValidElem(&e2)) {
      // pSupporter2 has elements with the same tag: merge both runs by timestamp
      while (1) {
        STSElem elem1 = tsBufGetElem(pSupporter1->pTSBuf);
        STSElem elem2 = tsBufGetElem(pSupporter2->pTSBuf);

        // data with the current tag in pSupporter1 is exhausted
        if (!tsBufIsValidElem(&elem1) || tVariantCompare(elem1.tag, &tag1) != 0) {
          break;
        }

        // no more data with this tag in pSupporter2: skip the rest of this tag in pSupporter1
        if (!tsBufIsValidElem(&elem2) || tVariantCompare(elem2.tag, &tag1) != 0) {
          skipRemainValue(pSupporter1->pTSBuf, &tag1);
          break;
        }

        /*
         * in case of stable query, limit/offset is not applied here. the limit/offset is applied to the
         * final results which is acquired after the secondary merge of in the client.
         */
        int32_t re = tsCompare(order, elem1.ts, elem2.ts);
        if (re < 0) {
          tsBufNextPos(pSupporter1->pTSBuf);
          numOfInput1++;
        } else if (re > 0) {
          tsBufNextPos(pSupporter2->pTSBuf);
          numOfInput2++;
        } else {
          // same timestamp on both sides: either emit it or consume one unit of offset
          if (pLimit->offset == 0 || pQueryInfo->interval.interval > 0 || QUERY_IS_STABLE_QUERY(pQueryInfo->type)) {
            if (win->skey > elem1.ts) {
              win->skey = elem1.ts;
            }

            if (win->ekey < elem1.ts) {
              win->ekey = elem1.ts;
            }

            tsBufAppend(output1, elem1.id, elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts));
            tsBufAppend(output2, elem2.id, elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts));
          } else {
            pLimit->offset -= 1;  // NOTE(review): confirm the offset is meant to apply to projection here
          }

          tsBufNextPos(pSupporter1->pTSBuf);
          numOfInput1++;

          tsBufNextPos(pSupporter2->pTSBuf);
          numOfInput2++;
        }
      }
    } else {  // no data with this tag in pSupporter2: skip the current tag in pSupporter1
      skipRemainValue(pSupporter1->pTSBuf, &tag1);
    }

    // tVariantAssign may allocate storage (e.g. for string tags); release the copy
    // so it does not leak once per tag group
    tVariantDestroy(&tag1);
  }

  /*
   * failed to set the correct ts order yet in two cases:
   * 1. only one element
   * 2. only one element for each tag.
   */
  if (output1->tsOrder == -1) {
    output1->tsOrder = TSDB_ORDER_ASC;
    output2->tsOrder = TSDB_ORDER_ASC;
  }

  tsBufFlush(output1);
  tsBufFlush(output2);

  tsBufDestroy(pSupporter1->pTSBuf);
  pSupporter1->pTSBuf = NULL;
  tsBufDestroy(pSupporter2->pTSBuf);
  pSupporter2->pTSBuf = NULL;

  TSKEY et = taosGetTimestampUs();
  tscDebug("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " in %d vnodes for secondary query after ts blocks "
           "intersecting, skey:%" PRId64 ", ekey:%" PRId64 ", numOfVnode:%d, elapsed time:%" PRId64 " us",
           pSql, numOfInput1, numOfInput2, output1->numOfTotal, output1->numOfGroups, win->skey, win->ekey,
           tsBufGetNumOfGroup(output1), et - st);

  return output1->numOfTotal;
}

// todo handle failed to create sub query
H
Haojun Liao 已提交
257
SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, int32_t index) {
258
  SJoinSupporter* pSupporter = calloc(1, sizeof(SJoinSupporter));
H
hjxilinx 已提交
259 260 261 262 263 264 265 266 267
  if (pSupporter == NULL) {
    return NULL;
  }

  pSupporter->pObj = pSql;

  pSupporter->subqueryIndex = index;
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
  
268
  memcpy(&pSupporter->interval, &pQueryInfo->interval, sizeof(pSupporter->interval));
H
hjxilinx 已提交
269 270 271
  pSupporter->limit = pQueryInfo->limit;

  STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, index);
272
  pSupporter->uid = pTableMetaInfo->pTableMeta->id.uid;
H
hjxilinx 已提交
273 274
  assert (pSupporter->uid != 0);

S
slguan 已提交
275
  taosGetTmpfilePath("join-", pSupporter->path);
H
hjxilinx 已提交
276

D
fix bug  
dapan1121 已提交
277 278
  // do NOT create file here to reduce crash generated file left issue
  pSupporter->f = NULL;
H
hjxilinx 已提交
279 280 281 282

  return pSupporter;
}

H
Haojun Liao 已提交
283
/*
 * Free a join supporter and every resource it still holds.
 *
 * Released resources: expression list, column list, field info, timestamp buffer,
 * temporary file (unlinked, and closed if open), vgroup-table array, id/tag list
 * and tag condition.
 *
 * Passing NULL is a no-op.
 */
static void tscDestroyJoinSupporter(SJoinSupporter* pSupporter) {
  if (pSupporter == NULL) {
    return;
  }

  // query description still owned by the supporter
  if (pSupporter->exprList != NULL) {
    tscSqlExprInfoDestroy(pSupporter->exprList);
  }
  if (pSupporter->colList != NULL) {
    tscColumnListDestroy(pSupporter->colList);
  }
  tscFieldInfoClear(&pSupporter->fieldsInfo);

  // intermediate timestamp data
  if (pSupporter->pTSBuf != NULL) {
    tsBufDestroy(pSupporter->pTSBuf);
    pSupporter->pTSBuf = NULL;
  }

  // temporary file: the path is unlinked whether or not the file was ever opened
  unlink(pSupporter->path);
  if (pSupporter->f != NULL) {
    fclose(pSupporter->f);
    pSupporter->f = NULL;
  }

  if (pSupporter->pVgroupTables != NULL) {
    taosArrayDestroy(pSupporter->pVgroupTables);
    pSupporter->pVgroupTables = NULL;
  }

  tfree(pSupporter->pIdTagList);
  tscTagCondRelease(&pSupporter->tagCond);

  free(pSupporter);
}

/*
 * need the secondary query process
 * In case of count(ts)/count(*)/spread(ts) query, that are only applied to
 * primary timestamp column , the secondary query is not necessary
 *
 */
H
Haojun Liao 已提交
327
static UNUSED_FUNC bool needSecondaryQuery(SQueryInfo* pQueryInfo) {
328 329 330
  size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
  
  for (int32_t i = 0; i < numOfCols; ++i) {
331 332
    SColumn* base = taosArrayGet(pQueryInfo->colList, i);
    if (base->colIndex.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
H
hjxilinx 已提交
333 334 335 336 337 338 339
      return true;
    }
  }

  return false;
}

340 341 342
/*
 * Remove from pVgroupTables every vgroup whose id is absent from the group-id list
 * of pQueryInfo->tsBuf. Those vgroups have no tables left after the timestamp
 * intersection, so no next-stage query is sent to them.
 *
 * Afterwards the query is flagged as a multi-table query. At least one vgroup is
 * expected to survive; this is asserted.
 *
 * TODO: also discard individual tables of a surviving vgroup that are not qualified.
 */
static void filterVgroupTables(SQueryInfo* pQueryInfo, SArray* pVgroupTables) {
  int32_t  numOfIds = 0;
  int32_t* vgIdList = NULL;
  tsBufGetGroupIdList(pQueryInfo->tsBuf, &numOfIds, &vgIdList);

  int32_t k = 0;
  while (k < taosArrayGetSize(pVgroupTables)) {
    SVgroupTableInfo* p = taosArrayGet(pVgroupTables, k);

    int32_t f = 0;
    while (f < numOfIds && vgIdList[f] != p->vgInfo.vgId) {
      ++f;
    }

    // k only advances when the entry is kept
    if (f < numOfIds) {
      ++k;
    } else {
      tscRemoveVgroupTableGroup(pVgroupTables, k);
    }
  }

  assert(taosArrayGetSize(pVgroupTables) > 0);
  TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_MULTITABLE_QUERY);

  tfree(vgIdList);
}

/*
 * Build a new vgroup-table array for the vgroups that still have timestamps in
 * pQueryInfo->tsBuf after the intersection.
 *
 * Entries follow the order of the tsBuf group-id list, and each one is a copy
 * (tscVgroupTableCopy) of the matching entry of pVgroupTables. pVgroupTables
 * itself is not modified. The returned array belongs to the caller.
 *
 * A group id with no matching vgroup is logged and skipped. Previously an
 * uninitialized (or stale, already-pushed) SVgroupTableInfo was appended in that
 * case.
 *
 * Also flags the query as a multi-table query.
 */
static SArray* buildVgroupTableByResult(SQueryInfo* pQueryInfo, SArray* pVgroupTables) {
  int32_t  num = 0;
  int32_t* list = NULL;
  tsBufGetGroupIdList(pQueryInfo->tsBuf, &num, &list);

  size_t  numOfGroups = taosArrayGetSize(pVgroupTables);
  SArray* pNew = taosArrayInit(num, sizeof(SVgroupTableInfo));

  for (int32_t i = 0; i < num; ++i) {
    int32_t vnodeId = list[i];
    bool    found = false;

    for (size_t j = 0; j < numOfGroups; ++j) {
      SVgroupTableInfo* p1 = taosArrayGet(pVgroupTables, j);
      if (p1->vgInfo.vgId == vnodeId) {
        // push only a freshly copied entry, never a leftover from a previous iteration
        SVgroupTableInfo info = {{0}};
        tscVgroupTableCopy(&info, p1);
        taosArrayPush(pNew, &info);
        found = true;
        break;
      }
    }

    if (!found) {
      tscError("vgId:%d in ts buffer not found in vgroup table list, ignored", vnodeId);
    }
  }

  tfree(list);
  TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_MULTITABLE_QUERY);

  return pNew;
}

H
hjxilinx 已提交
402 403 404
/*
 * launch secondary stage query to fetch the result that contains timestamp in set
 */
H
Haojun Liao 已提交
405
static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
H
hjxilinx 已提交
406
  int32_t         numOfSub = 0;
407
  SJoinSupporter* pSupporter = NULL;
H
hjxilinx 已提交
408
  
H
Haojun Liao 已提交
409
  //If the columns are not involved in the final select clause, the corresponding query will not be issued.
H
Haojun Liao 已提交
410
  for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
H
hjxilinx 已提交
411
    pSupporter = pSql->pSubs[i]->param;
H
hjxilinx 已提交
412
    if (taosArrayGetSize(pSupporter->exprList) > 0) {
H
hjxilinx 已提交
413 414 415 416 417 418 419
      ++numOfSub;
    }
  }
  
  assert(numOfSub > 0);
  
  // scan all subquery, if one sub query has only ts, ignore it
H
Haojun Liao 已提交
420
  tscDebug("%p start to launch secondary subqueries, %d out of %d needs to query", pSql, numOfSub, pSql->subState.numOfSub);
H
hjxilinx 已提交
421 422 423

  bool success = true;
  
H
Haojun Liao 已提交
424
  for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
H
hjxilinx 已提交
425 426 427 428 429
    SSqlObj *pPrevSub = pSql->pSubs[i];
    pSql->pSubs[i] = NULL;
    
    pSupporter = pPrevSub->param;
  
H
hjxilinx 已提交
430
    if (taosArrayGetSize(pSupporter->exprList) == 0) {
431
      tscDebug("%p subIndex: %d, no need to launch query, ignore it", pSql, i);
H
hjxilinx 已提交
432 433
    
      tscDestroyJoinSupporter(pSupporter);
434
      taos_free_result(pPrevSub);
H
hjxilinx 已提交
435 436 437 438 439 440
    
      pSql->pSubs[i] = NULL;
      continue;
    }
  
    SQueryInfo *pSubQueryInfo = tscGetQueryInfoDetail(&pPrevSub->cmd, 0);
H
Haojun Liao 已提交
441
    STSBuf     *pTsBuf = pSubQueryInfo->tsBuf;
H
hjxilinx 已提交
442 443 444
    pSubQueryInfo->tsBuf = NULL;
  
    // free result for async object will also free sqlObj
H
hjxilinx 已提交
445
    assert(tscSqlExprNumOfExprs(pSubQueryInfo) == 1); // ts_comp query only requires one resutl columns
H
hjxilinx 已提交
446 447
    taos_free_result(pPrevSub);
  
448
    SSqlObj *pNew = createSubqueryObj(pSql, (int16_t) i, tscJoinQueryCallback, pSupporter, TSDB_SQL_SELECT, NULL);
H
hjxilinx 已提交
449 450 451 452 453
    if (pNew == NULL) {
      tscDestroyJoinSupporter(pSupporter);
      success = false;
      break;
    }
454
    
B
Bomin Zhang 已提交
455

H
hjxilinx 已提交
456 457 458 459
    tscClearSubqueryInfo(&pNew->cmd);
    pSql->pSubs[i] = pNew;
  
    SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
H
Haojun Liao 已提交
460
    pQueryInfo->tsBuf = pTsBuf;  // transfer the ownership of timestamp comp-z data to the new created object
B
Bomin Zhang 已提交
461

H
hjxilinx 已提交
462
    // set the second stage sub query for join process
H
hjxilinx 已提交
463
    TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE);
464
    memcpy(&pQueryInfo->interval, &pSupporter->interval, sizeof(pQueryInfo->interval));
B
Bomin Zhang 已提交
465

H
hjxilinx 已提交
466
    tscTagCondCopy(&pQueryInfo->tagCond, &pSupporter->tagCond);
B
Bomin Zhang 已提交
467

H
hjxilinx 已提交
468 469 470
    pQueryInfo->colList = pSupporter->colList;
    pQueryInfo->exprList = pSupporter->exprList;
    pQueryInfo->fieldsInfo = pSupporter->fieldsInfo;
H
Haojun Liao 已提交
471
    pQueryInfo->groupbyExpr = pSupporter->groupInfo;
H
Haojun Liao 已提交
472

473
    assert(pNew->subState.numOfSub == 0 && pNew->cmd.numOfClause == 1 && pQueryInfo->numOfTables == 1);
H
hjxilinx 已提交
474
  
475
    tscFieldInfoUpdateOffset(pQueryInfo);
H
hjxilinx 已提交
476
  
477
    STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
Haojun Liao 已提交
478
    pTableMetaInfo->pVgroupTables = pSupporter->pVgroupTables;
H
Haojun Liao 已提交
479 480 481

    pSupporter->exprList = NULL;
    pSupporter->colList = NULL;
482
    pSupporter->pVgroupTables = NULL;
H
Haojun Liao 已提交
483 484
    memset(&pSupporter->fieldsInfo, 0, sizeof(SFieldInfo));
    memset(&pSupporter->groupInfo, 0, sizeof(SSqlGroupbyExpr));
H
Haojun Liao 已提交
485

H
hjxilinx 已提交
486 487 488 489 490
    /*
     * When handling the projection query, the offset value will be modified for table-table join, which is changed
     * during the timestamp intersection.
     */
    pSupporter->limit = pQueryInfo->limit;
491
    pQueryInfo->limit = pSupporter->limit;
H
Haojun Liao 已提交
492 493 494 495 496

    SColumnIndex index = {.tableIndex = 0, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
    SSchema* s = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, 0);

    SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, 0);
H
Haojun Liao 已提交
497 498
    int16_t funcId = pExpr->functionId;

H
Haojun Liao 已提交
499
    if ((pExpr->colInfo.colId != PRIMARYKEY_TIMESTAMP_COL_INDEX) ||
H
Haojun Liao 已提交
500 501 502 503 504
        (funcId != TSDB_FUNC_TS && funcId != TSDB_FUNC_TS_DUMMY && funcId != TSDB_FUNC_PRJ)) {

      int16_t functionId = tscIsProjectionQuery(pQueryInfo)? TSDB_FUNC_PRJ : TSDB_FUNC_TS;

      tscAddSpecialColumnForSelect(pQueryInfo, 0, functionId, &index, s, TSDB_COL_NORMAL);
H
Haojun Liao 已提交
505
      tscPrintSelectClause(pNew, 0);
506
      tscFieldInfoUpdateOffset(pQueryInfo);
H
Haojun Liao 已提交
507 508 509 510

      pExpr = tscSqlExprGet(pQueryInfo, 0);
    }

511
    // set the join condition tag column info, todo extract method
H
Haojun Liao 已提交
512 513
    if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
      assert(pQueryInfo->tagCond.joinInfo.hasJoin);
514
      int16_t colId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid);
H
Haojun Liao 已提交
515

516
      // set the tag column id for executor to extract correct tag value
517
      pExpr->param[0] = (tVariant) {.i64 = colId, .nType = TSDB_DATA_TYPE_BIGINT, .nLen = sizeof(int64_t)};
H
Haojun Liao 已提交
518
      pExpr->numOfParams = 1;
H
Haojun Liao 已提交
519 520
    }

521 522 523 524 525 526 527 528
    if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
      assert(pTableMetaInfo->pVgroupTables != NULL);
      if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
        SArray* p = buildVgroupTableByResult(pQueryInfo, pTableMetaInfo->pVgroupTables);
        tscFreeVgroupTableInfo(pTableMetaInfo->pVgroupTables);
        pTableMetaInfo->pVgroupTables = p;
      } else {
        filterVgroupTables(pQueryInfo, pTableMetaInfo->pVgroupTables);
H
Haojun Liao 已提交
529 530 531
      }
    }

D
fix bug  
dapan1121 已提交
532 533
    subquerySetState(pPrevSub, &pSql->subState, i, 0);
    
534
    size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
S
TD-1057  
Shengliang Guan 已提交
535
    tscDebug("%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, name:%s",
536
             pSql, pNew, 0, pTableMetaInfo->vgroupIndex, pQueryInfo->type, taosArrayGetSize(pQueryInfo->exprList),
537
             numOfCols, pQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pTableMetaInfo->name));
H
hjxilinx 已提交
538 539 540 541
  }
  
  //prepare the subqueries object failed, abort
  if (!success) {
542
    pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
H
hjxilinx 已提交
543
    tscError("%p failed to prepare subqueries objs for secondary phase query, numOfSub:%d, code:%d", pSql,
H
Haojun Liao 已提交
544
        pSql->subState.numOfSub, pSql->res.code);
H
hjxilinx 已提交
545
    freeJoinSubqueryObj(pSql);
H
hjxilinx 已提交
546 547 548 549
    
    return pSql->res.code;
  }
  
H
Haojun Liao 已提交
550
  for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
H
Haojun Liao 已提交
551
    if (pSql->pSubs[i] == NULL) {
H
hjxilinx 已提交
552 553
      continue;
    }
H
Haojun Liao 已提交
554

H
Haojun Liao 已提交
555
    tscDoQuery(pSql->pSubs[i]);
H
hjxilinx 已提交
556 557 558 559 560
  }

  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
561
/*
 * Release every subquery of pSql together with its join supporter.
 *
 * Afterwards the subquery state is torn down: the mutex is destroyed (only when a
 * state array exists), the state array is freed, and numOfSub is reset to 0.
 */
void freeJoinSubqueryObj(SSqlObj* pSql) {
  SSubqueryState* subState = &pSql->subState;

  for (int32_t i = 0; i < subState->numOfSub; ++i) {
    SSqlObj* pSub = pSql->pSubs[i];
    if (pSub != NULL) {
      SJoinSupporter* pSupporter = pSub->param;
      tscDestroyJoinSupporter(pSupporter);

      taos_free_result(pSub);
      pSql->pSubs[i] = NULL;
    }
  }

  if (subState->states != NULL) {
    pthread_mutex_destroy(&subState->mutex);
  }
  tfree(subState->states);

  subState->numOfSub = 0;
}

585
/*
 * Mark subquery pSqlSub of pSqlObj as finished.
 *
 * If it was the last outstanding subquery, log the parent's error code and free
 * all join subqueries. pSupporter only supplies the subquery index; it is not
 * released here.
 */
static void quitAllSubquery(SSqlObj* pSqlSub, SSqlObj* pSqlObj, SJoinSupporter* pSupporter) {
  if (!subAndCheckDone(pSqlSub, pSqlObj, pSupporter->subqueryIndex)) {
    return;  // other subqueries have not finished yet
  }

  tscError("%p all subquery return and query failed, global code:%s", pSqlObj, tstrerror(pSqlObj->res.code));
  freeJoinSubqueryObj(pSqlObj);
}

// update the query time range according to the join results on timestamp
H
Haojun Liao 已提交
596 597 598
static void updateQueryTimeRange(SQueryInfo* pQueryInfo, STimeWindow* win) {
  assert(pQueryInfo->window.skey <= win->skey && pQueryInfo->window.ekey >= win->ekey);
  pQueryInfo->window = *win;
H
Haojun Liao 已提交
599 600


H
hjxilinx 已提交
601 602
}

H
Haojun Liao 已提交
603 604 605
int32_t tidTagsCompar(const void* p1, const void* p2) {
  const STidTags* t1 = (const STidTags*) (p1);
  const STidTags* t2 = (const STidTags*) (p2);
606 607
  
  if (t1->vgId != t2->vgId) {
weixin_48148422's avatar
weixin_48148422 已提交
608 609
    return (t1->vgId > t2->vgId) ? 1 : -1;
  }
610

H
Haojun Liao 已提交
611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629
  tstr* tag1 = (tstr*) t1->tag;
  tstr* tag2 = (tstr*) t2->tag;

  if (tag1->len != tag2->len) {
    return (tag1->len > tag2->len)? 1: -1;
  }

  return strncmp(tag1->data, tag2->data, tag1->len);
}

int32_t tagValCompar(const void* p1, const void* p2) {
  const STidTags* t1 = (const STidTags*) varDataVal(p1);
  const STidTags* t2 = (const STidTags*) varDataVal(p2);

  tstr* tag1 = (tstr*) t1->tag;
  tstr* tag2 = (tstr*) t2->tag;

  if (tag1->len != tag2->len) {
    return (tag1->len > tag2->len)? 1: -1;
630
  }
H
Haojun Liao 已提交
631

H
Haojun Liao 已提交
632
  return memcmp(tag1->data, tag2->data, tag1->len);
633 634
}

H
Haojun Liao 已提交
635
/*
 * Group `tables` (an array of STidTags) into per-vgroup table lists.
 *
 * The result is stored in pTableMetaInfo->pVgroupTables, and vgroupIndex is reset
 * to 0.
 *
 * A new SVgroupTableInfo entry is started whenever the vgId differs from the
 * previous record's vgId, so a vgId that reappears non-consecutively gets a second
 * entry. Presumably `tables` is sorted by vgId (e.g. with tidTagsCompar) — TODO
 * confirm at call sites.
 *
 * Each vgId must be present in pTableMetaInfo->vgroupList; this is asserted.
 */
void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables) {
  SArray*   result = taosArrayInit(4, sizeof(SVgroupTableInfo));
  SArray*   vgTables = NULL;
  STidTags* prev = NULL;

  size_t numOfTables = taosArrayGetSize(tables);
  for (size_t i = 0; i < numOfTables; i++) {
    STidTags* tt = taosArrayGet(tables, i);

    bool startsNewGroup = (prev == NULL) || (tt->vgId != prev->vgId);
    if (startsNewGroup) {
      SVgroupsInfo*    pvg = pTableMetaInfo->vgroupList;
      SVgroupTableInfo info = {{0}};

      // locate the vgroup description for this vgId
      int32_t m = 0;
      while (m < pvg->numOfVgroups && pvg->vgroups[m].vgId != tt->vgId) {
        ++m;
      }
      if (m < pvg->numOfVgroups) {
        tscSVgroupInfoCopy(&info.vgInfo, &pvg->vgroups[m]);
      }
      assert(info.vgInfo.numOfEps != 0);

      vgTables = taosArrayInit(4, sizeof(STableIdInfo));
      info.itemList = vgTables;

      // report the size of the group that has just been completed
      size_t numOfGroups = taosArrayGetSize(result);
      if (numOfGroups > 0) {
        SVgroupTableInfo* prevGroup = taosArrayGet(result, numOfGroups - 1);
        tscDebug("%p vgId:%d, tables:%"PRIzu, pSql, prevGroup->vgInfo.vgId, taosArrayGetSize(prevGroup->itemList));
      }

      taosArrayPush(result, &info);
    }

    STableIdInfo item = {.uid = tt->uid, .tid = tt->tid, .key = INT64_MIN};
    taosArrayPush(vgTables, &item);

    tscTrace("%p tid:%d, uid:%"PRIu64",vgId:%d added", pSql, tt->tid, tt->uid, tt->vgId);
    prev = tt;
  }

  pTableMetaInfo->pVgroupTables = result;
  pTableMetaInfo->vgroupIndex = 0;

  // report the size of the final group
  size_t totalGroups = taosArrayGetSize(result);
  if (totalGroups > 0) {
    SVgroupTableInfo* g = taosArrayGet(result, totalGroups - 1);
    tscDebug("%p vgId:%d, tables:%"PRIzu, pSql, g->vgInfo.vgId, taosArrayGetSize(g->itemList));
  }
}

/*
 * Rewrite pSql as a ts_comp query over its single table and submit it with
 * tscProcessSql.
 *
 * The query is flagged as a multi-table query (the tag-filter flag is cleared),
 * and completion is routed to tscJoinQueryCallback.
 *
 * For a super table, the id of the join tag column is passed as the ts_comp
 * parameter. Supporter columns that carry filters are cloned into the query's
 * column list.
 *
 * pParent is only used in the debug log line.
 */
static void issueTSCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj* pParent) {
  SSqlCmd* pCmd = &pSql->cmd;
  tscClearSubqueryInfo(pCmd);
  tscFreeSqlResult(pSql);

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  assert(pQueryInfo->numOfTables == 1);

  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  tscInitQueryInfo(pQueryInfo);

  TSDB_QUERY_CLEAR_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY);
  TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_MULTITABLE_QUERY);

  pCmd->command = TSDB_SQL_SELECT;
  pSql->fp = tscJoinQueryCallback;

  // single ts_comp output column bound to the primary timestamp column
  SSchema      colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1};
  SColumnIndex index = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX};
  tscAddSpecialColumnForSelect(pQueryInfo, 0, TSDB_FUNC_TS_COMP, &index, &colSchema, TSDB_COL_NORMAL);

  // set the tags value for ts_comp function
  if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
    SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, 0);
    int16_t   tagColId = tscGetJoinTagColIdByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->id.uid);

    pExpr->param->i64 = tagColId;
    pExpr->numOfParams = 1;
  }

  // add the filtered columns of the supporter to the query's column list
  size_t numOfSupporterCols = (pSupporter->colList != NULL) ? taosArrayGetSize(pSupporter->colList) : 0;
  for (size_t i = 0; i < numOfSupporterCols; ++i) {
    SColumn* pCol = taosArrayGetP(pSupporter->colList, i);
    if (pCol->numOfFilters <= 0) {
      continue;
    }

    SColumn* pClone = tscColumnClone(pCol);
    taosArrayPush(pQueryInfo->colList, &pClone);
  }

  size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);

  tscDebug(
      "%p subquery:%p tableIndex:%d, vgroupIndex:%d, numOfVgroups:%d, type:%d, ts_comp query to retrieve timestamps, "
      "numOfExpr:%" PRIzu ", colList:%" PRIzu ", numOfOutputFields:%d, name:%s",
      pParent, pSql, 0, pTableMetaInfo->vgroupIndex, pTableMetaInfo->vgroupList->numOfVgroups, pQueryInfo->type,
      tscSqlExprNumOfExprs(pQueryInfo), numOfCols, pQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pTableMetaInfo->name));

  tscProcessSql(pSql);
}

H
Haojun Liao 已提交
738
/*
 * Check p1's id/tag list for two adjacent entries with equal tag values, compared
 * with doCompare using pColSchema's type and size.
 *
 * Only neighbours are compared, so the list should already be sorted by tag;
 * getIntersectionOfTableTuple sorts it before calling.
 *
 * On a duplicate: logs an error, sets pPSqlObj->res.code to
 * TSDB_CODE_QRY_DUP_JOIN_KEY and returns false. Otherwise returns true.
 */
static bool checkForDuplicateTagVal(SSchema* pColSchema, SJoinSupporter* p1, SSqlObj* pPSqlObj) {
  STidTags* prev = NULL;

  for (int32_t i = 0; i < p1->num; ++i) {
    STidTags* cur = (STidTags*) varDataVal(p1->pIdTagList + i * p1->tagSize);

    if (prev != NULL) {
      assert(prev->vgId >= 1 && cur->vgId >= 1);

      if (doCompare(prev->tag, cur->tag, pColSchema->type, pColSchema->bytes) == 0) {
        tscError("%p join tags have same value for different table, free all sub SqlObj and quit", pPSqlObj);
        pPSqlObj->res.code = TSDB_CODE_QRY_DUP_JOIN_KEY;
        return false;
      }
    }

    prev = cur;
  }

  return true;
}

754
/*
 * Match the <tid, tag> tuples collected by the two tag-filter subqueries of a join on the join tag.
 *
 * Both p1->pIdTagList and p2->pIdTagList are sorted in place by tag value (tagValCompar) and checked
 * for duplicated tag values. A duplicate sets pParentSql->res.code and makes the function return
 * TSDB_CODE_QRY_DUP_JOIN_KEY. The two sorted lists are then merged. Every pair of tuples with equal
 * tags is appended to *s1 (tuple from subquery 0) and *s2 (tuple from subquery 1). Finally both
 * result arrays are re-sorted with tidTagsCompar.
 *
 * *s1 and *s2 are created here and are not released here: the caller must destroy them with
 * taosArrayDestroy() whatever the return code. Either may be NULL after an allocation failure.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_DUP_JOIN_KEY or TSDB_CODE_TSC_OUT_OF_MEMORY.
 */
static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pParentSql, SArray** s1, SArray** s2) {
  SJoinSupporter* p1 = pParentSql->pSubs[0]->param;
  SJoinSupporter* p2 = pParentSql->pSubs[1]->param;

  tscDebug("%p all subquery retrieve <tid, tags> complete, do tags match, %d, %d", pParentSql, p1->num, p2->num);

  // Sort according to the tag value.
  // A subquery that returned no tuple leaves pIdTagList NULL, and qsort() requires a valid base
  // pointer even for zero elements. Lists with fewer than two tuples are already sorted.
  if (p1->num > 1) {
    qsort(p1->pIdTagList, p1->num, p1->tagSize, tagValCompar);
  }

  if (p2->num > 1) {
    qsort(p2->pIdTagList, p2->num, p2->tagSize, tagValCompar);
  }

  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  int16_t tagColId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid);

  SSchema* pColSchema = tscGetColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId);

  // Every pIdTagList entry starts with an int16_t header that varDataVal() skips (presumably the
  // varData length field); the result arrays store only the STidTags payload after it.
  int32_t size = p1->tagSize - sizeof(int16_t);
  *s1 = taosArrayInit(p1->num, size);
  *s2 = taosArrayInit(p2->num, size);
  if (*s1 == NULL || *s2 == NULL) {
    tscError("%p failed to allocate memory for join tags match", pParentSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  if (!(checkForDuplicateTagVal(pColSchema, p1, pParentSql) && checkForDuplicateTagVal(pColSchema, p2, pParentSql))) {
    return TSDB_CODE_QRY_DUP_JOIN_KEY;
  }

  // merge the two tag-sorted lists, keeping the pairs that share the same tag value
  int32_t i = 0, j = 0;
  while (i < p1->num && j < p2->num) {
    STidTags* pp1 = (STidTags*) varDataVal(p1->pIdTagList + i * p1->tagSize);
    STidTags* pp2 = (STidTags*) varDataVal(p2->pIdTagList + j * p2->tagSize);
    assert(pp1->tid != 0 && pp2->tid != 0);

    int32_t ret = doCompare(pp1->tag, pp2->tag, pColSchema->type, pColSchema->bytes);
    if (ret == 0) {
      // NOTE: "val" prints the first bytes of the tag as an int; it is only meaningful for int tags
      tscDebug("%p tag matched, vgId:%d, val:%d, tid:%d, uid:%"PRIu64", tid:%d, uid:%"PRIu64, pParentSql, pp1->vgId,
               *(int*) pp1->tag, pp1->tid, pp1->uid, pp2->tid, pp2->uid);

      taosArrayPush(*s1, pp1);
      taosArrayPush(*s2, pp2);
      j++;
      i++;
    } else if (ret > 0) {
      j++;
    } else {
      i++;
    }
  }

  // reorganize the tid-tag value according to both the vgroup id and tag values
  size_t t1 = taosArrayGetSize(*s1);
  size_t t2 = taosArrayGetSize(*s2);

  qsort((*s1)->pData, t1, size, tidTagsCompar);
  qsort((*s2)->pData, t2, size, tidTagsCompar);

  tscDebug("%p tags match complete, result: %"PRIzu", %"PRIzu, pParentSql, t1, t2);
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
824
/*
 * Retrieval callback of the first-stage <tid, tag> (tag filter) subqueries of a join query.
 *
 * Each call appends the fetched <tid, tag> tuples to pSupporter->pIdTagList. It keeps fetching while
 * the current vnode reports more data, then moves on to the next vgroup. When this subquery is done
 * and every sibling subquery has finished as well (subAndCheckDone), the tuple sets are intersected
 * on the join tag (getIntersectionOfTableTuple). One of two things then happens:
 *  - if the intersection is empty, the parent is told there is no result;
 *  - otherwise the vgroup/table lists of both subqueries are rebuilt from the matched tuples, the
 *    sub states are reset, and issueTSCompQuery() is issued for each subquery.
 *
 * Any failure of this or another subquery is recorded in pParentSql->res.code and reported through
 * tscAsyncResultOnError().
 */
static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRows) {
  SJoinSupporter* pSupporter = (SJoinSupporter*)param;

  SSqlObj* pParentSql = pSupporter->pObj;

  SSqlObj* pSql = (SSqlObj*)tres;
  SSqlCmd* pCmd = &pSql->cmd;
  SSqlRes* pRes = &pSql->res;

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  assert(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY));

  // another subquery has failed already: abort this one as well
  if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
    tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, numOfRows, pParentSql->res.code);
    quitAllSubquery(pSql, pParentSql, pSupporter);

    tscAsyncResultOnError(pParentSql);

    return;
  }

  // check for the error code firstly
  if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
    // todo retry if other subqueries are not failed

    assert(numOfRows < 0 && numOfRows == taos_errno(pSql));
    tscError("%p sub query failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex);

    pParentSql->res.code = numOfRows;
    quitAllSubquery(pSql, pParentSql, pSupporter);

    tscAsyncResultOnError(pParentSql);
    return;
  }

  // keep the results in memory
  if (numOfRows > 0) {
    // Multiply in size_t: the int32_t product of tagSize and numOfRows could overflow (undefined behavior).
    size_t validLen = (size_t)pSupporter->tagSize * (size_t)pRes->numOfRows;
    size_t length = pSupporter->totalLen + validLen;

    char* tmp = realloc(pSupporter->pIdTagList, length);
    if (tmp == NULL) {
      int err = errno;  // save it first: logging may overwrite errno
      tscError("%p failed to malloc memory", pSql);

      pParentSql->res.code = TAOS_SYSTEM_ERROR(err);
      quitAllSubquery(pSql, pParentSql, pSupporter);

      tscAsyncResultOnError(pParentSql);
      return;
    }

    pSupporter->pIdTagList = tmp;

    memcpy(pSupporter->pIdTagList + pSupporter->totalLen, pRes->data, validLen);
    pSupporter->totalLen += (int32_t)validLen;
    pSupporter->num += (int32_t)pRes->numOfRows;

    // query not completed, continue to retrieve tid + tag tuples
    if (!pRes->completed) {
      taos_fetch_rows_a(tres, tidTagRetrieveCallback, param);
      return;
    }
  }

  // data in current vnode has all returned to client, try next vnode if exits
  // <tid + tag> tuples have been retrieved to client, try <tid + tag> tuples from the next vnode
  if (hasMoreVnodesToTry(pSql)) {
    STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

    int32_t totalVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
    pTableMetaInfo->vgroupIndex += 1;
    assert(pTableMetaInfo->vgroupIndex < totalVgroups);

    tscDebug("%p tid_tag from vgroup index:%d completed, try next vgroup:%d. total vgroups:%d. current numOfRes:%d",
             pSql, pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVgroups, pSupporter->num);

    pCmd->command = TSDB_SQL_SELECT;
    tscResetForNextRetrieve(&pSql->res);

    // set the callback function
    pSql->fp = tscJoinQueryCallback;
    tscProcessSql(pSql);
    return;
  }

  // no data exists in next vnode, mark the <tid, tags> query completed
  // only when there is no subquery exits any more, proceeds to get the intersect of the <tid, tags> tuple sets.
  if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
    tscDebug("%p tagRetrieve:%p,%d completed, total:%d", pParentSql, tres, pSupporter->subqueryIndex, pParentSql->subState.numOfSub);
    return;
  }

  SArray *s1 = NULL, *s2 = NULL;
  int32_t code = getIntersectionOfTableTuple(pQueryInfo, pParentSql, &s1, &s2);
  if (code != TSDB_CODE_SUCCESS) {
    freeJoinSubqueryObj(pParentSql);
    pParentSql->res.code = code;
    tscAsyncResultOnError(pParentSql);

    taosArrayDestroy(s1);
    taosArrayDestroy(s2);
    return;
  }

  if (taosArrayGetSize(s1) == 0 || taosArrayGetSize(s2) == 0) {  // no results,return.
    assert(pParentSql->fp != tscJoinQueryCallback);

    tscDebug("%p tag intersect does not generated qualified tables for join, free all sub SqlObj and quit", pParentSql);
    freeJoinSubqueryObj(pParentSql);

    // set no result command
    pParentSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
    (*pParentSql->fp)(pParentSql->param, pParentSql, 0);
  } else {
    // proceed to for ts_comp query
    SSqlCmd* pSubCmd1 = &pParentSql->pSubs[0]->cmd;
    SSqlCmd* pSubCmd2 = &pParentSql->pSubs[1]->cmd;

    SQueryInfo*     pQueryInfo1 = tscGetQueryInfoDetail(pSubCmd1, 0);
    STableMetaInfo* pTableMetaInfo1 = tscGetMetaInfo(pQueryInfo1, 0);
    tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo1, s1);

    SQueryInfo*     pQueryInfo2 = tscGetQueryInfoDetail(pSubCmd2, 0);
    STableMetaInfo* pTableMetaInfo2 = tscGetMetaInfo(pQueryInfo2, 0);
    tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo2, s2);

    SSqlObj* psub1 = pParentSql->pSubs[0];
    ((SJoinSupporter*)psub1->param)->pVgroupTables =  tscVgroupTableInfoDup(pTableMetaInfo1->pVgroupTables);

    SSqlObj* psub2 = pParentSql->pSubs[1];
    ((SJoinSupporter*)psub2->param)->pVgroupTables =  tscVgroupTableInfoDup(pTableMetaInfo2->pVgroupTables);

    pParentSql->subState.numOfSub = 2;

    memset(pParentSql->subState.states, 0, sizeof(pParentSql->subState.states[0]) * pParentSql->subState.numOfSub);
    tscDebug("%p reset all sub states to 0", pParentSql);

    for (int32_t m = 0; m < pParentSql->subState.numOfSub; ++m) {
      SSqlObj* sub = pParentSql->pSubs[m];
      issueTSCompQuery(sub, sub->param, pParentSql);
    }
  }

  taosArrayDestroy(s1);
  taosArrayDestroy(s2);
}
H
Haojun Liao 已提交
973

H
Haojun Liao 已提交
974 975
/*
 * Retrieval callback of the first-stage timestamp-comparison (ts_comp) subqueries of a join query.
 *
 * Every block received is written to the temporary file pSupporter->path, which is then turned into
 * an STSBuf and either kept as pSupporter->pTSBuf or merged into it. Fetching continues while the
 * current vnode reports more data, then moves on to the next vgroup. Once this subquery and all its
 * siblings are done (subAndCheckDone), the timestamp blocks of the two subqueries are intersected
 * (doTSBlockIntersect):
 *  - if nothing is left, the parent is told there is no result;
 *  - otherwise the parent query time range is updated with the returned window and the real
 *    subqueries are launched (tscLaunchRealSubqueries).
 *
 * Any failure of this or another subquery, including tmp-file I/O errors, is recorded in
 * pParentSql->res.code and reported through tscAsyncResultOnError().
 */
static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRows) {
  SJoinSupporter* pSupporter = (SJoinSupporter*)param;

  SSqlObj* pParentSql = pSupporter->pObj;

  SSqlObj* pSql = (SSqlObj*)tres;
  SSqlCmd* pCmd = &pSql->cmd;
  SSqlRes* pRes = &pSql->res;

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  assert(!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE));

  // another subquery has failed already: abort this one as well
  if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
    tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, numOfRows, pParentSql->res.code);
    quitAllSubquery(pSql, pParentSql, pSupporter);

    tscAsyncResultOnError(pParentSql);

    return;
  }

  // check for the error code firstly
  if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
    // todo retry if other subqueries are not failed yet
    assert(numOfRows < 0 && numOfRows == taos_errno(pSql));
    tscError("%p sub query failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex);

    pParentSql->res.code = numOfRows;
    quitAllSubquery(pSql, pParentSql, pSupporter);

    tscAsyncResultOnError(pParentSql);
    return;
  }

  if (numOfRows > 0) {  // write the compressed timestamp to disk file
    // The tmp file may not be open yet, e.g. because a previous open attempt failed.
    if (pSupporter->f == NULL) {
      pSupporter->f = fopen(pSupporter->path, "w");

      if (pSupporter->f == NULL) {
        int err = errno;  // save it first: logging may overwrite errno
        tscError("%p failed to create tmp file:%s, reason:%s", pSql, pSupporter->path, strerror(err));

        pParentSql->res.code = TAOS_SYSTEM_ERROR(err);

        quitAllSubquery(pSql, pParentSql, pSupporter);

        tscAsyncResultOnError(pParentSql);

        return;
      }
    }

    // pRes->numOfRows bytes of pRes->data are written as one item. Both the write and the close
    // (which flushes) may fail, e.g. on a full disk; a truncated file must not be parsed.
    bool ioFailed = (fwrite(pRes->data, (size_t)pRes->numOfRows, 1, pSupporter->f) != 1);
    int  ioErr = ioFailed ? errno : 0;

    if (fclose(pSupporter->f) != 0 && !ioFailed) {
      ioFailed = true;
      ioErr = errno;
    }
    pSupporter->f = NULL;

    if (ioFailed) {
      tscError("%p failed to write tmp file:%s, reason:%s", pSql, pSupporter->path, strerror(ioErr));
      remove(pSupporter->path);

      pParentSql->res.code = TAOS_SYSTEM_ERROR(ioErr);

      quitAllSubquery(pSql, pParentSql, pSupporter);

      tscAsyncResultOnError(pParentSql);
      return;
    }

    STSBuf* pBuf = tsBufCreateFromFile(pSupporter->path, true);
    if (pBuf == NULL) {  // in error process, close the fd
      int err = errno;  // save it first: logging may overwrite errno
      tscError("%p invalid ts comp file from vnode, abort subquery, file size:%d", pSql, numOfRows);

      pParentSql->res.code = TAOS_SYSTEM_ERROR(err);

      quitAllSubquery(pSql, pParentSql, pSupporter);

      tscAsyncResultOnError(pParentSql);

      return;
    }

    if (pSupporter->pTSBuf == NULL) {
      tscDebug("%p create tmp file for ts block:%s, size:%d bytes", pSql, pBuf->path, numOfRows);
      pSupporter->pTSBuf = pBuf;
    } else {
      assert(pQueryInfo->numOfTables == 1);  // for subquery, only one
      tsBufMerge(pSupporter->pTSBuf, pBuf);
      tsBufDestroy(pBuf);
    }

    // continue to retrieve ts-comp data from vnode
    if (!pRes->completed) {
      taosGetTmpfilePath("ts-join", pSupporter->path);
      // A failed open is tolerated: the next non-empty block retries the open and reports the error.
      pSupporter->f = fopen(pSupporter->path, "w");
      pRes->row = pRes->numOfRows;

      taos_fetch_rows_a(tres, tsCompRetrieveCallback, param);
      return;
    }
  }

  if (hasMoreVnodesToTry(pSql)) {
    STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

    int32_t totalVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
    pTableMetaInfo->vgroupIndex += 1;
    assert(pTableMetaInfo->vgroupIndex < totalVgroups);

    tscDebug("%p results from vgroup index:%d completed, try next vgroup:%d. total vgroups:%d. current numOfRes:%" PRId64,
             pSql, pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVgroups,
             pRes->numOfClauseTotal);

    pCmd->command = TSDB_SQL_SELECT;
    tscResetForNextRetrieve(&pSql->res);

    // A file is only ever written right before being closed, so a file that is still open here was
    // opened for a fetch that returned no data (numOfRows == 0) and is empty. Discard it instead of
    // leaking the FILE* when switching to the next vgroup.
    if (pSupporter->f != NULL) {
      fclose(pSupporter->f);
      pSupporter->f = NULL;
      remove(pSupporter->path);
    }

    taosGetTmpfilePath("ts-join", pSupporter->path);

    // A failed open is tolerated: the next non-empty block retries the open and reports the error.
    pSupporter->f = fopen(pSupporter->path, "w");
    pRes->row = pRes->numOfRows;

    // set the callback function
    pSql->fp = tscJoinQueryCallback;
    tscProcessSql(pSql);
    return;
  }

  if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
    return;
  }

  tscDebug("%p all subquery retrieve ts complete, do ts block intersect", pParentSql);

  // proceeds to launched secondary query to retrieve final data
  SJoinSupporter* p1 = pParentSql->pSubs[0]->param;
  SJoinSupporter* p2 = pParentSql->pSubs[1]->param;

  STimeWindow win = TSWINDOW_INITIALIZER;
  int64_t num = doTSBlockIntersect(pParentSql, p1, p2, &win);
  if (num <= 0) {  // no result during ts intersect
    tscDebug("%p no results generated in ts intersection, free all sub SqlObj and quit", pParentSql);
    freeJoinSubqueryObj(pParentSql);

    // set no result command
    pParentSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
    (*pParentSql->fp)(pParentSql->param, pParentSql, 0);
    return;
  }

  // launch the query the retrieve actual results from vnode along with the filtered timestamp
  SQueryInfo* pPQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, pParentSql->cmd.clauseIndex);
  updateQueryTimeRange(pPQueryInfo, &win);

  //update the vgroup that involved in real data query
  tscLaunchRealSubqueries(pParentSql);
}
H
Haojun Liao 已提交
1118

H
Haojun Liao 已提交
1119 1120
/*
 * Retrieval callback of the secondary (final result) subqueries of a join query; it is installed by
 * tscFetchDatablockForSubquery().
 *
 * The callback accumulates the retrieved row count. A non-ordered super-table projection subquery
 * whose current vnode returned nothing is moved on to its next vgroup. Once every fetching subquery
 * has answered (subAndCheckDone), the per-subquery clause totals are updated and the joined result
 * is built with tscBuildResFromSubqueries().
 */
static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfRows) {
  SJoinSupporter* pSupporter = (SJoinSupporter*)param;

  SSqlObj* pParentSql = pSupporter->pObj;

  SSqlObj* pSql = (SSqlObj*)tres;
  SSqlCmd* pCmd = &pSql->cmd;
  SSqlRes* pRes = &pSql->res;

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  // another subquery has failed already: abort this one as well
  if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
    tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, numOfRows, pParentSql->res.code);
    quitAllSubquery(pSql, pParentSql, pSupporter);

    tscAsyncResultOnError(pParentSql);

    return;
  }

  if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
    assert(numOfRows == taos_errno(pSql));

    // NOTE(review): unlike the other join callbacks, this error path does not call quitAllSubquery(),
    // so this subquery is never marked done via subAndCheckDone(). Confirm whether that is intended.
    pParentSql->res.code = numOfRows;
    tscError("%p retrieve failed, index:%d, code:%s", pSql, pSupporter->subqueryIndex, tstrerror(numOfRows));

    tscAsyncResultOnError(pParentSql);
    return;
  }

  if (numOfRows >= 0) {
    pRes->numOfTotal += pRes->numOfRows;
  }

  SSubqueryState* pState = &pParentSql->subState;
  if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && numOfRows == 0) {
    STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
    assert(pQueryInfo->numOfTables == 1);

    // for projection query, need to try next vnode if current vnode is exhausted
    int32_t numOfVgroups = 0;  // TODO refactor
    if (pTableMetaInfo->pVgroupTables != NULL) {
      numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
    } else {
      numOfVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
    }

    if ((++pTableMetaInfo->vgroupIndex) < numOfVgroups) {
      tscDebug("%p no result in current vnode anymore, try next vnode, vgIndex:%d", pSql, pTableMetaInfo->vgroupIndex);
      pSql->cmd.command = TSDB_SQL_SELECT;
      pSql->fp = tscJoinQueryCallback;

      tscProcessSql(pSql);
      return;
    } else {
      tscDebug("%p no result in current subquery anymore", pSql);
    }
  }

  if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
    tscDebug("%p sub:%p,%d completed, total:%d", pParentSql, tres, pSupporter->subqueryIndex, pState->numOfSub);
    return;
  }

  tscDebug("%p all %d secondary subqueries retrieval completed, code:%d", tres, pState->numOfSub, pParentSql->res.code);

  // Evaluate this BEFORE freeJoinSubqueryObj(). pQueryInfo lives inside pSql, one of the parent's
  // subquery objects, and freeJoinSubqueryObj() presumably releases those objects (the loop below
  // expects pSubs[i] may be NULL afterwards). Reading pQueryInfo afterwards would be a use-after-free.
  bool stableQuery = tscIsTwoStageSTableQuery(pQueryInfo, 0);

  if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
    freeJoinSubqueryObj(pParentSql);
    pParentSql->res.completed = true;
  }

  // update the records for each subquery in parent sql object.
  for (int32_t i = 0; i < pState->numOfSub; ++i) {
    if (pParentSql->pSubs[i] == NULL) {
      tscDebug("%p %p sub:%d not retrieve data", pParentSql, NULL, i);
      continue;
    }

    SSqlRes* pRes1 = &pParentSql->pSubs[i]->res;

    if (pRes1->row > 0 && pRes1->numOfRows > 0) {
      tscDebug("%p sub:%p index:%d numOfRows:%d total:%"PRId64 " (not retrieve)", pParentSql, pParentSql->pSubs[i], i,
               pRes1->numOfRows, pRes1->numOfTotal);
      assert(pRes1->row < pRes1->numOfRows);
    } else {
      if (!stableQuery) {
        pRes1->numOfClauseTotal += pRes1->numOfRows;
      }

      tscDebug("%p sub:%p index:%d numOfRows:%d total:%"PRId64, pParentSql, pParentSql->pSubs[i], i,
               pRes1->numOfRows, pRes1->numOfTotal);
    }
  }

  // data has retrieved to client, build the join results
  tscBuildResFromSubqueries(pParentSql);
}

1219
/*
 * Fetch the next block of final (secondary) results for a join query.
 *
 * - If every live subquery still has unconsumed rows in its client-side buffer, the joined result is
 *   built from those buffers right away (tscBuildResFromSubqueries).
 * - If a subquery has reached its limit and its buffer is consumed, the whole query is finished.
 * - If no consumed subquery can fetch more from its current vnode, the non-ordered super-table
 *   projection subqueries are moved on to their next vgroup. When none of them can move on, the
 *   query is finished.
 * - Otherwise every subquery with a consumed buffer is asked for its next block, and
 *   joinRetrieveFinalResCallback() takes over when they answer.
 *
 * "Finished" means: res.completed is set, the subquery objects are freed, and the application
 * callback (or tscAsyncResultOnError on failure) is invoked.
 */
void tscFetchDatablockForSubquery(SSqlObj* pSql) {
  assert(pSql->subState.numOfSub >= 1);

  int32_t numOfFetch = 0;
  bool    hasData    = true;
  bool    reachLimit = false;

  // a NULL subquery is not involved in the final result generation
  for (int32_t idx = 0; idx < pSql->subState.numOfSub; ++idx) {
    SSqlObj* pSub = pSql->pSubs[idx];
    if (pSub == NULL) {
      continue;
    }

    SSqlRes*    pSubRes     = &pSub->res;
    SQueryInfo* pSubQuery   = tscGetQueryInfoDetail(&pSub->cmd, 0);
    bool        limitHit    = tscHasReachLimitation(pSubQuery, pSubRes);
    bool        bufConsumed = (pSubRes->row >= pSubRes->numOfRows);

    if (!bufConsumed) {
      continue;  // rows are still pending in the client-side buffer
    }

    // no data left in current result buffer
    hasData = false;

    if (limitHit) {
      // has reach the limitation, no data anymore
      reachLimit = true;
      break;
    }

    // The current query is completed for the active vnode, try next vnode if exists.
    // If it is completed, no need to fetch anymore.
    if (!pSubRes->completed) {
      numOfFetch++;
    }
  }

  // has data remains in client side, and continue to return data to app
  if (hasData) {
    tscBuildResFromSubqueries(pSql);
    return;
  }

  if (reachLimit) {
    pSql->res.completed = true;
    freeJoinSubqueryObj(pSql);

    if (pSql->res.code != TSDB_CODE_SUCCESS) {
      tscAsyncResultOnError(pSql);
    } else {
      (*pSql->fp)(pSql->param, pSql, 0);
    }

    return;
  }

  // If at least one subquery is completed in current vnode, try the next vnode in case of multi-vnode
  // super table projection query.
  if (numOfFetch <= 0) {
    // is any live subquery a projection query on a super table without ordering?
    bool nonOrderedPrj = false;
    for (int32_t idx = 0; idx < pSql->subState.numOfSub && !nonOrderedPrj; ++idx) {
      SSqlObj* pSub = pSql->pSubs[idx];
      if (pSub != NULL) {
        nonOrderedPrj = tscNonOrderedProjectionQueryOnSTable(tscGetQueryInfoDetail(&pSub->cmd, 0), 0);
      }
    }

    if (nonOrderedPrj) {
      // clear the state of every subquery that has drained its current vnode
      for (int32_t idx = 0; idx < pSql->subState.numOfSub; ++idx) {
        SSqlObj* pSub = pSql->pSubs[idx];
        if (pSub != NULL && pSub->res.row >= pSub->res.numOfRows && pSub->res.completed) {
          subquerySetState(pSub, &pSql->subState, idx, 0);
        }
      }
    }

    bool tryNextVnode = false;
    for (int32_t idx = 0; idx < pSql->subState.numOfSub; ++idx) {
      SSqlObj* pSub = pSql->pSubs[idx];
      if (pSub == NULL) {
        continue;
      }

      SQueryInfo* pSubQuery = tscGetQueryInfoDetail(&pSub->cmd, 0);
      bool vnodeDrained = tscNonOrderedProjectionQueryOnSTable(pSubQuery, 0) &&
                          pSub->res.row >= pSub->res.numOfRows && pSub->res.completed;
      if (!vnodeDrained) {
        continue;
      }

      STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pSubQuery, 0);
      assert(pSubQuery->numOfTables == 1);

      // for projection query, need to try next vnode if current vnode is exhausted
      int32_t numOfVgroups = (pTableMetaInfo->pVgroupTables != NULL)
                                 ? (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables)
                                 : pTableMetaInfo->vgroupList->numOfVgroups;

      pTableMetaInfo->vgroupIndex += 1;
      if (pTableMetaInfo->vgroupIndex >= numOfVgroups) {
        tscDebug("%p no result in current subquery anymore", pSub);
        continue;
      }

      tscDebug("%p no result in current vnode anymore, try next vnode, vgIndex:%d", pSub,
               pTableMetaInfo->vgroupIndex);
      pSub->cmd.command = TSDB_SQL_SELECT;
      pSub->fp = tscJoinQueryCallback;

      tscProcessSql(pSub);
      tryNextVnode = true;
    }

    if (tryNextVnode) {
      return;
    }

    pSql->res.completed = true;
    freeJoinSubqueryObj(pSql);

    if (pSql->res.code != TSDB_CODE_SUCCESS) {
      tscAsyncResultOnError(pSql);
    } else {
      (*pSql->fp)(pSql->param, pSql, 0);
    }

    return;
  }

  // TODO multi-vnode retrieve for projection query with limitation has bugs, since the global limiation is not handled
  // retrieve data from current vnode.
  tscDebug("%p retrieve data from %d subqueries", pSql, numOfFetch);

  // Reset the state of all consumed subqueries before the first fetch below is issued.
  for (int32_t idx = 0; idx < pSql->subState.numOfSub; ++idx) {
    SSqlObj* pSub = pSql->pSubs[idx];
    if (pSub != NULL && pSub->res.row >= pSub->res.numOfRows) {
      subquerySetState(pSub, &pSql->subState, idx, 0);
    }
  }

  for (int32_t idx = 0; idx < pSql->subState.numOfSub; ++idx) {
    SSqlObj* pSub = pSql->pSubs[idx];
    if (pSub == NULL) {
      continue;
    }

    SSqlRes*        pSubRes    = &pSub->res;
    SSqlCmd*        pSubCmd    = &pSub->cmd;
    SJoinSupporter* pSupporter = (SJoinSupporter*)pSub->param;

    // wait for all subqueries completed
    SQueryInfo* pSubQuery = tscGetQueryInfoDetail(pSubCmd, 0);
    assert(pSubRes->numOfRows >= 0 && pSubQuery->numOfTables == 1);

    STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pSubQuery, 0);

    if (pSubRes->row < pSubRes->numOfRows) {
      continue;  // still has buffered rows, nothing to fetch
    }

    tscDebug("%p subquery:%p retrieve data from vnode, subquery:%d, vgroupIndex:%d", pSql, pSub,
             pSupporter->subqueryIndex, pTableMetaInfo->vgroupIndex);

    tscResetForNextRetrieve(pSubRes);
    pSub->fp = joinRetrieveFinalResCallback;

    if (pSubCmd->command < TSDB_SQL_LOCAL) {
      pSubCmd->command = (pSubCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
    }

    tscProcessSql(pSub);
  }
}

/*
 * Once all join subqueries have returned, build pSql->res.pColumnIndex. For every output expression
 * of the parent query, it records the subquery that produces it (tableIndex, the position of the
 * table whose uid matches the expression) and the position of the matching expression inside that
 * subquery (columnIndex, same functionId and colId).
 *
 * The mapping is built only once: if pColumnIndex already exists the call returns immediately.
 * On allocation failure pRes->code is set to TSDB_CODE_TSC_OUT_OF_MEMORY.
 * Afterwards the super-table functions and field offsets of the parent query are restored
 * (tscRestoreSQLFuncForSTableQuery / tscFieldInfoUpdateOffset).
 */
void tscSetupOutputColumnIndex(SSqlObj* pSql) {
  SSqlRes* pRes = &pSql->res;

  // the column transfer support struct has been built
  if (pRes->pColumnIndex != NULL) {
    return;
  }

  SSqlCmd*    pCmd       = &pSql->cmd;
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  int32_t     numOfExprs = (int32_t)tscSqlExprNumOfExprs(pQueryInfo);

  pRes->pColumnIndex = calloc(1, sizeof(SColumnIndex) * numOfExprs);
  if (pRes->pColumnIndex == NULL) {
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return;
  }

  for (int32_t exprIdx = 0; exprIdx < numOfExprs; ++exprIdx) {
    SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, exprIdx);

    // the subquery index equals the index of the table that owns this expression
    int32_t ownerIdx = -1;
    for (int32_t t = 0; t < pQueryInfo->numOfTables && ownerIdx < 0; ++t) {
      if (tscGetMetaInfo(pQueryInfo, t)->pTableMeta->id.uid == pExpr->uid) {
        ownerIdx = t;
      }
    }

    assert(ownerIdx >= 0 && ownerIdx < pQueryInfo->numOfTables);

    SQueryInfo* pSubQueryInfo = tscGetQueryInfoDetail(&pSql->pSubs[ownerIdx]->cmd, 0);
    size_t      numOfSubExpr  = taosArrayGetSize(pSubQueryInfo->exprList);

    for (int32_t subIdx = 0; subIdx < numOfSubExpr; ++subIdx) {
      SSqlExpr* pSubExpr = tscSqlExprGet(pSubQueryInfo, subIdx);
      if (pExpr->functionId != pSubExpr->functionId || pExpr->colInfo.colId != pSubExpr->colInfo.colId) {
        continue;
      }

      pRes->pColumnIndex[exprIdx] = (SColumnIndex){.tableIndex = ownerIdx, .columnIndex = subIdx};
      break;
    }
  }

  // restore the offset value for super table query in case of final result.
  tscRestoreSQLFuncForSTableQuery(pQueryInfo);
  tscFieldInfoUpdateOffset(pQueryInfo);
}

/*
 * Completion callback of a join sub-query (one sub-query per joined table).
 *
 * Depending on the state of the parent and the stage of this sub-query it:
 *   - aborts all sub-queries when a sibling, or this sub-query, has failed;
 *   - starts fetching <tid, tag> tuples (tag filter stage) or the ts_comp block
 *     (first stage) from the vnode;
 *   - in the second stage, waits for the siblings (unless this is a follow-up query
 *     on the next vgroup) and then either keeps fetching or notifies the parent.
 *
 * param: the SJoinSupporter of this sub-query; tres: the sub-query object;
 * code:  result code of the request that just completed.
 */
void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
  SJoinSupporter* pSupporter = (SJoinSupporter*)param;
  SSqlObj*        pSql       = (SSqlObj*)tres;
  SSqlObj*        pParentSql = pSupporter->pObj;

  // a join sub-query always holds exactly one clause that queries exactly one table
  SQueryInfo*     pQueryInfo     = tscGetQueryInfoDetail(&pSql->cmd, 0);
  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  assert(pQueryInfo->numOfTables == 1 && pSql->cmd.numOfClause == 1);

  // a sibling sub-query has already failed: give up on this one as well
  if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
    tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, code, pParentSql->res.code);
    quitAllSubquery(pSql, pParentSql, pSupporter);
    tscAsyncResultOnError(pParentSql);
    return;
  }

  // this sub-query failed: publish its code to the parent and abort everything
  // TODO here retry is required, not directly returns to client
  if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
    assert(taos_errno(pSql) == code);
    tscError("%p abort query, code:%s, global code:%s", pSql, tstrerror(code), tstrerror(pParentSql->res.code));
    pParentSql->res.code = code;
    quitAllSubquery(pSql, pParentSql, pSupporter);
    tscAsyncResultOnError(pParentSql);
    return;
  }

  // stages before the second one continue by fetching intermediate data from the vnode
  bool isFetchStage = true;
  if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) {
    pSql->fp = tidTagRetrieveCallback;   // <tid, tag> tuples
  } else if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
    pSql->fp = tsCompRetrieveCallback;   // ts_comp info
  } else {
    isFetchStage = false;
  }

  if (isFetchStage) {
    pSql->cmd.command = TSDB_SQL_FETCH;
    tscProcessSql(pSql);
    return;
  }

  // Second stage. Unless this is a follow-up query on the next vgroup of a non-ordered
  // projection over a super table, wait until every sibling sub-query has finished.
  if ((pTableMetaInfo->vgroupIndex <= 0 || !tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) &&
      !subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
    return;
  }

  tscSetupOutputColumnIndex(pParentSql);

  if (pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
    // follow-up query on the next vnode: keep retrieving here instead of returning to the invoker
    pSql->fp          = joinRetrieveFinalResCallback;
    pSql->cmd.command = TSDB_SQL_FETCH;
    tscProcessSql(pSql);
    return;
  }

  // first retrieval of the secondary stage: hand control back to the parent query
  // (the command flag must be set only after the semaphore has been correctly set)
  if (pParentSql->res.code == TSDB_CODE_SUCCESS) {
    (*pParentSql->fp)(pParentSql->param, pParentSql, 0);
  } else {
    tscAsyncResultOnError(pParentSql);
  }
}

/////////////////////////////////////////////////////////////////////////////////////////
static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code);

1542
static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj);
H
hjxilinx 已提交
1543

H
Haojun Liao 已提交
1544
int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter *pSupporter) {
H
hjxilinx 已提交
1545 1546 1547 1548
  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  
  pSql->res.qhandle = 0x1;
H
Haojun Liao 已提交
1549 1550
  assert(pSql->res.numOfRows == 0);

H
hjxilinx 已提交
1551
  if (pSql->pSubs == NULL) {
H
Haojun Liao 已提交
1552
    pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES);
H
hjxilinx 已提交
1553
    if (pSql->pSubs == NULL) {
1554
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
H
hjxilinx 已提交
1555 1556 1557
    }
  }
  
1558
  SSqlObj *pNew = createSubqueryObj(pSql, tableIndex, tscJoinQueryCallback, pSupporter, TSDB_SQL_SELECT, NULL);
H
hjxilinx 已提交
1559
  if (pNew == NULL) {
1560
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
H
hjxilinx 已提交
1561 1562
  }
  
1563
  pSql->pSubs[tableIndex] = pNew;
H
hjxilinx 已提交
1564 1565 1566 1567 1568 1569 1570 1571
  
  if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) {
    addGroupInfoForSubquery(pSql, pNew, 0, tableIndex);
    
    // refactor as one method
    SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
    assert(pNewQueryInfo != NULL);
    
1572 1573 1574 1575 1576 1577 1578
    // update the table index
    size_t num = taosArrayGetSize(pNewQueryInfo->colList);
    for (int32_t i = 0; i < num; ++i) {
      SColumn* pCol = taosArrayGetP(pNewQueryInfo->colList, i);
      pCol->colIndex.tableIndex = 0;
    }
    
H
hjxilinx 已提交
1579 1580 1581 1582 1583 1584 1585
    pSupporter->colList = pNewQueryInfo->colList;
    pNewQueryInfo->colList = NULL;
    
    pSupporter->exprList = pNewQueryInfo->exprList;
    pNewQueryInfo->exprList = NULL;
    
    pSupporter->fieldsInfo = pNewQueryInfo->fieldsInfo;
H
hjxilinx 已提交
1586
  
H
hjxilinx 已提交
1587 1588
    // this data needs to be transfer to support struct
    memset(&pNewQueryInfo->fieldsInfo, 0, sizeof(SFieldInfo));
H
Haojun Liao 已提交
1589 1590 1591
    if (tscTagCondCopy(&pSupporter->tagCond, &pNewQueryInfo->tagCond) != 0) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
H
Haojun Liao 已提交
1592

H
Haojun Liao 已提交
1593 1594 1595
    pSupporter->groupInfo = pNewQueryInfo->groupbyExpr;
    memset(&pNewQueryInfo->groupbyExpr, 0, sizeof(SSqlGroupbyExpr));

H
hjxilinx 已提交
1596
    pNew->cmd.numOfCols = 0;
1597
    pNewQueryInfo->interval.interval = 0;
H
Haojun Liao 已提交
1598 1599 1600 1601 1602
    pSupporter->limit = pNewQueryInfo->limit;

    pNewQueryInfo->limit.limit = -1;
    pNewQueryInfo->limit.offset = 0;

H
hjxilinx 已提交
1603 1604 1605
    // backup the data and clear it in the sqlcmd object
    memset(&pNewQueryInfo->groupbyExpr, 0, sizeof(SSqlGroupbyExpr));
    
H
hjxilinx 已提交
1606
    tscInitQueryInfo(pNewQueryInfo);
H
hjxilinx 已提交
1607 1608
    STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0);
    
weixin_48148422's avatar
weixin_48148422 已提交
1609
    if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { // return the tableId & tag
H
Haojun Liao 已提交
1610
      SColumnIndex colIndex = {0};
H
Haojun Liao 已提交
1611 1612 1613 1614

      STagCond* pTagCond = &pSupporter->tagCond;
      assert(pTagCond->joinInfo.hasJoin);

1615
      int32_t tagColId = tscGetJoinTagColIdByUid(pTagCond, pTableMetaInfo->pTableMeta->id.uid);
H
Haojun Liao 已提交
1616 1617 1618
      SSchema* s = tscGetColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId);

      colIndex.columnIndex = tscGetTagColIndexById(pTableMetaInfo->pTableMeta, tagColId);
1619

H
Haojun Liao 已提交
1620
      int16_t bytes = 0;
H
Haojun Liao 已提交
1621
      int16_t type  = 0;
H
Haojun Liao 已提交
1622 1623
      int32_t inter = 0;

H
Haojun Liao 已提交
1624
      getResultDataInfo(s->type, s->bytes, TSDB_FUNC_TID_TAG, 0, &type, &bytes, &inter, 0, 0);
H
Haojun Liao 已提交
1625

S
Shengliang Guan 已提交
1626
      SSchema s1 = {.colId = s->colId, .type = (uint8_t)type, .bytes = bytes};
H
Haojun Liao 已提交
1627
      pSupporter->tagSize = s1.bytes;
1628
      assert(isValidDataType(s1.type) && s1.bytes > 0);
H
Haojun Liao 已提交
1629

1630 1631
      // set get tags query type
      TSDB_QUERY_SET_TYPE(pNewQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY);
H
Haojun Liao 已提交
1632
      tscAddSpecialColumnForSelect(pNewQueryInfo, 0, TSDB_FUNC_TID_TAG, &colIndex, &s1, TSDB_COL_TAG);
1633
      size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList);
1634
  
1635
      tscDebug(
1636
          "%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, transfer to tid_tag query to retrieve (tableId, tags), "
S
TD-1057  
Shengliang Guan 已提交
1637
          "exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, tagIndex:%d, name:%s",
1638
          pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscSqlExprNumOfExprs(pNewQueryInfo),
1639
          numOfCols, pNewQueryInfo->fieldsInfo.numOfOutput, colIndex.columnIndex, tNameGetTableName(&pNewQueryInfo->pTableMetaInfo[0]->name));
1640 1641
    } else {
      SSchema      colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1};
H
Haojun Liao 已提交
1642 1643
      SColumnIndex colIndex = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX};
      tscAddSpecialColumnForSelect(pNewQueryInfo, 0, TSDB_FUNC_TS_COMP, &colIndex, &colSchema, TSDB_COL_NORMAL);
1644 1645 1646 1647

      // set the tags value for ts_comp function
      SSqlExpr *pExpr = tscSqlExprGet(pNewQueryInfo, 0);

H
Haojun Liao 已提交
1648
      if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
1649
        int16_t tagColId = tscGetJoinTagColIdByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->id.uid);
1650
        pExpr->param->i64 = tagColId;
H
Haojun Liao 已提交
1651 1652
        pExpr->numOfParams = 1;
      }
1653 1654 1655 1656 1657 1658 1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669

      // add the filter tag column
      if (pSupporter->colList != NULL) {
        size_t s = taosArrayGetSize(pSupporter->colList);

        for (int32_t i = 0; i < s; ++i) {
          SColumn *pCol = taosArrayGetP(pSupporter->colList, i);

          if (pCol->numOfFilters > 0) {  // copy to the pNew->cmd.colList if it is filtered.
            SColumn *p = tscColumnClone(pCol);
            taosArrayPush(pNewQueryInfo->colList, &p);
          }
        }
      }

      size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList);

1670
      tscDebug(
B
Bomin Zhang 已提交
1671
          "%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%u, transfer to ts_comp query to retrieve timestamps, "
S
TD-1057  
Shengliang Guan 已提交
1672
          "exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, name:%s",
1673
          pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscSqlExprNumOfExprs(pNewQueryInfo),
1674
          numOfCols, pNewQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pNewQueryInfo->pTableMetaInfo[0]->name));
1675
    }
H
hjxilinx 已提交
1676
  } else {
H
hjxilinx 已提交
1677
    assert(0);
H
hjxilinx 已提交
1678 1679 1680 1681
    SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
    pNewQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY;
  }

H
Haojun Liao 已提交
1682
  return TSDB_CODE_SUCCESS;
H
hjxilinx 已提交
1683 1684
}

H
Haojun Liao 已提交
1685
/*
 * Entry point of a join query: create one join sub-query per joined table and launch them.
 *
 * If a joined super table has no vgroups, the join result is empty. In that case the
 * sub-queries are freed and the parent callback is invoked with
 * TSDB_SQL_RETRIEVE_EMPTY_RESULT. Failures while building the sub-queries are reported
 * through pSql->res.code and tscAsyncResultOnError.
 */
void tscHandleMasterJoinQuery(SSqlObj* pSql) {
  SSqlCmd* pCmd = &pSql->cmd;
  SSqlRes* pRes = &pSql->res;
  int32_t  code = TSDB_CODE_SUCCESS;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  assert((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0);

  SSubqueryState* pState = &pSql->subState;
  pState->numOfSub = pQueryInfo->numOfTables;

  // the per-subquery state array and its mutex are created only on the first call
  if (pState->states == NULL) {
    pState->states = calloc(pState->numOfSub, sizeof(*pState->states));
    if (pState->states == NULL) {
      code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      goto _error;
    }

    pthread_mutex_init(&pState->mutex, NULL);
  }

  memset(pState->states, 0, sizeof(*pState->states) * pState->numOfSub);
  tscDebug("%p reset all sub states to 0", pSql);

  bool hasEmptySub = false;

  tscDebug("%p start subquery, total:%d", pSql, pQueryInfo->numOfTables);
  for (int32_t i = 0; !hasEmptySub && i < pQueryInfo->numOfTables; ++i) {
    SJoinSupporter *pSupporter = tscCreateJoinSupporter(pSql, i);
    if (pSupporter == NULL) {  // failed to create support struct, abort current query
      tscError("%p tableIndex:%d, failed to allocate join support object, abort further query", pSql, i);
      code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      goto _error;
    }

    code = tscCreateJoinSubquery(pSql, i, pSupporter);
    if (code != TSDB_CODE_SUCCESS) {  // failed to create subquery object, quit query
      tscDestroyJoinSupporter(pSupporter);
      goto _error;
    }

    // a super table without any vgroup makes the whole join empty; stop creating sub-queries
    STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->pSubs[i]->cmd, 0, 0);
    hasEmptySub = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo) && (pTableMetaInfo->vgroupList->numOfVgroups == 0);
  }

  if (hasEmptySub) {  // at least one subquery is empty, do nothing and return
    freeJoinSubqueryObj(pSql);
    pSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
    (*pSql->fp)(pSql->param, pSql, 0);
    return;
  }

  // Launch every sub-query. Once one fails to launch, the remaining ones are not sent;
  // their callbacks are invoked directly instead, with the error already stored in pRes->code.
  bool launchFailed = false;
  for (int32_t i = 0; i < pState->numOfSub; ++i) {
    SSqlObj* pSub = pSql->pSubs[i];

    if (!launchFailed) {
      code = tscProcessSql(pSub);
      if (code == TSDB_CODE_SUCCESS) {
        continue;
      }

      pRes->code   = code;
      launchFailed = true;
    }

    (*pSub->fp)(pSub->param, pSub, 0);
  }

  if (launchFailed) {
    return;
  }

  pSql->cmd.command = TSDB_SQL_TABLE_JOIN_RETRIEVE;
  return;

_error:
  pRes->code = code;
  tscAsyncResultOnError(pSql);
}

H
Haojun Liao 已提交
1769 1770
/*
 * Release the first `numOfSubs` super-table sub-queries of pSql together with their
 * SRetrieveSupport objects. This is used when preparing the sub-queries fails halfway.
 *
 * Each released slot of pSql->pSubs, and each sub-query's param, is reset to NULL. This
 * keeps the parent object from holding dangling pointers that could otherwise be freed
 * or dereferenced a second time when the parent itself is released.
 */
static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs) {
  assert(numOfSubs <= pSql->subState.numOfSub && numOfSubs >= 0);

  for (int32_t i = 0; i < numOfSubs; ++i) {
    SSqlObj* pSub = pSql->pSubs[i];
    assert(pSub != NULL);

    SRetrieveSupport* pSupport = pSub->param;
    pSub->param = NULL;  // the support object is released below; do not leave it reachable

    tfree(pSupport->localBuffer);
    tfree(pSupport);

    taos_free_result(pSub);
    pSql->pSubs[i] = NULL;  // avoid a dangling slot in the parent's sub-query array
  }
}

D
TD-2516  
dapan1121 已提交
1785 1786 1787 1788 1789 1790 1791 1792 1793 1794 1795 1796 1797 1798 1799 1800 1801 1802 1803
/*
 * Acquire a spin lock whose word *lockedBy is 0 when free and holds the owner's thread id
 * (taosGetSelfPthreadId) when taken. Busy-waits, yielding the CPU every 100 failed attempts.
 * NOTE: not reentrant. A thread that already owns the lock spins forever, because the word
 * is then non-zero.
 */
void tscLockByThread(int64_t *lockedBy) {
  int64_t self = taosGetSelfPthreadId();

  for (int attempts = 1; atomic_val_compare_exchange_64(lockedBy, 0, self) != 0; ++attempts) {
    if (attempts % 100 == 0) {
      sched_yield();
    }
  }
}

/*
 * Release a lock taken with tscLockByThread. The word is reset to 0 only if the calling
 * thread is its owner. Releasing a lock held by another thread (or a free lock) trips the
 * assertion; with NDEBUG the word is simply left unchanged.
 */
void tscUnlockByThread(int64_t *lockedBy) {
  int64_t self  = taosGetSelfPthreadId();
  int64_t owner = atomic_val_compare_exchange_64(lockedBy, self, 0);

  assert(owner == self);
  (void)owner;  // only inspected by the assertion
}



H
hjxilinx 已提交
1804 1805 1806 1807 1808
/*
 * Launch a two-stage super table query.
 *
 * One sub-query is created per vgroup (or per entry of pVgroupTables). Each sub-query
 * retrieves into its own external memory buffer of the local-merge environment, and then
 * all of them are started.
 *
 * Returns TSDB_CODE_SUCCESS once every sub-query has been launched. Otherwise it returns an
 * error code; for the allocation failures of the merge environment, pSubs and states, the
 * error is also reported asynchronously through tscAsyncResultOnError.
 */
int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  // pRes->code check only serves in launching metric sub-queries
  if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
    pCmd->command = TSDB_SQL_RETRIEVE_LOCALMERGE;  // enable the abort of kill super table function.
    return pRes->code;
  }

  tExtMemBuffer **  pMemoryBuf = NULL;
  tOrderDescriptor *pDesc  = NULL;
  SColumnModel     *pModel = NULL;
  SColumnModel     *pFinalModel = NULL;

  pRes->qhandle = 0x1;  // hack the qhandle check

  const uint32_t nBufferSize = (1u << 16u);  // 64KB

  SQueryInfo     *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  SSubqueryState *pState = &pSql->subState;

  pState->numOfSub = 0;
  if (pTableMetaInfo->pVgroupTables == NULL) {
    pState->numOfSub = pTableMetaInfo->vgroupList->numOfVgroups;
  } else {
    pState->numOfSub = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
  }

  assert(pState->numOfSub > 0);

  int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, &pFinalModel, nBufferSize);
  if (ret != 0) {
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    tscAsyncResultOnError(pSql);
    tfree(pMemoryBuf);
    return ret;
  }

  pSql->pSubs = calloc(pState->numOfSub, POINTER_BYTES);

  tscDebug("%p retrieved query data from %d vnode(s)", pSql, pState->numOfSub);

  // On the two OOM paths below, the local-merge environment is released in full. A literal
  // error code is returned (the previous code returned `ret`, i.e. success) so that pSql is
  // not read again after it has been handed to tscAsyncResultOnError.
  if (pSql->pSubs == NULL) {
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pFinalModel, pState->numOfSub);

    tscAsyncResultOnError(pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  if (pState->states == NULL) {
    pState->states = calloc(pState->numOfSub, sizeof(*pState->states));
    if (pState->states == NULL) {
      pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      // previously only the pMemoryBuf array was freed here, leaking the descriptor, models and buffers
      tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pFinalModel, pState->numOfSub);

      tscAsyncResultOnError(pSql);
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }

    pthread_mutex_init(&pState->mutex, NULL);
  }

  memset(pState->states, 0, sizeof(*pState->states) * pState->numOfSub);
  tscDebug("%p reset all sub states to 0", pSql);

  pRes->code = TSDB_CODE_SUCCESS;

  int32_t i = 0;
  for (; i < pState->numOfSub; ++i) {
    SRetrieveSupport *trs = (SRetrieveSupport *)calloc(1, sizeof(SRetrieveSupport));
    if (trs == NULL) {
      tscError("%p failed to malloc buffer for SRetrieveSupport, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
      break;
    }

    trs->pExtMemBuffer = pMemoryBuf;
    trs->pOrderDescriptor = pDesc;

    trs->localBuffer = (tFilePage *)calloc(1, nBufferSize + sizeof(tFilePage));
    if (trs->localBuffer == NULL) {
      tscError("%p failed to malloc buffer for local buffer, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
      tfree(trs);
      break;
    }

    trs->subqueryIndex  = i;
    trs->pParentSql     = pSql;
    trs->pFinalColModel = pModel;
    trs->pFFColModel    = pFinalModel;

    SSqlObj *pNew = tscCreateSTableSubquery(pSql, trs, NULL);
    if (pNew == NULL) {
      tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
      tfree(trs->localBuffer);
      tfree(trs);
      break;
    }

    // todo handle multi-vnode situation
    if (pQueryInfo->tsBuf) {
      SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
      pNewQueryInfo->tsBuf = tsBufClone(pQueryInfo->tsBuf);
      assert(pNewQueryInfo->tsBuf != NULL);
    }

    tscDebug("%p sub:%p create subquery success. orderOfSub:%d", pSql, pNew, trs->subqueryIndex);
  }

  if (i < pState->numOfSub) {
    tscError("%p failed to prepare subquery structure and launch subqueries", pSql);
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;

    tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pFinalModel, pState->numOfSub);
    doCleanupSubqueries(pSql, i);
    return pRes->code;   // free all allocated resource
  }

  if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
    tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pFinalModel, pState->numOfSub);
    doCleanupSubqueries(pSql, i);
    return pRes->code;
  }

  for(int32_t j = 0; j < pState->numOfSub; ++j) {
    SSqlObj* pSub = pSql->pSubs[j];
    SRetrieveSupport* pSupport = pSub->param;

    tscDebug("%p sub:%p launch subquery, orderOfSub:%d.", pSql, pSub, pSupport->subqueryIndex);
    tscProcessSql(pSub);
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
1941 1942
/*
 * Detach the SRetrieveSupport from pSql->param and free it (with its local buffer).
 * The detach is an atomic compare-and-swap to 0. If that returns NULL, the support object
 * is considered already released and nothing is freed.
 */
static void tscFreeRetrieveSup(SSqlObj *pSql) {
  SRetrieveSupport *pSupport = pSql->param;

  if (atomic_val_compare_exchange_ptr(&pSql->param, pSupport, 0) == NULL) {
    tscDebug("%p retrieve supp already released", pSql);
    return;
  }

  tscDebug("%p start to free subquery supp obj:%p", pSql, pSupport);
  tfree(pSupport->localBuffer);
  tfree(pSupport);
}

1955
static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfRows);
1956
static void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows);
H
hjxilinx 已提交
1957

H
Haojun Liao 已提交
1958
static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES *tres, int32_t code) {
H
hjxilinx 已提交
1959
// set no disk space error info
1960
  tscError("sub:%p failed to flush data to disk, reason:%s", tres, tstrerror(code));
H
Haojun Liao 已提交
1961
  SSqlObj* pParentSql = trsupport->pParentSql;
H
Haojun Liao 已提交
1962 1963

  pParentSql->res.code = code;
H
hjxilinx 已提交
1964
  trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
H
Haojun Liao 已提交
1965
  tscHandleSubqueryError(trsupport, tres, pParentSql->res.code);
H
hjxilinx 已提交
1966 1967
}

H
Haojun Liao 已提交
1968 1969 1970 1971 1972 1973 1974 1975
/*
 * Retry a failed super-table sub-query.
 *
 * Data buffered by the failed attempt is discarded, and a new sub-query is created from the
 * failed one and launched. When the launch succeeds, the failed sub-query object is released;
 * otherwise it is left for the caller to handle. When the new sub-query cannot be created, the
 * parent's code is set to terrno and further retries are disabled.
 *
 * Returns the code of launching the new sub-query, or the parent's error code.
 */
static int32_t tscReissueSubquery(SRetrieveSupport *trsupport, SSqlObj *pSql, int32_t code) {
  SSqlObj *pParentSql    = trsupport->pParentSql;
  int32_t  subqueryIndex = trsupport->subqueryIndex;

  STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
  SVgroupInfo*    pVgroup        = &pTableMetaInfo->vgroupList->vgroups[0];

  // drop everything the failed attempt has retrieved, including locally saved results
  tExtMemBufferClear(trsupport->pExtMemBuffer[subqueryIndex]);
  trsupport->localBuffer->num = 0;

  tscError("%p sub:%p retrieve/query failed, code:%s, orderOfSub:%d, retry:%d", pParentSql, pSql,
           tstrerror(code), subqueryIndex, trsupport->numOfRetry);

  SSqlObj *pNew = tscCreateSTableSubquery(pParentSql, trsupport, pSql);
  if (pNew == NULL) {
    tscError("%p sub:%p failed to create new subquery due to error:%s, abort retry, vgId:%d, orderOfSub:%d",
             trsupport->pParentSql, pSql, tstrerror(terrno), pVgroup->vgId, trsupport->subqueryIndex);

    pParentSql->res.code  = terrno;
    trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
    return pParentSql->res.code;
  }

  int32_t ret = tscProcessSql(pNew);

  // if failed to process sql, let following code handle the pSql
  if (ret == TSDB_CODE_SUCCESS) {
    taos_free_result(pSql);
  }

  return ret;
}

2008
/*
 * Error path of a super-table sub-query.
 *
 * numOfRows >= 0: this sub-query succeeded, but it is aborted because the parent already
 *                 carries an error.
 * numOfRows <  0: the error code of this sub-query. The sub-query is reissued while retries
 *                 remain and the parent has no error. Otherwise the error becomes the
 *                 parent's code, unless the parent already has one.
 *
 * Sub-queries that are not the last to finish (per subAndCheckDone) only release their
 * support object. The last one destroys the local-merge environment and notifies the parent.
 */
void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows) {
  // checked before the first dereference of pSql
  assert(pSql != NULL);

  // it has been freed already
  if (pSql->param != trsupport || pSql->param == NULL) {
    return;
  }

  SSqlObj *pParentSql = trsupport->pParentSql;
  int32_t  subqueryIndex = trsupport->subqueryIndex;

  SSubqueryState* pState = &pParentSql->subState;

  // retrieved in subquery failed. OR query cancelled in retrieve phase.
  if (taos_errno(pSql) == TSDB_CODE_SUCCESS && pParentSql->res.code != TSDB_CODE_SUCCESS) {

    /*
     * kill current sub-query connection, which may retrieve data from vnodes;
     * Here we get: pPObj->res.code == TSDB_CODE_TSC_QUERY_CANCELLED
     */
    pSql->res.numOfRows = 0;
    trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;  // disable retry efforts
    tscDebug("%p query is cancelled, sub:%p, orderOfSub:%d abort retrieve, code:%s", pParentSql, pSql,
             subqueryIndex, tstrerror(pParentSql->res.code));
  }

  if (numOfRows >= 0) {  // current query is successful, but other sub query failed, still abort current query.
    tscDebug("%p sub:%p retrieve numOfRows:%d,orderOfSub:%d", pParentSql, pSql, numOfRows, subqueryIndex);
    tscError("%p sub:%p abort further retrieval due to other queries failure,orderOfSub:%d,code:%s", pParentSql, pSql,
             subqueryIndex, tstrerror(pParentSql->res.code));
  } else {
    if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && pParentSql->res.code == TSDB_CODE_SUCCESS) {
      if (tscReissueSubquery(trsupport, pSql, numOfRows) == TSDB_CODE_SUCCESS) {
        return;
      }
    } else {  // reach the maximum retry count, abort
      atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, numOfRows);
      tscError("%p sub:%p retrieve failed,code:%s,orderOfSub:%d failed.no more retry,set global code:%s", pParentSql, pSql,
               tstrerror(numOfRows), subqueryIndex, tstrerror(pParentSql->res.code));
    }
  }

  if (!subAndCheckDone(pSql, pParentSql, subqueryIndex)) {
    tscDebug("%p sub:%p,%d freed, not finished, total:%d", pParentSql, pSql, trsupport->subqueryIndex, pState->numOfSub);

    tscFreeRetrieveSup(pSql);
    return;
  }

  // all subqueries are failed
  tscError("%p retrieve from %d vnode(s) completed,code:%s.FAILED.", pParentSql, pState->numOfSub,
      tstrerror(pParentSql->res.code));

  // release allocated resource
  tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, trsupport->pFinalColModel, trsupport->pFFColModel,
                            pState->numOfSub);

  // trsupport must not be used after this point
  tscFreeRetrieveSup(pSql);

  // A regular super table query gets its callback invoked directly with the final code.
  // When the parent is itself the second stage of a join query, it is notified only on
  // error, through tscAsyncResultOnError.
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, 0);

  if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {  // regular super table query
    (*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.code);
  } else if (pParentSql->res.code != TSDB_CODE_SUCCESS) {  // second stage of a join query
    tscAsyncResultOnError(pParentSql);
  }
}

2079 2080
/*
 * Called when one super-table subquery (one vgroup) has delivered all of its rows.
 *
 * The rows still held in trsupport->localBuffer are compacted and flushed into the subquery's external memory buffer.
 * If this is not the last subquery to finish (subAndCheckDone returns false), only this subquery's retrieve support
 * is released. Otherwise the local merger (loser tree) is built over the buffers of all subqueries, the parent command
 * is switched to TSDB_SQL_RETRIEVE_LOCALMERGE and the parent's callback (or its error path) is invoked.
 *
 * Insufficient temp-directory space or a flush failure aborts further retrieval via tscAbortFurtherRetryRetrieval.
 */
static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* pSql) {
  int32_t           idx        = trsupport->subqueryIndex;
  SSqlObj *         pParentSql = trsupport->pParentSql;
  tOrderDescriptor *pDesc      = trsupport->pOrderDescriptor;

  SSubqueryState* pState     = &pParentSql->subState;
  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);

  STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[0];

  // data in from current vnode is stored in cache and disk
  uint32_t numOfRowsFromSubquery = (uint32_t)(trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->num);

  // this subquery targets the vgroup at position idx (tscCreateSTableSubquery sets vgroupIndex = subqueryIndex and
  // tscRetrieveDataRes indexes the vgroup list the same way), so log that vgroup instead of always the first one.
  SVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[idx];
  tscDebug("%p sub:%p all data retrieved from ep:%s, vgId:%d, numOfRows:%u, orderOfSub:%d", pParentSql, pSql,
           pVgroup->epAddr[0].fqdn, pVgroup->vgId, numOfRowsFromSubquery, idx);

  tColModelCompact(pDesc->pColumnModel, trsupport->localBuffer, pDesc->pColumnModel->capacity);

#ifdef _DEBUG_VIEW
  printf("%" PRIu64 " rows data flushed to disk:\n", trsupport->localBuffer->num);
  SSrcColumnInfo colInfo[256] = {0};
  tscGetSrcColumnInfo(colInfo, pQueryInfo);
  tColModelDisplayEx(pDesc->pColumnModel, trsupport->localBuffer->data, trsupport->localBuffer->num,
                     trsupport->localBuffer->num, colInfo);
#endif

  if (tsTotalTmpDirGB != 0 && tsAvailTmpDirectorySpace < tsReservedTmpDirectorySpace) {
    tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pParentSql, pSql,
             tsAvailTmpDirectorySpace, tsReservedTmpDirectorySpace);
    tscAbortFurtherRetryRetrieval(trsupport, pSql, TSDB_CODE_TSC_NO_DISKSPACE);
    return;
  }

  // each result for a vnode is ordered as an independent list,
  // then used as an input of loser tree for disk-based merge
  int32_t code = tscFlushTmpBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer, pQueryInfo->groupbyExpr.orderType);
  if (code != 0) { // set no disk space error info, and abort retry
    tscAbortFurtherRetryRetrieval(trsupport, pSql, code);
    return;
  }

  if (!subAndCheckDone(pSql, pParentSql, idx)) {
    tscDebug("%p sub:%p orderOfSub:%d freed, not finished", pParentSql, pSql, idx);
    tscFreeRetrieveSup(pSql);
    return;
  }

  // all sub-queries are returned, start to local merge process
  pDesc->pColumnModel->capacity = trsupport->pExtMemBuffer[idx]->numOfElemsPerPage;

  tscDebug("%p retrieve from %d vnodes completed.final NumOfRows:%" PRId64 ",start to build loser tree", pParentSql,
           pState->numOfSub, pState->numOfRetrievedRows);

  SQueryInfo *pPQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, 0);
  tscClearInterpInfo(pPQueryInfo);

  tscCreateLocalMerger(trsupport->pExtMemBuffer, pState->numOfSub, pDesc, trsupport->pFinalColModel, trsupport->pFFColModel, pParentSql);
  tscDebug("%p build loser tree completed", pParentSql);

  pParentSql->res.precision = pSql->res.precision;
  pParentSql->res.numOfRows = 0;
  pParentSql->res.row = 0;

  // trsupport must not be used after this call
  tscFreeRetrieveSup(pSql);

  // set the command flag must be after the semaphore been correctly set.
  pParentSql->cmd.command = TSDB_SQL_RETRIEVE_LOCALMERGE;
  if (pParentSql->res.code == TSDB_CODE_SUCCESS) {
    (*pParentSql->fp)(pParentSql->param, pParentSql, 0);
  } else {
    tscAsyncResultOnError(pParentSql);
  }
}

/*
 * Fetch callback of a super-table subquery (one vgroup).
 *
 * numOfRows is either the number of rows just fetched (> 0), 0 when the vgroup has no more data, or an error code.
 * Fetched rows are appended to the subquery's external memory buffer and the next fetch is issued until the
 * subquery completes, at which point tscAllDataRetrievedFromDnode takes over. Errors are retried through
 * tscReissueSubquery up to MAX_NUM_OF_SUBQUERY_RETRY times, after which the global code of the parent is set and
 * tscHandleSubqueryError is invoked.
 */
static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
  SSqlObj *pSql = (SSqlObj *)tres;
  assert(pSql != NULL);

  // this query has been freed already
  SRetrieveSupport *trsupport = (SRetrieveSupport *)param;
  if (pSql->param == NULL || param == NULL) {
    tscDebug("%p already freed in dnodecallback", pSql);
    assert(pSql->res.code == TSDB_CODE_TSC_QUERY_CANCELLED);
    return;
  }

  tOrderDescriptor *pDesc      = trsupport->pOrderDescriptor;
  int32_t           idx        = trsupport->subqueryIndex;
  SSqlObj *         pParentSql = trsupport->pParentSql;

  SSubqueryState* pState = &pParentSql->subState;

  // index the vgroup list by the subquery index, matching tscRetrieveDataRes and the vgroupIndex assigned in
  // tscCreateSTableSubquery; vgroups[0] would name the wrong vgroup for every subquery but the first.
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
  SVgroupInfo    *pVgroup        = &pTableMetaInfo->vgroupList->vgroups[idx];

  if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
    trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
    tscDebug("%p query cancelled/failed, sub:%p, vgId:%d, orderOfSub:%d, code:%s, global code:%s",
             pParentSql, pSql, pVgroup->vgId, idx, tstrerror(numOfRows), tstrerror(pParentSql->res.code));

    tscHandleSubqueryError(param, tres, numOfRows);
    return;
  }

  if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
    assert(numOfRows == taos_errno(pSql));

    if (numOfRows == TSDB_CODE_TSC_QUERY_CANCELLED) {
      trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
    }

    if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY) {
      tscError("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(numOfRows), trsupport->numOfRetry);

      if (tscReissueSubquery(trsupport, pSql, numOfRows) == TSDB_CODE_SUCCESS) {
        return;
      }
    } else {
      tscDebug("%p sub:%p reach the max retry times, set global code:%s", pParentSql, pSql, tstrerror(numOfRows));
      atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, numOfRows);  // set global code and abort
    }

    tscHandleSubqueryError(param, tres, numOfRows);
    return;
  }

  SSqlRes *   pRes       = &pSql->res;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);

  if (numOfRows > 0) {
    assert(pRes->numOfRows == numOfRows);
    int64_t num = atomic_add_fetch_64(&pState->numOfRetrievedRows, numOfRows);

    // log the value returned by the atomic add instead of re-reading the shared counter without synchronization
    tscDebug("%p sub:%p retrieve numOfRows:%d totalNumOfRows:%" PRId64 " from ep:%s, orderOfSub:%d", pParentSql, pSql,
             numOfRows, num, pSql->epSet.fqdn[pSql->epSet.inUse], idx);

    if (num > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pQueryInfo, 0)) {
      tscError("%p sub:%p num of OrderedRes is too many, max allowed:%" PRId32 " , current:%" PRId64,
               pParentSql, pSql, tsMaxNumOfOrderedResults, num);
      tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_TSC_SORTED_RES_TOO_MANY);
      return;
    }

#ifdef _DEBUG_VIEW
    printf("received data from vnode: %"PRIu64" rows\n", pRes->numOfRows);
    SSrcColumnInfo colInfo[256] = {0};

    tscGetSrcColumnInfo(colInfo, pQueryInfo);
    tColModelDisplayEx(pDesc->pColumnModel, pRes->data, pRes->numOfRows, pRes->numOfRows, colInfo);
#endif

    // no disk space for tmp directory
    if (tsTotalTmpDirGB != 0 && tsAvailTmpDirectorySpace < tsReservedTmpDirectorySpace) {
      tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pParentSql, pSql,
               tsAvailTmpDirectorySpace, tsReservedTmpDirectorySpace);
      tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_TSC_NO_DISKSPACE);
      return;
    }

    int32_t ret = saveToBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer, pRes->data,
                               pRes->numOfRows, pQueryInfo->groupbyExpr.orderType);
    if (ret != 0) { // set no disk space error info, and abort retry
      tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_TSC_NO_DISKSPACE);
    } else if (pRes->completed) {
      tscAllDataRetrievedFromDnode(trsupport, pSql);
    } else { // continue fetch data from dnode
      taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param);
    }

  } else { // all data has been retrieved to client
    tscAllDataRetrievedFromDnode(trsupport, pSql);
  }
}

2254
/*
 * Create the subquery object for one vgroup of a two-stage super table query.
 *
 * The new object is built by createSubqueryObj (taking over prevSqlObj's resources when it is not NULL), uses
 * tscRetrieveDataRes as its callback and trsupport as its param, is flagged TSDB_QUERY_TYPE_STABLE_SUBQUERY, and is
 * pointed at vgroup number trsupport->subqueryIndex. It is also stored in pSql->pSubs at that index.
 *
 * Returns the new subquery, or NULL when createSubqueryObj fails (pSql->pSubs is then left unchanged).
 */
static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj) {
  const int32_t tableIndex = 0;

  SSqlObj *pSub = createSubqueryObj(pSql, tableIndex, tscRetrieveDataRes, trsupport, TSDB_SQL_SELECT, prevSqlObj);
  if (pSub == NULL) {
    return NULL;
  }

  // the sub query of two-stage super table query
  SQueryInfo *pSubQueryInfo = tscGetQueryInfoDetail(&pSub->cmd, 0);
  pSubQueryInfo->type |= TSDB_QUERY_TYPE_STABLE_SUBQUERY;
  assert(pSubQueryInfo->numOfTables == 1 && pSub->cmd.numOfClause == 1 && trsupport->subqueryIndex < pSql->subState.numOfSub);

  // one subquery per vnode: the subquery index doubles as the vgroup index
  STableMetaInfo *pSubMetaInfo = tscGetMetaInfo(pSubQueryInfo, tableIndex);
  pSubMetaInfo->vgroupIndex = trsupport->subqueryIndex;

  pSql->pSubs[trsupport->subqueryIndex] = pSub;
  return pSub;
}

2274
// todo there is are race condition in this function, while cancel is called by user.
H
hjxilinx 已提交
2275
void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
2276 2277 2278 2279 2280 2281 2282
  // the param may be null, since it may be done by other query threads. and the asyncOnError may enter in this
  // function while kill query by a user.
  if (param == NULL) {
    assert(code != TSDB_CODE_SUCCESS);
    return;
  }

2283
  SRetrieveSupport *trsupport = (SRetrieveSupport *) param;
H
hjxilinx 已提交
2284
  
H
Haojun Liao 已提交
2285
  SSqlObj*  pParentSql = trsupport->pParentSql;
2286
  SSqlObj*  pSql = (SSqlObj *) tres;
2287

2288 2289
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
  assert(pSql->cmd.numOfClause == 1 && pQueryInfo->numOfTables == 1);
H
hjxilinx 已提交
2290
  
2291
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
S
TD-1732  
Shengliang Guan 已提交
2292
  SVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[trsupport->subqueryIndex];
H
Haojun Liao 已提交
2293

H
Haojun Liao 已提交
2294
  // stable query killed or other subquery failed, all query stopped
H
Haojun Liao 已提交
2295
  if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
H
hjxilinx 已提交
2296
    trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
H
Haojun Liao 已提交
2297
    tscError("%p query cancelled or failed, sub:%p, vgId:%d, orderOfSub:%d, code:%s, global code:%s",
H
Haojun Liao 已提交
2298 2299 2300 2301
        pParentSql, pSql, pVgroup->vgId, trsupport->subqueryIndex, tstrerror(code), tstrerror(pParentSql->res.code));

    tscHandleSubqueryError(param, tres, code);
    return;
H
hjxilinx 已提交
2302 2303 2304
  }
  
  /*
H
Haojun Liao 已提交
2305
   * if a subquery on a vnode failed, all retrieve operations from vnode that occurs later
2306
   * than this one are actually not necessary, we simply call the tscRetrieveFromDnodeCallBack
H
hjxilinx 已提交
2307 2308
   * function to abort current and remain retrieve process.
   *
2309
   * NOTE: thread safe is required.
H
hjxilinx 已提交
2310
   */
H
Haojun Liao 已提交
2311 2312
  if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
    assert(code == taos_errno(pSql));
2313

H
Haojun Liao 已提交
2314
    if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY) {
2315
      tscError("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(code), trsupport->numOfRetry);
H
Haojun Liao 已提交
2316
      if (tscReissueSubquery(trsupport, pSql, code) == TSDB_CODE_SUCCESS) {
H
hjxilinx 已提交
2317 2318
        return;
      }
2319
    } else {
H
Haojun Liao 已提交
2320
      tscError("%p sub:%p reach the max retry times, set global code:%s", pParentSql, pSql, tstrerror(code));
H
Haojun Liao 已提交
2321
      atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, code);  // set global code and abort
2322
    }
H
Haojun Liao 已提交
2323 2324 2325 2326 2327

    tscHandleSubqueryError(param, tres, pParentSql->res.code);
    return;
  }

H
Haojun Liao 已提交
2328
  tscDebug("%p sub:%p query complete, ep:%s, vgId:%d, orderOfSub:%d, retrieve data", trsupport->pParentSql, pSql,
2329
             pVgroup->epAddr[0].fqdn, pVgroup->vgId, trsupport->subqueryIndex);
H
Haojun Liao 已提交
2330 2331 2332 2333 2334

  if (pSql->res.qhandle == 0) { // qhandle is NULL, code is TSDB_CODE_SUCCESS means no results generated from this vnode
    tscRetrieveFromDnodeCallBack(param, pSql, 0);
  } else {
    taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param);
H
hjxilinx 已提交
2335 2336 2337
  }
}

2338 2339
/*
 * Decide whether a multi-vnode insertion whose sub-inserts did not all succeed should be retried.
 *
 * Returns false once pParentObj->retry exceeds pParentObj->maxRetry. Also returns false on the first sub-insert whose
 * code is not one of the retryable codes (table reconfigure, invalid table id, invalid vgroup id, network
 * unavailable, app not ready); that code is stored in pParentObj->res.code. Returns true otherwise.
 */
static bool needRetryInsert(SSqlObj* pParentObj, int32_t numOfSub) {
  if (pParentObj->retry > pParentObj->maxRetry) {
    tscError("%p max retry reached, abort the retry effort", pParentObj);
    return false;
  }

  for (int32_t subIdx = 0; subIdx < numOfSub; ++subIdx) {
    int32_t subCode = pParentObj->pSubs[subIdx]->res.code;

    bool canRetry = (subCode == TSDB_CODE_SUCCESS) ||
                    (subCode == TSDB_CODE_TDB_TABLE_RECONFIGURE) ||
                    (subCode == TSDB_CODE_TDB_INVALID_TABLE_ID) ||
                    (subCode == TSDB_CODE_VND_INVALID_VGROUP_ID) ||
                    (subCode == TSDB_CODE_RPC_NETWORK_UNAVAIL) ||
                    (subCode == TSDB_CODE_APP_NOT_READY);

    if (!canRetry) {
      pParentObj->res.code = subCode;
      return false;
    }
  }

  return true;
}

H
Haojun Liao 已提交
2361 2362 2363 2364 2365 2366 2367 2368 2369
/*
 * Release the SInsertSupporter attached as param to each sub-insert of pSqlObj (via tfree).
 * pSqlObj must be non-NULL and have at least one sub-object.
 */
static void doFreeInsertSupporter(SSqlObj* pSqlObj) {
  assert(pSqlObj != NULL && pSqlObj->subState.numOfSub > 0);

  SSqlObj** pSubList = pSqlObj->pSubs;
  for (int32_t idx = 0; idx < pSqlObj->subState.numOfSub; idx++) {
    tfree(pSubList[idx]->param);
  }
}

H
Haojun Liao 已提交
2370
/*
 * Callback of every sub-insert of a multi-vnode insertion.
 *
 * Each call adds the rows inserted by one sub-insert to the parent and records its error, if any. Only the call that
 * completes the last outstanding sub-insert (subAndCheckDone) finalizes the parent:
 *  - all succeeded: the user callback is invoked with the total number of inserted rows;
 *  - otherwise, if needRetryInsert allows it: failed subs get their query info reset to the parent's table, the
 *    cached table metas of the parent are dropped, and the SQL is re-parsed and re-executed;
 *  - otherwise: the error is reported through tscAsyncResultOnError.
 */
static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) {
  SInsertSupporter *pSupporter = (SInsertSupporter *)param;
  SSqlObj* pParentObj = pSupporter->pSql;

  // copy the index now: when subAndCheckDone() reports that this is not the last sub-insert, a sibling finishing
  // concurrently may run doFreeInsertSupporter() and release pSupporter before we log it.
  int32_t subIndex = pSupporter->index;

  // record the total inserted rows
  if (numOfRows > 0) {
    pParentObj->res.numOfRows += numOfRows;
  }

  if (taos_errno(tres) != TSDB_CODE_SUCCESS) {
    SSqlObj* pSql = (SSqlObj*) tres;
    assert(pSql != NULL && pSql->res.code == numOfRows);

    pParentObj->res.code = pSql->res.code;

    // set the flag in the parent sqlObj
    if (pSql->cmd.submitSchema) {
      pParentObj->cmd.submitSchema = 1;
    }
  }

  if (!subAndCheckDone(tres, pParentObj, subIndex)) {
    tscDebug("%p insert:%p,%d completed, total:%d", pParentObj, tres, subIndex, pParentObj->subState.numOfSub);
    return;
  }

  // restore user defined fp
  pParentObj->fp = pParentObj->fetchFp;
  int32_t numOfSub = pParentObj->subState.numOfSub;

  // pSupporter is released here and must not be used afterwards
  doFreeInsertSupporter(pParentObj);

  if (pParentObj->res.code == TSDB_CODE_SUCCESS) {
    tscDebug("%p Async insertion completed, total inserted:%" PRId64, pParentObj, (int64_t)pParentObj->res.numOfRows);

    // todo remove this parameter in async callback function definition.
    // all data has been sent to vnode, call user function
    (*pParentObj->fp)(pParentObj->param, pParentObj, (int32_t)pParentObj->res.numOfRows);
    return;
  }

  if (!needRetryInsert(pParentObj, numOfSub)) {
    tscAsyncResultOnError(pParentObj);
    return;
  }

  int32_t numOfFailed = 0;
  for(int32_t i = 0; i < numOfSub; ++i) {
    SSqlObj* pSql = pParentObj->pSubs[i];
    if (pSql->res.code != TSDB_CODE_SUCCESS) {
      numOfFailed += 1;

      // clean up tableMeta in cache
      tscFreeQueryInfo(&pSql->cmd);
      SQueryInfo* pQueryInfo = tscGetQueryInfoDetailSafely(&pSql->cmd, 0);
      STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pParentObj->cmd, pSql->cmd.clauseIndex, 0);
      tscAddTableMetaInfo(pQueryInfo, &pMasterTableMetaInfo->name, NULL, NULL, NULL, NULL);

      subquerySetState(pSql, &pParentObj->subState, i, 0);

      tscDebug("%p, failed sub:%d, %p", pParentObj, i, pSql);
    }
  }

  tscError("%p Async insertion completed, total inserted:%" PRId64 " rows, numOfFailed:%d, numOfTotal:%d", pParentObj,
           (int64_t)pParentObj->res.numOfRows, numOfFailed, numOfSub);

  tscDebug("%p cleanup %d tableMeta in hashTable", pParentObj, pParentObj->cmd.numOfTables);
  for(int32_t i = 0; i < pParentObj->cmd.numOfTables; ++i) {
    char name[TSDB_TABLE_FNAME_LEN] = {0};
    tNameExtractFullName(pParentObj->cmd.pTableNameList[i], name);
    taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
  }

  pParentObj->cmd.parseFinished = false;

  tscResetSqlCmd(&pParentObj->cmd, false);

  // in case of insert, redo parsing the sql string and build new submit data block for two reasons:
  // 1. the table Id(tid & uid) may have been update, the submit block needs to be updated accordingly.
  // 2. vnode may need the schema information along with submit block to update its local table schema.
  tscDebug("%p re-parse sql to generate submit data, retry:%d", pParentObj, pParentObj->retry);
  pParentObj->retry++;

  int32_t code = tsParseSql(pParentObj, true);
  if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return;

  if (code != TSDB_CODE_SUCCESS) {
    pParentObj->res.code = code;
    tscAsyncResultOnError(pParentObj);
    return;
  }

  tscDoQuery(pParentObj);
}

/**
 * it is a subquery, so after parse the sql string, copy the submit block to payload of itself
 * @param pSql
 * @return
 */
2470
int32_t tscHandleInsertRetry(SSqlObj* pParent, SSqlObj* pSql) {
2471 2472 2473 2474
  assert(pSql != NULL && pSql->param != NULL);
  SSqlRes* pRes = &pSql->res;

  SInsertSupporter* pSupporter = (SInsertSupporter*) pSql->param;
H
Haojun Liao 已提交
2475
  assert(pSupporter->index < pSupporter->pSql->subState.numOfSub);
2476

2477
  STableDataBlocks* pTableDataBlock = taosArrayGetP(pParent->cmd.pDataBlocks, pSupporter->index);
H
Haojun Liao 已提交
2478 2479 2480
  int32_t code = tscCopyDataBlockToPayload(pSql, pTableDataBlock);

  if ((pRes->code = code)!= TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
2481
    tscAsyncResultOnError(pSql);
H
Haojun Liao 已提交
2482
    return code;  // here the pSql may have been released already.
2483 2484 2485
  }

  return tscProcessSql(pSql);
H
hjxilinx 已提交
2486 2487 2488 2489
}

/*
 * Submit the data blocks of an insertion (pSql->cmd.pDataBlocks), one sub-insert per data block.
 *
 * - First attempt (pSql->pSubs == NULL): the sub-state and pSubs arrays are prepared. For each data block, one
 *   sub-object is created with multiVnodeInsertFinalize as its callback and an SInsertSupporter as its param, and the
 *   block is copied into that sub-object's payload. The parent's data block list is then destroyed and all sub-inserts
 *   are launched.
 * - Retry (pSql->pSubs != NULL; presumably reached via tscDoQuery after multiVnodeInsertFinalize re-parsed the SQL):
 *   a fresh SInsertSupporter is attached to every existing sub-object, and the subs whose previous attempt failed
 *   are re-submitted through tscHandleInsertRetry.
 *
 * @return TSDB_CODE_SUCCESS once the sub-inserts were launched; otherwise the error code, which is also stored in
 *         pSql->res.code.
 */
int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  // it is the failure retry insert
  if (pSql->pSubs != NULL) {
    // attach a supporter to every sub-object before re-submitting any of them, so that an allocation failure is
    // reported while no sub-insert is in flight yet
    for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
      SInsertSupporter* pSup = calloc(1, sizeof(SInsertSupporter));
      if (pSup == NULL) {
        tscError("%p failed to allocate insert supporter for retry, orderOfSub:%d", pSql, i);
        for (int32_t j = 0; j < i; ++j) {
          tfree(pSql->pSubs[j]->param);
        }

        pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
        return pRes->code;
      }

      pSup->index = i;
      pSup->pSql  = pSql;
      pSql->pSubs[i]->param = pSup;
    }

    for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
      SSqlObj* pSub = pSql->pSubs[i];
      tscDebug("%p sub:%p launch sub insert, orderOfSub:%d", pSql, pSub, i);
      if (pSub->res.code != TSDB_CODE_SUCCESS) {
        tscHandleInsertRetry(pSql, pSub);
      }
    }

    return TSDB_CODE_SUCCESS;
  }

  pSql->subState.numOfSub = (uint16_t)taosArrayGetSize(pCmd->pDataBlocks);
  assert(pSql->subState.numOfSub > 0);

  pRes->code = TSDB_CODE_SUCCESS;

  // the number of already initialized subqueries
  int32_t numOfSub = 0;

  if (pSql->subState.states == NULL) {
    pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(*pSql->subState.states));
    if (pSql->subState.states == NULL) {
      pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      goto _error;
    }

    pthread_mutex_init(&pSql->subState.mutex, NULL);
  }

  memset(pSql->subState.states, 0, sizeof(*pSql->subState.states) * pSql->subState.numOfSub);
  tscDebug("%p reset all sub states to 0", pSql);

  pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES);
  if (pSql->pSubs == NULL) {
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto _error;
  }

  tscDebug("%p submit data to %d vnode(s)", pSql, pSql->subState.numOfSub);

  while(numOfSub < pSql->subState.numOfSub) {
    SInsertSupporter* pSupporter = calloc(1, sizeof(SInsertSupporter));
    if (pSupporter == NULL) {
      pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      goto _error;
    }

    pSupporter->pSql   = pSql;
    pSupporter->index  = numOfSub;

    SSqlObj *pNew = createSimpleSubObj(pSql, multiVnodeInsertFinalize, pSupporter, TSDB_SQL_INSERT);
    if (pNew == NULL) {
      tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, numOfSub, strerror(errno));
      free(pSupporter);  // not attached to any sub-object, so nobody else would release it
      pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      goto _error;
    }

    /*
     * assign the callback function to fetchFp to make sure that the error process function can restore
     * the callback function (multiVnodeInsertFinalize) correctly.
     */
    pNew->fetchFp = pNew->fp;
    pSql->pSubs[numOfSub] = pNew;

    STableDataBlocks* pTableDataBlock = taosArrayGetP(pCmd->pDataBlocks, numOfSub);
    pRes->code = tscCopyDataBlockToPayload(pNew, pTableDataBlock);
    if (pRes->code == TSDB_CODE_SUCCESS) {
      tscDebug("%p sub:%p create subObj success. orderOfSub:%d", pSql, pNew, numOfSub);
      numOfSub++;
    } else {
      tscDebug("%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%s", pSql, numOfSub,
               pSql->subState.numOfSub, tstrerror(pRes->code));
      goto _error;
    }
  }

  if (numOfSub < pSql->subState.numOfSub) {
    tscError("%p failed to prepare subObj structure and launch sub-insertion", pSql);
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto _error;
  }

  pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);

  // use the local variable
  for (int32_t j = 0; j < numOfSub; ++j) {
    SSqlObj *pSub = pSql->pSubs[j];
    tscDebug("%p sub:%p launch sub insert, orderOfSub:%d", pSql, pSub, j);
    tscProcessSql(pSub);
  }

  return TSDB_CODE_SUCCESS;

  _error:
  // every path to this label stores a non-success code, so report the real cause instead of always OOM
  return pRes->code;
}
H
hjxilinx 已提交
2592

H
Haojun Liao 已提交
2593 2594 2595
/*
 * Locate the value of output column columnIndex at the current row (pRes->row) inside pRes->data.
 *
 * The column starts at pSqlExpr->offset * numOfRows within the block. The width of one value (the expression's
 * resBytes) is stored in *bytes.
 */
static char* getResultBlockPosition(SSqlCmd* pCmd, SSqlRes* pRes, int32_t columnIndex, int16_t* bytes) {
  SQueryInfo*     pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  SInternalField* pField     = (SInternalField*) TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, columnIndex);

  SSqlExpr* pExpr = pField->pSqlExpr;
  assert(pExpr != NULL);

  int16_t valueWidth = pExpr->resBytes;
  *bytes = valueWidth;

  return pRes->data + pExpr->offset * pRes->numOfRows + pRes->row * valueWidth;
}

/*
 * Assemble the next result block of a join query from the per-table subquery results.
 *
 * The block holds as many rows as the live subquery with the fewest remaining rows. Each output column is copied
 * from its source subquery's result buffer into pRes->pRsp, column after column. Every subquery's row cursor is then
 * advanced and the arithmetic expressions are evaluated over the block. If no live subquery has rows left, the
 * subquery objects are freed via freeJoinSubqueryObj and nothing is built. On allocation failure pRes->code is set to
 * TSDB_CODE_TSC_OUT_OF_MEMORY.
 */
static void doBuildResFromSubqueries(SSqlObj* pSql) {
  SSqlRes* pRes = &pSql->res;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);

  // the block size is bounded by the subquery with the fewest remaining rows
  int32_t numOfRes   = INT32_MAX;
  bool    hasLiveSub = false;
  for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
    SSqlObj* pSub = pSql->pSubs[i];
    if (pSub == NULL) {
      continue;
    }

    hasLiveSub = true;
    int32_t remain = (int32_t)(pSub->res.numOfRows - pSub->res.row);
    numOfRes = (int32_t)(MIN(numOfRes, remain));
  }

  // no result any more (or no subquery left to read from), free all subquery objects. Without the hasLiveSub check,
  // numOfRes would stay INT32_MAX and the column copy below would dereference a NULL subquery.
  if (!hasLiveSub || numOfRes == 0) {
    freeJoinSubqueryObj(pSql);
    return;
  }

  int32_t rowSize = tscGetResRowLength(pQueryInfo->exprList);
  assert(numOfRes > 0 && rowSize > 0);

  // size computed in size_t so that a large block cannot overflow int32 arithmetic
  size_t dataSize = (size_t)numOfRes * (size_t)rowSize;
  char* tmp = realloc(pRes->pRsp, dataSize + sizeof(tFilePage));
  if (tmp == NULL) {
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return;
  }

  pRes->pRsp = tmp;

  tFilePage* pFilePage = (tFilePage*) pRes->pRsp;
  pFilePage->num = numOfRes;

  pRes->data = pFilePage->data;
  char* data = pRes->data;

  int16_t bytes = 0;

  size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
  for(int32_t i = 0; i < numOfExprs; ++i) {
    SColumnIndex* pIndex = &pRes->pColumnIndex[i];
    SSqlRes*      pRes1 = &pSql->pSubs[pIndex->tableIndex]->res;
    SSqlCmd*      pCmd1 = &pSql->pSubs[pIndex->tableIndex]->cmd;

    char*  pData   = getResultBlockPosition(pCmd1, pRes1, pIndex->columnIndex, &bytes);
    size_t colSize = (size_t)bytes * (size_t)numOfRes;
    memcpy(data, pData, colSize);

    data += colSize;
  }

  for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
    SSqlObj* pSub = pSql->pSubs[i];
    if (pSub == NULL) {
      continue;
    }

    pSub->res.row += numOfRes;
    assert(pSub->res.row <= pSub->res.numOfRows);
  }

  pRes->numOfRows = numOfRes;
  pRes->numOfClauseTotal += numOfRes;

  int32_t finalRowSize = 0;
  for(int32_t i = 0; i < tscNumOfFields(pQueryInfo); ++i) {
    TAOS_FIELD* pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
    finalRowSize += pField->bytes;
  }

  doArithmeticCalculate(pQueryInfo, pFilePage, rowSize, finalRowSize);

  pRes->data = pFilePage->data;
  tscSetResRawPtr(pRes, pQueryInfo);
}

H
hjxilinx 已提交
2682
/*
 * Produce the next result block of a join query and hand it to the user callback.
 *
 * On the first call (pRes->tsrow == NULL), the per-column arrays tsrow/urow/buffer/length are allocated and
 * tscRestoreSQLFuncForSTableQuery is applied to the query. Errors, including one already recorded in pRes->code, are
 * delivered through tscAsyncResultOnError.
 */
void tscBuildResFromSubqueries(SSqlObj *pSql) {
  SSqlRes* pRes = &pSql->res;

  if (pRes->code != TSDB_CODE_SUCCESS) {
    tscAsyncResultOnError(pSql);
    return;
  }

  if (pRes->tsrow == NULL) {
    SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
    pRes->numOfCols = (int16_t) tscSqlExprNumOfExprs(pQueryInfo);

    pRes->tsrow  = calloc(pRes->numOfCols, POINTER_BYTES);
    pRes->urow   = calloc(pRes->numOfCols, POINTER_BYTES);
    pRes->buffer = calloc(pRes->numOfCols, POINTER_BYTES);
    pRes->length = calloc(pRes->numOfCols, sizeof(int32_t));

    // urow is dereferenced row by row (doSetResultRowData), so it must be checked too. On failure, release every
    // array so that a later call does not find a non-NULL tsrow next to missing companion arrays.
    if (pRes->tsrow == NULL || pRes->urow == NULL || pRes->buffer == NULL || pRes->length == NULL) {
      tfree(pRes->tsrow);
      tfree(pRes->urow);
      tfree(pRes->buffer);
      tfree(pRes->length);

      pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      tscAsyncResultOnError(pSql);
      return;
    }

    tscRestoreSQLFuncForSTableQuery(pQueryInfo);
  }

  assert (pRes->row >= pRes->numOfRows);
  doBuildResFromSubqueries(pSql);
  if (pRes->code == TSDB_CODE_SUCCESS) {
    (*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
  } else {
    tscAsyncResultOnError(pSql);
  }
}

2717
/*
 * Convert the NCHAR value of column columnIndex in the current row from UCS-4 to the client charset.
 *
 * The converted string is written into pRes->buffer[columnIndex], which is allocated on first use with one extra
 * NCHAR slot for the terminator. tsrow/length are then pointed at it. If the buffer cannot be allocated or the
 * conversion fails, the value is reported as NULL with length 0. Non-NCHAR columns and NULL values are left untouched.
 */
static UNUSED_FUNC void transferNcharData(SSqlObj *pSql, int32_t columnIndex, TAOS_FIELD *pField) {
  SSqlRes *pRes = &pSql->res;

  if (pRes->tsrow[columnIndex] == NULL || pField->type != TSDB_DATA_TYPE_NCHAR) {
    return;
  }

  // convert unicode to native code in a temporary buffer extra one byte for terminated symbol
  if (pRes->buffer[columnIndex] == NULL) {
    pRes->buffer[columnIndex] = malloc(pField->bytes + TSDB_NCHAR_SIZE);
    if (pRes->buffer[columnIndex] == NULL) {
      // previously the NULL buffer was passed straight to memset
      tscError("%p failed to allocate nchar conversion buffer, column:%d", pSql, columnIndex);
      pRes->tsrow[columnIndex] = NULL;
      pRes->length[columnIndex] = 0;
      return;
    }
  }

  /* string terminated char for binary data*/
  memset(pRes->buffer[columnIndex], 0, pField->bytes + TSDB_NCHAR_SIZE);

  int32_t length = taosUcs4ToMbs(pRes->tsrow[columnIndex], pRes->length[columnIndex], pRes->buffer[columnIndex]);
  if ( length >= 0 ) {
    pRes->tsrow[columnIndex] = (unsigned char*)pRes->buffer[columnIndex];
    pRes->length[columnIndex] = length;
  } else {
    tscError("%p charset:%s to %s. val:%s convert failed.", pSql, DEFAULT_UNICODE_ENCODEC, tsCharset, (char*)pRes->tsrow[columnIndex]);
    pRes->tsrow[columnIndex] = NULL;
    pRes->length[columnIndex] = 0;
  }
}

H
Haojun Liao 已提交
2741
char *getArithmeticInputSrc(void *param, const char *name, int32_t colId) {
2742 2743 2744 2745 2746 2747 2748
  SArithmeticSupport *pSupport = (SArithmeticSupport *) param;

  int32_t index = -1;
  SSqlExpr* pExpr = NULL;
  
  for (int32_t i = 0; i < pSupport->numOfCols; ++i) {
    pExpr = taosArrayGetP(pSupport->exprList, i);
B
Bomin Zhang 已提交
2749
    if (strncmp(name, pExpr->aliasName, sizeof(pExpr->aliasName) - 1) == 0) {
2750 2751 2752 2753 2754 2755 2756 2757 2758
      index = i;
      break;
    }
  }

  assert(index >= 0 && index < pSupport->numOfCols);
  return pSupport->data[index] + pSupport->offset * pExpr->resBytes;
}

2759
/*
 * Expose the current row of pSql's result through pRes->tsrow and advance to the next row.
 *
 * pRes->urow[i] is a cursor into the data of output field i. For every field:
 *  - tsrow[i] is set from the cursor (NULL for a null value; for BINARY/NCHAR, the payload after the var-data
 *    header, with its length stored in pRes->length[i]);
 *  - the cursor is then moved forward by the field width.
 *
 * Once every row has been consumed, tsrow is released with tfree and returned instead.
 */
TAOS_ROW doSetResultRowData(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  assert(pRes->row >= 0 && pRes->row <= pRes->numOfRows);

  // all the results has returned to invoker
  if (pRes->row >= pRes->numOfRows) {
    tfree(pRes->tsrow);
    return pRes->tsrow;
  }

  SQueryInfo *pQueryInfo  = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  size_t      numOfFields = tscNumOfFields(pQueryInfo);

  for (size_t col = 0; col < numOfFields; ++col) {
    SInternalField* pField   = (SInternalField*)TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, col);
    int32_t         colType  = pField->field.type;
    int32_t         colBytes = pField->field.bytes;

    void* pCell     = pRes->urow[col];
    bool  nullValue = isNull(pCell, colType);

    if (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_NCHAR) {
      pRes->tsrow[col]  = nullValue ? NULL : varDataVal(pCell);
      pRes->length[col] = varDataLen(pCell);
    } else {
      pRes->tsrow[col] = nullValue ? NULL : pCell;
    }

    // step this column's cursor to the next row
    ((char**) pRes->urow)[col] += colBytes;
  }

  pRes->row++;  // index increase one-step
  return pRes->tsrow;
}

H
Haojun Liao 已提交
2792
/*
 * Report whether the subqueries of pSql may still yield result rows.
 *
 * NULL entries in pSql->pSubs are ignored in both cases below.
 *
 * Non-ordered projection on a super table: returns true as soon as one subquery
 *   - has a vgroup left (vgroupIndex < numOfVgroups),
 *   - still has unread rows in its current block (row < numOfRows), and
 *   - has not reached its limit.
 *   Otherwise it returns false.
 *   NOTE(review): the previous comment described this as "not exhausted OR more vnodes
 *   available", but the condition is a conjunction. Confirm which is intended.
 *
 * Any other query (e.g. inner join): returns false as soon as one subquery
 *   - produced an empty block, or
 *   - is a projection query that has consumed its block and reached its limit.
 *   Otherwise it returns true.
 */
static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) {
  SSqlCmd    *pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
    // one subquery that can still deliver rows is enough to keep going
    for (int32_t j = 0; j < pSql->subState.numOfSub; ++j) {
      SSqlObj *pSub = pSql->pSubs[j];
      if (pSub == NULL) {
        continue;
      }

      SSqlRes    *pSubRes = &pSub->res;
      SSqlCmd    *pSubCmd = &pSub->cmd;
      SQueryInfo *pSubQueryInfo = tscGetQueryInfoDetail(pSubCmd, pSubCmd->clauseIndex);
      assert(pSubQueryInfo->numOfTables == 1);

      STableMetaInfo *pSubTableMetaInfo = tscGetMetaInfo(pSubQueryInfo, 0);

      bool vgroupLeft = pSubTableMetaInfo->vgroupIndex < pSubTableMetaInfo->vgroupList->numOfVgroups;
      bool rowsLeft   = pSubRes->row < pSubRes->numOfRows;
      if (vgroupLeft && rowsLeft && !tscHasReachLimitation(pSubQueryInfo, pSubRes)) {
        return true;
      }
    }

    return false;
  }

  // join-like query: a single exhausted subquery ends the whole query
  for (int32_t j = 0; j < pSql->subState.numOfSub; ++j) {
    SSqlObj *pSub = pSql->pSubs[j];
    if (pSub == NULL) {
      continue;
    }

    SSqlRes    *pSubRes = &pSub->res;
    SQueryInfo *pSubQueryInfo = tscGetQueryInfoDetail(&pSub->cmd, 0);

    bool consumedAtLimit = pSubRes->row >= pSubRes->numOfRows &&
                           tscHasReachLimitation(pSubQueryInfo, pSubRes) &&
                           tscIsProjectionQuery(pSubQueryInfo);
    if (consumedAtLimit || pSubRes->numOfRows == 0) {
      return false;
    }
  }

  return true;
}