tscSubquery.c 114.1 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
H
Hui Li 已提交
15
#define _GNU_SOURCE
16

H
Haojun Liao 已提交
17
#include "os.h"
H
hjxilinx 已提交
18

H
Haojun Liao 已提交
19
#include "texpr.h"
H
Haojun Liao 已提交
20
#include "qTsbuf.h"
H
Haojun Liao 已提交
21
#include "tcompare.h"
S
slguan 已提交
22
#include "tscLog.h"
H
Haojun Liao 已提交
23 24
#include "tscSubquery.h"
#include "tschemautil.h"
25
#include "tsclient.h"
26
#include "qUtil.h"
H
hjxilinx 已提交
27 28 29

/*
 * Small context object pairing a subquery with its position.
 * NOTE(review): no user of this struct is visible in this chunk; confirm field semantics at the call sites.
 */
typedef struct SInsertSupporter {
  SSqlObj* pSql;   // the SQL object this supporter refers to (presumably the subquery itself — verify)
  int32_t  index;  // position of that subquery (presumably in the parent's pSubs array — verify)
} SInsertSupporter;

H
hjxilinx 已提交
33
/* Forward declarations of helpers that are used before their definitions below. */
static bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql);
static void freeJoinSubqueryObj(SSqlObj* pSql);
H
hjxilinx 已提交
35

H
Haojun Liao 已提交
36 37 38 39 40
static int32_t tsCompare(int32_t order, int64_t left, int64_t right) {
  if (left == right) {
    return 0;
  }

41
  if (order == TSDB_ORDER_ASC) {
H
Haojun Liao 已提交
42
    return left < right? -1:1;
H
hjxilinx 已提交
43
  } else {
H
Haojun Liao 已提交
44
    return left > right? -1:1;
H
hjxilinx 已提交
45 46 47
  }
}

48
/*
 * Advance pTSBuf past every element whose tag equals tag1.
 * Stops on the first element carrying a different tag, or when the buffer runs out of positions.
 */
static void skipRemainValue(STSBuf* pTSBuf, tVariant* tag1) {
  do {
    STSElem elem = tsBufGetElem(pTSBuf);
    if (tVariantCompare(elem.tag, tag1) != 0) {  // reached a record with a new tag
      return;
    }
  } while (tsBufNextPos(pTSBuf));
}

66 67 68 69
/*
 * Record `state` for subquery slot `idx` while holding subState->mutex.
 * pSql is only used for the debug log line.
 */
static void subquerySetState(SSqlObj *pSql, SSubqueryState *subState, int idx, int8_t state) {
  assert(idx < subState->numOfSub);
  assert(subState->states);

  pthread_mutex_t *pLock = &subState->mutex;

  pthread_mutex_lock(pLock);
  tscDebug("subquery:%p,%d state set to %d", pSql, idx, state);
  subState->states[idx] = state;
  pthread_mutex_unlock(pLock);
}

D
fix bug  
dapan1121 已提交
79
/*
 * Return true when every entry of pParentSql->subState.states is non-zero (finished).
 * Does not lock: the caller is expected to hold subState.mutex.
 */
static bool allSubqueryDone(SSqlObj *pParentSql) {
  SSubqueryState *pState = &pParentSql->subState;

  tscDebug("0x%"PRIx64" total subqueries: %d", pParentSql->self, pState->numOfSub);

  for (int32_t idx = 0; idx < pState->numOfSub; ++idx) {
    if (pState->states[idx] == 0) {
      tscDebug("0x%"PRIx64" subquery:%p, index: %d NOT finished, abort query completion check", pParentSql->self, pParentSql->pSubs[idx], idx);
      return false;
    }

    tscDebug("0x%"PRIx64" subquery:%p, index: %d finished", pParentSql->self, pParentSql->pSubs[idx], idx);
  }

  return true;
}

D
fix bug  
dapan1121 已提交
98 99 100
/*
 * Mark subquery `idx` of pParentSql as finished and report whether that completed the whole set.
 *
 * Returns false without touching the state array if all subqueries were already done before this call.
 * Otherwise it sets states[idx] to 1 and returns true only if every subquery is now done.
 * The check and the update happen atomically under subState.mutex.
 */
static bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) {
  SSubqueryState *pState = &pParentSql->subState;

  assert(idx < pState->numOfSub);

  bool completedNow = false;

  pthread_mutex_lock(&pState->mutex);

  if (allSubqueryDone(pParentSql)) {
    tscDebug("0x%"PRIx64" subquery:%p,%d all subs already done", pParentSql->self, pSql, idx);
  } else {
    tscDebug("0x%"PRIx64" subquery:%p,%d state set to 1", pParentSql->self, pSql, idx);
    pState->states[idx] = 1;
    completedNow = allSubqueryDone(pParentSql);
  }

  pthread_mutex_unlock(&pState->mutex);

  return completedNow;
}



D
dapan1121 已提交
128 129 130 131 132 133 134 135 136
/*
 * Intersect the timestamp-comp buffers produced by the first-stage join subqueries.
 *
 * For every group of tables linked by a timestamp join condition
 * (pQueryInfo->tagCond.joinInfo.joinTables[i]->tsJoin), timestamps sharing the same tag value are merged.
 * A timestamp present in all tables of the group is appended to each table's output buffer, which is
 * installed as the subquery's pSubQueryInfo->tsBuf.
 *
 * While a limit offset is pending, and the query is neither an interval query nor a super-table query,
 * each fully-matched timestamp consumes one unit of the offset instead of being written.
 *
 * On return:
 *  - win holds the min/max of the written timestamps (INT64_MAX/INT64_MIN when nothing was written);
 *  - every input buffer (pSupporter->pTSBuf) has been destroyed and set to NULL, except on the early
 *    "empty input" returns;
 *  - the return value is the number of timestamps in the first table's output buffer (0 on early return).
 */
static int64_t doTSBlockIntersect(SSqlObj* pSql, STimeWindow * win) {
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);

  win->skey = INT64_MAX;
  win->ekey = INT64_MIN;

  SLimitVal* pLimit = &pQueryInfo->limit;
  int32_t    order = pQueryInfo->order.order;
  int32_t    joinNum = pSql->subState.numOfSub;
  SMergeTsCtx ctxlist[TSDB_MAX_JOIN_TABLE_NUM] = {{0}};
  SMergeTsCtx* ctxStack[TSDB_MAX_JOIN_TABLE_NUM] = {0};
  int32_t slot = 0;
  size_t tableNum = 0;
  int16_t* tableMIdx = 0;
  int32_t equalNum = 0;
  int32_t stackidx = 0;
  SMergeTsCtx* ctx = NULL;
  SMergeTsCtx* pctx = NULL;
  SMergeTsCtx* mainCtx = NULL;
  STSElem cur;
  STSElem prev;
  SArray*   tsCond = NULL;
  int32_t mergeDone = 0;

  // create one output buffer per subquery and position every input buffer on its first element
  for (int32_t i = 0; i < joinNum; ++i) {
    STSBuf* output = tsBufCreate(true, pQueryInfo->order.order);
    SQueryInfo* pSubQueryInfo = tscGetQueryInfoDetail(&pSql->pSubs[i]->cmd, 0);

    pSubQueryInfo->tsBuf = output;

    SJoinSupporter* pSupporter = pSql->pSubs[i]->param;

    if (pSupporter->pTSBuf == NULL) {
      tscDebug("0x%"PRIx64" at least one ts-comp is empty, 0 for secondary query after ts blocks intersecting", pSql->self);
      return 0;
    }

    tsBufResetPos(pSupporter->pTSBuf);

    if (!tsBufNextPos(pSupporter->pTSBuf)) {
      tscDebug("0x%"PRIx64" input1 is empty, 0 for secondary query after ts blocks intersecting", pSql->self);
      return 0;
    }

    tscDebug("0x%"PRIx64" sub:%p table idx:%d, input group number:%d", pSql->self, pSql->pSubs[i], i, pSupporter->pTSBuf->numOfGroups);

    ctxlist[i].p = pSupporter;
    ctxlist[i].res = output;
  }

  TSKEY st = taosGetTimestampUs();

  for (int16_t tidx = 0; tidx < joinNum; tidx++) {
    pctx = &ctxlist[tidx];
    if (pctx->compared) {
      continue;
    }

    assert(pctx->numOfInput == 0);

    // tsCond holds the indices (int16_t) of the tables joined on timestamp with table tidx
    tsCond = pQueryInfo->tagCond.joinInfo.joinTables[tidx]->tsJoin;

    tableNum = taosArrayGetSize(tsCond);
    assert(tableNum >= 2);

    for (int32_t i = 0; i < tableNum; ++i) {
      tableMIdx = taosArrayGet(tsCond, i);
      SMergeTsCtx* tctx = &ctxlist[*tableMIdx];
      tctx->compared = 1;
    }

    tableMIdx = taosArrayGet(tsCond, 0);
    pctx = &ctxlist[*tableMIdx];

    mainCtx = pctx;

    // one iteration per tag value of the main table
    while (1) {
      pctx = mainCtx;

      prev = tsBufGetElem(pctx->p->pTSBuf);

      ctxStack[stackidx++] = pctx;

      if (!tsBufIsValidElem(&prev)) {
        break;
      }

      // deep copy of the current tag; released at every exit of this iteration
      tVariant tag = {0};
      tVariantAssign(&tag, prev.tag);

      int32_t skipped = 0;

      for (int32_t i = 1; i < tableNum; ++i) {
        // map position i of the join group to its table index, as the merge loop below does
        int16_t*     pMemberIdx = taosArrayGet(tsCond, i);
        SMergeTsCtx* tctx = &ctxlist[*pMemberIdx];

        // find the data in supporter2 with the same tag value
        STSElem e2 = tsBufFindElemStartPosByTag(tctx->p->pTSBuf, &tag);

        if (!tsBufIsValidElem(&e2)) {
          skipRemainValue(pctx->p->pTSBuf, &tag);
          skipped = 1;
          break;
        }
      }

      if (skipped) {
        slot = 0;
        stackidx = 0;
        tVariantDestroy(&tag);
        continue;
      }

      tableMIdx = taosArrayGet(tsCond, ++slot);
      equalNum = 1;

      while (1) {
        ctx = &ctxlist[*tableMIdx];

        prev = tsBufGetElem(pctx->p->pTSBuf);
        cur = tsBufGetElem(ctx->p->pTSBuf);

        // data with current are exhausted
        if (!tsBufIsValidElem(&prev) || tVariantCompare(prev.tag, &tag) != 0) {
          break;
        }

        if (!tsBufIsValidElem(&cur) || tVariantCompare(cur.tag, &tag) != 0) { // ignore all records with the same tag
          break;
        }

        ctxStack[stackidx++] = ctx;

        int32_t ret = tsCompare(order, prev.ts, cur.ts);
        if (ret == 0) {
          if (++equalNum < tableNum) {
            pctx = ctx;

            if (++slot >= tableNum) {
              slot = 0;
            }

            tableMIdx = taosArrayGet(tsCond, slot);
            continue;
          }

          assert(stackidx == tableNum);

          // the timestamp matched in every table of the group
          if (pLimit->offset == 0 || pQueryInfo->interval.interval > 0 || QUERY_IS_STABLE_QUERY(pQueryInfo->type)) {
            if (win->skey > prev.ts) {
              win->skey = prev.ts;
            }

            if (win->ekey < prev.ts) {
              win->ekey = prev.ts;
            }

            for (int32_t i = 0; i < stackidx; ++i) {
              SMergeTsCtx* tctx = ctxStack[i];
              prev = tsBufGetElem(tctx->p->pTSBuf);

              tsBufAppend(tctx->res, prev.id, prev.tag, (const char*)&prev.ts, sizeof(prev.ts));
            }
          } else {
            pLimit->offset -= 1;//offset apply to projection?
          }

          for (int32_t i = 0; i < stackidx; ++i) {
            SMergeTsCtx* tctx = ctxStack[i];

            if (!tsBufNextPos(tctx->p->pTSBuf) && tctx == mainCtx) {
              mergeDone = 1;
            }
            tctx->numOfInput++;
          }

          if (mergeDone) {
            break;
          }

          stackidx = 0;
          equalNum = 1;

          ctxStack[stackidx++] = pctx;
        } else if (ret > 0) {
          // cur lags behind prev in scan order: advance the current table only
          if (!tsBufNextPos(ctx->p->pTSBuf) && ctx == mainCtx) {
            mergeDone = 1;
            break;
          }

          ctx->numOfInput++;
          stackidx--;
        } else {
          // prev lags behind cur: advance every table matched so far and restart the match
          stackidx--;

          for (int32_t i = 0; i < stackidx; ++i) {
            SMergeTsCtx* tctx = ctxStack[i];

            if (!tsBufNextPos(tctx->p->pTSBuf) && tctx == mainCtx) {
              mergeDone = 1;
            }
            tctx->numOfInput++;
          }

          if (mergeDone) {
            break;
          }

          stackidx = 0;
          equalNum = 1;

          ctxStack[stackidx++] = pctx;
        }
      }

      if (mergeDone) {
        tVariantDestroy(&tag);
        break;
      }

      slot = 0;
      stackidx = 0;

      skipRemainValue(mainCtx->p->pTSBuf, &tag);
      tVariantDestroy(&tag);
    }

    stackidx = 0;
    slot = 0;
    mergeDone = 0;
  }

  /*
   * failed to set the correct ts order yet in two cases:
   * 1. only one element
   * 2. only one element for each tag.
   */
  if (ctxlist[0].res->tsOrder == -1) {
    for (int32_t i = 0; i < joinNum; ++i) {
      ctxlist[i].res->tsOrder = TSDB_ORDER_ASC;
    }
  }

  for (int32_t i = 0; i < joinNum; ++i) {
    tsBufFlush(ctxlist[i].res);

    tsBufDestroy(ctxlist[i].p->pTSBuf);
    ctxlist[i].p->pTSBuf = NULL;
  }

  TSKEY et = taosGetTimestampUs();

  for (int32_t i = 0; i < joinNum; ++i) {
    tscDebug("0x%"PRIx64" sub:%p tblidx:%d, input:%" PRId64 ", final:%" PRId64 " in %d vnodes for secondary query after ts blocks "
             "intersecting, skey:%" PRId64 ", ekey:%" PRId64 ", numOfVnode:%d, elapsed time:%" PRId64 " us",
             pSql->self, pSql->pSubs[i], i, ctxlist[i].numOfInput, ctxlist[i].res->numOfTotal, ctxlist[i].res->numOfGroups, win->skey, win->ekey,
             tsBufGetNumOfGroup(ctxlist[i].res), et - st);
  }

  return ctxlist[0].res->numOfTotal;
}


H
hjxilinx 已提交
388
// todo handle failed to create sub query
H
Haojun Liao 已提交
389
SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, int32_t index) {
390
  SJoinSupporter* pSupporter = calloc(1, sizeof(SJoinSupporter));
H
hjxilinx 已提交
391 392 393 394 395 396 397 398 399
  if (pSupporter == NULL) {
    return NULL;
  }

  pSupporter->pObj = pSql;

  pSupporter->subqueryIndex = index;
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
  
400
  memcpy(&pSupporter->interval, &pQueryInfo->interval, sizeof(pSupporter->interval));
H
hjxilinx 已提交
401 402 403
  pSupporter->limit = pQueryInfo->limit;

  STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, index);
404
  pSupporter->uid = pTableMetaInfo->pTableMeta->id.uid;
H
hjxilinx 已提交
405 406
  assert (pSupporter->uid != 0);

S
slguan 已提交
407
  taosGetTmpfilePath("join-", pSupporter->path);
H
hjxilinx 已提交
408

D
fix bug  
dapan1121 已提交
409 410
  // do NOT create file here to reduce crash generated file left issue
  pSupporter->f = NULL;
H
hjxilinx 已提交
411 412 413 414

  return pSupporter;
}

H
Haojun Liao 已提交
415
/*
 * Release a join supporter and everything it still owns. Passing NULL is a no-op.
 *
 * Frees the expression and column lists, field info, the ts-comp buffer, the vgroup table list, the
 * id/tag list and the tag condition. It also closes and removes the temporary file at pSupporter->path.
 */
static void tscDestroyJoinSupporter(SJoinSupporter* pSupporter) {
  if (pSupporter == NULL) {
    return;
  }

  if (pSupporter->exprList != NULL) {
    tscSqlExprInfoDestroy(pSupporter->exprList);
  }

  if (pSupporter->colList != NULL) {
    tscColumnListDestroy(pSupporter->colList);
  }

  tscFieldInfoClear(&pSupporter->fieldsInfo);

  if (pSupporter->pTSBuf != NULL) {
    tsBufDestroy(pSupporter->pTSBuf);
    pSupporter->pTSBuf = NULL;
  }

  // Close before unlinking: on Windows removing a file that is still open fails, leaving the temp file behind.
  if (pSupporter->f != NULL) {
    fclose(pSupporter->f);
    pSupporter->f = NULL;
  }

  unlink(pSupporter->path);

  // The entries own their itemList arrays, so release them the same way pTableMetaInfo->pVgroupTables is
  // released (tscFreeVgroupTableInfo); a plain taosArrayDestroy would leak every itemList.
  if (pSupporter->pVgroupTables != NULL) {
    tscFreeVgroupTableInfo(pSupporter->pVgroupTables);
    pSupporter->pVgroupTables = NULL;
  }

  tfree(pSupporter->pIdTagList);
  tscTagCondRelease(&pSupporter->tagCond);
  free(pSupporter);
}

/*
 * need the secondary query process
 * In case of count(ts)/count(*)/spread(ts) query, that are only applied to
 * primary timestamp column , the secondary query is not necessary
 *
 */
H
Haojun Liao 已提交
459
static UNUSED_FUNC bool needSecondaryQuery(SQueryInfo* pQueryInfo) {
460 461 462
  size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
  
  for (int32_t i = 0; i < numOfCols; ++i) {
463 464
    SColumn* base = taosArrayGet(pQueryInfo->colList, i);
    if (base->colIndex.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
H
hjxilinx 已提交
465 466 467 468 469 470 471
      return true;
    }
  }

  return false;
}

472 473 474
/*
 * Remove, in place, every entry of pVgroupTables whose vgId does not appear in the group-id list of
 * pQueryInfo->tsBuf. That list holds the vgroups that still have timestamps after the intersection.
 * At least one entry must survive. Also marks the query as a multi-table query.
 */
static void filterVgroupTables(SQueryInfo* pQueryInfo, SArray* pVgroupTables) {
  int32_t  numOfGroupIds = 0;
  int32_t* groupIdList = NULL;
  tsBufGetGroupIdList(pQueryInfo->tsBuf, &numOfGroupIds, &groupIdList);

  // A virtual node all of whose tables were disqualified by the timestamp intersection is dropped,
  // so no next-stage query is sent to it.
  // TODO: If tables from some vnodes are not qualified for next stage query, discard them.
  int32_t k = 0;
  while (k < taosArrayGetSize(pVgroupTables)) {
    SVgroupTableInfo* pInfo = taosArrayGet(pVgroupTables, k);

    int32_t f = 0;
    while (f < numOfGroupIds && groupIdList[f] != pInfo->vgInfo.vgId) {
      ++f;
    }

    if (f == numOfGroupIds) {
      // k is not advanced: the next entry is examined at the same index after the removal
      tscRemoveVgroupTableGroup(pVgroupTables, k);
    } else {
      ++k;
    }
  }

  assert(taosArrayGetSize(pVgroupTables) > 0);
  TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_MULTITABLE_QUERY);

  tfree(groupIdList);
}

/*
 * Build a new vgroup-table list from the vgroup ids reported by pQueryInfo->tsBuf, in that order.
 *
 * Each reported id gets a copy (tscVgroupTableCopy) of the matching entry in pVgroupTables. Ids with
 * no matching entry are skipped. The caller owns the returned array; pVgroupTables is not modified.
 * Also marks the query as a multi-table query.
 */
static SArray* buildVgroupTableByResult(SQueryInfo* pQueryInfo, SArray* pVgroupTables) {
  int32_t  num = 0;
  int32_t* list = NULL;
  tsBufGetGroupIdList(pQueryInfo->tsBuf, &num, &list);

  size_t numOfGroups = taosArrayGetSize(pVgroupTables);

  SArray* pNew = taosArrayInit(num, sizeof(SVgroupTableInfo));

  for (int32_t i = 0; i < num; ++i) {
    int32_t vnodeId = list[i];

    for (size_t j = 0; j < numOfGroups; ++j) {
      SVgroupTableInfo* p1 = taosArrayGet(pVgroupTables, j);
      if (p1->vgInfo.vgId == vnodeId) {
        // Push only a freshly copied entry. Previously `info` lived outside the loop and was pushed even
        // when nothing matched, inserting either uninitialized memory or a second copy of the previous
        // entry that shares its itemList pointer (likely freed twice later).
        SVgroupTableInfo info = {{0}};
        tscVgroupTableCopy(&info, p1);
        taosArrayPush(pNew, &info);
        break;
      }
    }
  }

  tfree(list);
  TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_MULTITABLE_QUERY);

  return pNew;
}

H
hjxilinx 已提交
534 535 536
/*
 * launch secondary stage query to fetch the result that contains timestamp in set
 */
H
Haojun Liao 已提交
537
static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
H
hjxilinx 已提交
538
  int32_t         numOfSub = 0;
539
  SJoinSupporter* pSupporter = NULL;
H
hjxilinx 已提交
540
  
H
Haojun Liao 已提交
541
  //If the columns are not involved in the final select clause, the corresponding query will not be issued.
H
Haojun Liao 已提交
542
  for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
H
hjxilinx 已提交
543
    pSupporter = pSql->pSubs[i]->param;
H
hjxilinx 已提交
544
    if (taosArrayGetSize(pSupporter->exprList) > 0) {
H
hjxilinx 已提交
545 546 547 548 549 550 551
      ++numOfSub;
    }
  }
  
  assert(numOfSub > 0);
  
  // scan all subquery, if one sub query has only ts, ignore it
H
Haojun Liao 已提交
552
  tscDebug("0x%"PRIx64" start to launch secondary subqueries, %d out of %d needs to query", pSql->self, numOfSub, pSql->subState.numOfSub);
H
hjxilinx 已提交
553 554 555

  bool success = true;
  
H
Haojun Liao 已提交
556
  for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
H
hjxilinx 已提交
557 558 559 560 561
    SSqlObj *pPrevSub = pSql->pSubs[i];
    pSql->pSubs[i] = NULL;
    
    pSupporter = pPrevSub->param;
  
H
hjxilinx 已提交
562
    if (taosArrayGetSize(pSupporter->exprList) == 0) {
H
Haojun Liao 已提交
563
      tscDebug("0x%"PRIx64" subIndex: %d, no need to launch query, ignore it", pSql->self, i);
H
hjxilinx 已提交
564 565
    
      tscDestroyJoinSupporter(pSupporter);
566
      taos_free_result(pPrevSub);
H
hjxilinx 已提交
567 568 569 570 571 572
    
      pSql->pSubs[i] = NULL;
      continue;
    }
  
    SQueryInfo *pSubQueryInfo = tscGetQueryInfoDetail(&pPrevSub->cmd, 0);
H
Haojun Liao 已提交
573
    STSBuf     *pTsBuf = pSubQueryInfo->tsBuf;
H
hjxilinx 已提交
574 575 576
    pSubQueryInfo->tsBuf = NULL;
  
    // free result for async object will also free sqlObj
H
Haojun Liao 已提交
577
    assert(tscSqlExprNumOfExprs(pSubQueryInfo) == 1); // ts_comp query only requires one result columns
H
hjxilinx 已提交
578 579
    taos_free_result(pPrevSub);
  
580
    SSqlObj *pNew = createSubqueryObj(pSql, (int16_t) i, tscJoinQueryCallback, pSupporter, TSDB_SQL_SELECT, NULL);
H
hjxilinx 已提交
581 582 583 584 585
    if (pNew == NULL) {
      tscDestroyJoinSupporter(pSupporter);
      success = false;
      break;
    }
586
    
B
Bomin Zhang 已提交
587

H
hjxilinx 已提交
588 589 590 591
    tscClearSubqueryInfo(&pNew->cmd);
    pSql->pSubs[i] = pNew;
  
    SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
H
Haojun Liao 已提交
592
    pQueryInfo->tsBuf = pTsBuf;  // transfer the ownership of timestamp comp-z data to the new created object
B
Bomin Zhang 已提交
593

H
hjxilinx 已提交
594
    // set the second stage sub query for join process
H
hjxilinx 已提交
595
    TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE);
596
    memcpy(&pQueryInfo->interval, &pSupporter->interval, sizeof(pQueryInfo->interval));
B
Bomin Zhang 已提交
597

H
hjxilinx 已提交
598
    tscTagCondCopy(&pQueryInfo->tagCond, &pSupporter->tagCond);
B
Bomin Zhang 已提交
599

H
hjxilinx 已提交
600 601 602
    pQueryInfo->colList = pSupporter->colList;
    pQueryInfo->exprList = pSupporter->exprList;
    pQueryInfo->fieldsInfo = pSupporter->fieldsInfo;
H
Haojun Liao 已提交
603
    pQueryInfo->groupbyExpr = pSupporter->groupInfo;
H
Haojun Liao 已提交
604

605
    assert(pNew->subState.numOfSub == 0 && pNew->cmd.numOfClause == 1 && pQueryInfo->numOfTables == 1);
H
hjxilinx 已提交
606
  
607
    tscFieldInfoUpdateOffset(pQueryInfo);
H
hjxilinx 已提交
608
  
609
    STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
Haojun Liao 已提交
610
    pTableMetaInfo->pVgroupTables = pSupporter->pVgroupTables;
H
Haojun Liao 已提交
611 612 613

    pSupporter->exprList = NULL;
    pSupporter->colList = NULL;
614
    pSupporter->pVgroupTables = NULL;
H
Haojun Liao 已提交
615 616
    memset(&pSupporter->fieldsInfo, 0, sizeof(SFieldInfo));
    memset(&pSupporter->groupInfo, 0, sizeof(SSqlGroupbyExpr));
H
Haojun Liao 已提交
617

H
hjxilinx 已提交
618 619 620 621 622
    /*
     * When handling the projection query, the offset value will be modified for table-table join, which is changed
     * during the timestamp intersection.
     */
    pSupporter->limit = pQueryInfo->limit;
623
    pQueryInfo->limit = pSupporter->limit;
H
Haojun Liao 已提交
624 625 626 627 628

    SColumnIndex index = {.tableIndex = 0, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
    SSchema* s = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, 0);

    SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, 0);
H
Haojun Liao 已提交
629 630
    int16_t funcId = pExpr->functionId;

H
Haojun Liao 已提交
631
    // add the invisible timestamp column
H
Haojun Liao 已提交
632
    if ((pExpr->colInfo.colId != PRIMARYKEY_TIMESTAMP_COL_INDEX) ||
H
Haojun Liao 已提交
633 634 635 636
        (funcId != TSDB_FUNC_TS && funcId != TSDB_FUNC_TS_DUMMY && funcId != TSDB_FUNC_PRJ)) {

      int16_t functionId = tscIsProjectionQuery(pQueryInfo)? TSDB_FUNC_PRJ : TSDB_FUNC_TS;

637
      tscAddFuncInSelectClause(pQueryInfo, 0, functionId, &index, s, TSDB_COL_NORMAL);
H
Haojun Liao 已提交
638
      tscPrintSelectClause(pNew, 0);
639
      tscFieldInfoUpdateOffset(pQueryInfo);
H
Haojun Liao 已提交
640 641 642 643

      pExpr = tscSqlExprGet(pQueryInfo, 0);
    }

644
    // set the join condition tag column info, todo extract method
H
Haojun Liao 已提交
645 646
    if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
      assert(pQueryInfo->tagCond.joinInfo.hasJoin);
647
      int16_t colId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid);
H
Haojun Liao 已提交
648

649
      // set the tag column id for executor to extract correct tag value
650
      pExpr->param[0] = (tVariant) {.i64 = colId, .nType = TSDB_DATA_TYPE_BIGINT, .nLen = sizeof(int64_t)};
H
Haojun Liao 已提交
651
      pExpr->numOfParams = 1;
H
Haojun Liao 已提交
652 653
    }

654 655 656 657 658 659 660 661
    if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
      assert(pTableMetaInfo->pVgroupTables != NULL);
      if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
        SArray* p = buildVgroupTableByResult(pQueryInfo, pTableMetaInfo->pVgroupTables);
        tscFreeVgroupTableInfo(pTableMetaInfo->pVgroupTables);
        pTableMetaInfo->pVgroupTables = p;
      } else {
        filterVgroupTables(pQueryInfo, pTableMetaInfo->pVgroupTables);
H
Haojun Liao 已提交
662 663 664
      }
    }

D
fix bug  
dapan1121 已提交
665 666
    subquerySetState(pPrevSub, &pSql->subState, i, 0);
    
667
    size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
H
Haojun Liao 已提交
668 669
    tscDebug("0x%"PRIx64" subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, name:%s",
             pSql->self, pNew, 0, pTableMetaInfo->vgroupIndex, pQueryInfo->type, taosArrayGetSize(pQueryInfo->exprList),
670
             numOfCols, pQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pTableMetaInfo->name));
H
hjxilinx 已提交
671 672 673 674
  }
  
  //prepare the subqueries object failed, abort
  if (!success) {
675
    pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
D
dapan1121 已提交
676
    tscError("0x%"PRIx64" failed to prepare subqueries objs for secondary phase query, numOfSub:%d, code:%d", pSql->self,
H
Haojun Liao 已提交
677
        pSql->subState.numOfSub, pSql->res.code);
H
hjxilinx 已提交
678
    freeJoinSubqueryObj(pSql);
H
hjxilinx 已提交
679 680 681 682
    
    return pSql->res.code;
  }
  
H
Haojun Liao 已提交
683
  for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
H
Haojun Liao 已提交
684
    if (pSql->pSubs[i] == NULL) {
H
hjxilinx 已提交
685 686
      continue;
    }
H
Haojun Liao 已提交
687

H
Haojun Liao 已提交
688
    tscDoQuery(pSql->pSubs[i]);
H
hjxilinx 已提交
689 690 691 692 693
  }

  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
694
/*
 * Release every join subquery of pSql together with its supporter, then tear down the subquery state.
 * The mutex is destroyed only when the state array exists; the array is freed and numOfSub is reset to 0.
 * NOTE(review): this assumes the mutex is initialized whenever states is allocated — confirm where they are set up.
 */
void freeJoinSubqueryObj(SSqlObj* pSql) {
  SSubqueryState* pState = &pSql->subState;

  for (int32_t i = 0; i < pState->numOfSub; ++i) {
    SSqlObj* pSub = pSql->pSubs[i];
    if (pSub != NULL) {
      tscDestroyJoinSupporter(pSub->param);
      taos_free_result(pSub);
      pSql->pSubs[i] = NULL;
    }
  }

  if (pState->states != NULL) {
    pthread_mutex_destroy(&pState->mutex);
  }
  tfree(pState->states);

  pState->numOfSub = 0;
}

D
fix bug  
dapan1121 已提交
718
/*
 * Mark the subquery owning pSupporter as finished after a failure.
 *
 * Returns 0 if this call completed the whole set of subqueries. In that case the parent's error is
 * logged and all join subqueries are released. Returns 1 otherwise: other subqueries are still pending,
 * or all of them had already finished before this call.
 */
static int32_t quitAllSubquery(SSqlObj* pSqlSub, SSqlObj* pSqlObj, SJoinSupporter* pSupporter) {
  bool lastToFinish = subAndCheckDone(pSqlSub, pSqlObj, pSupporter->subqueryIndex);
  if (!lastToFinish) {
    return 1;
  }

  tscError("0x%"PRIx64" all subquery return and query failed, global code:%s", pSqlObj->self, tstrerror(pSqlObj->res.code));
  freeJoinSubqueryObj(pSqlObj);
  return 0;
}

// update the query time range according to the join results on timestamp
H
Haojun Liao 已提交
730 731 732
static void updateQueryTimeRange(SQueryInfo* pQueryInfo, STimeWindow* win) {
  assert(pQueryInfo->window.skey <= win->skey && pQueryInfo->window.ekey >= win->ekey);
  pQueryInfo->window = *win;
H
Haojun Liao 已提交
733 734


H
hjxilinx 已提交
735 736
}

H
Haojun Liao 已提交
737 738 739
int32_t tidTagsCompar(const void* p1, const void* p2) {
  const STidTags* t1 = (const STidTags*) (p1);
  const STidTags* t2 = (const STidTags*) (p2);
740 741
  
  if (t1->vgId != t2->vgId) {
weixin_48148422's avatar
weixin_48148422 已提交
742 743
    return (t1->vgId > t2->vgId) ? 1 : -1;
  }
744

H
Haojun Liao 已提交
745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763
  tstr* tag1 = (tstr*) t1->tag;
  tstr* tag2 = (tstr*) t2->tag;

  if (tag1->len != tag2->len) {
    return (tag1->len > tag2->len)? 1: -1;
  }

  return strncmp(tag1->data, tag2->data, tag1->len);
}

int32_t tagValCompar(const void* p1, const void* p2) {
  const STidTags* t1 = (const STidTags*) varDataVal(p1);
  const STidTags* t2 = (const STidTags*) varDataVal(p2);

  tstr* tag1 = (tstr*) t1->tag;
  tstr* tag2 = (tstr*) t2->tag;

  if (tag1->len != tag2->len) {
    return (tag1->len > tag2->len)? 1: -1;
764
  }
H
Haojun Liao 已提交
765

H
Haojun Liao 已提交
766
  return memcmp(tag1->data, tag2->data, tag1->len);
767 768
}

H
Haojun Liao 已提交
769
/*
 * Group the STidTags in `tables` by vgroup and store the result in pTableMetaInfo->pVgroupTables.
 *
 * A new group starts whenever an element's vgId differs from the previous element's. The input is
 * therefore presumably sorted by vgId (NOTE(review): confirm at call sites). Each group copies the
 * matching vgroup info from pTableMetaInfo->vgroupList and lists the (uid, tid) of its tables.
 * pVgroupTables is set to NULL when `tables` is empty. vgroupIndex is always reset to 0.
 */
void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables) {
  SArray*         pGroups = taosArrayInit(4, sizeof(SVgroupTableInfo));
  SArray*         pCurItems = NULL;
  const STidTags* pLast = NULL;

  size_t total = taosArrayGetSize(tables);
  for (size_t idx = 0; idx < total; ++idx) {
    STidTags* pTid = taosArrayGet(tables, idx);

    bool startsNewGroup = (pLast == NULL) || (pLast->vgId != pTid->vgId);
    if (startsNewGroup) {
      SVgroupsInfo*    pVgList = pTableMetaInfo->vgroupList;
      SVgroupTableInfo group = {{0}};

      for (int32_t m = 0; m < pVgList->numOfVgroups; ++m) {
        if (pVgList->vgroups[m].vgId == pTid->vgId) {
          tscSVgroupInfoCopy(&group.vgInfo, &pVgList->vgroups[m]);
          break;
        }
      }
      assert(group.vgInfo.numOfEps != 0);

      pCurItems = taosArrayInit(4, sizeof(STableIdInfo));
      group.itemList = pCurItems;

      // log the size of the group that has just been completed
      size_t numOfDone = taosArrayGetSize(pGroups);
      if (numOfDone > 0) {
        SVgroupTableInfo* pDone = taosArrayGet(pGroups, numOfDone - 1);
        tscDebug("0x%"PRIx64" vgId:%d, tables:%"PRIzu, pSql->self, pDone->vgInfo.vgId, taosArrayGetSize(pDone->itemList));
      }

      taosArrayPush(pGroups, &group);
    }

    STableIdInfo item = {.uid = pTid->uid, .tid = pTid->tid, .key = INT64_MIN};
    taosArrayPush(pCurItems, &item);

    tscTrace("0x%"PRIx64" tid:%d, uid:%"PRIu64",vgId:%d added", pSql->self, pTid->tid, pTid->uid, pTid->vgId);
    pLast = pTid;
  }

  pTableMetaInfo->vgroupIndex = 0;

  size_t numOfGroups = taosArrayGetSize(pGroups);
  if (numOfGroups == 0) {
    pTableMetaInfo->pVgroupTables = NULL;
    taosArrayDestroy(pGroups);
    return;
  }

  pTableMetaInfo->pVgroupTables = pGroups;

  SVgroupTableInfo* pTail = taosArrayGet(pGroups, numOfGroups - 1);
  tscDebug("0x%"PRIx64" vgId:%d, tables:%"PRIzu, pSql->self, pTail->vgInfo.vgId, taosArrayGetSize(pTail->itemList));
}

821
/*
 * Re-issue a join subquery as a ts_comp query.
 *
 * The subquery's previous query info and result buffer are discarded. Its only output
 * expression becomes TS_COMP on the primary timestamp column. Filtered columns from the
 * join supporter are carried over. The query is then sent again with tscJoinQueryCallback
 * as its completion callback.
 *
 * pSql       - the join subquery, reused in place
 * pSupporter - join support info of that subquery (join tag condition, column list)
 * pParent    - the parent join SqlObj; only used for logging here
 */
static void issueTsCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj* pParent) {
  SSqlCmd* pCmd = &pSql->cmd;

  // drop what the previous stage left in this SqlObj
  tscClearSubqueryInfo(pCmd);
  tscFreeSqlResult(pSql);

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  assert(pQueryInfo->numOfTables == 1);

  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  tscInitQueryInfo(pQueryInfo);

  // no longer a tag-filter query: it now runs over multiple tables
  TSDB_QUERY_CLEAR_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY);
  TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_MULTITABLE_QUERY);

  pCmd->command = TSDB_SQL_SELECT;
  pSql->fp      = tscJoinQueryCallback;

  // the single output expression: ts_comp(<primary timestamp column>)
  SSchema      tsCompSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1};
  SColumnIndex tsColumn     = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX};
  tscAddFuncInSelectClause(pQueryInfo, 0, TSDB_FUNC_TS_COMP, &tsColumn, &tsCompSchema, TSDB_COL_NORMAL);

  // on a super table, ts_comp takes the join tag column id as its single parameter
  if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
    SSqlExpr* pTsCompExpr  = tscSqlExprGet(pQueryInfo, 0);
    int16_t   joinTagColId = tscGetJoinTagColIdByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->id.uid);

    pTsCompExpr->param->i64  = joinTagColId;
    pTsCompExpr->numOfParams = 1;
  }

  // only columns that carry filters are copied into the ts_comp query's column list
  size_t numOfSupCols = (pSupporter->colList == NULL) ? 0 : taosArrayGetSize(pSupporter->colList);
  for (size_t k = 0; k < numOfSupCols; ++k) {
    SColumn* pSrcCol = taosArrayGetP(pSupporter->colList, k);
    if (pSrcCol->numOfFilters <= 0) {
      continue;
    }

    SColumn* pCopy = tscColumnClone(pSrcCol);
    taosArrayPush(pQueryInfo->colList, &pCopy);
  }

  size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);

  tscDebug(
      "%p subquery:%p tableIndex:%d, vgroupIndex:%d, numOfVgroups:%d, type:%d, ts_comp query to retrieve timestamps, "
      "numOfExpr:%" PRIzu ", colList:%" PRIzu ", numOfOutputFields:%d, name:%s",
      pParent, pSql, 0, pTableMetaInfo->vgroupIndex, pTableMetaInfo->vgroupList->numOfVgroups, pQueryInfo->type,
      tscSqlExprNumOfExprs(pQueryInfo), numOfCols, pQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pTableMetaInfo->name));

  tscProcessSql(pSql);
}

H
Haojun Liao 已提交
876
/*
 * Check that no two tables of one join subquery carry the same join tag value.
 *
 * p1->pIdTagList holds p1->num var-data records of p1->tagSize bytes, each wrapping an
 * STidTags. The caller sorts the list by tag value beforehand, so equal tags are expected
 * to be adjacent. Neighbouring tags are compared with doCompare() using the tag column's
 * type and byte width.
 *
 * On the first equal pair, pPSqlObj->res.code is set to TSDB_CODE_QRY_DUP_JOIN_KEY and
 * false is returned. Otherwise the function returns true.
 */
static bool checkForDuplicateTagVal(SSchema* pColSchema, SJoinSupporter* p1, SSqlObj* pPSqlObj) {
  STidTags* pLast = NULL;

  for (int32_t k = 0; k < p1->num; ++k) {
    STidTags* pThis = (STidTags*) varDataVal(p1->pIdTagList + k * p1->tagSize);

    if (pLast != NULL) {
      assert(pLast->vgId >= 1 && pThis->vgId >= 1);

      if (doCompare(pLast->tag, pThis->tag, pColSchema->type, pColSchema->bytes) == 0) {
        tscError("0x%"PRIx64" join tags have same value for different table, free all sub SqlObj and quit", pPSqlObj->self);
        pPSqlObj->res.code = TSDB_CODE_QRY_DUP_JOIN_KEY;
        return false;
      }
    }

    pLast = pThis;
  }

  return true;
}


D
dapan1121 已提交
893 894
/*
 * Intersect the <tid, tags> tuple sets gathered by all join subqueries on the join tag value.
 *
 * Phase 1: for each subquery i, sort its pIdTagList (p->num records of p->tagSize bytes) by
 * tag value and create its result array ctxlist[i].res. A subquery with two tables sharing
 * a tag value is rejected.
 *
 * Phase 2: process each join-table group listed in
 * pQueryInfo->tagCond.joinInfo.joinTables[tidx]->tagJoin (the code asserts at least two
 * members). The sorted lists of the group members are walked in round-robin order. A record
 * is copied into a member's result array only when every member of the group currently
 * points at a record with an equal tag value.
 *
 * Phase 3: sort every result array with tidTagsCompar and push it into resList, one entry
 * per subquery, in subquery order. The arrays are not freed here; the caller takes them
 * over through resList.
 *
 * Returns TSDB_CODE_SUCCESS on success. Returns TSDB_CODE_QRY_DUP_JOIN_KEY when
 * checkForDuplicateTagVal() rejects a subquery; that helper also sets pParentSql->res.code.
 * In that case the result arrays created so far are destroyed and nothing is pushed into
 * resList.
 */
static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pParentSql, SArray* resList) {
  int16_t joinNum = pParentSql->subState.numOfSub;
H
Haojun Liao 已提交
895
  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
896
  int16_t tagColId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid);
D
dapan1121 已提交
897
  SJoinSupporter* p0 = pParentSql->pSubs[0]->param;
D
dapan1121 已提交
898
  SMergeCtx ctxlist[TSDB_MAX_JOIN_TABLE_NUM] = {{0}};
D
dapan1121 已提交
899 900 901 902
  SMergeCtx* ctxStack[TSDB_MAX_JOIN_TABLE_NUM] = {0};

  // int16_t for padding
  // NOTE(review): presumably the int16_t is the var-data length header skipped by varDataVal(),
  // so `size` is the STidTags payload copied into the result arrays; TODO confirm
  int32_t size = p0->tagSize - sizeof(int16_t);
H
Haojun Liao 已提交
903

H
Haojun Liao 已提交
904
  SSchema* pColSchema = tscGetColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId);
D
dapan1121 已提交
905
  
H
Haojun Liao 已提交
906
  tscDebug("0x%"PRIx64" all subquery retrieve <tid, tags> complete, do tags match", pParentSql->self);
H
Haojun Liao 已提交
907

D
dapan1121 已提交
908 909
  // phase 1: prepare every subquery's sorted input and its (empty) result array
  for (int32_t i = 0; i < joinNum; i++) {
    SJoinSupporter* p = pParentSql->pSubs[i]->param;
H
Haojun Liao 已提交
910

D
dapan1121 已提交
911
    ctxlist[i].p = p;
D
fix bug  
dapan1121 已提交
912
    ctxlist[i].res = taosArrayInit(p->num, size);
D
dapan1121 已提交
913 914 915 916 917 918 919 920 921 922 923 924
    
    tscDebug("Join %d - num:%d", i, p->num);
  
    // sort according to the tag value
    qsort(p->pIdTagList, p->num, p->tagSize, tagValCompar);
    
    // on duplicates, release the result arrays created so far (indexes 0..i)
    if (!checkForDuplicateTagVal(pColSchema, p, pParentSql)) {
      for (int32_t j = 0; j <= i; j++) {
        taosArrayDestroy(ctxlist[j].res);
      }
      return TSDB_CODE_QRY_DUP_JOIN_KEY;
    }
H
Haojun Liao 已提交
925 926
  }

D
dapan1121 已提交
927
  // phase 2 merge state:
  //  slot      - position inside the current group's tagJoin list (wraps around)
  //  tableNum  - number of members of the current group
  //  ctxStack  - members whose current tags have been found equal so far (stackidx entries)
  //  equalNum  - how many members currently agree with the reference tag
  //  prev/pctx - the reference record and the member it belongs to
  //  cur/ctx   - the record/member being compared against the reference
  int32_t slot = 0;
D
dapan1121 已提交
928
  size_t tableNum = 0;
D
dapan1121 已提交
929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949
  int16_t* tableMIdx = 0;
  int32_t equalNum = 0;
  int32_t stackidx = 0;
  int32_t mergeDone = 0;
  SMergeCtx* ctx = NULL;
  SMergeCtx* pctx = NULL;
  STidTags* cur = NULL;
  STidTags* prev = NULL;
  SArray*   tagCond = NULL;

  for (int16_t tidx = 0; tidx < joinNum; tidx++) {
    pctx = &ctxlist[tidx];
    // each group is merged once; members of an already merged group are skipped
    if (pctx->compared) {
      continue;
    }

    assert(pctx->idx == 0 && taosArrayGetSize(pctx->res) == 0);

    tagCond = pQueryInfo->tagCond.joinInfo.joinTables[tidx]->tagJoin;

    tableNum = taosArrayGetSize(tagCond);
D
fix bug  
dapan1121 已提交
950 951 952 953 954 955 956
    assert(tableNum >= 2);

    // mark every member of this group as handled
    for (int32_t i = 0; i < tableNum; ++i) {
      tableMIdx = taosArrayGet(tagCond, i);
      SMergeCtx* tctx = &ctxlist[*tableMIdx];
      tctx->compared = 1;
    }
D
dapan1121 已提交
957 958 959 960 961 962 963 964 965 966 967 968 969 970

    // a member without any tuple means the group cannot produce a match
    for (int32_t i = 0; i < tableNum; ++i) {
      tableMIdx = taosArrayGet(tagCond, i);
      SMergeCtx* tctx = &ctxlist[*tableMIdx];
      if (tctx->p->num <= 0 || tctx->p->pIdTagList == NULL) {
        mergeDone = 1;
        break;
      }
    }

    if (mergeDone) {
      mergeDone = 0;
      continue;
    }
D
dapan1121 已提交
971
    
D
fix bug  
dapan1121 已提交
972 973 974 975
    // the member at `slot` (0 here) provides the initial reference tag
    tableMIdx = taosArrayGet(tagCond, slot);
    
    pctx = &ctxlist[*tableMIdx];

D
dapan1121 已提交
976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006
    prev = (STidTags*) varDataVal(pctx->p->pIdTagList + pctx->idx * pctx->p->tagSize);

    ctxStack[stackidx++] = pctx;

    // compare against the next member of the group
    tableMIdx = taosArrayGet(tagCond, ++slot);

    equalNum = 1;
    
    // the loop ends when any member involved in an advance runs out of records
    while (1) {
      ctx = &ctxlist[*tableMIdx];
      
      cur = (STidTags*) varDataVal(ctx->p->pIdTagList + ctx->idx * ctx->p->tagSize);

      assert(cur->tid != 0 && prev->tid != 0);

      ctxStack[stackidx++] = ctx;

      // presumably doCompare follows the usual comparator sign convention (<0, 0, >0),
      // consistent with the tagValCompar ordering used above; TODO confirm
      int32_t ret = doCompare(prev->tag, cur->tag, pColSchema->type, pColSchema->bytes);
      if (ret == 0) {
        // not all members checked yet: cur becomes the reference, go on with the next slot
        if (++equalNum < tableNum) {
          prev = cur;
          pctx = ctx;
        
          if (++slot >= tableNum) {
            slot = 0;
          }

          tableMIdx = taosArrayGet(tagCond, slot);
          continue;
        }
        
H
Haojun Liao 已提交
1007
        // NOTE(review): `*(int*) prev->tag` reinterprets the tag as an int regardless of the tag
        // type; the logged value is only meaningful for 4-byte integer tags and may read past a
        // narrower tag; TODO confirm
        tscDebug("0x%"PRIx64" tag matched, vgId:%d, val:%d, tid:%d, uid:%"PRIu64", tid:%d, uid:%"PRIu64, pParentSql->self, prev->vgId,
D
dapan1121 已提交
1008 1009 1010 1011 1012 1013 1014 1015
                 *(int*) prev->tag, prev->tid, prev->uid, cur->tid, cur->uid);

        // all members of the group agree: copy each member's current record into its result
        assert(stackidx == tableNum);
        
        for (int32_t i = 0; i < stackidx; ++i) {
          SMergeCtx* tctx = ctxStack[i];
          prev = (STidTags*) varDataVal(tctx->p->pIdTagList + tctx->idx * tctx->p->tagSize);

D
fix bug  
dapan1121 已提交
1016
          taosArrayPush(tctx->res, prev);
D
dapan1121 已提交
1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038
        }

        // advance every matched member; the group is finished once one of them is exhausted
        for (int32_t i = 0; i < stackidx; ++i) {
          SMergeCtx* tctx = ctxStack[i];
        
          if (++tctx->idx >= tctx->p->num) {
            mergeDone = 1;
            break;
          }
        }

        if (mergeDone) {
          break;
        }

        // restart the comparison from pctx's new current record
        stackidx = 0;
        equalNum = 1;
        
        prev = (STidTags*) varDataVal(pctx->p->pIdTagList + pctx->idx * pctx->p->tagSize);

        ctxStack[stackidx++] = pctx;
      } else if (ret > 0) {
D
fix bug  
dapan1121 已提交
1039 1040
        // reference tag is larger: drop cur from the stack and advance only that member
        stackidx--;
        
D
dapan1121 已提交
1041 1042 1043 1044
        if (++ctx->idx >= ctx->p->num) {
          break;
        }
      } else {
D
fix bug  
dapan1121 已提交
1045 1046
        // reference tag is smaller: none of the members that agreed with it can match cur,
        // so drop cur from the stack and advance all the others
        stackidx--;
        
D
dapan1121 已提交
1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065
        for (int32_t i = 0; i < stackidx; ++i) {
          SMergeCtx* tctx = ctxStack[i];
          if (++tctx->idx >= tctx->p->num) {
            mergeDone = 1;
            break;
          }
        }

        if (mergeDone) {
          break;
        }

        // restart the comparison from pctx's new current record
        stackidx = 0;        
        equalNum = 1;
        
        prev = (STidTags*) varDataVal(pctx->p->pIdTagList + pctx->idx * pctx->p->tagSize);
        ctxStack[stackidx++] = pctx;
      }

H
Haojun Liao 已提交
1066
    }
D
dapan1121 已提交
1067 1068 1069

    // reset the merge state before the next group
    slot = 0;
    mergeDone = 0;
D
fix bug  
dapan1121 已提交
1070
    stackidx = 0;
H
Haojun Liao 已提交
1071
  }
1072

D
dapan1121 已提交
1073 1074 1075 1076 1077 1078
  // phase 3: hand every subquery's matched records over to the caller through resList
  for (int32_t i = 0; i < joinNum; ++i) {
    // reorganize the tid-tag value according to both the vgroup id and tag values
    // sort according to the tag value
    size_t num = taosArrayGetSize(ctxlist[i].res);
    
    qsort((ctxlist[i].res)->pData, num, size, tidTagsCompar);
H
Haojun Liao 已提交
1079

D
dapan1121 已提交
1080
    taosArrayPush(resList, &ctxlist[i].res);
H
Haojun Liao 已提交
1081

H
Haojun Liao 已提交
1082
    tscDebug("0x%"PRIx64" tags match complete, result num: %"PRIzu, pParentSql->self, num);
H
Haojun Liao 已提交
1083
  }
D
dapan1121 已提交
1084 1085 1086
  
  return TSDB_CODE_SUCCESS;
}
H
Haojun Liao 已提交
1087

D
dapan1121 已提交
1088
/*
 * Decide whether the tag-intersection result leaves nothing to join.
 *
 * resList is expected to hold `size` entries of type SArray*, one per subquery. Returns
 * true when resList does not have exactly `size` entries or when any of those arrays is
 * empty. Returns false otherwise.
 */
bool emptyTagList(SArray* resList, int32_t size) {
  size_t numOfLists = taosArrayGetSize(resList);
  if (numOfLists != size) {
    return true;
  }

  bool foundEmpty = false;
  for (int32_t idx = 0; idx < size && !foundEmpty; ++idx) {
    SArray** ppTuples = taosArrayGet(resList, idx);
    size_t   numOfTuples = taosArrayGetSize(*ppTuples);

    foundEmpty = (numOfTuples == 0);
  }

  return foundEmpty;
}

H
Haojun Liao 已提交
1104
/*
 * Fetch callback for the first stage of a join query.
 *
 * Every join subquery retrieves the <tid, tag> tuples of the tables it covers, vgroup by
 * vgroup, and appends them to pSupporter->pIdTagList. When the last subquery finishes:
 *   - the tuple sets are intersected on the join tag (getIntersectionOfTableTuple);
 *   - if nothing matches, the parent completes with an empty result;
 *   - otherwise every subquery is re-issued as a ts_comp query restricted to the
 *     matching tables.
 *
 * param     - the SJoinSupporter of this subquery
 * tres      - the subquery SSqlObj
 * numOfRows - number of rows fetched, or a negative error code on failure
 */
static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRows) {
  SJoinSupporter* pSupporter = (SJoinSupporter*)param;

  SSqlObj* pParentSql = pSupporter->pObj;

  SSqlObj* pSql = (SSqlObj*)tres;
  SSqlCmd* pCmd = &pSql->cmd;
  SSqlRes* pRes = &pSql->res;

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  // todo, the type may not include TSDB_QUERY_TYPE_TAG_FILTER_QUERY
  assert(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY));

  // another subquery has already failed; only the last subquery to finish reports the error
  if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
    tscError("0x%"PRIx64" abort query due to other subquery failure. code:%d, global code:%d", pSql->self, numOfRows, pParentSql->res.code);
    if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
      return;
    }

    tscAsyncResultOnError(pParentSql);
    return;
  }

  // check for the error code firstly
  if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
    // todo retry if other subqueries are not failed
    assert(numOfRows < 0 && numOfRows == taos_errno(pSql));
    tscError("0x%"PRIx64" sub query failed, code:%s, index:%d", pSql->self, tstrerror(numOfRows), pSupporter->subqueryIndex);

    pParentSql->res.code = numOfRows;
    if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
      return;
    }

    tscAsyncResultOnError(pParentSql);
    return;
  }

  // keep the results in memory
  if (numOfRows > 0) {
    // multiply in size_t so a large batch cannot overflow the int product
    size_t validLen = (size_t)pSupporter->tagSize * (size_t)pRes->numOfRows;
    size_t length = pSupporter->totalLen + validLen;

    char* tmp = realloc(pSupporter->pIdTagList, length);
    if (tmp == NULL) {
      // capture errno before logging, which may overwrite it
      int32_t allocErr = (errno != 0) ? errno : ENOMEM;
      tscError("0x%"PRIx64" failed to malloc memory", pSql->self);

      pParentSql->res.code = TAOS_SYSTEM_ERROR(allocErr);
      if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
        return;
      }

      tscAsyncResultOnError(pParentSql);
      return;
    }

    pSupporter->pIdTagList = tmp;

    memcpy(pSupporter->pIdTagList + pSupporter->totalLen, pRes->data, validLen);
    pSupporter->totalLen += (int32_t)validLen;
    pSupporter->num += (int32_t)pRes->numOfRows;

    // query not completed, continue to retrieve tid + tag tuples
    if (!pRes->completed) {
      taos_fetch_rows_a(tres, tidTagRetrieveCallback, param);
      return;
    }
  }

  // data in current vnode has all returned to client, try next vnode if exits
  // <tid + tag> tuples have been retrieved to client, try <tid + tag> tuples from the next vnode
  if (hasMoreVnodesToTry(pSql)) {
    STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

    int32_t totalVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
    pTableMetaInfo->vgroupIndex += 1;
    assert(pTableMetaInfo->vgroupIndex < totalVgroups);

    tscDebug("0x%"PRIx64" tid_tag from vgroup index:%d completed, try next vgroup:%d. total vgroups:%d. current numOfRes:%d",
             pSql->self, pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVgroups, pSupporter->num);

    pCmd->command = TSDB_SQL_SELECT;
    tscResetForNextRetrieve(&pSql->res);

    // set the callback function
    pSql->fp = tscJoinQueryCallback;
    tscProcessSql(pSql);
    return;
  }

  // no data exists in next vnode, mark the <tid, tags> query completed
  // only when there is no subquery exits any more, proceeds to get the intersect of the <tid, tags> tuple sets.
  if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
    tscDebug("0x%"PRIx64" tagRetrieve:%p,%d completed, total:%d", pParentSql->self, tres, pSupporter->subqueryIndex, pParentSql->subState.numOfSub);
    return;
  }

  SArray* resList = taosArrayInit(pParentSql->subState.numOfSub, sizeof(SArray *));
  if (resList == NULL) {
    freeJoinSubqueryObj(pParentSql);
    pParentSql->res.code = TAOS_SYSTEM_ERROR(ENOMEM);
    tscAsyncResultOnError(pParentSql);
    return;
  }

  int32_t code = getIntersectionOfTableTuple(pQueryInfo, pParentSql, resList);
  if (code != TSDB_CODE_SUCCESS) {
    freeJoinSubqueryObj(pParentSql);
    pParentSql->res.code = code;
    tscAsyncResultOnError(pParentSql);

    taosArrayDestroy(resList);
    return;
  }

  if (emptyTagList(resList, pParentSql->subState.numOfSub)) {  // no results,return.
    assert(pParentSql->fp != tscJoinQueryCallback);

    tscDebug("0x%"PRIx64" tag intersect does not generated qualified tables for join, free all sub SqlObj and quit",
        pParentSql->self);
    freeJoinSubqueryObj(pParentSql);

    // set no result command
    pParentSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
    assert(pParentSql->fp != tscJoinQueryCallback);

    (*pParentSql->fp)(pParentSql->param, pParentSql, 0);
  } else {
    // Reset the completion flags once, before any ts_comp query is launched. The old code
    // reset them inside the loop. That could clear the flag of a subquery launched in an
    // earlier iteration that had already completed, so "all subqueries done" was never
    // observed.
    memset(pParentSql->subState.states, 0, sizeof(pParentSql->subState.states[0]) * pParentSql->subState.numOfSub);
    tscDebug("0x%"PRIx64" reset all sub states to 0", pParentSql->self);

    for (int32_t m = 0; m < pParentSql->subState.numOfSub; ++m) {
      // proceed to for ts_comp query
      SSqlObj* psub = pParentSql->pSubs[m];
      SArray** s = taosArrayGet(resList, m);

      SQueryInfo*     pSubQueryInfo = tscGetQueryInfoDetail(&psub->cmd, 0);
      STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pSubQueryInfo, 0);
      tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo, *s);

      ((SJoinSupporter*)psub->param)->pVgroupTables = tscVgroupTableInfoDup(pTableMetaInfo->pVgroupTables);

      issueTsCompQuery(psub, psub->param, pParentSql);
    }
  }

  // the matched tuple arrays were produced by getIntersectionOfTableTuple and are owned here
  size_t rsize = taosArrayGetSize(resList);
  for (int32_t i = 0; i < rsize; ++i) {
    SArray** s = taosArrayGet(resList, i);
    if (*s) {
      taosArrayDestroy(*s);
    }
  }

  taosArrayDestroy(resList);
}
H
Haojun Liao 已提交
1259

H
Haojun Liao 已提交
1260 1261
/*
 * Fetch callback for the ts_comp stage of a join query.
 *
 * Each batch of compressed timestamps returned by a vnode is written to a temporary file
 * (pSupporter->path). The file is loaded as an STSBuf and merged into pSupporter->pTSBuf.
 * All batches and all vgroups of the subquery are processed this way.
 *
 * When the last subquery finishes:
 *   - the timestamp blocks of all subqueries are intersected (doTSBlockIntersect);
 *   - if nothing remains, the parent completes with an empty result;
 *   - otherwise the parent's time range is narrowed and the real data subqueries are
 *     launched.
 *
 * param     - the SJoinSupporter of this subquery
 * tres      - the subquery SSqlObj
 * numOfRows - byte length of the ts_comp block in pRes->data, or a negative error code
 */
static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRows) {
  SJoinSupporter* pSupporter = (SJoinSupporter*)param;

  SSqlObj* pParentSql = pSupporter->pObj;

  SSqlObj* pSql = (SSqlObj*)tres;
  SSqlCmd* pCmd = &pSql->cmd;
  SSqlRes* pRes = &pSql->res;

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  assert(!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE));

  // another subquery has already failed; only the last subquery to finish reports the error
  if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
    tscError("0x%"PRIx64" abort query due to other subquery failure. code:%d, global code:%d", pSql->self, numOfRows, pParentSql->res.code);
    if (quitAllSubquery(pSql, pParentSql, pSupporter)){
      return;
    }

    tscAsyncResultOnError(pParentSql);
    return;
  }

  // check for the error code firstly
  if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
    // todo retry if other subqueries are not failed yet 
    assert(numOfRows < 0 && numOfRows == taos_errno(pSql));
    tscError("0x%"PRIx64" sub query failed, code:%s, index:%d", pSql->self, tstrerror(numOfRows), pSupporter->subqueryIndex);

    pParentSql->res.code = numOfRows;
    if (quitAllSubquery(pSql, pParentSql, pSupporter)){
      return;
    }

    tscAsyncResultOnError(pParentSql);
    return;
  }

  if (numOfRows > 0) {  // write the compressed timestamp to disk file
    // the file may not be open yet, or an earlier re-open may have failed silently
    if (pSupporter->f == NULL) {
      pSupporter->f = fopen(pSupporter->path, "wb");

      if (pSupporter->f == NULL) {
        int32_t openErr = errno;  // capture before logging may overwrite errno
        tscError("0x%"PRIx64" failed to create tmp file:%s, reason:%s", pSql->self, pSupporter->path, strerror(openErr));

        pParentSql->res.code = TAOS_SYSTEM_ERROR(openErr);

        if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
          return;
        }

        tscAsyncResultOnError(pParentSql);
        return;
      }
    }

    // A short write or a failed flush on close would leave a truncated ts_comp block
    // for tsBufCreateFromFile(), so both results are checked.
    size_t  written = fwrite(pRes->data, (size_t)pRes->numOfRows, 1, pSupporter->f);
    int32_t ioErr = (written == 1) ? 0 : errno;

    int32_t closeRet = fclose(pSupporter->f);
    if (ioErr == 0 && closeRet != 0) {
      ioErr = errno;
    }
    pSupporter->f = NULL;

    if (written != 1 || closeRet != 0) {
      if (ioErr == 0) {
        ioErr = EIO;  // the stream failed without reporting a specific errno
      }

      tscError("0x%"PRIx64" failed to write ts comp data to tmp file:%s, reason:%s", pSql->self, pSupporter->path, strerror(ioErr));
      pParentSql->res.code = TAOS_SYSTEM_ERROR(ioErr);

      if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
        return;
      }

      tscAsyncResultOnError(pParentSql);
      return;
    }

    STSBuf* pBuf = tsBufCreateFromFile(pSupporter->path, true);
    if (pBuf == NULL) {  // in error process, close the fd
      int32_t bufErr = errno;  // capture before logging may overwrite errno
      tscError("0x%"PRIx64" invalid ts comp file from vnode, abort subquery, file size:%d", pSql->self, numOfRows);

      pParentSql->res.code = TAOS_SYSTEM_ERROR(bufErr);
      if (quitAllSubquery(pSql, pParentSql, pSupporter)){
        return;
      }

      tscAsyncResultOnError(pParentSql);
      return;
    }

    if (pSupporter->pTSBuf == NULL) {
      tscDebug("0x%"PRIx64" create tmp file for ts block:%s, size:%d bytes", pSql->self, pBuf->path, numOfRows);
      pSupporter->pTSBuf = pBuf;
    } else {
      assert(pQueryInfo->numOfTables == 1);  // for subquery, only one
      tsBufMerge(pSupporter->pTSBuf, pBuf);
      tsBufDestroy(pBuf);
    }

    // continue to retrieve ts-comp data from vnode
    if (!pRes->completed) {
      taosGetTmpfilePath("ts-join", pSupporter->path);
      // a failed open is retried at the top of this callback when the next batch arrives
      pSupporter->f = fopen(pSupporter->path, "wb");
      pRes->row = pRes->numOfRows;

      taos_fetch_rows_a(tres, tsCompRetrieveCallback, param);
      return;
    }
  }

  if (hasMoreVnodesToTry(pSql)) {
    STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

    int32_t totalVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
    pTableMetaInfo->vgroupIndex += 1;
    assert(pTableMetaInfo->vgroupIndex < totalVgroups);

    tscDebug("0x%"PRIx64" results from vgroup index:%d completed, try next vgroup:%d. total vgroups:%d. current numOfRes:%" PRId64,
             pSql->self, pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVgroups,
             pRes->numOfClauseTotal);

    pCmd->command = TSDB_SQL_SELECT;
    tscResetForNextRetrieve(&pSql->res);

    assert(pSupporter->f == NULL);
    taosGetTmpfilePath("ts-join", pSupporter->path);

    // a failed open is retried at the top of this callback when data arrives
    pSupporter->f = fopen(pSupporter->path, "wb");
    pRes->row = pRes->numOfRows;

    // set the callback function
    pSql->fp = tscJoinQueryCallback;
    tscProcessSql(pSql);
    return;
  }

  if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
    return;
  }

  tscDebug("0x%"PRIx64" all subquery retrieve ts complete, do ts block intersect", pParentSql->self);

  STimeWindow win = TSWINDOW_INITIALIZER;
  int64_t num = doTSBlockIntersect(pParentSql, &win);
  if (num <= 0) {  // no result during ts intersect
    tscDebug("0x%"PRIx64" no results generated in ts intersection, free all sub SqlObj and quit", pParentSql->self);
    freeJoinSubqueryObj(pParentSql);

    // set no result command
    pParentSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
    (*pParentSql->fp)(pParentSql->param, pParentSql, 0);
    return;
  }

  // launch the query the retrieve actual results from vnode along with the filtered timestamp
  SQueryInfo* pPQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, pParentSql->cmd.clauseIndex);
  updateQueryTimeRange(pPQueryInfo, &win);

  //update the vgroup that involved in real data query
  tscLaunchRealSubqueries(pParentSql);
}
H
Haojun Liao 已提交
1407

H
Haojun Liao 已提交
1408 1409
/*
 * Fetch callback for the final (real data) stage of a join query.
 *
 * For a non-ordered projection on a super table, an exhausted vgroup makes the subquery
 * move on to the next vgroup. Once every subquery has delivered its batch, the per-subquery
 * counters are updated and the joined result is built with tscBuildResFromSubqueries().
 *
 * Errors follow the same protocol as the other join callbacks:
 *   - the failure code is stored in the parent;
 *   - quitAllSubquery() marks this subquery done;
 *   - only the last subquery to finish reports the error to the application.
 *
 * param     - the SJoinSupporter of this subquery
 * tres      - the subquery SSqlObj
 * numOfRows - rows fetched, or a negative error code
 */
static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfRows) {
  SJoinSupporter* pSupporter = (SJoinSupporter*)param;

  SSqlObj* pParentSql = pSupporter->pObj;

  SSqlObj* pSql = (SSqlObj*)tres;
  SSqlCmd* pCmd = &pSql->cmd;
  SSqlRes* pRes = &pSql->res;

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
    tscError("0x%"PRIx64" abort query due to other subquery failure. code:%d, global code:%d", pSql->self, numOfRows, pParentSql->res.code);
    if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
      return;
    }

    tscAsyncResultOnError(pParentSql);
    return;
  }

  if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
    assert(numOfRows == taos_errno(pSql));

    pParentSql->res.code = numOfRows;
    tscError("0x%"PRIx64" retrieve failed, index:%d, code:%s", pSql->self, pSupporter->subqueryIndex, tstrerror(numOfRows));

    // Mark this subquery done and let the last one report, as the other join callbacks do.
    // Reporting right away could hand the parent back to the application while sibling
    // subqueries are still in flight, and this subquery would never be counted as done.
    if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
      return;
    }

    tscAsyncResultOnError(pParentSql);
    return;
  }

  if (numOfRows >= 0) {
    pRes->numOfTotal += pRes->numOfRows;
  }

  SSubqueryState* pState = &pParentSql->subState;
  if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && numOfRows == 0) {
    STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
    assert(pQueryInfo->numOfTables == 1);

    // for projection query, need to try next vnode if current vnode is exhausted
    int32_t numOfVgroups = 0;  // TODO refactor
    if (pTableMetaInfo->pVgroupTables != NULL) {
      numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
    } else {
      numOfVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
    }

    if ((++pTableMetaInfo->vgroupIndex) < numOfVgroups) {
      tscDebug("0x%"PRIx64" no result in current vnode anymore, try next vnode, vgIndex:%d", pSql->self, pTableMetaInfo->vgroupIndex);
      pSql->cmd.command = TSDB_SQL_SELECT;
      pSql->fp = tscJoinQueryCallback;

      tscProcessSql(pSql);
      return;
    } else {
      tscDebug("0x%"PRIx64" no result in current subquery anymore", pSql->self);
    }
  }

  if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
    tscDebug("0x%"PRIx64" sub:%p,%d completed, total:%d", pParentSql->self, tres, pSupporter->subqueryIndex, pState->numOfSub);
    return;
  }

  tscDebug("0x%"PRIx64" all %d secondary subqueries retrieval completed, code:%d", pSql->self, pState->numOfSub, pParentSql->res.code);

  if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
    freeJoinSubqueryObj(pParentSql);
    pParentSql->res.completed = true;
  }

  // update the records for each subquery in parent sql object.
  bool stableQuery = tscIsTwoStageSTableQuery(pQueryInfo, 0);
  for (int32_t i = 0; i < pState->numOfSub; ++i) {
    if (pParentSql->pSubs[i] == NULL) {
      tscDebug("0x%"PRIx64" %p sub:%d not retrieve data", pParentSql->self, (void*)NULL, i);
      continue;
    }

    SSqlRes* pRes1 = &pParentSql->pSubs[i]->res;

    pParentSql->res.precision = pRes1->precision;

    if (pRes1->row > 0 && pRes1->numOfRows > 0) {
      tscDebug("0x%"PRIx64" sub:%p index:%d numOfRows:%d total:%"PRId64 " (not retrieve)", pParentSql->self, pParentSql->pSubs[i], i,
               pRes1->numOfRows, pRes1->numOfTotal);
      assert(pRes1->row < pRes1->numOfRows);
    } else {
      if (!stableQuery) {
        pRes1->numOfClauseTotal += pRes1->numOfRows;
      }

      tscDebug("0x%"PRIx64" sub:%p index:%d numOfRows:%d total:%"PRId64, pParentSql->self, pParentSql->pSubs[i], i,
               pRes1->numOfRows, pRes1->numOfTotal);
    }
  }

  // data has retrieved to client, build the join results
  tscBuildResFromSubqueries(pParentSql);
}

1512
void tscFetchDatablockForSubquery(SSqlObj* pSql) {
H
Haojun Liao 已提交
1513
  assert(pSql->subState.numOfSub >= 1);
H
hjxilinx 已提交
1514
  
H
hjxilinx 已提交
1515
  int32_t numOfFetch = 0;
H
Haojun Liao 已提交
1516 1517
  bool    hasData = true;
  bool    reachLimit = false;
H
Haojun Liao 已提交
1518 1519

  // if the subquery is NULL, it does not involved in the final result generation
H
Haojun Liao 已提交
1520
  for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
H
hjxilinx 已提交
1521 1522
    SSqlObj* pSub = pSql->pSubs[i];
    if (pSub == NULL) {
H
hjxilinx 已提交
1523 1524
      continue;
    }
H
Haojun Liao 已提交
1525

H
hjxilinx 已提交
1526
    SSqlRes *pRes = &pSub->res;
H
Haojun Liao 已提交
1527

H
hjxilinx 已提交
1528 1529
    SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSub->cmd, 0);

H
Haojun Liao 已提交
1530 1531
    if (!tscHasReachLimitation(pQueryInfo, pRes)) {
      if (pRes->row >= pRes->numOfRows) {
H
Haojun Liao 已提交
1532
        // no data left in current result buffer
H
Haojun Liao 已提交
1533 1534
        hasData = false;

H
Haojun Liao 已提交
1535 1536
        // The current query is completed for the active vnode, try next vnode if exists
        // If it is completed, no need to fetch anymore.
H
Haojun Liao 已提交
1537 1538
        if (!pRes->completed) {
          numOfFetch++;
H
Haojun Liao 已提交
1539
        }
H
hjxilinx 已提交
1540
      }
H
Haojun Liao 已提交
1541 1542
    } else {  // has reach the limitation, no data anymore
      if (pRes->row >= pRes->numOfRows) {
H
Haojun Liao 已提交
1543 1544
        reachLimit = true;
        hasData    = false;
H
Haojun Liao 已提交
1545 1546
        break;
      }
H
hjxilinx 已提交
1547
    }
H
Haojun Liao 已提交
1548
  }
H
hjxilinx 已提交
1549

H
hjxilinx 已提交
1550 1551 1552 1553
  // has data remains in client side, and continue to return data to app
  if (hasData) {
    tscBuildResFromSubqueries(pSql);
    return;
H
Haojun Liao 已提交
1554
  }
H
Haojun Liao 已提交
1555

H
Haojun Liao 已提交
1556 1557
  // If at least one subquery is completed in current vnode, try the next vnode in case of multi-vnode
  // super table projection query.
1558 1559 1560 1561 1562 1563 1564
  if (reachLimit) {
    pSql->res.completed = true;
    freeJoinSubqueryObj(pSql);

    if (pSql->res.code == TSDB_CODE_SUCCESS) {
      (*pSql->fp)(pSql->param, pSql, 0);
    } else {
H
Haojun Liao 已提交
1565
      tscAsyncResultOnError(pSql);
1566 1567 1568 1569 1570 1571
    }

    return;
  }

  if (numOfFetch <= 0) {
H
Haojun Liao 已提交
1572 1573
    bool tryNextVnode = false;

1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586
    bool orderedPrjQuery = false;
    for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
      SSqlObj* pSub = pSql->pSubs[i];
      if (pSub == NULL) {
        continue;
      }

      SQueryInfo* p = tscGetQueryInfoDetail(&pSub->cmd, 0);
      orderedPrjQuery = tscNonOrderedProjectionQueryOnSTable(p, 0);
      if (orderedPrjQuery) {
        break;
      }
    }
H
Haojun Liao 已提交
1587

D
fix bug  
dapan1121 已提交
1588

1589
    if (orderedPrjQuery) {
1590
      for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
H
Haojun Liao 已提交
1591 1592
        SSqlObj* pSub = pSql->pSubs[i];
        if (pSub != NULL && pSub->res.row >= pSub->res.numOfRows && pSub->res.completed) {
D
fix bug  
dapan1121 已提交
1593
          subquerySetState(pSub, &pSql->subState, i, 0);
H
Haojun Liao 已提交
1594 1595 1596
        }
      }
    }
D
fix bug  
dapan1121 已提交
1597
    
H
Haojun Liao 已提交
1598 1599 1600 1601 1602 1603 1604 1605 1606

    for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
      SSqlObj* pSub = pSql->pSubs[i];
      if (pSub == NULL) {
        continue;
      }

      SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSub->cmd, 0);

1607 1608
      if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && pSub->res.row >= pSub->res.numOfRows &&
          pSub->res.completed) {
H
Haojun Liao 已提交
1609 1610 1611 1612 1613 1614
        STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
        assert(pQueryInfo->numOfTables == 1);

        // for projection query, need to try next vnode if current vnode is exhausted
        int32_t numOfVgroups = 0;  // TODO refactor
        if (pTableMetaInfo->pVgroupTables != NULL) {
S
Shengliang Guan 已提交
1615
          numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
H
Haojun Liao 已提交
1616 1617 1618 1619 1620
        } else {
          numOfVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
        }

        if ((++pTableMetaInfo->vgroupIndex) < numOfVgroups) {
H
Haojun Liao 已提交
1621
          tscDebug("0x%"PRIx64" no result in current vnode anymore, try next vnode, vgIndex:%d", pSub->self,
H
Haojun Liao 已提交
1622 1623 1624 1625 1626 1627 1628
                   pTableMetaInfo->vgroupIndex);
          pSub->cmd.command = TSDB_SQL_SELECT;
          pSub->fp = tscJoinQueryCallback;

          tscProcessSql(pSub);
          tryNextVnode = true;
        } else {
H
Haojun Liao 已提交
1629
          tscDebug("0x%"PRIx64" no result in current subquery anymore", pSub->self);
H
Haojun Liao 已提交
1630 1631 1632 1633 1634 1635 1636
        }
      }
    }

    if (tryNextVnode) {
      return;
    }
H
Haojun Liao 已提交
1637 1638 1639 1640

    pSql->res.completed = true;
    freeJoinSubqueryObj(pSql);

H
hjxilinx 已提交
1641 1642 1643
    if (pSql->res.code == TSDB_CODE_SUCCESS) {
      (*pSql->fp)(pSql->param, pSql, 0);
    } else {
H
Haojun Liao 已提交
1644
      tscAsyncResultOnError(pSql);
H
hjxilinx 已提交
1645
    }
1646

H
hjxilinx 已提交
1647 1648 1649 1650
    return;
  }

  // TODO multi-vnode retrieve for projection query with limitation has bugs, since the global limiation is not handled
H
Haojun Liao 已提交
1651
  // retrieve data from current vnode.
H
Haojun Liao 已提交
1652
  tscDebug("0x%"PRIx64" retrieve data from %d subqueries", pSql->self, numOfFetch);
H
Haojun Liao 已提交
1653
  SJoinSupporter* pSupporter = NULL;
1654 1655 1656 1657 1658 1659 1660 1661 1662 1663 1664 1665 1666

  for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
    SSqlObj* pSql1 = pSql->pSubs[i];
    if (pSql1 == NULL) {
      continue;
    }

    SSqlRes* pRes1 = &pSql1->res;

    if (pRes1->row >= pRes1->numOfRows) {
      subquerySetState(pSql1, &pSql->subState, i, 0);
    }
  }
H
Haojun Liao 已提交
1667

H
Haojun Liao 已提交
1668
  for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
H
hjxilinx 已提交
1669 1670 1671 1672
    SSqlObj* pSql1 = pSql->pSubs[i];
    if (pSql1 == NULL) {
      continue;
    }
H
Haojun Liao 已提交
1673

H
hjxilinx 已提交
1674 1675 1676
    SSqlRes* pRes1 = &pSql1->res;
    SSqlCmd* pCmd1 = &pSql1->cmd;

1677
    pSupporter = (SJoinSupporter*)pSql1->param;
H
hjxilinx 已提交
1678 1679 1680 1681 1682 1683 1684 1685

    // wait for all subqueries completed
    SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd1, 0);
    assert(pRes1->numOfRows >= 0 && pQueryInfo->numOfTables == 1);

    STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
    
    if (pRes1->row >= pRes1->numOfRows) {
H
Haojun Liao 已提交
1686
      tscDebug("0x%"PRIx64" subquery:%p retrieve data from vnode, subquery:%d, vgroupIndex:%d", pSql->self, pSql1,
H
hjxilinx 已提交
1687
               pSupporter->subqueryIndex, pTableMetaInfo->vgroupIndex);
H
hjxilinx 已提交
1688 1689

      tscResetForNextRetrieve(pRes1);
H
Haojun Liao 已提交
1690
      pSql1->fp = joinRetrieveFinalResCallback;
H
hjxilinx 已提交
1691 1692 1693 1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705 1706

      if (pCmd1->command < TSDB_SQL_LOCAL) {
        pCmd1->command = (pCmd1->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
      }

      tscProcessSql(pSql1);
    }
  }
}

/*
 * Called once all join subqueries have returned: build pSql->res.pColumnIndex, which maps every
 * output expression of the parent query to (subquery index, expression index inside that subquery).
 * An expression is matched to the subquery whose table uid equals the expression's uid, then to the
 * first sub-expression with the same function id and column id.
 * Does nothing if the mapping already exists; sets pRes->code on allocation failure.
 */
void tscSetupOutputColumnIndex(SSqlObj* pSql) {
  SSqlCmd* pCmd = &pSql->cmd;
  SSqlRes* pRes = &pSql->res;

  // the column transfer support struct has been built
  if (pRes->pColumnIndex != NULL) {
    return;
  }

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  int32_t numOfExprs = (int32_t)tscSqlExprNumOfExprs(pQueryInfo);

  // calloc(n, size) checks n * size for overflow and zero-fills unmatched entries
  pRes->pColumnIndex = calloc(numOfExprs, sizeof(SColumnIndex));
  if (pRes->pColumnIndex == NULL) {
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return;
  }

  for (int32_t i = 0; i < numOfExprs; ++i) {
    SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);

    // locate the table (and hence the subquery) the expression belongs to
    int32_t tableIndexOfSub = -1;
    for (int32_t j = 0; j < pQueryInfo->numOfTables; ++j) {
      STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, j);
      if (pTableMetaInfo->pTableMeta->id.uid == pExpr->uid) {
        tableIndexOfSub = j;
        break;
      }
    }

    assert(tableIndexOfSub >= 0 && tableIndexOfSub < pQueryInfo->numOfTables);

    SSqlObj* pSub = pSql->pSubs[tableIndexOfSub];
    assert(pSub != NULL);

    SQueryInfo* pSubQueryInfo = tscGetQueryInfoDetail(&pSub->cmd, 0);

    int32_t numOfSubExpr = (int32_t)taosArrayGetSize(pSubQueryInfo->exprList);
    for (int32_t k = 0; k < numOfSubExpr; ++k) {
      SSqlExpr* pSubExpr = tscSqlExprGet(pSubQueryInfo, k);
      if (pExpr->functionId == pSubExpr->functionId && pExpr->colInfo.colId == pSubExpr->colInfo.colId) {
        pRes->pColumnIndex[i] = (SColumnIndex){.tableIndex = tableIndexOfSub, .columnIndex = k};
        break;
      }
    }
  }

  // restore the offset value for super table query in case of final result.
  tscRestoreFuncForSTableQuery(pQueryInfo);
  tscFieldInfoUpdateOffset(pQueryInfo);
}

/*
 * Callback of a join subquery (param is its SJoinSupporter, tres the subquery object).
 *
 * On failure of this subquery or of a sibling (parent res.code set), the parent is aborted unless
 * quitAllSubquery reports that it took care of it. Otherwise the next stage is driven by the
 * subquery's query type:
 *  - TAG_FILTER_QUERY: fetch the <tid, tag> tuples (tidTagRetrieveCallback);
 *  - not JOIN_SEC_STAGE: fetch the ts_comp data (tsCompRetrieveCallback);
 *  - second stage: once all subqueries are done (subAndCheckDone), set up the output column
 *    mapping and either continue fetching (non-ordered projection on a later vgroup) or notify
 *    the parent.
 */
void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
  SSqlObj* pSql = (SSqlObj*)tres;

  SJoinSupporter* pSupporter = (SJoinSupporter*)param;
  SSqlObj* pParentSql = pSupporter->pObj;

  // There is only one subquery and table for each subquery.
  SQueryInfo*     pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  assert(pQueryInfo->numOfTables == 1 && pSql->cmd.numOfClause == 1);

  // retrieve actual query results from vnode during the second stage join subquery
  if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
    tscError("0x%"PRIx64" abort query due to other subquery failure. code:%d, global code:%d", pSql->self, code, pParentSql->res.code);
    if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
      return;
    }

    tscAsyncResultOnError(pParentSql);
    return;
  }

  // TODO here retry is required, not directly returns to client
  if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
    assert(taos_errno(pSql) == code);

    tscError("0x%"PRIx64" abort query, code:%s, global code:%s", pSql->self, tstrerror(code), tstrerror(pParentSql->res.code));
    pParentSql->res.code = code;

    if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
      return;
    }

    tscAsyncResultOnError(pParentSql);
    return;
  }

  // retrieve <tid, tag> tuples from vnode
  if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) {
    pSql->fp = tidTagRetrieveCallback;
    pSql->cmd.command = TSDB_SQL_FETCH;
    tscProcessSql(pSql);
    return;
  }

  // retrieve ts_comp info from vnode
  if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
    pSql->fp = tsCompRetrieveCallback;
    pSql->cmd.command = TSDB_SQL_FETCH;
    tscProcessSql(pSql);
    return;
  }

  // In case of consequence query from other vnode, do not wait for other query response here.
  if (!(pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0))) {
    if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
      return;
    }
  }

  tscSetupOutputColumnIndex(pParentSql);

  /**
   * if the query is a continue query (vgroupIndex > 0 for projection query) for next vnode, do the retrieval of
   * data instead of returning to its invoker
   */
  if (pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
    pSql->fp = joinRetrieveFinalResCallback;  // continue retrieve data
    pSql->cmd.command = TSDB_SQL_FETCH;

    tscProcessSql(pSql);
  } else {  // first retrieve from vnode during the secondary stage sub-query
    // set the command flag must be after the semaphore been correctly set.
    if (pParentSql->res.code == TSDB_CODE_SUCCESS) {
      (*pParentSql->fp)(pParentSql->param, pParentSql, 0);
    } else {
      tscAsyncResultOnError(pParentSql);
    }
  }
}

/////////////////////////////////////////////////////////////////////////////////////////
static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code);

1840
static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj);
H
hjxilinx 已提交
1841

H
Haojun Liao 已提交
1842
/*
 * Create the first-stage subquery of a join query for the table at tableIndex and store it in
 * pSql->pSubs[tableIndex] (pSubs is allocated on first use, sized by subState.numOfSub).
 *
 * For a join query the column list, expression list, field info, tag condition, group-by info and
 * limit of the new subquery are moved into pSupporter, and the subquery is rewritten to:
 *  - a TID_TAG query retrieving (tableId, tags) when the table is a super table;
 *  - a TS_COMP query retrieving timestamps otherwise (filtered columns of pSupporter->colList
 *    are copied back so the filters still apply).
 *
 * Returns TSDB_CODE_SUCCESS or TSDB_CODE_TSC_OUT_OF_MEMORY.
 */
int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter *pSupporter) {
  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  pSql->res.qId = 0x1;
  assert(pSql->res.numOfRows == 0);

  if (pSql->pSubs == NULL) {
    pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES);
    if (pSql->pSubs == NULL) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
  }

  SSqlObj *pNew = createSubqueryObj(pSql, tableIndex, tscJoinQueryCallback, pSupporter, TSDB_SQL_SELECT, NULL);
  if (pNew == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  pSql->pSubs[tableIndex] = pNew;

  if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) {
    addGroupInfoForSubquery(pSql, pNew, 0, tableIndex);

    // refactor as one method
    SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
    assert(pNewQueryInfo != NULL);

    // each subquery has exactly one table, so every column refers to table index 0
    size_t num = taosArrayGetSize(pNewQueryInfo->colList);
    for (int32_t i = 0; i < num; ++i) {
      SColumn* pCol = taosArrayGetP(pNewQueryInfo->colList, i);
      pCol->colIndex.tableIndex = 0;
    }

    // move the query description into the support struct and clear it in the subquery
    pSupporter->colList = pNewQueryInfo->colList;
    pNewQueryInfo->colList = NULL;

    pSupporter->exprList = pNewQueryInfo->exprList;
    pNewQueryInfo->exprList = NULL;

    pSupporter->fieldsInfo = pNewQueryInfo->fieldsInfo;
    memset(&pNewQueryInfo->fieldsInfo, 0, sizeof(SFieldInfo));

    if (tscTagCondCopy(&pSupporter->tagCond, &pNewQueryInfo->tagCond) != 0) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }

    pSupporter->groupInfo = pNewQueryInfo->groupbyExpr;
    memset(&pNewQueryInfo->groupbyExpr, 0, sizeof(SSqlGroupbyExpr));

    pNew->cmd.numOfCols = 0;
    pNewQueryInfo->interval.interval = 0;
    pSupporter->limit = pNewQueryInfo->limit;

    pNewQueryInfo->limit.limit = -1;
    pNewQueryInfo->limit.offset = 0;

    pNewQueryInfo->order.orderColId = INT32_MIN;

    tscInitQueryInfo(pNewQueryInfo);
    STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0);

    if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { // return the tableId & tag
      SColumnIndex colIndex = {0};

      STagCond* pTagCond = &pSupporter->tagCond;
      assert(pTagCond->joinInfo.hasJoin);

      int32_t tagColId = tscGetJoinTagColIdByUid(pTagCond, pTableMetaInfo->pTableMeta->id.uid);
      SSchema* s = tscGetColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId);
      assert(s != NULL);  // the join tag must exist in the super table schema

      colIndex.columnIndex = tscGetTagColIndexById(pTableMetaInfo->pTableMeta, tagColId);

      int16_t bytes = 0;
      int16_t type  = 0;
      int32_t inter = 0;

      getResultDataInfo(s->type, s->bytes, TSDB_FUNC_TID_TAG, 0, &type, &bytes, &inter, 0, 0);

      SSchema s1 = {.colId = s->colId, .type = (uint8_t)type, .bytes = bytes};
      pSupporter->tagSize = s1.bytes;
      assert(isValidDataType(s1.type) && s1.bytes > 0);

      // set get tags query type
      TSDB_QUERY_SET_TYPE(pNewQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY);
      tscAddFuncInSelectClause(pNewQueryInfo, 0, TSDB_FUNC_TID_TAG, &colIndex, &s1, TSDB_COL_TAG);
      size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList);

      tscDebug(
          "%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%u, transfer to tid_tag query to retrieve (tableId, tags), "
          "exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, tagIndex:%d, name:%s",
          pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscSqlExprNumOfExprs(pNewQueryInfo),
          numOfCols, pNewQueryInfo->fieldsInfo.numOfOutput, colIndex.columnIndex, tNameGetTableName(&pNewQueryInfo->pTableMetaInfo[0]->name));
    } else {
      // not a super table: retrieve the timestamps with ts_comp (no tag parameter is needed here)
      SSchema      colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1};
      SColumnIndex colIndex = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX};
      tscAddFuncInSelectClause(pNewQueryInfo, 0, TSDB_FUNC_TS_COMP, &colIndex, &colSchema, TSDB_COL_NORMAL);

      // add the filter tag column
      if (pSupporter->colList != NULL) {
        size_t s = taosArrayGetSize(pSupporter->colList);

        for (int32_t i = 0; i < s; ++i) {
          SColumn *pCol = taosArrayGetP(pSupporter->colList, i);

          if (pCol->numOfFilters > 0) {  // copy to the pNew->cmd.colList if it is filtered.
            SColumn *p = tscColumnClone(pCol);
            taosArrayPush(pNewQueryInfo->colList, &p);
          }
        }
      }

      size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList);

      tscDebug(
          "%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%u, transfer to ts_comp query to retrieve timestamps, "
          "exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, name:%s",
          pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscSqlExprNumOfExprs(pNewQueryInfo),
          numOfCols, pNewQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pNewQueryInfo->pTableMetaInfo[0]->name));
    }
  } else {
    assert(0);
    SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
    pNewQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY;
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Start a join query: (re)initialise the per-subquery state array, create one join subquery per
 * table, then launch them.
 *
 * If a super table taking part in the join has no vgroups, the result is empty: the subqueries are
 * freed and the caller's callback is invoked with TSDB_SQL_RETRIEVE_EMPTY_RESULT. If launching a
 * subquery fails, that subquery's callback and the callbacks of all remaining subqueries are
 * invoked with the parent's res.code set to the failure code. Setup failures are reported through
 * tscAsyncResultOnError.
 */
void tscHandleMasterJoinQuery(SSqlObj* pSql) {
  SSqlCmd* pCmd = &pSql->cmd;
  SSqlRes* pRes = &pSql->res;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  assert((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0);

  int32_t code = TSDB_CODE_SUCCESS;
  pSql->subState.numOfSub = pQueryInfo->numOfTables;

  if (pSql->subState.states == NULL) {
    pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(*pSql->subState.states));
    if (pSql->subState.states == NULL) {
      code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      goto _error;
    }

    pthread_mutex_init(&pSql->subState.mutex, NULL);
  }

  memset(pSql->subState.states, 0, sizeof(*pSql->subState.states) * pSql->subState.numOfSub);
  tscDebug("0x%"PRIx64" reset all sub states to 0", pSql->self);

  bool hasEmptySub = false;

  tscDebug("0x%"PRIx64" start subquery, total:%d", pSql->self, pQueryInfo->numOfTables);
  for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
    SJoinSupporter *pSupporter = tscCreateJoinSupporter(pSql, i);

    if (pSupporter == NULL) {  // failed to create support struct, abort current query
      tscError("0x%"PRIx64" tableIndex:%d, failed to allocate join support object, abort further query", pSql->self, i);
      code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      goto _error;
    }

    code = tscCreateJoinSubquery(pSql, i, pSupporter);
    if (code != TSDB_CODE_SUCCESS) {  // failed to create subquery object, quit query
      tscDestroyJoinSupporter(pSupporter);
      goto _error;
    }

    // a super table without any vgroup cannot produce rows, hence neither can the join
    SSqlObj* pSub = pSql->pSubs[i];
    STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSub->cmd, 0, 0);
    if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo) && (pTableMetaInfo->vgroupList->numOfVgroups == 0)) {
      hasEmptySub = true;
      break;
    }
  }

  if (hasEmptySub) {  // at least one subquery is empty, do nothing and return
    freeJoinSubqueryObj(pSql);
    pSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
    (*pSql->fp)(pSql->param, pSql, 0);
  } else {
    bool fail = false;
    for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
      SSqlObj* pSub = pSql->pSubs[i];

      // after a failure the remaining subqueries are not launched; their callbacks see the error
      if (fail) {
        (*pSub->fp)(pSub->param, pSub, 0);
        continue;
      }

      if ((code = tscProcessSql(pSub)) != TSDB_CODE_SUCCESS) {
        pRes->code = code;
        (*pSub->fp)(pSub->param, pSub, 0);
        fail = true;
      }
    }

    if (fail) {
      return;
    }

    pSql->cmd.command = TSDB_SQL_TABLE_JOIN_RETRIEVE;
  }

  return;

_error:
  pRes->code = code;
  tscAsyncResultOnError(pSql);
}

/*
 * Release the first numOfSubs subqueries of pSql. For each one, free its SRetrieveSupport (including
 * the support's localBuffer) and then the subquery object itself.
 * pSub->param and pSql->pSubs[i] are cleared, so no freed object stays reachable from pSql.
 */
static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs) {
  assert(numOfSubs <= pSql->subState.numOfSub && numOfSubs >= 0);

  for (int32_t i = 0; i < numOfSubs; ++i) {
    SSqlObj* pSub = pSql->pSubs[i];
    assert(pSub != NULL);

    SRetrieveSupport* pSupport = pSub->param;

    tfree(pSupport->localBuffer);
    tfree(pSupport);
    pSub->param = NULL;  // the support object is freed; do not leave a dangling reference

    taos_free_result(pSub);
    pSql->pSubs[i] = NULL;  // prevents a later use / double free through the parent
  }
}

/*
 * Spin until *lockedBy is atomically changed from 0 to the calling thread's id.
 * The CPU is yielded after every 100 failed attempts.
 */
void tscLockByThread(int64_t *lockedBy) {
  int64_t tid = taosGetSelfPthreadId();

  // Bounded counter: the previous ever-increasing int overflowed (undefined behavior)
  // after ~2^31 failed attempts under long contention.
  int32_t spins = 0;
  while (atomic_val_compare_exchange_64(lockedBy, 0, tid) != 0) {
    if (++spins >= 100) {
      spins = 0;
      sched_yield();
    }
  }
}

/*
 * Release a lock taken with tscLockByThread by atomically changing *lockedBy from the calling
 * thread's id back to 0. The compare-and-exchange only succeeds when the caller holds the lock;
 * otherwise the assertion fires (and, with assertions disabled, the lock word is left untouched).
 */
void tscUnlockByThread(int64_t *lockedBy) {
  int64_t tid = taosGetSelfPthreadId();
  if (atomic_val_compare_exchange_64(lockedBy, tid, 0) != tid) {
    assert(false);
  }
}

/*
 * State shared by the callbacks of the first-round query (tscFirstRoundCallback and
 * tscFirstRoundRetrieveCallback) created by tscHandleFirstRoundStableQuery.
 */
typedef struct SFirstRoundQuerySup {
  SSqlObj  *pParent;     // the original query; its query info receives the serialized first-round results
  int32_t   numOfRows;   // number of first-round rows collected so far
  SArray   *pColsInfo;   // result column ids (array created with int16_t elements)
  int32_t   tagLen;      // total bytes of the tag / group by columns of one row; 0 => all rows form one group
  STColumn *pTagCols;    // NOTE(review): not referenced in the code visible here
  SArray   *pResult;   // SArray<SInterResult>, one entry per group
                       // NOTE(review): created with elemSize sizeof(SStddevInterResult) in
                       // tscHandleFirstRoundStableQuery although SInterResult is stored - confirm sizes match
  int64_t   interval;    // copy of the parent query's interval.interval
  char*     buf;         // NOTE(review): not referenced in the code visible here
  int32_t   bufLen;      // NOTE(review): not referenced in the code visible here
} SFirstRoundQuerySup;

/*
 * Append one first-round result row to the per-column intermediate results of a group.
 *
 * Tag and group-by (PRJ) columns are skipped. The primary timestamp column provides the key of the
 * row. Every other column value is read as a double (NULL becomes the double NULL marker) and
 * appended as SResPair{avg, key} to the SArray kept for that column id in pInterResult->pResult. The
 * column entry is created on first use. A value is not appended when the key is still INT64_MIN
 * (no timestamp seen yet) and the column's last stored pair carries the same INT64_MIN key.
 *
 * NOTE(review): assumes the non-tag result columns of the first-round query are doubles - confirm.
 */
void doAppendData(SInterResult* pInterResult, TAOS_ROW row, int32_t numOfCols, SQueryInfo* pQueryInfo) {
  TSKEY key = INT64_MIN;
  for (int32_t i = 0; i < numOfCols; ++i) {
    SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
    if (TSDB_COL_IS_TAG(pExpr->colInfo.flag) || pExpr->functionId == TSDB_FUNC_PRJ) {
      continue;
    }

    if (pExpr->colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
      // memcpy instead of *(TSKEY*): does not rely on the alignment of the row buffer
      if (row[i] != NULL) {
        memcpy(&key, row[i], sizeof(key));
      }
      continue;
    }

    double v = 0;
    if (row[i] != NULL) {
      memcpy(&v, row[i], sizeof(v));
    } else {
      SET_DOUBLE_NULL(&v);
    }

    int32_t id = pExpr->colInfo.colId;

    // find the result list already collected for this column id, if any
    SArray* p = NULL;
    int32_t numOfQueriedCols = (int32_t) taosArrayGetSize(pInterResult->pResult);
    for (int32_t j = 0; j < numOfQueriedCols; ++j) {
      SStddevInterResult* pColRes = taosArrayGet(pInterResult->pResult, j);
      if (pColRes->colId == id) {
        p = pColRes->pResult;
        break;
      }
    }

    // do not store a second pair without timestamp key for this column
    if (p != NULL && taosArrayGetSize(p) > 0) {
      SResPair *pLast = taosArrayGetLast(p);
      if (pLast->key == key && key == INT64_MIN) {
        continue;
      }
    }

    // first value of this column in the group: append a new column entry
    if (p == NULL) {
      SStddevInterResult t = {.colId = id, .pResult = taosArrayInit(10, sizeof(SResPair))};
      taosArrayPush(pInterResult->pResult, &t);
      p = t.pResult;
    }

    SResPair pair = {.avg = v, .key = key};
    taosArrayPush(p, &pair);
  }
}

/*
 * Free a first-round support object and the arrays it owns. The pResult elements are released with
 * freeInterResult. A NULL argument is ignored.
 */
static void destroySup(SFirstRoundQuerySup* pSup) {
  if (pSup == NULL) {
    return;
  }

  taosArrayDestroyEx(pSup->pResult, freeInterResult);
  taosArrayDestroy(pSup->pColsInfo);
  tfree(pSup);
}

void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
  SSqlObj* pSql = (SSqlObj*)tres;
  SSqlRes* pRes = &pSql->res;

  SFirstRoundQuerySup* pSup = param;

2177 2178 2179 2180 2181 2182 2183 2184 2185 2186 2187 2188 2189
  SSqlObj*     pParent = pSup->pParent;
  SQueryInfo*  pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);

  int32_t code = taos_errno(pSql);
  if (code != TSDB_CODE_SUCCESS) {
    destroySup(pSup);
    taos_free_result(pSql);
    pParent->res.code = code;
    tscAsyncResultOnError(pParent);
    return;
  }

  if (numOfRows > 0) {  // the number is not correct for group by column in super table query
2190 2191 2192 2193 2194 2195 2196 2197 2198
    TAOS_ROW row = NULL;
    int32_t  numOfCols = taos_field_count(tres);

    if (pSup->tagLen == 0) {  // no tags, all rows belong to one group
      SInterResult interResult = {.tags = NULL, .pResult = taosArrayInit(4, sizeof(SStddevInterResult))};
      taosArrayPush(pSup->pResult, &interResult);

      while ((row = taos_fetch_row(tres)) != NULL) {
        doAppendData(&interResult, row, numOfCols, pQueryInfo);
2199
        pSup->numOfRows += 1;
2200 2201 2202 2203 2204 2205 2206 2207 2208 2209 2210
      }
    } else {  // tagLen > 0
      char* p = calloc(1, pSup->tagLen);

      while ((row = taos_fetch_row(tres)) != NULL) {
        int32_t* length = taos_fetch_lengths(tres);
        memset(p, 0, pSup->tagLen);

        int32_t offset = 0;
        for (int32_t i = 0; i < numOfCols && offset < pSup->tagLen; ++i) {
          SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
2211 2212 2213

          // tag or group by column
          if (TSDB_COL_IS_TAG(pExpr->colInfo.flag) || pExpr->functionId == TSDB_FUNC_PRJ) {
D
fix bug  
dapan1121 已提交
2214 2215 2216 2217 2218
            if (row[i] == NULL) {
              setNull(p + offset, pExpr->resType, pExpr->resBytes);
            } else {
              memcpy(p + offset, row[i], length[i]);
            }
2219 2220 2221 2222 2223 2224 2225 2226 2227 2228 2229 2230 2231 2232 2233 2234 2235 2236 2237 2238 2239 2240 2241 2242 2243 2244 2245
            offset += pExpr->resBytes;
          }
        }

        assert(offset == pSup->tagLen);
        size_t size = taosArrayGetSize(pSup->pResult);

        if (size > 0) {
          SInterResult* pInterResult = taosArrayGetLast(pSup->pResult);
          if (memcmp(pInterResult->tags, p, pSup->tagLen) == 0) {  // belongs to the same group
            doAppendData(pInterResult, row, numOfCols, pQueryInfo);
          } else {
            char* tags = malloc( pSup->tagLen);
            memcpy(tags, p, pSup->tagLen);

            SInterResult interResult = {.tags = tags, .pResult = taosArrayInit(4, sizeof(SStddevInterResult))};
            taosArrayPush(pSup->pResult, &interResult);
            doAppendData(&interResult, row, numOfCols, pQueryInfo);
          }
        } else {
          char* tags = malloc(pSup->tagLen);
          memcpy(tags, p, pSup->tagLen);

          SInterResult interResult = {.tags = tags, .pResult = taosArrayInit(4, sizeof(SStddevInterResult))};
          taosArrayPush(pSup->pResult, &interResult);
          doAppendData(&interResult, row, numOfCols, pQueryInfo);
        }
2246 2247

        pSup->numOfRows += 1;
2248 2249 2250 2251 2252 2253 2254 2255 2256 2257 2258 2259 2260 2261
      }

      tfree(p);
    }
  }

  if (!pRes->completed) {
    taos_fetch_rows_a(tres, tscFirstRoundRetrieveCallback, param);
    return;
  }

  // set the parameters for the second round query process
  SSqlCmd    *pPCmd   = &pParent->cmd;
  SQueryInfo *pQueryInfo1 = tscGetQueryInfoDetail(pPCmd, 0);
D
fix bug  
dapan1121 已提交
2262 2263
  int32_t resRows = pSup->numOfRows;
  
2264 2265 2266 2267
  if (pSup->numOfRows > 0) {
    SBufferWriter bw = tbufInitWriter(NULL, false);
    interResToBinary(&bw, pSup->pResult, pSup->tagLen);

H
Haojun Liao 已提交
2268
    pQueryInfo1->bufLen = (int32_t) tbufTell(&bw);
2269 2270 2271 2272 2273 2274 2275 2276 2277 2278 2279 2280
    pQueryInfo1->buf = tbufGetData(&bw, true);

    // set the serialized binary string as the parameter of arithmetic expression
    tbufCloseWriter(&bw);
  }

  taosArrayDestroyEx(pSup->pResult, freeInterResult);
  taosArrayDestroy(pSup->pColsInfo);
  tfree(pSup);

  taos_free_result(pSql);

D
fix bug  
dapan1121 已提交
2281 2282 2283 2284 2285 2286
  if (resRows == 0) {
    pParent->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
    (*pParent->fp)(pParent->param, pParent, 0);
    return;
  }

2287 2288 2289 2290 2291
  pQueryInfo1->round = 1;
  tscDoQuery(pParent);
}

/*
 * Completion callback of the first-round subquery created by tscHandleFirstRoundStableQuery().
 *
 * param is the SFirstRoundQuerySup of the query, tres the first-round subquery. When the
 * subquery reports an error, the support object and the subquery are released and `code` is
 * reported on the parent query; otherwise its rows are fetched asynchronously through
 * tscFirstRoundRetrieveCallback().
 */
void tscFirstRoundCallback(void* param, TAOS_RES* tres, int code) {
  SSqlObj* pSub = (SSqlObj*) tres;

  if (taos_errno(pSub) == TSDB_CODE_SUCCESS) {
    taos_fetch_rows_a(tres, tscFirstRoundRetrieveCallback, param);
    return;
  }

  SFirstRoundQuerySup* pSupport = (SFirstRoundQuerySup*) param;
  SSqlObj*             pOwner   = pSupport->pParent;  // read before the support object is destroyed

  destroySup(pSupport);
  taos_free_result(pSub);

  // the callback's own code argument is what gets reported on the parent
  pOwner->res.code = code;
  tscAsyncResultOnError(pOwner);
}

/*
 * Launch the first round of a two-round super table query.
 *
 * A subquery is derived from pSql whose select list keeps:
 *   - the timestamp column (only for interval queries),
 *   - tag columns,
 *   - projections of group-by columns.
 * Every STDDEV_DST expression is replaced by AVG. The result column ids of the kept
 * expressions are recorded in an SFirstRoundQuerySup, which is handed to the subquery as the
 * callback parameter of tscFirstRoundCallback().
 *
 * Returns TSDB_CODE_SUCCESS once the subquery has been handed to tscHandleMasterSTableQuery().
 * On failure, the error code is stored in pSql->res.code, reported through
 * tscAsyncResultOnError(), and returned.
 */
int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
  STableMetaInfo* pTableMetaInfo1 = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);

  int32_t  code = TSDB_CODE_SUCCESS;
  SSqlObj *pNew = NULL;

  SFirstRoundQuerySup *pSup = calloc(1, sizeof(SFirstRoundQuerySup));
  if (pSup == NULL) {
    // nothing has been allocated yet, so there is nothing to release
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    tscAsyncResultOnError(pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  pSup->pParent  = pSql;
  pSup->interval = pQueryInfo->interval.interval;
  pSup->pResult  = taosArrayInit(6, sizeof(SStddevInterResult));
  pSup->pColsInfo = taosArrayInit(6, sizeof(int16_t)); // result column id
  if (pSup->pResult == NULL || pSup->pColsInfo == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto _error;
  }

  pNew = createSubqueryObj(pSql, 0, tscFirstRoundCallback, pSup, TSDB_SQL_SELECT, NULL);
  if (pNew == NULL) {
    // keep a more specific error if the callee reported one, otherwise assume out of memory
    if (terrno == TSDB_CODE_SUCCESS) {
      terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    goto _error;
  }

  SSqlCmd *pCmd = &pNew->cmd;

  tscClearSubqueryInfo(pCmd);
  tscFreeSqlResult(pSql);

  SQueryInfo* pNewQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  assert(pQueryInfo->numOfTables == 1);

  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0);

  tscInitQueryInfo(pNewQueryInfo);
  pNewQueryInfo->groupbyExpr = pQueryInfo->groupbyExpr;
  if (pQueryInfo->groupbyExpr.columnInfo != NULL) {
    // deep-copy the group-by column list so the subquery does not share it with the parent
    pNewQueryInfo->groupbyExpr.columnInfo = taosArrayDup(pQueryInfo->groupbyExpr.columnInfo);
    if (pNewQueryInfo->groupbyExpr.columnInfo == NULL) {
      terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
      goto _error;
    }
  }

  if (tscTagCondCopy(&pNewQueryInfo->tagCond, &pQueryInfo->tagCond) != 0) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto _error;
  }

  pNewQueryInfo->interval = pQueryInfo->interval;

  pCmd->command = TSDB_SQL_SELECT;
  pNew->fp = tscFirstRoundCallback;

  int32_t numOfExprs = (int32_t) tscSqlExprNumOfExprs(pQueryInfo);

  int32_t index = 0;
  for(int32_t i = 0; i < numOfExprs; ++i) {
    SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
    if (pExpr->functionId == TSDB_FUNC_TS && pQueryInfo->interval.interval > 0) {
      taosArrayPush(pSup->pColsInfo, &pExpr->resColId);

      SColumnIndex colIndex = {.tableIndex = 0, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
      SSchema* schema = tscGetColumnSchemaById(pTableMetaInfo1->pTableMeta, pExpr->colInfo.colId);

      SSqlExpr* p = tscAddFuncInSelectClause(pNewQueryInfo, index++, TSDB_FUNC_TS, &colIndex, schema, TSDB_COL_NORMAL);
      p->resColId = pExpr->resColId;  // update the result column id
    } else if (pExpr->functionId == TSDB_FUNC_STDDEV_DST) {
      taosArrayPush(pSup->pColsInfo, &pExpr->resColId);

      // the first round computes AVG of the column; the result is a double
      SColumnIndex colIndex = {.tableIndex = 0, .columnIndex = pExpr->colInfo.colIndex};
      SSchema schema = {.type = TSDB_DATA_TYPE_DOUBLE, .bytes = sizeof(double)};
      tstrncpy(schema.name, pExpr->aliasName, tListLen(schema.name));

      SSqlExpr* p = tscAddFuncInSelectClause(pNewQueryInfo, index++, TSDB_FUNC_AVG, &colIndex, &schema, TSDB_COL_NORMAL);
      p->resColId = pExpr->resColId;  // update the result column id
    } else if (pExpr->functionId == TSDB_FUNC_TAG) {
      pSup->tagLen += pExpr->resBytes;
      SColumnIndex colIndex = {.tableIndex = 0, .columnIndex = pExpr->colInfo.colIndex};

      SSchema* schema = NULL;
      if (pExpr->colInfo.colId != TSDB_TBNAME_COLUMN_INDEX) {
        schema = tscGetColumnSchemaById(pTableMetaInfo1->pTableMeta, pExpr->colInfo.colId);
      } else {
        schema = tGetTbnameColumnSchema();
      }

      SSqlExpr* p = tscAddFuncInSelectClause(pNewQueryInfo, index++, TSDB_FUNC_TAG, &colIndex, schema, TSDB_COL_TAG);
      p->resColId = pExpr->resColId;
    } else if (pExpr->functionId == TSDB_FUNC_PRJ) {
      // only projections of group-by columns are carried into the first round
      int32_t num = (int32_t) taosArrayGetSize(pNewQueryInfo->groupbyExpr.columnInfo);
      for(int32_t k = 0; k < num; ++k) {
        SColIndex* pIndex = taosArrayGet(pNewQueryInfo->groupbyExpr.columnInfo, k);
        if (pExpr->colInfo.colId == pIndex->colId) {
          pSup->tagLen += pExpr->resBytes;
          taosArrayPush(pSup->pColsInfo, &pExpr->resColId);

          SColumnIndex colIndex = {.tableIndex = 0, .columnIndex = pIndex->colIndex};
          SSchema* schema = tscGetColumnSchemaById(pTableMetaInfo1->pTableMeta, pExpr->colInfo.colId);

          //doLimitOutputNormalColOfGroupby
          SSqlExpr* p = tscAddFuncInSelectClause(pNewQueryInfo, index++, TSDB_FUNC_PRJ, &colIndex, schema, TSDB_COL_NORMAL);
          p->numOfParams = 1;
          p->param[0].i64 = 1;
          p->param[0].nType = TSDB_DATA_TYPE_INT;
          p->resColId = pExpr->resColId;  // update the result column id
        }
      }
    }
  }

  SColumnIndex columnIndex = {.tableIndex = 0, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
  tscInsertPrimaryTsSourceColumn(pNewQueryInfo, &columnIndex);

  tscTansformFuncForSTableQuery(pNewQueryInfo);

  tscDebug(
      "%p first round subquery:%p tableIndex:%d, vgroupIndex:%d, numOfVgroups:%d, type:%d, query to retrieve timestamps, "
      "numOfExpr:%" PRIzu ", colList:%d, numOfOutputFields:%d, name:%s",
      pSql, pNew, 0, pTableMetaInfo->vgroupIndex, pTableMetaInfo->vgroupList->numOfVgroups, pNewQueryInfo->type,
      tscSqlExprNumOfExprs(pNewQueryInfo), index+1, pNewQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pTableMetaInfo->name));

  tscHandleMasterSTableQuery(pNew);
  return TSDB_CODE_SUCCESS;

_error:
  code = terrno;  // capture before the cleanup calls below get a chance to overwrite terrno
  destroySup(pSup);
  if (pNew != NULL) {
    taos_free_result(pNew);
  }
  pSql->res.code = code;
  tscAsyncResultOnError(pSql);
  return code;
}
D
TD-2516  
dapan1121 已提交
2430

H
hjxilinx 已提交
2431 2432 2433 2434 2435
/*
 * Split a super table query into one subquery per vgroup (or per pVgroupTables entry when that
 * list is present) and launch them all with tscProcessSql(). Each subquery gets its own
 * SRetrieveSupport that shares the merge environment created by tscLocalReducerEnvCreate().
 *
 * Returns TSDB_CODE_SUCCESS once every subquery has been dispatched, otherwise an error code
 * (also stored in pSql->res.code).
 */
int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
  
  // pRes->code check only serves in launching metric sub-queries
  if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
    pCmd->command = TSDB_SQL_RETRIEVE_LOCALMERGE;  // enable the abort of kill super table function.
    return pRes->code;
  }
  
  tExtMemBuffer   **pMemoryBuf = NULL;
  tOrderDescriptor *pDesc  = NULL;
  SColumnModel     *pModel = NULL;
  SColumnModel     *pFinalModel = NULL;

  pRes->qId = 0x1;  // hack the qhandle check
  
  const uint32_t nBufferSize = (1u << 16u);  // 64KB
  
  SQueryInfo     *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  SSubqueryState *pState = &pSql->subState;

  pState->numOfSub = 0;
  if (pTableMetaInfo->pVgroupTables == NULL) {
    pState->numOfSub = pTableMetaInfo->vgroupList->numOfVgroups;
  } else {
    pState->numOfSub = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
  }

  assert(pState->numOfSub > 0);
  
  int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, &pFinalModel, nBufferSize);
  if (ret != 0) {
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    tscAsyncResultOnError(pSql);
    tfree(pMemoryBuf);
    return ret;
  }

  tscDebug("0x%"PRIx64" retrieved query data from %d vnode(s)", pSql->self, pState->numOfSub);
  pSql->pSubs = calloc(pState->numOfSub, POINTER_BYTES);
  if (pSql->pSubs == NULL) {
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pFinalModel, pState->numOfSub);

    tscAsyncResultOnError(pSql);
    // `ret` is 0 here: report the allocation failure rather than success
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  if (pState->states == NULL) {
    pState->states = calloc(pState->numOfSub, sizeof(*pState->states));
    if (pState->states == NULL) {
      pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      // the merge environment was fully created above: release all of it, not just the buffer array
      tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pFinalModel, pState->numOfSub);

      tscAsyncResultOnError(pSql);
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }

    pthread_mutex_init(&pState->mutex, NULL);
  }

  memset(pState->states, 0, sizeof(*pState->states) * pState->numOfSub);
  tscDebug("0x%"PRIx64" reset all sub states to 0", pSql->self);
  
  pRes->code = TSDB_CODE_SUCCESS;
  
  int32_t i = 0;
  for (; i < pState->numOfSub; ++i) {
    SRetrieveSupport *trs = (SRetrieveSupport *)calloc(1, sizeof(SRetrieveSupport));
    if (trs == NULL) {
      tscError("0x%"PRIx64" failed to malloc buffer for SRetrieveSupport, orderOfSub:%d, reason:%s", pSql->self, i, strerror(errno));
      break;
    }
    
    trs->pExtMemBuffer = pMemoryBuf;
    trs->pOrderDescriptor = pDesc;

    trs->localBuffer = (tFilePage *)calloc(1, nBufferSize + sizeof(tFilePage));
    if (trs->localBuffer == NULL) {
      tscError("0x%"PRIx64" failed to malloc buffer for local buffer, orderOfSub:%d, reason:%s", pSql->self, i, strerror(errno));
      tfree(trs);
      break;
    }
    
    trs->subqueryIndex  = i;
    trs->pParentSql     = pSql;
    trs->pFinalColModel = pModel;
    trs->pFFColModel    = pFinalModel;

    SSqlObj *pNew = tscCreateSTableSubquery(pSql, trs, NULL);
    if (pNew == NULL) {
      tscError("0x%"PRIx64" failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql->self, i, strerror(errno));
      tfree(trs->localBuffer);
      tfree(trs);
      break;
    }
    
    // todo handle multi-vnode situation
    if (pQueryInfo->tsBuf) {
      SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
      pNewQueryInfo->tsBuf = tsBufClone(pQueryInfo->tsBuf);
      assert(pNewQueryInfo->tsBuf != NULL);
    }
    
    tscDebug("0x%"PRIx64" sub:%p create subquery success. orderOfSub:%d", pSql->self, pNew, trs->subqueryIndex);
  }
  
  if (i < pState->numOfSub) {
    tscError("0x%"PRIx64" failed to prepare subquery structure and launch subqueries", pSql->self);
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    
    tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pFinalModel, pState->numOfSub);
    doCleanupSubqueries(pSql, i);
    // NOTE(review): unlike the allocation failures above, this path does not call
    // tscAsyncResultOnError(); the caller is expected to act on the returned code — confirm.
    return pRes->code;   // free all allocated resource
  }
  
  // the query may have been cancelled while the subqueries were being prepared
  if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
    tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pFinalModel, pState->numOfSub);
    doCleanupSubqueries(pSql, i);
    return pRes->code;
  }
  
  for(int32_t j = 0; j < pState->numOfSub; ++j) {
    SSqlObj* pSub = pSql->pSubs[j];
    SRetrieveSupport* pSupport = pSub->param;
    
    tscDebug("0x%"PRIx64" sub:%p launch subquery, orderOfSub:%d.", pSql->self, pSub, pSupport->subqueryIndex);
    tscProcessSql(pSub);
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
2566 2567
/*
 * Release the SRetrieveSupport stored in pSql->param together with its local buffer.
 *
 * pSql->param is first swapped to 0 with a compare-and-exchange. If the exchange observes NULL,
 * the support object was already released and nothing else is done.
 */
static void tscFreeRetrieveSup(SSqlObj *pSql) {
  SRetrieveSupport *pSupport = pSql->param;

  if (atomic_val_compare_exchange_ptr(&pSql->param, pSupport, 0) == NULL) {
    tscDebug("0x%"PRIx64" retrieve supp already released", pSql->self);
    return;
  }

  tscDebug("0x%"PRIx64" start to free subquery supp obj:%p", pSql->self, pSupport);
  tfree(pSupport->localBuffer);
  tfree(pSupport);
}

2580
static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfRows);
2581
static void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows);
H
hjxilinx 已提交
2582

H
Haojun Liao 已提交
2583
/*
 * Permanently abort a super table subquery.
 *
 * `code` becomes the parent query's result code, the retry budget of the subquery is exhausted
 * so it is not reissued, and the rest is left to tscHandleSubqueryError(). Callers pass
 * different reasons (e.g. no disk space, too many sorted results); the log text below mentions
 * disk flushing, but `code` carries the actual reason.
 */
static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES *tres, int32_t code) {
  SSqlObj* pParent = trsupport->pParentSql;

  tscError("sub:0x%"PRIx64" failed to flush data to disk, reason:%s", ((SSqlObj *)tres)->self, tstrerror(code));

  pParent->res.code     = code;
  trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;  // disable any further retry

  tscHandleSubqueryError(trsupport, tres, pParent->res.code);
}

H
Haojun Liao 已提交
2593 2594 2595 2596
/*
 * current query failed, and the retry count is less than the available
 * count, retry query clear previous retrieved data, then launch a new sub query
 */
D
fix bug  
dapan1121 已提交
2597 2598 2599
static int32_t tscReissueSubquery(SRetrieveSupport *oriTrs, SSqlObj *pSql, int32_t code, int32_t *sent) {
  *sent = 0;
  
D
fix bug  
dapan1121 已提交
2600 2601 2602 2603 2604 2605 2606 2607 2608 2609
  SRetrieveSupport *trsupport = malloc(sizeof(SRetrieveSupport));
  if (trsupport == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  memcpy(trsupport, oriTrs, sizeof(*trsupport));

  const uint32_t nBufferSize = (1u << 16u);  // 64KB
  trsupport->localBuffer = (tFilePage *)calloc(1, nBufferSize + sizeof(tFilePage));
  if (trsupport->localBuffer == NULL) {
D
dapan1121 已提交
2610
    tscError("0x%"PRIx64" failed to malloc buffer for local buffer, reason:%s", pSql->self, strerror(errno));
D
fix bug  
dapan1121 已提交
2611 2612 2613 2614
    tfree(trsupport);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  
H
Haojun Liao 已提交
2615 2616 2617
  SSqlObj *pParentSql = trsupport->pParentSql;
  int32_t  subqueryIndex = trsupport->subqueryIndex;

S
TD-1732  
Shengliang Guan 已提交
2618 2619
  STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
  SVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[0];
H
Haojun Liao 已提交
2620 2621 2622 2623 2624

  tExtMemBufferClear(trsupport->pExtMemBuffer[subqueryIndex]);

  // clear local saved number of results
  trsupport->localBuffer->num = 0;
D
dapan1121 已提交
2625
  tscError("0x%"PRIx64" sub:0x%"PRIx64" retrieve/query failed, code:%s, orderOfSub:%d, retry:%d", trsupport->pParentSql->self, pSql->self,
H
Haojun Liao 已提交
2626 2627
           tstrerror(code), subqueryIndex, trsupport->numOfRetry);

2628
  SSqlObj *pNew = tscCreateSTableSubquery(trsupport->pParentSql, trsupport, pSql);
H
Haojun Liao 已提交
2629
  if (pNew == NULL) {
D
dapan1121 已提交
2630 2631
    tscError("0x%"PRIx64" sub:0x%"PRIx64" failed to create new subquery due to error:%s, abort retry, vgId:%d, orderOfSub:%d",
             oriTrs->pParentSql->self, pSql->self, tstrerror(terrno), pVgroup->vgId, oriTrs->subqueryIndex);
H
Haojun Liao 已提交
2632

2633
    pParentSql->res.code = terrno;
D
fix bug  
dapan1121 已提交
2634
    oriTrs->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
H
Haojun Liao 已提交
2635

D
fix bug  
dapan1121 已提交
2636
    tfree(trsupport);
H
Haojun Liao 已提交
2637 2638 2639
    return pParentSql->res.code;
  }

2640 2641
  int32_t ret = tscProcessSql(pNew);

D
fix bug  
dapan1121 已提交
2642 2643
  *sent = 1;
  
2644 2645
  // if failed to process sql, let following code handle the pSql
  if (ret == TSDB_CODE_SUCCESS) {
D
fix bug  
dapan1121 已提交
2646
    tscFreeRetrieveSup(pSql);
2647
    taos_free_result(pSql);
H
Haojun Liao 已提交
2648
    return ret;
D
fix bug  
dapan1121 已提交
2649
  } else {    
2650
    pParentSql->pSubs[trsupport->subqueryIndex] = pSql;
D
fix bug  
dapan1121 已提交
2651 2652
    tscFreeRetrieveSup(pNew);
    taos_free_result(pNew);
H
Haojun Liao 已提交
2653
    return ret;
2654
  }
H
Haojun Liao 已提交
2655 2656
}

2657
/*
 * Handle a failed super table subquery, or one that must be aborted because the parent query
 * already failed.
 *
 * numOfRows >= 0: this subquery succeeded, but the parent query has already failed, so its
 *                 result is discarded.
 * numOfRows <  0: error code of this subquery.
 *   - While retries remain and the parent is still healthy, it is reissued.
 *   - Otherwise (or if the reissue cannot be launched) the code is published as the parent's
 *     result code, unless one is already set.
 *
 * When this is the last outstanding subquery, the merge environment is released and the parent
 * is completed:
 *   - not the second stage of a join: the parent's fp is called with its result code;
 *   - second stage of a join: only a failure is reported, through tscAsyncResultOnError().
 */
void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows) {
  assert(pSql != NULL);  // checked before pSql is dereferenced below

  // it has been freed already
  if (pSql->param != trsupport || pSql->param == NULL) {
    return;
  }

  SSqlObj *pParentSql = trsupport->pParentSql;
  int32_t  subqueryIndex = trsupport->subqueryIndex;

  SSubqueryState* pState = &pParentSql->subState;

  // this subquery itself succeeded, but the parent query failed or was cancelled meanwhile
  if (taos_errno(pSql) == TSDB_CODE_SUCCESS && pParentSql->res.code != TSDB_CODE_SUCCESS) {
    pSql->res.numOfRows = 0;
    trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;  // disable retry efforts
    tscDebug("0x%"PRIx64" query is cancelled, sub:%p, orderOfSub:%d abort retrieve, code:%s", pParentSql->self, pSql,
             subqueryIndex, tstrerror(pParentSql->res.code));
  }

  if (numOfRows >= 0) {  // current query is successful, but other sub query failed, still abort current query.
    tscDebug("0x%"PRIx64" sub:0x%"PRIx64" retrieve numOfRows:%d,orderOfSub:%d", pParentSql->self, pSql->self, numOfRows, subqueryIndex);
    tscError("0x%"PRIx64" sub:0x%"PRIx64" abort further retrieval due to other queries failure,orderOfSub:%d,code:%s", pParentSql->self, pSql->self,
             subqueryIndex, tstrerror(pParentSql->res.code));
  } else {
    if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && pParentSql->res.code == TSDB_CODE_SUCCESS) {
      int32_t sent = 0;
      
      int32_t ret = tscReissueSubquery(trsupport, pSql, numOfRows, &sent);
      if (sent) {
        return;
      }

      // The retry could not even be launched. Make sure the parent query records a failure;
      // otherwise it may be completed below with TSDB_CODE_SUCCESS although this subquery failed.
      atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, (ret != TSDB_CODE_SUCCESS) ? ret : numOfRows);
      tscError("0x%"PRIx64" sub:0x%"PRIx64" failed to reissue subquery, code:%s, orderOfSub:%d, set global code:%s", pParentSql->self,
               pSql->self, tstrerror(ret), subqueryIndex, tstrerror(pParentSql->res.code));
    } else {  // reach the maximum retry count, abort
      atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, numOfRows);
      tscError("0x%"PRIx64" sub:0x%"PRIx64" retrieve failed,code:%s,orderOfSub:%d failed.no more retry,set global code:%s", pParentSql->self, pSql->self,
               tstrerror(numOfRows), subqueryIndex, tstrerror(pParentSql->res.code));
    }
  }

  // other subqueries are still running: only release this subquery's support object
  if (!subAndCheckDone(pSql, pParentSql, subqueryIndex)) {
    tscDebug("0x%"PRIx64" sub:%p,%d freed, not finished, total:%d", pParentSql->self, pSql, trsupport->subqueryIndex, pState->numOfSub);

    tscFreeRetrieveSup(pSql);
    return;
  }
  
  // all subqueries are failed
  tscError("0x%"PRIx64" retrieve from %d vnode(s) completed,code:%s.FAILED.", pParentSql->self, pState->numOfSub,
      tstrerror(pParentSql->res.code));

  // release allocated resource
  tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, trsupport->pFinalColModel, trsupport->pFFColModel,
                            pState->numOfSub);
  
  tscFreeRetrieveSup(pSql);  // trsupport is released here and must not be used afterwards

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, 0);

  if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
    // not the second stage of a join: complete the parent directly with its result code
    (*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.code);
  } else {
    // second stage of a join: only a failure is reported here
    if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
      tscAsyncResultOnError(pParentSql);
    }
  }
}

2731 2732
/*
 * Called once a super table subquery has received all rows from its vnode.
 *
 * The rows still held in trsupport->localBuffer are compacted and flushed into the subquery's
 * external memory buffer. Running out of temporary disk space, or a flush failure, aborts the
 * subquery.
 *
 * If other subqueries are still outstanding, only this subquery's support object is released.
 * Otherwise this is the last one: the local merger is built over the buffers of all subqueries
 * and the parent query is completed.
 */
static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* pSql) {
  SSqlObj          *pParent    = trsupport->pParentSql;
  tOrderDescriptor *pOrderDesc = trsupport->pOrderDescriptor;
  int32_t           subIdx     = trsupport->subqueryIndex;
  SSubqueryState   *pSubState  = &pParent->subState;

  SQueryInfo   *pSubQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
  SVgroupsInfo *pVgroups      = pSubQueryInfo->pTableMetaInfo[0]->vgroupList;

  // the rows of this vnode are split between the external buffer and the local buffer
  uint32_t rowsOfSub = (uint32_t)(trsupport->pExtMemBuffer[subIdx]->numOfTotalElems + trsupport->localBuffer->num);
  tscDebug("0x%"PRIx64" sub:%p all data retrieved from ep:%s, vgId:%d, numOfRows:%d, orderOfSub:%d", pParent->self, pSql,
           pVgroups->vgroups[0].epAddr[0].fqdn, pVgroups->vgroups[0].vgId, rowsOfSub, subIdx);

  tColModelCompact(pOrderDesc->pColumnModel, trsupport->localBuffer, pOrderDesc->pColumnModel->capacity);

#ifdef _DEBUG_VIEW
  printf("%" PRIu64 " rows data flushed to disk:\n", trsupport->localBuffer->num);
  SSrcColumnInfo colInfo[256] = {0};
  tscGetSrcColumnInfo(colInfo, pSubQueryInfo);
  tColModelDisplayEx(pOrderDesc->pColumnModel, trsupport->localBuffer->data, trsupport->localBuffer->num,
                     trsupport->localBuffer->num, colInfo);
#endif

  const bool lowOnTmpSpace = (tsTotalTmpDirGB != 0 && tsAvailTmpDirectorySpace < tsReservedTmpDirectorySpace);
  if (lowOnTmpSpace) {
    tscError("0x%"PRIx64" sub:0x%"PRIx64" client disk space remain %.3f GB, need at least %.3f GB, stop query", pParent->self, pSql->self,
             tsAvailTmpDirectorySpace, tsReservedTmpDirectorySpace);
    tscAbortFurtherRetryRetrieval(trsupport, pSql, TSDB_CODE_TSC_NO_DISKSPACE);
    return;
  }

  // each vnode's result is flushed as an independently ordered list; these lists are the inputs
  // of the loser tree used by the disk-based merge
  int32_t flushCode = tscFlushTmpBuffer(trsupport->pExtMemBuffer[subIdx], pOrderDesc, trsupport->localBuffer,
                                        pSubQueryInfo->groupbyExpr.orderType);
  if (flushCode != 0) {  // abort without any retry
    tscAbortFurtherRetryRetrieval(trsupport, pSql, flushCode);
    return;
  }

  if (!subAndCheckDone(pSql, pParent, subIdx)) {
    // other subqueries are still running
    tscDebug("0x%"PRIx64" sub:%p orderOfSub:%d freed, not finished", pParent->self, pSql, subIdx);
    tscFreeRetrieveSup(pSql);
    return;
  }

  // this was the last subquery: merge the results of all of them
  pOrderDesc->pColumnModel->capacity = trsupport->pExtMemBuffer[subIdx]->numOfElemsPerPage;

  tscDebug("0x%"PRIx64" retrieve from %d vnodes completed.final NumOfRows:%" PRId64 ",start to build loser tree",
           pParent->self, pSubState->numOfSub, pSubState->numOfRetrievedRows);

  tscClearInterpInfo(tscGetQueryInfoDetail(&pParent->cmd, 0));

  tscCreateLocalMerger(trsupport->pExtMemBuffer, pSubState->numOfSub, pOrderDesc, trsupport->pFinalColModel,
                       trsupport->pFFColModel, pParent);
  tscDebug("0x%"PRIx64" build loser tree completed", pParent->self);

  pParent->res.precision = pSql->res.precision;
  pParent->res.numOfRows = 0;
  pParent->res.row       = 0;

  tscFreeRetrieveSup(pSql);  // trsupport must not be touched after this point

  // per the original note, the command flag may only be switched once the state above is ready
  pParent->cmd.command = TSDB_SQL_RETRIEVE_LOCALMERGE;
  if (pParent->res.code != TSDB_CODE_SUCCESS) {
    tscAsyncResultOnError(pParent);
    return;
  }

  (*pParent->fp)(pParent->param, pParent, 0);
}

/*
 * Fetch callback of a super table subquery.
 *
 * numOfRows is the size of the latest batch when non-negative, or the error code of the
 * subquery otherwise.
 *   - A non-empty batch is appended to the subquery's external memory buffer, and the next batch
 *     is requested until the result reports completion.
 *   - An empty batch, or completion, hands over to tscAllDataRetrievedFromDnode().
 *   - Failures are retried through tscReissueSubquery() while the retry budget lasts; otherwise
 *     they end in tscHandleSubqueryError().
 */
static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
  SSqlObj *pSql = (SSqlObj *)tres;
  assert(pSql != NULL);

  SRetrieveSupport *trsupport = (SRetrieveSupport *)param;

  // the subquery's support object has been released already
  if (param == NULL || pSql->param == NULL) {
    tscDebug("0x%"PRIx64" already freed in dnodecallback", pSql->self);
    return;
  }

  tOrderDescriptor *pOrderDesc = trsupport->pOrderDescriptor;
  int32_t           subIdx     = trsupport->subqueryIndex;
  SSqlObj          *pParent    = trsupport->pParentSql;
  SSubqueryState   *pSubState  = &pParent->subState;

  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
  SVgroupInfo    *pVgroup   = &pMetaInfo->vgroupList->vgroups[0];

  // the parent already failed or was cancelled: never retry, just wind this subquery down
  if (pParent->res.code != TSDB_CODE_SUCCESS) {
    trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
    tscDebug("0x%"PRIx64" query cancelled/failed, sub:%p, vgId:%d, orderOfSub:%d, code:%s, global code:%s",
             pParent->self, pSql, pVgroup->vgId, trsupport->subqueryIndex, tstrerror(numOfRows), tstrerror(pParent->res.code));

    tscHandleSubqueryError(param, tres, numOfRows);
    return;
  }

  // this subquery failed: retry it if possible, otherwise publish its code as the global one
  if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
    assert(numOfRows == taos_errno(pSql));

    if (numOfRows == TSDB_CODE_TSC_QUERY_CANCELLED) {
      trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;  // a cancelled query is not retried
    }

    if (trsupport->numOfRetry++ >= MAX_NUM_OF_SUBQUERY_RETRY) {
      tscDebug("0x%"PRIx64" sub:%p reach the max retry times, set global code:%s", pParent->self, pSql, tstrerror(numOfRows));
      atomic_val_compare_exchange_32(&pParent->res.code, TSDB_CODE_SUCCESS, numOfRows);  // set global code and abort
    } else {
      tscError("0x%"PRIx64" sub:0x%"PRIx64" failed code:%s, retry:%d", pParent->self, pSql->self, tstrerror(numOfRows), trsupport->numOfRetry);

      int32_t sent = 0;
      tscReissueSubquery(trsupport, pSql, numOfRows, &sent);
      if (sent) {
        return;
      }
    }

    tscHandleSubqueryError(param, tres, numOfRows);
    return;
  }

  SSqlRes    *pRes       = &pSql->res;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);

  // nothing more was delivered: all data has been retrieved to the client
  if (numOfRows <= 0) {
    tscAllDataRetrievedFromDnode(trsupport, pSql);
    return;
  }

  assert(pRes->numOfRows == numOfRows);
  int64_t total = atomic_add_fetch_64(&pSubState->numOfRetrievedRows, numOfRows);

  tscDebug("0x%"PRIx64" sub:%p retrieve numOfRows:%d totalNumOfRows:%" PRIu64 " from ep:%s, orderOfSub:%d",
           pParent->self, pSql, pRes->numOfRows, pSubState->numOfRetrievedRows, pSql->epSet.fqdn[pSql->epSet.inUse], subIdx);

  if (total > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pQueryInfo, 0) &&
      !(tscGetQueryInfoDetail(&pParent->cmd, 0)->distinctTag)) {
    tscError("0x%"PRIx64" sub:0x%"PRIx64" num of OrderedRes is too many, max allowed:%" PRId32 " , current:%" PRId64,
             pParent->self, pSql->self, tsMaxNumOfOrderedResults, total);
    tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_TSC_SORTED_RES_TOO_MANY);
    return;
  }

#ifdef _DEBUG_VIEW
  printf("received data from vnode: %"PRIu64" rows\n", pRes->numOfRows);
  SSrcColumnInfo colInfo[256] = {0};

  tscGetSrcColumnInfo(colInfo, pQueryInfo);
  tColModelDisplayEx(pOrderDesc->pColumnModel, pRes->data, pRes->numOfRows, pRes->numOfRows, colInfo);
#endif

  // no disk space for tmp directory
  if (tsTotalTmpDirGB != 0 && tsAvailTmpDirectorySpace < tsReservedTmpDirectorySpace) {
    tscError("0x%"PRIx64" sub:0x%"PRIx64" client disk space remain %.3f GB, need at least %.3f GB, stop query", pParent->self, pSql->self,
             tsAvailTmpDirectorySpace, tsReservedTmpDirectorySpace);
    tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_TSC_NO_DISKSPACE);
    return;
  }

  int32_t saveCode = saveToBuffer(trsupport->pExtMemBuffer[subIdx], pOrderDesc, trsupport->localBuffer, pRes->data,
                                  pRes->numOfRows, pQueryInfo->groupbyExpr.orderType);
  if (saveCode != 0) {  // report it as "no disk space" and abort without retry
    tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_TSC_NO_DISKSPACE);
    return;
  }

  if (pRes->completed) {
    tscAllDataRetrievedFromDnode(trsupport, pSql);
  } else {  // continue fetch data from dnode
    taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param);
  }
}

2908
/*
 * Create the subquery that retrieves one vgroup's partial result of a two-stage super table query.
 *
 * The new object reports completion to tscRetrieveDataRes with `trsupport` as callback parameter,
 * is bound to the vgroup selected by trsupport->subqueryIndex, and is stored in pSql->pSubs at that
 * same index.
 *
 * @param pSql        parent super table query
 * @param trsupport   retrieve support of this subquery; subqueryIndex picks the vgroup and pSubs slot
 * @param prevSqlObj  forwarded unchanged to createSubqueryObj
 * @return the new subquery, or NULL when createSubqueryObj fails (pSubs is not touched in that case)
 */
static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj) {
  const int32_t table_index = 0;

  SSqlObj *pNew = createSubqueryObj(pSql, table_index, tscRetrieveDataRes, trsupport, TSDB_SQL_SELECT, prevSqlObj);
  if (pNew == NULL) {
    return NULL;
  }

  SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
  pNewQueryInfo->type |= TSDB_QUERY_TYPE_STABLE_SUBQUERY;

  // limit/offset must not be sent to the vnode to be executed, so drop them from the subquery
  pNewQueryInfo->limit.limit  = -1;
  pNewQueryInfo->limit.offset = 0;

  assert(pNewQueryInfo->numOfTables == 1 && pNew->cmd.numOfClause == 1 && trsupport->subqueryIndex < pSql->subState.numOfSub);

  // one subquery is launched per vnode, so the subquery index is also the vgroup index
  tscGetMetaInfo(pNewQueryInfo, table_index)->vgroupIndex = trsupport->subqueryIndex;
  pSql->pSubs[trsupport->subqueryIndex] = pNew;

  return pNew;
}

2933
// todo there is are race condition in this function, while cancel is called by user.
H
hjxilinx 已提交
2934
void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
2935 2936 2937 2938 2939 2940 2941
  // the param may be null, since it may be done by other query threads. and the asyncOnError may enter in this
  // function while kill query by a user.
  if (param == NULL) {
    assert(code != TSDB_CODE_SUCCESS);
    return;
  }

2942
  SRetrieveSupport *trsupport = (SRetrieveSupport *) param;
H
hjxilinx 已提交
2943
  
H
Haojun Liao 已提交
2944
  SSqlObj*  pParentSql = trsupport->pParentSql;
2945
  SSqlObj*  pSql = (SSqlObj *) tres;
2946

2947 2948
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
  assert(pSql->cmd.numOfClause == 1 && pQueryInfo->numOfTables == 1);
H
hjxilinx 已提交
2949
  
2950
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
S
TD-1732  
Shengliang Guan 已提交
2951
  SVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[trsupport->subqueryIndex];
H
Haojun Liao 已提交
2952

H
Haojun Liao 已提交
2953
  // stable query killed or other subquery failed, all query stopped
H
Haojun Liao 已提交
2954
  if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
H
hjxilinx 已提交
2955
    trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
D
dapan1121 已提交
2956 2957
    tscError("0x%"PRIx64" query cancelled or failed, sub:0x%"PRIx64", vgId:%d, orderOfSub:%d, code:%s, global code:%s",
        pParentSql->self, pSql->self, pVgroup->vgId, trsupport->subqueryIndex, tstrerror(code), tstrerror(pParentSql->res.code));
H
Haojun Liao 已提交
2958 2959 2960

    tscHandleSubqueryError(param, tres, code);
    return;
H
hjxilinx 已提交
2961 2962 2963
  }
  
  /*
H
Haojun Liao 已提交
2964
   * if a subquery on a vnode failed, all retrieve operations from vnode that occurs later
2965
   * than this one are actually not necessary, we simply call the tscRetrieveFromDnodeCallBack
H
hjxilinx 已提交
2966 2967
   * function to abort current and remain retrieve process.
   *
2968
   * NOTE: thread safe is required.
H
hjxilinx 已提交
2969
   */
H
Haojun Liao 已提交
2970 2971
  if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
    assert(code == taos_errno(pSql));
2972

H
Haojun Liao 已提交
2973
    if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY) {
D
dapan1121 已提交
2974
      tscError("0x%"PRIx64" sub:0x%"PRIx64" failed code:%s, retry:%d", pParentSql->self, pSql->self, tstrerror(code), trsupport->numOfRetry);
D
fix bug  
dapan1121 已提交
2975 2976 2977 2978 2979
      
      int32_t sent = 0;

      tscReissueSubquery(trsupport, pSql, code, &sent);
      if (sent) {
H
hjxilinx 已提交
2980 2981
        return;
      }
2982
    } else {
D
dapan1121 已提交
2983
      tscError("0x%"PRIx64" sub:0x%"PRIx64" reach the max retry times, set global code:%s", pParentSql->self, pSql->self, tstrerror(code));
H
Haojun Liao 已提交
2984
      atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, code);  // set global code and abort
2985
    }
H
Haojun Liao 已提交
2986 2987 2988 2989 2990

    tscHandleSubqueryError(param, tres, pParentSql->res.code);
    return;
  }

H
Haojun Liao 已提交
2991 2992
  tscDebug("0x%"PRIx64" sub:0x%"PRIx64" query complete, ep:%s, vgId:%d, orderOfSub:%d, retrieve data", trsupport->pParentSql->self,
      pSql->self, pVgroup->epAddr[pSql->epSet.inUse].fqdn, pVgroup->vgId, trsupport->subqueryIndex);
H
Haojun Liao 已提交
2993

H
Haojun Liao 已提交
2994
  if (pSql->res.qId == 0) { // qhandle is NULL, code is TSDB_CODE_SUCCESS means no results generated from this vnode
H
Haojun Liao 已提交
2995 2996 2997
    tscRetrieveFromDnodeCallBack(param, pSql, 0);
  } else {
    taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param);
H
hjxilinx 已提交
2998 2999 3000
  }
}

3001 3002
/*
 * Decide whether a multi-vnode insert whose sub-inserts have all finished should be retried.
 *
 * A retry is allowed only while pParentObj->retry has not exceeded maxRetry, and only if every failed
 * sub-insert failed with one of the retryable codes below (stale table meta/id, wrong vgroup, network
 * unavailable, app not ready). The first non-retryable code found is stored into pParentObj->res.code.
 *
 * @param pParentObj  parent insert object owning pSubs
 * @param numOfSub    number of entries of pParentObj->pSubs to inspect
 * @return true when the failed sub-inserts should be re-submitted
 */
static bool needRetryInsert(SSqlObj* pParentObj, int32_t numOfSub) {
  bool retryBudgetExhausted = (pParentObj->retry > pParentObj->maxRetry);
  if (retryBudgetExhausted) {
    tscError("0x%"PRIx64" max retry reached, abort the retry effort", pParentObj->self);
    return false;
  }

  int32_t idx = 0;
  while (idx < numOfSub) {
    int32_t subCode = pParentObj->pSubs[idx]->res.code;

    bool retryable = (subCode == TSDB_CODE_SUCCESS) ||
                     (subCode == TSDB_CODE_TDB_TABLE_RECONFIGURE) ||
                     (subCode == TSDB_CODE_TDB_INVALID_TABLE_ID) ||
                     (subCode == TSDB_CODE_VND_INVALID_VGROUP_ID) ||
                     (subCode == TSDB_CODE_RPC_NETWORK_UNAVAIL) ||
                     (subCode == TSDB_CODE_APP_NOT_READY);
    if (!retryable) {
      pParentObj->res.code = subCode;
      return false;
    }

    ++idx;
  }

  return true;
}

H
Haojun Liao 已提交
3024 3025 3026 3027 3028 3029 3030 3031 3032
/*
 * Release the SInsertSupporter attached as `param` to every sub-insert of pSqlObj.
 * tfree is applied to each sub's param field.
 */
static void doFreeInsertSupporter(SSqlObj* pSqlObj) {
  assert(pSqlObj != NULL && pSqlObj->subState.numOfSub > 0);

  int32_t   remaining = pSqlObj->subState.numOfSub;
  SSqlObj** ppSub     = pSqlObj->pSubs;

  while (remaining-- > 0) {
    SSqlObj* pSub = *ppSub++;
    tfree(pSub->param);
  }
}

H
Haojun Liao 已提交
3033
/*
 * Completion callback of one sub-insert launched by tscHandleMultivnodeInsert.
 *
 * Each sub-insert adds its row count (or its error) to the parent object. The sub that subAndCheckDone
 * reports as the last one to finish then does one of two things:
 * - It invokes the user callback with the parent result.
 * - If needRetryInsert allows it, it evicts the cached table metas, re-parses the SQL and re-submits
 *   the failed blocks.
 *
 * @param param      the SInsertSupporter of this sub-insert; all supporters are freed once every sub is done
 * @param tres       the sub-insert SSqlObj
 * @param numOfRows  rows inserted by this sub-insert; equals its error code when the sub-insert failed
 */
static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) {
  SInsertSupporter *pSupporter = (SInsertSupporter *)param;
  SSqlObj* pParentObj = pSupporter->pSql;

  // record the total inserted rows
  if (numOfRows > 0) {
    atomic_add_fetch_32(&pParentObj->res.numOfRows, numOfRows);
  }

  if (taos_errno(tres) != TSDB_CODE_SUCCESS) {
    SSqlObj* pSql = (SSqlObj*) tres;
    assert(pSql != NULL && pSql->res.code == numOfRows);

    // Sibling sub-inserts complete concurrently. Record the first failure atomically rather than racing
    // with a plain store; tscRetrieveDataRes uses the same pattern for the parent code.
    atomic_val_compare_exchange_32(&pParentObj->res.code, TSDB_CODE_SUCCESS, pSql->res.code);

    // set the flag in the parent sqlObj
    if (pSql->cmd.submitSchema) {
      pParentObj->cmd.submitSchema = 1;
    }
  }

  if (!subAndCheckDone(tres, pParentObj, pSupporter->index)) {
    tscDebug("0x%"PRIx64" insert:%p,%d completed, total:%d", pParentObj->self, tres, pSupporter->index, pParentObj->subState.numOfSub);
    return;
  }

  // restore user defined fp
  pParentObj->fp = pParentObj->fetchFp;
  int32_t numOfSub = pParentObj->subState.numOfSub;

  // frees every sub's supporter, pSupporter included: it must not be used after this call
  doFreeInsertSupporter(pParentObj);

  if (pParentObj->res.code == TSDB_CODE_SUCCESS) {
    tscDebug("0x%"PRIx64" Async insertion completed, total inserted:%d", pParentObj->self, pParentObj->res.numOfRows);

    // todo remove this parameter in async callback function definition.
    // all data has been sent to vnode, call user function
    (*pParentObj->fp)(pParentObj->param, pParentObj, (int32_t)pParentObj->res.numOfRows);
    return;
  }

  if (!needRetryInsert(pParentObj, numOfSub)) {
    tscAsyncResultOnError(pParentObj);
    return;
  }

  int32_t numOfFailed = 0;
  for(int32_t i = 0; i < numOfSub; ++i) {
    SSqlObj* pSql = pParentObj->pSubs[i];
    if (pSql->res.code != TSDB_CODE_SUCCESS) {
      numOfFailed += 1;

      // clean up tableMeta in cache
      tscFreeQueryInfo(&pSql->cmd, false);
      SQueryInfo* pQueryInfo = tscGetQueryInfoDetailSafely(&pSql->cmd, 0);
      STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pParentObj->cmd, pSql->cmd.clauseIndex, 0);
      tscAddTableMetaInfo(pQueryInfo, &pMasterTableMetaInfo->name, NULL, NULL, NULL, NULL);

      // mark this sub as not done again so that its retried result is awaited
      subquerySetState(pSql, &pParentObj->subState, i, 0);

      tscDebug("0x%"PRIx64", failed sub:%d, %p", pParentObj->self, i, pSql);
    }
  }

  tscError("0x%"PRIx64" Async insertion completed, total inserted:%d rows, numOfFailed:%d, numOfTotal:%d", pParentObj->self,
           pParentObj->res.numOfRows, numOfFailed, numOfSub);

  tscDebug("0x%"PRIx64" cleanup %d tableMeta in hashTable", pParentObj->self, pParentObj->cmd.numOfTables);
  for(int32_t i = 0; i < pParentObj->cmd.numOfTables; ++i) {
    char name[TSDB_TABLE_FNAME_LEN] = {0};
    tNameExtractFullName(pParentObj->cmd.pTableNameList[i], name);
    taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
  }

  // Clear the failure recorded by this round before retrying. The retry branch of
  // tscHandleMultivnodeInsert only re-sends the failed subs and does not reset the parent code.
  // Without this, the next finalize sees the stale error even when every retried sub succeeded.
  // NOTE(review): tsParseSql is not visible here; resetting is harmless even if it also clears the code.
  pParentObj->res.code = TSDB_CODE_SUCCESS;
  pParentObj->cmd.parseFinished = false;

  tscResetSqlCmd(&pParentObj->cmd, false);

  // in case of insert, redo parsing the sql string and build new submit data block for two reasons:
  // 1. the table Id(tid & uid) may have been update, the submit block needs to be updated accordingly.
  // 2. vnode may need the schema information along with submit block to update its local table schema.
  tscDebug("0x%"PRIx64" re-parse sql to generate submit data, retry:%d", pParentObj->self, pParentObj->retry);
  pParentObj->retry++;

  int32_t code = tsParseSql(pParentObj, true);
  if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return;

  if (code != TSDB_CODE_SUCCESS) {
    pParentObj->res.code = code;
    tscAsyncResultOnError(pParentObj);
    return;
  }

  tscDoQuery(pParentObj);
}

/**
 * it is a subquery, so after parse the sql string, copy the submit block to payload of itself
 * @param pSql
 * @return
 */
3133
int32_t tscHandleInsertRetry(SSqlObj* pParent, SSqlObj* pSql) {
3134 3135 3136 3137
  assert(pSql != NULL && pSql->param != NULL);
  SSqlRes* pRes = &pSql->res;

  SInsertSupporter* pSupporter = (SInsertSupporter*) pSql->param;
H
Haojun Liao 已提交
3138
  assert(pSupporter->index < pSupporter->pSql->subState.numOfSub);
3139

3140
  STableDataBlocks* pTableDataBlock = taosArrayGetP(pParent->cmd.pDataBlocks, pSupporter->index);
H
Haojun Liao 已提交
3141 3142 3143
  int32_t code = tscCopyDataBlockToPayload(pSql, pTableDataBlock);

  if ((pRes->code = code)!= TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
3144
    tscAsyncResultOnError(pSql);
H
Haojun Liao 已提交
3145
    return code;  // here the pSql may have been released already.
3146 3147 3148
  }

  return tscProcessSql(pSql);
H
hjxilinx 已提交
3149 3150 3151 3152
}

/*
 * Split an insert into one sub-insert per data block (vnode) and launch them.
 *
 * First call (pSql->pSubs == NULL): create one sub object per data block. Each gets an SInsertSupporter
 * and multiVnodeInsertFinalize as callback, and its block is copied into its payload. The parent's
 * data blocks are then destroyed and all subs are launched.
 *
 * Retry call (pSql->pSubs != NULL): attach a new supporter to every sub and re-send only the subs whose
 * previous result code is not success.
 *
 * @param pSql  the parent insert object
 * @return TSDB_CODE_SUCCESS when the subs were launched; otherwise the error, also stored in pSql->res.code
 */
int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  // it is the failure retry insert
  if (pSql->pSubs != NULL) {
    int32_t blockNum = (int32_t)taosArrayGetSize(pCmd->pDataBlocks);
    if (pSql->subState.numOfSub != blockNum) {
      tscError("0x%"PRIx64" sub num:%d is not same with data block num:%d", pSql->self, pSql->subState.numOfSub, blockNum);
      pRes->code = TSDB_CODE_TSC_APP_ERROR;
      return pRes->code;
    }

    // Attach all supporters before launching anything. An allocation failure then leaves no retry in
    // flight and frees only what this pass allocated.
    for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
      SInsertSupporter* pSup = calloc(1, sizeof(SInsertSupporter));
      if (pSup == NULL) {
        for (int32_t j = 0; j < i; ++j) {
          tfree(pSql->pSubs[j]->param);
        }

        tscError("0x%"PRIx64" failed to malloc insert supporter for retry, orderOfSub:%d", pSql->self, i);
        pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
        return pRes->code;
      }

      pSup->index = i;
      pSup->pSql = pSql;
      pSql->pSubs[i]->param = pSup;
    }

    for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
      SSqlObj* pSub = pSql->pSubs[i];
      tscDebug("0x%"PRIx64" sub:%p launch sub insert, orderOfSub:%d", pSql->self, pSub, i);
      if (pSub->res.code != TSDB_CODE_SUCCESS) {
        tscHandleInsertRetry(pSql, pSub);
      }
    }

    return TSDB_CODE_SUCCESS;
  }

  pSql->subState.numOfSub = (uint16_t)taosArrayGetSize(pCmd->pDataBlocks);
  assert(pSql->subState.numOfSub > 0);

  pRes->code = TSDB_CODE_SUCCESS;

  // the number of already initialized subqueries
  int32_t numOfSub = 0;

  if (pSql->subState.states == NULL) {
    pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(*pSql->subState.states));
    if (pSql->subState.states == NULL) {
      pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      goto _error;
    }

    pthread_mutex_init(&pSql->subState.mutex, NULL);
  }

  memset(pSql->subState.states, 0, sizeof(*pSql->subState.states) * pSql->subState.numOfSub);
  tscDebug("0x%"PRIx64" reset all sub states to 0", pSql->self);

  pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES);
  if (pSql->pSubs == NULL) {
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto _error;
  }

  tscDebug("0x%"PRIx64" submit data to %d vnode(s)", pSql->self, pSql->subState.numOfSub);

  while(numOfSub < pSql->subState.numOfSub) {
    SInsertSupporter* pSupporter = calloc(1, sizeof(SInsertSupporter));
    if (pSupporter == NULL) {
      pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      goto _error;
    }

    pSupporter->pSql   = pSql;
    pSupporter->index  = numOfSub;

    SSqlObj *pNew = createSimpleSubObj(pSql, multiVnodeInsertFinalize, pSupporter, TSDB_SQL_INSERT);
    if (pNew == NULL) {
      tscError("0x%"PRIx64" failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql->self, numOfSub, strerror(errno));
      tfree(pSupporter);  // no sub object took ownership of it
      pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      goto _error;
    }

    /*
     * assign the callback function to fetchFp to make sure that the error process function can restore
     * the callback function (multiVnodeInsertFinalize) correctly.
     */
    pNew->fetchFp = pNew->fp;
    pSql->pSubs[numOfSub] = pNew;

    STableDataBlocks* pTableDataBlock = taosArrayGetP(pCmd->pDataBlocks, numOfSub);
    pRes->code = tscCopyDataBlockToPayload(pNew, pTableDataBlock);
    if (pRes->code != TSDB_CODE_SUCCESS) {
      tscDebug("0x%"PRIx64" prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%s", pSql->self, numOfSub,
               pSql->subState.numOfSub, tstrerror(pRes->code));
      goto _error;
    }

    tscDebug("0x%"PRIx64" sub:%p create subObj success. orderOfSub:%d", pSql->self, pNew, numOfSub);
    numOfSub++;
  }

  pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);

  // use the local variable
  for (int32_t j = 0; j < numOfSub; ++j) {
    SSqlObj *pSub = pSql->pSubs[j];
    tscDebug("0x%"PRIx64" sub:%p launch sub insert, orderOfSub:%d", pSql->self, pSub, j);
    tscProcessSql(pSub);
  }

  return TSDB_CODE_SUCCESS;

  _error:
  // every path reaching here has stored the actual failure into pRes->code
  return pRes->code;
}
H
hjxilinx 已提交
3262

H
Haojun Liao 已提交
3263 3264 3265
/*
 * Locate the value of output column `columnIndex` at the current read row of a subquery result.
 *
 * The address is data + expr offset * numOfRows + row * resBytes: each column occupies a contiguous
 * slab of numOfRows values, starting at the expression's offset scaled by the row count.
 *
 * @param pCmd         command of the subquery (its current clause supplies the field layout)
 * @param pRes         result of the subquery
 * @param columnIndex  index into the clause's internal fields
 * @param bytes        out: width of one value of this column
 * @return pointer to the value at row pRes->row of the column
 */
static char* getResultBlockPosition(SSqlCmd* pCmd, SSqlRes* pRes, int32_t columnIndex, int16_t* bytes) {
  SQueryInfo     *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  SInternalField *pField     = (SInternalField*) TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, columnIndex);

  assert(pField->pSqlExpr != NULL);

  int16_t width = pField->pSqlExpr->resBytes;
  *bytes = width;

  char *pColumnStart = pRes->data + pField->pSqlExpr->offset * pRes->numOfRows;
  return pColumnStart + pRes->row * width;
}

/*
 * Assemble the next block of the parent's result from the unread rows of its subqueries.
 *
 * The block length is the smallest number of unread rows among the non-NULL subqueries. If that is
 * zero, all subquery objects are freed and nothing else happens.
 *
 * Otherwise, for each output expression, numOfRes values are copied from the subquery/column named by
 * pRes->pColumnIndex into pRes->pRsp (resized to hold them after a tFilePage header). Every live
 * subquery's read position then advances by numOfRes. Finally the arithmetic expressions are evaluated
 * in place and the raw column pointers are set.
 *
 * On realloc failure pRes->code is set to TSDB_CODE_TSC_OUT_OF_MEMORY and the old buffer is kept.
 */
static void doBuildResFromSubqueries(SSqlObj* pSql) {
  SSqlRes    *pRes       = &pSql->res;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);

  // the block can be no longer than the shortest unread remainder of a live subquery
  int32_t numOfRes = INT32_MAX;
  for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
    SSqlObj *pSub = pSql->pSubs[i];
    if (pSub != NULL) {
      int32_t remain = (int32_t)(pSub->res.numOfRows - pSub->res.row);
      numOfRes = (int32_t)(MIN(numOfRes, remain));
    }
  }

  if (numOfRes == 0) {  // no result any more, free all subquery objects
    freeJoinSubqueryObj(pSql);
    return;
  }

  int32_t rowSize = tscGetResRowLength(pQueryInfo->exprList);
  assert(numOfRes * rowSize > 0);

  char *pNewRsp = realloc(pRes->pRsp, numOfRes * rowSize + sizeof(tFilePage));
  if (pNewRsp == NULL) {
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return;
  }
  pRes->pRsp = pNewRsp;

  tFilePage *pFilePage = (tFilePage *) pRes->pRsp;
  pFilePage->num = numOfRes;
  pRes->data = pFilePage->data;

  // copy each output column, one after another, from the subquery that owns it
  char   *pDst  = pRes->data;
  int16_t bytes = 0;
  size_t  numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
  for (int32_t i = 0; i < numOfExprs; ++i) {
    SColumnIndex *pIndex = &pRes->pColumnIndex[i];
    SSqlObj      *pOwner = pSql->pSubs[pIndex->tableIndex];

    char *pSrc = getResultBlockPosition(&pOwner->cmd, &pOwner->res, pIndex->columnIndex, &bytes);
    memcpy(pDst, pSrc, bytes * numOfRes);
    pDst += bytes * numOfRes;
  }

  // mark the copied rows as consumed in every live subquery
  for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
    SSqlObj *pSub = pSql->pSubs[i];
    if (pSub != NULL) {
      pSub->res.row += numOfRes;
      assert(pSub->res.row <= pSub->res.numOfRows);
    }
  }

  pRes->numOfRows = numOfRes;
  pRes->numOfClauseTotal += numOfRes;

  int32_t finalRowSize = 0;
  for (int32_t i = 0; i < tscNumOfFields(pQueryInfo); ++i) {
    finalRowSize += tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i)->bytes;
  }

  doArithmeticCalculate(pQueryInfo, pFilePage, rowSize, finalRowSize);

  pRes->data = pFilePage->data;
  tscSetResRawPtr(pRes, pQueryInfo);
}

H
hjxilinx 已提交
3352
/*
 * Build the next result block of a join query from its subqueries and report it to pSql->fp.
 *
 * On the first call the per-column row arrays (tsrow, urow, buffer, length) are allocated.
 * Any failure, whether pre-existing, an allocation failure, or one raised while building, is reported
 * through tscAsyncResultOnError.
 */
void tscBuildResFromSubqueries(SSqlObj *pSql) {
  SSqlRes* pRes = &pSql->res;

  if (pRes->code != TSDB_CODE_SUCCESS) {
    tscAsyncResultOnError(pSql);
    return;
  }

  if (pRes->tsrow == NULL) {
    SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
    pRes->numOfCols = (int16_t) tscSqlExprNumOfExprs(pQueryInfo);

    pRes->tsrow  = calloc(pRes->numOfCols, POINTER_BYTES);
    pRes->urow   = calloc(pRes->numOfCols, POINTER_BYTES);
    pRes->buffer = calloc(pRes->numOfCols, POINTER_BYTES);
    pRes->length = calloc(pRes->numOfCols, sizeof(int32_t));

    // urow is written and read for every column while rows are produced, so it must be checked as well.
    // Release all four arrays on failure; otherwise a non-NULL tsrow would skip this allocation next time.
    if (pRes->tsrow == NULL || pRes->urow == NULL || pRes->buffer == NULL || pRes->length == NULL) {
      tfree(pRes->tsrow);
      tfree(pRes->urow);
      tfree(pRes->buffer);
      tfree(pRes->length);

      pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      tscAsyncResultOnError(pSql);
      return;
    }

    tscRestoreFuncForSTableQuery(pQueryInfo);
  }

  assert (pRes->row >= pRes->numOfRows);
  doBuildResFromSubqueries(pSql);
  if (pRes->code == TSDB_CODE_SUCCESS) {
    (*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
  } else {
    tscAsyncResultOnError(pSql);
  }
}

H
Haojun Liao 已提交
3387
char *getArithmeticInputSrc(void *param, const char *name, int32_t colId) {
3388 3389 3390 3391 3392 3393 3394
  SArithmeticSupport *pSupport = (SArithmeticSupport *) param;

  int32_t index = -1;
  SSqlExpr* pExpr = NULL;
  
  for (int32_t i = 0; i < pSupport->numOfCols; ++i) {
    pExpr = taosArrayGetP(pSupport->exprList, i);
B
Bomin Zhang 已提交
3395
    if (strncmp(name, pExpr->aliasName, sizeof(pExpr->aliasName) - 1) == 0) {
3396 3397 3398 3399 3400 3401 3402 3403 3404
      index = i;
      break;
    }
  }

  assert(index >= 0 && index < pSupport->numOfCols);
  return pSupport->data[index] + pSupport->offset * pExpr->resBytes;
}

3405
/*
 * Produce the next row of the current result block.
 *
 * For every output field, tsrow[i] is pointed at the value under the cursor urow[i]: NULL for a null
 * value, and the payload after the length header for BINARY/NCHAR, whose length also goes to
 * length[i]. The cursor then advances by the field width, and pRes->row is incremented.
 *
 * @return pRes->tsrow, or NULL once every row of the block has been returned (tsrow is freed then)
 */
TAOS_ROW doSetResultRowData(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;

  assert(pRes->row >= 0 && pRes->row <= pRes->numOfRows);
  if (pRes->row >= pRes->numOfRows) {  // all rows of this block were already handed to the caller
    tfree(pRes->tsrow);
    return pRes->tsrow;
  }

  SSqlCmd    *pCmd       = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  size_t numOfFields = tscNumOfFields(pQueryInfo);
  for (size_t col = 0; col < numOfFields; ++col) {
    SInternalField *pField = (SInternalField*)TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, col);

    int32_t type      = pField->field.type;
    char   *pCell     = pRes->urow[col];
    bool    cellIsNull = isNull(pCell, type);

    if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
      pRes->tsrow[col]  = cellIsNull ? NULL : varDataVal(pCell);
      pRes->length[col] = varDataLen(pCell);
    } else {
      pRes->tsrow[col] = cellIsNull ? NULL : pCell;
    }

    // move this column's cursor to the next row
    ((char**) pRes->urow)[col] = pCell + pField->field.bytes;
  }

  pRes->row++;
  return pRes->tsrow;
}

H
Haojun Liao 已提交
3438
/*
 * Tell whether the subqueries of pSql may still produce rows. NULL subquery slots are ignored.
 *
 * - Non-ordered projection on a super table: data remains if at least one subquery still has a vgroup
 *   to visit, has unread rows in its current block, and has not reached its limit.
 * - Otherwise (e.g. inner join): the query is complete as soon as any subquery returned no rows, or has
 *   read all rows, reached its limit, and is a projection query.
 */
static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) {
  SSqlCmd    *pCmd       = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  if (!tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
    for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
      SSqlObj *pSub = pSql->pSubs[i];
      if (pSub == NULL) {
        continue;
      }

      SSqlRes    *pSubRes       = &pSub->res;
      SQueryInfo *pSubQueryInfo = tscGetQueryInfoDetail(&pSub->cmd, 0);

      bool exhausted = (pSubRes->row >= pSubRes->numOfRows && tscHasReachLimitation(pSubQueryInfo, pSubRes) &&
                        tscIsProjectionQuery(pSubQueryInfo)) || (pSubRes->numOfRows == 0);
      if (exhausted) {
        return false;
      }
    }

    return true;
  }

  for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
    SSqlObj *pSub = pSql->pSubs[i];
    if (pSub == NULL) {
      continue;
    }

    SSqlRes    *pSubRes       = &pSub->res;
    SSqlCmd    *pSubCmd       = &pSub->cmd;
    SQueryInfo *pSubQueryInfo = tscGetQueryInfoDetail(pSubCmd, pSubCmd->clauseIndex);
    assert(pSubQueryInfo->numOfTables == 1);

    STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pSubQueryInfo, 0);

    // this subquery can still deliver rows: more vgroups, unread rows, and limit not yet reached
    if (pTableMetaInfo->vgroupIndex < pTableMetaInfo->vgroupList->numOfVgroups && pSubRes->row < pSubRes->numOfRows &&
        (!tscHasReachLimitation(pSubQueryInfo, pSubRes))) {
      return true;
    }
  }

  return false;
}