tscSubquery.c 105.3 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
H
Hui Li 已提交
15
#define _GNU_SOURCE
16

H
Haojun Liao 已提交
17
#include "os.h"
H
hjxilinx 已提交
18

H
Haojun Liao 已提交
19
#include "texpr.h"
H
Haojun Liao 已提交
20
#include "qTsbuf.h"
H
Haojun Liao 已提交
21
#include "tcompare.h"
S
slguan 已提交
22
#include "tscLog.h"
H
Haojun Liao 已提交
23 24
#include "tscSubquery.h"
#include "tschemautil.h"
25
#include "tsclient.h"
26
#include "qUtil.h"
H
hjxilinx 已提交
27 28 29

/*
 * Pairs an SSqlObj with an integer index.
 * Not referenced in the visible part of this file; field meanings below are inferred from names.
 */
typedef struct SInsertSupporter {
  SSqlObj*  pSql;   // associated SQL object
  int32_t   index;  // presumably the index of the associated subquery — TODO confirm
} SInsertSupporter;

H
hjxilinx 已提交
33
// Forward declarations of file-local helpers.
static void freeJoinSubqueryObj(SSqlObj* pSql);
static bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql);
H
hjxilinx 已提交
35

H
Haojun Liao 已提交
36 37 38 39 40
static int32_t tsCompare(int32_t order, int64_t left, int64_t right) {
  if (left == right) {
    return 0;
  }

41
  if (order == TSDB_ORDER_ASC) {
H
Haojun Liao 已提交
42
    return left < right? -1:1;
H
hjxilinx 已提交
43
  } else {
H
Haojun Liao 已提交
44
    return left > right? -1:1;
H
hjxilinx 已提交
45 46 47
  }
}

48 49 50 51 52 53 54 55 56 57 58
/*
 * Advance pTSBuf past the remaining elements whose tag equals tag1.
 * Stops on the first element carrying a different tag, or when tsBufNextPos reports no more elements.
 */
static void skipRemainValue(STSBuf* pTSBuf, tVariant* tag1) {
  bool hasNext = tsBufNextPos(pTSBuf);

  while (hasNext) {
    STSElem cur = tsBufGetElem(pTSBuf);
    if (tVariantCompare(cur.tag, tag1) != 0) {  // a record with a new tag: stop here
      break;
    }

    hasNext = tsBufNextPos(pTSBuf);
  }
}

59 60 61 62
/*
 * Store `state` as the state of subquery `idx` in subState, while holding subState->mutex.
 * pSql is only used in the debug log.
 */
static void subquerySetState(SSqlObj *pSql, SSubqueryState *subState, int idx, int8_t state) {
  assert(idx < subState->numOfSub);
  assert(subState->states);

  pthread_mutex_lock(&subState->mutex);
  {
    tscDebug("subquery:%p,%d state set to %d", pSql, idx, state);
    subState->states[idx] = state;
  }
  pthread_mutex_unlock(&subState->mutex);
}

D
fix bug  
dapan1121 已提交
72
/*
 * Return true when every entry of pParentSql->subState.states is non-zero.
 *
 * Does not lock: the caller is expected to hold pParentSql->subState.mutex.
 * Logs each inspected subquery and stops at the first one that is not finished.
 */
static bool allSubqueryDone(SSqlObj *pParentSql) {
  SSubqueryState *subState = &pParentSql->subState;

  for (int i = 0; i < subState->numOfSub; i++) {
    if (subState->states[i] == 0) {
      tscDebug("%p subquery:%p,%d is NOT finished, total:%d", pParentSql, pParentSql->pSubs[i], i, subState->numOfSub);
      return false;
    }

    tscDebug("%p subquery:%p,%d is finished, total:%d", pParentSql, pParentSql->pSubs[i], i, subState->numOfSub);
  }

  return true;
}

D
fix bug  
dapan1121 已提交
91 92 93
/*
 * Mark subquery `idx` of pParentSql as finished and report whether that completed the whole set.
 *
 * Under subState->mutex:
 *  - if every subquery was already marked finished, nothing is changed and false is returned;
 *  - otherwise states[idx] is set to 1 and the result is whether all subqueries are now finished.
 * pSql is only used in debug logs.
 */
static bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) {
  SSubqueryState *subState = &pParentSql->subState;
  assert(idx < subState->numOfSub);

  pthread_mutex_lock(&subState->mutex);

  bool completedNow = false;
  if (allSubqueryDone(pParentSql)) {
    tscDebug("%p subquery:%p,%d all subs already done", pParentSql, pSql, idx);
  } else {
    tscDebug("%p subquery:%p,%d state set to 1", pParentSql, pSql, idx);
    subState->states[idx] = 1;
    completedNow = allSubqueryDone(pParentSql);
  }

  pthread_mutex_unlock(&subState->mutex);

  return completedNow;
}



H
Haojun Liao 已提交
121 122
/*
 * Intersect the ts-comp buffers (pSupporter1->pTSBuf and pSupporter2->pTSBuf) produced by the first-stage
 * join subqueries of pSql.
 *
 * Two new buffers (output1/output2) are created and installed as tsBuf of sub queries 0 and 1 before any
 * early return. For every tag value found in the first input, the elements of both inputs carrying that tag
 * are merged by timestamp in pQueryInfo's order, and each equal-timestamp pair is appended to the outputs.
 *
 * *win receives the smallest/largest matched timestamp; it stays [INT64_MAX, INT64_MIN] when nothing matches.
 * Unless the query has an interval or is a super table query, a non-zero pQueryInfo->limit.offset is consumed
 * here: matching pairs are dropped while the offset is decremented in place.
 *
 * When the scan completes, both input buffers are destroyed and reset to NULL.
 *
 * Returns the number of matched timestamps written to output1 (0 if either input is missing or empty).
 */
static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJoinSupporter* pSupporter2, STimeWindow * win) {
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);

  STSBuf* output1 = tsBufCreate(true, pQueryInfo->order.order);
  STSBuf* output2 = tsBufCreate(true, pQueryInfo->order.order);

  win->skey = INT64_MAX;
  win->ekey = INT64_MIN;

  SLimitVal* pLimit = &pQueryInfo->limit;
  int32_t    order = pQueryInfo->order.order;

  SQueryInfo* pSubQueryInfo1 = tscGetQueryInfoDetail(&pSql->pSubs[0]->cmd, 0);
  SQueryInfo* pSubQueryInfo2 = tscGetQueryInfoDetail(&pSql->pSubs[1]->cmd, 0);

  pSubQueryInfo1->tsBuf = output1;
  pSubQueryInfo2->tsBuf = output2;

  TSKEY st = taosGetTimestampUs();

  // no result generated, return directly
  if (pSupporter1->pTSBuf == NULL || pSupporter2->pTSBuf == NULL) {
    tscDebug("%p at least one ts-comp is empty, 0 for secondary query after ts blocks intersecting", pSql);
    return 0;
  }

  tsBufResetPos(pSupporter1->pTSBuf);
  tsBufResetPos(pSupporter2->pTSBuf);

  if (!tsBufNextPos(pSupporter1->pTSBuf)) {
    tsBufFlush(output1);
    tsBufFlush(output2);

    tscDebug("%p input1 is empty, 0 for secondary query after ts blocks intersecting", pSql);
    return 0;
  }

  if (!tsBufNextPos(pSupporter2->pTSBuf)) {
    tsBufFlush(output1);
    tsBufFlush(output2);

    tscDebug("%p input2 is empty, 0 for secondary query after ts blocks intersecting", pSql);
    return 0;
  }

  int64_t numOfInput1 = 1;
  int64_t numOfInput2 = 1;

  while(1) {
    STSElem elem = tsBufGetElem(pSupporter1->pTSBuf);

    // no data in pSupporter1 anymore, jump out of loop
    if (!tsBufIsValidElem(&elem)) {
      break;
    }

    // find the data in supporter2 with the same tag value
    STSElem e2 = tsBufFindElemStartPosByTag(pSupporter2->pTSBuf, elem.tag);

    // private copy of the current tag, used to detect where this tag's records end in either buffer
    tVariant tag1 = {0};
    tVariantAssign(&tag1, elem.tag);

    if (tsBufIsValidElem(&e2)) {  // there are elements in pSupporter2 with the same tag, merge them
      while (1) {
        STSElem elem1 = tsBufGetElem(pSupporter1->pTSBuf);
        STSElem elem2 = tsBufGetElem(pSupporter2->pTSBuf);

        // data with current are exhausted
        if (!tsBufIsValidElem(&elem1) || tVariantCompare(elem1.tag, &tag1) != 0) {
          break;
        }

        if (!tsBufIsValidElem(&elem2) || tVariantCompare(elem2.tag, &tag1) != 0) { // ignore all records with the same tag
          skipRemainValue(pSupporter1->pTSBuf, &tag1);
          break;
        }

        /*
         * in case of stable query, limit/offset is not applied here. the limit/offset is applied to the
         * final results which is acquired after the secondary merge of in the client.
         */
        int32_t re = tsCompare(order, elem1.ts, elem2.ts);
        if (re < 0) {
          tsBufNextPos(pSupporter1->pTSBuf);
          numOfInput1++;
        } else if (re > 0) {
          tsBufNextPos(pSupporter2->pTSBuf);
          numOfInput2++;
        } else {
          if (pLimit->offset == 0 || pQueryInfo->interval.interval > 0 || QUERY_IS_STABLE_QUERY(pQueryInfo->type)) {
            if (win->skey > elem1.ts) {
              win->skey = elem1.ts;
            }

            if (win->ekey < elem1.ts) {
              win->ekey = elem1.ts;
            }

            tsBufAppend(output1, elem1.id, elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts));
            tsBufAppend(output2, elem2.id, elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts));
          } else {
            pLimit->offset -= 1;//offset apply to projection?
          }

          tsBufNextPos(pSupporter1->pTSBuf);
          numOfInput1++;

          tsBufNextPos(pSupporter2->pTSBuf);
          numOfInput2++;
        }
      }
    } else {  // no data in pSupporter2 with this tag: skip the tag's records in pSupporter1
      skipRemainValue(pSupporter1->pTSBuf, &tag1);
    }

    // release the copy made by tVariantAssign above; for string tags it may own heap memory,
    // which would otherwise be leaked once per distinct tag value
    tVariantDestroy(&tag1);
  }

  /*
   * failed to set the correct ts order yet in two cases:
   * 1. only one element
   * 2. only one element for each tag.
   */
  if (output1->tsOrder == -1) {
    output1->tsOrder = TSDB_ORDER_ASC;
    output2->tsOrder = TSDB_ORDER_ASC;
  }

  tsBufFlush(output1);
  tsBufFlush(output2);

  tsBufDestroy(pSupporter1->pTSBuf);
  pSupporter1->pTSBuf = NULL;
  tsBufDestroy(pSupporter2->pTSBuf);
  pSupporter2->pTSBuf = NULL;

  TSKEY et = taosGetTimestampUs();
  tscDebug("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " in %d vnodes for secondary query after ts blocks "
           "intersecting, skey:%" PRId64 ", ekey:%" PRId64 ", numOfVnode:%d, elapsed time:%" PRId64 " us",
           pSql, numOfInput1, numOfInput2, output1->numOfTotal, output1->numOfGroups, win->skey, win->ekey,
           tsBufGetNumOfGroup(output1), et - st);

  return output1->numOfTotal;
}

// todo handle failed to create sub query
H
Haojun Liao 已提交
268
SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, int32_t index) {
269
  SJoinSupporter* pSupporter = calloc(1, sizeof(SJoinSupporter));
H
hjxilinx 已提交
270 271 272 273 274 275 276 277 278
  if (pSupporter == NULL) {
    return NULL;
  }

  pSupporter->pObj = pSql;

  pSupporter->subqueryIndex = index;
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
  
279
  memcpy(&pSupporter->interval, &pQueryInfo->interval, sizeof(pSupporter->interval));
H
hjxilinx 已提交
280 281 282
  pSupporter->limit = pQueryInfo->limit;

  STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, index);
283
  pSupporter->uid = pTableMetaInfo->pTableMeta->id.uid;
H
hjxilinx 已提交
284 285
  assert (pSupporter->uid != 0);

S
slguan 已提交
286
  taosGetTmpfilePath("join-", pSupporter->path);
H
hjxilinx 已提交
287

D
fix bug  
dapan1121 已提交
288 289
  // do NOT create file here to reduce crash generated file left issue
  pSupporter->f = NULL;
H
hjxilinx 已提交
290 291 292 293

  return pSupporter;
}

H
Haojun Liao 已提交
294
/*
 * Release a join supporter and everything it still owns:
 *  - expression list, column list and field info;
 *  - ts-comp buffer;
 *  - temporary file (closed, then removed);
 *  - vgroup-table array;
 *  - id/tag list and tag condition.
 * Accepts NULL.
 */
static void tscDestroyJoinSupporter(SJoinSupporter* pSupporter) {
  if (pSupporter == NULL) {
    return;
  }

  if (pSupporter->exprList != NULL) {
    tscSqlExprInfoDestroy(pSupporter->exprList);
  }

  if (pSupporter->colList != NULL) {
    tscColumnListDestroy(pSupporter->colList);
  }

  tscFieldInfoClear(&pSupporter->fieldsInfo);

  if (pSupporter->pTSBuf != NULL) {
    tsBufDestroy(pSupporter->pTSBuf);
    pSupporter->pTSBuf = NULL;
  }

  // Close the temporary file before removing it: unlinking a file that is still open fails on
  // Windows, which would leave the temporary file behind. On POSIX the order makes no difference.
  if (pSupporter->f != NULL) {
    fclose(pSupporter->f);
    pSupporter->f = NULL;
  }

  unlink(pSupporter->path);

  // The array holds SVgroupTableInfo entries that own their itemList (the same array type is
  // released with tscFreeVgroupTableInfo in tscLaunchRealSubqueries). taosArrayDestroy would
  // free only the outer array and leak the per-group contents.
  if (pSupporter->pVgroupTables != NULL) {
    tscFreeVgroupTableInfo(pSupporter->pVgroupTables);
    pSupporter->pVgroupTables = NULL;
  }

  tfree(pSupporter->pIdTagList);
  tscTagCondRelease(&pSupporter->tagCond);
  free(pSupporter);
}

/*
 * need the secondary query process
 * In case of count(ts)/count(*)/spread(ts) query, that are only applied to
 * primary timestamp column , the secondary query is not necessary
 *
 */
H
Haojun Liao 已提交
338
static UNUSED_FUNC bool needSecondaryQuery(SQueryInfo* pQueryInfo) {
339 340 341
  size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
  
  for (int32_t i = 0; i < numOfCols; ++i) {
342 343
    SColumn* base = taosArrayGet(pQueryInfo->colList, i);
    if (base->colIndex.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
H
hjxilinx 已提交
344 345 346 347 348 349 350
      return true;
    }
  }

  return false;
}

351 352 353
/*
 * Remove, in place, every entry of pVgroupTables whose vgroup id is absent from the group-id list of
 * pQueryInfo->tsBuf. These are vnodes left with no qualified tables after the timestamp intersection,
 * so they are dropped to avoid a pointless next-stage query.
 *
 * At least one entry must remain (asserted). The query is then marked as a multi-table query.
 *
 * TODO: tables inside a surviving vnode are not filtered individually yet.
 */
static void filterVgroupTables(SQueryInfo* pQueryInfo, SArray* pVgroupTables) {
  int32_t  numOfIds = 0;
  int32_t* idList = NULL;
  tsBufGetGroupIdList(pQueryInfo->tsBuf, &numOfIds, &idList);

  int32_t k = 0;
  while (k < taosArrayGetSize(pVgroupTables)) {
    SVgroupTableInfo* pInfo = taosArrayGet(pVgroupTables, k);

    int32_t pos = 0;
    while (pos < numOfIds && idList[pos] != pInfo->vgInfo.vgId) {
      ++pos;
    }

    if (pos < numOfIds) {
      ++k;  // vgroup still referenced by the intersection result: keep it
    } else {
      tscRemoveVgroupTableGroup(pVgroupTables, k);  // k is not advanced after a removal
    }
  }

  assert(taosArrayGetSize(pVgroupTables) > 0);
  TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_MULTITABLE_QUERY);

  tfree(idList);
}

/*
 * Build a new vgroup-table array in the group-id order of pQueryInfo->tsBuf.
 *
 * For each group id in the list, the matching entry of pVgroupTables is copied with
 * tscVgroupTableCopy. Group ids with no matching entry are skipped, so no uninitialized or duplicated
 * entry is ever appended. The query is marked as a multi-table query.
 *
 * The caller owns the returned array.
 */
static SArray* buildVgroupTableByResult(SQueryInfo* pQueryInfo, SArray* pVgroupTables) {
  int32_t  num = 0;
  int32_t* list = NULL;
  tsBufGetGroupIdList(pQueryInfo->tsBuf, &num, &list);

  size_t numOfGroups = taosArrayGetSize(pVgroupTables);

  SArray* pNew = taosArrayInit(num, sizeof(SVgroupTableInfo));

  for (int32_t i = 0; i < num; ++i) {
    int32_t vnodeId = list[i];

    for (size_t j = 0; j < numOfGroups; ++j) {
      SVgroupTableInfo* p1 = taosArrayGet(pVgroupTables, j);
      if (p1->vgInfo.vgId != vnodeId) {
        continue;
      }

      // Push only when a match was found. The old code pushed a shared, uninitialized `info` even on a
      // miss: garbage on the first iteration, or a duplicate of the previous vgroup afterwards.
      SVgroupTableInfo info = {{0}};
      tscVgroupTableCopy(&info, p1);
      taosArrayPush(pNew, &info);
      break;
    }
  }

  tfree(list);
  TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_MULTITABLE_QUERY);

  return pNew;
}

H
hjxilinx 已提交
413 414 415
/*
 * launch secondary stage query to fetch the result that contains timestamp in set
 */
H
Haojun Liao 已提交
416
static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
H
hjxilinx 已提交
417
  int32_t         numOfSub = 0;
418
  SJoinSupporter* pSupporter = NULL;
H
hjxilinx 已提交
419
  
H
Haojun Liao 已提交
420
  //If the columns are not involved in the final select clause, the corresponding query will not be issued.
H
Haojun Liao 已提交
421
  for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
H
hjxilinx 已提交
422
    pSupporter = pSql->pSubs[i]->param;
H
hjxilinx 已提交
423
    if (taosArrayGetSize(pSupporter->exprList) > 0) {
H
hjxilinx 已提交
424 425 426 427 428 429 430
      ++numOfSub;
    }
  }
  
  assert(numOfSub > 0);
  
  // scan all subquery, if one sub query has only ts, ignore it
H
Haojun Liao 已提交
431
  tscDebug("%p start to launch secondary subqueries, %d out of %d needs to query", pSql, numOfSub, pSql->subState.numOfSub);
H
hjxilinx 已提交
432 433 434

  bool success = true;
  
H
Haojun Liao 已提交
435
  for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
H
hjxilinx 已提交
436 437 438 439 440
    SSqlObj *pPrevSub = pSql->pSubs[i];
    pSql->pSubs[i] = NULL;
    
    pSupporter = pPrevSub->param;
  
H
hjxilinx 已提交
441
    if (taosArrayGetSize(pSupporter->exprList) == 0) {
442
      tscDebug("%p subIndex: %d, no need to launch query, ignore it", pSql, i);
H
hjxilinx 已提交
443 444
    
      tscDestroyJoinSupporter(pSupporter);
445
      taos_free_result(pPrevSub);
H
hjxilinx 已提交
446 447 448 449 450 451
    
      pSql->pSubs[i] = NULL;
      continue;
    }
  
    SQueryInfo *pSubQueryInfo = tscGetQueryInfoDetail(&pPrevSub->cmd, 0);
H
Haojun Liao 已提交
452
    STSBuf     *pTsBuf = pSubQueryInfo->tsBuf;
H
hjxilinx 已提交
453 454 455
    pSubQueryInfo->tsBuf = NULL;
  
    // free result for async object will also free sqlObj
H
hjxilinx 已提交
456
    assert(tscSqlExprNumOfExprs(pSubQueryInfo) == 1); // ts_comp query only requires one resutl columns
H
hjxilinx 已提交
457 458
    taos_free_result(pPrevSub);
  
459
    SSqlObj *pNew = createSubqueryObj(pSql, (int16_t) i, tscJoinQueryCallback, pSupporter, TSDB_SQL_SELECT, NULL);
H
hjxilinx 已提交
460 461 462 463 464
    if (pNew == NULL) {
      tscDestroyJoinSupporter(pSupporter);
      success = false;
      break;
    }
465
    
B
Bomin Zhang 已提交
466

H
hjxilinx 已提交
467 468 469 470
    tscClearSubqueryInfo(&pNew->cmd);
    pSql->pSubs[i] = pNew;
  
    SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
H
Haojun Liao 已提交
471
    pQueryInfo->tsBuf = pTsBuf;  // transfer the ownership of timestamp comp-z data to the new created object
B
Bomin Zhang 已提交
472

H
hjxilinx 已提交
473
    // set the second stage sub query for join process
H
hjxilinx 已提交
474
    TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE);
475
    memcpy(&pQueryInfo->interval, &pSupporter->interval, sizeof(pQueryInfo->interval));
B
Bomin Zhang 已提交
476

H
hjxilinx 已提交
477
    tscTagCondCopy(&pQueryInfo->tagCond, &pSupporter->tagCond);
B
Bomin Zhang 已提交
478

H
hjxilinx 已提交
479 480 481
    pQueryInfo->colList = pSupporter->colList;
    pQueryInfo->exprList = pSupporter->exprList;
    pQueryInfo->fieldsInfo = pSupporter->fieldsInfo;
H
Haojun Liao 已提交
482
    pQueryInfo->groupbyExpr = pSupporter->groupInfo;
H
Haojun Liao 已提交
483

484
    assert(pNew->subState.numOfSub == 0 && pNew->cmd.numOfClause == 1 && pQueryInfo->numOfTables == 1);
H
hjxilinx 已提交
485
  
486
    tscFieldInfoUpdateOffset(pQueryInfo);
H
hjxilinx 已提交
487
  
488
    STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
Haojun Liao 已提交
489
    pTableMetaInfo->pVgroupTables = pSupporter->pVgroupTables;
H
Haojun Liao 已提交
490 491 492

    pSupporter->exprList = NULL;
    pSupporter->colList = NULL;
493
    pSupporter->pVgroupTables = NULL;
H
Haojun Liao 已提交
494 495
    memset(&pSupporter->fieldsInfo, 0, sizeof(SFieldInfo));
    memset(&pSupporter->groupInfo, 0, sizeof(SSqlGroupbyExpr));
H
Haojun Liao 已提交
496

H
hjxilinx 已提交
497 498 499 500 501
    /*
     * When handling the projection query, the offset value will be modified for table-table join, which is changed
     * during the timestamp intersection.
     */
    pSupporter->limit = pQueryInfo->limit;
502
    pQueryInfo->limit = pSupporter->limit;
H
Haojun Liao 已提交
503 504 505 506 507

    SColumnIndex index = {.tableIndex = 0, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
    SSchema* s = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, 0);

    SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, 0);
H
Haojun Liao 已提交
508 509
    int16_t funcId = pExpr->functionId;

H
Haojun Liao 已提交
510
    if ((pExpr->colInfo.colId != PRIMARYKEY_TIMESTAMP_COL_INDEX) ||
H
Haojun Liao 已提交
511 512 513 514
        (funcId != TSDB_FUNC_TS && funcId != TSDB_FUNC_TS_DUMMY && funcId != TSDB_FUNC_PRJ)) {

      int16_t functionId = tscIsProjectionQuery(pQueryInfo)? TSDB_FUNC_PRJ : TSDB_FUNC_TS;

515
      tscAddFuncInSelectClause(pQueryInfo, 0, functionId, &index, s, TSDB_COL_NORMAL);
H
Haojun Liao 已提交
516
      tscPrintSelectClause(pNew, 0);
517
      tscFieldInfoUpdateOffset(pQueryInfo);
H
Haojun Liao 已提交
518 519 520 521

      pExpr = tscSqlExprGet(pQueryInfo, 0);
    }

522
    // set the join condition tag column info, todo extract method
H
Haojun Liao 已提交
523 524
    if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
      assert(pQueryInfo->tagCond.joinInfo.hasJoin);
525
      int16_t colId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid);
H
Haojun Liao 已提交
526

527
      // set the tag column id for executor to extract correct tag value
528
      pExpr->param[0] = (tVariant) {.i64 = colId, .nType = TSDB_DATA_TYPE_BIGINT, .nLen = sizeof(int64_t)};
H
Haojun Liao 已提交
529
      pExpr->numOfParams = 1;
H
Haojun Liao 已提交
530 531
    }

532 533 534 535 536 537 538 539
    if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
      assert(pTableMetaInfo->pVgroupTables != NULL);
      if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
        SArray* p = buildVgroupTableByResult(pQueryInfo, pTableMetaInfo->pVgroupTables);
        tscFreeVgroupTableInfo(pTableMetaInfo->pVgroupTables);
        pTableMetaInfo->pVgroupTables = p;
      } else {
        filterVgroupTables(pQueryInfo, pTableMetaInfo->pVgroupTables);
H
Haojun Liao 已提交
540 541 542
      }
    }

D
fix bug  
dapan1121 已提交
543 544
    subquerySetState(pPrevSub, &pSql->subState, i, 0);
    
545
    size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
S
TD-1057  
Shengliang Guan 已提交
546
    tscDebug("%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, name:%s",
547
             pSql, pNew, 0, pTableMetaInfo->vgroupIndex, pQueryInfo->type, taosArrayGetSize(pQueryInfo->exprList),
548
             numOfCols, pQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pTableMetaInfo->name));
H
hjxilinx 已提交
549 550 551 552
  }
  
  //prepare the subqueries object failed, abort
  if (!success) {
553
    pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
H
hjxilinx 已提交
554
    tscError("%p failed to prepare subqueries objs for secondary phase query, numOfSub:%d, code:%d", pSql,
H
Haojun Liao 已提交
555
        pSql->subState.numOfSub, pSql->res.code);
H
hjxilinx 已提交
556
    freeJoinSubqueryObj(pSql);
H
hjxilinx 已提交
557 558 559 560
    
    return pSql->res.code;
  }
  
H
Haojun Liao 已提交
561
  for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
H
Haojun Liao 已提交
562
    if (pSql->pSubs[i] == NULL) {
H
hjxilinx 已提交
563 564
      continue;
    }
H
Haojun Liao 已提交
565

H
Haojun Liao 已提交
566
    tscDoQuery(pSql->pSubs[i]);
H
hjxilinx 已提交
567 568 569 570 571
  }

  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
572
/*
 * Release every subquery of pSql together with its join supporter, then tear down the subquery state:
 *  - destroy the state mutex when a state array exists;
 *  - free the state array;
 *  - reset numOfSub to 0.
 */
void freeJoinSubqueryObj(SSqlObj* pSql) {
  int32_t numOfSub = pSql->subState.numOfSub;

  for (int32_t idx = 0; idx < numOfSub; ++idx) {
    SSqlObj* pSubObj = pSql->pSubs[idx];
    if (pSubObj != NULL) {
      tscDestroyJoinSupporter(pSubObj->param);
      taos_free_result(pSubObj);
      pSql->pSubs[idx] = NULL;
    }
  }

  if (pSql->subState.states != NULL) {
    pthread_mutex_destroy(&pSql->subState.mutex);
  }

  tfree(pSql->subState.states);
  pSql->subState.numOfSub = 0;
}

D
fix bug  
dapan1121 已提交
596
/*
 * Mark pSqlSub (subquery pSupporter->subqueryIndex of pSqlObj) as finished on the query-failure path.
 *
 * If this call completed the whole set of subqueries, logs the parent's error code, frees all
 * subquery objects and returns 0. Otherwise returns 1 because other subqueries are still pending.
 */
static int32_t quitAllSubquery(SSqlObj* pSqlSub, SSqlObj* pSqlObj, SJoinSupporter* pSupporter) {
  if (!subAndCheckDone(pSqlSub, pSqlObj, pSupporter->subqueryIndex)) {
    return 1;
  }

  tscError("%p all subquery return and query failed, global code:%s", pSqlObj, tstrerror(pSqlObj->res.code));
  freeJoinSubqueryObj(pSqlObj);

  return 0;
}

// update the query time range according to the join results on timestamp
H
Haojun Liao 已提交
608 609 610
static void updateQueryTimeRange(SQueryInfo* pQueryInfo, STimeWindow* win) {
  assert(pQueryInfo->window.skey <= win->skey && pQueryInfo->window.ekey >= win->ekey);
  pQueryInfo->window = *win;
H
Haojun Liao 已提交
611 612


H
hjxilinx 已提交
613 614
}

H
Haojun Liao 已提交
615 616 617
int32_t tidTagsCompar(const void* p1, const void* p2) {
  const STidTags* t1 = (const STidTags*) (p1);
  const STidTags* t2 = (const STidTags*) (p2);
618 619
  
  if (t1->vgId != t2->vgId) {
weixin_48148422's avatar
weixin_48148422 已提交
620 621
    return (t1->vgId > t2->vgId) ? 1 : -1;
  }
622

H
Haojun Liao 已提交
623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641
  tstr* tag1 = (tstr*) t1->tag;
  tstr* tag2 = (tstr*) t2->tag;

  if (tag1->len != tag2->len) {
    return (tag1->len > tag2->len)? 1: -1;
  }

  return strncmp(tag1->data, tag2->data, tag1->len);
}

int32_t tagValCompar(const void* p1, const void* p2) {
  const STidTags* t1 = (const STidTags*) varDataVal(p1);
  const STidTags* t2 = (const STidTags*) varDataVal(p2);

  tstr* tag1 = (tstr*) t1->tag;
  tstr* tag2 = (tstr*) t2->tag;

  if (tag1->len != tag2->len) {
    return (tag1->len > tag2->len)? 1: -1;
642
  }
H
Haojun Liao 已提交
643

H
Haojun Liao 已提交
644
  return memcmp(tag1->data, tag2->data, tag1->len);
645 646
}

H
Haojun Liao 已提交
647
// Debug-log the vgId and table count of the last entry of `groups`, if there is one.
static void logLastVgroupTableGroup(SSqlObj* pSql, SArray* groups) {
  size_t numOfGroups = taosArrayGetSize(groups);
  if (numOfGroups == 0) {
    return;
  }

  SVgroupTableInfo* pLast = taosArrayGet(groups, numOfGroups - 1);
  tscDebug("%p vgId:%d, tables:%"PRIzu, pSql, pLast->vgInfo.vgId, taosArrayGetSize(pLast->itemList));
}

/*
 * Group `tables` (an array of STidTags) by vgroup and store the result in pTableMetaInfo->pVgroupTables.
 *
 * Consecutive entries with the same vgId form one SVgroupTableInfo. Its vgInfo is copied from
 * pTableMetaInfo->vgroupList, and its itemList holds one STableIdInfo{uid, tid, key = INT64_MIN}
 * per table. `tables` is presumably sorted by vgId (e.g. with tidTagsCompar) so that each vgroup
 * yields exactly one group — confirm at call sites.
 *
 * Also resets pTableMetaInfo->vgroupIndex to 0.
 */
void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables) {
  SArray*   result = taosArrayInit(4, sizeof(SVgroupTableInfo));
  SArray*   vgTables = NULL;  // item list of the group currently being filled
  STidTags* prev = NULL;

  size_t numOfTables = taosArrayGetSize(tables);
  for (size_t i = 0; i < numOfTables; i++) {
    STidTags* tt = taosArrayGet(tables, i);

    if (prev == NULL || tt->vgId != prev->vgId) {
      // a new run of vgId begins: open a new group with a copy of that vgroup's info
      SVgroupsInfo*    pvg = pTableMetaInfo->vgroupList;
      SVgroupTableInfo info = {{0}};

      int32_t m = 0;
      while (m < pvg->numOfVgroups && pvg->vgroups[m].vgId != tt->vgId) {
        ++m;
      }

      if (m < pvg->numOfVgroups) {
        tscSVgroupInfoCopy(&info.vgInfo, &pvg->vgroups[m]);
      }
      assert(info.vgInfo.numOfEps != 0);  // tt->vgId is expected to be present in the vgroup list

      vgTables = taosArrayInit(4, sizeof(STableIdInfo));
      info.itemList = vgTables;

      logLastVgroupTableGroup(pSql, result);  // the previous group (if any) is complete
      taosArrayPush(result, &info);
    }

    STableIdInfo item = {.uid = tt->uid, .tid = tt->tid, .key = INT64_MIN};
    taosArrayPush(vgTables, &item);

    tscTrace("%p tid:%d, uid:%"PRIu64",vgId:%d added", pSql, tt->tid, tt->uid, tt->vgId);
    prev = tt;
  }

  pTableMetaInfo->pVgroupTables = result;
  pTableMetaInfo->vgroupIndex = 0;

  logLastVgroupTableGroup(pSql, result);
}

695
/*
 * Turn pSql into a first-stage "ts_comp" query on its single table and submit it with tscProcessSql.
 *
 * The query's result is delivered to tscJoinQueryCallback. Its only select expression is
 * TSDB_FUNC_TS_COMP over the primary timestamp. For a super table, the join tag column id is passed
 * as that function's parameter.
 *
 * Columns from pSupporter->colList that carry filters are cloned into the query's column list.
 * pParent is only used in the debug log.
 */
static void issueTsCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj* pParent) {
  SSqlCmd* pCmd = &pSql->cmd;
  tscClearSubqueryInfo(pCmd);
  tscFreeSqlResult(pSql);

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  assert(pQueryInfo->numOfTables == 1);

  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  tscInitQueryInfo(pQueryInfo);

  TSDB_QUERY_CLEAR_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY);
  TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_MULTITABLE_QUERY);

  pCmd->command = TSDB_SQL_SELECT;
  pSql->fp = tscJoinQueryCallback;

  SSchema      tsCompSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1};
  SColumnIndex tsIndex = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX};
  tscAddFuncInSelectClause(pQueryInfo, 0, TSDB_FUNC_TS_COMP, &tsIndex, &tsCompSchema, TSDB_COL_NORMAL);

  // set the tags value for ts_comp function
  if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
    SSqlExpr* pTsCompExpr = tscSqlExprGet(pQueryInfo, 0);
    int16_t   tagColId = tscGetJoinTagColIdByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->id.uid);

    pTsCompExpr->param->i64 = tagColId;
    pTsCompExpr->numOfParams = 1;
  }

  // add the filter tag column
  size_t numOfSrcCols = (pSupporter->colList != NULL) ? taosArrayGetSize(pSupporter->colList) : 0;
  for (int32_t i = 0; i < numOfSrcCols; ++i) {
    SColumn* pSrcCol = taosArrayGetP(pSupporter->colList, i);
    if (pSrcCol->numOfFilters <= 0) {
      continue;
    }

    // copy to the pNew->cmd.colList if it is filtered.
    SColumn* pClone = tscColumnClone(pSrcCol);
    taosArrayPush(pQueryInfo->colList, &pClone);
  }

  size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
  tscDebug(
      "%p subquery:%p tableIndex:%d, vgroupIndex:%d, numOfVgroups:%d, type:%d, ts_comp query to retrieve timestamps, "
      "numOfExpr:%" PRIzu ", colList:%" PRIzu ", numOfOutputFields:%d, name:%s",
      pParent, pSql, 0, pTableMetaInfo->vgroupIndex, pTableMetaInfo->vgroupList->numOfVgroups, pQueryInfo->type,
      tscSqlExprNumOfExprs(pQueryInfo), numOfCols, pQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pTableMetaInfo->name));

  tscProcessSql(pSql);
}

H
Haojun Liao 已提交
750
/*
 * Check that no two adjacent records of p1->pIdTagList carry equal tag values.
 *
 * The list holds p1->num records of p1->tagSize bytes each. Tags are compared with doCompare using
 * pColSchema's type and bytes. Each record's vgId is asserted >= 1. Only neighbours are compared,
 * so the list is presumably sorted by tag value beforehand.
 *
 * On a duplicate: logs an error, sets pPSqlObj->res.code to TSDB_CODE_QRY_DUP_JOIN_KEY and returns
 * false. Otherwise returns true.
 */
static bool checkForDuplicateTagVal(SSchema* pColSchema, SJoinSupporter* p1, SSqlObj* pPSqlObj) {
  if (p1->num <= 1) {
    return true;  // fewer than two records: nothing to compare
  }

  STidTags* pPrev = (STidTags*) varDataVal(p1->pIdTagList);
  for (int32_t i = 1; i < p1->num; ++i) {
    STidTags* pCur = (STidTags*) varDataVal(p1->pIdTagList + i * p1->tagSize);
    assert(pPrev->vgId >= 1 && pCur->vgId >= 1);

    if (doCompare(pPrev->tag, pCur->tag, pColSchema->type, pColSchema->bytes) == 0) {
      tscError("%p join tags have same value for different table, free all sub SqlObj and quit", pPSqlObj);
      pPSqlObj->res.code = TSDB_CODE_QRY_DUP_JOIN_KEY;
      return false;
    }

    pPrev = pCur;
  }

  return true;
}

766
/*
 * Intersect the <tid, tag> tuples retrieved by the two first-stage join subqueries on the join tag value.
 *
 * The pIdTagList buffers of both supporters (subquery 0 and subquery 1) are sorted in place by tag value,
 * checked for duplicate tag values, and merged with a two-pointer walk. Matching tuples are appended to
 * *s1 (subquery 0) and *s2 (subquery 1). Both output arrays are then re-sorted with tidTagsCompar so the
 * tuples are organized by vgroup id and tag value.
 *
 * The caller owns *s1 and *s2 and must destroy them on every return path, including error returns.
 *
 * Returns:
 *   TSDB_CODE_SUCCESS            on success (the arrays may be empty when nothing matches);
 *   TSDB_CODE_TSC_OUT_OF_MEMORY  when an output array cannot be allocated;
 *   TSDB_CODE_QRY_DUP_JOIN_KEY   when one side has the same tag value for two tables
 *                                (pParentSql->res.code is set as well).
 */
static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pParentSql, SArray** s1, SArray** s2) {
  SJoinSupporter* p1 = pParentSql->pSubs[0]->param;
  SJoinSupporter* p2 = pParentSql->pSubs[1]->param;

  tscDebug("%p all subquery retrieve <tid, tags> complete, do tags match, %d, %d", pParentSql, p1->num, p2->num);

  // sort according to the tag value
  qsort(p1->pIdTagList, p1->num, p1->tagSize, tagValCompar);
  qsort(p2->pIdTagList, p2->num, p2->tagSize, tagValCompar);

  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  int16_t tagColId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid);

  SSchema* pColSchema = tscGetColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId);

  // each record starts with an int16_t var-data header that is not copied into the output arrays
  int32_t size = p1->tagSize - sizeof(int16_t);
  *s1 = taosArrayInit(p1->num, size);
  *s2 = taosArrayInit(p2->num, size);
  if (*s1 == NULL || *s2 == NULL) {
    // the caller destroys whichever array was created
    tscError("%p failed to allocate memory for join tag intersection", pParentSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  if (!(checkForDuplicateTagVal(pColSchema, p1, pParentSql) && checkForDuplicateTagVal(pColSchema, p2, pParentSql))) {
    return TSDB_CODE_QRY_DUP_JOIN_KEY;
  }

  // both lists are sorted by tag value: advance the side with the smaller tag until the tags match
  int32_t i = 0, j = 0;
  while(i < p1->num && j < p2->num) {
    STidTags* pp1 = (STidTags*) varDataVal(p1->pIdTagList + i * p1->tagSize);
    STidTags* pp2 = (STidTags*) varDataVal(p2->pIdTagList + j * p2->tagSize);
    assert(pp1->tid != 0 && pp2->tid != 0);

    int32_t ret = doCompare(pp1->tag, pp2->tag, pColSchema->type, pColSchema->bytes);
    if (ret == 0) {
      // NOTE(review): the logged "val" reinterprets the tag as an int regardless of the tag column type
      tscDebug("%p tag matched, vgId:%d, val:%d, tid:%d, uid:%"PRIu64", tid:%d, uid:%"PRIu64, pParentSql, pp1->vgId,
               *(int*) pp1->tag, pp1->tid, pp1->uid, pp2->tid, pp2->uid);

      taosArrayPush(*s1, pp1);
      taosArrayPush(*s2, pp2);
      j++;
      i++;
    } else if (ret > 0) {
      j++;
    } else {
      i++;
    }
  }

  // reorganize the tid-tag value according to both the vgroup id and tag values
  size_t t1 = taosArrayGetSize(*s1);
  size_t t2 = taosArrayGetSize(*s2);

  qsort((*s1)->pData, t1, size, tidTagsCompar);
  qsort((*s2)->pData, t2, size, tidTagsCompar);

#if 0
  for(int32_t k = 0; k < t1; ++k) {
    STidTags* p = (STidTags*)((char*)(*s1)->pData + size * k);
    printf("%d, tag:%s\n", p->vgId, ((tstr*)(p->tag))->data);
  }

  for(int32_t k = 0; k < t2; ++k) {
    STidTags* p = (STidTags*)((char*)(*s2)->pData + size * k);
    printf("%d, tag:%s\n", p->vgId, ((tstr*)(p->tag))->data);
  }
#endif

  tscDebug("%p tags match complete, result: %"PRIzu", %"PRIzu, pParentSql, t1, t2);
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
836
/*
 * Async retrieve callback for the first join stage, in which every subquery pulls the
 * <tid, tag> tuples of its qualified tables.
 *
 * Retrieved tuples are appended to the supporter's pIdTagList buffer. The callback keeps
 * fetching while the current vnode has more rows, then advances to the next vgroup. Once the
 * last subquery has finished, the two tuple sets are intersected on the join tag. If either
 * side is empty, an empty result is delivered to the application; otherwise the per-vgroup
 * table lists are built and the ts_comp query is issued for every subquery.
 *
 * Failures are stored in the parent's res.code; the parent is notified with
 * tscAsyncResultOnError() unless quitAllSubquery() returns true.
 */
static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRows) {
  SJoinSupporter* pJoinSup  = (SJoinSupporter*)param;
  SSqlObj*        pParent   = pJoinSup->pObj;
  SSqlObj*        pSub      = (SSqlObj*)tres;
  SSqlCmd*        pSubCmd   = &pSub->cmd;
  SSqlRes*        pSubRes   = &pSub->res;
  SQueryInfo*     pSubQuery = tscGetQueryInfoDetail(pSubCmd, pSubCmd->clauseIndex);

  assert(TSDB_QUERY_HAS_TYPE(pSubQuery->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY));

  // a sibling subquery has already failed
  if (pParent->res.code != TSDB_CODE_SUCCESS) {
    tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSub, numOfRows, pParent->res.code);
    if (!quitAllSubquery(pSub, pParent, pJoinSup)) {
      tscAsyncResultOnError(pParent);
    }
    return;
  }

  // this subquery failed: publish its error code on the parent
  if (taos_errno(pSub) != TSDB_CODE_SUCCESS) {
    // todo retry if other subqueries are not failed
    assert(numOfRows < 0 && numOfRows == taos_errno(pSub));
    tscError("%p sub query failed, code:%s, index:%d", pSub, tstrerror(numOfRows), pJoinSup->subqueryIndex);

    pParent->res.code = numOfRows;
    if (!quitAllSubquery(pSub, pParent, pJoinSup)) {
      tscAsyncResultOnError(pParent);
    }
    return;
  }

  // append the received <tid, tag> tuples to the in-memory list
  if (numOfRows > 0) {
    size_t blockLen = (size_t)(pJoinSup->tagSize * pSubRes->numOfRows);
    char*  pList    = realloc(pJoinSup->pIdTagList, pJoinSup->totalLen + blockLen);

    if (pList == NULL) {
      tscError("%p failed to malloc memory", pSub);

      pParent->res.code = TAOS_SYSTEM_ERROR(errno);
      if (!quitAllSubquery(pSub, pParent, pJoinSup)) {
        tscAsyncResultOnError(pParent);
      }
      return;
    }

    pJoinSup->pIdTagList = pList;
    memcpy(pList + pJoinSup->totalLen, pSubRes->data, blockLen);
    pJoinSup->totalLen += (int32_t)blockLen;
    pJoinSup->num      += (int32_t)pSubRes->numOfRows;

    // the current vnode still has tuples: keep fetching
    if (!pSubRes->completed) {
      taos_fetch_rows_a(tres, tidTagRetrieveCallback, param);
      return;
    }
  }

  // the current vnode is drained: query the next vgroup, if any
  if (hasMoreVnodesToTry(pSub)) {
    STableMetaInfo* pMetaInfo    = tscGetMetaInfo(pSubQuery, 0);
    int32_t         totalVgroups = pMetaInfo->vgroupList->numOfVgroups;
    int32_t         nextVgroup   = ++pMetaInfo->vgroupIndex;
    assert(nextVgroup < totalVgroups);

    tscDebug("%p tid_tag from vgroup index:%d completed, try next vgroup:%d. total vgroups:%d. current numOfRes:%d",
             pSub, nextVgroup - 1, nextVgroup, totalVgroups, pJoinSup->num);

    pSubCmd->command = TSDB_SQL_SELECT;
    tscResetForNextRetrieve(&pSub->res);

    pSub->fp = tscJoinQueryCallback;
    tscProcessSql(pSub);
    return;
  }

  // only the last finishing subquery proceeds to the tag intersection
  if (!subAndCheckDone(pSub, pParent, pJoinSup->subqueryIndex)) {
    tscDebug("%p tagRetrieve:%p,%d completed, total:%d", pParent, tres, pJoinSup->subqueryIndex, pParent->subState.numOfSub);
    return;
  }

  SArray* pLeftTables  = NULL;
  SArray* pRightTables = NULL;
  int32_t code = getIntersectionOfTableTuple(pSubQuery, pParent, &pLeftTables, &pRightTables);

  if (code != TSDB_CODE_SUCCESS) {
    freeJoinSubqueryObj(pParent);
    pParent->res.code = code;
    tscAsyncResultOnError(pParent);
  } else if (taosArrayGetSize(pLeftTables) == 0 || taosArrayGetSize(pRightTables) == 0) {
    // no table qualifies on one side: report an empty result
    assert(pParent->fp != tscJoinQueryCallback);

    tscDebug("%p tag intersect does not generated qualified tables for join, free all sub SqlObj and quit", pParent);
    freeJoinSubqueryObj(pParent);

    pParent->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
    assert(pParent->fp != tscJoinQueryCallback);

    (*pParent->fp)(pParent->param, pParent, 0);
  } else {
    // restrict both subqueries to the matching tables, then launch the ts_comp queries
    SSqlCmd* pLeftCmd  = &pParent->pSubs[0]->cmd;
    SSqlCmd* pRightCmd = &pParent->pSubs[1]->cmd;

    SQueryInfo*     pLeftQuery = tscGetQueryInfoDetail(pLeftCmd, 0);
    STableMetaInfo* pLeftMeta  = tscGetMetaInfo(pLeftQuery, 0);
    tscBuildVgroupTableInfo(pParent, pLeftMeta, pLeftTables);

    SQueryInfo*     pRightQuery = tscGetQueryInfoDetail(pRightCmd, 0);
    STableMetaInfo* pRightMeta  = tscGetMetaInfo(pRightQuery, 0);
    tscBuildVgroupTableInfo(pParent, pRightMeta, pRightTables);

    ((SJoinSupporter*)pParent->pSubs[0]->param)->pVgroupTables = tscVgroupTableInfoDup(pLeftMeta->pVgroupTables);
    ((SJoinSupporter*)pParent->pSubs[1]->param)->pVgroupTables = tscVgroupTableInfoDup(pRightMeta->pVgroupTables);

    pParent->subState.numOfSub = 2;
    memset(pParent->subState.states, 0, sizeof(pParent->subState.states[0]) * pParent->subState.numOfSub);
    tscDebug("%p reset all sub states to 0", pParent);

    for (int32_t idx = 0; idx < pParent->subState.numOfSub; ++idx) {
      SSqlObj* pChild = pParent->pSubs[idx];
      issueTsCompQuery(pChild, pChild->param, pParent);
    }
  }

  taosArrayDestroy(pLeftTables);
  taosArrayDestroy(pRightTables);
}
H
Haojun Liao 已提交
991

H
Haojun Liao 已提交
992 993
/*
 * Async retrieve callback for the ts_comp stage of a join subquery.
 *
 * Each retrieved block of compressed timestamps (pRes->data, pRes->numOfRows bytes) is written to
 * the temporary file pSupporter->path and loaded back as an STSBuf. The first buffer becomes
 * pSupporter->pTSBuf; later ones are merged into it. The callback keeps fetching while the
 * current vnode has more data, then advances to the next vgroup. When the last subquery
 * finishes, the timestamp sets are intersected. If nothing matches, an empty result is
 * delivered; otherwise the parent's time range is narrowed and the final subqueries are
 * launched.
 *
 * Failures are stored in the parent's res.code; the parent is notified with
 * tscAsyncResultOnError() unless quitAllSubquery() returns true.
 */
static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRows) {
  SJoinSupporter* pSupporter = (SJoinSupporter*)param;

  SSqlObj* pParentSql = pSupporter->pObj;

  SSqlObj* pSql = (SSqlObj*)tres;
  SSqlCmd* pCmd = &pSql->cmd;
  SSqlRes* pRes = &pSql->res;

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  assert(!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE));

  if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
    tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, numOfRows, pParentSql->res.code);
    if (quitAllSubquery(pSql, pParentSql, pSupporter)){
      return;
    }

    tscAsyncResultOnError(pParentSql);

    return;
  }

  // check for the error code firstly
  if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
    // todo retry if other subqueries are not failed yet
    assert(numOfRows < 0 && numOfRows == taos_errno(pSql));
    tscError("%p sub query failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex);

    pParentSql->res.code = numOfRows;
    if (quitAllSubquery(pSql, pParentSql, pSupporter)){
      return;
    }

    tscAsyncResultOnError(pParentSql);
    return;
  }

  if (numOfRows > 0) {  // write the compressed timestamp to disk file
    // the fopen() issued before a fetch is not checked, so the handle may still be NULL here
    if (pSupporter->f == NULL) {
      pSupporter->f = fopen(pSupporter->path, "wb");

      if (pSupporter->f == NULL) {
        int32_t openErrno = errno;  // capture before logging can overwrite errno
        tscError("%p failed to create tmp file:%s, reason:%s", pSql, pSupporter->path, strerror(openErrno));

        pParentSql->res.code = TAOS_SYSTEM_ERROR(openErrno);

        if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
          return;
        }

        tscAsyncResultOnError(pParentSql);

        return;
      }
    }

    // Both fwrite() and the flush performed by fclose() can fail (e.g. disk full). A truncated
    // file would otherwise only surface later as an opaque "invalid ts comp file" error.
    size_t  blockLen = (size_t)pRes->numOfRows;
    bool    writeOk  = (blockLen == 0) || (fwrite(pRes->data, blockLen, 1, pSupporter->f) == 1);
    int32_t ioErrno  = writeOk ? 0 : errno;
    bool    closeOk  = (fclose(pSupporter->f) == 0);
    if (writeOk && !closeOk) {
      ioErrno = errno;
    }
    pSupporter->f = NULL;

    if (!writeOk || !closeOk) {
      if (ioErrno == 0) {
        ioErrno = EIO;  // stdio reported a failure without setting errno
      }

      tscError("%p failed to write ts comp data to tmp file:%s, reason:%s", pSql, pSupporter->path, strerror(ioErrno));
      remove(pSupporter->path);  // best effort: drop the partially written file

      pParentSql->res.code = TAOS_SYSTEM_ERROR(ioErrno);
      if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
        return;
      }

      tscAsyncResultOnError(pParentSql);
      return;
    }

    STSBuf* pBuf = tsBufCreateFromFile(pSupporter->path, true);
    if (pBuf == NULL) {  // the file content could not be loaded as a ts buffer
      int32_t loadErrno = errno;  // capture before logging can overwrite errno
      tscError("%p invalid ts comp file from vnode, abort subquery, file size:%d", pSql, numOfRows);

      pParentSql->res.code = TAOS_SYSTEM_ERROR(loadErrno);

      if (quitAllSubquery(pSql, pParentSql, pSupporter)){
        return;
      }

      tscAsyncResultOnError(pParentSql);

      return;
    }

    if (pSupporter->pTSBuf == NULL) {
      tscDebug("%p create tmp file for ts block:%s, size:%d bytes", pSql, pBuf->path, numOfRows);
      pSupporter->pTSBuf = pBuf;
    } else {
      assert(pQueryInfo->numOfTables == 1);  // for subquery, only one
      tsBufMerge(pSupporter->pTSBuf, pBuf);
      tsBufDestroy(pBuf);
    }

    // continue to retrieve ts-comp data from vnode
    if (!pRes->completed) {
      taosGetTmpfilePath("ts-join", pSupporter->path);
      pSupporter->f = fopen(pSupporter->path, "wb");  // a NULL handle is reopened when the next block arrives
      pRes->row = pRes->numOfRows;

      taos_fetch_rows_a(tres, tsCompRetrieveCallback, param);
      return;
    }
  }

  if (hasMoreVnodesToTry(pSql)) {
    STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

    int32_t totalVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
    pTableMetaInfo->vgroupIndex += 1;
    assert(pTableMetaInfo->vgroupIndex < totalVgroups);

    tscDebug("%p results from vgroup index:%d completed, try next vgroup:%d. total vgroups:%d. current numOfRes:%" PRId64,
             pSql, pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVgroups,
             pRes->numOfClauseTotal);

    pCmd->command = TSDB_SQL_SELECT;
    tscResetForNextRetrieve(&pSql->res);

    assert(pSupporter->f == NULL);
    taosGetTmpfilePath("ts-join", pSupporter->path);

    // a failed fopen() leaves f NULL; it is reopened when the first block of the next vgroup arrives
    pSupporter->f = fopen(pSupporter->path, "wb");
    pRes->row = pRes->numOfRows;

    // set the callback function
    pSql->fp = tscJoinQueryCallback;
    tscProcessSql(pSql);
    return;
  }

  if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
    return;
  }

  tscDebug("%p all subquery retrieve ts complete, do ts block intersect", pParentSql);

  // proceeds to launched secondary query to retrieve final data
  SJoinSupporter* p1 = pParentSql->pSubs[0]->param;
  SJoinSupporter* p2 = pParentSql->pSubs[1]->param;

  STimeWindow win = TSWINDOW_INITIALIZER;
  int64_t num = doTSBlockIntersect(pParentSql, p1, p2, &win);
  if (num <= 0) {  // no result during ts intersect
    tscDebug("%p no results generated in ts intersection, free all sub SqlObj and quit", pParentSql);
    freeJoinSubqueryObj(pParentSql);

    // set no result command
    pParentSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
    (*pParentSql->fp)(pParentSql->param, pParentSql, 0);
    return;
  }

  // launch the query the retrieve actual results from vnode along with the filtered timestamp
  SQueryInfo* pPQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, pParentSql->cmd.clauseIndex);
  updateQueryTimeRange(pPQueryInfo, &win);

  //update the vgroup that involved in real data query
  tscLaunchRealSubqueries(pParentSql);
}
H
Haojun Liao 已提交
1144

H
Haojun Liao 已提交
1145 1146
/*
 * Async retrieve callback for the second (final data) stage of a join query.
 *
 * Accumulates the row count of this subquery. For non-ordered projection queries on a super
 * table, an empty block moves the subquery to its next vgroup. When the last subquery reports
 * in, the per-subquery totals are updated and the joined result is built with
 * tscBuildResFromSubqueries().
 */
static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfRows) {
  SJoinSupporter* pSupporter = (SJoinSupporter*)param;

  SSqlObj* pParentSql = pSupporter->pObj;

  SSqlObj* pSql = (SSqlObj*)tres;
  SSqlCmd* pCmd = &pSql->cmd;
  SSqlRes* pRes = &pSql->res;

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  // a sibling subquery has already failed
  if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
    tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, numOfRows, pParentSql->res.code);
    if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
      return;
    }

    tscAsyncResultOnError(pParentSql);

    return;
  }

  if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
    assert(numOfRows == taos_errno(pSql));

    // NOTE(review): unlike the other join callbacks, this path does not call quitAllSubquery()
    // before notifying the parent; confirm that is intended.
    pParentSql->res.code = numOfRows;
    tscError("%p retrieve failed, index:%d, code:%s", pSql, pSupporter->subqueryIndex, tstrerror(numOfRows));

    tscAsyncResultOnError(pParentSql);
    return;
  }

  if (numOfRows >= 0) {
    pRes->numOfTotal += pRes->numOfRows;
  }

  SSubqueryState* pState = &pParentSql->subState;
  if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && numOfRows == 0) {
    STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
    assert(pQueryInfo->numOfTables == 1);

    // for projection query, need to try next vnode if current vnode is exhausted
    int32_t numOfVgroups = 0;  // TODO refactor
    if (pTableMetaInfo->pVgroupTables != NULL) {
      numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
    } else {
      numOfVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
    }

    if ((++pTableMetaInfo->vgroupIndex) < numOfVgroups) {
      tscDebug("%p no result in current vnode anymore, try next vnode, vgIndex:%d", pSql, pTableMetaInfo->vgroupIndex);
      pSql->cmd.command = TSDB_SQL_SELECT;
      pSql->fp = tscJoinQueryCallback;

      tscProcessSql(pSql);
      return;
    } else {
      tscDebug("%p no result in current subquery anymore", pSql);
    }
  }

  // only the last subquery to finish proceeds to build the joined result
  if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
    tscDebug("%p sub:%p,%d completed, total:%d", pParentSql, tres, pSupporter->subqueryIndex, pState->numOfSub);
    return;
  }

  tscDebug("%p all %d secondary subqueries retrieval completed, code:%d", tres, pState->numOfSub, pParentSql->res.code);

  // Evaluate this before freeJoinSubqueryObj(). pQueryInfo belongs to this subquery's own
  // SSqlObj (pSql->cmd), which that call may release.
  bool stableQuery = tscIsTwoStageSTableQuery(pQueryInfo, 0);

  if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
    freeJoinSubqueryObj(pParentSql);
    pParentSql->res.completed = true;
  }

  // update the records for each subquery in parent sql object.
  for (int32_t i = 0; i < pState->numOfSub; ++i) {
    if (pParentSql->pSubs[i] == NULL) {
      tscDebug("%p %p sub:%d not retrieve data", pParentSql, NULL, i);
      continue;
    }

    SSqlRes* pRes1 = &pParentSql->pSubs[i]->res;

    if (pRes1->row > 0 && pRes1->numOfRows > 0) {
      tscDebug("%p sub:%p index:%d numOfRows:%d total:%"PRId64 " (not retrieve)", pParentSql, pParentSql->pSubs[i], i,
               pRes1->numOfRows, pRes1->numOfTotal);
      assert(pRes1->row < pRes1->numOfRows);
    } else {
      if (!stableQuery) {
        pRes1->numOfClauseTotal += pRes1->numOfRows;
      }

      tscDebug("%p sub:%p index:%d numOfRows:%d total:%"PRId64, pParentSql, pParentSql->pSubs[i], i,
               pRes1->numOfRows, pRes1->numOfTotal);
    }
  }

  // data has retrieved to client, build the join results
  tscBuildResFromSubqueries(pParentSql);
}

1247
/*
 * Issue the next retrieve for every join subquery whose client-side buffer is exhausted.
 * The state of each such subquery is reset first; subqueries still holding rows are skipped.
 */
static void retrieveExhaustedJoinSubqueries(SSqlObj* pSql, int32_t numOfFetch) {
  // TODO multi-vnode retrieve for projection query with limitation has bugs, since the global limiation is not handled
  tscDebug("%p retrieve data from %d subqueries", pSql, numOfFetch);

  for (int32_t idx = 0; idx < pSql->subState.numOfSub; ++idx) {
    SSqlObj* pChild = pSql->pSubs[idx];
    if (pChild != NULL && pChild->res.row >= pChild->res.numOfRows) {
      subquerySetState(pChild, &pSql->subState, idx, 0);
    }
  }

  for (int32_t idx = 0; idx < pSql->subState.numOfSub; ++idx) {
    SSqlObj* pChild = pSql->pSubs[idx];
    if (pChild == NULL) {
      continue;
    }

    SSqlRes*        pChildRes  = &pChild->res;
    SSqlCmd*        pChildCmd  = &pChild->cmd;
    SJoinSupporter* pChildSup  = (SJoinSupporter*)pChild->param;
    SQueryInfo*     pChildInfo = tscGetQueryInfoDetail(pChildCmd, 0);
    assert(pChildRes->numOfRows >= 0 && pChildInfo->numOfTables == 1);

    STableMetaInfo* pMetaInfo = tscGetMetaInfo(pChildInfo, 0);
    if (pChildRes->row < pChildRes->numOfRows) {
      continue;  // still has buffered rows for the application
    }

    tscDebug("%p subquery:%p retrieve data from vnode, subquery:%d, vgroupIndex:%d", pSql, pChild,
             pChildSup->subqueryIndex, pMetaInfo->vgroupIndex);

    tscResetForNextRetrieve(pChildRes);
    pChild->fp = joinRetrieveFinalResCallback;

    if (pChildCmd->command < TSDB_SQL_LOCAL) {
      pChildCmd->command = (pChildCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
    }

    tscProcessSql(pChild);
  }
}

/*
 * For non-ordered projection queries on a super table, move every fully drained subquery on to
 * its next vgroup. Returns true if at least one subquery was re-issued against another vnode.
 */
static bool advanceJoinSubqueriesToNextVnode(SSqlObj* pSql) {
  // does any participating subquery run as a non-ordered projection query on a super table?
  bool nonOrderedPrj = false;
  for (int32_t idx = 0; idx < pSql->subState.numOfSub && !nonOrderedPrj; ++idx) {
    SSqlObj* pChild = pSql->pSubs[idx];
    if (pChild != NULL) {
      nonOrderedPrj = tscNonOrderedProjectionQueryOnSTable(tscGetQueryInfoDetail(&pChild->cmd, 0), 0);
    }
  }

  if (nonOrderedPrj) {
    for (int32_t idx = 0; idx < pSql->subState.numOfSub; ++idx) {
      SSqlObj* pChild = pSql->pSubs[idx];
      if (pChild != NULL && pChild->res.row >= pChild->res.numOfRows && pChild->res.completed) {
        subquerySetState(pChild, &pSql->subState, idx, 0);
      }
    }
  }

  bool relaunched = false;
  for (int32_t idx = 0; idx < pSql->subState.numOfSub; ++idx) {
    SSqlObj* pChild = pSql->pSubs[idx];
    if (pChild == NULL) {
      continue;
    }

    SQueryInfo* pChildInfo = tscGetQueryInfoDetail(&pChild->cmd, 0);
    bool drained = tscNonOrderedProjectionQueryOnSTable(pChildInfo, 0) && pChild->res.row >= pChild->res.numOfRows &&
                   pChild->res.completed;
    if (!drained) {
      continue;
    }

    STableMetaInfo* pMetaInfo = tscGetMetaInfo(pChildInfo, 0);
    assert(pChildInfo->numOfTables == 1);

    // the projection query continues on the next vnode once the current one is exhausted
    int32_t numOfVgroups = (pMetaInfo->pVgroupTables != NULL) ? (int32_t)taosArrayGetSize(pMetaInfo->pVgroupTables)
                                                              : pMetaInfo->vgroupList->numOfVgroups;

    if ((++pMetaInfo->vgroupIndex) >= numOfVgroups) {
      tscDebug("%p no result in current subquery anymore", pChild);
      continue;
    }

    tscDebug("%p no result in current vnode anymore, try next vnode, vgIndex:%d", pChild,
             pMetaInfo->vgroupIndex);
    pChild->cmd.command = TSDB_SQL_SELECT;
    pChild->fp = tscJoinQueryCallback;

    tscProcessSql(pChild);
    relaunched = true;
  }

  return relaunched;
}

/*
 * Drive the next step of fetching a join result to the application.
 *
 * If every live subquery still has buffered rows, the joined result is built immediately.
 * Otherwise the query completes when an exhausted subquery has reached its clause limit.
 * Exhausted subqueries that are not yet complete are fetched again. When none are pending,
 * drained projection subqueries move to their next vnode, and the query completes only if
 * none of them could.
 */
void tscFetchDatablockForSubquery(SSqlObj* pSql) {
  assert(pSql->subState.numOfSub >= 1);

  int32_t numOfFetch = 0;      // exhausted-but-incomplete subqueries that must be fetched again
  bool    hasData    = true;   // every live subquery still has buffered rows
  bool    reachLimit = false;  // an exhausted subquery has hit its clause limit

  for (int32_t idx = 0; idx < pSql->subState.numOfSub; ++idx) {
    SSqlObj* pChild = pSql->pSubs[idx];
    if (pChild == NULL) {
      continue;  // a NULL subquery takes no part in generating the final result
    }

    SSqlRes*    pChildRes  = &pChild->res;
    SQueryInfo* pChildInfo = tscGetQueryInfoDetail(&pChild->cmd, 0);
    bool        limitHit   = tscHasReachLimitation(pChildInfo, pChildRes);

    if (pChildRes->row < pChildRes->numOfRows) {
      continue;  // rows remain in the client-side buffer
    }

    hasData = false;
    if (limitHit) {
      reachLimit = true;
      break;
    }

    if (!pChildRes->completed) {
      ++numOfFetch;
    }
  }

  if (hasData) {
    tscBuildResFromSubqueries(pSql);
    return;
  }

  if (!reachLimit) {
    if (numOfFetch > 0) {
      retrieveExhaustedJoinSubqueries(pSql, numOfFetch);
      return;
    }

    if (advanceJoinSubqueriesToNextVnode(pSql)) {
      return;
    }
  }

  // nothing more to retrieve: finish the query and notify the application
  pSql->res.completed = true;
  freeJoinSubqueryObj(pSql);

  if (pSql->res.code != TSDB_CODE_SUCCESS) {
    tscAsyncResultOnError(pSql);
  } else {
    (*pSql->fp)(pSql->param, pSql, 0);
  }
}

/*
 * Build the output-column mapping of a join query once all subqueries have returned.
 *
 * For every output expression of the parent query, pRes->pColumnIndex[i] records which
 * subquery (tableIndex) produces it and at which expression position (columnIndex) inside
 * that subquery. The expression is attributed to the table whose uid equals pExpr->uid,
 * then to the first sub-expression with the same function id and column id.
 *
 * Does nothing if the mapping already exists. If allocation fails, pRes->code is set to
 * TSDB_CODE_TSC_OUT_OF_MEMORY. Finally, the parent's functions are restored for the final
 * result and the output field offsets are recomputed.
 */
void tscSetupOutputColumnIndex(SSqlObj* pSql) {
  SSqlRes* pRes = &pSql->res;

  // the column transfer support struct has already been built
  if (pRes->pColumnIndex != NULL) {
    return;
  }

  SSqlCmd*    pCmd       = &pSql->cmd;
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  int32_t     numOfExprs = (int32_t)tscSqlExprNumOfExprs(pQueryInfo);

  SColumnIndex* pColIndex = calloc(1, sizeof(SColumnIndex) * numOfExprs);
  pRes->pColumnIndex = pColIndex;
  if (pColIndex == NULL) {
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return;
  }

  for (int32_t outIdx = 0; outIdx < numOfExprs; ++outIdx) {
    SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, outIdx);

    // locate the table (and hence the subquery) this expression belongs to
    int32_t tableIndexOfSub = -1;
    for (int32_t t = 0; t < pQueryInfo->numOfTables && tableIndexOfSub < 0; ++t) {
      STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, t);
      if (pTableMetaInfo->pTableMeta->id.uid == pExpr->uid) {
        tableIndexOfSub = t;
      }
    }

    assert(tableIndexOfSub >= 0 && tableIndexOfSub < pQueryInfo->numOfTables);

    SQueryInfo* pSubQueryInfo = tscGetQueryInfoDetail(&pSql->pSubs[tableIndexOfSub]->cmd, 0);
    size_t      numOfSubExpr  = taosArrayGetSize(pSubQueryInfo->exprList);

    // the first sub-expression computing the same function over the same column wins
    for (int32_t k = 0; k < numOfSubExpr; ++k) {
      SSqlExpr* pSubExpr = tscSqlExprGet(pSubQueryInfo, k);
      bool sameOutput = (pExpr->functionId == pSubExpr->functionId) && (pExpr->colInfo.colId == pSubExpr->colInfo.colId);
      if (sameOutput) {
        pColIndex[outIdx] = (SColumnIndex){.tableIndex = tableIndexOfSub, .columnIndex = k};
        break;
      }
    }
  }

  // restore the offset value for super table query in case of final result.
  tscRestoreFuncForSTableQuery(pQueryInfo);
  tscFieldInfoUpdateOffset(pQueryInfo);
}

/*
 * Completion callback of one join subquery.
 *
 * If a sibling subquery already failed, or this subquery failed, the whole join is aborted.
 * Otherwise the next step depends on the stage flags of the subquery:
 *  - tag filter stage: start fetching the <tid, tag> tuples;
 *  - first stage (not JOIN_SEC_STAGE): start fetching the ts_comp data;
 *  - second stage: wait for the siblings (unless this is a continued projection query on the next
 *    vnode), set up the parent's output column index and hand control back to the parent.
 */
void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
  SSqlObj*        pSql       = (SSqlObj*)tres;
  SJoinSupporter* pSupporter = (SJoinSupporter*)param;
  SSqlObj*        pParentSql = pSupporter->pObj;

  // every join subquery holds exactly one clause which references exactly one table
  SQueryInfo*     pQueryInfo     = tscGetQueryInfoDetail(&pSql->cmd, 0);
  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  assert(pQueryInfo->numOfTables == 1 && pSql->cmd.numOfClause == 1);

  // a sibling subquery has already failed: give up on this one as well
  if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
    tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, code, pParentSql->res.code);

    if (!quitAllSubquery(pSql, pParentSql, pSupporter)) {
      tscAsyncResultOnError(pParentSql);
    }
    return;
  }

  // this subquery failed. TODO: a retry is required here instead of returning the error to the client
  if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
    assert(taos_errno(pSql) == code);

    tscError("%p abort query, code:%s, global code:%s", pSql, tstrerror(code), tstrerror(pParentSql->res.code));
    pParentSql->res.code = code;

    if (!quitAllSubquery(pSql, pParentSql, pSupporter)) {
      tscAsyncResultOnError(pParentSql);
    }
    return;
  }

  // tag filter stage: fetch the <tid, tag> tuples from the vnode
  if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) {
    pSql->fp          = tidTagRetrieveCallback;
    pSql->cmd.command = TSDB_SQL_FETCH;
    tscProcessSql(pSql);
    return;
  }

  // first stage: fetch the ts_comp data from the vnode
  if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
    pSql->fp          = tsCompRetrieveCallback;
    pSql->cmd.command = TSDB_SQL_FETCH;
    tscProcessSql(pSql);
    return;
  }

  // Second stage. A continued projection query on the next vnode does not wait for the other
  // subqueries; every other subquery proceeds only once all of its siblings are done.
  if (pTableMetaInfo->vgroupIndex <= 0 || !tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
    if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
      return;
    }
  }

  tscSetupOutputColumnIndex(pParentSql);

  if (pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
    // continued query (vgroupIndex > 0 for a projection query) on the next vnode:
    // keep retrieving data instead of returning to the invoker
    pSql->fp          = joinRetrieveFinalResCallback;
    pSql->cmd.command = TSDB_SQL_FETCH;
    tscProcessSql(pSql);
    return;
  }

  // first retrieval from the vnode during the second stage sub-query
  if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
    tscAsyncResultOnError(pParentSql);
  } else {
    (*pParentSql->fp)(pParentSql->param, pParentSql, 0);
  }
}

/////////////////////////////////////////////////////////////////////////////////////////
static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code);

1575
static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj);
H
hjxilinx 已提交
1576

H
Haojun Liao 已提交
1577
/*
 * Create the join subquery for table `tableIndex` of pSql and store it in pSql->pSubs[tableIndex].
 *
 * The column list, expression list, field info, tag condition, group-by info and limit of the new
 * subquery are moved into pSupporter, and the subquery itself is rewritten into a first-stage query:
 *  - super table: a tid_tag query retrieving (tableId, tag) tuples;
 *  - otherwise:   a ts_comp query retrieving the timestamps.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when an allocation fails.
 */
int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter *pSupporter) {
  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  pSql->res.qhandle = 0x1;
  assert(pSql->res.numOfRows == 0);

  if (pSql->pSubs == NULL) {
    pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES);
    if (pSql->pSubs == NULL) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
  }

  SSqlObj *pNew = createSubqueryObj(pSql, tableIndex, tscJoinQueryCallback, pSupporter, TSDB_SQL_SELECT, NULL);
  if (pNew == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  pSql->pSubs[tableIndex] = pNew;

  if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) {
    addGroupInfoForSubquery(pSql, pNew, 0, tableIndex);

    // refactor as one method
    SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
    assert(pNewQueryInfo != NULL);

    // the subquery references a single table, so every column now belongs to table index 0
    size_t num = taosArrayGetSize(pNewQueryInfo->colList);
    for (size_t i = 0; i < num; ++i) {
      SColumn* pCol = taosArrayGetP(pNewQueryInfo->colList, i);
      pCol->colIndex.tableIndex = 0;
    }

    // move the original query description into the support struct
    pSupporter->colList = pNewQueryInfo->colList;
    pNewQueryInfo->colList = NULL;

    pSupporter->exprList = pNewQueryInfo->exprList;
    pNewQueryInfo->exprList = NULL;

    pSupporter->fieldsInfo = pNewQueryInfo->fieldsInfo;
    memset(&pNewQueryInfo->fieldsInfo, 0, sizeof(SFieldInfo));

    if (tscTagCondCopy(&pSupporter->tagCond, &pNewQueryInfo->tagCond) != 0) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }

    // backup the group-by info and clear it in the sqlcmd object
    pSupporter->groupInfo = pNewQueryInfo->groupbyExpr;
    memset(&pNewQueryInfo->groupbyExpr, 0, sizeof(SSqlGroupbyExpr));

    pNew->cmd.numOfCols = 0;
    pNewQueryInfo->interval.interval = 0;

    // the limit is kept in the supporter; the first-stage subquery runs without limit/offset
    pSupporter->limit = pNewQueryInfo->limit;
    pNewQueryInfo->limit.limit = -1;
    pNewQueryInfo->limit.offset = 0;

    tscInitQueryInfo(pNewQueryInfo);
    STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0);

    if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {  // return the tableId & tag
      SColumnIndex colIndex = {0};

      STagCond* pTagCond = &pSupporter->tagCond;
      assert(pTagCond->joinInfo.hasJoin);

      int32_t  tagColId = tscGetJoinTagColIdByUid(pTagCond, pTableMetaInfo->pTableMeta->id.uid);
      SSchema* s = tscGetColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId);

      colIndex.columnIndex = tscGetTagColIndexById(pTableMetaInfo->pTableMeta, tagColId);

      int16_t bytes = 0;
      int16_t type  = 0;
      int32_t inter = 0;

      getResultDataInfo(s->type, s->bytes, TSDB_FUNC_TID_TAG, 0, &type, &bytes, &inter, 0, 0);

      SSchema s1 = {.colId = s->colId, .type = (uint8_t)type, .bytes = bytes};
      pSupporter->tagSize = s1.bytes;
      assert(isValidDataType(s1.type) && s1.bytes > 0);

      // set get tags query type
      TSDB_QUERY_SET_TYPE(pNewQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY);
      tscAddFuncInSelectClause(pNewQueryInfo, 0, TSDB_FUNC_TID_TAG, &colIndex, &s1, TSDB_COL_TAG);
      size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList);

      tscDebug(
          "%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%u, transfer to tid_tag query to retrieve (tableId, tags), "
          "exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, tagIndex:%d, name:%s",
          pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscSqlExprNumOfExprs(pNewQueryInfo),
          numOfCols, pNewQueryInfo->fieldsInfo.numOfOutput, colIndex.columnIndex, tNameGetTableName(&pNewQueryInfo->pTableMetaInfo[0]->name));
    } else {
      // not a super table: retrieve the timestamps through ts_comp. No tag column id is needed
      // here, because this branch is only reached for non-super tables.
      SSchema      colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1};
      SColumnIndex colIndex = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX};
      tscAddFuncInSelectClause(pNewQueryInfo, 0, TSDB_FUNC_TS_COMP, &colIndex, &colSchema, TSDB_COL_NORMAL);

      // add the filter tag column
      if (pSupporter->colList != NULL) {
        size_t s = taosArrayGetSize(pSupporter->colList);

        for (size_t i = 0; i < s; ++i) {
          SColumn *pCol = taosArrayGetP(pSupporter->colList, i);

          if (pCol->numOfFilters > 0) {  // copy to the pNew->cmd.colList if it is filtered.
            SColumn *p = tscColumnClone(pCol);
            taosArrayPush(pNewQueryInfo->colList, &p);
          }
        }
      }

      size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList);

      tscDebug(
          "%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%u, transfer to ts_comp query to retrieve timestamps, "
          "exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, name:%s",
          pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscSqlExprNumOfExprs(pNewQueryInfo),
          numOfCols, pNewQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pNewQueryInfo->pTableMetaInfo[0]->name));
    }
  } else {
    assert(0);
    SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
    pNewQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY;
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
1718
/*
 * Create and launch one subquery per table of the join query in pSql.
 *
 * If any super table involved has no vgroups, the join result is empty: the subqueries are freed
 * and the invoker is called back with TSDB_SQL_RETRIEVE_EMPTY_RESULT. If launching a subquery fails,
 * the error is stored in pSql->res.code and the remaining subqueries are notified through their
 * callbacks instead of being launched. Setup failures are reported through tscAsyncResultOnError().
 */
void tscHandleMasterJoinQuery(SSqlObj* pSql) {
  SSqlCmd*        pCmd   = &pSql->cmd;
  SSqlRes*        pRes   = &pSql->res;
  SSubqueryState* pState = &pSql->subState;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  assert((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0);

  int32_t code = TSDB_CODE_SUCCESS;
  pState->numOfSub = pQueryInfo->numOfTables;

  if (pState->states == NULL) {
    pState->states = calloc(pState->numOfSub, sizeof(*pState->states));
    if (pState->states == NULL) {
      code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      goto _error;
    }

    pthread_mutex_init(&pState->mutex, NULL);
  }

  memset(pState->states, 0, sizeof(*pState->states) * pState->numOfSub);
  tscDebug("%p reset all sub states to 0", pSql);

  bool hasEmptySub = false;

  tscDebug("%p start subquery, total:%d", pSql, pQueryInfo->numOfTables);
  for (int32_t idx = 0; idx < pQueryInfo->numOfTables; ++idx) {
    SJoinSupporter *pSupporter = tscCreateJoinSupporter(pSql, idx);
    if (pSupporter == NULL) {  // failed to create support struct, abort current query
      tscError("%p tableIndex:%d, failed to allocate join support object, abort further query", pSql, idx);
      code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      goto _error;
    }

    code = tscCreateJoinSubquery(pSql, idx, pSupporter);
    if (code != TSDB_CODE_SUCCESS) {  // failed to create subquery object, quit query
      tscDestroyJoinSupporter(pSupporter);
      goto _error;
    }

    // a super table without any vgroup makes the whole join result empty
    STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->pSubs[idx]->cmd, 0, 0);
    if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo) && pTableMetaInfo->vgroupList->numOfVgroups == 0) {
      hasEmptySub = true;
      break;
    }
  }

  if (hasEmptySub) {  // at least one subquery is empty, do nothing and return
    freeJoinSubqueryObj(pSql);
    pSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
    (*pSql->fp)(pSql->param, pSql, 0);
    return;
  }

  bool launchFailed = false;
  for (int32_t idx = 0; idx < pState->numOfSub; ++idx) {
    SSqlObj* pSub = pSql->pSubs[idx];

    if (!launchFailed) {
      code = tscProcessSql(pSub);
      if (code == TSDB_CODE_SUCCESS) {
        continue;
      }

      pRes->code   = code;
      launchFailed = true;
    }

    // the failed subquery and every one after it are notified through their own callbacks
    (*pSub->fp)(pSub->param, pSub, 0);
  }

  if (launchFailed) {
    return;
  }

  pSql->cmd.command = TSDB_SQL_TABLE_JOIN_RETRIEVE;
  return;

_error:
  pRes->code = code;
  tscAsyncResultOnError(pSql);
}

H
Haojun Liao 已提交
1802 1803
/*
 * Release the first `numOfSubs` super table subqueries of pSql together with their
 * SRetrieveSupport objects (local buffer included). Used when launching the subqueries
 * fails part-way.
 *
 * The subquery's param and the parent's pSubs slot are cleared, so no dangling pointer to
 * the freed objects survives. Without this, a later release of the parent could free them again.
 */
static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs) {
  assert(numOfSubs <= pSql->subState.numOfSub && numOfSubs >= 0);

  for (int32_t i = 0; i < numOfSubs; ++i) {
    SSqlObj* pSub = pSql->pSubs[i];
    assert(pSub != NULL);

    SRetrieveSupport* pSupport = pSub->param;

    tfree(pSupport->localBuffer);
    tfree(pSupport);
    pSub->param = NULL;

    taos_free_result(pSub);
    pSql->pSubs[i] = NULL;
  }
}

D
TD-2516  
dapan1121 已提交
1818 1819 1820 1821 1822 1823 1824 1825 1826 1827 1828 1829 1830 1831 1832 1833 1834
/*
 * Spin until the lock word *lockedBy is acquired by the calling thread.
 * The word is 0 while free and holds the owner's thread id while taken. A waiting thread
 * yields the CPU after every 100 failed compare-and-swap attempts.
 */
void tscLockByThread(int64_t *lockedBy) {
  const int64_t self = taosGetSelfPthreadId();

  for (int attempts = 1; atomic_val_compare_exchange_64(lockedBy, 0, self) != 0; ++attempts) {
    if (attempts % 100 == 0) {
      sched_yield();
    }
  }
}

/*
 * Release a lock word taken with tscLockByThread() by resetting it from the caller's thread id
 * to 0. Asserts that the calling thread is the current owner.
 */
void tscUnlockByThread(int64_t *lockedBy) {
  const int64_t self      = taosGetSelfPthreadId();
  const int64_t prevOwner = atomic_val_compare_exchange_64(lockedBy, self, 0);

  assert(prevOwner == self);
  (void)prevOwner;  // only inspected by the assert
}

1835 1836 1837 1838 1839 1840 1841 1842 1843 1844 1845 1846 1847 1848 1849 1850 1851 1852 1853 1854 1855 1856 1857 1858
/* State shared by the callbacks of the first-round query of a two-round super table query. */
typedef struct SFirstRoundQuerySup {
  SSqlObj*  pParent;    // the original query that is resumed after the first round
  int32_t   numOfRows;  // rows accumulated over all fetch callbacks of the first round
  SArray*   pColsInfo;  // SArray<int16_t>: result column ids of the first-round output columns
  int32_t   tagLen;     // sum of the result bytes of all tag columns in the output
  STColumn* pTagCols;   // NOTE(review): not assigned by tscHandleFirstRoundStableQuery; confirm it is still used
  SArray*   pResult;    // SArray<SInterResult>: one element per group of rows sharing the same tags
  int64_t   interval;   // interval of the original query
  char*     buf;        // NOTE(review): not assigned by tscHandleFirstRoundStableQuery; confirm it is still used
  int32_t   bufLen;     // length of buf
} SFirstRoundQuerySup;

/*
 * Append one first-round result row to the group pInterResult.
 *
 * Tag columns are skipped. The timestamp column (colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) sets the
 * key for the columns that follow it; before it is seen the key is INT64_MIN. Every other column
 * value is read as a double (a NULL value becomes the double NULL marker). It is appended as an
 * SResPair {avg, key} to the value list of that column id, which is created on first use.
 */
void doAppendData(SInterResult* pInterResult, TAOS_ROW row, int32_t numOfCols, SQueryInfo* pQueryInfo) {
  TSKEY key = INT64_MIN;

  for (int32_t col = 0; col < numOfCols; ++col) {
    SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, col);
    if (TSDB_COL_IS_TAG(pExpr->colInfo.flag)) {
      continue;
    }

    int32_t colId = pExpr->colInfo.colId;
    if (colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
      key = *(TSKEY*)row[col];
      continue;
    }

    double value = 0;
    if (row[col] == NULL) {
      SET_DOUBLE_NULL(&value);
    } else {
      value = *(double*)row[col];
    }

    // locate the value list of this column
    SArray* pValues = NULL;
    size_t  numOfQueriedCols = taosArrayGetSize(pInterResult->pResult);
    for (size_t j = 0; j < numOfQueriedCols; ++j) {
      SStddevInterResult* pColRes = taosArrayGet(pInterResult->pResult, j);
      if (pColRes->colId == colId) {
        pValues = pColRes->pResult;
        break;
      }
    }

    // first value of this column: register a new column entry
    if (pValues == NULL) {
      SStddevInterResult newCol = {.colId = colId, .pResult = taosArrayInit(10, sizeof(SResPair)),};
      taosArrayPush(pInterResult->pResult, &newCol);
      pValues = newCol.pResult;
    }

    SResPair pair = {.avg = value, .key = key};
    taosArrayPush(pValues, &pair);
  }
}

void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
  SSqlObj* pSql = (SSqlObj*)tres;
  SSqlRes* pRes = &pSql->res;

  SFirstRoundQuerySup* pSup = param;
  SQueryInfo*          pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);

  if (numOfRows > 0) {
    TAOS_ROW row = NULL;
    int32_t  numOfCols = taos_field_count(tres);

    if (pSup->tagLen == 0) {  // no tags, all rows belong to one group
      SInterResult interResult = {.tags = NULL, .pResult = taosArrayInit(4, sizeof(SStddevInterResult))};
      taosArrayPush(pSup->pResult, &interResult);

      while ((row = taos_fetch_row(tres)) != NULL) {
        doAppendData(&interResult, row, numOfCols, pQueryInfo);
      }
    } else {  // tagLen > 0
      char* p = calloc(1, pSup->tagLen);

      while ((row = taos_fetch_row(tres)) != NULL) {
        int32_t* length = taos_fetch_lengths(tres);
        memset(p, 0, pSup->tagLen);

        int32_t offset = 0;
        for (int32_t i = 0; i < numOfCols && offset < pSup->tagLen; ++i) {
          SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
          if (TSDB_COL_IS_TAG(pExpr->colInfo.flag)) {
            memcpy(p + offset, row[i], length[i]);
            offset += pExpr->resBytes;
          }
        }

        assert(offset == pSup->tagLen);
        size_t size = taosArrayGetSize(pSup->pResult);

        if (size > 0) {
          SInterResult* pInterResult = taosArrayGetLast(pSup->pResult);
          if (memcmp(pInterResult->tags, p, pSup->tagLen) == 0) {  // belongs to the same group
            doAppendData(pInterResult, row, numOfCols, pQueryInfo);
          } else {
            char* tags = malloc( pSup->tagLen);
            memcpy(tags, p, pSup->tagLen);

            SInterResult interResult = {.tags = tags, .pResult = taosArrayInit(4, sizeof(SStddevInterResult))};
            taosArrayPush(pSup->pResult, &interResult);
            doAppendData(&interResult, row, numOfCols, pQueryInfo);
          }
        } else {
          char* tags = malloc(pSup->tagLen);
          memcpy(tags, p, pSup->tagLen);

          SInterResult interResult = {.tags = tags, .pResult = taosArrayInit(4, sizeof(SStddevInterResult))};
          taosArrayPush(pSup->pResult, &interResult);
          doAppendData(&interResult, row, numOfCols, pQueryInfo);
        }
      }

      tfree(p);
    }
  }

  pSup->numOfRows += numOfRows;
  if (!pRes->completed) {
    taos_fetch_rows_a(tres, tscFirstRoundRetrieveCallback, param);
    return;
  }

  // set the parameters for the second round query process
  SSqlObj    *pParent = pSup->pParent;
  SSqlCmd    *pPCmd   = &pParent->cmd;
  SQueryInfo *pQueryInfo1 = tscGetQueryInfoDetail(pPCmd, 0);

  if (pSup->numOfRows > 0) {
    SBufferWriter bw = tbufInitWriter(NULL, false);
    interResToBinary(&bw, pSup->pResult, pSup->tagLen);

H
Haojun Liao 已提交
1969
    pQueryInfo1->bufLen = (int32_t) tbufTell(&bw);
1970 1971 1972 1973 1974 1975 1976 1977 1978 1979 1980 1981 1982 1983 1984 1985 1986 1987 1988 1989 1990 1991 1992 1993 1994 1995 1996 1997 1998 1999 2000 2001 2002 2003 2004 2005 2006 2007 2008 2009 2010 2011 2012 2013 2014 2015 2016 2017 2018 2019 2020 2021 2022 2023 2024 2025 2026 2027 2028 2029 2030 2031 2032 2033 2034 2035 2036
    pQueryInfo1->buf = tbufGetData(&bw, true);

    // set the serialized binary string as the parameter of arithmetic expression
    tbufCloseWriter(&bw);
  }

  taosArrayDestroyEx(pSup->pResult, freeInterResult);
  taosArrayDestroy(pSup->pColsInfo);
  tfree(pSup);

  taos_free_result(pSql);

  pQueryInfo1->round = 1;
  tscDoQuery(pParent);
}

/*
 * Completion callback of the first-round query.
 *
 * On success, starts fetching its rows into the SFirstRoundQuerySup passed as `param`. On failure,
 * releases the first-round state the same way the retrieve callback does on completion, and reports
 * the error on the parent query instead of fetching from a failed result.
 */
void tscFirstRoundCallback(void* param, TAOS_RES* tres, int code) {
  int32_t c = taos_errno(tres);
  if (c != TSDB_CODE_SUCCESS) {
    SFirstRoundQuerySup* pSup    = param;
    SSqlObj*             pParent = pSup->pParent;

    tscError("%p first round query failed, code:%s", tres, tstrerror(c));

    taosArrayDestroyEx(pSup->pResult, freeInterResult);
    taosArrayDestroy(pSup->pColsInfo);
    tfree(pSup);

    taos_free_result(tres);

    pParent->res.code = c;
    tscAsyncResultOnError(pParent);
    return;
  }

  taos_fetch_rows_a(tres, tscFirstRoundRetrieveCallback, param);
}

/*
 * Launch the first round of a two-round super table query.
 *
 * A subquery is built that returns, per group, the timestamp (for interval queries), the avg of each
 * stddev_dst column and the tag columns. Its results are collected by tscFirstRoundCallback /
 * tscFirstRoundRetrieveCallback, which then restart pSql for round 1.
 *
 * On allocation failure, the partially built state is released, the error is stored in pSql->res.code
 * and reported through tscAsyncResultOnError(), and the error code is returned. This matches
 * tscHandleMasterSTableQuery.
 */
int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
  SQueryInfo*     pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
  STableMetaInfo* pTableMetaInfo1 = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);

  int32_t              code = TSDB_CODE_SUCCESS;
  SSqlObj*             pNew = NULL;
  SFirstRoundQuerySup* pSup = calloc(1, sizeof(SFirstRoundQuerySup));
  if (pSup == NULL) {
    code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto _error;
  }

  pSup->pParent   = pSql;
  pSup->interval  = pQueryInfo->interval.interval;
  pSup->pResult   = taosArrayInit(6, sizeof(SInterResult));  // SArray<SInterResult>, one element per group
  pSup->pColsInfo = taosArrayInit(6, sizeof(int16_t));       // result column id
  if (pSup->pResult == NULL || pSup->pColsInfo == NULL) {
    code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto _error;
  }

  pNew = createSubqueryObj(pSql, 0, tscFirstRoundCallback, pSup, TSDB_SQL_SELECT, NULL);
  if (pNew == NULL) {
    code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto _error;
  }

  SSqlCmd *pCmd = &pNew->cmd;

  tscClearSubqueryInfo(pCmd);
  tscFreeSqlResult(pSql);

  SQueryInfo* pNewQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  assert(pQueryInfo->numOfTables == 1);

  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0);

  tscInitQueryInfo(pNewQueryInfo);
  pNewQueryInfo->groupbyExpr = pQueryInfo->groupbyExpr;
  if (pQueryInfo->groupbyExpr.columnInfo != NULL) {
    // deep-copy the column info so that the subquery does not share it with the parent
    pNewQueryInfo->groupbyExpr.columnInfo = taosArrayDup(pQueryInfo->groupbyExpr.columnInfo);
    if (pNewQueryInfo->groupbyExpr.columnInfo == NULL) {
      code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      goto _error;
    }
  }

  if (tscTagCondCopy(&pNewQueryInfo->tagCond, &pQueryInfo->tagCond) != 0) {
    code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto _error;
  }

  pNewQueryInfo->interval = pQueryInfo->interval;

  pCmd->command = TSDB_SQL_SELECT;
  pNew->fp = tscFirstRoundCallback;

  int32_t numOfExprs = (int32_t) tscSqlExprNumOfExprs(pQueryInfo);

  int32_t index = 0;
  int32_t numOfTags = 0;
  for(int32_t i = 0; i < numOfExprs; ++i) {
    SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
    if (pExpr->functionId == TSDB_FUNC_TS && pQueryInfo->interval.interval > 0) {
      taosArrayPush(pSup->pColsInfo, &pExpr->resColId);

      SColumnIndex colIndex = {.tableIndex = 0, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
      SSchema* schema = tscGetColumnSchemaById(pTableMetaInfo1->pTableMeta, pExpr->colInfo.colId);

      SSqlExpr* p = tscAddFuncInSelectClause(pNewQueryInfo, index++, TSDB_FUNC_TS, &colIndex, schema, TSDB_COL_NORMAL);
      p->resColId = pExpr->resColId;  // update the result column id
    } else if (pExpr->functionId == TSDB_FUNC_STDDEV_DST) {
      // the first round computes the avg of the column; the stddev is finished in the second round
      taosArrayPush(pSup->pColsInfo, &pExpr->resColId);

      SColumnIndex colIndex = {.tableIndex = 0, .columnIndex = pExpr->colInfo.colIndex};
      SSchema schema = {.type = TSDB_DATA_TYPE_DOUBLE, .bytes = sizeof(double)};
      tstrncpy(schema.name, pExpr->aliasName, tListLen(schema.name));

      SSqlExpr* p = tscAddFuncInSelectClause(pNewQueryInfo, index++, TSDB_FUNC_AVG, &colIndex, &schema, TSDB_COL_NORMAL);
      p->resColId = pExpr->resColId;  // update the result column id
    } else if (pExpr->functionId == TSDB_FUNC_TAG) {
      pSup->tagLen += pExpr->resBytes;
      SColumnIndex colIndex = {.tableIndex = 0, .columnIndex = pExpr->colInfo.colIndex};

      SSchema* schema = NULL;
      if (pExpr->colInfo.colId != TSDB_TBNAME_COLUMN_INDEX) {
        schema = tscGetColumnSchemaById(pTableMetaInfo1->pTableMeta, pExpr->colInfo.colId);
      } else {
        schema = tGetTbnameColumnSchema();
      }

      SSqlExpr* p = tscAddFuncInSelectClause(pNewQueryInfo, index++, TSDB_FUNC_TAG, &colIndex, schema, TSDB_COL_TAG);
      p->resColId = pExpr->resColId;
      numOfTags += 1;
    }
  }

  SColumnIndex columnIndex = {.tableIndex = 0, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
  tscInsertPrimaryTsSourceColumn(pNewQueryInfo, &columnIndex);

  tscTansformFuncForSTableQuery(pNewQueryInfo);

  tscDebug(
      "%p first round subquery:%p tableIndex:%d, vgroupIndex:%d, numOfVgroups:%d, type:%d, query to retrieve timestamps, "
      "numOfExpr:%" PRIzu ", colList:%d, numOfOutputFields:%d, name:%s",
      pSql, pNew, 0, pTableMetaInfo->vgroupIndex, pTableMetaInfo->vgroupList->numOfVgroups, pNewQueryInfo->type,
      tscSqlExprNumOfExprs(pNewQueryInfo), index+1, pNewQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pTableMetaInfo->name));

  tscHandleMasterSTableQuery(pNew);
  return TSDB_CODE_SUCCESS;

_error:
  // pSup->pResult holds no groups yet, so a plain destroy is sufficient
  if (pNew != NULL) {
    taos_free_result(pNew);
  }

  if (pSup != NULL) {
    taosArrayDestroy(pSup->pResult);
    taosArrayDestroy(pSup->pColsInfo);
    tfree(pSup);
  }

  terrno = code;
  pSql->res.code = code;
  tscAsyncResultOnError(pSql);
  return code;
}
D
TD-2516  
dapan1121 已提交
2091

H
hjxilinx 已提交
2092 2093 2094 2095 2096
/*
 * Launch one subquery per vgroup (or per vgroup table group) of the super table query in pSql.
 *
 * Creates the local reducer environment that merges the subquery results, creates every subquery
 * together with its SRetrieveSupport, and then sends all of them.
 *
 * Returns TSDB_CODE_SUCCESS once every subquery has been sent. If the reducer environment, the
 * subquery array or the state array cannot be allocated, the error is also reported through
 * tscAsyncResultOnError() and the error code is returned (never TSDB_CODE_SUCCESS).
 */
int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  // pRes->code check only serves in launching metric sub-queries
  if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
    pCmd->command = TSDB_SQL_RETRIEVE_LOCALMERGE;  // enable the abort of kill super table function.
    return pRes->code;
  }

  tExtMemBuffer   **pMemoryBuf = NULL;
  tOrderDescriptor *pDesc  = NULL;
  SColumnModel     *pModel = NULL;
  SColumnModel     *pFinalModel = NULL;

  pRes->qhandle = 0x1;  // hack the qhandle check

  const uint32_t nBufferSize = (1u << 16u);  // 64KB

  SQueryInfo     *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  SSubqueryState *pState = &pSql->subState;

  pState->numOfSub = 0;
  if (pTableMetaInfo->pVgroupTables == NULL) {
    pState->numOfSub = pTableMetaInfo->vgroupList->numOfVgroups;
  } else {
    pState->numOfSub = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
  }

  assert(pState->numOfSub > 0);

  int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, &pFinalModel, nBufferSize);
  if (ret != 0) {
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    tscAsyncResultOnError(pSql);
    tfree(pMemoryBuf);
    return ret;
  }

  tscDebug("%p retrieved query data from %d vnode(s)", pSql, pState->numOfSub);
  pSql->pSubs = calloc(pState->numOfSub, POINTER_BYTES);
  if (pSql->pSubs == NULL) {
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pFinalModel, pState->numOfSub);

    tscAsyncResultOnError(pSql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;  // `ret` is TSDB_CODE_SUCCESS here and must not be returned
  }

  if (pState->states == NULL) {
    pState->states = calloc(pState->numOfSub, sizeof(*pState->states));
    if (pState->states == NULL) {
      pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      // release the whole reducer environment, not only the memory buffer array
      tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pFinalModel, pState->numOfSub);

      tscAsyncResultOnError(pSql);
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }

    pthread_mutex_init(&pState->mutex, NULL);
  }

  memset(pState->states, 0, sizeof(*pState->states) * pState->numOfSub);
  tscDebug("%p reset all sub states to 0", pSql);

  pRes->code = TSDB_CODE_SUCCESS;

  int32_t i = 0;
  for (; i < pState->numOfSub; ++i) {
    SRetrieveSupport *trs = (SRetrieveSupport *)calloc(1, sizeof(SRetrieveSupport));
    if (trs == NULL) {
      tscError("%p failed to malloc buffer for SRetrieveSupport, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
      break;
    }

    trs->pExtMemBuffer = pMemoryBuf;
    trs->pOrderDescriptor = pDesc;

    trs->localBuffer = (tFilePage *)calloc(1, nBufferSize + sizeof(tFilePage));
    if (trs->localBuffer == NULL) {
      tscError("%p failed to malloc buffer for local buffer, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
      tfree(trs);
      break;
    }

    trs->subqueryIndex  = i;
    trs->pParentSql     = pSql;
    trs->pFinalColModel = pModel;
    trs->pFFColModel    = pFinalModel;

    SSqlObj *pNew = tscCreateSTableSubquery(pSql, trs, NULL);
    if (pNew == NULL) {
      tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
      tfree(trs->localBuffer);
      tfree(trs);
      break;
    }

    // todo handle multi-vnode situation
    if (pQueryInfo->tsBuf) {
      SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
      pNewQueryInfo->tsBuf = tsBufClone(pQueryInfo->tsBuf);
      assert(pNewQueryInfo->tsBuf != NULL);
    }

    tscDebug("%p sub:%p create subquery success. orderOfSub:%d", pSql, pNew, trs->subqueryIndex);
  }

  if (i < pState->numOfSub) {
    tscError("%p failed to prepare subquery structure and launch subqueries", pSql);
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;

    tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pFinalModel, pState->numOfSub);
    doCleanupSubqueries(pSql, i);
    return pRes->code;   // free all allocated resource
  }

  if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
    tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pFinalModel, pState->numOfSub);
    doCleanupSubqueries(pSql, i);
    return pRes->code;
  }

  for(int32_t j = 0; j < pState->numOfSub; ++j) {
    SSqlObj* pSub = pSql->pSubs[j];
    SRetrieveSupport* pSupport = pSub->param;

    tscDebug("%p sub:%p launch subquery, orderOfSub:%d.", pSql, pSub, pSupport->subqueryIndex);
    tscProcessSql(pSub);
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
2227 2228
/*
 * Release the SRetrieveSupport attached (as param) to a subquery object.
 *
 * The param pointer is detached with a compare-and-swap. If the swap observes a NULL
 * param, some other path already released the support object, and this call only logs.
 */
static void tscFreeRetrieveSup(SSqlObj *pSql) {
  SRetrieveSupport *pSupport = pSql->param;

  // replace param with NULL only if it still holds the value read above
  void* pPrev = atomic_val_compare_exchange_ptr(&pSql->param, pSupport, 0);
  if (pPrev != NULL) {
    tscDebug("%p start to free subquery supp obj:%p", pSql, pSupport);
    tfree(pSupport->localBuffer);
    tfree(pSupport);
    return;
  }

  tscDebug("%p retrieve supp already released", pSql);
}

2241
static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfRows);
2242
static void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows);
H
hjxilinx 已提交
2243

H
Haojun Liao 已提交
2244
static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES *tres, int32_t code) {
H
hjxilinx 已提交
2245
// set no disk space error info
2246
  tscError("sub:%p failed to flush data to disk, reason:%s", tres, tstrerror(code));
H
Haojun Liao 已提交
2247
  SSqlObj* pParentSql = trsupport->pParentSql;
H
Haojun Liao 已提交
2248 2249

  pParentSql->res.code = code;
H
hjxilinx 已提交
2250
  trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
H
Haojun Liao 已提交
2251
  tscHandleSubqueryError(trsupport, tres, pParentSql->res.code);
H
hjxilinx 已提交
2252 2253
}

H
Haojun Liao 已提交
2254 2255 2256 2257
/*
 * current query failed, and the retry count is less than the available
 * count, retry query clear previous retrieved data, then launch a new sub query
 */
D
fix bug  
dapan1121 已提交
2258 2259 2260 2261 2262 2263 2264 2265 2266 2267 2268 2269 2270 2271 2272 2273
static int32_t tscReissueSubquery(SRetrieveSupport *oriTrs, SSqlObj *pSql, int32_t code) {
  SRetrieveSupport *trsupport = malloc(sizeof(SRetrieveSupport));
  if (trsupport == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  memcpy(trsupport, oriTrs, sizeof(*trsupport));

  const uint32_t nBufferSize = (1u << 16u);  // 64KB
  trsupport->localBuffer = (tFilePage *)calloc(1, nBufferSize + sizeof(tFilePage));
  if (trsupport->localBuffer == NULL) {
    tscError("%p failed to malloc buffer for local buffer, reason:%s", pSql, strerror(errno));
    tfree(trsupport);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  
H
Haojun Liao 已提交
2274 2275 2276
  SSqlObj *pParentSql = trsupport->pParentSql;
  int32_t  subqueryIndex = trsupport->subqueryIndex;

S
TD-1732  
Shengliang Guan 已提交
2277 2278
  STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
  SVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[0];
H
Haojun Liao 已提交
2279 2280 2281 2282 2283

  tExtMemBufferClear(trsupport->pExtMemBuffer[subqueryIndex]);

  // clear local saved number of results
  trsupport->localBuffer->num = 0;
2284
  tscError("%p sub:%p retrieve/query failed, code:%s, orderOfSub:%d, retry:%d", trsupport->pParentSql, pSql,
H
Haojun Liao 已提交
2285 2286
           tstrerror(code), subqueryIndex, trsupport->numOfRetry);

2287
  SSqlObj *pNew = tscCreateSTableSubquery(trsupport->pParentSql, trsupport, pSql);
H
Haojun Liao 已提交
2288
  if (pNew == NULL) {
2289
    tscError("%p sub:%p failed to create new subquery due to error:%s, abort retry, vgId:%d, orderOfSub:%d",
D
fix bug  
dapan1121 已提交
2290
             oriTrs->pParentSql, pSql, tstrerror(terrno), pVgroup->vgId, oriTrs->subqueryIndex);
H
Haojun Liao 已提交
2291

2292
    pParentSql->res.code = terrno;
D
fix bug  
dapan1121 已提交
2293
    oriTrs->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
H
Haojun Liao 已提交
2294

D
fix bug  
dapan1121 已提交
2295
    tfree(trsupport);
H
Haojun Liao 已提交
2296 2297 2298
    return pParentSql->res.code;
  }

2299 2300 2301 2302
  int32_t ret = tscProcessSql(pNew);

  // if failed to process sql, let following code handle the pSql
  if (ret == TSDB_CODE_SUCCESS) {
D
fix bug  
dapan1121 已提交
2303
    tscFreeRetrieveSup(pSql);
2304
    taos_free_result(pSql);
H
Haojun Liao 已提交
2305
    return ret;
D
fix bug  
dapan1121 已提交
2306 2307 2308 2309
  } else {    
    pSql->pSubs[trsupport->subqueryIndex] = pSql;
    tscFreeRetrieveSup(pNew);
    taos_free_result(pNew);
H
Haojun Liao 已提交
2310
    return ret;
2311
  }
H
Haojun Liao 已提交
2312 2313
}

2314
/*
 * Handle the failure (or forced abort) of one subquery of a super table query.
 *
 * @param trsupport  retrieve support attached to pSql
 * @param pSql       the subquery object
 * @param numOfRows  >= 0: this subquery succeeded, but it is aborted because another subquery
 *                   failed; < 0: the error code of this subquery's own failure
 *
 * A failed subquery is reissued while retries remain and the parent has no error yet.
 * Otherwise this subquery is marked done and its support object released. The call that
 * completes the last subquery also tears down the local merge resources and notifies the
 * parent.
 */
void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows) {
  // validate the argument before it is dereferenced below
  assert(pSql != NULL);

  // it has been freed already
  if (pSql->param != trsupport || pSql->param == NULL) {
    return;
  }

  SSqlObj *pParentSql = trsupport->pParentSql;
  int32_t  subqueryIndex = trsupport->subqueryIndex;

  SSubqueryState* pState = &pParentSql->subState;

  // retrieved in subquery failed. OR query cancelled in retrieve phase.
  if (taos_errno(pSql) == TSDB_CODE_SUCCESS && pParentSql->res.code != TSDB_CODE_SUCCESS) {

    /*
     * kill current sub-query connection, which may retrieve data from vnodes;
     * Here we get: pPObj->res.code == TSDB_CODE_TSC_QUERY_CANCELLED
     */
    pSql->res.numOfRows = 0;
    trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;  // disable retry efforts
    tscDebug("%p query is cancelled, sub:%p, orderOfSub:%d abort retrieve, code:%s", pParentSql, pSql,
             subqueryIndex, tstrerror(pParentSql->res.code));
  }

  if (numOfRows >= 0) {  // current query is successful, but other sub query failed, still abort current query.
    tscDebug("%p sub:%p retrieve numOfRows:%d,orderOfSub:%d", pParentSql, pSql, numOfRows, subqueryIndex);
    tscError("%p sub:%p abort further retrieval due to other queries failure,orderOfSub:%d,code:%s", pParentSql, pSql,
             subqueryIndex, tstrerror(pParentSql->res.code));
  } else {
    if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && pParentSql->res.code == TSDB_CODE_SUCCESS) {
      if (tscReissueSubquery(trsupport, pSql, numOfRows) == TSDB_CODE_SUCCESS) {
        return;
      }
    } else {  // reach the maximum retry count, abort
      atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, numOfRows);
      tscError("%p sub:%p retrieve failed,code:%s,orderOfSub:%d failed.no more retry,set global code:%s", pParentSql, pSql,
               tstrerror(numOfRows), subqueryIndex, tstrerror(pParentSql->res.code));
    }
  }

  if (!subAndCheckDone(pSql, pParentSql, subqueryIndex)) {
    tscDebug("%p sub:%p,%d freed, not finished, total:%d", pParentSql, pSql, trsupport->subqueryIndex, pState->numOfSub);

    tscFreeRetrieveSup(pSql);
    return;
  }

  // all subqueries are failed
  tscError("%p retrieve from %d vnode(s) completed,code:%s.FAILED.", pParentSql, pState->numOfSub,
      tstrerror(pParentSql->res.code));

  // release allocated resource (must happen before trsupport itself is freed below)
  tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, trsupport->pFinalColModel, trsupport->pFFColModel,
                            pState->numOfSub);

  tscFreeRetrieveSup(pSql);

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, 0);

  if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
    // parent is not a second-stage join query: hand the final code to its callback directly
    (*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.code);
  } else {
    // second-stage join query: only an error is reported, through tscAsyncResultOnError
    if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
      tscAsyncResultOnError(pParentSql);
    }
  }
}

2385 2386
/*
 * Called once a subquery has fetched all of its rows from its vnode.
 *
 * The rows left in the local buffer are flushed into this subquery's external memory
 * buffer. The call that completes the last subquery builds the local merger (loser tree)
 * over all buffers, switches the parent to TSDB_SQL_RETRIEVE_LOCALMERGE and notifies it.
 * When the temp directory is short on space, or the flush fails, retrieval is aborted
 * instead.
 */
static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* pSql) {
  int32_t           idx = trsupport->subqueryIndex;
  SSqlObj *         pParentSql = trsupport->pParentSql;
  tOrderDescriptor *pDesc = trsupport->pOrderDescriptor;

  SSubqueryState* pState = &pParentSql->subState;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);

  STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[0];

  // data in from current vnode is stored in cache and disk.
  // Kept as a 64-bit count: the previous uint32_t cast could truncate, and it was printed with %d.
  int64_t numOfRowsFromSubquery = (int64_t)(trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->num);
  SVgroupsInfo* vgroupsInfo = pTableMetaInfo->vgroupList;
  tscDebug("%p sub:%p all data retrieved from ep:%s, vgId:%d, numOfRows:%" PRId64 ", orderOfSub:%d", pParentSql, pSql,
           vgroupsInfo->vgroups[0].epAddr[0].fqdn, vgroupsInfo->vgroups[0].vgId, numOfRowsFromSubquery, idx);

  tColModelCompact(pDesc->pColumnModel, trsupport->localBuffer, pDesc->pColumnModel->capacity);

#ifdef _DEBUG_VIEW
  printf("%" PRIu64 " rows data flushed to disk:\n", trsupport->localBuffer->num);
  SSrcColumnInfo colInfo[256] = {0};
  tscGetSrcColumnInfo(colInfo, pQueryInfo);
  tColModelDisplayEx(pDesc->pColumnModel, trsupport->localBuffer->data, trsupport->localBuffer->num,
                     trsupport->localBuffer->num, colInfo);
#endif

  if (tsTotalTmpDirGB != 0 && tsAvailTmpDirectorySpace < tsReservedTmpDirectorySpace) {
    tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pParentSql, pSql,
             tsAvailTmpDirectorySpace, tsReservedTmpDirectorySpace);
    tscAbortFurtherRetryRetrieval(trsupport, pSql, TSDB_CODE_TSC_NO_DISKSPACE);
    return;
  }

  // each result for a vnode is ordered as an independant list,
  // then used as an input of loser tree for disk-based merge
  int32_t code = tscFlushTmpBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer, pQueryInfo->groupbyExpr.orderType);
  if (code != 0) { // set no disk space error info, and abort retry
    tscAbortFurtherRetryRetrieval(trsupport, pSql, code);
    return;
  }

  if (!subAndCheckDone(pSql, pParentSql, idx)) {
    tscDebug("%p sub:%p orderOfSub:%d freed, not finished", pParentSql, pSql, trsupport->subqueryIndex);

    tscFreeRetrieveSup(pSql);
    return;
  }

  // all sub-queries are returned, start to local merge process
  pDesc->pColumnModel->capacity = trsupport->pExtMemBuffer[idx]->numOfElemsPerPage;

  tscDebug("%p retrieve from %d vnodes completed.final NumOfRows:%" PRId64 ",start to build loser tree", pParentSql,
           pState->numOfSub, pState->numOfRetrievedRows);

  SQueryInfo *pPQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, 0);
  tscClearInterpInfo(pPQueryInfo);

  tscCreateLocalMerger(trsupport->pExtMemBuffer, pState->numOfSub, pDesc, trsupport->pFinalColModel, trsupport->pFFColModel, pParentSql);
  tscDebug("%p build loser tree completed", pParentSql);

  pParentSql->res.precision = pSql->res.precision;
  pParentSql->res.numOfRows = 0;
  pParentSql->res.row = 0;

  tscFreeRetrieveSup(pSql);

  // set the command flag must be after the semaphore been correctly set.
  pParentSql->cmd.command = TSDB_SQL_RETRIEVE_LOCALMERGE;
  if (pParentSql->res.code == TSDB_CODE_SUCCESS) {
    (*pParentSql->fp)(pParentSql->param, pParentSql, 0);
  } else {
    tscAsyncResultOnError(pParentSql);
  }
}

/*
 * Fetch callback of a super table subquery (taos_fetch_rows_a).
 *
 * @param param      the subquery's SRetrieveSupport
 * @param tres       the subquery object
 * @param numOfRows  rows received in this batch (0 when the vnode has no more data), or
 *                   the error code when negative
 *
 * Received rows are appended to the subquery's external memory buffer, and fetching
 * continues until the result is complete. Errors are retried or escalated to the parent.
 */
static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
  SSqlObj *pSql = (SSqlObj *)tres;
  assert(pSql != NULL);

  // this query has been freed already
  SRetrieveSupport *trsupport = (SRetrieveSupport *)param;
  if (pSql->param == NULL || param == NULL) {
    tscDebug("%p already freed in dnodecallback", pSql);
    assert(pSql->res.code == TSDB_CODE_TSC_QUERY_CANCELLED);
    return;
  }

  tOrderDescriptor *pDesc = trsupport->pOrderDescriptor;
  int32_t           idx   = trsupport->subqueryIndex;
  SSqlObj *         pParentSql = trsupport->pParentSql;

  SSubqueryState* pState = &pParentSql->subState;

  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
  SVgroupInfo  *pVgroup = &pTableMetaInfo->vgroupList->vgroups[0];

  if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
    trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
    tscDebug("%p query cancelled/failed, sub:%p, vgId:%d, orderOfSub:%d, code:%s, global code:%s",
             pParentSql, pSql, pVgroup->vgId, trsupport->subqueryIndex, tstrerror(numOfRows), tstrerror(pParentSql->res.code));

    tscHandleSubqueryError(param, tres, numOfRows);
    return;
  }

  if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
    assert(numOfRows == taos_errno(pSql));

    if (numOfRows == TSDB_CODE_TSC_QUERY_CANCELLED) {
      trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
    }

    if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY) {
      tscError("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(numOfRows), trsupport->numOfRetry);

      if (tscReissueSubquery(trsupport, pSql, numOfRows) == TSDB_CODE_SUCCESS) {
        return;
      }
    } else {
      tscDebug("%p sub:%p reach the max retry times, set global code:%s", pParentSql, pSql, tstrerror(numOfRows));
      atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, numOfRows);  // set global code and abort
    }

    tscHandleSubqueryError(param, tres, numOfRows);
    return;
  }

  SSqlRes *   pRes = &pSql->res;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);

  if (numOfRows > 0) {
    assert(pRes->numOfRows == numOfRows);
    int64_t num = atomic_add_fetch_64(&pState->numOfRetrievedRows, numOfRows);

    // log the value returned by the atomic add: re-reading the shared counter here would race
    // with sibling subqueries updating it concurrently
    tscDebug("%p sub:%p retrieve numOfRows:%d totalNumOfRows:%" PRId64 " from ep:%s, orderOfSub:%d", pParentSql, pSql,
             pRes->numOfRows, num, pSql->epSet.fqdn[pSql->epSet.inUse], idx);

    if (num > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pQueryInfo, 0)) {
      tscError("%p sub:%p num of OrderedRes is too many, max allowed:%" PRId32 " , current:%" PRId64,
               pParentSql, pSql, tsMaxNumOfOrderedResults, num);
      tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_TSC_SORTED_RES_TOO_MANY);
      return;
    }

#ifdef _DEBUG_VIEW
    printf("received data from vnode: %"PRIu64" rows\n", pRes->numOfRows);
    SSrcColumnInfo colInfo[256] = {0};

    tscGetSrcColumnInfo(colInfo, pQueryInfo);
    tColModelDisplayEx(pDesc->pColumnModel, pRes->data, pRes->numOfRows, pRes->numOfRows, colInfo);
#endif

    // no disk space for tmp directory
    if (tsTotalTmpDirGB != 0 && tsAvailTmpDirectorySpace < tsReservedTmpDirectorySpace) {
      tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pParentSql, pSql,
               tsAvailTmpDirectorySpace, tsReservedTmpDirectorySpace);
      tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_TSC_NO_DISKSPACE);
      return;
    }

    int32_t ret = saveToBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer, pRes->data,
                               pRes->numOfRows, pQueryInfo->groupbyExpr.orderType);
    if (ret != 0) { // set no disk space error info, and abort retry
      tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_TSC_NO_DISKSPACE);
    } else if (pRes->completed) {
      tscAllDataRetrievedFromDnode(trsupport, pSql);
    } else { // continue fetch data from dnode
      taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param);
    }

  } else { // all data has been retrieved to client
    tscAllDataRetrievedFromDnode(trsupport, pSql);
  }
}

2560
/*
 * Create the subquery object that queries one vnode as part of a two-stage super table query.
 *
 * The subquery index in `trsupport` doubles as the vgroup index. On success the new object is
 * also stored in pSql->pSubs[trsupport->subqueryIndex]. On failure NULL is returned and pSubs
 * is left untouched.
 */
static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj) {
  const int32_t tableIndex = 0;

  SSqlObj *pSub = createSubqueryObj(pSql, tableIndex, tscRetrieveDataRes, trsupport, TSDB_SQL_SELECT, prevSqlObj);
  if (pSub == NULL) {
    return NULL;
  }

  // mark it as the first-stage (per vnode) query of a super table query
  SQueryInfo *pSubQueryInfo = tscGetQueryInfoDetail(&pSub->cmd, 0);
  pSubQueryInfo->type |= TSDB_QUERY_TYPE_STABLE_SUBQUERY;
  assert(pSubQueryInfo->numOfTables == 1 && pSub->cmd.numOfClause == 1 && trsupport->subqueryIndex < pSql->subState.numOfSub);

  // one subquery per vnode: the subquery index selects the vgroup
  STableMetaInfo *pSubTableMetaInfo = tscGetMetaInfo(pSubQueryInfo, tableIndex);
  pSubTableMetaInfo->vgroupIndex = trsupport->subqueryIndex;

  pSql->pSubs[trsupport->subqueryIndex] = pSub;
  return pSub;
}

2580
// todo there is are race condition in this function, while cancel is called by user.
H
hjxilinx 已提交
2581
void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
2582 2583 2584 2585 2586 2587 2588
  // the param may be null, since it may be done by other query threads. and the asyncOnError may enter in this
  // function while kill query by a user.
  if (param == NULL) {
    assert(code != TSDB_CODE_SUCCESS);
    return;
  }

2589
  SRetrieveSupport *trsupport = (SRetrieveSupport *) param;
H
hjxilinx 已提交
2590
  
H
Haojun Liao 已提交
2591
  SSqlObj*  pParentSql = trsupport->pParentSql;
2592
  SSqlObj*  pSql = (SSqlObj *) tres;
2593

2594 2595
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
  assert(pSql->cmd.numOfClause == 1 && pQueryInfo->numOfTables == 1);
H
hjxilinx 已提交
2596
  
2597
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
S
TD-1732  
Shengliang Guan 已提交
2598
  SVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[trsupport->subqueryIndex];
H
Haojun Liao 已提交
2599

H
Haojun Liao 已提交
2600
  // stable query killed or other subquery failed, all query stopped
H
Haojun Liao 已提交
2601
  if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
H
hjxilinx 已提交
2602
    trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
H
Haojun Liao 已提交
2603
    tscError("%p query cancelled or failed, sub:%p, vgId:%d, orderOfSub:%d, code:%s, global code:%s",
H
Haojun Liao 已提交
2604 2605 2606 2607
        pParentSql, pSql, pVgroup->vgId, trsupport->subqueryIndex, tstrerror(code), tstrerror(pParentSql->res.code));

    tscHandleSubqueryError(param, tres, code);
    return;
H
hjxilinx 已提交
2608 2609 2610
  }
  
  /*
H
Haojun Liao 已提交
2611
   * if a subquery on a vnode failed, all retrieve operations from vnode that occurs later
2612
   * than this one are actually not necessary, we simply call the tscRetrieveFromDnodeCallBack
H
hjxilinx 已提交
2613 2614
   * function to abort current and remain retrieve process.
   *
2615
   * NOTE: thread safe is required.
H
hjxilinx 已提交
2616
   */
H
Haojun Liao 已提交
2617 2618
  if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
    assert(code == taos_errno(pSql));
2619

H
Haojun Liao 已提交
2620
    if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY) {
2621
      tscError("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(code), trsupport->numOfRetry);
H
Haojun Liao 已提交
2622
      if (tscReissueSubquery(trsupport, pSql, code) == TSDB_CODE_SUCCESS) {
H
hjxilinx 已提交
2623 2624
        return;
      }
2625
    } else {
H
Haojun Liao 已提交
2626
      tscError("%p sub:%p reach the max retry times, set global code:%s", pParentSql, pSql, tstrerror(code));
H
Haojun Liao 已提交
2627
      atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, code);  // set global code and abort
2628
    }
H
Haojun Liao 已提交
2629 2630 2631 2632 2633

    tscHandleSubqueryError(param, tres, pParentSql->res.code);
    return;
  }

H
Haojun Liao 已提交
2634
  tscDebug("%p sub:%p query complete, ep:%s, vgId:%d, orderOfSub:%d, retrieve data", trsupport->pParentSql, pSql,
2635
             pVgroup->epAddr[0].fqdn, pVgroup->vgId, trsupport->subqueryIndex);
H
Haojun Liao 已提交
2636 2637 2638 2639 2640

  if (pSql->res.qhandle == 0) { // qhandle is NULL, code is TSDB_CODE_SUCCESS means no results generated from this vnode
    tscRetrieveFromDnodeCallBack(param, pSql, 0);
  } else {
    taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param);
H
hjxilinx 已提交
2641 2642 2643
  }
}

2644 2645
/*
 * Decide whether a failed multi-vnode insertion should be retried.
 *
 * A retry is allowed while pParentObj->retry has not exceeded pParentObj->maxRetry, and
 * every failed sub-insertion failed with one of the recoverable codes listed below. At the
 * first sub-insertion that failed with any other code, that code is stored in
 * pParentObj->res.code and false is returned.
 */
static bool needRetryInsert(SSqlObj* pParentObj, int32_t numOfSub) {
  if (pParentObj->retry > pParentObj->maxRetry) {
    tscError("%p max retry reached, abort the retry effort", pParentObj);
    return false;
  }

  for (int32_t idx = 0; idx < numOfSub; idx++) {
    int32_t subCode = pParentObj->pSubs[idx]->res.code;

    bool acceptable = (subCode == TSDB_CODE_SUCCESS) ||
                      (subCode == TSDB_CODE_TDB_TABLE_RECONFIGURE) ||
                      (subCode == TSDB_CODE_TDB_INVALID_TABLE_ID) ||
                      (subCode == TSDB_CODE_VND_INVALID_VGROUP_ID) ||
                      (subCode == TSDB_CODE_RPC_NETWORK_UNAVAIL) ||
                      (subCode == TSDB_CODE_APP_NOT_READY);
    if (!acceptable) {
      pParentObj->res.code = subCode;
      return false;
    }
  }

  return true;
}

H
Haojun Liao 已提交
2667 2668 2669 2670 2671 2672 2673 2674 2675
/*
 * Release the SInsertSupporter attached (as param) to every sub-insertion of pSqlObj.
 * pSqlObj must have at least one sub object.
 */
static void doFreeInsertSupporter(SSqlObj* pSqlObj) {
  assert(pSqlObj != NULL && pSqlObj->subState.numOfSub > 0);

  SSqlObj** pSubList = pSqlObj->pSubs;
  for (int32_t idx = 0; idx < pSqlObj->subState.numOfSub; idx++) {
    tfree(pSubList[idx]->param);
  }
}

H
Haojun Liao 已提交
2676
/*
 * Completion callback of one sub-insertion of a multi-vnode (async) insert.
 *
 * @param param      the SInsertSupporter of the sub-insertion
 * @param tres       the sub-insertion object
 * @param numOfRows  rows inserted by this sub-insertion, or its error code when negative
 *
 * Each call adds its inserted rows, and its error code if any, to the parent. The call that
 * completes the last sub-insertion restores the user callback and then does one of two
 * things:
 *  - reports the result; or
 *  - when every failure is recoverable (see needRetryInsert), drops the cached table meta,
 *    re-parses the SQL and re-submits the failed sub-insertions.
 */
static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) {
  SInsertSupporter *pSupporter = (SInsertSupporter *)param;
  SSqlObj* pParentObj = pSupporter->pSql;

  // record the total inserted rows
  if (numOfRows > 0) {
    atomic_add_fetch_32(&pParentObj->res.numOfRows, numOfRows);
  }

  if (taos_errno(tres) != TSDB_CODE_SUCCESS) {
    SSqlObj* pSql = (SSqlObj*) tres;
    assert(pSql != NULL && pSql->res.code == numOfRows);

    pParentObj->res.code = pSql->res.code;

    // set the flag in the parent sqlObj
    if (pSql->cmd.submitSchema) {
      pParentObj->cmd.submitSchema = 1;
    }
  }

  if (!subAndCheckDone(tres, pParentObj, pSupporter->index)) {
    tscDebug("%p insert:%p,%d completed, total:%d", pParentObj, tres, pSupporter->index, pParentObj->subState.numOfSub);
    return;
  }

  // restore user defined fp
  pParentObj->fp = pParentObj->fetchFp;
  int32_t numOfSub = pParentObj->subState.numOfSub;

  // pSupporter is released here and must not be used afterwards
  doFreeInsertSupporter(pParentObj);

  if (pParentObj->res.code == TSDB_CODE_SUCCESS) {
    tscDebug("%p Async insertion completed, total inserted:%d", pParentObj, pParentObj->res.numOfRows);

    // todo remove this parameter in async callback function definition.
    // all data has been sent to vnode, call user function with the total number of inserted rows
    (*pParentObj->fp)(pParentObj->param, pParentObj, (int32_t)pParentObj->res.numOfRows);
    return;
  }

  if (!needRetryInsert(pParentObj, numOfSub)) {
    tscAsyncResultOnError(pParentObj);
    return;
  }

  int32_t numOfFailed = 0;
  for(int32_t i = 0; i < numOfSub; ++i) {
    SSqlObj* pSql = pParentObj->pSubs[i];
    if (pSql->res.code != TSDB_CODE_SUCCESS) {
      numOfFailed += 1;

      // clean up tableMeta in cache
      tscFreeQueryInfo(&pSql->cmd);
      SQueryInfo* pQueryInfo = tscGetQueryInfoDetailSafely(&pSql->cmd, 0);
      STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pParentObj->cmd, pSql->cmd.clauseIndex, 0);
      tscAddTableMetaInfo(pQueryInfo, &pMasterTableMetaInfo->name, NULL, NULL, NULL, NULL);

      // mark this sub-insertion as not completed, so that it is waited for again
      subquerySetState(pSql, &pParentObj->subState, i, 0);

      tscDebug("%p, failed sub:%d, %p", pParentObj, i, pSql);
    }
  }

  tscError("%p Async insertion completed, total inserted:%d rows, numOfFailed:%d, numOfTotal:%d", pParentObj,
           pParentObj->res.numOfRows, numOfFailed, numOfSub);

  tscDebug("%p cleanup %d tableMeta in hashTable", pParentObj, pParentObj->cmd.numOfTables);
  for(int32_t i = 0; i < pParentObj->cmd.numOfTables; ++i) {
    char name[TSDB_TABLE_FNAME_LEN] = {0};
    tNameExtractFullName(pParentObj->cmd.pTableNameList[i], name);
    taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
  }

  pParentObj->cmd.parseFinished = false;

  tscResetSqlCmd(&pParentObj->cmd, false);

  // in case of insert, redo parsing the sql string and build new submit data block for two reasons:
  // 1. the table Id(tid & uid) may have been update, the submit block needs to be updated accordingly.
  // 2. vnode may need the schema information along with submit block to update its local table schema.
  tscDebug("%p re-parse sql to generate submit data, retry:%d", pParentObj, pParentObj->retry);
  pParentObj->retry++;

  int32_t code = tsParseSql(pParentObj, true);
  if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return;

  if (code != TSDB_CODE_SUCCESS) {
    pParentObj->res.code = code;
    tscAsyncResultOnError(pParentObj);
    return;
  }

  tscDoQuery(pParentObj);
}

/**
 * it is a subquery, so after parse the sql string, copy the submit block to payload of itself
 * @param pSql
 * @return
 */
2776
int32_t tscHandleInsertRetry(SSqlObj* pParent, SSqlObj* pSql) {
2777 2778 2779 2780
  assert(pSql != NULL && pSql->param != NULL);
  SSqlRes* pRes = &pSql->res;

  SInsertSupporter* pSupporter = (SInsertSupporter*) pSql->param;
H
Haojun Liao 已提交
2781
  assert(pSupporter->index < pSupporter->pSql->subState.numOfSub);
2782

2783
  STableDataBlocks* pTableDataBlock = taosArrayGetP(pParent->cmd.pDataBlocks, pSupporter->index);
H
Haojun Liao 已提交
2784 2785 2786
  int32_t code = tscCopyDataBlockToPayload(pSql, pTableDataBlock);

  if ((pRes->code = code)!= TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
2787
    tscAsyncResultOnError(pSql);
H
Haojun Liao 已提交
2788
    return code;  // here the pSql may have been released already.
2789 2790 2791
  }

  return tscProcessSql(pSql);
H
hjxilinx 已提交
2792 2793 2794 2795
}

/*
 * Split an insertion into one sub-insertion per data block (one per vnode) and launch them all.
 *
 * First call: for every block in pCmd->pDataBlocks this
 *  - creates a sub object, with an SInsertSupporter as param and multiVnodeInsertFinalize as
 *    callback;
 *  - copies the block into the sub object's payload;
 *  - sends it.
 *
 * Retry (pSql->pSubs already exists, see multiVnodeInsertFinalize): new supporters are attached
 * to the existing sub objects, and only the failed sub-insertions are re-submitted.
 *
 * @return TSDB_CODE_SUCCESS when the sub-insertions were launched. Otherwise the error code,
 *         which is also stored in pSql->res.code.
 */
int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  // it is the failure retry insert
  if (pSql->pSubs != NULL) {
    int32_t numOfSubObj = pSql->subState.numOfSub;

    // Attach a supporter to every sub object before launching any of them. Otherwise an
    // allocation failure could leave some sub-insertions in flight while others have no supporter.
    for (int32_t i = 0; i < numOfSubObj; ++i) {
      SInsertSupporter* pSup = calloc(1, sizeof(SInsertSupporter));
      if (pSup == NULL) {
        tscError("%p failed to malloc insert supporter for retry, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
        for (int32_t j = 0; j < i; ++j) {
          tfree(pSql->pSubs[j]->param);
        }

        pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
        return pRes->code;
      }

      pSup->index = i;
      pSup->pSql = pSql;
      pSql->pSubs[i]->param = pSup;
    }

    for (int32_t i = 0; i < numOfSubObj; ++i) {
      SSqlObj* pSub = pSql->pSubs[i];
      tscDebug("%p sub:%p launch sub insert, orderOfSub:%d", pSql, pSub, i);
      if (pSub->res.code != TSDB_CODE_SUCCESS) {
        tscHandleInsertRetry(pSql, pSub);
      }
    }

    return TSDB_CODE_SUCCESS;
  }

  pSql->subState.numOfSub = (uint16_t)taosArrayGetSize(pCmd->pDataBlocks);
  assert(pSql->subState.numOfSub > 0);

  pRes->code = TSDB_CODE_SUCCESS;

  // the number of already initialized subqueries
  int32_t numOfSub = 0;

  if (pSql->subState.states == NULL) {
    pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(*pSql->subState.states));
    if (pSql->subState.states == NULL) {
      pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      goto _error;
    }

    pthread_mutex_init(&pSql->subState.mutex, NULL);
  }

  memset(pSql->subState.states, 0, sizeof(*pSql->subState.states) * pSql->subState.numOfSub);
  tscDebug("%p reset all sub states to 0", pSql);

  pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES);
  if (pSql->pSubs == NULL) {
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto _error;
  }

  tscDebug("%p submit data to %d vnode(s)", pSql, pSql->subState.numOfSub);

  while(numOfSub < pSql->subState.numOfSub) {
    SInsertSupporter* pSupporter = calloc(1, sizeof(SInsertSupporter));
    if (pSupporter == NULL) {
      pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      goto _error;
    }

    pSupporter->pSql   = pSql;
    pSupporter->index  = numOfSub;

    SSqlObj *pNew = createSimpleSubObj(pSql, multiVnodeInsertFinalize, pSupporter, TSDB_SQL_INSERT);
    if (pNew == NULL) {
      tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, numOfSub, strerror(errno));
      tfree(pSupporter);  // not owned by any sub object yet
      pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      goto _error;
    }

    /*
     * assign the callback function to fetchFp to make sure that the error process function can restore
     * the callback function (multiVnodeInsertFinalize) correctly.
     */
    pNew->fetchFp = pNew->fp;
    pSql->pSubs[numOfSub] = pNew;

    STableDataBlocks* pTableDataBlock = taosArrayGetP(pCmd->pDataBlocks, numOfSub);
    pRes->code = tscCopyDataBlockToPayload(pNew, pTableDataBlock);
    if (pRes->code != TSDB_CODE_SUCCESS) {
      tscDebug("%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%s", pSql, numOfSub,
               pSql->subState.numOfSub, tstrerror(pRes->code));
      goto _error;
    }

    tscDebug("%p sub:%p create subObj success. orderOfSub:%d", pSql, pNew, numOfSub);
    numOfSub++;
  }

  pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);

  // use the local variable
  for (int32_t j = 0; j < numOfSub; ++j) {
    SSqlObj *pSub = pSql->pSubs[j];
    tscDebug("%p sub:%p launch sub insert, orderOfSub:%d", pSql, pSub, j);
    tscProcessSql(pSub);
  }

  return TSDB_CODE_SUCCESS;

_error:
  // every path to this label records its failure in pRes->code; report that code, not a generic OOM
  return (pRes->code != TSDB_CODE_SUCCESS) ? pRes->code : TSDB_CODE_TSC_OUT_OF_MEMORY;
}
H
hjxilinx 已提交
2898

H
Haojun Liao 已提交
2899 2900 2901
/*
 * Return the address, inside pRes->data, of the value at the current row (pRes->row) of the
 * result column `columnIndex`, and report that column's value width through *bytes.
 *
 * The address arithmetic implies a column-by-column layout: a column starts at
 * `offset * numOfRows` and each of its values is `resBytes` wide.
 */
static char* getResultBlockPosition(SSqlCmd* pCmd, SSqlRes* pRes, int32_t columnIndex, int16_t* bytes) {
  SQueryInfo*     pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  SInternalField* pField     = (SInternalField*) TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, columnIndex);

  assert(pField->pSqlExpr != NULL);

  int16_t width = pField->pSqlExpr->resBytes;
  *bytes = width;

  // first skip to the beginning of this column, then to the current row within it
  char* columnBegin = pRes->data + pField->pSqlExpr->offset * pRes->numOfRows;
  return columnBegin + pRes->row * width;
}

/*
 * Assemble the next batch of join results of pSql from the rows its subqueries have already fetched.
 *
 * The batch holds as many rows as the live (non-NULL) subquery with the fewest unconsumed rows
 * (res.numOfRows - res.row). The selected columns are copied one after another into pRes->pRsp,
 * which is (re)allocated as a tFilePage. Every live subquery's cursor (res.row) is advanced by
 * the batch size, and doArithmeticCalculate() is applied to the page.
 *
 * When no live subquery has unconsumed rows (or no live subquery exists at all), every subquery
 * object is released through freeJoinSubqueryObj() and nothing is built.
 * If the result buffer cannot be grown, pRes->code is set to TSDB_CODE_TSC_OUT_OF_MEMORY.
 */
static void doBuildResFromSubqueries(SSqlObj* pSql) {
  SSqlRes* pRes = &pSql->res;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);

  // the join can emit no more rows than the live subquery with the fewest unconsumed rows
  int32_t numOfRes = INT32_MAX;
  bool    hasLiveSub = false;
  for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
    SSqlObj* pSub = pSql->pSubs[i];
    if (pSub == NULL) {
      continue;
    }

    hasLiveSub = true;
    int32_t remain = (int32_t)(pSub->res.numOfRows - pSub->res.row);
    numOfRes = (int32_t)(MIN(numOfRes, remain));
  }

  // Without any live subquery numOfRes would stay at INT32_MAX and the buffer size computed
  // below would overflow, so that case is handled exactly like exhausted subqueries.
  if (!hasLiveSub || numOfRes == 0) {  // no result any more, free all subquery objects
    freeJoinSubqueryObj(pSql);
    return;
  }

  int32_t rowSize = tscGetResRowLength(pQueryInfo->exprList);
  assert(numOfRes > 0 && rowSize > 0);

  // size the buffer in size_t: numOfRes * rowSize is not guaranteed to fit in an int32_t
  size_t dataSize = (size_t)numOfRes * (size_t)rowSize;
  char*  tmp = realloc(pRes->pRsp, dataSize + sizeof(tFilePage));
  if (tmp == NULL) {
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return;
  }
  pRes->pRsp = tmp;

  tFilePage* pFilePage = (tFilePage*) pRes->pRsp;
  pFilePage->num = numOfRes;

  pRes->data = pFilePage->data;
  char* data = pRes->data;

  // copy numOfRes values of every selected column, one column after another
  int16_t bytes = 0;
  size_t  numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
  for (size_t i = 0; i < numOfExprs; ++i) {
    SColumnIndex* pIndex = &pRes->pColumnIndex[i];
    SSqlObj*      pSub = pSql->pSubs[pIndex->tableIndex];
    assert(pSub != NULL);

    char*  pData = getResultBlockPosition(&pSub->cmd, &pSub->res, pIndex->columnIndex, &bytes);
    size_t colSize = (size_t)bytes * (size_t)numOfRes;

    memcpy(data, pData, colSize);
    data += colSize;
  }

  // the copied rows are now consumed in every live subquery
  for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
    SSqlObj* pSub = pSql->pSubs[i];
    if (pSub == NULL) {
      continue;
    }

    pSub->res.row += numOfRes;
    assert(pSub->res.row <= pSub->res.numOfRows);
  }

  pRes->numOfRows = numOfRes;
  pRes->numOfClauseTotal += numOfRes;

  int32_t finalRowSize = 0;
  for (int32_t i = 0; i < tscNumOfFields(pQueryInfo); ++i) {
    TAOS_FIELD* pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
    finalRowSize += pField->bytes;
  }

  doArithmeticCalculate(pQueryInfo, pFilePage, rowSize, finalRowSize);

  pRes->data = pFilePage->data;
  tscSetResRawPtr(pRes, pQueryInfo);
}

H
hjxilinx 已提交
2988
/*
 * Build the next batch of join results for pSql and hand it to the application callback.
 *
 * If an error is already recorded in pRes->code, the error path (tscAsyncResultOnError) is
 * taken immediately. On first use (pRes->tsrow == NULL), per-column arrays (tsrow, urow,
 * buffer, length) are allocated, sized by the number of select expressions. Afterwards the
 * batch is built by doBuildResFromSubqueries(). On success pSql->fp is invoked with the number
 * of rows produced; otherwise the error path is taken.
 */
void tscBuildResFromSubqueries(SSqlObj *pSql) {
  SSqlRes* pRes = &pSql->res;

  if (pRes->code != TSDB_CODE_SUCCESS) {
    tscAsyncResultOnError(pSql);
    return;
  }

  if (pRes->tsrow == NULL) {
    SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
    pRes->numOfCols = (int16_t) tscSqlExprNumOfExprs(pQueryInfo);

    pRes->tsrow  = calloc(pRes->numOfCols, POINTER_BYTES);
    pRes->urow   = calloc(pRes->numOfCols, POINTER_BYTES);
    pRes->buffer = calloc(pRes->numOfCols, POINTER_BYTES);
    pRes->length = calloc(pRes->numOfCols, sizeof(int32_t));

    // urow is dereferenced for every column when rows are handed out, so it must be checked too
    if (pRes->tsrow == NULL || pRes->urow == NULL || pRes->buffer == NULL || pRes->length == NULL) {
      // drop the partial allocation so that a later call starts again from a clean state
      tfree(pRes->tsrow);
      tfree(pRes->urow);
      tfree(pRes->buffer);
      tfree(pRes->length);

      pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      tscAsyncResultOnError(pSql);
      return;
    }

    tscRestoreFuncForSTableQuery(pQueryInfo);
  }

  // all rows of the previous batch must have been consumed before a new one is built
  assert (pRes->row >= pRes->numOfRows);
  doBuildResFromSubqueries(pSql);
  if (pRes->code == TSDB_CODE_SUCCESS) {
    (*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
  } else {
    tscAsyncResultOnError(pSql);
  }
}

3023
/*
 * Convert the NCHAR value referenced by pRes->tsrow[columnIndex] with taosUcs4ToMbs(), using
 * pRes->buffer[columnIndex] as scratch space. The scratch buffer is allocated on first use.
 *
 * On success, tsrow/length of the column refer to the converted bytes. If the buffer cannot be
 * allocated or the conversion fails, the value is reported as NULL with length 0. Non-NCHAR
 * fields and NULL values are left untouched.
 */
static UNUSED_FUNC void transferNcharData(SSqlObj *pSql, int32_t columnIndex, TAOS_FIELD *pField) {
  SSqlRes *pRes = &pSql->res;

  if (pRes->tsrow[columnIndex] == NULL || pField->type != TSDB_DATA_TYPE_NCHAR) {
    return;
  }

  // convert unicode to native code in a temporary buffer; TSDB_NCHAR_SIZE extra bytes hold the terminator
  if (pRes->buffer[columnIndex] == NULL) {
    pRes->buffer[columnIndex] = malloc(pField->bytes + TSDB_NCHAR_SIZE);
    if (pRes->buffer[columnIndex] == NULL) {
      // bail out instead of writing through a NULL pointer in memset()/taosUcs4ToMbs() below
      tscError("%p failed to allocate nchar conversion buffer, column:%d", pSql, columnIndex);
      pRes->tsrow[columnIndex] = NULL;
      pRes->length[columnIndex] = 0;
      return;
    }
  }

  /* string terminated char for binary data */
  memset(pRes->buffer[columnIndex], 0, pField->bytes + TSDB_NCHAR_SIZE);

  int32_t length = taosUcs4ToMbs(pRes->tsrow[columnIndex], pRes->length[columnIndex], pRes->buffer[columnIndex]);
  if (length >= 0) {
    pRes->tsrow[columnIndex] = (unsigned char*)pRes->buffer[columnIndex];
    pRes->length[columnIndex] = length;
  } else {
    tscError("%p charset:%s to %s. val:%s convert failed.", pSql, DEFAULT_UNICODE_ENCODEC, tsCharset, (char*)pRes->tsrow[columnIndex]);
    pRes->tsrow[columnIndex] = NULL;
    pRes->length[columnIndex] = 0;
  }
}

H
Haojun Liao 已提交
3047
char *getArithmeticInputSrc(void *param, const char *name, int32_t colId) {
3048 3049 3050 3051 3052 3053 3054
  SArithmeticSupport *pSupport = (SArithmeticSupport *) param;

  int32_t index = -1;
  SSqlExpr* pExpr = NULL;
  
  for (int32_t i = 0; i < pSupport->numOfCols; ++i) {
    pExpr = taosArrayGetP(pSupport->exprList, i);
B
Bomin Zhang 已提交
3055
    if (strncmp(name, pExpr->aliasName, sizeof(pExpr->aliasName) - 1) == 0) {
3056 3057 3058 3059 3060 3061 3062 3063 3064
      index = i;
      break;
    }
  }

  assert(index >= 0 && index < pSupport->numOfCols);
  return pSupport->data[index] + pSupport->offset * pExpr->resBytes;
}

3065
/*
 * Hand out the current row of pSql's result set as a TAOS_ROW and move to the next row.
 *
 * For every field, tsrow[i] is set from the per-column cursor urow[i]: NULL for a null value.
 * For BINARY/NCHAR the payload is obtained via varDataVal(), and length[i] is set from
 * varDataLen(). Each cursor then advances by the field width.
 *
 * Once all buffered rows have been returned, the row vector is released with tfree() and
 * pRes->tsrow is returned. NOTE(review): this is NULL assuming tfree() resets the pointer.
 */
TAOS_ROW doSetResultRowData(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;

  assert(pRes->row >= 0 && pRes->row <= pRes->numOfRows);

  if (pRes->row >= pRes->numOfRows) {  // every buffered row has already been returned to the caller
    tfree(pRes->tsrow);
    return pRes->tsrow;
  }

  SSqlCmd    *pCmd        = &pSql->cmd;
  SQueryInfo *pQueryInfo  = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  size_t      numOfFields = tscNumOfFields(pQueryInfo);

  for (size_t col = 0; col < numOfFields; ++col) {
    SInternalField *pField = (SInternalField *)TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, col);

    int32_t colType  = pField->field.type;
    bool    isVarLen = (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_NCHAR);

    if (isVarLen) {
      pRes->tsrow[col]  = isNull(pRes->urow[col], colType) ? NULL : varDataVal(pRes->urow[col]);
      pRes->length[col] = varDataLen(pRes->urow[col]);
    } else {
      pRes->tsrow[col] = isNull(pRes->urow[col], colType) ? NULL : pRes->urow[col];
    }

    // step this column's cursor forward to the value of the next row
    ((char **) pRes->urow)[col] += pField->field.bytes;
  }

  ++pRes->row;
  return pRes->tsrow;
}

H
Haojun Liao 已提交
3098
/*
 * Tell whether the subqueries of pSql may still yield rows for the parent query.
 * NULL entries in pSql->pSubs are ignored.
 *
 * Non-ordered projection query on a super table: data remains as soon as one subquery has
 * vgroups left (vgroupIndex < numOfVgroups), still holds unconsumed rows, and has not reached
 * its limit.
 *
 * Any other query (e.g. inner join): the result set is finished as soon as one subquery fetched
 * an empty batch, or is a projection query that is both drained and at its limit.
 */
static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) {
  SSqlCmd    *pParentCmd   = &pSql->cmd;
  SQueryInfo *pParentQuery = tscGetQueryInfoDetail(pParentCmd, pParentCmd->clauseIndex);

  if (tscNonOrderedProjectionQueryOnSTable(pParentQuery, 0)) {
    for (int32_t idx = 0; idx < pSql->subState.numOfSub; ++idx) {
      SSqlObj *pSub = pSql->pSubs[idx];
      if (pSub == NULL) {
        continue;
      }

      SSqlRes    *pSubRes   = &pSub->res;
      SQueryInfo *pSubQuery = tscGetQueryInfoDetail(&pSub->cmd, pSub->cmd.clauseIndex);
      assert(pSubQuery->numOfTables == 1);

      STableMetaInfo *pMeta = tscGetMetaInfo(pSubQuery, 0);

      if (pMeta->vgroupIndex < pMeta->vgroupList->numOfVgroups && pSubRes->row < pSubRes->numOfRows &&
          !tscHasReachLimitation(pSubQuery, pSubRes)) {
        return true;  // this subquery can still contribute rows
      }
    }

    return false;  // every live subquery is exhausted
  }

  for (int32_t idx = 0; idx < pSql->subState.numOfSub; ++idx) {
    SSqlObj *pSub = pSql->pSubs[idx];
    if (pSub == NULL) {
      continue;
    }

    SSqlRes    *pSubRes   = &pSub->res;
    SQueryInfo *pSubQuery = tscGetQueryInfoDetail(&pSub->cmd, 0);

    bool drainedAtLimit = pSubRes->row >= pSubRes->numOfRows && tscHasReachLimitation(pSubQuery, pSubRes) &&
                          tscIsProjectionQuery(pSubQuery);
    if (drainedAtLimit || pSubRes->numOfRows == 0) {
      return false;  // one exhausted side ends the whole join
    }
  }

  return true;
}