tscSubquery.c 104.9 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
H
Hui Li 已提交
15
#define _GNU_SOURCE
16

H
Haojun Liao 已提交
17
#include "os.h"
H
hjxilinx 已提交
18

H
Haojun Liao 已提交
19
#include "texpr.h"
H
Haojun Liao 已提交
20
#include "qTsbuf.h"
H
Haojun Liao 已提交
21
#include "tcompare.h"
S
slguan 已提交
22
#include "tscLog.h"
H
Haojun Liao 已提交
23 24
#include "tscSubquery.h"
#include "tschemautil.h"
25
#include "tsclient.h"
26
#include "qUtil.h"
H
hjxilinx 已提交
27 28 29

/*
 * Per-request bookkeeping: the SQL object a sub-request belongs to and an index
 * (presumably the sub-request's slot in its parent; confirm against users).
 */
typedef struct SInsertSupporter {
  SSqlObj *pSql;
  int32_t  index;
} SInsertSupporter;

static void freeJoinSubqueryObj(SSqlObj *pSql);
static bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql);
H
hjxilinx 已提交
35

H
Haojun Liao 已提交
36 37 38 39 40
static int32_t tsCompare(int32_t order, int64_t left, int64_t right) {
  if (left == right) {
    return 0;
  }

41
  if (order == TSDB_ORDER_ASC) {
H
Haojun Liao 已提交
42
    return left < right? -1:1;
H
hjxilinx 已提交
43
  } else {
H
Haojun Liao 已提交
44
    return left > right? -1:1;
H
hjxilinx 已提交
45 46 47
  }
}

48 49 50 51 52 53 54 55 56 57 58
/*
 * Advance pTSBuf past the remaining elements that carry the tag `tag1`.
 * Stops on the first element with a different tag, or when the buffer has no
 * further position.
 */
static void skipRemainValue(STSBuf* pTSBuf, tVariant* tag1) {
  bool hasNext = tsBufNextPos(pTSBuf);

  while (hasNext) {
    STSElem cur = tsBufGetElem(pTSBuf);
    if (tVariantCompare(cur.tag, tag1) != 0) {
      break;  // it is a record with new tag
    }

    hasNext = tsBufNextPos(pTSBuf);
  }
}

59 60 61 62
/*
 * Store `state` for sub-query slot `idx` while holding subState->mutex.
 * pSql is only used as a value in the debug log.
 */
static void subquerySetState(SSqlObj *pSql, SSubqueryState *subState, int idx, int8_t state) {
  assert(idx < subState->numOfSub);
  assert(subState->states);

  pthread_mutex_t *lock = &subState->mutex;

  pthread_mutex_lock(lock);
  tscDebug("subquery:%p,%d state set to %d", pSql, idx, state);
  subState->states[idx] = state;
  pthread_mutex_unlock(lock);
}

D
fix bug  
dapan1121 已提交
72
/*
 * Report whether every sub-query slot of pParentSql has a non-zero state.
 * Logs each finished slot up to, and including, the first unfinished one.
 * The caller is expected to hold pParentSql->subState.mutex.
 */
static bool allSubqueryDone(SSqlObj *pParentSql) {
  SSubqueryState *subState = &pParentSql->subState;

  for (int i = 0; i < subState->numOfSub; i++) {
    if (subState->states[i] == 0) {
      tscDebug("%p subquery:%p,%d is NOT finished, total:%d", pParentSql, pParentSql->pSubs[i], i, subState->numOfSub);
      return false;
    }

    tscDebug("%p subquery:%p,%d is finished, total:%d", pParentSql, pParentSql->pSubs[i], i, subState->numOfSub);
  }

  return true;
}

D
fix bug  
dapan1121 已提交
91 92 93
/*
 * Mark sub-query `idx` of pParentSql as finished and, under the same lock,
 * report whether all sub-queries are now finished.
 * pSql is only used as a value in the debug log.
 */
static bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) {
  SSubqueryState *subState = &pParentSql->subState;
  assert(idx < subState->numOfSub);

  pthread_mutex_lock(&subState->mutex);

  tscDebug("%p subquery:%p,%d state set to 1", pParentSql, pSql, idx);
  subState->states[idx] = 1;
  bool everyoneDone = allSubqueryDone(pParentSql);

  pthread_mutex_unlock(&subState->mutex);

  return everyoneDone;
}



H
Haojun Liao 已提交
111 112
/*
 * Intersect the timestamp streams collected by the two first-stage (ts_comp) sub-queries.
 *
 * For every tag group in pSupporter1->pTSBuf, the group with the same tag is located in
 * pSupporter2->pTSBuf and both ordered timestamp sequences are merged. Each timestamp
 * present on both sides is appended to a fresh output buffer per side. The output buffers
 * are installed as the tsBuf of sub-queries 0 and 1, and `win` is set to the smallest and
 * largest matched timestamp.
 *
 * Returns the number of matched timestamps, or 0 when either input is missing or empty.
 * On the full path both input buffers are destroyed and their pointers cleared.
 */
static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJoinSupporter* pSupporter2, STimeWindow * win) {
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);

  STSBuf* output1 = tsBufCreate(true, pQueryInfo->order.order);
  STSBuf* output2 = tsBufCreate(true, pQueryInfo->order.order);

  win->skey = INT64_MAX;
  win->ekey = INT64_MIN;

  SLimitVal* pLimit = &pQueryInfo->limit;
  int32_t    order = pQueryInfo->order.order;

  SQueryInfo* pSubQueryInfo1 = tscGetQueryInfoDetail(&pSql->pSubs[0]->cmd, 0);
  SQueryInfo* pSubQueryInfo2 = tscGetQueryInfoDetail(&pSql->pSubs[1]->cmd, 0);

  // the output buffers are handed to the sub-queries immediately, so early returns do not leak them
  pSubQueryInfo1->tsBuf = output1;
  pSubQueryInfo2->tsBuf = output2;

  TSKEY st = taosGetTimestampUs();

  // no result generated, return directly
  if (pSupporter1->pTSBuf == NULL || pSupporter2->pTSBuf == NULL) {
    tscDebug("%p at least one ts-comp is empty, 0 for secondary query after ts blocks intersecting", pSql);
    return 0;
  }

  tsBufResetPos(pSupporter1->pTSBuf);
  tsBufResetPos(pSupporter2->pTSBuf);

  if (!tsBufNextPos(pSupporter1->pTSBuf)) {
    tsBufFlush(output1);
    tsBufFlush(output2);

    tscDebug("%p input1 is empty, 0 for secondary query after ts blocks intersecting", pSql);
    return 0;
  }

  if (!tsBufNextPos(pSupporter2->pTSBuf)) {
    tsBufFlush(output1);
    tsBufFlush(output2);

    tscDebug("%p input2 is empty, 0 for secondary query after ts blocks intersecting", pSql);
    return 0;
  }

  int64_t numOfInput1 = 1;
  int64_t numOfInput2 = 1;

  while(1) {
    STSElem elem = tsBufGetElem(pSupporter1->pTSBuf);

    // no data in pSupporter1 anymore, jump out of loop
    if (!tsBufIsValidElem(&elem)) {
      break;
    }

    // find the data in supporter2 with the same tag value
    STSElem e2 = tsBufFindElemStartPosByTag(pSupporter2->pTSBuf, elem.tag);

    // private copy of the current tag, used to detect the end of this tag group on both sides
    tVariant tag1 = {0};
    tVariantAssign(&tag1, elem.tag);

    /**
     * there are elements in pSupporter2 with the same tag, continue
     */
    if (tsBufIsValidElem(&e2)) {
      while (1) {
        STSElem elem1 = tsBufGetElem(pSupporter1->pTSBuf);
        STSElem elem2 = tsBufGetElem(pSupporter2->pTSBuf);

        // data with current are exhausted
        if (!tsBufIsValidElem(&elem1) || tVariantCompare(elem1.tag, &tag1) != 0) {
          break;
        }

        if (!tsBufIsValidElem(&elem2) || tVariantCompare(elem2.tag, &tag1) != 0) { // ignore all records with the same tag
          skipRemainValue(pSupporter1->pTSBuf, &tag1);
          break;
        }

        /*
         * in case of stable query, limit/offset is not applied here. the limit/offset is applied to the
         * final results which is acquired after the secondary merge of in the client.
         */
        int32_t re = tsCompare(order, elem1.ts, elem2.ts);
        if (re < 0) {
          tsBufNextPos(pSupporter1->pTSBuf);
          numOfInput1++;
        } else if (re > 0) {
          tsBufNextPos(pSupporter2->pTSBuf);
          numOfInput2++;
        } else {
          if (pLimit->offset == 0 || pQueryInfo->interval.interval > 0 || QUERY_IS_STABLE_QUERY(pQueryInfo->type)) {
            if (win->skey > elem1.ts) {
              win->skey = elem1.ts;
            }

            if (win->ekey < elem1.ts) {
              win->ekey = elem1.ts;
            }

            tsBufAppend(output1, elem1.id, elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts));
            tsBufAppend(output2, elem2.id, elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts));
          } else {
            // consume the offset on matched timestamps (NOTE(review): original asks whether this applies to projection)
            pLimit->offset -= 1;
          }

          tsBufNextPos(pSupporter1->pTSBuf);
          numOfInput1++;

          tsBufNextPos(pSupporter2->pTSBuf);
          numOfInput2++;
        }
      }
    } else {  // no data with this tag in pSupporter2, skip the current tag group in pSupporter1
      skipRemainValue(pSupporter1->pTSBuf, &tag1);
    }

    // tVariantAssign may allocate storage for variable-length tags; release the copy before
    // moving to the next tag group, otherwise it leaks once per group.
    tVariantDestroy(&tag1);
  }

  /*
   * failed to set the correct ts order yet in two cases:
   * 1. only one element
   * 2. only one element for each tag.
   */
  if (output1->tsOrder == -1) {
    output1->tsOrder = TSDB_ORDER_ASC;
    output2->tsOrder = TSDB_ORDER_ASC;
  }

  tsBufFlush(output1);
  tsBufFlush(output2);

  tsBufDestroy(pSupporter1->pTSBuf);
  pSupporter1->pTSBuf = NULL;
  tsBufDestroy(pSupporter2->pTSBuf);
  pSupporter2->pTSBuf = NULL;

  TSKEY et = taosGetTimestampUs();
  tscDebug("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " in %d vnodes for secondary query after ts blocks "
           "intersecting, skey:%" PRId64 ", ekey:%" PRId64 ", numOfVnode:%d, elapsed time:%" PRId64 " us",
           pSql, numOfInput1, numOfInput2, output1->numOfTotal, output1->numOfGroups, win->skey, win->ekey,
           tsBufGetNumOfGroup(output1), et - st);

  return output1->numOfTotal;
}

// todo handle failed to create sub query
H
Haojun Liao 已提交
258
SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, int32_t index) {
259
  SJoinSupporter* pSupporter = calloc(1, sizeof(SJoinSupporter));
H
hjxilinx 已提交
260 261 262 263 264 265 266 267 268
  if (pSupporter == NULL) {
    return NULL;
  }

  pSupporter->pObj = pSql;

  pSupporter->subqueryIndex = index;
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
  
269
  memcpy(&pSupporter->interval, &pQueryInfo->interval, sizeof(pSupporter->interval));
H
hjxilinx 已提交
270 271 272
  pSupporter->limit = pQueryInfo->limit;

  STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, index);
273
  pSupporter->uid = pTableMetaInfo->pTableMeta->id.uid;
H
hjxilinx 已提交
274 275
  assert (pSupporter->uid != 0);

S
slguan 已提交
276
  taosGetTmpfilePath("join-", pSupporter->path);
H
hjxilinx 已提交
277

D
fix bug  
dapan1121 已提交
278 279
  // do NOT create file here to reduce crash generated file left issue
  pSupporter->f = NULL;
H
hjxilinx 已提交
280 281 282 283

  return pSupporter;
}

H
Haojun Liao 已提交
284
/*
 * Release everything a join supporter owns and free the supporter itself.
 * NULL is accepted and ignored.
 *
 * Members that were handed to another object must already be NULL (or zeroed),
 * otherwise they are released here as well.
 */
static void tscDestroyJoinSupporter(SJoinSupporter* pSupporter) {
  if (pSupporter == NULL) {
    return;
  }

  if (pSupporter->exprList != NULL) {
    tscSqlExprInfoDestroy(pSupporter->exprList);
  }

  if (pSupporter->colList != NULL) {
    tscColumnListDestroy(pSupporter->colList);
  }

  tscFieldInfoClear(&pSupporter->fieldsInfo);

  if (pSupporter->pTSBuf != NULL) {
    tsBufDestroy(pSupporter->pTSBuf);
    pSupporter->pTSBuf = NULL;
  }

  // Close the temporary file BEFORE removing it: deleting a file that is still open
  // fails on some platforms (e.g. Windows), which would leave the file behind.
  if (pSupporter->f != NULL) {
    fclose(pSupporter->f);
    pSupporter->f = NULL;
  }

  unlink(pSupporter->path);

  // This array holds the same SVgroupTableInfo elements that are released elsewhere with
  // tscFreeVgroupTableInfo. A plain taosArrayDestroy would leak each element's nested data
  // (e.g. its itemList).
  if (pSupporter->pVgroupTables != NULL) {
    tscFreeVgroupTableInfo(pSupporter->pVgroupTables);
    pSupporter->pVgroupTables = NULL;
  }

  tfree(pSupporter->pIdTagList);
  tscTagCondRelease(&pSupporter->tagCond);
  free(pSupporter);
}

/*
 * need the secondary query process
 * In case of count(ts)/count(*)/spread(ts) query, that are only applied to
 * primary timestamp column , the secondary query is not necessary
 *
 */
H
Haojun Liao 已提交
328
static UNUSED_FUNC bool needSecondaryQuery(SQueryInfo* pQueryInfo) {
329 330 331
  size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
  
  for (int32_t i = 0; i < numOfCols; ++i) {
332 333
    SColumn* base = taosArrayGet(pQueryInfo->colList, i);
    if (base->colIndex.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
H
hjxilinx 已提交
334 335 336 337 338 339 340
      return true;
    }
  }

  return false;
}

341 342 343
/*
 * Remove from pVgroupTables every vgroup whose vgId is absent from the group-id list of
 * pQueryInfo->tsBuf. After the timestamp intersection, such vgroups have no qualified
 * tables left, so the next stage query is not sent to them. The query is then marked
 * as a multi-table query.
 */
static void filterVgroupTables(SQueryInfo* pQueryInfo, SArray* pVgroupTables) {
  int32_t  numOfGroupIds = 0;
  int32_t* groupIds = NULL;
  tsBufGetGroupIdList(pQueryInfo->tsBuf, &numOfGroupIds, &groupIds);

  // TODO: If tables from some vnodes are not qualified for next stage query, discard them.
  int32_t k = 0;
  while (k < taosArrayGetSize(pVgroupTables)) {
    SVgroupTableInfo* pInfo = taosArrayGet(pVgroupTables, k);

    int32_t f = 0;
    while (f < numOfGroupIds && groupIds[f] != pInfo->vgInfo.vgId) {
      ++f;
    }

    if (f < numOfGroupIds) {
      ++k;  // vgroup still has intersected timestamps: keep it
    } else {
      // k is not advanced: after removal, slot k is examined again
      tscRemoveVgroupTableGroup(pVgroupTables, k);
    }
  }

  assert(taosArrayGetSize(pVgroupTables) > 0);
  TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_MULTITABLE_QUERY);

  tfree(groupIds);
}

/*
 * Build a new vgroup table list for the vgroups listed in the group-id list of
 * pQueryInfo->tsBuf, in that order.
 *
 * Each entry is a tscVgroupTableCopy of the matching element of pVgroupTables. Group ids
 * with no matching element are skipped. The query is marked as a multi-table query.
 * pVgroupTables is not modified, and the caller owns the returned array.
 */
static SArray* buildVgroupTableByResult(SQueryInfo* pQueryInfo, SArray* pVgroupTables) {
  int32_t  num = 0;
  int32_t* list = NULL;
  tsBufGetGroupIdList(pQueryInfo->tsBuf, &num, &list);

  size_t numOfGroups = taosArrayGetSize(pVgroupTables);

  SArray* pNew = taosArrayInit(num, sizeof(SVgroupTableInfo));

  for (int32_t i = 0; i < num; ++i) {
    int32_t vnodeId = list[i];

    for (int32_t j = 0; j < numOfGroups; ++j) {
      SVgroupTableInfo* p1 = taosArrayGet(pVgroupTables, j);
      if (p1->vgInfo.vgId != vnodeId) {
        continue;
      }

      // Push only a freshly copied entry. The previous code pushed `info` even when no
      // vgroup matched. That appended either an uninitialized entry or a second copy of
      // the previous entry's pointers, and releasing the list then freed garbage or freed twice.
      SVgroupTableInfo info;
      tscVgroupTableCopy(&info, p1);
      taosArrayPush(pNew, &info);
      break;
    }
  }

  tfree(list);
  TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_MULTITABLE_QUERY);

  return pNew;
}

H
hjxilinx 已提交
403 404 405
/*
 * launch secondary stage query to fetch the result that contains timestamp in set
 */
H
Haojun Liao 已提交
406
static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
H
hjxilinx 已提交
407
  int32_t         numOfSub = 0;
408
  SJoinSupporter* pSupporter = NULL;
H
hjxilinx 已提交
409
  
H
Haojun Liao 已提交
410
  //If the columns are not involved in the final select clause, the corresponding query will not be issued.
H
Haojun Liao 已提交
411
  for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
H
hjxilinx 已提交
412
    pSupporter = pSql->pSubs[i]->param;
H
hjxilinx 已提交
413
    if (taosArrayGetSize(pSupporter->exprList) > 0) {
H
hjxilinx 已提交
414 415 416 417 418 419 420
      ++numOfSub;
    }
  }
  
  assert(numOfSub > 0);
  
  // scan all subquery, if one sub query has only ts, ignore it
H
Haojun Liao 已提交
421
  tscDebug("%p start to launch secondary subqueries, %d out of %d needs to query", pSql, numOfSub, pSql->subState.numOfSub);
H
hjxilinx 已提交
422 423 424

  bool success = true;
  
H
Haojun Liao 已提交
425
  for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
H
hjxilinx 已提交
426 427 428 429 430
    SSqlObj *pPrevSub = pSql->pSubs[i];
    pSql->pSubs[i] = NULL;
    
    pSupporter = pPrevSub->param;
  
H
hjxilinx 已提交
431
    if (taosArrayGetSize(pSupporter->exprList) == 0) {
432
      tscDebug("%p subIndex: %d, no need to launch query, ignore it", pSql, i);
H
hjxilinx 已提交
433 434
    
      tscDestroyJoinSupporter(pSupporter);
435
      taos_free_result(pPrevSub);
H
hjxilinx 已提交
436 437 438 439 440 441
    
      pSql->pSubs[i] = NULL;
      continue;
    }
  
    SQueryInfo *pSubQueryInfo = tscGetQueryInfoDetail(&pPrevSub->cmd, 0);
H
Haojun Liao 已提交
442
    STSBuf     *pTsBuf = pSubQueryInfo->tsBuf;
H
hjxilinx 已提交
443 444 445
    pSubQueryInfo->tsBuf = NULL;
  
    // free result for async object will also free sqlObj
H
hjxilinx 已提交
446
    assert(tscSqlExprNumOfExprs(pSubQueryInfo) == 1); // ts_comp query only requires one resutl columns
H
hjxilinx 已提交
447 448
    taos_free_result(pPrevSub);
  
449
    SSqlObj *pNew = createSubqueryObj(pSql, (int16_t) i, tscJoinQueryCallback, pSupporter, TSDB_SQL_SELECT, NULL);
H
hjxilinx 已提交
450 451 452 453 454
    if (pNew == NULL) {
      tscDestroyJoinSupporter(pSupporter);
      success = false;
      break;
    }
455
    
B
Bomin Zhang 已提交
456

H
hjxilinx 已提交
457 458 459 460
    tscClearSubqueryInfo(&pNew->cmd);
    pSql->pSubs[i] = pNew;
  
    SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
H
Haojun Liao 已提交
461
    pQueryInfo->tsBuf = pTsBuf;  // transfer the ownership of timestamp comp-z data to the new created object
B
Bomin Zhang 已提交
462

H
hjxilinx 已提交
463
    // set the second stage sub query for join process
H
hjxilinx 已提交
464
    TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE);
465
    memcpy(&pQueryInfo->interval, &pSupporter->interval, sizeof(pQueryInfo->interval));
B
Bomin Zhang 已提交
466

H
hjxilinx 已提交
467
    tscTagCondCopy(&pQueryInfo->tagCond, &pSupporter->tagCond);
B
Bomin Zhang 已提交
468

H
hjxilinx 已提交
469 470 471
    pQueryInfo->colList = pSupporter->colList;
    pQueryInfo->exprList = pSupporter->exprList;
    pQueryInfo->fieldsInfo = pSupporter->fieldsInfo;
H
Haojun Liao 已提交
472
    pQueryInfo->groupbyExpr = pSupporter->groupInfo;
H
Haojun Liao 已提交
473

474
    assert(pNew->subState.numOfSub == 0 && pNew->cmd.numOfClause == 1 && pQueryInfo->numOfTables == 1);
H
hjxilinx 已提交
475
  
476
    tscFieldInfoUpdateOffset(pQueryInfo);
H
hjxilinx 已提交
477
  
478
    STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
Haojun Liao 已提交
479
    pTableMetaInfo->pVgroupTables = pSupporter->pVgroupTables;
H
Haojun Liao 已提交
480 481 482

    pSupporter->exprList = NULL;
    pSupporter->colList = NULL;
483
    pSupporter->pVgroupTables = NULL;
H
Haojun Liao 已提交
484 485
    memset(&pSupporter->fieldsInfo, 0, sizeof(SFieldInfo));
    memset(&pSupporter->groupInfo, 0, sizeof(SSqlGroupbyExpr));
H
Haojun Liao 已提交
486

H
hjxilinx 已提交
487 488 489 490 491
    /*
     * When handling the projection query, the offset value will be modified for table-table join, which is changed
     * during the timestamp intersection.
     */
    pSupporter->limit = pQueryInfo->limit;
492
    pQueryInfo->limit = pSupporter->limit;
H
Haojun Liao 已提交
493 494 495 496 497

    SColumnIndex index = {.tableIndex = 0, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
    SSchema* s = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, 0);

    SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, 0);
H
Haojun Liao 已提交
498 499
    int16_t funcId = pExpr->functionId;

H
Haojun Liao 已提交
500
    if ((pExpr->colInfo.colId != PRIMARYKEY_TIMESTAMP_COL_INDEX) ||
H
Haojun Liao 已提交
501 502 503 504
        (funcId != TSDB_FUNC_TS && funcId != TSDB_FUNC_TS_DUMMY && funcId != TSDB_FUNC_PRJ)) {

      int16_t functionId = tscIsProjectionQuery(pQueryInfo)? TSDB_FUNC_PRJ : TSDB_FUNC_TS;

505
      tscAddFuncInSelectClause(pQueryInfo, 0, functionId, &index, s, TSDB_COL_NORMAL);
H
Haojun Liao 已提交
506
      tscPrintSelectClause(pNew, 0);
507
      tscFieldInfoUpdateOffset(pQueryInfo);
H
Haojun Liao 已提交
508 509 510 511

      pExpr = tscSqlExprGet(pQueryInfo, 0);
    }

512
    // set the join condition tag column info, todo extract method
H
Haojun Liao 已提交
513 514
    if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
      assert(pQueryInfo->tagCond.joinInfo.hasJoin);
515
      int16_t colId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid);
H
Haojun Liao 已提交
516

517
      // set the tag column id for executor to extract correct tag value
518
      pExpr->param[0] = (tVariant) {.i64 = colId, .nType = TSDB_DATA_TYPE_BIGINT, .nLen = sizeof(int64_t)};
H
Haojun Liao 已提交
519
      pExpr->numOfParams = 1;
H
Haojun Liao 已提交
520 521
    }

522 523 524 525 526 527 528 529
    if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
      assert(pTableMetaInfo->pVgroupTables != NULL);
      if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
        SArray* p = buildVgroupTableByResult(pQueryInfo, pTableMetaInfo->pVgroupTables);
        tscFreeVgroupTableInfo(pTableMetaInfo->pVgroupTables);
        pTableMetaInfo->pVgroupTables = p;
      } else {
        filterVgroupTables(pQueryInfo, pTableMetaInfo->pVgroupTables);
H
Haojun Liao 已提交
530 531 532
      }
    }

D
fix bug  
dapan1121 已提交
533 534
    subquerySetState(pPrevSub, &pSql->subState, i, 0);
    
535
    size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
S
TD-1057  
Shengliang Guan 已提交
536
    tscDebug("%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, name:%s",
537
             pSql, pNew, 0, pTableMetaInfo->vgroupIndex, pQueryInfo->type, taosArrayGetSize(pQueryInfo->exprList),
538
             numOfCols, pQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pTableMetaInfo->name));
H
hjxilinx 已提交
539 540 541 542
  }
  
  //prepare the subqueries object failed, abort
  if (!success) {
543
    pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
H
hjxilinx 已提交
544
    tscError("%p failed to prepare subqueries objs for secondary phase query, numOfSub:%d, code:%d", pSql,
H
Haojun Liao 已提交
545
        pSql->subState.numOfSub, pSql->res.code);
H
hjxilinx 已提交
546
    freeJoinSubqueryObj(pSql);
H
hjxilinx 已提交
547 548 549 550
    
    return pSql->res.code;
  }
  
H
Haojun Liao 已提交
551
  for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
H
Haojun Liao 已提交
552
    if (pSql->pSubs[i] == NULL) {
H
hjxilinx 已提交
553 554
      continue;
    }
H
Haojun Liao 已提交
555

H
Haojun Liao 已提交
556
    tscDoQuery(pSql->pSubs[i]);
H
hjxilinx 已提交
557 558 559 560 561
  }

  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
562
/*
 * Free every remaining sub-query of pSql together with its join supporter. Then release
 * the sub-query state array, destroying its mutex when the array exists, and reset the
 * sub-query count to 0.
 */
void freeJoinSubqueryObj(SSqlObj* pSql) {
  SSubqueryState* pState = &pSql->subState;

  for (int32_t i = 0; i < pState->numOfSub; ++i) {
    SSqlObj* pSub = pSql->pSubs[i];
    if (pSub != NULL) {
      tscDestroyJoinSupporter(pSub->param);
      taos_free_result(pSub);
      pSql->pSubs[i] = NULL;
    }
  }

  if (pState->states != NULL) {
    pthread_mutex_destroy(&pState->mutex);
  }
  tfree(pState->states);

  pState->numOfSub = 0;
}

D
fix bug  
dapan1121 已提交
586
/*
 * Mark pSqlSub (slot pSupporter->subqueryIndex) as finished in pSqlObj.
 * If it was the last outstanding sub-query, log the failure and free all join sub-queries.
 * Returns 0 when the sub-queries were torn down, and 1 when others are still pending.
 */
static int32_t quitAllSubquery(SSqlObj* pSqlSub, SSqlObj* pSqlObj, SJoinSupporter* pSupporter) {
  bool lastOne = subAndCheckDone(pSqlSub, pSqlObj, pSupporter->subqueryIndex);
  if (!lastOne) {
    return 1;
  }

  tscError("%p all subquery return and query failed, global code:%s", pSqlObj, tstrerror(pSqlObj->res.code));
  freeJoinSubqueryObj(pSqlObj);
  return 0;
}

// update the query time range according to the join results on timestamp
H
Haojun Liao 已提交
598 599 600
static void updateQueryTimeRange(SQueryInfo* pQueryInfo, STimeWindow* win) {
  assert(pQueryInfo->window.skey <= win->skey && pQueryInfo->window.ekey >= win->ekey);
  pQueryInfo->window = *win;
H
Haojun Liao 已提交
601 602


H
hjxilinx 已提交
603 604
}

H
Haojun Liao 已提交
605 606 607
int32_t tidTagsCompar(const void* p1, const void* p2) {
  const STidTags* t1 = (const STidTags*) (p1);
  const STidTags* t2 = (const STidTags*) (p2);
608 609
  
  if (t1->vgId != t2->vgId) {
weixin_48148422's avatar
weixin_48148422 已提交
610 611
    return (t1->vgId > t2->vgId) ? 1 : -1;
  }
612

H
Haojun Liao 已提交
613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631
  tstr* tag1 = (tstr*) t1->tag;
  tstr* tag2 = (tstr*) t2->tag;

  if (tag1->len != tag2->len) {
    return (tag1->len > tag2->len)? 1: -1;
  }

  return strncmp(tag1->data, tag2->data, tag1->len);
}

int32_t tagValCompar(const void* p1, const void* p2) {
  const STidTags* t1 = (const STidTags*) varDataVal(p1);
  const STidTags* t2 = (const STidTags*) varDataVal(p2);

  tstr* tag1 = (tstr*) t1->tag;
  tstr* tag2 = (tstr*) t2->tag;

  if (tag1->len != tag2->len) {
    return (tag1->len > tag2->len)? 1: -1;
632
  }
H
Haojun Liao 已提交
633

H
Haojun Liao 已提交
634
  return memcmp(tag1->data, tag2->data, tag1->len);
635 636
}

H
Haojun Liao 已提交
637
/*
 * Group `tables` by vgroup and install the result as pTableMetaInfo->pVgroupTables,
 * resetting vgroupIndex to 0.
 *
 * A new group starts whenever the vgId changes, so `tables` is expected to be ordered by
 * vgId. Each group holds a copy of that vgroup's entry from pTableMetaInfo->vgroupList
 * and an array of the group's STableIdInfo items.
 */
void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables) {
  SArray*   result = taosArrayInit(4, sizeof(SVgroupTableInfo));
  SArray*   vgTables = NULL;
  STidTags* prev = NULL;

  size_t numOfTables = taosArrayGetSize(tables);
  for (size_t i = 0; i < numOfTables; i++) {
    STidTags* tt = taosArrayGet(tables, i);

    bool startsNewGroup = (prev == NULL || tt->vgId != prev->vgId);
    if (startsNewGroup) {
      SVgroupsInfo*    pvg = pTableMetaInfo->vgroupList;
      SVgroupTableInfo info = {{0}};

      int32_t m = 0;
      while (m < pvg->numOfVgroups && pvg->vgroups[m].vgId != tt->vgId) {
        ++m;
      }
      if (m < pvg->numOfVgroups) {
        tscSVgroupInfoCopy(&info.vgInfo, &pvg->vgroups[m]);
      }
      assert(info.vgInfo.numOfEps != 0);

      vgTables = taosArrayInit(4, sizeof(STableIdInfo));
      info.itemList = vgTables;

      // report the size of the group that has just been completed
      size_t numOfGroups = taosArrayGetSize(result);
      if (numOfGroups > 0) {
        SVgroupTableInfo* prevGroup = taosArrayGet(result, numOfGroups - 1);
        tscDebug("%p vgId:%d, tables:%"PRIzu, pSql, prevGroup->vgInfo.vgId, taosArrayGetSize(prevGroup->itemList));
      }

      taosArrayPush(result, &info);
    }

    STableIdInfo item = {.uid = tt->uid, .tid = tt->tid, .key = INT64_MIN};
    taosArrayPush(vgTables, &item);

    tscTrace("%p tid:%d, uid:%"PRIu64",vgId:%d added", pSql, tt->tid, tt->uid, tt->vgId);
    prev = tt;
  }

  pTableMetaInfo->pVgroupTables = result;
  pTableMetaInfo->vgroupIndex = 0;

  size_t numOfGroups = taosArrayGetSize(result);
  if (numOfGroups > 0) {
    SVgroupTableInfo* g = taosArrayGet(result, numOfGroups - 1);
    tscDebug("%p vgId:%d, tables:%"PRIzu, pSql, g->vgInfo.vgId, taosArrayGetSize(g->itemList));
  }
}

685
/*
 * Rebuild pSql as a first-stage ts_comp query over its single table and send it.
 *
 * The select clause gets one TS_COMP expression on the primary timestamp. For a super
 * table, that expression's parameter is set to the join tag column id. Columns of the
 * supporter that carry filters are cloned into the query's column list. pParent is only
 * used in the debug log.
 */
static void issueTsCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj* pParent) {
  SSqlCmd* pCmd = &pSql->cmd;
  tscClearSubqueryInfo(pCmd);
  tscFreeSqlResult(pSql);

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  assert(pQueryInfo->numOfTables == 1);

  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  tscInitQueryInfo(pQueryInfo);

  TSDB_QUERY_CLEAR_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY);
  TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_MULTITABLE_QUERY);

  pCmd->command = TSDB_SQL_SELECT;
  pSql->fp = tscJoinQueryCallback;

  // single ts_comp output over the primary timestamp column
  SSchema      colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1};
  SColumnIndex index = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX};
  tscAddFuncInSelectClause(pQueryInfo, 0, TSDB_FUNC_TS_COMP, &index, &colSchema, TSDB_COL_NORMAL);

  // set the tags value for ts_comp function
  if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
    int16_t   tagColId = tscGetJoinTagColIdByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->id.uid);
    SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, 0);

    pExpr->param->i64 = tagColId;
    pExpr->numOfParams = 1;
  }

  // add the filter tag column: copy to pQueryInfo->colList every supporter column that is filtered
  size_t numOfSupporterCols = (pSupporter->colList != NULL) ? taosArrayGetSize(pSupporter->colList) : 0;
  for (int32_t i = 0; i < numOfSupporterCols; ++i) {
    SColumn* pCol = taosArrayGetP(pSupporter->colList, i);
    if (pCol->numOfFilters <= 0) {
      continue;
    }

    SColumn* pClone = tscColumnClone(pCol);
    taosArrayPush(pQueryInfo->colList, &pClone);
  }

  size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);

  tscDebug(
      "%p subquery:%p tableIndex:%d, vgroupIndex:%d, numOfVgroups:%d, type:%d, ts_comp query to retrieve timestamps, "
      "numOfExpr:%" PRIzu ", colList:%" PRIzu ", numOfOutputFields:%d, name:%s",
      pParent, pSql, 0, pTableMetaInfo->vgroupIndex, pTableMetaInfo->vgroupList->numOfVgroups, pQueryInfo->type,
      tscSqlExprNumOfExprs(pQueryInfo), numOfCols, pQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pTableMetaInfo->name));

  tscProcessSql(pSql);
}

H
Haojun Liao 已提交
740
/*
 * Look for two adjacent entries of p1->pIdTagList with equal tag values. Only neighbours
 * are compared, so the list is expected to be sorted by tag.
 *
 * On the first duplicate: log it, set pPSqlObj->res.code to TSDB_CODE_QRY_DUP_JOIN_KEY
 * and return false. Return true when no adjacent tags are equal.
 */
static bool checkForDuplicateTagVal(SSchema* pColSchema, SJoinSupporter* p1, SSqlObj* pPSqlObj) {
  int32_t i = 1;

  while (i < p1->num) {
    STidTags* last = (STidTags*) varDataVal(p1->pIdTagList + (i - 1) * p1->tagSize);
    STidTags* cur  = (STidTags*) varDataVal(p1->pIdTagList + i * p1->tagSize);
    assert(last->vgId >= 1 && cur->vgId >= 1);

    bool sameTag = (doCompare(last->tag, cur->tag, pColSchema->type, pColSchema->bytes) == 0);
    if (sameTag) {
      tscError("%p join tags have same value for different table, free all sub SqlObj and quit", pPSqlObj);
      pPSqlObj->res.code = TSDB_CODE_QRY_DUP_JOIN_KEY;
      return false;
    }

    ++i;
  }

  return true;
}

756
/*
 * Intersect the <tid, tag> tuples collected by the two join subqueries.
 *
 * Both raw tuple lists are sorted by tag value and checked for duplicate
 * join-tag values. They are then merged: for every tag value present on both
 * sides, the left tuple is appended to *s1 and the right tuple to *s2. Finally
 * both result arrays are re-sorted with tidTagsCompar.
 *
 * pQueryInfo: query info of the calling subquery; table 0 supplies the join tag
 *             column schema.
 * pParentSql: parent join SQL object; pSubs[0]/pSubs[1] carry the supporters.
 * s1, s2:     out-parameters. The arrays are allocated here and are owned (and
 *             destroyed) by the caller on every return path. Either may be NULL
 *             when allocation failed.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_OUT_OF_MEMORY if a result array could
 * not be allocated, or TSDB_CODE_QRY_DUP_JOIN_KEY when either side contains the
 * same tag value for different tables.
 */
static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pParentSql, SArray** s1, SArray** s2) {
  SJoinSupporter* p1 = pParentSql->pSubs[0]->param;
  SJoinSupporter* p2 = pParentSql->pSubs[1]->param;

  tscDebug("%p all subquery retrieve <tid, tags> complete, do tags match, %d, %d", pParentSql, p1->num, p2->num);

  // sort according to the tag value, so equal tags are adjacent and both lists can be merged in one pass
  qsort(p1->pIdTagList, p1->num, p1->tagSize, tagValCompar);
  qsort(p2->pIdTagList, p2->num, p2->tagSize, tagValCompar);

  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  int16_t tagColId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid);

  SSchema* pColSchema = tscGetColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId);

  // Each raw entry starts with an int16_t (presumably the var-data header skipped by
  // varDataVal()); only the STidTags payload after it is stored in the result arrays.
  // NOTE(review): both arrays use p1's element size, i.e. this assumes p1->tagSize == p2->tagSize.
  int32_t size = p1->tagSize - sizeof(int16_t);
  *s1 = taosArrayInit(p1->num, size);
  *s2 = taosArrayInit(p2->num, size);
  if (*s1 == NULL || *s2 == NULL) {
    tscError("%p failed to allocate memory for tags match result", pParentSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  if (!(checkForDuplicateTagVal(pColSchema, p1, pParentSql) && checkForDuplicateTagVal(pColSchema, p2, pParentSql))) {
    return TSDB_CODE_QRY_DUP_JOIN_KEY;
  }

  // Merge step over the two sorted lists: emit a pair on equal tags, otherwise advance
  // the side whose tag compares smaller (assumes doCompare orders tags like tagValCompar).
  int32_t i = 0, j = 0;
  while (i < p1->num && j < p2->num) {
    STidTags* pp1 = (STidTags*) varDataVal(p1->pIdTagList + i * p1->tagSize);
    STidTags* pp2 = (STidTags*) varDataVal(p2->pIdTagList + j * p2->tagSize);
    assert(pp1->tid != 0 && pp2->tid != 0);

    int32_t ret = doCompare(pp1->tag, pp2->tag, pColSchema->type, pColSchema->bytes);
    if (ret == 0) {
      tscDebug("%p tag matched, vgId:%d, val:%d, tid:%d, uid:%"PRIu64", tid:%d, uid:%"PRIu64, pParentSql, pp1->vgId,
               *(int*) pp1->tag, pp1->tid, pp1->uid, pp2->tid, pp2->uid);

      taosArrayPush(*s1, pp1);
      taosArrayPush(*s2, pp2);
      j++;
      i++;
    } else if (ret > 0) {
      j++;
    } else {
      i++;
    }
  }

  // reorganize the tid-tag value according to both the vgroup id and tag values
  size_t t1 = taosArrayGetSize(*s1);
  size_t t2 = taosArrayGetSize(*s2);

  qsort((*s1)->pData, t1, size, tidTagsCompar);
  qsort((*s2)->pData, t2, size, tidTagsCompar);

  tscDebug("%p tags match complete, result: %"PRIzu", %"PRIzu, pParentSql, t1, t2);
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
826
/*
 * Fetch callback of the first join stage, which retrieves <tid, tag> tuples for
 * each side of the join.
 *
 * - Aborts (via quitAllSubquery / tscAsyncResultOnError) when the parent query
 *   already failed or this subquery reports an error.
 * - Appends the retrieved rows to pSupporter->pIdTagList and keeps fetching
 *   until the current vnode is completed, then moves on to the next vgroup.
 * - When every subquery is done (subAndCheckDone), it intersects both tuple
 *   sets. An empty intersection completes the parent with an empty result.
 *   Otherwise it builds the per-vgroup table lists and issues the ts_comp query
 *   on both subqueries.
 */
static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRows) {
  SJoinSupporter* pSupporter = (SJoinSupporter*)param;

  SSqlObj* pParentSql = pSupporter->pObj;

  SSqlObj* pSql = (SSqlObj*)tres;
  SSqlCmd* pCmd = &pSql->cmd;
  SSqlRes* pRes = &pSql->res;

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  assert(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY));

  if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
    tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, numOfRows, pParentSql->res.code);
    if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
      return;
    }

    tscAsyncResultOnError(pParentSql);

    return;
  }

  // check for the error code firstly
  if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
    // todo retry if other subqueries are not failed

    assert(numOfRows < 0 && numOfRows == taos_errno(pSql));
    tscError("%p sub query failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex);

    pParentSql->res.code = numOfRows;
    if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
      return;
    }

    tscAsyncResultOnError(pParentSql);
    return;
  }

  // keep the results in memory
  if (numOfRows > 0) {
    // widen before multiplying so the product cannot overflow int32_t
    size_t validLen = (size_t)pSupporter->tagSize * (size_t)pRes->numOfRows;
    size_t length = pSupporter->totalLen + validLen;

    char* tmp = realloc(pSupporter->pIdTagList, length);
    if (tmp == NULL) {
      // capture errno right away: the logging call below may overwrite it
      int32_t err = errno;
      tscError("%p failed to malloc memory", pSql);

      pParentSql->res.code = TAOS_SYSTEM_ERROR(err);
      if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
        return;
      }

      tscAsyncResultOnError(pParentSql);
      return;
    }

    pSupporter->pIdTagList = tmp;

    memcpy(pSupporter->pIdTagList + pSupporter->totalLen, pRes->data, validLen);
    pSupporter->totalLen += (int32_t)validLen;
    pSupporter->num += (int32_t)pRes->numOfRows;

    // query not completed, continue to retrieve tid + tag tuples
    if (!pRes->completed) {
      taos_fetch_rows_a(tres, tidTagRetrieveCallback, param);
      return;
    }
  }

  // <tid + tag> tuples of the current vnode have all been retrieved; try the next vnode if it exists
  if (hasMoreVnodesToTry(pSql)) {
    STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

    int32_t totalVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
    pTableMetaInfo->vgroupIndex += 1;
    assert(pTableMetaInfo->vgroupIndex < totalVgroups);

    tscDebug("%p tid_tag from vgroup index:%d completed, try next vgroup:%d. total vgroups:%d. current numOfRes:%d",
             pSql, pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVgroups, pSupporter->num);

    pCmd->command = TSDB_SQL_SELECT;
    tscResetForNextRetrieve(&pSql->res);

    // set the callback function
    pSql->fp = tscJoinQueryCallback;
    tscProcessSql(pSql);
    return;
  }

  // No data exists in the next vnode: mark this <tid, tags> query completed. Only when no other
  // subquery is still running, proceed to intersect the <tid, tags> tuple sets.
  if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
    tscDebug("%p tagRetrieve:%p,%d completed, total:%d", pParentSql, tres, pSupporter->subqueryIndex, pParentSql->subState.numOfSub);
    return;
  }

  SArray *s1 = NULL, *s2 = NULL;
  int32_t code = getIntersectionOfTableTuple(pQueryInfo, pParentSql, &s1, &s2);
  if (code != TSDB_CODE_SUCCESS) {
    freeJoinSubqueryObj(pParentSql);
    pParentSql->res.code = code;
    tscAsyncResultOnError(pParentSql);

    taosArrayDestroy(s1);
    taosArrayDestroy(s2);
    return;
  }

  if (taosArrayGetSize(s1) == 0 || taosArrayGetSize(s2) == 0) {  // no results,return.
    tscDebug("%p tag intersect does not generated qualified tables for join, free all sub SqlObj and quit", pParentSql);
    freeJoinSubqueryObj(pParentSql);

    // set no result command
    pParentSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
    assert(pParentSql->fp != tscJoinQueryCallback);

    (*pParentSql->fp)(pParentSql->param, pParentSql, 0);
  } else {
    // proceed to for ts_comp query
    SSqlCmd* pSubCmd1 = &pParentSql->pSubs[0]->cmd;
    SSqlCmd* pSubCmd2 = &pParentSql->pSubs[1]->cmd;

    SQueryInfo*     pQueryInfo1 = tscGetQueryInfoDetail(pSubCmd1, 0);
    STableMetaInfo* pTableMetaInfo1 = tscGetMetaInfo(pQueryInfo1, 0);
    tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo1, s1);

    SQueryInfo*     pQueryInfo2 = tscGetQueryInfoDetail(pSubCmd2, 0);
    STableMetaInfo* pTableMetaInfo2 = tscGetMetaInfo(pQueryInfo2, 0);
    tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo2, s2);

    SSqlObj* psub1 = pParentSql->pSubs[0];
    ((SJoinSupporter*)psub1->param)->pVgroupTables =  tscVgroupTableInfoDup(pTableMetaInfo1->pVgroupTables);

    SSqlObj* psub2 = pParentSql->pSubs[1];
    ((SJoinSupporter*)psub2->param)->pVgroupTables =  tscVgroupTableInfoDup(pTableMetaInfo2->pVgroupTables);

    pParentSql->subState.numOfSub = 2;

    memset(pParentSql->subState.states, 0, sizeof(pParentSql->subState.states[0]) * pParentSql->subState.numOfSub);
    tscDebug("%p reset all sub states to 0", pParentSql);

    for (int32_t m = 0; m < pParentSql->subState.numOfSub; ++m) {
      SSqlObj* sub = pParentSql->pSubs[m];
      issueTsCompQuery(sub, sub->param, pParentSql);
    }
  }

  taosArrayDestroy(s1);
  taosArrayDestroy(s2);
}
H
Haojun Liao 已提交
981

H
Haojun Liao 已提交
982 983
/*
 * Fetch callback of the ts_comp join stage, which retrieves the compressed
 * timestamp blocks of one join side.
 *
 * Each retrieved block (pRes->numOfRows bytes) is written to a temporary file
 * (pSupporter->path). The file is loaded into an STSBuf and merged into
 * pSupporter->pTSBuf. Fetching continues until the current vnode is completed
 * and then moves on to the next vgroup. Once all subqueries are done, the
 * timestamp blocks of both sides are intersected. An empty intersection
 * completes the parent with an empty result; otherwise the parent time range
 * is narrowed and the real (second-stage) subqueries are launched.
 *
 * Any failure (parent already failed, subquery error, temp file create/write
 * failure, invalid ts comp file) sets pParentSql->res.code and is routed
 * through quitAllSubquery / tscAsyncResultOnError.
 */
static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRows) {
  SJoinSupporter* pSupporter = (SJoinSupporter*)param;

  SSqlObj* pParentSql = pSupporter->pObj;

  SSqlObj* pSql = (SSqlObj*)tres;
  SSqlCmd* pCmd = &pSql->cmd;
  SSqlRes* pRes = &pSql->res;

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  assert(!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE));

  if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
    tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, numOfRows, pParentSql->res.code);
    if (quitAllSubquery(pSql, pParentSql, pSupporter)){
      return;
    }

    tscAsyncResultOnError(pParentSql);

    return;
  }

  // check for the error code firstly
  if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
    // todo retry if other subqueries are not failed yet 
    assert(numOfRows < 0 && numOfRows == taos_errno(pSql));
    tscError("%p sub query failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex);

    pParentSql->res.code = numOfRows;
    if (quitAllSubquery(pSql, pParentSql, pSupporter)){
      return;
    }

    tscAsyncResultOnError(pParentSql);
    return;
  }

  if (numOfRows > 0) {  // write the compressed timestamp to disk file
    if (pSupporter->f == NULL) {
      pSupporter->f = fopen(pSupporter->path, "wb");

      if (pSupporter->f == NULL) {
        // capture errno right away: the logging call below may overwrite it
        int32_t err = errno;
        tscError("%p failed to create tmp file:%s, reason:%s", pSql, pSupporter->path, strerror(err));
        
        pParentSql->res.code = TAOS_SYSTEM_ERROR(err);

        if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
          return;
        }
        
        tscAsyncResultOnError(pParentSql);

        return;
      }
    }

    // the whole compressed block (numOfRows bytes) is written as a single item, so success means 1
    size_t  written  = fwrite(pRes->data, (size_t)pRes->numOfRows, 1, pSupporter->f);
    int32_t closeRet = fclose(pSupporter->f);
    pSupporter->f = NULL;

    if (written != 1 || closeRet != 0) {
      // a short write or failed flush would leave a truncated ts comp file behind
      int32_t err = errno;
      tscError("%p failed to write ts comp data to tmp file:%s, reason:%s", pSql, pSupporter->path, strerror(err));

      pParentSql->res.code = TAOS_SYSTEM_ERROR(err);

      if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
        return;
      }

      tscAsyncResultOnError(pParentSql);
      return;
    }

    STSBuf* pBuf = tsBufCreateFromFile(pSupporter->path, true);
    if (pBuf == NULL) {  // the tmp file has already been closed above
      int32_t err = errno;
      tscError("%p invalid ts comp file from vnode, abort subquery, file size:%d", pSql, numOfRows);

      pParentSql->res.code = TAOS_SYSTEM_ERROR(err);

      if (quitAllSubquery(pSql, pParentSql, pSupporter)){
        return;
      }
      
      tscAsyncResultOnError(pParentSql);

      return;
    }

    if (pSupporter->pTSBuf == NULL) {
      tscDebug("%p create tmp file for ts block:%s, size:%d bytes", pSql, pBuf->path, numOfRows);
      pSupporter->pTSBuf = pBuf;
    } else {
      assert(pQueryInfo->numOfTables == 1);  // for subquery, only one
      tsBufMerge(pSupporter->pTSBuf, pBuf);
      tsBufDestroy(pBuf);
    }

    // continue to retrieve ts-comp data from vnode
    if (!pRes->completed) {
      taosGetTmpfilePath("ts-join", pSupporter->path);
      // a NULL handle is tolerated: the f == NULL branch above reopens the file before the next write
      pSupporter->f = fopen(pSupporter->path, "wb");
      pRes->row = pRes->numOfRows;

      taos_fetch_rows_a(tres, tsCompRetrieveCallback, param);
      return;
    }
  }

  if (hasMoreVnodesToTry(pSql)) {
    STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

    int32_t totalVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
    pTableMetaInfo->vgroupIndex += 1;
    assert(pTableMetaInfo->vgroupIndex < totalVgroups);

    tscDebug("%p results from vgroup index:%d completed, try next vgroup:%d. total vgroups:%d. current numOfRes:%" PRId64,
             pSql, pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVgroups,
             pRes->numOfClauseTotal);

    pCmd->command = TSDB_SQL_SELECT;
    tscResetForNextRetrieve(&pSql->res);

    assert(pSupporter->f == NULL);
    taosGetTmpfilePath("ts-join", pSupporter->path);
    
    // a NULL handle is tolerated: the f == NULL branch above reopens the file before the next write
    pSupporter->f = fopen(pSupporter->path, "wb");
    pRes->row = pRes->numOfRows;

    // set the callback function
    pSql->fp = tscJoinQueryCallback;
    tscProcessSql(pSql);
    return;
  }

  if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
    return;
  }  

  tscDebug("%p all subquery retrieve ts complete, do ts block intersect", pParentSql);

  // proceeds to launched secondary query to retrieve final data
  SJoinSupporter* p1 = pParentSql->pSubs[0]->param;
  SJoinSupporter* p2 = pParentSql->pSubs[1]->param;

  STimeWindow win = TSWINDOW_INITIALIZER;
  int64_t num = doTSBlockIntersect(pParentSql, p1, p2, &win);
  if (num <= 0) {  // no result during ts intersect
    tscDebug("%p no results generated in ts intersection, free all sub SqlObj and quit", pParentSql);
    freeJoinSubqueryObj(pParentSql);

    // set no result command
    pParentSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
    (*pParentSql->fp)(pParentSql->param, pParentSql, 0);
    return;
  }

  // launch the query the retrieve actual results from vnode along with the filtered timestamp
  SQueryInfo* pPQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, pParentSql->cmd.clauseIndex);
  updateQueryTimeRange(pPQueryInfo, &win);

  //update the vgroup that involved in real data query
  tscLaunchRealSubqueries(pParentSql);
}
H
Haojun Liao 已提交
1134

H
Haojun Liao 已提交
1135 1136
/*
 * Fetch callback of the second (final) join stage, which retrieves the actual
 * result rows.
 *
 * For a non-ordered super-table projection subquery that returned no rows, it
 * moves on to the next vgroup. Once every subquery reported back
 * (subAndCheckDone), it updates the per-subquery row counters and builds the
 * join result via tscBuildResFromSubqueries(). If the parent query failed in the
 * meantime, the subqueries are freed and the parent is marked completed first.
 */
static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfRows) {
  SJoinSupporter* pSupporter = (SJoinSupporter*)param;

  SSqlObj* pParentSql = pSupporter->pObj;

  SSqlObj* pSql = (SSqlObj*)tres;
  SSqlCmd* pCmd = &pSql->cmd;
  SSqlRes* pRes = &pSql->res;

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
    tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, numOfRows, pParentSql->res.code);
    if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
      return;
    }
    
    tscAsyncResultOnError(pParentSql);

    return;
  }

  if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
    assert(numOfRows == taos_errno(pSql));

    pParentSql->res.code = numOfRows;
    tscError("%p retrieve failed, index:%d, code:%s", pSql, pSupporter->subqueryIndex, tstrerror(numOfRows));

    // NOTE(review): unlike the other join callbacks, this path does not go through
    // quitAllSubquery(); confirm that each failing subquery reporting the error is intended.
    tscAsyncResultOnError(pParentSql);
    return;
  }

  if (numOfRows >= 0) {
    pRes->numOfTotal += pRes->numOfRows;
  }

  SSubqueryState* pState = &pParentSql->subState;
  if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && numOfRows == 0) {
    STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
    assert(pQueryInfo->numOfTables == 1);

    // for projection query, need to try next vnode if current vnode is exhausted
    int32_t numOfVgroups = 0;  // TODO refactor
    if (pTableMetaInfo->pVgroupTables != NULL) {
      numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
    } else {
      numOfVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
    }

    if ((++pTableMetaInfo->vgroupIndex) < numOfVgroups) {
      tscDebug("%p no result in current vnode anymore, try next vnode, vgIndex:%d", pSql, pTableMetaInfo->vgroupIndex);
      pSql->cmd.command = TSDB_SQL_SELECT;
      pSql->fp = tscJoinQueryCallback;

      tscProcessSql(pSql);
      return;
    } else {
      tscDebug("%p no result in current subquery anymore", pSql);
    }
  }

  if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
    tscDebug("%p sub:%p,%d completed, total:%d", pParentSql, tres, pSupporter->subqueryIndex, pState->numOfSub);
    return;
  }

  tscDebug("%p all %d secondary subqueries retrieval completed, code:%d", pParentSql, pState->numOfSub, pParentSql->res.code);

  // pQueryInfo belongs to this subquery (pSql). Read it before freeJoinSubqueryObj() below,
  // which may release the subquery objects, so it is never accessed after being freed.
  bool stableQuery = tscIsTwoStageSTableQuery(pQueryInfo, 0);

  if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
    freeJoinSubqueryObj(pParentSql);
    pParentSql->res.completed = true;
  }

  // update the records for each subquery in parent sql object.
  for (int32_t i = 0; i < pState->numOfSub; ++i) {
    if (pParentSql->pSubs[i] == NULL) {
      tscDebug("%p %p sub:%d not retrieve data", pParentSql, NULL, i);
      continue;
    }

    SSqlRes* pRes1 = &pParentSql->pSubs[i]->res;

    if (pRes1->row > 0 && pRes1->numOfRows > 0) {
      tscDebug("%p sub:%p index:%d numOfRows:%d total:%"PRId64 " (not retrieve)", pParentSql, pParentSql->pSubs[i], i,
               pRes1->numOfRows, pRes1->numOfTotal);
      assert(pRes1->row < pRes1->numOfRows);
    } else {
      if (!stableQuery) {
        pRes1->numOfClauseTotal += pRes1->numOfRows;
      }

      tscDebug("%p sub:%p index:%d numOfRows:%d total:%"PRId64, pParentSql, pParentSql->pSubs[i], i,
               pRes1->numOfRows, pRes1->numOfTotal);
    }
  }

  // data has retrieved to client, build the join results
  tscBuildResFromSubqueries(pParentSql);
}

1237
/*
 * Drive the next fetch round of a join query after the application consumed
 * the rows buffered in the subqueries.
 *
 * - If no subquery buffer is drained, the join result is built from the
 *   buffered data right away.
 * - If a drained subquery has reached its limit, the parent query is completed:
 *   the subqueries are freed and the parent callback (or the async error path)
 *   is invoked.
 * - If no drained subquery needs another fetch, completed non-ordered
 *   super-table projection subqueries are first moved on to their next vgroup.
 *   When at least one was re-issued, the function returns and waits for it;
 *   otherwise the parent query is completed as above.
 * - Otherwise each drained subquery is re-issued as a retrieve request with
 *   joinRetrieveFinalResCallback as its callback.
 *
 * NULL entries in pSql->pSubs do not take part in the final result and are skipped.
 */
void tscFetchDatablockForSubquery(SSqlObj* pSql) {
  assert(pSql->subState.numOfSub >= 1);

  int32_t pendingFetch  = 0;      // drained subqueries that are not completed yet
  bool    bufferHasData = true;   // cleared as soon as one subquery buffer is drained
  bool    limitReached  = false;  // a drained subquery has hit its limitation

  for (int32_t idx = 0; idx < pSql->subState.numOfSub; ++idx) {
    SSqlObj* pSub = pSql->pSubs[idx];
    if (pSub == NULL) {
      continue;
    }

    SSqlRes*    pSubRes       = &pSub->res;
    SQueryInfo* pSubQueryInfo = tscGetQueryInfoDetail(&pSub->cmd, 0);

    bool limited = tscHasReachLimitation(pSubQueryInfo, pSubRes);
    bool drained = (pSubRes->row >= pSubRes->numOfRows);
    if (!drained) {
      continue;
    }

    // no data left in current result buffer
    bufferHasData = false;
    if (limited) {
      // has reach the limitation, no data anymore
      limitReached = true;
      break;
    }

    // the active vnode is exhausted; only fetch again when the subquery is not completed
    if (!pSubRes->completed) {
      pendingFetch++;
    }
  }

  // has data remains in client side, and continue to return data to app
  if (bufferHasData) {
    tscBuildResFromSubqueries(pSql);
    return;
  }

  bool finishQuery = limitReached;
  if (!finishQuery && pendingFetch <= 0) {
    bool nonOrderedPrjQuery = false;
    for (int32_t idx = 0; idx < pSql->subState.numOfSub && !nonOrderedPrjQuery; ++idx) {
      SSqlObj* pSub = pSql->pSubs[idx];
      if (pSub != NULL) {
        nonOrderedPrjQuery = tscNonOrderedProjectionQueryOnSTable(tscGetQueryInfoDetail(&pSub->cmd, 0), 0);
      }
    }

    if (nonOrderedPrjQuery) {
      for (int32_t idx = 0; idx < pSql->subState.numOfSub; ++idx) {
        SSqlObj* pSub = pSql->pSubs[idx];
        if (pSub != NULL && pSub->res.row >= pSub->res.numOfRows && pSub->res.completed) {
          subquerySetState(pSub, &pSql->subState, idx, 0);
        }
      }
    }

    bool movedToNextVnode = false;
    for (int32_t idx = 0; idx < pSql->subState.numOfSub; ++idx) {
      SSqlObj* pSub = pSql->pSubs[idx];
      if (pSub == NULL) {
        continue;
      }

      SQueryInfo* pSubQueryInfo = tscGetQueryInfoDetail(&pSub->cmd, 0);
      bool exhausted = tscNonOrderedProjectionQueryOnSTable(pSubQueryInfo, 0) &&
                       pSub->res.row >= pSub->res.numOfRows && pSub->res.completed;
      if (!exhausted) {
        continue;
      }

      STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pSubQueryInfo, 0);
      assert(pSubQueryInfo->numOfTables == 1);

      // for projection query, need to try next vnode if current vnode is exhausted
      int32_t numOfVgroups = (pTableMetaInfo->pVgroupTables != NULL)
                                 ? (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables)
                                 : pTableMetaInfo->vgroupList->numOfVgroups;

      pTableMetaInfo->vgroupIndex += 1;
      if (pTableMetaInfo->vgroupIndex >= numOfVgroups) {
        tscDebug("%p no result in current subquery anymore", pSub);
        continue;
      }

      tscDebug("%p no result in current vnode anymore, try next vnode, vgIndex:%d", pSub,
               pTableMetaInfo->vgroupIndex);
      pSub->cmd.command = TSDB_SQL_SELECT;
      pSub->fp = tscJoinQueryCallback;

      tscProcessSql(pSub);
      movedToNextVnode = true;
    }

    if (movedToNextVnode) {
      return;
    }

    finishQuery = true;
  }

  if (finishQuery) {
    pSql->res.completed = true;
    freeJoinSubqueryObj(pSql);

    if (pSql->res.code == TSDB_CODE_SUCCESS) {
      (*pSql->fp)(pSql->param, pSql, 0);
    } else {
      tscAsyncResultOnError(pSql);
    }

    return;
  }

  // TODO multi-vnode retrieve for projection query with limitation has bugs, since the global limitation is not handled
  // retrieve data from current vnode.
  tscDebug("%p retrieve data from %d subqueries", pSql, pendingFetch);

  // first reset the state of every drained subquery, then issue the retrieve requests
  for (int32_t idx = 0; idx < pSql->subState.numOfSub; ++idx) {
    SSqlObj* pSub = pSql->pSubs[idx];
    if (pSub != NULL && pSub->res.row >= pSub->res.numOfRows) {
      subquerySetState(pSub, &pSql->subState, idx, 0);
    }
  }

  for (int32_t idx = 0; idx < pSql->subState.numOfSub; ++idx) {
    SSqlObj* pSub = pSql->pSubs[idx];
    if (pSub == NULL) {
      continue;
    }

    SSqlRes*        pSubRes    = &pSub->res;
    SSqlCmd*        pSubCmd    = &pSub->cmd;
    SJoinSupporter* pSupporter = (SJoinSupporter*)pSub->param;

    SQueryInfo* pSubQueryInfo = tscGetQueryInfoDetail(pSubCmd, 0);
    assert(pSubRes->numOfRows >= 0 && pSubQueryInfo->numOfTables == 1);

    STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pSubQueryInfo, 0);
    if (pSubRes->row < pSubRes->numOfRows) {
      continue;
    }

    tscDebug("%p subquery:%p retrieve data from vnode, subquery:%d, vgroupIndex:%d", pSql, pSub,
             pSupporter->subqueryIndex, pTableMetaInfo->vgroupIndex);

    tscResetForNextRetrieve(pSubRes);
    pSub->fp = joinRetrieveFinalResCallback;

    if (pSubCmd->command < TSDB_SQL_LOCAL) {
      pSubCmd->command = (pSubCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
    }

    tscProcessSql(pSub);
  }
}

// all subqueries return, set the result output index
/*
 * Build pRes->pColumnIndex once all join subqueries have returned.
 *
 * For every output expression of the parent query, this records which
 * subquery (tableIndex) produced it and its position (columnIndex) in that
 * subquery's expression list. A match requires the same table uid, function id
 * and column id. An expression with no matching sub-expression keeps its
 * zero-filled {0, 0} entry.
 *
 * Does nothing if the index was already built. On allocation failure it sets
 * pRes->code to TSDB_CODE_TSC_OUT_OF_MEMORY and returns. Afterwards it restores
 * the super-table query functions and refreshes the field offsets of the parent
 * query.
 */
void tscSetupOutputColumnIndex(SSqlObj* pSql) {
  SSqlCmd* pCmd = &pSql->cmd;
  SSqlRes* pRes = &pSql->res;

  // the column transfer support struct has been built
  if (pRes->pColumnIndex != NULL) {
    return;
  }

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  int32_t numOfExprs = (int32_t)tscSqlExprNumOfExprs(pQueryInfo);
  // calloc(count, size) checks count * size for overflow and zero-fills every entry
  pRes->pColumnIndex = calloc((size_t)numOfExprs, sizeof(SColumnIndex));
  if (pRes->pColumnIndex == NULL) {
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return;
  }

  for (int32_t i = 0; i < numOfExprs; ++i) {
    SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);

    // locate the table (and therefore the subquery) this expression belongs to
    int32_t tableIndexOfSub = -1;
    for (int32_t j = 0; j < pQueryInfo->numOfTables; ++j) {
      STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, j);
      if (pTableMetaInfo->pTableMeta->id.uid == pExpr->uid) {
        tableIndexOfSub = j;
        break;
      }
    }

    assert(tableIndexOfSub >= 0 && tableIndexOfSub < pQueryInfo->numOfTables);

    // subquery i is assumed to serve table i of the parent query
    SSqlCmd* pSubCmd = &pSql->pSubs[tableIndexOfSub]->cmd;
    SQueryInfo* pSubQueryInfo = tscGetQueryInfoDetail(pSubCmd, 0);

    size_t numOfSubExpr = taosArrayGetSize(pSubQueryInfo->exprList);
    for (int32_t k = 0; k < numOfSubExpr; ++k) {
      SSqlExpr* pSubExpr = tscSqlExprGet(pSubQueryInfo, k);
      if (pExpr->functionId == pSubExpr->functionId && pExpr->colInfo.colId == pSubExpr->colInfo.colId) {
        pRes->pColumnIndex[i] = (SColumnIndex){.tableIndex = tableIndexOfSub, .columnIndex = k};
        break;
      }
    }
  }

  // restore the offset value for super table query in case of final result.
  tscRestoreFuncForSTableQuery(pQueryInfo);
  tscFieldInfoUpdateOffset(pQueryInfo);
}

/*
 * Completion callback of one join subquery; `param` is the subquery's SJoinSupporter.
 *
 * If a sibling subquery already failed (parent code set) or this subquery failed, the whole
 * join is aborted: quitAllSubquery() is consulted first and, unless it reports that it took
 * care of things, the parent's error is delivered through tscAsyncResultOnError().
 *
 * Otherwise the next step depends on the stage of this subquery:
 *   - tag filter query:        fetch the <tid, tag> tuples (tidTagRetrieveCallback);
 *   - first (ts_comp) stage:   fetch the ts_comp info (tsCompRetrieveCallback);
 *   - second stage:            wait until all subqueries are done (unless this is a continued
 *                              projection query on a later vgroup), set up the parent's output
 *                              column index, then either keep fetching or notify the parent.
 */
void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
  SSqlObj*        pSql       = (SSqlObj*)tres;
  SJoinSupporter* pSupporter = (SJoinSupporter*)param;
  SSqlObj*        pParentSql = pSupporter->pObj;

  // There is only one subquery and table for each subquery.
  SQueryInfo*     pQueryInfo     = tscGetQueryInfoDetail(&pSql->cmd, 0);
  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  assert(pQueryInfo->numOfTables == 1 && pSql->cmd.numOfClause == 1);

  bool abortJoin = false;
  if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
    // another subquery of this join has already failed
    tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, code, pParentSql->res.code);
    abortJoin = true;
  } else if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
    // TODO here retry is required, not directly returns to client
    assert(taos_errno(pSql) == code);

    tscError("%p abort query, code:%s, global code:%s", pSql, tstrerror(code), tstrerror(pParentSql->res.code));
    pParentSql->res.code = code;
    abortJoin = true;
  }

  if (abortJoin) {
    if (!quitAllSubquery(pSql, pParentSql, pSupporter)) {
      tscAsyncResultOnError(pParentSql);
    }
    return;
  }

  // tag filter query: retrieve <tid, tag> tuples; first stage: retrieve ts_comp info
  bool isTagFilterQuery = TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY);
  if (isTagFilterQuery || !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
    pSql->fp = isTagFilterQuery ? tidTagRetrieveCallback : tsCompRetrieveCallback;
    pSql->cmd.command = TSDB_SQL_FETCH;
    tscProcessSql(pSql);
    return;
  }

  // A continued query on a later vnode does not wait for the responses of the other subqueries.
  if (!(pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) &&
      !subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
    return;
  }

  tscSetupOutputColumnIndex(pParentSql);

  // continued projection query (vgroupIndex > 0): keep retrieving instead of returning to the invoker
  if (pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
    pSql->fp = joinRetrieveFinalResCallback;  // continue retrieve data
    pSql->cmd.command = TSDB_SQL_FETCH;
    tscProcessSql(pSql);
    return;
  }

  // first retrieve from vnode during the secondary stage sub-query
  if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
    tscAsyncResultOnError(pParentSql);
    return;
  }

  (*pParentSql->fp)(pParentSql->param, pParentSql, 0);
}

/////////////////////////////////////////////////////////////////////////////////////////
// Forward declaration of a file-local (static) callback.
static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code);

1565
// Forward declaration; called by tscHandleMasterSTableQuery for every vgroup subquery it prepares.
static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj);
H
hjxilinx 已提交
1566

H
Haojun Liao 已提交
1567
/*
 * Create the subquery of a join query for table `tableIndex` and store it in
 * pSql->pSubs[tableIndex] (the array is allocated on first use).
 *
 * For a join query the column list, expression list, field info, tag condition, group by
 * info and limit of the new subquery are handed over to `pSupporter`. The subquery is then
 * rewritten into:
 *   - a tid_tag query returning (tableId, join tag) when the table is a super table;
 *   - a ts_comp query returning the timestamps otherwise (filtered columns are kept).
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY on allocation failure.
 */
int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter *pSupporter) {
  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  pSql->res.qhandle = 0x1;
  assert(pSql->res.numOfRows == 0);

  if (pSql->pSubs == NULL) {
    pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES);
    if (pSql->pSubs == NULL) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
  }

  SSqlObj *pNew = createSubqueryObj(pSql, tableIndex, tscJoinQueryCallback, pSupporter, TSDB_SQL_SELECT, NULL);
  if (pNew == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  pSql->pSubs[tableIndex] = pNew;

  if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) {
    addGroupInfoForSubquery(pSql, pNew, 0, tableIndex);

    // refactor as one method
    SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
    assert(pNewQueryInfo != NULL);

    // update the table index: each subquery involves exactly one table
    size_t num = taosArrayGetSize(pNewQueryInfo->colList);
    for (int32_t i = 0; i < num; ++i) {
      SColumn* pCol = taosArrayGetP(pNewQueryInfo->colList, i);
      pCol->colIndex.tableIndex = 0;
    }

    // hand the query description over to the supporter; the subquery no longer owns it
    pSupporter->colList = pNewQueryInfo->colList;
    pNewQueryInfo->colList = NULL;

    pSupporter->exprList = pNewQueryInfo->exprList;
    pNewQueryInfo->exprList = NULL;

    pSupporter->fieldsInfo = pNewQueryInfo->fieldsInfo;
    memset(&pNewQueryInfo->fieldsInfo, 0, sizeof(SFieldInfo));

    if (tscTagCondCopy(&pSupporter->tagCond, &pNewQueryInfo->tagCond) != 0) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }

    pSupporter->groupInfo = pNewQueryInfo->groupbyExpr;
    memset(&pNewQueryInfo->groupbyExpr, 0, sizeof(SSqlGroupbyExpr));

    pNew->cmd.numOfCols = 0;
    pNewQueryInfo->interval.interval = 0;
    pSupporter->limit = pNewQueryInfo->limit;

    pNewQueryInfo->limit.limit = -1;
    pNewQueryInfo->limit.offset = 0;

    tscInitQueryInfo(pNewQueryInfo);
    STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0);

    if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { // return the tableId & tag
      SColumnIndex colIndex = {0};

      STagCond* pTagCond = &pSupporter->tagCond;
      assert(pTagCond->joinInfo.hasJoin);

      int32_t tagColId = tscGetJoinTagColIdByUid(pTagCond, pTableMetaInfo->pTableMeta->id.uid);
      SSchema* s = tscGetColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId);
      assert(s != NULL);  // the join tag column must exist in the table schema

      colIndex.columnIndex = tscGetTagColIndexById(pTableMetaInfo->pTableMeta, tagColId);

      int16_t bytes = 0;
      int16_t type  = 0;
      int32_t inter = 0;

      getResultDataInfo(s->type, s->bytes, TSDB_FUNC_TID_TAG, 0, &type, &bytes, &inter, 0, 0);

      SSchema s1 = {.colId = s->colId, .type = (uint8_t)type, .bytes = bytes};
      pSupporter->tagSize = s1.bytes;
      assert(isValidDataType(s1.type) && s1.bytes > 0);

      // set get tags query type
      TSDB_QUERY_SET_TYPE(pNewQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY);
      tscAddFuncInSelectClause(pNewQueryInfo, 0, TSDB_FUNC_TID_TAG, &colIndex, &s1, TSDB_COL_TAG);
      size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList);

      tscDebug(
          "%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, transfer to tid_tag query to retrieve (tableId, tags), "
          "exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, tagIndex:%d, name:%s",
          pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscSqlExprNumOfExprs(pNewQueryInfo),
          numOfCols, pNewQueryInfo->fieldsInfo.numOfOutput, colIndex.columnIndex, tNameGetTableName(&pNewQueryInfo->pTableMetaInfo[0]->name));
    } else {
      SSchema      colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1};
      SColumnIndex colIndex = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX};
      tscAddFuncInSelectClause(pNewQueryInfo, 0, TSDB_FUNC_TS_COMP, &colIndex, &colSchema, TSDB_COL_NORMAL);

      // No join tag parameter is attached to ts_comp here: this branch only handles tables
      // that are not super tables (the former super-table check at this point was unreachable).

      // add the filter tag column
      if (pSupporter->colList != NULL) {
        size_t s = taosArrayGetSize(pSupporter->colList);

        for (int32_t i = 0; i < s; ++i) {
          SColumn *pCol = taosArrayGetP(pSupporter->colList, i);

          if (pCol->numOfFilters > 0) {  // copy to the pNew->cmd.colList if it is filtered.
            SColumn *p = tscColumnClone(pCol);
            taosArrayPush(pNewQueryInfo->colList, &p);
          }
        }
      }

      size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList);

      tscDebug(
          "%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%u, transfer to ts_comp query to retrieve timestamps, "
          "exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, name:%s",
          pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscSqlExprNumOfExprs(pNewQueryInfo),
          numOfCols, pNewQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pNewQueryInfo->pTableMetaInfo[0]->name));
    }
  } else {
    assert(0);
    SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
    pNewQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY;
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
1708
/*
 * Launch a join query: create one subquery (with its SJoinSupporter) per joined table and
 * start all of them.
 *
 * - If a joined super table has no vgroup, the subqueries are released and the parent's
 *   callback is invoked with command TSDB_SQL_RETRIEVE_EMPTY_RESULT.
 * - If tscProcessSql() fails for a subquery, the error is stored in pSql->res.code and the
 *   callbacks of that subquery and of every later one are invoked directly.
 * - Setup failures (allocation, supporter or subquery creation) are reported through
 *   tscAsyncResultOnError(pSql).
 */
void tscHandleMasterJoinQuery(SSqlObj* pSql) {
  SSqlCmd*        pCmd   = &pSql->cmd;
  SSqlRes*        pRes   = &pSql->res;
  SSubqueryState* pState = &pSql->subState;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  assert((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0);

  int32_t code = TSDB_CODE_SUCCESS;
  pState->numOfSub = pQueryInfo->numOfTables;

  // the state array and its mutex are only created when absent; the states are reset below
  if (pState->states == NULL) {
    pState->states = calloc(pState->numOfSub, sizeof(*pState->states));
    if (pState->states == NULL) {
      code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      goto _failed;
    }

    pthread_mutex_init(&pState->mutex, NULL);
  }

  memset(pState->states, 0, sizeof(*pState->states) * pState->numOfSub);
  tscDebug("%p reset all sub states to 0", pSql);

  tscDebug("%p start subquery, total:%d", pSql, pQueryInfo->numOfTables);

  bool hasEmptySub = false;
  for (int32_t tableIndex = 0; tableIndex < pQueryInfo->numOfTables; ++tableIndex) {
    SJoinSupporter *pSupporter = tscCreateJoinSupporter(pSql, tableIndex);
    if (pSupporter == NULL) {  // failed to create support struct, abort current query
      tscError("%p tableIndex:%d, failed to allocate join support object, abort further query", pSql, tableIndex);
      code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      goto _failed;
    }

    code = tscCreateJoinSubquery(pSql, tableIndex, pSupporter);
    if (code != TSDB_CODE_SUCCESS) {  // failed to create subquery object, quit query
      tscDestroyJoinSupporter(pSupporter);
      goto _failed;
    }

    STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->pSubs[tableIndex]->cmd, 0, 0);
    hasEmptySub = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo) && (pTableMetaInfo->vgroupList->numOfVgroups == 0);
    if (hasEmptySub) {
      break;
    }
  }

  if (hasEmptySub) {  // at least one subquery is empty, do nothing and return
    freeJoinSubqueryObj(pSql);
    pCmd->command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
    (*pSql->fp)(pSql->param, pSql, 0);
    return;
  }

  bool launchFailed = false;
  for (int32_t k = 0; k < pState->numOfSub; ++k) {
    SSqlObj* pSub = pSql->pSubs[k];

    if (!launchFailed) {
      code = tscProcessSql(pSub);
      if (code == TSDB_CODE_SUCCESS) {
        continue;
      }

      pRes->code = code;
      launchFailed = true;
    }

    // this subquery failed to launch, or an earlier one did: call its callback directly
    (*pSub->fp)(pSub->param, pSub, 0);
  }

  if (!launchFailed) {
    pCmd->command = TSDB_SQL_TABLE_JOIN_RETRIEVE;
  }
  return;

_failed:
  pRes->code = code;
  tscAsyncResultOnError(pSql);
}

H
Haojun Liao 已提交
1792 1793
/*
 * Release the first `numOfSubs` subqueries of pSql together with their SRetrieveSupport
 * objects (their local buffers included). Used when launching a super table query is
 * abandoned part-way, so every subquery in range must exist.
 */
static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs) {
  assert(numOfSubs <= pSql->subState.numOfSub && numOfSubs >= 0);

  for (int32_t i = 0; i < numOfSubs; ++i) {
    SSqlObj* pSub = pSql->pSubs[i];
    assert(pSub != NULL);

    SRetrieveSupport* pSupport = pSub->param;
    tfree(pSupport->localBuffer);
    tfree(pSupport);

    // Do not leave a dangling support pointer in the object handed to taos_free_result():
    // anything on that path inspecting `param` (e.g. tscFreeRetrieveSup) now sees NULL.
    pSub->param = NULL;

    // NOTE(review): pSql->pSubs[i] still points to the released object afterwards — confirm
    // the parent's cleanup does not release it again.
    taos_free_result(pSub);
  }
}

D
TD-2516  
dapan1121 已提交
1808 1809 1810 1811 1812 1813 1814 1815 1816 1817 1818 1819 1820 1821 1822 1823 1824
/*
 * Spin until *lockedBy is atomically switched from 0 to the calling thread's id.
 * The waiting thread calls sched_yield() after every 100 failed attempts.
 */
void tscLockByThread(int64_t *lockedBy) {
  int64_t tid = taosGetSelfPthreadId();

  // The counter restarts at every yield so it can never overflow (signed overflow is UB),
  // while keeping the "yield once per 100 attempts" cadence.
  int32_t spins = 0;
  while (atomic_val_compare_exchange_64(lockedBy, 0, tid) != 0) {
    if (++spins == 100) {
      spins = 0;
      sched_yield();
    }
  }
}

/*
 * Release a lock taken with tscLockByThread(): atomically switch *lockedBy from the calling
 * thread's id back to 0. Asserts (in debug builds) that the caller actually held the lock;
 * the exchange itself is performed in every build.
 */
void tscUnlockByThread(int64_t *lockedBy) {
  int64_t self = taosGetSelfPthreadId();

  int64_t previousOwner = atomic_val_compare_exchange_64(lockedBy, self, 0);
  assert(previousOwner == self);
  (void)previousOwner;  // unused when NDEBUG disables the assertion
}

1825 1826 1827 1828 1829 1830 1831 1832 1833 1834 1835 1836 1837 1838 1839 1840 1841 1842 1843 1844 1845 1846 1847 1848
/*
 * State shared by the first round of a two-round super table query:
 * tscHandleFirstRoundStableQuery() creates it, and tscFirstRoundRetrieveCallback() fills it
 * and finally resumes the parent query.
 */
typedef struct SFirstRoundQuerySup {
  SSqlObj  *pParent;     // the original query, resumed with tscDoQuery() after the first round
  int32_t   numOfRows;   // number of rows retrieved by the first-round subquery so far
  SArray   *pColsInfo;   // SArray<int16_t>: result column ids of the first-round expressions
  int32_t   tagLen;      // total bytes of the tag values forming a group key; 0 => a single group
  STColumn *pTagCols;    // NOTE(review): not referenced in this part of the file
  SArray   *pResult;   // SArray<SInterResult>: one element per group of rows
  int64_t   interval;    // interval of the parent query
  char*     buf;         // NOTE(review): not referenced in this part of the file
  int32_t   bufLen;      // NOTE(review): not referenced in this part of the file
} SFirstRoundQuerySup;

/*
 * Append the values of one first-round result row to the per-column series of pInterResult.
 *
 * Tag columns are skipped. The column whose colId equals PRIMARYKEY_TIMESTAMP_COL_INDEX
 * provides the key used for the pairs appended after it (INT64_MIN until it is seen).
 * Every other column is read as a double (a NULL value becomes the double NULL marker) and
 * appended as an SResPair to the SStddevInterResult with the same colId, created on first use.
 */
void doAppendData(SInterResult* pInterResult, TAOS_ROW row, int32_t numOfCols, SQueryInfo* pQueryInfo) {
  TSKEY key = INT64_MIN;
  for (int32_t i = 0; i < numOfCols; ++i) {
    SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
    if (TSDB_COL_IS_TAG(pExpr->colInfo.flag)) {
      continue;
    }

    // memcpy instead of a typed dereference: the row buffers are not known to be suitably
    // aligned for 8-byte loads, and a misaligned typed access is undefined behaviour.
    if (pExpr->colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
      memcpy(&key, row[i], sizeof(key));
      continue;
    }

    double v = 0;
    if (row[i] != NULL) {
      memcpy(&v, row[i], sizeof(v));
    } else {
      SET_DOUBLE_NULL(&v);
    }

    int32_t id = pExpr->colInfo.colId;
    int32_t numOfQueriedCols = (int32_t) taosArrayGetSize(pInterResult->pResult);

    SArray* p = NULL;
    for (int32_t j = 0; j < numOfQueriedCols; ++j) {
      SStddevInterResult* pColRes = taosArrayGet(pInterResult->pResult, j);
      if (pColRes->colId == id) {
        p = pColRes->pResult;
        break;
      }
    }

    // append a new column
    if (p == NULL) {
      SStddevInterResult t = {.colId = id, .pResult = taosArrayInit(10, sizeof(SResPair)),};
      taosArrayPush(pInterResult->pResult, &t);
      p = t.pResult;
    }

    SResPair pair = {.avg = v, .key = key};
    taosArrayPush(p, &pair);
  }
}

void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
  SSqlObj* pSql = (SSqlObj*)tres;
  SSqlRes* pRes = &pSql->res;

  SFirstRoundQuerySup* pSup = param;
  SQueryInfo*          pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);

  if (numOfRows > 0) {
    TAOS_ROW row = NULL;
    int32_t  numOfCols = taos_field_count(tres);

    if (pSup->tagLen == 0) {  // no tags, all rows belong to one group
      SInterResult interResult = {.tags = NULL, .pResult = taosArrayInit(4, sizeof(SStddevInterResult))};
      taosArrayPush(pSup->pResult, &interResult);

      while ((row = taos_fetch_row(tres)) != NULL) {
        doAppendData(&interResult, row, numOfCols, pQueryInfo);
      }
    } else {  // tagLen > 0
      char* p = calloc(1, pSup->tagLen);

      while ((row = taos_fetch_row(tres)) != NULL) {
        int32_t* length = taos_fetch_lengths(tres);
        memset(p, 0, pSup->tagLen);

        int32_t offset = 0;
        for (int32_t i = 0; i < numOfCols && offset < pSup->tagLen; ++i) {
          SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
          if (TSDB_COL_IS_TAG(pExpr->colInfo.flag)) {
            memcpy(p + offset, row[i], length[i]);
            offset += pExpr->resBytes;
          }
        }

        assert(offset == pSup->tagLen);
        size_t size = taosArrayGetSize(pSup->pResult);

        if (size > 0) {
          SInterResult* pInterResult = taosArrayGetLast(pSup->pResult);
          if (memcmp(pInterResult->tags, p, pSup->tagLen) == 0) {  // belongs to the same group
            doAppendData(pInterResult, row, numOfCols, pQueryInfo);
          } else {
            char* tags = malloc( pSup->tagLen);
            memcpy(tags, p, pSup->tagLen);

            SInterResult interResult = {.tags = tags, .pResult = taosArrayInit(4, sizeof(SStddevInterResult))};
            taosArrayPush(pSup->pResult, &interResult);
            doAppendData(&interResult, row, numOfCols, pQueryInfo);
          }
        } else {
          char* tags = malloc(pSup->tagLen);
          memcpy(tags, p, pSup->tagLen);

          SInterResult interResult = {.tags = tags, .pResult = taosArrayInit(4, sizeof(SStddevInterResult))};
          taosArrayPush(pSup->pResult, &interResult);
          doAppendData(&interResult, row, numOfCols, pQueryInfo);
        }
      }

      tfree(p);
    }
  }

  pSup->numOfRows += numOfRows;
  if (!pRes->completed) {
    taos_fetch_rows_a(tres, tscFirstRoundRetrieveCallback, param);
    return;
  }

  // set the parameters for the second round query process
  SSqlObj    *pParent = pSup->pParent;
  SSqlCmd    *pPCmd   = &pParent->cmd;
  SQueryInfo *pQueryInfo1 = tscGetQueryInfoDetail(pPCmd, 0);

  if (pSup->numOfRows > 0) {
    SBufferWriter bw = tbufInitWriter(NULL, false);
    interResToBinary(&bw, pSup->pResult, pSup->tagLen);

H
Haojun Liao 已提交
1959
    pQueryInfo1->bufLen = (int32_t) tbufTell(&bw);
1960 1961 1962 1963 1964 1965 1966 1967 1968 1969 1970 1971 1972 1973 1974 1975 1976 1977 1978 1979 1980 1981 1982 1983 1984 1985 1986 1987 1988 1989 1990 1991 1992 1993 1994 1995 1996 1997 1998 1999 2000 2001 2002 2003 2004 2005 2006 2007 2008 2009 2010 2011 2012 2013 2014 2015 2016 2017 2018 2019 2020 2021 2022 2023 2024 2025 2026
    pQueryInfo1->buf = tbufGetData(&bw, true);

    // set the serialized binary string as the parameter of arithmetic expression
    tbufCloseWriter(&bw);
  }

  taosArrayDestroyEx(pSup->pResult, freeInterResult);
  taosArrayDestroy(pSup->pColsInfo);
  tfree(pSup);

  taos_free_result(pSql);

  pQueryInfo1->round = 1;
  tscDoQuery(pParent);
}

/*
 * Query callback of the first-round subquery; `param` is its SFirstRoundQuerySup.
 *
 * On success the rows are fetched with tscFirstRoundRetrieveCallback(). On failure the
 * supporter and the subquery are released and the error is reported on the parent query.
 * Previously this case only had a TODO and still tried to fetch from the failed result.
 */
void tscFirstRoundCallback(void* param, TAOS_RES* tres, int code) {
  int32_t c = taos_errno(tres);
  if (c != TSDB_CODE_SUCCESS) {
    SFirstRoundQuerySup* pSup = param;
    SSqlObj*             pParent = pSup->pParent;

    taosArrayDestroyEx(pSup->pResult, freeInterResult);
    taosArrayDestroy(pSup->pColsInfo);
    tfree(pSup);

    taos_free_result(tres);

    pParent->res.code = c;
    tscAsyncResultOnError(pParent);
    return;
  }

  taos_fetch_rows_a(tres, tscFirstRoundRetrieveCallback, param);
}

/*
 * Launch the first round of a two-round super table query.
 *
 * A subquery is derived from pSql with the same group by, tag condition and interval. Its
 * expressions are the timestamp (when an interval is set), an AVG for every STDDEV_DST
 * expression and the selected tags. Its result is collected by tscFirstRoundCallback() /
 * tscFirstRoundRetrieveCallback() in a SFirstRoundQuerySup.
 *
 * Returns TSDB_CODE_SUCCESS once the subquery is handed to tscHandleMasterSTableQuery(),
 * otherwise an error code that has also been stored in pSql->res.code and reported through
 * tscAsyncResultOnError(pSql).
 */
int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
  SQueryInfo*     pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
  STableMetaInfo* pTableMetaInfo1 = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);

  int32_t              code = TSDB_CODE_SUCCESS;
  SSqlObj             *pNew = NULL;
  SFirstRoundQuerySup *pSup = calloc(1, sizeof(SFirstRoundQuerySup));
  if (pSup == NULL) {
    code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto _error;
  }

  pSup->pParent   = pSql;
  pSup->interval  = pQueryInfo->interval.interval;
  pSup->pResult   = taosArrayInit(6, sizeof(SInterResult));  // holds SInterResult elements (one per group)
  pSup->pColsInfo = taosArrayInit(6, sizeof(int16_t));       // result column id
  if (pSup->pResult == NULL || pSup->pColsInfo == NULL) {
    code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto _error;
  }

  pNew = createSubqueryObj(pSql, 0, tscFirstRoundCallback, pSup, TSDB_SQL_SELECT, NULL);
  if (pNew == NULL) {
    code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto _error;
  }

  SSqlCmd *pCmd = &pNew->cmd;

  tscClearSubqueryInfo(pCmd);
  tscFreeSqlResult(pSql);

  SQueryInfo* pNewQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  assert(pQueryInfo->numOfTables == 1);

  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0);

  tscInitQueryInfo(pNewQueryInfo);
  pNewQueryInfo->groupbyExpr = pQueryInfo->groupbyExpr;
  if (pQueryInfo->groupbyExpr.columnInfo != NULL) {
    // the subquery needs its own copy of the group by columns
    pNewQueryInfo->groupbyExpr.columnInfo = taosArrayDup(pQueryInfo->groupbyExpr.columnInfo);
    if (pNewQueryInfo->groupbyExpr.columnInfo == NULL) {
      code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      goto _error;
    }
  }

  if (tscTagCondCopy(&pNewQueryInfo->tagCond, &pQueryInfo->tagCond) != 0) {
    code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto _error;
  }

  pNewQueryInfo->interval = pQueryInfo->interval;

  pCmd->command = TSDB_SQL_SELECT;
  pNew->fp = tscFirstRoundCallback;

  int32_t numOfExprs = (int32_t) tscSqlExprNumOfExprs(pQueryInfo);

  int32_t index = 0;
  for (int32_t i = 0; i < numOfExprs; ++i) {
    SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
    if (pExpr->functionId == TSDB_FUNC_TS && pQueryInfo->interval.interval > 0) {
      taosArrayPush(pSup->pColsInfo, &pExpr->resColId);

      SColumnIndex colIndex = {.tableIndex = 0, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
      SSchema* schema = tscGetColumnSchemaById(pTableMetaInfo1->pTableMeta, pExpr->colInfo.colId);

      SSqlExpr* p = tscAddFuncInSelectClause(pNewQueryInfo, index++, TSDB_FUNC_TS, &colIndex, schema, TSDB_COL_NORMAL);
      p->resColId = pExpr->resColId;  // update the result column id
    } else if (pExpr->functionId == TSDB_FUNC_STDDEV_DST) {
      // the first round computes the average that the second round's stddev needs
      taosArrayPush(pSup->pColsInfo, &pExpr->resColId);

      SColumnIndex colIndex = {.tableIndex = 0, .columnIndex = pExpr->colInfo.colIndex};
      SSchema schema = {.type = TSDB_DATA_TYPE_DOUBLE, .bytes = sizeof(double)};
      tstrncpy(schema.name, pExpr->aliasName, tListLen(schema.name));

      SSqlExpr* p = tscAddFuncInSelectClause(pNewQueryInfo, index++, TSDB_FUNC_AVG, &colIndex, &schema, TSDB_COL_NORMAL);
      p->resColId = pExpr->resColId;  // update the result column id
    } else if (pExpr->functionId == TSDB_FUNC_TAG) {
      pSup->tagLen += pExpr->resBytes;
      SColumnIndex colIndex = {.tableIndex = 0, .columnIndex = pExpr->colInfo.colIndex};

      SSchema* schema = NULL;
      if (pExpr->colInfo.colId != TSDB_TBNAME_COLUMN_INDEX) {
        schema = tscGetColumnSchemaById(pTableMetaInfo1->pTableMeta, pExpr->colInfo.colId);
      } else {
        schema = tGetTbnameColumnSchema();
      }

      SSqlExpr* p = tscAddFuncInSelectClause(pNewQueryInfo, index++, TSDB_FUNC_TAG, &colIndex, schema, TSDB_COL_TAG);
      p->resColId = pExpr->resColId;
    }
  }

  SColumnIndex columnIndex = {.tableIndex = 0, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
  tscInsertPrimaryTsSourceColumn(pNewQueryInfo, &columnIndex);

  tscTansformFuncForSTableQuery(pNewQueryInfo);

  tscDebug(
      "%p first round subquery:%p tableIndex:%d, vgroupIndex:%d, numOfVgroups:%d, type:%d, query to retrieve timestamps, "
      "numOfExpr:%" PRIzu ", colList:%d, numOfOutputFields:%d, name:%s",
      pSql, pNew, 0, pTableMetaInfo->vgroupIndex, pTableMetaInfo->vgroupList->numOfVgroups, pNewQueryInfo->type,
      tscSqlExprNumOfExprs(pNewQueryInfo), index+1, pNewQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pTableMetaInfo->name));

  // errors from here on are reported asynchronously through the subquery's callback
  tscHandleMasterSTableQuery(pNew);
  return TSDB_CODE_SUCCESS;

_error:
  terrno = code;

  if (pSup != NULL) {
    if (pSup->pResult != NULL) {
      taosArrayDestroyEx(pSup->pResult, freeInterResult);
    }
    if (pSup->pColsInfo != NULL) {
      taosArrayDestroy(pSup->pColsInfo);
    }
    tfree(pSup);
  }

  if (pNew != NULL) {
    taos_free_result(pNew);
  }

  pSql->res.code = code;
  tscAsyncResultOnError(pSql);
  return code;
}
D
TD-2516  
dapan1121 已提交
2081

H
hjxilinx 已提交
2082 2083 2084 2085 2086
/*
 * Launch a super table query: one subquery per vgroup (or per element of pVgroupTables when
 * it is set), whose results are merged locally using the buffers created by
 * tscLocalReducerEnvCreate().
 *
 * Returns TSDB_CODE_SUCCESS once every subquery has been passed to tscProcessSql().
 * Returns an error code otherwise. The environment and allocation failures are also reported
 * through tscAsyncResultOnError(pSql); previously two of them returned 0 (success) despite
 * failing, and one leaked the local-reducer environment.
 */
int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  // pRes->code check only serves in launching metric sub-queries
  if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
    pCmd->command = TSDB_SQL_RETRIEVE_LOCALMERGE;  // enable the abort of kill super table function.
    return pRes->code;
  }

  tExtMemBuffer   **pMemoryBuf = NULL;
  tOrderDescriptor *pDesc  = NULL;
  SColumnModel     *pModel = NULL;
  SColumnModel     *pFinalModel = NULL;

  pRes->qhandle = 0x1;  // hack the qhandle check

  const uint32_t nBufferSize = (1u << 16u);  // 64KB

  SQueryInfo     *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  SSubqueryState *pState = &pSql->subState;

  if (pTableMetaInfo->pVgroupTables == NULL) {
    pState->numOfSub = pTableMetaInfo->vgroupList->numOfVgroups;
  } else {
    pState->numOfSub = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
  }

  assert(pState->numOfSub > 0);

  int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, &pFinalModel, nBufferSize);
  if (ret != 0) {
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    tscAsyncResultOnError(pSql);
    tfree(pMemoryBuf);
    return ret;
  }

  tscDebug("%p retrieved query data from %d vnode(s)", pSql, pState->numOfSub);
  pSql->pSubs = calloc(pState->numOfSub, POINTER_BYTES);
  if (pSql->pSubs == NULL) {
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pFinalModel, pState->numOfSub);

    tscAsyncResultOnError(pSql);
    // return a constant: pSql must not be touched once the error has been handed off
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  if (pState->states == NULL) {
    pState->states = calloc(pState->numOfSub, sizeof(*pState->states));
    if (pState->states == NULL) {
      pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      // release the whole local reducer environment, not only the buffer array
      tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pFinalModel, pState->numOfSub);

      tscAsyncResultOnError(pSql);
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }

    pthread_mutex_init(&pState->mutex, NULL);
  }

  memset(pState->states, 0, sizeof(*pState->states) * pState->numOfSub);
  tscDebug("%p reset all sub states to 0", pSql);

  pRes->code = TSDB_CODE_SUCCESS;

  int32_t i = 0;
  for (; i < pState->numOfSub; ++i) {
    SRetrieveSupport *trs = (SRetrieveSupport *)calloc(1, sizeof(SRetrieveSupport));
    if (trs == NULL) {
      tscError("%p failed to malloc buffer for SRetrieveSupport, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
      break;
    }

    trs->pExtMemBuffer = pMemoryBuf;
    trs->pOrderDescriptor = pDesc;

    trs->localBuffer = (tFilePage *)calloc(1, nBufferSize + sizeof(tFilePage));
    if (trs->localBuffer == NULL) {
      tscError("%p failed to malloc buffer for local buffer, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
      tfree(trs);
      break;
    }

    trs->subqueryIndex  = i;
    trs->pParentSql     = pSql;
    trs->pFinalColModel = pModel;
    trs->pFFColModel    = pFinalModel;

    SSqlObj *pNew = tscCreateSTableSubquery(pSql, trs, NULL);
    if (pNew == NULL) {
      tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
      tfree(trs->localBuffer);
      tfree(trs);
      break;
    }

    // todo handle multi-vnode situation
    if (pQueryInfo->tsBuf) {
      SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
      pNewQueryInfo->tsBuf = tsBufClone(pQueryInfo->tsBuf);
      assert(pNewQueryInfo->tsBuf != NULL);
    }

    tscDebug("%p sub:%p create subquery success. orderOfSub:%d", pSql, pNew, trs->subqueryIndex);
  }

  if (i < pState->numOfSub) {
    tscError("%p failed to prepare subquery structure and launch subqueries", pSql);
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;

    // NOTE(review): unlike the failures above, this path does not call tscAsyncResultOnError;
    // the caller is expected to report the returned error — confirm.
    tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pFinalModel, pState->numOfSub);
    doCleanupSubqueries(pSql, i);
    return pRes->code;   // free all allocated resource
  }

  if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
    tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pFinalModel, pState->numOfSub);
    doCleanupSubqueries(pSql, i);
    return pRes->code;
  }

  for (int32_t j = 0; j < pState->numOfSub; ++j) {
    SSqlObj* pSub = pSql->pSubs[j];
    SRetrieveSupport* pSupport = pSub->param;

    tscDebug("%p sub:%p launch subquery, orderOfSub:%d.", pSql, pSub, pSupport->subqueryIndex);
    tscProcessSql(pSub);
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
2217 2218
/*
 * Detach the SRetrieveSupport stored in pSql->param and release it.
 *
 * param is swapped to NULL with a compare-and-exchange; the previous value returned by
 * the exchange tells whether this caller is the one that performs the release. A caller
 * that observes NULL only logs and returns.
 */
static void tscFreeRetrieveSup(SSqlObj *pSql) {
  SRetrieveSupport *pSupport = pSql->param;

  if (atomic_val_compare_exchange_ptr(&pSql->param, pSupport, NULL) == NULL) {
    tscDebug("%p retrieve supp already released", pSql);
    return;
  }

  tscDebug("%p start to free subquery supp obj:%p", pSql, pSupport);
  tfree(pSupport->localBuffer);
  tfree(pSupport);
}

2231
static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfRows);
2232
static void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows);
H
hjxilinx 已提交
2233

H
Haojun Liao 已提交
2234
static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES *tres, int32_t code) {
H
hjxilinx 已提交
2235
// set no disk space error info
2236
  tscError("sub:%p failed to flush data to disk, reason:%s", tres, tstrerror(code));
H
Haojun Liao 已提交
2237
  SSqlObj* pParentSql = trsupport->pParentSql;
H
Haojun Liao 已提交
2238 2239

  pParentSql->res.code = code;
H
hjxilinx 已提交
2240
  trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
H
Haojun Liao 已提交
2241
  tscHandleSubqueryError(trsupport, tres, pParentSql->res.code);
H
hjxilinx 已提交
2242 2243
}

H
Haojun Liao 已提交
2244 2245 2246 2247
/*
 * current query failed, and the retry count is less than the available
 * count, retry query clear previous retrieved data, then launch a new sub query
 */
D
fix bug  
dapan1121 已提交
2248 2249 2250 2251 2252 2253 2254 2255 2256 2257 2258 2259 2260 2261 2262 2263
static int32_t tscReissueSubquery(SRetrieveSupport *oriTrs, SSqlObj *pSql, int32_t code) {
  SRetrieveSupport *trsupport = malloc(sizeof(SRetrieveSupport));
  if (trsupport == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  memcpy(trsupport, oriTrs, sizeof(*trsupport));

  const uint32_t nBufferSize = (1u << 16u);  // 64KB
  trsupport->localBuffer = (tFilePage *)calloc(1, nBufferSize + sizeof(tFilePage));
  if (trsupport->localBuffer == NULL) {
    tscError("%p failed to malloc buffer for local buffer, reason:%s", pSql, strerror(errno));
    tfree(trsupport);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  
H
Haojun Liao 已提交
2264 2265 2266
  SSqlObj *pParentSql = trsupport->pParentSql;
  int32_t  subqueryIndex = trsupport->subqueryIndex;

S
TD-1732  
Shengliang Guan 已提交
2267 2268
  STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
  SVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[0];
H
Haojun Liao 已提交
2269 2270 2271 2272 2273

  tExtMemBufferClear(trsupport->pExtMemBuffer[subqueryIndex]);

  // clear local saved number of results
  trsupport->localBuffer->num = 0;
2274
  tscError("%p sub:%p retrieve/query failed, code:%s, orderOfSub:%d, retry:%d", trsupport->pParentSql, pSql,
H
Haojun Liao 已提交
2275 2276
           tstrerror(code), subqueryIndex, trsupport->numOfRetry);

2277
  SSqlObj *pNew = tscCreateSTableSubquery(trsupport->pParentSql, trsupport, pSql);
H
Haojun Liao 已提交
2278
  if (pNew == NULL) {
2279 2280
    tscError("%p sub:%p failed to create new subquery due to error:%s, abort retry, vgId:%d, orderOfSub:%d",
             trsupport->pParentSql, pSql, tstrerror(terrno), pVgroup->vgId, trsupport->subqueryIndex);
H
Haojun Liao 已提交
2281

2282
    pParentSql->res.code = terrno;
H
Haojun Liao 已提交
2283 2284 2285 2286 2287
    trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;

    return pParentSql->res.code;
  }

2288 2289 2290 2291 2292
  int32_t ret = tscProcessSql(pNew);

  // if failed to process sql, let following code handle the pSql
  if (ret == TSDB_CODE_SUCCESS) {
    taos_free_result(pSql);
H
Haojun Liao 已提交
2293 2294 2295
    return ret;
  } else {
    return ret;
2296
  }
H
Haojun Liao 已提交
2297 2298
}

2299
/*
 * Common failure path of a super-table subquery.
 *
 * While retries remain and the parent is still healthy, the subquery is re-issued.
 * Otherwise the failure is recorded and the subquery is marked as done. When it is the
 * last one to finish, the local-merge resources are released and the parent is notified.
 *
 * @param trsupport  retrieve support attached to pSql (must equal pSql->param).
 * @param pSql       the failed or aborted subquery.
 * @param numOfRows  >= 0 when this subquery itself succeeded but must be aborted because
 *                   the parent already failed; otherwise the (negative) error code of pSql.
 */
void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows) {
  // must be checked before pSql is dereferenced below
  assert(pSql != NULL);

  // it has been freed already
  if (pSql->param != trsupport || pSql->param == NULL) {
    return;
  }

  SSqlObj *pParentSql = trsupport->pParentSql;
  int32_t  subqueryIndex = trsupport->subqueryIndex;

  SSubqueryState* pState = &pParentSql->subState;

  // this subquery succeeded, but the whole query has already failed or been cancelled
  if (taos_errno(pSql) == TSDB_CODE_SUCCESS && pParentSql->res.code != TSDB_CODE_SUCCESS) {
    /*
     * Drop the rows of this subquery and disable retries; the parent's error code
     * (e.g. TSDB_CODE_TSC_QUERY_CANCELLED) is left untouched.
     */
    pSql->res.numOfRows = 0;
    trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;  // disable retry efforts
    tscDebug("%p query is cancelled, sub:%p, orderOfSub:%d abort retrieve, code:%s", pParentSql, pSql,
             subqueryIndex, tstrerror(pParentSql->res.code));
  }

  if (numOfRows >= 0) {  // current query is successful, but other sub query failed, still abort current query.
    tscDebug("%p sub:%p retrieve numOfRows:%d,orderOfSub:%d", pParentSql, pSql, numOfRows, subqueryIndex);
    tscError("%p sub:%p abort further retrieval due to other queries failure,orderOfSub:%d,code:%s", pParentSql, pSql,
             subqueryIndex, tstrerror(pParentSql->res.code));
  } else {
    if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && pParentSql->res.code == TSDB_CODE_SUCCESS) {
      if (tscReissueSubquery(trsupport, pSql, numOfRows) == TSDB_CODE_SUCCESS) {
        return;
      }
    } else {  // reach the maximum retry count, abort
      atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, numOfRows);
      tscError("%p sub:%p retrieve failed,code:%s,orderOfSub:%d failed.no more retry,set global code:%s", pParentSql, pSql,
               tstrerror(numOfRows), subqueryIndex, tstrerror(pParentSql->res.code));
    }
  }

  if (!subAndCheckDone(pSql, pParentSql, subqueryIndex)) {
    tscDebug("%p sub:%p,%d freed, not finished, total:%d", pParentSql, pSql, trsupport->subqueryIndex, pState->numOfSub);

    tscFreeRetrieveSup(pSql);
    return;
  }

  // this was the last subquery to finish: the whole query is reported as failed
  tscError("%p retrieve from %d vnode(s) completed,code:%s.FAILED.", pParentSql, pState->numOfSub,
      tstrerror(pParentSql->res.code));

  // release allocated resource
  tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, trsupport->pFinalColModel, trsupport->pFFColModel,
                            pState->numOfSub);

  // trsupport must not be used after this call
  tscFreeRetrieveSup(pSql);

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, 0);

  if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
    // regular super table query: hand the result code to the parent's callback
    (*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.code);
  } else {
    // second stage of a join subquery: only an error is reported from here
    if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
      tscAsyncResultOnError(pParentSql);
    }
  }
}

2370 2371
/*
 * Called once a super-table subquery has delivered all of its rows.
 *
 * The rows still held in the local buffer are flushed into the subquery's external
 * memory buffer. If other subqueries are still running, only this subquery's retrieve
 * support is released. The last subquery to finish builds the loser-tree based local
 * merger over every subquery's buffer, then invokes the parent's callback (or reports
 * the parent's error).
 */
static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* pSql) {
  SSqlObj          *pParent    = trsupport->pParentSql;
  tOrderDescriptor *pOrderDesc = trsupport->pOrderDescriptor;
  int32_t           subIndex   = trsupport->subqueryIndex;
  SSubqueryState   *pSubState  = &pParent->subState;

  SQueryInfo   *pSubQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
  SVgroupsInfo *pVgroupList   = pSubQueryInfo->pTableMetaInfo[0]->vgroupList;

  // rows of this vnode are split between the external buffer (cache + disk) and the local buffer
  uint32_t totalRows = (uint32_t)(trsupport->pExtMemBuffer[subIndex]->numOfTotalElems + trsupport->localBuffer->num);
  tscDebug("%p sub:%p all data retrieved from ep:%s, vgId:%d, numOfRows:%d, orderOfSub:%d", pParent, pSql,
           pVgroupList->vgroups[0].epAddr[0].fqdn, pVgroupList->vgroups[0].vgId, totalRows, subIndex);

  tColModelCompact(pOrderDesc->pColumnModel, trsupport->localBuffer, pOrderDesc->pColumnModel->capacity);

#ifdef _DEBUG_VIEW
  printf("%" PRIu64 " rows data flushed to disk:\n", trsupport->localBuffer->num);
  SSrcColumnInfo colInfo[256] = {0};
  tscGetSrcColumnInfo(colInfo, pSubQueryInfo);
  tColModelDisplayEx(pOrderDesc->pColumnModel, trsupport->localBuffer->data, trsupport->localBuffer->num,
                     trsupport->localBuffer->num, colInfo);
#endif

  if (tsTotalTmpDirGB != 0 && tsAvailTmpDirectorySpace < tsReservedTmpDirectorySpace) {
    tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pParent, pSql,
             tsAvailTmpDirectorySpace, tsReservedTmpDirectorySpace);
    tscAbortFurtherRetryRetrieval(trsupport, pSql, TSDB_CODE_TSC_NO_DISKSPACE);
    return;
  }

  // each vnode's result is kept as an independently ordered list, which later becomes
  // one input of the loser tree used by the disk-based merge
  int32_t flushCode = tscFlushTmpBuffer(trsupport->pExtMemBuffer[subIndex], pOrderDesc, trsupport->localBuffer,
                                        pSubQueryInfo->groupbyExpr.orderType);
  if (flushCode != 0) {  // flushing failed (e.g. no disk space): abort without any retry
    tscAbortFurtherRetryRetrieval(trsupport, pSql, flushCode);
    return;
  }

  if (!subAndCheckDone(pSql, pParent, subIndex)) {
    tscDebug("%p sub:%p orderOfSub:%d freed, not finished", pParent, pSql, trsupport->subqueryIndex);
    tscFreeRetrieveSup(pSql);
    return;
  }

  // last subquery returned: start the local merge over all subqueries
  pOrderDesc->pColumnModel->capacity = trsupport->pExtMemBuffer[subIndex]->numOfElemsPerPage;

  tscDebug("%p retrieve from %d vnodes completed.final NumOfRows:%" PRId64 ",start to build loser tree", pParent,
           pSubState->numOfSub, pSubState->numOfRetrievedRows);

  tscClearInterpInfo(tscGetQueryInfoDetail(&pParent->cmd, 0));

  tscCreateLocalMerger(trsupport->pExtMemBuffer, pSubState->numOfSub, pOrderDesc, trsupport->pFinalColModel,
                       trsupport->pFFColModel, pParent);
  tscDebug("%p build loser tree completed", pParent);

  SSqlRes *pParentRes = &pParent->res;
  pParentRes->precision = pSql->res.precision;
  pParentRes->numOfRows = 0;
  pParentRes->row       = 0;

  // trsupport must not be touched after this call
  tscFreeRetrieveSup(pSql);

  // the command flag must be set only after the semaphore has been correctly set
  pParent->cmd.command = TSDB_SQL_RETRIEVE_LOCALMERGE;
  if (pParentRes->code != TSDB_CODE_SUCCESS) {
    tscAsyncResultOnError(pParent);
  } else {
    (*pParent->fp)(pParent->param, pParent, 0);
  }
}

/*
 * Fetch callback of a super-table subquery: receives one block of rows from a vnode.
 *
 * A failure is either retried (the subquery is re-issued) or routed to
 * tscHandleSubqueryError. Received rows are appended to the subquery's external memory
 * buffer, and further blocks are requested until the vnode reports completion. At that
 * point tscAllDataRetrievedFromDnode takes over.
 *
 * @param param      SRetrieveSupport of the subquery (may already be released)
 * @param tres       the subquery object
 * @param numOfRows  number of rows received (0 when nothing is left), or an error code
 */
static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
  SSqlObj *pSub = (SSqlObj *)tres;
  assert(pSub != NULL);

  SRetrieveSupport *pSupport = (SRetrieveSupport *)param;

  // this query has been freed already
  if (pSub->param == NULL || param == NULL) {
    tscDebug("%p already freed in dnodecallback", pSub);
    assert(pSub->res.code == TSDB_CODE_TSC_QUERY_CANCELLED);
    return;
  }

  tOrderDescriptor *pOrderDesc = pSupport->pOrderDescriptor;
  int32_t           subIndex   = pSupport->subqueryIndex;
  SSqlObj          *pParent    = pSupport->pParentSql;
  SSubqueryState   *pSubState  = &pParent->subState;

  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSub->cmd, 0, 0);
  SVgroupInfo    *pVgroup = &pTableMetaInfo->vgroupList->vgroups[0];

  // the whole query was cancelled or another subquery failed: stop this one too
  if (pParent->res.code != TSDB_CODE_SUCCESS) {
    pSupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
    tscDebug("%p query cancelled/failed, sub:%p, vgId:%d, orderOfSub:%d, code:%s, global code:%s",
             pParent, pSub, pVgroup->vgId, pSupport->subqueryIndex, tstrerror(numOfRows), tstrerror(pParent->res.code));

    tscHandleSubqueryError(param, tres, numOfRows);
    return;
  }

  // this subquery failed: retry it if possible, otherwise make the failure global
  if (taos_errno(pSub) != TSDB_CODE_SUCCESS) {
    assert(numOfRows == taos_errno(pSub));

    if (numOfRows == TSDB_CODE_TSC_QUERY_CANCELLED) {
      pSupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;  // a cancelled query is never retried
    }

    if (pSupport->numOfRetry++ >= MAX_NUM_OF_SUBQUERY_RETRY) {
      tscDebug("%p sub:%p reach the max retry times, set global code:%s", pParent, pSub, tstrerror(numOfRows));
      atomic_val_compare_exchange_32(&pParent->res.code, TSDB_CODE_SUCCESS, numOfRows);  // set global code and abort
    } else {
      tscError("%p sub:%p failed code:%s, retry:%d", pParent, pSub, tstrerror(numOfRows), pSupport->numOfRetry);

      if (tscReissueSubquery(pSupport, pSub, numOfRows) == TSDB_CODE_SUCCESS) {
        return;
      }
    }

    tscHandleSubqueryError(param, tres, numOfRows);
    return;
  }

  SSqlRes    *pRes       = &pSub->res;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSub->cmd, 0);

  if (numOfRows <= 0) {  // all data has been retrieved to client
    tscAllDataRetrievedFromDnode(pSupport, pSub);
    return;
  }

  assert(pRes->numOfRows == numOfRows);
  int64_t totalRows = atomic_add_fetch_64(&pSubState->numOfRetrievedRows, numOfRows);

  tscDebug("%p sub:%p retrieve numOfRows:%d totalNumOfRows:%" PRIu64 " from ep:%s, orderOfSub:%d", pParent, pSub,
           pRes->numOfRows, pSubState->numOfRetrievedRows, pSub->epSet.fqdn[pSub->epSet.inUse], subIndex);

  if (totalRows > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pQueryInfo, 0)) {
    tscError("%p sub:%p num of OrderedRes is too many, max allowed:%" PRId32 " , current:%" PRId64,
             pParent, pSub, tsMaxNumOfOrderedResults, totalRows);
    tscAbortFurtherRetryRetrieval(pSupport, tres, TSDB_CODE_TSC_SORTED_RES_TOO_MANY);
    return;
  }

#ifdef _DEBUG_VIEW
  printf("received data from vnode: %"PRIu64" rows\n", pRes->numOfRows);
  SSrcColumnInfo colInfo[256] = {0};

  tscGetSrcColumnInfo(colInfo, pQueryInfo);
  tColModelDisplayEx(pOrderDesc->pColumnModel, pRes->data, pRes->numOfRows, pRes->numOfRows, colInfo);
#endif

  // no disk space for tmp directory
  if (tsTotalTmpDirGB != 0 && tsAvailTmpDirectorySpace < tsReservedTmpDirectorySpace) {
    tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pParent, pSub,
             tsAvailTmpDirectorySpace, tsReservedTmpDirectorySpace);
    tscAbortFurtherRetryRetrieval(pSupport, tres, TSDB_CODE_TSC_NO_DISKSPACE);
    return;
  }

  int32_t saveCode = saveToBuffer(pSupport->pExtMemBuffer[subIndex], pOrderDesc, pSupport->localBuffer, pRes->data,
                                  pRes->numOfRows, pQueryInfo->groupbyExpr.orderType);
  if (saveCode != 0) {  // set no disk space error info, and abort retry
    tscAbortFurtherRetryRetrieval(pSupport, tres, TSDB_CODE_TSC_NO_DISKSPACE);
    return;
  }

  if (pRes->completed) {
    tscAllDataRetrievedFromDnode(pSupport, pSub);
  } else {  // continue fetch data from dnode
    taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param);
  }
}

2545
/*
 * Create the subquery object for one vnode of a two-stage super table query.
 *
 * The new object uses tscRetrieveDataRes as its callback with trsupport as param. It is
 * tagged as a super-table subquery, bound to vgroup `trsupport->subqueryIndex`, and
 * stored in pSql->pSubs at that same index.
 *
 * @return the new subquery, or NULL when createSubqueryObj failed.
 */
static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj) {
  const int32_t table_index = 0;

  SSqlObj *pNew = createSubqueryObj(pSql, table_index, tscRetrieveDataRes, trsupport, TSDB_SQL_SELECT, prevSqlObj);
  if (pNew == NULL) {
    return NULL;
  }

  SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
  pNewQueryInfo->type |= TSDB_QUERY_TYPE_STABLE_SUBQUERY;
  assert(pNewQueryInfo->numOfTables == 1 && pNew->cmd.numOfClause == 1 && trsupport->subqueryIndex < pSql->subState.numOfSub);

  // one subquery is launched per vnode, so the subquery index doubles as the vgroup index
  tscGetMetaInfo(pNewQueryInfo, table_index)->vgroupIndex = trsupport->subqueryIndex;

  pSql->pSubs[trsupport->subqueryIndex] = pNew;
  return pNew;
}

2565
// todo there is are race condition in this function, while cancel is called by user.
H
hjxilinx 已提交
2566
void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
2567 2568 2569 2570 2571 2572 2573
  // the param may be null, since it may be done by other query threads. and the asyncOnError may enter in this
  // function while kill query by a user.
  if (param == NULL) {
    assert(code != TSDB_CODE_SUCCESS);
    return;
  }

2574
  SRetrieveSupport *trsupport = (SRetrieveSupport *) param;
H
hjxilinx 已提交
2575
  
H
Haojun Liao 已提交
2576
  SSqlObj*  pParentSql = trsupport->pParentSql;
2577
  SSqlObj*  pSql = (SSqlObj *) tres;
2578

2579 2580
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
  assert(pSql->cmd.numOfClause == 1 && pQueryInfo->numOfTables == 1);
H
hjxilinx 已提交
2581
  
2582
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
S
TD-1732  
Shengliang Guan 已提交
2583
  SVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[trsupport->subqueryIndex];
H
Haojun Liao 已提交
2584

H
Haojun Liao 已提交
2585
  // stable query killed or other subquery failed, all query stopped
H
Haojun Liao 已提交
2586
  if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
H
hjxilinx 已提交
2587
    trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
H
Haojun Liao 已提交
2588
    tscError("%p query cancelled or failed, sub:%p, vgId:%d, orderOfSub:%d, code:%s, global code:%s",
H
Haojun Liao 已提交
2589 2590 2591 2592
        pParentSql, pSql, pVgroup->vgId, trsupport->subqueryIndex, tstrerror(code), tstrerror(pParentSql->res.code));

    tscHandleSubqueryError(param, tres, code);
    return;
H
hjxilinx 已提交
2593 2594 2595
  }
  
  /*
H
Haojun Liao 已提交
2596
   * if a subquery on a vnode failed, all retrieve operations from vnode that occurs later
2597
   * than this one are actually not necessary, we simply call the tscRetrieveFromDnodeCallBack
H
hjxilinx 已提交
2598 2599
   * function to abort current and remain retrieve process.
   *
2600
   * NOTE: thread safe is required.
H
hjxilinx 已提交
2601
   */
H
Haojun Liao 已提交
2602 2603
  if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
    assert(code == taos_errno(pSql));
2604

H
Haojun Liao 已提交
2605
    if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY) {
2606
      tscError("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(code), trsupport->numOfRetry);
H
Haojun Liao 已提交
2607
      if (tscReissueSubquery(trsupport, pSql, code) == TSDB_CODE_SUCCESS) {
H
hjxilinx 已提交
2608 2609
        return;
      }
2610
    } else {
H
Haojun Liao 已提交
2611
      tscError("%p sub:%p reach the max retry times, set global code:%s", pParentSql, pSql, tstrerror(code));
H
Haojun Liao 已提交
2612
      atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, code);  // set global code and abort
2613
    }
H
Haojun Liao 已提交
2614 2615 2616 2617 2618

    tscHandleSubqueryError(param, tres, pParentSql->res.code);
    return;
  }

H
Haojun Liao 已提交
2619
  tscDebug("%p sub:%p query complete, ep:%s, vgId:%d, orderOfSub:%d, retrieve data", trsupport->pParentSql, pSql,
2620
             pVgroup->epAddr[0].fqdn, pVgroup->vgId, trsupport->subqueryIndex);
H
Haojun Liao 已提交
2621 2622 2623 2624 2625

  if (pSql->res.qhandle == 0) { // qhandle is NULL, code is TSDB_CODE_SUCCESS means no results generated from this vnode
    tscRetrieveFromDnodeCallBack(param, pSql, 0);
  } else {
    taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param);
H
hjxilinx 已提交
2626 2627 2628
  }
}

2629 2630
/*
 * Decide whether a failed multi-vnode insert should be retried.
 *
 * A retry is allowed only while pParentObj->retry has not exceeded maxRetry and every
 * sub-insert either succeeded or failed with one of the retryable codes listed below.
 * The first non-retryable code found is copied into the parent's result code.
 */
static bool needRetryInsert(SSqlObj* pParentObj, int32_t numOfSub) {
  if (pParentObj->retry > pParentObj->maxRetry) {
    tscError("%p max retry reached, abort the retry effort", pParentObj);
    return false;
  }

  for (int32_t k = 0; k < numOfSub; ++k) {
    int32_t subCode = pParentObj->pSubs[k]->res.code;

    bool retryable = (subCode == TSDB_CODE_SUCCESS) ||
                     (subCode == TSDB_CODE_TDB_TABLE_RECONFIGURE) ||
                     (subCode == TSDB_CODE_TDB_INVALID_TABLE_ID) ||
                     (subCode == TSDB_CODE_VND_INVALID_VGROUP_ID) ||
                     (subCode == TSDB_CODE_RPC_NETWORK_UNAVAIL) ||
                     (subCode == TSDB_CODE_APP_NOT_READY);
    if (!retryable) {
      pParentObj->res.code = subCode;
      return false;
    }
  }

  return true;
}

H
Haojun Liao 已提交
2652 2653 2654 2655 2656 2657 2658 2659 2660
/*
 * Release the SInsertSupporter stored as param of every sub-insert of pSqlObj.
 */
static void doFreeInsertSupporter(SSqlObj* pSqlObj) {
  assert(pSqlObj != NULL && pSqlObj->subState.numOfSub > 0);

  SSqlObj** pSubs = pSqlObj->pSubs;
  for (int32_t k = 0; k < pSqlObj->subState.numOfSub; ++k) {
    tfree(pSubs[k]->param);
  }
}

H
Haojun Liao 已提交
2661
/*
 * Completion callback of every sub-insert created by tscHandleMultivnodeInsert.
 *
 * Accumulates the inserted row count into the parent and records any sub-insert error.
 * Once the last sub-insert has finished, the supporters are released, and then one of
 * three things happens:
 *  - every sub-insert succeeded: the user callback receives the number of inserted rows;
 *  - the failure is not retryable: the error is reported;
 *  - otherwise: the failed subs are reset, the cached table metas are dropped, and the
 *    SQL is re-parsed and resubmitted.
 */
static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) {
  SInsertSupporter *pSupporter = (SInsertSupporter *)param;
  SSqlObj* pParentObj = pSupporter->pSql;

  // record the total inserted rows
  if (numOfRows > 0) {
    atomic_add_fetch_32(&pParentObj->res.numOfRows, numOfRows);
  }

  if (taos_errno(tres) != TSDB_CODE_SUCCESS) {
    SSqlObj* pFinished = (SSqlObj*) tres;
    assert(pFinished != NULL && pFinished->res.code == numOfRows);

    pParentObj->res.code = pFinished->res.code;

    // propagate the submitSchema flag to the parent sqlObj
    if (pFinished->cmd.submitSchema) {
      pParentObj->cmd.submitSchema = 1;
    }
  }

  if (!subAndCheckDone(tres, pParentObj, pSupporter->index)) {
    tscDebug("%p insert:%p,%d completed, total:%d", pParentObj, tres, pSupporter->index, pParentObj->subState.numOfSub);
    return;
  }

  // restore user defined fp
  pParentObj->fp = pParentObj->fetchFp;
  int32_t numOfSub = pParentObj->subState.numOfSub;

  // pSupporter is one of the released supporters and must not be used afterwards
  doFreeInsertSupporter(pParentObj);

  if (pParentObj->res.code == TSDB_CODE_SUCCESS) {
    tscDebug("%p Async insertion completed, total inserted:%d", pParentObj, pParentObj->res.numOfRows);

    // all data has been sent to vnode: report the number of inserted rows to the user function
    (*pParentObj->fp)(pParentObj->param, pParentObj, (int32_t)pParentObj->res.numOfRows);
    return;
  }

  if (!needRetryInsert(pParentObj, numOfSub)) {
    tscAsyncResultOnError(pParentObj);
    return;
  }

  int32_t numOfFailed = 0;
  for (int32_t k = 0; k < numOfSub; ++k) {
    SSqlObj* pFailedSub = pParentObj->pSubs[k];
    if (pFailedSub->res.code == TSDB_CODE_SUCCESS) {
      continue;
    }

    numOfFailed += 1;

    // clean up tableMeta in cache
    tscFreeQueryInfo(&pFailedSub->cmd);
    SQueryInfo* pQueryInfo = tscGetQueryInfoDetailSafely(&pFailedSub->cmd, 0);
    STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pParentObj->cmd, pFailedSub->cmd.clauseIndex, 0);
    tscAddTableMetaInfo(pQueryInfo, &pMasterTableMetaInfo->name, NULL, NULL, NULL, NULL);

    subquerySetState(pFailedSub, &pParentObj->subState, k, 0);

    tscDebug("%p, failed sub:%d, %p", pParentObj, k, pFailedSub);
  }

  tscError("%p Async insertion completed, total inserted:%d rows, numOfFailed:%d, numOfTotal:%d", pParentObj,
           pParentObj->res.numOfRows, numOfFailed, numOfSub);

  tscDebug("%p cleanup %d tableMeta in hashTable", pParentObj, pParentObj->cmd.numOfTables);
  for (int32_t k = 0; k < pParentObj->cmd.numOfTables; ++k) {
    char fullName[TSDB_TABLE_FNAME_LEN] = {0};
    tNameExtractFullName(pParentObj->cmd.pTableNameList[k], fullName);
    taosHashRemove(tscTableMetaInfo, fullName, strnlen(fullName, TSDB_TABLE_FNAME_LEN));
  }

  pParentObj->cmd.parseFinished = false;
  tscResetSqlCmd(&pParentObj->cmd, false);

  // in case of insert, redo parsing the sql string and build new submit data block for two reasons:
  // 1. the table Id(tid & uid) may have been updated, so the submit block needs to be updated accordingly.
  // 2. vnode may need the schema information along with submit block to update its local table schema.
  tscDebug("%p re-parse sql to generate submit data, retry:%d", pParentObj, pParentObj->retry);
  pParentObj->retry++;

  int32_t parseCode = tsParseSql(pParentObj, true);
  if (parseCode == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
    return;
  }

  if (parseCode != TSDB_CODE_SUCCESS) {
    pParentObj->res.code = parseCode;
    tscAsyncResultOnError(pParentObj);
    return;
  }

  tscDoQuery(pParentObj);
}

/**
 * it is a subquery, so after parse the sql string, copy the submit block to payload of itself
 * @param pSql
 * @return
 */
2761
int32_t tscHandleInsertRetry(SSqlObj* pParent, SSqlObj* pSql) {
2762 2763 2764 2765
  assert(pSql != NULL && pSql->param != NULL);
  SSqlRes* pRes = &pSql->res;

  SInsertSupporter* pSupporter = (SInsertSupporter*) pSql->param;
H
Haojun Liao 已提交
2766
  assert(pSupporter->index < pSupporter->pSql->subState.numOfSub);
2767

2768
  STableDataBlocks* pTableDataBlock = taosArrayGetP(pParent->cmd.pDataBlocks, pSupporter->index);
H
Haojun Liao 已提交
2769 2770 2771
  int32_t code = tscCopyDataBlockToPayload(pSql, pTableDataBlock);

  if ((pRes->code = code)!= TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
2772
    tscAsyncResultOnError(pSql);
H
Haojun Liao 已提交
2773
    return code;  // here the pSql may have been released already.
2774 2775 2776
  }

  return tscProcessSql(pSql);
H
hjxilinx 已提交
2777 2778 2779 2780
}

/*
 * Submit the data blocks of a multi-vnode INSERT, one sub-insert per block.
 *
 * First call: one sub object is created per entry of pCmd->pDataBlocks. Each sub gets its
 * own SInsertSupporter (holding its index and the parent), and all subs are launched.
 *
 * Retry call (pSql->pSubs already exists): a fresh supporter is attached to every sub,
 * and only the subs whose previous attempt failed are re-submitted.
 *
 * @param pSql  parent insert object
 * @return TSDB_CODE_SUCCESS when the sub-inserts have been launched. Otherwise the error
 *         code, which is also stored in pSql->res.code.
 */
int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  // it is the failure retry insert
  if (pSql->pSubs != NULL) {
    // attach every supporter before (re)launching anything, so that an allocation failure
    // never leaves some subs in flight while others have no supporter
    for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
      SInsertSupporter* pSup = calloc(1, sizeof(SInsertSupporter));
      if (pSup == NULL) {
        tscError("%p failed to malloc insert supporter for retry, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
        for (int32_t j = 0; j < i; ++j) {
          tfree(pSql->pSubs[j]->param);
        }

        pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
        return pRes->code;
      }

      pSup->index = i;
      pSup->pSql  = pSql;
      pSql->pSubs[i]->param = pSup;
    }

    for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
      SSqlObj* pSub = pSql->pSubs[i];
      tscDebug("%p sub:%p launch sub insert, orderOfSub:%d", pSql, pSub, i);
      if (pSub->res.code != TSDB_CODE_SUCCESS) {
        tscHandleInsertRetry(pSql, pSub);
      }
    }

    return TSDB_CODE_SUCCESS;
  }

  pSql->subState.numOfSub = (uint16_t)taosArrayGetSize(pCmd->pDataBlocks);
  assert(pSql->subState.numOfSub > 0);

  pRes->code = TSDB_CODE_SUCCESS;

  // the number of already initialized subqueries
  int32_t numOfSub = 0;

  if (pSql->subState.states == NULL) {
    pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(*pSql->subState.states));
    if (pSql->subState.states == NULL) {
      pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      goto _error;
    }

    pthread_mutex_init(&pSql->subState.mutex, NULL);
  }

  memset(pSql->subState.states, 0, sizeof(*pSql->subState.states) * pSql->subState.numOfSub);
  tscDebug("%p reset all sub states to 0", pSql);

  pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES);
  if (pSql->pSubs == NULL) {
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto _error;
  }

  tscDebug("%p submit data to %d vnode(s)", pSql, pSql->subState.numOfSub);

  while (numOfSub < pSql->subState.numOfSub) {
    SInsertSupporter* pSupporter = calloc(1, sizeof(SInsertSupporter));
    if (pSupporter == NULL) {
      pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      goto _error;
    }

    pSupporter->pSql   = pSql;
    pSupporter->index  = numOfSub;

    SSqlObj *pNew = createSimpleSubObj(pSql, multiVnodeInsertFinalize, pSupporter, TSDB_SQL_INSERT);
    if (pNew == NULL) {
      tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, numOfSub, strerror(errno));
      tfree(pSupporter);  // not attached to any sub object, so nobody else would release it
      pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      goto _error;
    }

    /*
     * assign the callback function to fetchFp to make sure that the error process function can restore
     * the callback function (multiVnodeInsertFinalize) correctly.
     */
    pNew->fetchFp = pNew->fp;
    pSql->pSubs[numOfSub] = pNew;

    STableDataBlocks* pTableDataBlock = taosArrayGetP(pCmd->pDataBlocks, numOfSub);
    pRes->code = tscCopyDataBlockToPayload(pNew, pTableDataBlock);
    if (pRes->code != TSDB_CODE_SUCCESS) {
      tscDebug("%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%s", pSql, numOfSub,
               pSql->subState.numOfSub, tstrerror(pRes->code));
      goto _error;
    }

    tscDebug("%p sub:%p create subObj success. orderOfSub:%d", pSql, pNew, numOfSub);
    numOfSub++;
  }

  // every sub object has been prepared at this point (each failure above jumps to _error)
  pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);

  // use the local variable
  for (int32_t j = 0; j < numOfSub; ++j) {
    SSqlObj *pSub = pSql->pSubs[j];
    tscDebug("%p sub:%p launch sub insert, orderOfSub:%d", pSql, pSub, j);
    tscProcessSql(pSub);
  }

  return TSDB_CODE_SUCCESS;

  _error:
  // report the actual cause (e.g. a payload copy failure) instead of always claiming OOM
  return (pRes->code != TSDB_CODE_SUCCESS) ? pRes->code : TSDB_CODE_TSC_OUT_OF_MEMORY;
}
H
hjxilinx 已提交
2883

H
Haojun Liao 已提交
2884 2885 2886
/*
 * Locate the current row (pRes->row) of output column `columnIndex` inside pRes->data.
 *
 * The element width of the column (resBytes of the column's expression) is stored in *bytes, and the
 * returned address is data + offset * numOfRows + row * bytes.
 * NOTE(review): this arithmetic suggests pRes->data holds each column as a contiguous run of numOfRows
 * values, with the expression's `offset` being its per-row byte offset; confirm against the producer.
 */
static char* getResultBlockPosition(SSqlCmd* pCmd, SSqlRes* pRes, int32_t columnIndex, int16_t* bytes) {
  SQueryInfo*     pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  SInternalField* pField = (SInternalField*) TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, columnIndex);
  assert(pField->pSqlExpr != NULL);

  SSqlExpr* pExpr = pField->pSqlExpr;
  *bytes = pExpr->resBytes;

  char* columnStart = pRes->data + pExpr->offset * pRes->numOfRows;
  return columnStart + pRes->row * (*bytes);
}

/*
 * Copy the rows currently buffered in the live (non-NULL) subqueries of pSql into pSql's own result.
 *
 * The number of rows produced is the minimum count of unconsumed rows (numOfRows - row) over all live
 * subqueries. Each output expression's column is copied from the subquery named by
 * pRes->pColumnIndex[i], the read cursor (res.row) of every live subquery is advanced by that count,
 * and the arithmetic post-processing (doArithmeticCalculate) is applied to the merged page.
 *
 * When no rows remain, or no subquery is alive, the subquery objects are released through
 * freeJoinSubqueryObj and pRes is not modified. On allocation failure pRes->code is set to
 * TSDB_CODE_TSC_OUT_OF_MEMORY.
 */
static void doBuildResFromSubqueries(SSqlObj* pSql) {
  SSqlRes* pRes = &pSql->res;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);

  int32_t numOfRes = INT32_MAX;
  bool    hasLiveSub = false;
  for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
    SSqlObj* pSub = pSql->pSubs[i];
    if (pSub == NULL) {
      continue;
    }

    hasLiveSub = true;
    int32_t remain = (int32_t)(pSub->res.numOfRows - pSub->res.row);
    numOfRes = (int32_t)(MIN(numOfRes, remain));
  }

  // Without a live subquery numOfRes would stay INT32_MAX, and the buffer size computed below would
  // overflow; treat that case like "no result any more".
  if (!hasLiveSub || numOfRes == 0) {  // no result any more, free all subquery objects
    freeJoinSubqueryObj(pSql);
    return;
  }

  int32_t rowSize = tscGetResRowLength(pQueryInfo->exprList);
  assert(numOfRes > 0 && rowSize > 0);

  // size_t arithmetic: the int32 product numOfRes * rowSize could overflow (undefined behavior)
  size_t payloadSize = (size_t)numOfRes * (size_t)rowSize;
  char*  tmp = realloc(pRes->pRsp, payloadSize + sizeof(tFilePage));
  if (tmp == NULL) {
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return;
  } else {
    pRes->pRsp = tmp;
  }

  tFilePage* pFilePage = (tFilePage*) pRes->pRsp;
  pFilePage->num = numOfRes;

  pRes->data = pFilePage->data;
  char* data = pRes->data;

  int16_t bytes = 0;

  // columns are appended one after another: numOfRes values of expression 0, then of expression 1, ...
  size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
  for (size_t i = 0; i < numOfExprs; ++i) {
    SColumnIndex* pIndex = &pRes->pColumnIndex[i];
    SSqlObj*      pSub = pSql->pSubs[pIndex->tableIndex];
    assert(pSub != NULL);

    char*  pData = getResultBlockPosition(&pSub->cmd, &pSub->res, pIndex->columnIndex, &bytes);
    size_t colSize = (size_t)bytes * (size_t)numOfRes;
    memcpy(data, pData, colSize);

    data += colSize;
  }

  // mark the copied rows as consumed in every live subquery
  for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
    SSqlObj* pSub = pSql->pSubs[i];
    if (pSub == NULL) {
      continue;
    }

    pSub->res.row += numOfRes;
    assert(pSub->res.row <= pSub->res.numOfRows);
  }

  pRes->numOfRows = numOfRes;
  pRes->numOfClauseTotal += numOfRes;

  int32_t finalRowSize = 0;
  for (int32_t i = 0; i < tscNumOfFields(pQueryInfo); ++i) {
    TAOS_FIELD* pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
    finalRowSize += pField->bytes;
  }

  doArithmeticCalculate(pQueryInfo, pFilePage, rowSize, finalRowSize);

  pRes->data = pFilePage->data;
  tscSetResRawPtr(pRes, pQueryInfo);
}

H
hjxilinx 已提交
2973
/*
 * Build the next batch of pSql's result from its subqueries and deliver it through pSql->fp.
 *
 * When pRes->code already holds an error, the error callback (tscAsyncResultOnError) is invoked instead.
 * On the first call (pRes->tsrow == NULL) the per-column row arrays tsrow/urow/buffer/length are allocated
 * with one slot per output expression. If any of them cannot be allocated, all of them are released, so a
 * later call starts from a clean state, and OOM is reported.
 */
void tscBuildResFromSubqueries(SSqlObj *pSql) {
  SSqlRes* pRes = &pSql->res;

  if (pRes->code != TSDB_CODE_SUCCESS) {
    tscAsyncResultOnError(pSql);
    return;
  }

  if (pRes->tsrow == NULL) {
    SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);

    pRes->numOfCols = (int16_t) tscSqlExprNumOfExprs(pQueryInfo);

    pRes->tsrow  = calloc(pRes->numOfCols, POINTER_BYTES);
    pRes->urow   = calloc(pRes->numOfCols, POINTER_BYTES);
    pRes->buffer = calloc(pRes->numOfCols, POINTER_BYTES);
    pRes->length = calloc(pRes->numOfCols, sizeof(int32_t));

    // urow is dereferenced per column when rows are fetched, so it must be checked as well
    if (pRes->tsrow == NULL || pRes->urow == NULL || pRes->buffer == NULL || pRes->length == NULL) {
      // drop partial allocations: a non-NULL tsrow would make the next call skip this setup
      tfree(pRes->tsrow);
      tfree(pRes->urow);
      tfree(pRes->buffer);
      tfree(pRes->length);

      pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      tscAsyncResultOnError(pSql);
      return;
    }

    tscRestoreFuncForSTableQuery(pQueryInfo);
  }

  assert (pRes->row >= pRes->numOfRows);
  doBuildResFromSubqueries(pSql);
  if (pRes->code == TSDB_CODE_SUCCESS) {
    (*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
  } else {
    tscAsyncResultOnError(pSql);
  }
}

3008
/*
 * Convert the NCHAR value in pRes->tsrow[columnIndex] from its stored unicode form into the client
 * charset, using pRes->buffer[columnIndex] as the destination (allocated lazily, pField->bytes plus one
 * extra TSDB_NCHAR_SIZE so the converted string is always terminated).
 *
 * On success tsrow/length point at the converted text. If the buffer cannot be allocated or the
 * conversion fails, the cell is reported as NULL with length 0. Non-NCHAR or NULL cells are left as-is.
 */
static UNUSED_FUNC void transferNcharData(SSqlObj *pSql, int32_t columnIndex, TAOS_FIELD *pField) {
  SSqlRes *pRes = &pSql->res;

  if (pRes->tsrow[columnIndex] == NULL || pField->type != TSDB_DATA_TYPE_NCHAR) {
    return;
  }

  size_t bufLen = (size_t)pField->bytes + TSDB_NCHAR_SIZE;

  // convert unicode to native code in a temporary buffer extra one byte for terminated symbol
  if (pRes->buffer[columnIndex] == NULL) {
    pRes->buffer[columnIndex] = malloc(bufLen);
    if (pRes->buffer[columnIndex] == NULL) {
      // without a buffer there is nowhere to convert into; degrade the cell like a failed conversion
      tscError("%p failed to allocate nchar conversion buffer, column:%d", pSql, columnIndex);
      pRes->tsrow[columnIndex] = NULL;
      pRes->length[columnIndex] = 0;
      return;
    }
  }

  /* string terminated char for binary data*/
  memset(pRes->buffer[columnIndex], 0, bufLen);

  int32_t length = taosUcs4ToMbs(pRes->tsrow[columnIndex], pRes->length[columnIndex], pRes->buffer[columnIndex]);
  if ( length >= 0 ) {
    pRes->tsrow[columnIndex] = (unsigned char*)pRes->buffer[columnIndex];
    pRes->length[columnIndex] = length;
  } else {
    // NOTE(review): tsrow holds unconverted unicode here; printing it with %s assumes it is terminated
    tscError("%p charset:%s to %s. val:%s convert failed.", pSql, DEFAULT_UNICODE_ENCODEC, tsCharset, (char*)pRes->tsrow[columnIndex]);
    pRes->tsrow[columnIndex] = NULL;
    pRes->length[columnIndex] = 0;
  }
}

H
Haojun Liao 已提交
3032
char *getArithmeticInputSrc(void *param, const char *name, int32_t colId) {
3033 3034 3035 3036 3037 3038 3039
  SArithmeticSupport *pSupport = (SArithmeticSupport *) param;

  int32_t index = -1;
  SSqlExpr* pExpr = NULL;
  
  for (int32_t i = 0; i < pSupport->numOfCols; ++i) {
    pExpr = taosArrayGetP(pSupport->exprList, i);
B
Bomin Zhang 已提交
3040
    if (strncmp(name, pExpr->aliasName, sizeof(pExpr->aliasName) - 1) == 0) {
3041 3042 3043 3044 3045 3046 3047 3048 3049
      index = i;
      break;
    }
  }

  assert(index >= 0 && index < pSupport->numOfCols);
  return pSupport->data[index] + pSupport->offset * pExpr->resBytes;
}

3050
/*
 * Materialize the current row (pRes->row) of pSql's result into pRes->tsrow and advance to the next row.
 *
 * pRes->urow[i] acts as a per-column read cursor. For every field the cell it points at is translated
 * into tsrow[i]:
 *  - NULL if isNull() reports a null value;
 *  - otherwise the cell itself, or varDataVal(cell) for BINARY/NCHAR fields.
 * For BINARY/NCHAR fields length[i] receives varDataLen(cell), whether or not the value is null.
 * The cursor is then moved forward by the field's byte width.
 *
 * Returns pRes->tsrow. Once every row has been handed out, tsrow is released and NULL is returned.
 */
TAOS_ROW doSetResultRowData(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;

  assert(pRes->row >= 0 && pRes->row <= pRes->numOfRows);
  if (pRes->row >= pRes->numOfRows) {  // every row was already returned to the caller
    tfree(pRes->tsrow);
    return pRes->tsrow;
  }

  SSqlCmd    *pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  size_t      numOfFields = tscNumOfFields(pQueryInfo);

  for (size_t col = 0; col < numOfFields; ++col) {
    SInternalField* pInfo = (SInternalField*)TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, col);
    int32_t         type = pInfo->field.type;
    bool            isVarType = (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR);

    if (isVarType) {
      pRes->tsrow[col]  = isNull(pRes->urow[col], type) ? NULL : varDataVal(pRes->urow[col]);
      pRes->length[col] = varDataLen(pRes->urow[col]);
    } else {
      pRes->tsrow[col] = isNull(pRes->urow[col], type) ? NULL : pRes->urow[col];
    }

    // step this column's cursor to the same column of the next row
    ((char**) pRes->urow)[col] += pInfo->field.bytes;
  }

  pRes->row += 1;
  return pRes->tsrow;
}

H
Haojun Liao 已提交
3083
/*
 * Report whether the subqueries of pSql may still produce rows. NULL entries in pSql->pSubs are ignored.
 *
 * Non-ordered projection query on a super table: returns true as soon as one subquery has all of:
 *  - vgroups left to visit (vgroupIndex < numOfVgroups);
 *  - unconsumed rows in its current block;
 *  - its limit not yet reached.
 * Otherwise returns false.
 *
 * Any other query (e.g. inner join, where one exhausted side ends the query): returns false as soon as
 * one subquery either
 *  - is a projection query that has consumed its block and reached its limit, or
 *  - holds an empty block.
 * Otherwise returns true.
 */
static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) {
  SSqlCmd    *pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  if (!tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
    for (int32_t idx = 0; idx < pSql->subState.numOfSub; ++idx) {
      SSqlObj *pSub = pSql->pSubs[idx];
      if (pSub == NULL) {
        continue;
      }

      SSqlRes    *pSubRes = &pSub->res;
      SQueryInfo *pSubQueryInfo = tscGetQueryInfoDetail(&pSub->cmd, 0);

      bool consumedAtLimit = pSubRes->row >= pSubRes->numOfRows && tscHasReachLimitation(pSubQueryInfo, pSubRes) &&
                             tscIsProjectionQuery(pSubQueryInfo);
      if (consumedAtLimit || pSubRes->numOfRows == 0) {
        return false;
      }
    }

    return true;
  }

  for (int32_t idx = 0; idx < pSql->subState.numOfSub; ++idx) {
    SSqlObj *pSub = pSql->pSubs[idx];
    if (pSub == NULL) {
      continue;
    }

    SSqlRes    *pSubRes = &pSub->res;
    SSqlCmd    *pSubCmd = &pSub->cmd;
    SQueryInfo *pSubQueryInfo = tscGetQueryInfoDetail(pSubCmd, pSubCmd->clauseIndex);
    assert(pSubQueryInfo->numOfTables == 1);

    STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pSubQueryInfo, 0);

    // this subquery still has vgroups to visit, rows left in its block, and has not hit its limit
    if (pTableMetaInfo->vgroupIndex < pTableMetaInfo->vgroupList->numOfVgroups &&
        pSubRes->row < pSubRes->numOfRows && !tscHasReachLimitation(pSubQueryInfo, pSubRes)) {
      return true;
    }
  }

  return false;
}