tscSubquery.c 78.2 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
H
Hui Li 已提交
15 16
#define _GNU_SOURCE
 
H
Haojun Liao 已提交
17
#include "os.h"
H
hjxilinx 已提交
18

H
Haojun Liao 已提交
19 20
#include "qAst.h"
#include "qTsbuf.h"
H
Haojun Liao 已提交
21
#include "tcompare.h"
S
slguan 已提交
22
#include "tscLog.h"
H
Haojun Liao 已提交
23 24
#include "tscSubquery.h"
#include "tschemautil.h"
25
#include "tsclient.h"
26
#include "tscSubquery.h"
H
hjxilinx 已提交
27 28 29

/*
 * Small bookkeeping record pairing an SQL object with an integer index.
 * NOTE(review): how the two fields are consumed is not visible in this part of the file.
 */
typedef struct SInsertSupporter {
  SSqlObj*  pSql;   // SQL object this record refers to
  int32_t   index;  // integer index stored alongside pSql
} SInsertSupporter;

H
hjxilinx 已提交
33
static void freeJoinSubqueryObj(SSqlObj* pSql);
H
Haojun Liao 已提交
34
static bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql);
H
hjxilinx 已提交
35

36
static bool tsCompare(int32_t order, int64_t left, int64_t right) {
37
  if (order == TSDB_ORDER_ASC) {
H
hjxilinx 已提交
38 39 40 41 42 43
    return left < right;
  } else {
    return left > right;
  }
}

H
Haojun Liao 已提交
44 45
/*
 * Intersect the timestamp buffers collected by the two join sub queries.
 *
 * Both input buffers (pSupporter1->pTSBuf / pSupporter2->pTSBuf) are walked in lock step; elements are
 * compared by tag first and by timestamp (according to the query order) second. Every <tag, ts> pair that
 * exists in both inputs is appended to a freshly created output buffer per side. The output buffers are
 * attached to the query info of pSql->pSubs[0] and pSql->pSubs[1] before anything else happens, so they are
 * owned by those query infos on every return path.
 *
 * @param pSql         parent SQL object; its limit/offset, order and interval settings drive the intersection
 * @param pSupporter1  supporter holding the first input timestamp buffer
 * @param pSupporter2  supporter holding the second input timestamp buffer
 * @param win          [out] reset to [INT64_MAX, INT64_MIN] and then widened to cover every emitted timestamp
 *
 * @return number of elements written to the first output buffer (0 when either input is missing or empty)
 *
 * Side effects: pQueryInfo->limit.offset is decremented for every matched pair that is skipped; on the
 * normal return path both input buffers are destroyed and the supporter pointers are reset to NULL.
 */
static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJoinSupporter* pSupporter2, STimeWindow * win) {
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);

  STSBuf* output1 = tsBufCreate(true, pQueryInfo->order.order);
  STSBuf* output2 = tsBufCreate(true, pQueryInfo->order.order);

  // start with an inverted (empty) window; it is widened by every emitted timestamp
  win->skey = INT64_MAX;
  win->ekey = INT64_MIN;

  SLimitVal* pLimit = &pQueryInfo->limit;
  int32_t    order = pQueryInfo->order.order;

  SQueryInfo* pSubQueryInfo1 = tscGetQueryInfoDetail(&pSql->pSubs[0]->cmd, 0);
  SQueryInfo* pSubQueryInfo2 = tscGetQueryInfoDetail(&pSql->pSubs[1]->cmd, 0);

  // hand the output buffers over to the sub queries right away
  pSubQueryInfo1->tsBuf = output1;
  pSubQueryInfo2->tsBuf = output2;

  // no result generated, return directly
  if (pSupporter1->pTSBuf == NULL || pSupporter2->pTSBuf == NULL) {
    tscDebug("%p at least one ts-comp is empty, 0 for secondary query after ts blocks intersecting", pSql);
    return 0;
  }

  tsBufResetPos(pSupporter1->pTSBuf);
  tsBufResetPos(pSupporter2->pTSBuf);

  if (!tsBufNextPos(pSupporter1->pTSBuf)) {
    tsBufFlush(output1);
    tsBufFlush(output2);

    tscDebug("%p input1 is empty, 0 for secondary query after ts blocks intersecting", pSql);
    return 0;
  }

  if (!tsBufNextPos(pSupporter2->pTSBuf)) {
    tsBufFlush(output1);
    tsBufFlush(output2);

    tscDebug("%p input2 is empty, 0 for secondary query after ts blocks intersecting", pSql);
    return 0;
  }

  int64_t numOfInput1 = 1;
  int64_t numOfInput2 = 1;

  while (1) {
    STSElem elem1 = tsBufGetElem(pSupporter1->pTSBuf);
    STSElem elem2 = tsBufGetElem(pSupporter2->pTSBuf);

#ifdef _DEBUG_VIEW
    tscInfo("%" PRId64 ", tags:%"PRId64" \t %" PRId64 ", tags:%"PRId64, elem1.ts, elem1.tag.i64Key, elem2.ts, elem2.tag.i64Key);
#endif

    int32_t res = tVariantCompare(&elem1.tag, &elem2.tag);

    /*
     * Treat every negative compare result as "less than" (not only -1), mirroring the "res > 0" test
     * below; otherwise any other negative value would fall through to the "equal" branch.
     */
    if (res < 0 || (res == 0 && tsCompare(order, elem1.ts, elem2.ts))) {
      // input1 is behind input2, advance input1
      if (!tsBufNextPos(pSupporter1->pTSBuf)) {
        break;
      }

      numOfInput1++;
    } else if ((res > 0) || (res == 0 && tsCompare(order, elem2.ts, elem1.ts))) {
      // input2 is behind input1, advance input2
      if (!tsBufNextPos(pSupporter2->pTSBuf)) {
        break;
      }

      numOfInput2++;
    } else {
      /*
       * in case of stable query, limit/offset is not applied here. the limit/offset is applied to the
       * final results which are acquired after the secondary merge in the client.
       */
      if (pLimit->offset == 0 || pQueryInfo->interval.interval > 0 || QUERY_IS_STABLE_QUERY(pQueryInfo->type)) {
        if (win->skey > elem1.ts) {
          win->skey = elem1.ts;
        }

        if (win->ekey < elem1.ts) {
          win->ekey = elem1.ts;
        }

        tsBufAppend(output1, elem1.vnode, &elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts));
        tsBufAppend(output2, elem2.vnode, &elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts));
      } else {
        // matched pair consumed by the offset
        pLimit->offset -= 1;
      }

      if (!tsBufNextPos(pSupporter1->pTSBuf)) {
        break;
      }

      numOfInput1++;

      if (!tsBufNextPos(pSupporter2->pTSBuf)) {
        break;
      }

      numOfInput2++;
    }
  }

  /*
   * failed to set the correct ts order yet in two cases:
   * 1. only one element
   * 2. only one element for each tag.
   */
  if (output1->tsOrder == -1) {
    output1->tsOrder = TSDB_ORDER_ASC;
    output2->tsOrder = TSDB_ORDER_ASC;
  }

  tsBufFlush(output1);
  tsBufFlush(output2);

  // the inputs are fully consumed; drop them and clear the pointers so they cannot be reused or freed twice
  tsBufDestroy(pSupporter1->pTSBuf);
  pSupporter1->pTSBuf = NULL;

  tsBufDestroy(pSupporter2->pTSBuf);
  pSupporter2->pTSBuf = NULL;

  tscDebug("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " for secondary query after ts blocks "
           "intersecting, skey:%" PRId64 ", ekey:%" PRId64, pSql, numOfInput1, numOfInput2, output1->numOfTotal,
           win->skey, win->ekey);

  return output1->numOfTotal;
}

// todo handle failed to create sub query
H
Haojun Liao 已提交
169
SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, int32_t index) {
170
  SJoinSupporter* pSupporter = calloc(1, sizeof(SJoinSupporter));
H
hjxilinx 已提交
171 172 173 174 175 176 177 178 179
  if (pSupporter == NULL) {
    return NULL;
  }

  pSupporter->pObj = pSql;

  pSupporter->subqueryIndex = index;
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
  
180
  memcpy(&pSupporter->interval, &pQueryInfo->interval, sizeof(pSupporter->interval));
H
hjxilinx 已提交
181 182 183
  pSupporter->limit = pQueryInfo->limit;

  STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, index);
184
  pSupporter->uid = pTableMetaInfo->pTableMeta->id.uid;
H
hjxilinx 已提交
185 186
  assert (pSupporter->uid != 0);

S
slguan 已提交
187
  taosGetTmpfilePath("join-", pSupporter->path);
H
hjxilinx 已提交
188 189
  pSupporter->f = fopen(pSupporter->path, "w");

H
Haojun Liao 已提交
190
  // todo handle error
H
hjxilinx 已提交
191 192 193 194 195 196 197
  if (pSupporter->f == NULL) {
    tscError("%p failed to create tmp file:%s, reason:%s", pSql, pSupporter->path, strerror(errno));
  }

  return pSupporter;
}

H
Haojun Liao 已提交
198
/*
 * Release a join supporter together with the resources it still holds: expression list, column list,
 * field info, the temporary file (closed and unlinked when still open), the <tid, tag> list and the
 * tag condition. A NULL supporter is ignored.
 */
static void tscDestroyJoinSupporter(SJoinSupporter* pSupporter) {
  if (pSupporter != NULL) {
    if (pSupporter->exprList != NULL) {
      tscSqlExprInfoDestroy(pSupporter->exprList);
    }

    if (pSupporter->colList != NULL) {
      tscColumnListDestroy(pSupporter->colList);
    }

    tscFieldInfoClear(&pSupporter->fieldsInfo);

    // the temporary file is removed only while its handle is still open
    if (pSupporter->f != NULL) {
      fclose(pSupporter->f);
      unlink(pSupporter->path);
      pSupporter->f = NULL;
    }

    taosTFree(pSupporter->pIdTagList);
    tscTagCondRelease(&pSupporter->tagCond);
    free(pSupporter);
  }
}

/*
 * need the secondary query process
 * In case of count(ts)/count(*)/spread(ts) query, that are only applied to
 * primary timestamp column , the secondary query is not necessary
 *
 */
H
Haojun Liao 已提交
230
static UNUSED_FUNC bool needSecondaryQuery(SQueryInfo* pQueryInfo) {
231 232 233
  size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
  
  for (int32_t i = 0; i < numOfCols; ++i) {
234 235
    SColumn* base = taosArrayGet(pQueryInfo->colList, i);
    if (base->colIndex.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
H
hjxilinx 已提交
236 237 238 239 240 241 242 243 244 245
      return true;
    }
  }

  return false;
}

/*
 * launch secondary stage query to fetch the result that contains timestamp in set
 */
H
Haojun Liao 已提交
246
static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
H
hjxilinx 已提交
247
  int32_t         numOfSub = 0;
248
  SJoinSupporter* pSupporter = NULL;
H
hjxilinx 已提交
249
  
H
Haojun Liao 已提交
250
  //If the columns are not involved in the final select clause, the corresponding query will not be issued.
H
Haojun Liao 已提交
251
  for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
H
hjxilinx 已提交
252
    pSupporter = pSql->pSubs[i]->param;
H
hjxilinx 已提交
253
    if (taosArrayGetSize(pSupporter->exprList) > 0) {
H
hjxilinx 已提交
254 255 256 257 258 259 260
      ++numOfSub;
    }
  }
  
  assert(numOfSub > 0);
  
  // scan all subquery, if one sub query has only ts, ignore it
H
Haojun Liao 已提交
261
  tscDebug("%p start to launch secondary subqueries, %d out of %d needs to query", pSql, numOfSub, pSql->subState.numOfSub);
H
hjxilinx 已提交
262

H
hjxilinx 已提交
263
  //the subqueries that do not actually launch the secondary query to virtual node is set as completed.
H
Haojun Liao 已提交
264
  SSubqueryState* pState = &pSql->subState;
H
Haojun Liao 已提交
265
  pState->numOfRemain = numOfSub;
H
Haojun Liao 已提交
266

H
hjxilinx 已提交
267 268
  bool success = true;
  
H
Haojun Liao 已提交
269
  for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
H
hjxilinx 已提交
270 271 272 273 274
    SSqlObj *pPrevSub = pSql->pSubs[i];
    pSql->pSubs[i] = NULL;
    
    pSupporter = pPrevSub->param;
  
H
hjxilinx 已提交
275
    if (taosArrayGetSize(pSupporter->exprList) == 0) {
276
      tscDebug("%p subIndex: %d, no need to launch query, ignore it", pSql, i);
H
hjxilinx 已提交
277 278
    
      tscDestroyJoinSupporter(pSupporter);
279
      taos_free_result(pPrevSub);
H
hjxilinx 已提交
280 281 282 283 284 285 286 287 288 289
    
      pSql->pSubs[i] = NULL;
      continue;
    }
  
    SQueryInfo *pSubQueryInfo = tscGetQueryInfoDetail(&pPrevSub->cmd, 0);
    STSBuf *pTSBuf = pSubQueryInfo->tsBuf;
    pSubQueryInfo->tsBuf = NULL;
  
    // free result for async object will also free sqlObj
H
hjxilinx 已提交
290
    assert(tscSqlExprNumOfExprs(pSubQueryInfo) == 1); // ts_comp query only requires one resutl columns
H
hjxilinx 已提交
291 292
    taos_free_result(pPrevSub);
  
293
    SSqlObj *pNew = createSubqueryObj(pSql, (int16_t) i, tscJoinQueryCallback, pSupporter, TSDB_SQL_SELECT, NULL);
H
hjxilinx 已提交
294 295 296 297 298
    if (pNew == NULL) {
      tscDestroyJoinSupporter(pSupporter);
      success = false;
      break;
    }
B
Bomin Zhang 已提交
299

H
hjxilinx 已提交
300 301 302 303 304
    tscClearSubqueryInfo(&pNew->cmd);
    pSql->pSubs[i] = pNew;
  
    SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
    pQueryInfo->tsBuf = pTSBuf;  // transfer the ownership of timestamp comp-z data to the new created object
B
Bomin Zhang 已提交
305

H
hjxilinx 已提交
306
    // set the second stage sub query for join process
H
hjxilinx 已提交
307
    TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE);
H
Haojun Liao 已提交
308

309
    memcpy(&pQueryInfo->interval, &pSupporter->interval, sizeof(pQueryInfo->interval));
B
Bomin Zhang 已提交
310

H
hjxilinx 已提交
311
    tscTagCondCopy(&pQueryInfo->tagCond, &pSupporter->tagCond);
B
Bomin Zhang 已提交
312

H
hjxilinx 已提交
313 314 315
    pQueryInfo->colList = pSupporter->colList;
    pQueryInfo->exprList = pSupporter->exprList;
    pQueryInfo->fieldsInfo = pSupporter->fieldsInfo;
H
Haojun Liao 已提交
316

H
hjxilinx 已提交
317 318 319
    pSupporter->exprList = NULL;
    pSupporter->colList = NULL;
    memset(&pSupporter->fieldsInfo, 0, sizeof(SFieldInfo));
H
hjxilinx 已提交
320 321
  
    SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
H
Haojun Liao 已提交
322
    assert(pNew->subState.numOfSub == 0 && pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);
H
hjxilinx 已提交
323
  
H
hjxilinx 已提交
324
    tscFieldInfoUpdateOffset(pNewQueryInfo);
H
hjxilinx 已提交
325 326 327 328 329 330 331 332 333
  
    STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0);
  
    /*
     * When handling the projection query, the offset value will be modified for table-table join, which is changed
     * during the timestamp intersection.
     */
    pSupporter->limit = pQueryInfo->limit;
    pNewQueryInfo->limit = pSupporter->limit;
H
Haojun Liao 已提交
334 335 336 337 338

    SColumnIndex index = {.tableIndex = 0, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
    SSchema* s = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, 0);

    SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, 0);
H
Haojun Liao 已提交
339 340
    int16_t funcId = pExpr->functionId;

H
Haojun Liao 已提交
341
    if ((pExpr->colInfo.colId != PRIMARYKEY_TIMESTAMP_COL_INDEX) ||
H
Haojun Liao 已提交
342 343 344 345 346
        (funcId != TSDB_FUNC_TS && funcId != TSDB_FUNC_TS_DUMMY && funcId != TSDB_FUNC_PRJ)) {

      int16_t functionId = tscIsProjectionQuery(pQueryInfo)? TSDB_FUNC_PRJ : TSDB_FUNC_TS;

      tscAddSpecialColumnForSelect(pQueryInfo, 0, functionId, &index, s, TSDB_COL_NORMAL);
H
Haojun Liao 已提交
347 348
      tscPrintSelectClause(pNew, 0);
      tscFieldInfoUpdateOffset(pNewQueryInfo);
H
Haojun Liao 已提交
349 350 351 352

      pExpr = tscSqlExprGet(pQueryInfo, 0);
    }

353
    // set the join condition tag column info, todo extract method
H
Haojun Liao 已提交
354 355
    if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
      assert(pQueryInfo->tagCond.joinInfo.hasJoin);
356
      int16_t colId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid);
H
Haojun Liao 已提交
357

358
      // set the tag column id for executor to extract correct tag value
H
Haojun Liao 已提交
359 360
      pExpr->param[0].i64Key = colId;
      pExpr->numOfParams = 1;
H
Haojun Liao 已提交
361 362
    }

363
    size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList);
S
TD-1057  
Shengliang Guan 已提交
364
    tscDebug("%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, name:%s",
H
Haojun Liao 已提交
365 366
             pSql, pNew, 0, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, taosArrayGetSize(pNewQueryInfo->exprList),
             numOfCols, pNewQueryInfo->fieldsInfo.numOfOutput, pTableMetaInfo->name);
H
hjxilinx 已提交
367 368 369 370
  }
  
  //prepare the subqueries object failed, abort
  if (!success) {
371
    pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
H
hjxilinx 已提交
372
    tscError("%p failed to prepare subqueries objs for secondary phase query, numOfSub:%d, code:%d", pSql,
H
Haojun Liao 已提交
373
        pSql->subState.numOfSub, pSql->res.code);
H
hjxilinx 已提交
374
    freeJoinSubqueryObj(pSql);
H
hjxilinx 已提交
375 376 377 378
    
    return pSql->res.code;
  }
  
H
Haojun Liao 已提交
379
  for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
H
Haojun Liao 已提交
380
    if (pSql->pSubs[i] == NULL) {
H
hjxilinx 已提交
381 382
      continue;
    }
H
Haojun Liao 已提交
383

H
Haojun Liao 已提交
384
    tscDoQuery(pSql->pSubs[i]);
H
hjxilinx 已提交
385 386 387 388 389
  }

  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
390
/*
 * Release the join supporter of every non-NULL sub query of pSql. A sub query itself is released through
 * taos_free_result only when its result code is TSDB_CODE_SUCCESS. Finally the sub query count of pSql is
 * reset to 0.
 */
void freeJoinSubqueryObj(SSqlObj* pSql) {
  for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
    SSqlObj* pSub = pSql->pSubs[i];

    if (pSub != NULL) {
      SJoinSupporter* pSupporter = pSub->param;
      tscDestroyJoinSupporter(pSupporter);

      if (pSub->res.code == TSDB_CODE_SUCCESS) {
        taos_free_result(pSub);
      }
    }
  }

  pSql->subState.numOfSub = 0;
}

408
/*
 * Account for one finished (failed) sub query of pSqlObj. The remaining counter is decremented atomically;
 * when it drops to zero or below, the failure is logged and all join sub query objects are released.
 * pSupporter is not used by this function.
 */
static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSupporter* pSupporter) {
  assert(pSqlObj->subState.numOfRemain > 0);

  // other sub queries are still outstanding: nothing more to do yet
  if (atomic_sub_fetch_32(&pSqlObj->subState.numOfRemain, 1) > 0) {
    return;
  }

  tscError("%p all subquery return and query failed, global code:%d", pSqlObj, pSqlObj->res.code);
  freeJoinSubqueryObj(pSqlObj);
}

// update the query time range according to the join results on timestamp
H
Haojun Liao 已提交
418 419 420
static void updateQueryTimeRange(SQueryInfo* pQueryInfo, STimeWindow* win) {
  assert(pQueryInfo->window.skey <= win->skey && pQueryInfo->window.ekey >= win->ekey);
  pQueryInfo->window = *win;
H
hjxilinx 已提交
421 422
}

weixin_48148422's avatar
weixin_48148422 已提交
423
int32_t tscCompareTidTags(const void* p1, const void* p2) {
H
Haojun Liao 已提交
424 425
  const STidTags* t1 = (const STidTags*) varDataVal(p1);
  const STidTags* t2 = (const STidTags*) varDataVal(p2);
426 427
  
  if (t1->vgId != t2->vgId) {
weixin_48148422's avatar
weixin_48148422 已提交
428 429
    return (t1->vgId > t2->vgId) ? 1 : -1;
  }
430

weixin_48148422's avatar
weixin_48148422 已提交
431 432
  if (t1->tid != t2->tid) {
    return (t1->tid > t2->tid) ? 1 : -1;
433
  }
weixin_48148422's avatar
weixin_48148422 已提交
434
  return 0;
435 436
}

H
Haojun Liao 已提交
437
/*
 * Group the given <tid, tag> records by vgroup and store the result in pTableMetaInfo->pVgroupTables.
 *
 * The records in `tables` are visited in order; whenever the vgId differs from the previous record (or
 * for the very first record) a new SVgroupTableInfo entry is started, its vgroup info copied from the
 * matching entry of pTableMetaInfo->vgroupList. Every record is then appended to the table list of the
 * current entry. Records of the same vgroup are therefore expected to be adjacent in `tables`.
 *
 * @param pSql            SQL object, used for logging only
 * @param pTableMetaInfo  table meta info that receives the result; its vgroupIndex is reset to 0
 * @param tables          array of STidTags records
 */
void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables) {
  SArray*   result = taosArrayInit(4, sizeof(SVgroupTableInfo));
  SArray*   vgTables = NULL;
  STidTags* prev = NULL;

  size_t numOfTables = taosArrayGetSize(tables);
  for (size_t i = 0; i < numOfTables; i++) {
    STidTags* tt = taosArrayGet(tables, i);

    // a new vgroup starts here
    if (prev == NULL || tt->vgId != prev->vgId) {
      SVgroupsInfo* pvg = pTableMetaInfo->vgroupList;

      SVgroupTableInfo info = {{0}};
      for (int32_t m = 0; m < pvg->numOfVgroups; ++m) {
        if (tt->vgId == pvg->vgroups[m].vgId) {
          tscSCMVgroupInfoCopy(&info.vgInfo, &pvg->vgroups[m]);
          break;
        }
      }
      assert(info.vgInfo.numOfEps != 0);  // the vgroup of the record must exist in the vgroup list

      vgTables = taosArrayInit(4, sizeof(STableIdInfo));
      info.itemList = vgTables;
      taosArrayPush(result, &info);
    }

    tscDebug("%p tid:%d, uid:%"PRIu64",vgId:%d added for vnode query", pSql, tt->tid, tt->uid, tt->vgId);
    STableIdInfo item = {.uid = tt->uid, .tid = tt->tid, .key = INT64_MIN};
    taosArrayPush(vgTables, &item);
    prev = tt;
  }

  pTableMetaInfo->pVgroupTables = result;
  pTableMetaInfo->vgroupIndex = 0;
}

/*
 * Turn the sub query pSql into a ts_comp query and submit it.
 *
 * The sub query info and result of pSql are cleared, the query info is re-initialized, and a single
 * TSDB_FUNC_TS_COMP expression on the primary timestamp column is added. For super tables the join tag
 * column id (looked up in pSupporter->tagCond) is passed as the expression parameter. Columns of
 * pSupporter->colList that carry filters are cloned into the query's column list.
 *
 * @param pSql        sub query object to reuse
 * @param pSupporter  join supporter of this sub query
 * @param pParent     parent SQL object, used for logging only
 */
static void issueTSCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj* pParent) {
  SSqlCmd* pSubCmd = &pSql->cmd;

  tscClearSubqueryInfo(pSubCmd);
  tscFreeSqlResult(pSql);

  SQueryInfo*     pQueryInfo = tscGetQueryInfoDetail(pSubCmd, 0);
  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  tscInitQueryInfo(pQueryInfo);

  TSDB_QUERY_CLEAR_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY);
  TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_MULTITABLE_QUERY);

  pSubCmd->command = TSDB_SQL_SELECT;
  pSql->fp = tscJoinQueryCallback;

  // the only output column: ts_comp on the primary timestamp column
  SSchema      tsCompSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1};
  SColumnIndex tsColIndex = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX};
  tscAddSpecialColumnForSelect(pQueryInfo, 0, TSDB_FUNC_TS_COMP, &tsColIndex, &tsCompSchema, TSDB_COL_NORMAL);

  // set the tags value for ts_comp function
  if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
    SSqlExpr *pTsCompExpr = tscSqlExprGet(pQueryInfo, 0);
    int16_t   joinTagColId = tscGetJoinTagColIdByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->id.uid);

    pTsCompExpr->param[0].i64Key = joinTagColId;
    pTsCompExpr->numOfParams = 1;
  }

  // add the filter tag column
  if (pSupporter->colList != NULL) {
    size_t numOfSrcCols = taosArrayGetSize(pSupporter->colList);

    for (int32_t k = 0; k < numOfSrcCols; ++k) {
      SColumn *pSrcCol = taosArrayGetP(pSupporter->colList, k);
      if (pSrcCol->numOfFilters <= 0) {
        continue;  // only filtered columns are copied into the query's column list
      }

      SColumn *pClone = tscColumnClone(pSrcCol);
      taosArrayPush(pQueryInfo->colList, &pClone);
    }
  }

  size_t numOfQueryCols = taosArrayGetSize(pQueryInfo->colList);

  tscDebug(
      "%p subquery:%p tableIndex:%d, vgroupIndex:%d, numOfVgroups:%d, type:%d, ts_comp query to retrieve timestamps, "
      "numOfExpr:%" PRIzu ", colList:%" PRIzu ", numOfOutputFields:%d, name:%s",
      pParent, pSql, 0, pTableMetaInfo->vgroupIndex, pTableMetaInfo->vgroupList->numOfVgroups, pQueryInfo->type,
      tscSqlExprNumOfExprs(pQueryInfo), numOfQueryCols, pQueryInfo->fieldsInfo.numOfOutput, pTableMetaInfo->name);

  tscProcessSql(pSql);
}

H
Haojun Liao 已提交
527
/*
 * Check that no two neighbouring <tid, tag> records of p1 carry the same tag value.
 *
 * The tag type and width are taken from the tag schema entry referenced by the first column of the
 * table's tagColList. Only adjacent records of p1->pIdTagList are compared with each other.
 *
 * @return true when no adjacent duplicate exists; false otherwise, in which case pPSqlObj->res.code is
 *         set to TSDB_CODE_QRY_DUP_JOIN_KEY
 */
static bool checkForDuplicateTagVal(SQueryInfo* pQueryInfo, SJoinSupporter* p1, SSqlObj* pPSqlObj) {
  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  SSchema* pTagSchemas = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);// todo: tags mismatch, tags not completed
  SColumn* pTagCol = taosArrayGetP(pTableMetaInfo->tagColList, 0);
  SSchema* pTagSchema = &pTagSchemas[pTagCol->colIndex.columnIndex];

  for (int32_t i = 1; i < p1->num; ++i) {
    STidTags* prev = (STidTags*) varDataVal(p1->pIdTagList + (i - 1) * p1->tagSize);
    STidTags* p = (STidTags*) varDataVal(p1->pIdTagList + i * p1->tagSize);
    assert(prev->vgId >= 1 && p->vgId >= 1);

    if (doCompare(prev->tag, p->tag, pTagSchema->type, pTagSchema->bytes) != 0) {
      continue;  // distinct tag values, keep scanning
    }

    tscError("%p join tags have same value for different table, free all sub SqlObj and quit", pPSqlObj);
    pPSqlObj->res.code = TSDB_CODE_QRY_DUP_JOIN_KEY;
    return false;
  }

  return true;
}

549
/*
 * Compute the <tid, tag> records of the two join sub queries whose tag values match.
 *
 * Both record lists (taken from the supporters of pParentSql->pSubs[0] and pSubs[1]) are sorted with
 * tscCompareTidTags, checked for duplicate tag values, and then walked in parallel; each pair of records
 * with equal tag values is pushed to *s1 and *s2 respectively.
 *
 * @param s1, s2  [out] always allocated by this function, even when an error is returned; the caller
 *                is expected to release them
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_DUP_JOIN_KEY when a duplicate tag value is detected
 */
static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pParentSql, SArray** s1, SArray** s2) {
  tscDebug("%p all subqueries retrieve <tid, tags> complete, do tags match", pParentSql);

  SJoinSupporter* p1 = pParentSql->pSubs[0]->param;
  SJoinSupporter* p2 = pParentSql->pSubs[1]->param;

  qsort(p1->pIdTagList, p1->num, p1->tagSize, tscCompareTidTags);
  qsort(p2->pIdTagList, p2->num, p2->tagSize, tscCompareTidTags);

  // schema of the join tag column, needed to compare tag values
  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  int16_t         joinTagColId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid);
  SSchema*        pTagSchema = tscGetTableColumnSchemaById(pTableMetaInfo->pTableMeta, joinTagColId);

  // int16_t for padding
  *s1 = taosArrayInit(p1->num, p1->tagSize - sizeof(int16_t));
  *s2 = taosArrayInit(p2->num, p2->tagSize - sizeof(int16_t));

  if (!checkForDuplicateTagVal(pQueryInfo, p1, pParentSql) || !checkForDuplicateTagVal(pQueryInfo, p2, pParentSql)) {
    return TSDB_CODE_QRY_DUP_JOIN_KEY;
  }

  int32_t i = 0;
  int32_t j = 0;

  while (i < p1->num && j < p2->num) {
    STidTags* pp1 = (STidTags*) varDataVal(p1->pIdTagList + i * p1->tagSize);
    STidTags* pp2 = (STidTags*) varDataVal(p2->pIdTagList + j * p2->tagSize);
    assert(pp1->tid != 0 && pp2->tid != 0);

    int32_t cmp = doCompare(pp1->tag, pp2->tag, pTagSchema->type, pTagSchema->bytes);

    if (cmp > 0) {
      // the second list is behind
      j++;
      continue;
    }

    if (cmp < 0) {
      // the first list is behind
      i++;
      continue;
    }

    tscDebug("%p tag matched, vgId:%d, val:%d, tid:%d, uid:%"PRIu64", tid:%d, uid:%"PRIu64, pParentSql, pp1->vgId,
             *(int*) pp1->tag, pp1->tid, pp1->uid, pp2->tid, pp2->uid);

    taosArrayPush(*s1, pp1);
    taosArrayPush(*s2, pp2);
    j++;
    i++;
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
596
/*
 * Retrieve callback of the <tid, tag> sub query stage of a join.
 *
 * @param param      the SJoinSupporter of the sub query
 * @param tres       the sub query SSqlObj
 * @param numOfRows  number of rows delivered, or a negative error code
 *
 * Flow:
 *  1. On error, the code is stored in the parent, the sub query is accounted as finished, and the parent
 *     result is queued.
 *  2. Delivered rows are appended to pSupporter->pIdTagList; if the current retrieve is not completed,
 *     the next batch is requested.
 *  3. If more vnodes remain, the query is re-issued against the next vgroup.
 *  4. Once the last outstanding sub query arrives here, the two record sets are intersected; an empty
 *     intersection yields an empty result for the parent, otherwise the ts_comp queries are issued for
 *     both sub queries.
 */
static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRows) {
  SJoinSupporter* pSupporter = (SJoinSupporter*)param;
  SSqlObj*        pParentSql = pSupporter->pObj;

  SSqlObj* pSql = (SSqlObj*)tres;
  SSqlCmd* pCmd = &pSql->cmd;
  SSqlRes* pRes = &pSql->res;

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  assert(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY));

  // check for the error code firstly
  if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
    // todo retry if other subqueries are not failed

    assert(numOfRows < 0 && numOfRows == taos_errno(pSql));
    tscError("%p sub query failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex);

    pParentSql->res.code = numOfRows;
    quitAllSubquery(pParentSql, pSupporter);

    tscQueueAsyncRes(pParentSql);
    return;
  }

  // keep the results in memory
  if (numOfRows > 0) {
    size_t batchLen = (size_t)(pSupporter->tagSize * pRes->numOfRows);
    size_t newLen = pSupporter->totalLen + batchLen;

    // todo handle memory error
    char* pGrown = realloc(pSupporter->pIdTagList, newLen);
    if (pGrown == NULL) {
      tscError("%p failed to malloc memory", pSql);

      pParentSql->res.code = TAOS_SYSTEM_ERROR(errno);
      quitAllSubquery(pParentSql, pSupporter);

      tscQueueAsyncRes(pParentSql);
      return;
    }

    pSupporter->pIdTagList = pGrown;

    // append the new batch behind the records collected so far
    memcpy(pSupporter->pIdTagList + pSupporter->totalLen, pRes->data, batchLen);
    pSupporter->totalLen += (int32_t)batchLen;
    pSupporter->num += (int32_t)pRes->numOfRows;

    // query not completed, continue to retrieve tid + tag tuples
    if (!pRes->completed) {
      taos_fetch_rows_a(tres, tidTagRetrieveCallback, param);
      return;
    }
  }

  // data in current vnode has all returned to client, try next vnode if exists
  // <tid + tag> tuples have been retrieved to client, try <tid + tag> tuples from the next vnode
  if (hasMoreVnodesToTry(pSql)) {
    STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

    int32_t totalVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
    pTableMetaInfo->vgroupIndex += 1;
    assert(pTableMetaInfo->vgroupIndex < totalVgroups);

    tscDebug("%p tid_tag from vgroup index:%d completed, try next vgroup:%d. total vgroups:%d. current numOfRes:%d",
             pSql, pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVgroups, pSupporter->num);

    pCmd->command = TSDB_SQL_SELECT;
    tscResetForNextRetrieve(&pSql->res);

    // set the callback function
    pSql->fp = tscJoinQueryCallback;
    tscProcessSql(pSql);
    return;
  }

  // no data exists in next vnode, mark the <tid, tags> query completed
  // only when there is no subquery left any more, proceed to get the intersection of the <tid, tags> tuple sets.
  if (atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1) > 0) {
    return;
  }

  SArray* s1 = NULL;
  SArray* s2 = NULL;

  int32_t code = getIntersectionOfTableTuple(pQueryInfo, pParentSql, &s1, &s2);
  if (code != TSDB_CODE_SUCCESS) {
    freeJoinSubqueryObj(pParentSql);
    pParentSql->res.code = code;
    tscQueueAsyncRes(pParentSql);

    taosArrayDestroy(s1);
    taosArrayDestroy(s2);
    return;
  }

  if (taosArrayGetSize(s1) != 0 && taosArrayGetSize(s2) != 0) {
    // proceed to the ts_comp query: attach the matched tables, grouped by vgroup, to each sub query
    for (int32_t k = 0; k < 2; ++k) {
      SSqlCmd*        pSubCmd = &pParentSql->pSubs[k]->cmd;
      SQueryInfo*     pSubQueryInfo = tscGetQueryInfoDetail(pSubCmd, 0);
      STableMetaInfo* pSubMetaInfo = tscGetMetaInfo(pSubQueryInfo, 0);

      tscBuildVgroupTableInfo(pParentSql, pSubMetaInfo, (k == 0) ? s1 : s2);
    }

    pParentSql->subState.numOfSub = 2;
    pParentSql->subState.numOfRemain = pParentSql->subState.numOfSub;

    for (int32_t m = 0; m < pParentSql->subState.numOfSub; ++m) {
      SSqlObj* pSub = pParentSql->pSubs[m];
      issueTSCompQuery(pSub, pSub->param, pParentSql);
    }
  } else {  // no results, return.
    tscDebug("%p tag intersect does not generated qualified tables for join, free all sub SqlObj and quit", pParentSql);
    freeJoinSubqueryObj(pParentSql);

    // set no result command
    pParentSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
    (*pParentSql->fp)(pParentSql->param, pParentSql, 0);
  }

  taosArrayDestroy(s1);
  taosArrayDestroy(s2);
}
H
Haojun Liao 已提交
723

H
Haojun Liao 已提交
724 725
static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRows) {
  SJoinSupporter* pSupporter = (SJoinSupporter*)param;
H
Haojun Liao 已提交
726

H
Haojun Liao 已提交
727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745
  SSqlObj* pParentSql = pSupporter->pObj;

  SSqlObj* pSql = (SSqlObj*)tres;
  SSqlCmd* pCmd = &pSql->cmd;
  SSqlRes* pRes = &pSql->res;

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  assert(!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE));

  // check for the error code firstly
  if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
    // todo retry if other subqueries are not failed yet 
    assert(numOfRows < 0 && numOfRows == taos_errno(pSql));
    tscError("%p sub query failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex);

    pParentSql->res.code = numOfRows;
    quitAllSubquery(pParentSql, pSupporter);

    tscQueueAsyncRes(pParentSql);
746 747
    return;
  }
H
Haojun Liao 已提交
748

H
Haojun Liao 已提交
749
  if (numOfRows > 0) {  // write the compressed timestamp to disk file
750
    fwrite(pRes->data, (size_t)pRes->numOfRows, 1, pSupporter->f);
H
Haojun Liao 已提交
751 752 753 754 755 756 757 758 759 760
    fclose(pSupporter->f);
    pSupporter->f = NULL;

    STSBuf* pBuf = tsBufCreateFromFile(pSupporter->path, true);
    if (pBuf == NULL) {  // in error process, close the fd
      tscError("%p invalid ts comp file from vnode, abort subquery, file size:%d", pSql, numOfRows);

      pParentSql->res.code = TAOS_SYSTEM_ERROR(errno);
      tscQueueAsyncRes(pParentSql);

H
hjxilinx 已提交
761 762
      return;
    }
763

H
Haojun Liao 已提交
764
    if (pSupporter->pTSBuf == NULL) {
765
      tscDebug("%p create tmp file for ts block:%s, size:%d bytes", pSql, pBuf->path, numOfRows);
H
Haojun Liao 已提交
766 767 768 769
      pSupporter->pTSBuf = pBuf;
    } else {
      assert(pQueryInfo->numOfTables == 1);  // for subquery, only one
      STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
hjxilinx 已提交
770

H
Haojun Liao 已提交
771
      tsBufMerge(pSupporter->pTSBuf, pBuf, pTableMetaInfo->vgroupIndex);
H
Haojun Liao 已提交
772
      tsBufDestroy(pBuf);
H
Haojun Liao 已提交
773
    }
H
hjxilinx 已提交
774

H
Haojun Liao 已提交
775 776
    // continue to retrieve ts-comp data from vnode
    if (!pRes->completed) {
S
slguan 已提交
777
      taosGetTmpfilePath("ts-join", pSupporter->path);
H
Haojun Liao 已提交
778
      pSupporter->f = fopen(pSupporter->path, "w");
S
Shengliang Guan 已提交
779
      pRes->row = (int32_t)pRes->numOfRows;
H
hjxilinx 已提交
780

H
Haojun Liao 已提交
781 782
      taos_fetch_rows_a(tres, tsCompRetrieveCallback, param);
      return;
H
hjxilinx 已提交
783
    }
H
Haojun Liao 已提交
784
  }
H
Haojun Liao 已提交
785

H
Haojun Liao 已提交
786 787
  if (hasMoreVnodesToTry(pSql)) {
    STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
Haojun Liao 已提交
788

H
Haojun Liao 已提交
789 790 791
    int32_t totalVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
    pTableMetaInfo->vgroupIndex += 1;
    assert(pTableMetaInfo->vgroupIndex < totalVgroups);
H
Haojun Liao 已提交
792

793
    tscDebug("%p results from vgroup index:%d completed, try next vgroup:%d. total vgroups:%d. current numOfRes:%" PRId64,
H
Haojun Liao 已提交
794 795
             pSql, pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVgroups,
             pRes->numOfClauseTotal);
H
Haojun Liao 已提交
796

H
Haojun Liao 已提交
797 798
    pCmd->command = TSDB_SQL_SELECT;
    tscResetForNextRetrieve(&pSql->res);
H
Haojun Liao 已提交
799

H
Haojun Liao 已提交
800
    assert(pSupporter->f == NULL);
S
slguan 已提交
801
    taosGetTmpfilePath("ts-join", pSupporter->path);
H
Haojun Liao 已提交
802 803 804
    
    // TODO check for failure
    pSupporter->f = fopen(pSupporter->path, "w");
S
Shengliang Guan 已提交
805
    pRes->row = (int32_t)pRes->numOfRows;
H
Haojun Liao 已提交
806

H
Haojun Liao 已提交
807 808 809 810 811
    // set the callback function
    pSql->fp = tscJoinQueryCallback;
    tscProcessSql(pSql);
    return;
  }
H
Haojun Liao 已提交
812

H
Haojun Liao 已提交
813
  if (atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1) > 0) {
H
Haojun Liao 已提交
814 815
    return;
  }
H
hjxilinx 已提交
816

817
  tscDebug("%p all subquery retrieve ts complete, do ts block intersect", pParentSql);
H
hjxilinx 已提交
818

H
Haojun Liao 已提交
819 820 821
  // proceeds to launched secondary query to retrieve final data
  SJoinSupporter* p1 = pParentSql->pSubs[0]->param;
  SJoinSupporter* p2 = pParentSql->pSubs[1]->param;
H
Haojun Liao 已提交
822

H
Haojun Liao 已提交
823 824 825
  STimeWindow win = TSWINDOW_INITIALIZER;
  int64_t num = doTSBlockIntersect(pParentSql, p1, p2, &win);
  if (num <= 0) {  // no result during ts intersect
826
    tscDebug("%p no results generated in ts intersection, free all sub SqlObj and quit", pParentSql);
H
Haojun Liao 已提交
827
    freeJoinSubqueryObj(pParentSql);
828 829 830

    // set no result command
    pParentSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
H
Haojun Liao 已提交
831
    (*pParentSql->fp)(pParentSql->param, pParentSql, 0);
H
Haojun Liao 已提交
832 833 834 835 836 837
    return;
  }

  // launch the query the retrieve actual results from vnode along with the filtered timestamp
  SQueryInfo* pPQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, pParentSql->cmd.clauseIndex);
  updateQueryTimeRange(pPQueryInfo, &win);
H
Haojun Liao 已提交
838
  tscLaunchRealSubqueries(pParentSql);
H
Haojun Liao 已提交
839
}
H
Haojun Liao 已提交
840

H
Haojun Liao 已提交
841 842
/*
 * Async callback for the final (second-stage) data retrieval of one join subquery.
 *
 * On error the code is copied to the parent query and the parent result is queued.
 * For a non-ordered projection query on a super table that returned no rows, the
 * subquery is re-sent for the next vgroup when one exists. Otherwise the remaining
 * counter of the parent is decremented, and the subquery that brings it to zero
 * accumulates the per-subquery row totals and builds the join result.
 */
static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfRows) {
  SJoinSupporter* pSup = (SJoinSupporter*)param;
  SSqlObj*        pParent = pSup->pObj;
  SSqlObj*        pSub = (SSqlObj*)tres;
  SSqlRes*        pSubRes = &pSub->res;

  SQueryInfo* pInfo = tscGetQueryInfoDetail(&pSub->cmd, pSub->cmd.clauseIndex);

  if (taos_errno(pSub) != TSDB_CODE_SUCCESS) {
    assert(numOfRows == taos_errno(pSub));

    pParent->res.code = numOfRows;
    tscError("%p retrieve failed, index:%d, code:%s", pSub, pSup->subqueryIndex, tstrerror(numOfRows));

    tscQueueAsyncRes(pParent);
    return;
  }

  if (numOfRows >= 0) {
    pSubRes->numOfTotal += pSubRes->numOfRows;
  }

  SSubqueryState* pSubState = &pParent->subState;

  if (tscNonOrderedProjectionQueryOnSTable(pInfo, 0) && numOfRows == 0) {
    STableMetaInfo* pMeta = tscGetMetaInfo(pInfo, 0);
    assert(pInfo->numOfTables == 1);

    // current vnode is exhausted: advance to the next vgroup and re-issue the query if there is one
    pMeta->vgroupIndex += 1;
    if (pMeta->vgroupIndex < pMeta->vgroupList->numOfVgroups) {
      pSubState->numOfRemain = 1;
      pSubState->numOfSub = 1;

      pSub->cmd.command = TSDB_SQL_SELECT;
      pSub->fp = tscJoinQueryCallback;
      tscProcessSql(pSub);
      return;
    }
  }

  // other subqueries are still outstanding
  if (atomic_sub_fetch_32(&pSubState->numOfRemain, 1) > 0) {
    tscDebug("%p sub:%p completed, remain:%d, total:%d", pParent, tres, pSubState->numOfRemain, pSubState->numOfSub);
    return;
  }

  tscDebug("%p all %d secondary subqueries retrieval completed, code:%d", tres, pSubState->numOfSub, pParent->res.code);

  if (pParent->res.code != TSDB_CODE_SUCCESS) {
    freeJoinSubqueryObj(pParent);
    pParent->res.completed = true;
  }

  // fold the rows of this round into the per-clause total of every live subquery
  for (int32_t idx = 0; idx < pSubState->numOfSub; ++idx) {
    SSqlObj* pEach = pParent->pSubs[idx];
    if (pEach != NULL) {
      pEach->res.numOfClauseTotal += pEach->res.numOfRows;
    }
  }

  // all data is on the client side now, assemble the join result
  tscBuildResFromSubqueries(pParent);
}

909
/*
 * Set the number of outstanding subqueries of pSql to numOfFetch.
 *
 * Returns the param (SJoinSupporter) of the last non-NULL entry in pSql->pSubs,
 * or NULL when every entry is NULL.
 */
static SJoinSupporter* tscUpdateSubqueryStatus(SSqlObj* pSql, int32_t numOfFetch) {
  SJoinSupporter* pSupporter = NULL;
  SSubqueryState* pState = &pSql->subState;

  for (int32_t i = 0; i < pState->numOfSub; ++i) {
    if (pSql->pSubs[i] != NULL) {
      pSupporter = (SJoinSupporter*)pSql->pSubs[i]->param;
    }
  }

  pState->numOfRemain = numOfFetch;
  return pSupporter;
}

/*
 * Deliver the next batch of join results for pSql.
 *
 * If every involved subquery still holds unread rows, the result is built directly
 * from the client-side buffers. If some buffer is exhausted and nothing more can be
 * fetched, the query is marked completed and the caller is notified. Otherwise a
 * retrieve is issued on every subquery whose buffer has been consumed, with
 * joinRetrieveFinalResCallback as its callback.
 */
void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
  assert(pSql->subState.numOfSub >= 1);

  int32_t pendingFetch = 0;
  bool    allHaveData = true;

  for (int32_t idx = 0; idx < pSql->subState.numOfSub; ++idx) {
    SSqlObj* pSub = pSql->pSubs[idx];
    if (pSub == NULL) {  // a NULL subquery takes no part in the final result
      continue;
    }

    SSqlRes*    pSubRes = &pSub->res;
    SQueryInfo* pSubInfo = tscGetQueryInfoDetail(&pSub->cmd, 0);

    bool limitReached = tscHasReachLimitation(pSubInfo, pSubRes);
    if (pSubRes->row < pSubRes->numOfRows) {
      continue;  // unread rows remain for this subquery
    }

    allHaveData = false;
    if (limitReached) {  // limitation reached and buffer drained: no data anymore
      break;
    }

    if (!pSubRes->completed) {
      pendingFetch++;
    }
  }

  // data remains on the client side, keep returning it to the app
  if (allHaveData) {
    tscBuildResFromSubqueries(pSql);
    return;
  }

  if (pendingFetch <= 0) {
    pSql->res.completed = true;
    freeJoinSubqueryObj(pSql);

    if (pSql->res.code != TSDB_CODE_SUCCESS) {
      tscQueueAsyncRes(pSql);
    } else {
      (*pSql->fp)(pSql->param, pSql, 0);
    }

    return;
  }

  // TODO multi-vnode retrieve for projection query with limitation has bugs, since the global limiation is not handled
  tscDebug("%p retrieve data from %d subqueries", pSql, pendingFetch);
  SJoinSupporter* pSup = tscUpdateSubqueryStatus(pSql, pendingFetch);

  for (int32_t idx = 0; idx < pSql->subState.numOfSub; ++idx) {
    SSqlObj* pSub = pSql->pSubs[idx];
    if (pSub == NULL) {
      continue;
    }

    SSqlRes* pSubRes = &pSub->res;
    SSqlCmd* pSubCmd = &pSub->cmd;

    pSup = (SJoinSupporter*)pSub->param;

    // wait for all subqueries completed
    SQueryInfo* pSubInfo = tscGetQueryInfoDetail(pSubCmd, 0);
    assert(pSubRes->numOfRows >= 0 && pSubInfo->numOfTables == 1);

    STableMetaInfo* pMeta = tscGetMetaInfo(pSubInfo, 0);

    if (pSubRes->row < pSubRes->numOfRows) {
      continue;  // still has buffered rows, nothing to fetch
    }

    tscDebug("%p subquery:%p retrieve data from vnode, subquery:%d, vgroupIndex:%d", pSql, pSub,
             pSup->subqueryIndex, pMeta->vgroupIndex);

    tscResetForNextRetrieve(pSubRes);
    pSub->fp = joinRetrieveFinalResCallback;

    if (pSubCmd->command < TSDB_SQL_LOCAL) {
      if (pSubCmd->command > TSDB_SQL_MGMT) {
        pSubCmd->command = TSDB_SQL_RETRIEVE;
      } else {
        pSubCmd->command = TSDB_SQL_FETCH;
      }
    }

    tscProcessSql(pSub);
  }
}

/*
 * All subqueries have returned: build pRes->pColumnIndex, which maps every output
 * expression of the parent query to (subquery index, expression index in that subquery).
 *
 * Does nothing when the mapping already exists. On allocation failure pRes->code is
 * set to TSDB_CODE_TSC_OUT_OF_MEMORY and the function returns without a mapping.
 */
void tscSetupOutputColumnIndex(SSqlObj* pSql) {
  SSqlCmd* pCmd = &pSql->cmd;
  SSqlRes* pRes = &pSql->res;

  tscDebug("%p all subquery response, retrieve data for subclause:%d", pSql, pCmd->clauseIndex);

  // the column transfer support struct has been built
  if (pRes->pColumnIndex != NULL) {
    return;
  }

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  int32_t numOfExprs = (int32_t)tscSqlExprNumOfExprs(pQueryInfo);

  // let calloc do the count * size multiplication so an overflow is rejected
  pRes->pColumnIndex = calloc((size_t)numOfExprs, sizeof(*pRes->pColumnIndex));
  if (pRes->pColumnIndex == NULL) {
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return;
  }

  for (int32_t i = 0; i < numOfExprs; ++i) {
    SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);

    // locate the table (and therefore the subquery) this expression belongs to, by uid
    int32_t tableIndexOfSub = -1;
    for (int32_t j = 0; j < pQueryInfo->numOfTables; ++j) {
      STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, j);
      if (pTableMetaInfo->pTableMeta->id.uid == pExpr->uid) {
        tableIndexOfSub = j;
        break;
      }
    }

    assert(tableIndexOfSub >= 0 && tableIndexOfSub < pQueryInfo->numOfTables);

    SSqlCmd*    pSubCmd = &pSql->pSubs[tableIndexOfSub]->cmd;
    SQueryInfo* pSubQueryInfo = tscGetQueryInfoDetail(pSubCmd, 0);

    // match on (functionId, colId); an unmatched expression keeps the zeroed entry
    int32_t numOfSubExpr = (int32_t)taosArrayGetSize(pSubQueryInfo->exprList);
    for (int32_t k = 0; k < numOfSubExpr; ++k) {
      SSqlExpr* pSubExpr = tscSqlExprGet(pSubQueryInfo, k);
      if (pExpr->functionId == pSubExpr->functionId && pExpr->colInfo.colId == pSubExpr->colInfo.colId) {
        pRes->pColumnIndex[i] = (SColumnIndex){.tableIndex = tableIndexOfSub, .columnIndex = k};
        break;
      }
    }
  }

  // restore the offset value for super table query in case of final result.
  tscRestoreSQLFuncForSTableQuery(pQueryInfo);
  tscFieldInfoUpdateOffset(pQueryInfo);
}

/*
 * Callback invoked when a join subquery has been processed.
 *
 * param - the SJoinSupporter of the subquery; tres - the subquery SSqlObj;
 * code  - result code of the subquery.
 *
 * Any failure (already recorded on the parent, or reported by this subquery) aborts
 * the join. Otherwise, depending on the query type, the subquery goes on to fetch
 * <tid, tag> tuples, ts_comp data, or - in the second stage - the actual results once
 * all sibling subqueries have responded.
 */
void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
  SSqlObj*        pSub = (SSqlObj*)tres;
  SJoinSupporter* pSup = (SJoinSupporter*)param;
  SSqlObj*        pParent = pSup->pObj;

  // There is only one subquery and table for each subquery.
  SQueryInfo* pInfo = tscGetQueryInfoDetail(&pSub->cmd, 0);
  assert(pInfo->numOfTables == 1 && pSub->cmd.numOfClause == 1);

  // another subquery has already failed
  if (pParent->res.code != TSDB_CODE_SUCCESS) {
    tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSub, code, pParent->res.code);
    quitAllSubquery(pParent, pSup);
    tscQueueAsyncRes(pParent);
    return;
  }

  // TODO here retry is required, not directly returns to client
  if (taos_errno(pSub) != TSDB_CODE_SUCCESS) {
    assert(taos_errno(pSub) == code);

    tscError("%p abort query, code:%s, global code:%s", pSub, tstrerror(code), tstrerror(pParent->res.code));
    pParent->res.code = code;

    quitAllSubquery(pParent, pSup);
    tscQueueAsyncRes(pParent);
    return;
  }

  // retrieve <tid, tag> tuples from vnode
  if (TSDB_QUERY_HAS_TYPE(pInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) {
    pSub->fp = tidTagRetrieveCallback;
    pSub->cmd.command = TSDB_SQL_FETCH;
    tscProcessSql(pSub);
    return;
  }

  // retrieve ts_comp info from vnode
  if (!TSDB_QUERY_HAS_TYPE(pInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
    pSub->fp = tsCompRetrieveCallback;
    pSub->cmd.command = TSDB_SQL_FETCH;
    tscProcessSql(pSub);
    return;
  }

  // second stage: wait for the other subqueries response from vnode
  if (atomic_sub_fetch_32(&pParent->subState.numOfRemain, 1) > 0) {
    return;
  }

  tscSetupOutputColumnIndex(pParent);
  STableMetaInfo* pMeta = tscGetMetaInfo(pInfo, 0);

  /**
   * if the query is a continue query (vgroupIndex > 0 for projection query) for next vnode, do the retrieval of
   * data instead of returning to its invoker
   */
  if (pMeta->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pInfo, 0)) {
    pParent->subState.numOfRemain = pParent->subState.numOfSub;  // reset the record value

    pSub->fp = joinRetrieveFinalResCallback;  // continue retrieve data
    pSub->cmd.command = TSDB_SQL_FETCH;
    tscProcessSql(pSub);
    return;
  }

  // first retrieve from vnode during the secondary stage sub-query
  // set the command flag must be after the semaphore been correctly set.
  if (pParent->res.code != TSDB_CODE_SUCCESS) {
    tscQueueAsyncRes(pParent);
    return;
  }

  (*pParent->fp)(pParent->param, pParent, 0);
}

/////////////////////////////////////////////////////////////////////////////////////////
static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code);

1143
static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj);
H
hjxilinx 已提交
1144

H
Haojun Liao 已提交
1145
/*
 * Create the first-stage subquery for table `tableIndex` of a join query and append
 * it to pSql->pSubs.
 *
 * The column list, expression list, field info, tag condition and limit of the new
 * subquery are moved/copied into pSupporter, and the subquery itself is turned into
 * either a tid_tag query (super table) or a ts_comp query (otherwise).
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when an allocation or
 * copy fails. On failure the caller still owns pSupporter.
 */
int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter *pSupporter) {
  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  pSql->res.qhandle = 0x1;
  assert(pSql->res.numOfRows == 0);

  if (pSql->pSubs == NULL) {
    pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES);
    if (pSql->pSubs == NULL) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
  }

  SSqlObj *pNew = createSubqueryObj(pSql, tableIndex, tscJoinQueryCallback, pSupporter, TSDB_SQL_SELECT, NULL);
  if (pNew == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  // numOfRemain doubles as the insertion cursor while the subqueries are being created
  pSql->pSubs[pSql->subState.numOfRemain++] = pNew;
  assert(pSql->subState.numOfRemain <= pSql->subState.numOfSub);

  if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) {
    addGroupInfoForSubquery(pSql, pNew, 0, tableIndex);

    // refactor as one method
    SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
    assert(pNewQueryInfo != NULL);

    // update the table index
    size_t num = taosArrayGetSize(pNewQueryInfo->colList);
    for (size_t i = 0; i < num; ++i) {
      SColumn* pCol = taosArrayGetP(pNewQueryInfo->colList, i);
      pCol->colIndex.tableIndex = 0;
    }

    // hand the column/expression lists and field info over to the supporter
    pSupporter->colList = pNewQueryInfo->colList;
    pNewQueryInfo->colList = NULL;

    pSupporter->exprList = pNewQueryInfo->exprList;
    pNewQueryInfo->exprList = NULL;

    pSupporter->fieldsInfo = pNewQueryInfo->fieldsInfo;

    // this data needs to be transfer to support struct
    memset(&pNewQueryInfo->fieldsInfo, 0, sizeof(SFieldInfo));
    if (tscTagCondCopy(&pSupporter->tagCond, &pNewQueryInfo->tagCond) != 0) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }

    pNew->cmd.numOfCols = 0;
    pNewQueryInfo->interval.interval = 0;
    pSupporter->limit = pNewQueryInfo->limit;

    pNewQueryInfo->limit.limit = -1;
    pNewQueryInfo->limit.offset = 0;

    // backup the data and clear it in the sqlcmd object
    memset(&pNewQueryInfo->groupbyExpr, 0, sizeof(SSqlGroupbyExpr));

    tscInitQueryInfo(pNewQueryInfo);
    STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0);

    if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { // return the tableId & tag
      SColumnIndex colIndex = {0};

      STagCond* pTagCond = &pSupporter->tagCond;
      assert(pTagCond->joinInfo.hasJoin);

      int32_t tagColId = tscGetJoinTagColIdByUid(pTagCond, pTableMetaInfo->pTableMeta->id.uid);
      SSchema* s = tscGetTableColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId);

      // get the tag colId column index
      int32_t numOfTags = tscGetNumOfTags(pTableMetaInfo->pTableMeta);
      SSchema* pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);
      for (int32_t i = 0; i < numOfTags; ++i) {
        if (pSchema[i].colId == tagColId) {
          colIndex.columnIndex = i;
          break;
        }
      }

      int16_t bytes = 0;
      int16_t type  = 0;
      int32_t inter = 0;

      getResultDataInfo(s->type, s->bytes, TSDB_FUNC_TID_TAG, 0, &type, &bytes, &inter, 0, 0);

      SSchema s1 = {.colId = s->colId, .type = (uint8_t)type, .bytes = bytes};
      pSupporter->tagSize = s1.bytes;
      assert(isValidDataType(s1.type) && s1.bytes > 0);

      // set get tags query type
      TSDB_QUERY_SET_TYPE(pNewQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY);
      tscAddSpecialColumnForSelect(pNewQueryInfo, 0, TSDB_FUNC_TID_TAG, &colIndex, &s1, TSDB_COL_TAG);
      size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList);

      tscDebug(
          "%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, transfer to tid_tag query to retrieve (tableId, tags), "
          "exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, tagIndex:%d, name:%s",
          pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscSqlExprNumOfExprs(pNewQueryInfo),
          numOfCols, pNewQueryInfo->fieldsInfo.numOfOutput, colIndex.columnIndex, pNewQueryInfo->pTableMetaInfo[0]->name);
    } else {
      SSchema      colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1};
      SColumnIndex colIndex = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX};
      tscAddSpecialColumnForSelect(pNewQueryInfo, 0, TSDB_FUNC_TS_COMP, &colIndex, &colSchema, TSDB_COL_NORMAL);

      // set the tags value for ts_comp function
      SSqlExpr *pExpr = tscSqlExprGet(pNewQueryInfo, 0);

      // NOTE(review): this test sits in the else-branch of the same super-table check
      // above, so it looks unreachable - confirm before removing.
      if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
        int16_t tagColId = tscGetJoinTagColIdByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->id.uid);
        pExpr->param->i64Key = tagColId;
        pExpr->numOfParams = 1;
      }

      // add the filter tag column
      if (pSupporter->colList != NULL) {
        size_t s = taosArrayGetSize(pSupporter->colList);

        for (size_t i = 0; i < s; ++i) {
          SColumn *pCol = taosArrayGetP(pSupporter->colList, i);

          if (pCol->numOfFilters > 0) {  // copy to the pNew->cmd.colList if it is filtered.
            SColumn *p = tscColumnClone(pCol);
            if (p == NULL) {  // do not store a NULL column in the list
              return TSDB_CODE_TSC_OUT_OF_MEMORY;
            }

            taosArrayPush(pNewQueryInfo->colList, &p);
          }
        }
      }

      size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList);

      tscDebug(
          "%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%u, transfer to ts_comp query to retrieve timestamps, "
          "exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, name:%s",
          pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscSqlExprNumOfExprs(pNewQueryInfo),
          numOfCols, pNewQueryInfo->fieldsInfo.numOfOutput, pNewQueryInfo->pTableMetaInfo[0]->name);
    }
  } else {
    assert(0);
    SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
    pNewQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY;
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
1292
/*
 * Entry point for a join query: create one join supporter and one first-stage
 * subquery per table, then launch them.
 *
 * If creating any supporter/subquery fails, the error code is stored in the result
 * and the async result is queued. If a super-table subquery has no vgroups, the
 * subqueries are freed and an empty result is reported through the user callback.
 */
void tscHandleMasterJoinQuery(SSqlObj* pSql) {
  SSqlCmd*    pCmd = &pSql->cmd;
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  assert((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0);

  pSql->subState.numOfSub = pQueryInfo->numOfTables;
  tscDebug("%p start subquery, total:%d", pSql, pQueryInfo->numOfTables);

  int32_t code = TSDB_CODE_SUCCESS;
  bool    foundEmptySub = false;

  for (int32_t idx = 0; idx < pQueryInfo->numOfTables; ++idx) {
    SJoinSupporter* pSupporter = tscCreateJoinSupporter(pSql, idx);
    if (pSupporter == NULL) {  // no support struct, abort current query
      tscError("%p tableIndex:%d, failed to allocate join support object, abort further query", pSql, idx);
      code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      break;
    }

    code = tscCreateJoinSubquery(pSql, idx, pSupporter);
    if (code != TSDB_CODE_SUCCESS) {  // no subquery object, quit query
      tscDestroyJoinSupporter(pSupporter);
      break;
    }

    STableMetaInfo* pMeta = tscGetTableMetaInfoFromCmd(&pSql->pSubs[idx]->cmd, 0, 0);
    if (UTIL_TABLE_IS_SUPER_TABLE(pMeta) && (pMeta->vgroupList->numOfVgroups == 0)) {
      foundEmptySub = true;
      break;
    }
  }

  // creation failed somewhere in the loop above
  if (code != TSDB_CODE_SUCCESS) {
    pSql->res.code = code;
    tscQueueAsyncRes(pSql);
    return;
  }

  if (foundEmptySub) {  // at least one subquery is empty, do nothing and return
    freeJoinSubqueryObj(pSql);
    pCmd->command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
    (*pSql->fp)(pSql->param, pSql, 0);
    return;
  }

  for (int32_t idx = 0; idx < pSql->subState.numOfSub; ++idx) {
    code = tscProcessSql(pSql->pSubs[idx]);
    if (code != TSDB_CODE_SUCCESS) {
      // the already sent request will continue and do not go to the error process routine
      pSql->subState.numOfRemain = idx - 1;
      break;
    }
  }

  pCmd->command = TSDB_SQL_TABLE_JOIN_RETRIEVE;
}

H
Haojun Liao 已提交
1351 1352
/*
 * Release the first numOfSubs subqueries of pSql: for each one, free the local buffer
 * of its SRetrieveSupport, the support struct itself, and then the subquery object.
 * Every one of those entries must be non-NULL.
 */
static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs) {
  assert(numOfSubs <= pSql->subState.numOfSub && numOfSubs >= 0);

  int32_t idx = 0;
  while (idx < numOfSubs) {
    SSqlObj* pEach = pSql->pSubs[idx];
    assert(pEach != NULL);

    SRetrieveSupport* pRetrieveSup = pEach->param;
    taosTFree(pRetrieveSup->localBuffer);
    taosTFree(pRetrieveSup);

    taos_free_result(pEach);
    ++idx;
  }
}

/*
 * Launch a super-table query: create the local reducer environment, then one
 * subquery (with its own SRetrieveSupport and local buffer) per vgroup, and send
 * them all.
 *
 * Returns TSDB_CODE_SUCCESS once every subquery has been handed to tscProcessSql(),
 * otherwise a non-zero error code. When the reducer environment or the subquery
 * array cannot be allocated, the error is additionally reported through
 * tscQueueAsyncRes().
 */
int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  // pRes->code check only serves in launching metric sub-queries
  if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
    pCmd->command = TSDB_SQL_RETRIEVE_LOCALMERGE;  // enable the abort of kill super table function.
    return pRes->code;
  }

  tExtMemBuffer **  pMemoryBuf = NULL;
  tOrderDescriptor *pDesc = NULL;
  SColumnModel *    pModel = NULL;

  pRes->qhandle = 0x1;  // hack the qhandle check

  const uint32_t nBufferSize = (1u << 16);  // 64KB

  SQueryInfo     *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  SSubqueryState *pState = &pSql->subState;

  // one subquery per vgroup
  pState->numOfSub = pTableMetaInfo->vgroupList->numOfVgroups;
  assert(pState->numOfSub > 0);

  int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, nBufferSize);
  if (ret != 0) {
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    tscQueueAsyncRes(pSql);
    taosTFree(pMemoryBuf);
    return ret;
  }

  pSql->pSubs = calloc(pState->numOfSub, POINTER_BYTES);

  tscDebug("%p retrieved query data from %d vnode(s)", pSql, pState->numOfSub);

  if (pSql->pSubs == NULL) {
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pState->numOfSub);

    tscQueueAsyncRes(pSql);
    return pRes->code;  // `ret` is 0 here and must not be reported as the outcome
  }

  pState->numOfRemain = pState->numOfSub;
  pRes->code = TSDB_CODE_SUCCESS;

  // create every subquery first; `i` ends up as the number successfully created
  int32_t i = 0;
  for (; i < pState->numOfSub; ++i) {
    SRetrieveSupport *trs = calloc(1, sizeof(*trs));
    if (trs == NULL) {
      tscError("%p failed to malloc buffer for SRetrieveSupport, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
      break;
    }

    trs->pExtMemBuffer = pMemoryBuf;
    trs->pOrderDescriptor = pDesc;

    trs->localBuffer = calloc(1, nBufferSize + sizeof(tFilePage));
    if (trs->localBuffer == NULL) {
      tscError("%p failed to malloc buffer for local buffer, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
      taosTFree(trs);
      break;
    }

    trs->subqueryIndex  = i;
    trs->pParentSql     = pSql;
    trs->pFinalColModel = pModel;

    SSqlObj *pNew = tscCreateSTableSubquery(pSql, trs, NULL);
    if (pNew == NULL) {
      tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
      taosTFree(trs->localBuffer);
      taosTFree(trs);
      break;
    }

    // todo handle multi-vnode situation
    if (pQueryInfo->tsBuf) {
      SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
      pNewQueryInfo->tsBuf = tsBufClone(pQueryInfo->tsBuf);
      assert(pNewQueryInfo->tsBuf != NULL);
    }

    tscDebug("%p sub:%p create subquery success. orderOfSub:%d", pSql, pNew, trs->subqueryIndex);
  }

  if (i < pState->numOfSub) {
    tscError("%p failed to prepare subquery structure and launch subqueries", pSql);
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;

    tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pState->numOfSub);
    doCleanupSubqueries(pSql, i);
    return pRes->code;   // free all allocated resource
  }

  if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
    tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pState->numOfSub);
    doCleanupSubqueries(pSql, i);
    return pRes->code;
  }

  for (int32_t j = 0; j < pState->numOfSub; ++j) {
    SSqlObj* pSub = pSql->pSubs[j];
    SRetrieveSupport* pSupport = pSub->param;

    tscDebug("%p sub:%p launch subquery, orderOfSub:%d.", pSql, pSub, pSupport->subqueryIndex);
    tscProcessSql(pSub);
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
1482 1483
/*
 * Release the SRetrieveSupport object attached to a subquery (pSql->param) together with
 * its local buffer.
 *
 * The param slot is cleared with a compare-and-swap, and the memory is released only by the
 * caller whose exchange actually took place. This relies on atomic_val_compare_exchange_ptr
 * returning the value that was stored before the call (as its use below implies - confirm
 * against its definition).
 */
static void tscFreeRetrieveSup(SSqlObj *pSql) {
  SRetrieveSupport *trsupport = pSql->param;

  void* p = atomic_val_compare_exchange_ptr(&pSql->param, trsupport, 0);

  // p == NULL: the slot was already empty.
  // p != trsupport: the slot changed after it was read above, so nothing was swapped out
  // and this caller does not own trsupport; freeing it here could be a double free.
  if (p == NULL || p != trsupport) {
    tscDebug("%p retrieve supp already released", pSql);
    return;
  }

  tscDebug("%p start to free subquery supp obj:%p", pSql, trsupport);

  taosTFree(trsupport->localBuffer);
  taosTFree(trsupport);
}

1500
static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfRows);
1501
static void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows);
H
hjxilinx 已提交
1502

H
Haojun Liao 已提交
1503
/*
 * Abort a subquery without any further retry: store `code` as the result code of the parent
 * query, exhaust the retry budget of this subquery and hand over to tscHandleSubqueryError.
 *
 * trsupport  retrieve support object of the failing subquery
 * tres       the failing subquery object
 * code       error code to publish on the parent query
 */
static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES *tres, int32_t code) {
  SSqlObj* pParent = trsupport->pParentSql;

  tscError("sub:%p failed to flush data to disk, reason:%s", tres, tstrerror(code));

  // no more retry for this subquery, and the parent query carries the error from now on
  trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
  pParent->res.code = code;

  tscHandleSubqueryError(trsupport, tres, pParent->res.code);
}

H
Haojun Liao 已提交
1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525 1526 1527
/*
 * The current subquery failed while its retry budget is not exhausted yet: discard the data
 * retrieved so far for this subquery and launch a replacement subquery.
 *
 * Returns TSDB_CODE_SUCCESS when the new subquery has been issued; in that case the old
 * subquery object (pSql) is released here. On any other return value pSql is left for the
 * caller to handle. If the new subquery object cannot be created, terrno is stored in the
 * parent query and the retry budget is exhausted.
 */
static int32_t tscReissueSubquery(SRetrieveSupport *trsupport, SSqlObj *pSql, int32_t code) {
  SSqlObj      *pParentSql = trsupport->pParentSql;
  const int32_t index      = trsupport->subqueryIndex;

  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
  SCMVgroupInfo  *pVgroup   = &pMetaInfo->vgroupList->vgroups[0];

  // drop the results buffered on disk and the rows kept in the local buffer
  tExtMemBufferClear(trsupport->pExtMemBuffer[index]);
  trsupport->localBuffer->num = 0;

  tscError("%p sub:%p retrieve/query failed, code:%s, orderOfSub:%d, retry:%d", pParentSql, pSql,
           tstrerror(code), index, trsupport->numOfRetry);

  SSqlObj *pNew = tscCreateSTableSubquery(pParentSql, trsupport, pSql);
  if (pNew == NULL) {
    tscError("%p sub:%p failed to create new subquery due to error:%s, abort retry, vgId:%d, orderOfSub:%d",
             pParentSql, pSql, tstrerror(terrno), pVgroup->vgId, index);

    pParentSql->res.code  = terrno;
    trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
    return pParentSql->res.code;
  }

  int32_t ret = tscProcessSql(pNew);

  // only a successfully issued replacement allows the old subquery to be released here;
  // otherwise the caller keeps handling pSql
  if (ret == TSDB_CODE_SUCCESS) {
    taos_free_result(pSql);
  }

  return ret;
}

1553
/*
 * Account for a subquery that stops retrieving, either because it failed itself or because
 * the parent query already carries an error.
 *
 * trsupport  retrieve support object of the subquery; nothing is done when it is no longer
 *            the object attached to pSql->param
 * pSql       the subquery object
 * numOfRows  >= 0: the subquery itself succeeded and is aborted because of another failure;
 *            <  0: used as the error code of this subquery
 *
 * A failed subquery is reissued while retries remain and the parent query is still
 * successful. Otherwise the remaining-subquery counter is decremented; the call that brings
 * it to zero destroys the local reducer environment and reports to the parent query.
 */
void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows) {
  // must hold before pSql is dereferenced below
  assert(pSql != NULL);

  // it has been freed already
  if (pSql->param != trsupport || pSql->param == NULL) {
    return;
  }

  SSqlObj *pParentSql = trsupport->pParentSql;
  int32_t  subqueryIndex = trsupport->subqueryIndex;

  SSubqueryState* pState = &pParentSql->subState;
  assert(pState->numOfRemain <= pState->numOfSub && pState->numOfRemain >= 0);

  // retrieved in subquery failed. OR query cancelled in retrieve phase.
  if (taos_errno(pSql) == TSDB_CODE_SUCCESS && pParentSql->res.code != TSDB_CODE_SUCCESS) {
    /*
     * This subquery has no error of its own, but the parent query already failed or was
     * cancelled: discard its rows and make sure it is never retried.
     */
    pSql->res.numOfRows = 0;
    trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;  // disable retry efforts
    tscDebug("%p query is cancelled, sub:%p, orderOfSub:%d abort retrieve, code:%s", pParentSql, pSql,
             subqueryIndex, tstrerror(pParentSql->res.code));
  }

  if (numOfRows >= 0) {  // current query is successful, but other sub query failed, still abort current query.
    tscDebug("%p sub:%p retrieve numOfRows:%d,orderOfSub:%d", pParentSql, pSql, numOfRows, subqueryIndex);
    tscError("%p sub:%p abort further retrieval due to other queries failure,orderOfSub:%d,code:%s", pParentSql, pSql,
             subqueryIndex, tstrerror(pParentSql->res.code));
  } else {
    if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && pParentSql->res.code == TSDB_CODE_SUCCESS) {
      if (tscReissueSubquery(trsupport, pSql, numOfRows) == TSDB_CODE_SUCCESS) {
        return;
      }
    } else {  // reach the maximum retry count, abort
      // only the first error is kept as the global code
      atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, numOfRows);
      tscError("%p sub:%p retrieve failed,code:%s,orderOfSub:%d failed.no more retry,set global code:%s", pParentSql, pSql,
               tstrerror(numOfRows), subqueryIndex, tstrerror(pParentSql->res.code));
    }
  }

  int32_t remain = -1;
  if ((remain = atomic_sub_fetch_32(&pState->numOfRemain, 1)) > 0) {
    // other subqueries are still running: only release the resources of this one
    tscDebug("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pParentSql, pSql, trsupport->subqueryIndex,
        pState->numOfSub - remain);

    tscFreeRetrieveSup(pSql);
    return;
  }

  // all subqueries are failed
  tscError("%p retrieve from %d vnode(s) completed,code:%s.FAILED.", pParentSql, pState->numOfSub,
      tstrerror(pParentSql->res.code));

  // release allocated resource
  tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, trsupport->pFinalColModel,
                            pState->numOfSub);

  tscFreeRetrieveSup(pSql);

  // NOTE(review): the original comments say that a second stage join subquery invokes the
  // callback while a regular super table query goes through tscQueueAsyncRes, but the
  // negated condition below does the opposite. The logic is kept as it was; confirm which
  // of the two is intended.
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, 0);

  if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
    (*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.code);
  } else {
    if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
      tscQueueAsyncRes(pParentSql);
    }
  }
}

1627 1628
/*
 * Called when a subquery has delivered all of its rows: flush the rows still held in the
 * local buffer into the external buffer of this subquery and decrement the counter of
 * remaining subqueries. The call that brings the counter to zero creates the local reducer
 * for the parent query and notifies the parent.
 *
 * Lack of temporary disk space or a failed flush aborts the subquery through
 * tscAbortFurtherRetryRetrieval.
 */
static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* pSql) {
  SSqlObj          *pParentSql = trsupport->pParentSql;
  SSubqueryState   *pState     = &pParentSql->subState;
  tOrderDescriptor *pDesc      = trsupport->pOrderDescriptor;
  const int32_t     idx        = trsupport->subqueryIndex;

  SQueryInfo     *pQueryInfo     = tscGetQueryInfoDetail(&pSql->cmd, 0);
  STableMetaInfo *pTableMetaInfo = pQueryInfo->pTableMetaInfo[0];
  SVgroupsInfo   *vgroupsInfo    = pTableMetaInfo->vgroupList;

  // rows already moved to the external buffer plus the rows still in the local buffer
  uint32_t numOfRowsFromSubquery = (uint32_t)(trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->num);
  tscDebug("%p sub:%p all data retrieved from ep:%s, vgId:%d, numOfRows:%d, orderOfSub:%d", pParentSql, pSql,
           vgroupsInfo->vgroups[0].epAddr[0].fqdn, vgroupsInfo->vgroups[0].vgId, numOfRowsFromSubquery, idx);

  tColModelCompact(pDesc->pColumnModel, trsupport->localBuffer, pDesc->pColumnModel->capacity);

#ifdef _DEBUG_VIEW
  printf("%" PRIu64 " rows data flushed to disk:\n", trsupport->localBuffer->num);
    SSrcColumnInfo colInfo[256] = {0};
    tscGetSrcColumnInfo(colInfo, pQueryInfo);
    tColModelDisplayEx(pDesc->pColumnModel, trsupport->localBuffer->data, trsupport->localBuffer->num,
                       trsupport->localBuffer->num, colInfo);
#endif

  // not enough space left in the temporary directory
  if (tsTotalTmpDirGB != 0 && tsAvailTmpDirectorySpace < tsReservedTmpDirectorySpace) {
    tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pParentSql, pSql,
             tsAvailTmpDirectorySpace, tsReservedTmpDirectorySpace);
    tscAbortFurtherRetryRetrieval(trsupport, pSql, TSDB_CODE_TSC_NO_DISKSPACE);
    return;
  }

  // the result of every vnode is kept as an independent ordered list, which later serves as
  // one input of the loser tree for the disk-based merge
  int32_t code = tscFlushTmpBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer, pQueryInfo->groupbyExpr.orderType);
  if (code != 0) {  // flush failed, abort without retry
    tscAbortFurtherRetryRetrieval(trsupport, pSql, code);
    return;
  }

  int32_t remain = atomic_sub_fetch_32(&pState->numOfRemain, 1);
  if (remain > 0) {
    // other subqueries are still running
    tscDebug("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pParentSql, pSql, trsupport->subqueryIndex,
        pState->numOfSub - remain);

    tscFreeRetrieveSup(pSql);
    return;
  }

  // all sub-queries are returned, start to local merge process
  pDesc->pColumnModel->capacity = trsupport->pExtMemBuffer[idx]->numOfElemsPerPage;

  tscDebug("%p retrieve from %d vnodes completed.final NumOfRows:%" PRId64 ",start to build loser tree", pParentSql,
           pState->numOfSub, pState->numOfRetrievedRows);

  SQueryInfo *pPQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, 0);
  tscClearInterpInfo(pPQueryInfo);

  tscCreateLocalReducer(trsupport->pExtMemBuffer, pState->numOfSub, pDesc, trsupport->pFinalColModel, pParentSql);
  tscDebug("%p build loser tree completed", pParentSql);

  pParentSql->res.precision = pSql->res.precision;
  pParentSql->res.numOfRows = 0;
  pParentSql->res.row       = 0;

  tscFreeRetrieveSup(pSql);

  // set the command flag must be after the semaphore been correctly set.
  pParentSql->cmd.command = TSDB_SQL_RETRIEVE_LOCALMERGE;
  if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
    tscQueueAsyncRes(pParentSql);
    return;
  }

  (*pParentSql->fp)(pParentSql->param, pParentSql, 0);
}

/*
 * Callback invoked with the rows fetched by one subquery.
 *
 * param      the SRetrieveSupport object of the subquery
 * tres       the subquery object
 * numOfRows  number of rows delivered by this fetch; it carries the error code of the
 *            subquery when the subquery failed
 *
 * Delivered rows are saved into the buffers of the subquery and the next fetch is issued
 * until the result is complete; a failed subquery is reissued while retries remain.
 */
static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
  SSqlObj *pSql = (SSqlObj *)tres;
  assert(pSql != NULL);

  SRetrieveSupport *trsupport = (SRetrieveSupport *)param;

  // the support object of this query has been released already
  if (pSql->param == NULL || param == NULL) {
    tscDebug("%p already freed in dnodecallback", pSql);
    assert(pSql->res.code == TSDB_CODE_TSC_QUERY_CANCELLED);
    return;
  }

  SSqlObj          *pParentSql = trsupport->pParentSql;
  SSubqueryState   *pState     = &pParentSql->subState;
  tOrderDescriptor *pDesc      = trsupport->pOrderDescriptor;
  int32_t           idx        = trsupport->subqueryIndex;

  assert(pState->numOfRemain <= pState->numOfSub && pState->numOfRemain >= 0);

  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
  SCMVgroupInfo  *pVgroup = &pTableMetaInfo->vgroupList->vgroups[0];

  // the parent query already failed or was cancelled: stop this subquery as well
  if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
    trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
    tscDebug("%p query cancelled/failed, sub:%p, vgId:%d, orderOfSub:%d, code:%s, global code:%s",
             pParentSql, pSql, pVgroup->vgId, trsupport->subqueryIndex, tstrerror(numOfRows), tstrerror(pParentSql->res.code));

    tscHandleSubqueryError(param, tres, numOfRows);
    return;
  }

  // this subquery failed
  if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
    assert(numOfRows == taos_errno(pSql));

    // a cancelled query is never retried
    if (numOfRows == TSDB_CODE_TSC_QUERY_CANCELLED) {
      trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
    }

    if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY) {
      tscError("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(numOfRows), trsupport->numOfRetry);

      if (tscReissueSubquery(trsupport, pSql, numOfRows) == TSDB_CODE_SUCCESS) {
        return;
      }
    } else {
      tscDebug("%p sub:%p reach the max retry times, set global code:%s", pParentSql, pSql, tstrerror(numOfRows));
      atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, numOfRows);  // set global code and abort
    }

    tscHandleSubqueryError(param, tres, numOfRows);
    return;
  }

  SSqlRes    *pRes = &pSql->res;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);

  // nothing delivered by this fetch: all data has been retrieved to client
  if (numOfRows <= 0) {
    tscAllDataRetrievedFromDnode(trsupport, pSql);
    return;
  }

  assert(pRes->numOfRows == numOfRows);
  int64_t num = atomic_add_fetch_64(&pState->numOfRetrievedRows, numOfRows);

  tscDebug("%p sub:%p retrieve numOfRows:%" PRId64 " totalNumOfRows:%" PRIu64 " from ep:%s, orderOfSub:%d", pParentSql, pSql,
           pRes->numOfRows, pState->numOfRetrievedRows, pSql->epSet.fqdn[pSql->epSet.inUse], idx);

  // a projection query on a super table may not exceed the limit of ordered results
  if (num > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pQueryInfo, 0)) {
    tscError("%p sub:%p num of OrderedRes is too many, max allowed:%" PRId32 " , current:%" PRId64,
             pParentSql, pSql, tsMaxNumOfOrderedResults, num);
    tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_TSC_SORTED_RES_TOO_MANY);
    return;
  }

#ifdef _DEBUG_VIEW
  printf("received data from vnode: %"PRIu64" rows\n", pRes->numOfRows);
  SSrcColumnInfo colInfo[256] = {0};

  tscGetSrcColumnInfo(colInfo, pQueryInfo);
  tColModelDisplayEx(pDesc->pColumnModel, pRes->data, pRes->numOfRows, pRes->numOfRows, colInfo);
#endif

  // no disk space for tmp directory
  if (tsTotalTmpDirGB != 0 && tsAvailTmpDirectorySpace < tsReservedTmpDirectorySpace) {
    tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pParentSql, pSql,
             tsAvailTmpDirectorySpace, tsReservedTmpDirectorySpace);
    tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_TSC_NO_DISKSPACE);
    return;
  }

  int32_t ret = saveToBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer, pRes->data,
                             (int32_t)pRes->numOfRows, pQueryInfo->groupbyExpr.orderType);
  if (ret != 0) {
    // saving failed: reported as no disk space, abort retry
    tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_TSC_NO_DISKSPACE);
    return;
  }

  if (pRes->completed) {
    tscAllDataRetrievedFromDnode(trsupport, pSql);
    return;
  }

  // continue fetch data from dnode
  taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param);
}

1805
/*
 * Create the subquery object for one vgroup of a two-stage super table query and register
 * it in pSql->pSubs at the position given by trsupport->subqueryIndex.
 *
 * Returns the new subquery object, or NULL when createSubqueryObj failed.
 */
static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj) {
  const int32_t tableIdx = 0;

  SSqlObj *pSub = createSubqueryObj(pSql, tableIdx, tscRetrieveDataRes, trsupport, TSDB_SQL_SELECT, prevSqlObj);
  if (pSub == NULL) {
    return NULL;
  }

  // the sub query of two-stage super table query
  SQueryInfo *pSubQueryInfo = tscGetQueryInfoDetail(&pSub->cmd, 0);
  pSubQueryInfo->type |= TSDB_QUERY_TYPE_STABLE_SUBQUERY;

  assert(pSubQueryInfo->numOfTables == 1 && pSub->cmd.numOfClause == 1 && trsupport->subqueryIndex < pSql->subState.numOfSub);

  // one subquery is launched per vnode, so the subquery index equals the vgroup index
  STableMetaInfo *pMetaInfo = tscGetMetaInfo(pSubQueryInfo, tableIdx);
  pMetaInfo->vgroupIndex = trsupport->subqueryIndex;

  pSql->pSubs[trsupport->subqueryIndex] = pSub;
  return pSub;
}

/*
 * Callback invoked when the query stage of one super table subquery has finished.
 *
 * param  the SRetrieveSupport object of the subquery
 * tres   the subquery object
 * code   result code of the subquery
 *
 * A successful subquery starts to fetch its rows. A failed one is reissued while retries
 * remain; otherwise its code becomes the global code of the parent query (unless one is
 * already set) and the failure is passed to tscHandleSubqueryError.
 */
void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
  SRetrieveSupport *trsupport  = (SRetrieveSupport *) param;
  SSqlObj          *pSql       = (SSqlObj *) tres;
  SSqlObj          *pParentSql = trsupport->pParentSql;

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
  assert(pSql->cmd.numOfClause == 1 && pQueryInfo->numOfTables == 1);

  // NOTE(review): the vgroup is looked up by subqueryIndex here, while the other callbacks
  // of the same subquery use vgroups[0] - confirm that both address the same entry
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
  SCMVgroupInfo  *pVgroup   = &pMetaInfo->vgroupList->vgroups[trsupport->subqueryIndex];

  // stable query killed or other subquery failed, all query stopped
  if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
    trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
    tscError("%p query cancelled or failed, sub:%p, vgId:%d, orderOfSub:%d, code:%s, global code:%s",
        pParentSql, pSql, pVgroup->vgId, trsupport->subqueryIndex, tstrerror(code), tstrerror(pParentSql->res.code));

    tscHandleSubqueryError(param, tres, code);
    return;
  }

  /*
   * Once the subquery on one vnode has failed, the retrieve operations that would follow it
   * are not needed any more: the current and the remaining retrieve process are aborted
   * through the error handler.
   *
   * NOTE: thread safe is required.
   */
  if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
    assert(code == taos_errno(pSql));

    if (trsupport->numOfRetry++ >= MAX_NUM_OF_SUBQUERY_RETRY) {
      tscError("%p sub:%p reach the max retry times, set global code:%s", pParentSql, pSql, tstrerror(code));
      atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, code);  // set global code and abort
    } else {
      tscError("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(code), trsupport->numOfRetry);
      if (tscReissueSubquery(trsupport, pSql, code) == TSDB_CODE_SUCCESS) {
        return;
      }
    }

    tscHandleSubqueryError(param, tres, pParentSql->res.code);
    return;
  }

  tscDebug("%p sub:%p query complete, ep:%s, vgId:%d, orderOfSub:%d, retrieve data", trsupport->pParentSql, pSql,
             pVgroup->epAddr[0].fqdn, pVgroup->vgId, trsupport->subqueryIndex);

  if (pSql->res.qhandle != 0) {
    taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param);
  } else {
    // no query handle together with a success code: this vnode generated no results
    tscRetrieveFromDnodeCallBack(param, pSql, 0);
  }
}

H
Haojun Liao 已提交
1881
/*
 * Callback of one sub-insertion of a multi-vnode insert.
 *
 * param      the SInsertSupporter of the sub-insertion; it is released here
 * tres       the sub-insertion object
 * numOfRows  number of inserted rows; it equals the error code of the sub-insertion when
 *            the sub-insertion failed
 *
 * Inserted rows and error codes are accumulated in the parent object. The callback that
 * finishes the last pending sub-insertion restores the user callback of the parent and
 * invokes it with the error code, or with the total number of inserted rows.
 */
static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) {
  SInsertSupporter *pSupporter = (SInsertSupporter *)param;
  SSqlObj          *pParent    = pSupporter->pSql;

  // record the total inserted rows
  if (numOfRows > 0) {
    pParent->res.numOfRows += numOfRows;
  }

  // propagate the failure of this sub-insertion to the parent
  if (taos_errno(tres) != TSDB_CODE_SUCCESS) {
    SSqlObj* pSub = (SSqlObj*) tres;
    assert(pSub != NULL && pSub->res.code == numOfRows);

    pParent->res.code = pSub->res.code;
  }

  taosTFree(pSupporter);

  // other sub-insertions are still pending
  if (atomic_sub_fetch_32(&pParent->subState.numOfRemain, 1) > 0) {
    return;
  }

  tscDebug("%p Async insertion completed, total inserted:%" PRId64, pParent, pParent->res.numOfRows);

  // restore user defined fp
  pParent->fp = pParent->fetchFp;

  // todo remove this parameter in async callback function definition.
  // all data has been sent to vnode, call user function
  int32_t v;
  if (pParent->res.code != TSDB_CODE_SUCCESS) {
    v = pParent->res.code;
  } else {
    v = (int32_t)pParent->res.numOfRows;
  }

  (*pParent->fp)(pParent->param, pParent, v);
}

/**
 * Retry a sub-insertion: copy the data block addressed by the supporter index into the
 * payload of the sub-object again and resubmit it.
 *
 * @param pSql  sub-insertion object; its param must be a SInsertSupporter
 * @return      the error code when the data block cannot be copied (the error is also
 *              queued as an async result), otherwise the return value of tscProcessSql
 */
int32_t tscHandleInsertRetry(SSqlObj* pSql) {
  assert(pSql != NULL && pSql->param != NULL);

  SInsertSupporter* pSupporter = (SInsertSupporter*) pSql->param;
  assert(pSupporter->index < pSupporter->pSql->subState.numOfSub);

  STableDataBlocks* pBlock = taosArrayGetP(pSql->cmd.pDataBlocks, pSupporter->index);

  int32_t code = tscCopyDataBlockToPayload(pSql, pBlock);
  pSql->res.code = code;

  if (code != TSDB_CODE_SUCCESS) {
    tscQueueAsyncRes(pSql);
    return code;  // here the pSql may have been released already.
  }

  return tscProcessSql(pSql);
}

/*
 * Submit the data blocks of an insert statement: one sub-object is created per entry of
 * pCmd->pDataBlocks, the matching data block is copied into its payload and, once every
 * sub-object is prepared, all of them are launched.
 *
 * Returns TSDB_CODE_SUCCESS after the sub-insertions have been launched. Any failure during
 * preparation returns TSDB_CODE_TSC_OUT_OF_MEMORY; when the failure came from
 * tscCopyDataBlockToPayload, its code is left in pSql->res.code.
 *
 * NOTE(review): sub-objects that were already created are not released on the error path
 * of this function - confirm that the caller cleans up pSql->pSubs.
 */
int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  pSql->subState.numOfSub = (uint16_t)taosArrayGetSize(pCmd->pDataBlocks);
  assert(pSql->subState.numOfSub > 0);

  pRes->code = TSDB_CODE_SUCCESS;

  // the number of already initialized subqueries
  int32_t numOfSub = 0;

  pSql->subState.numOfRemain = pSql->subState.numOfSub;
  pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES);
  if (pSql->pSubs == NULL) {
    goto _error;
  }

  tscDebug("%p submit data to %d vnode(s)", pSql, pSql->subState.numOfSub);

  // the loop is left either with every sub-object prepared or through _error
  while(numOfSub < pSql->subState.numOfSub) {
    SInsertSupporter* pSupporter = calloc(1, sizeof(SInsertSupporter));
    if (pSupporter == NULL) {
      goto _error;
    }

    pSupporter->pSql   = pSql;
    pSupporter->index  = numOfSub;

    SSqlObj *pNew = createSimpleSubObj(pSql, multiVnodeInsertFinalize, pSupporter, TSDB_SQL_INSERT);
    if (pNew == NULL) {
      tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, numOfSub, strerror(errno));

      // no sub-object took the supporter over, release it here
      taosTFree(pSupporter);
      goto _error;
    }

    /*
     * assign the callback function to fetchFp to make sure that the error process function can restore
     * the callback function (multiVnodeInsertFinalize) correctly.
     */
    pNew->fetchFp = pNew->fp;
    pSql->pSubs[numOfSub] = pNew;

    STableDataBlocks* pTableDataBlock = taosArrayGetP(pCmd->pDataBlocks, numOfSub);
    pRes->code = tscCopyDataBlockToPayload(pNew, pTableDataBlock);
    if (pRes->code == TSDB_CODE_SUCCESS) {
      tscDebug("%p sub:%p create subObj success. orderOfSub:%d", pSql, pNew, numOfSub);
      numOfSub++;
    } else {
      tscDebug("%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%s", pSql, numOfSub,
               pSql->subState.numOfSub, tstrerror(pRes->code));
      goto _error;
    }
  }

  // every block has been copied into a sub-object, the source blocks are not needed any more
  pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);

  // use the local variable
  for (int32_t j = 0; j < numOfSub; ++j) {
    SSqlObj *pSub = pSql->pSubs[j];
    tscDebug("%p sub:%p launch sub insert, orderOfSub:%d", pSql, pSub, j);
    tscProcessSql(pSub);
  }

  return TSDB_CODE_SUCCESS;

  _error:
  return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
H
hjxilinx 已提交
2012

H
Haojun Liao 已提交
2013 2014 2015
/*
 * Locate the data of one result column inside the result block of a (sub)query.
 *
 * pCmd         command whose current clause describes the result fields
 * pRes         result holding the data block
 * columnIndex  index of the column in the internal field list
 * bytes        [out] result width (resBytes) of the expression behind the column
 *
 * Returns pRes->data advanced by the expression offset multiplied by the number of rows.
 */
static char* getResultBlockPosition(SSqlCmd* pCmd, SSqlRes* pRes, int32_t columnIndex, int16_t* bytes) {
  SQueryInfo*     pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  SInternalField* pField = (SInternalField*) TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, columnIndex);

  assert(pField->pSqlExpr != NULL);

  *bytes = pField->pSqlExpr->resBytes;
  return pRes->data + pField->pSqlExpr->offset * pRes->numOfRows;
}

/*
 * Assemble the result block of the parent query from the result blocks of its subqueries.
 *
 * The number of rows is the smallest row count among the existing subqueries. For every
 * output expression the matching column is copied from the subquery named by
 * pRes->pColumnIndex, column after column, into pRes->pRsp, which is resized to fit.
 *
 * Nothing is done when there is no row to return or when no subquery exists. When the
 * buffer cannot be resized, pRes->code is set to TSDB_CODE_TSC_OUT_OF_MEMORY.
 */
static void doBuildResFromSubqueries(SSqlObj* pSql) {
  SSqlRes* pRes = &pSql->res;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);

  bool    hasSub = false;
  int32_t numOfRes = INT32_MAX;
  for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
    if (pSql->pSubs[i] == NULL) {
      continue;
    }

    hasSub = true;
    numOfRes = (int32_t)(MIN(numOfRes, pSql->pSubs[i]->res.numOfRows));
  }

  // without any subquery numOfRes would still be INT32_MAX, which must not be used as a
  // row count for the allocation below
  if (!hasSub || numOfRes == 0) {
    return;
  }

  int32_t totalSize = tscGetResRowLength(pQueryInfo->exprList);
  assert(numOfRes > 0 && totalSize > 0);

  // size_t arithmetic: the product of two int32_t values may not fit into an int32_t
  size_t bufSize = (size_t)numOfRes * (size_t)totalSize;

  char* tmp = realloc(pRes->pRsp, bufSize);
  if (tmp == NULL) {
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return;
  } else {
    pRes->pRsp = tmp;
  }

  pRes->data = pRes->pRsp;

  char* data = pRes->data;
  int16_t bytes = 0;

  size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
  for (size_t i = 0; i < numOfExprs; ++i) {
    SColumnIndex* pIndex = &pRes->pColumnIndex[i];
    SSqlRes *pRes1 = &pSql->pSubs[pIndex->tableIndex]->res;
    SSqlCmd *pCmd1 = &pSql->pSubs[pIndex->tableIndex]->cmd;

    char*  pData = getResultBlockPosition(pCmd1, pRes1, pIndex->columnIndex, &bytes);
    size_t colSize = (size_t)bytes * (size_t)numOfRes;
    memcpy(data, pData, colSize);

    data += colSize;
    pRes1->row = numOfRes;  // numOfRes rows of this subquery have been consumed
  }

  pRes->numOfRows = numOfRes;
  pRes->numOfClauseTotal += numOfRes;
}

H
hjxilinx 已提交
2076
/*
 * Build the next result block of the parent query from its subqueries and post the
 * response semaphore of pSql.
 *
 * The row pointer, buffer and length arrays of the result are allocated on the first call.
 * A pending error code, or a failed allocation, is reported through tscQueueAsyncRes and
 * nothing is built.
 */
void tscBuildResFromSubqueries(SSqlObj *pSql) {
  SSqlRes* pRes = &pSql->res;

  if (pRes->code != TSDB_CODE_SUCCESS) {
    tscQueueAsyncRes(pSql);
    return;
  }

  // first call: set up the per-column arrays of the result
  if (pRes->tsrow == NULL) {
    SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
    size_t      numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);

    pRes->tsrow  = calloc(numOfExprs, POINTER_BYTES);
    pRes->buffer = calloc(numOfExprs, POINTER_BYTES);
    pRes->length = calloc(numOfExprs, sizeof(int32_t));

    bool allocated = (pRes->tsrow != NULL) && (pRes->buffer != NULL) && (pRes->length != NULL);
    if (!allocated) {
      pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      tscQueueAsyncRes(pSql);
      return;
    }

    tscRestoreSQLFuncForSTableQuery(pQueryInfo);
  }

  // the previous block must have been consumed completely
  assert (pRes->row >= pRes->numOfRows);

  doBuildResFromSubqueries(pSql);
  tsem_post(&pSql->rspSem);
}

/*
 * Convert the value of an NCHAR column of the current row from UCS-4 to the multi-byte
 * encoding of the client.
 *
 * pSql         query object owning the result
 * columnIndex  index of the column in the current row
 * pField       field description of the column
 *
 * The converted text is placed in pRes->buffer[columnIndex], which is allocated on first
 * use, and tsrow/length of the column are redirected to it. When the conversion fails, or
 * the buffer cannot be allocated, the column is presented as NULL with length 0. Columns
 * that are not NCHAR, and NULL values, are left untouched.
 */
static void transferNcharData(SSqlObj *pSql, int32_t columnIndex, TAOS_FIELD *pField) {
  SSqlRes *pRes = &pSql->res;

  if (pRes->tsrow[columnIndex] == NULL || pField->type != TSDB_DATA_TYPE_NCHAR) {
    return;
  }

  // convert unicode to native code in a temporary buffer extra one byte for terminated symbol
  if (pRes->buffer[columnIndex] == NULL) {
    pRes->buffer[columnIndex] = malloc(pField->bytes + TSDB_NCHAR_SIZE);

    // the buffer is written right below, so an allocation failure has to stop here
    if (pRes->buffer[columnIndex] == NULL) {
      tscError("%p failed to malloc buffer for nchar conversion, column:%d", pSql, columnIndex);
      pRes->tsrow[columnIndex] = NULL;
      pRes->length[columnIndex] = 0;
      return;
    }
  }

  /* string terminated char for binary data*/
  memset(pRes->buffer[columnIndex], 0, pField->bytes + TSDB_NCHAR_SIZE);

  int32_t length = taosUcs4ToMbs(pRes->tsrow[columnIndex], pRes->length[columnIndex], pRes->buffer[columnIndex]);
  if ( length >= 0 ) {
    pRes->tsrow[columnIndex] = pRes->buffer[columnIndex];
    pRes->length[columnIndex] = length;
  } else {
    tscError("%p charset:%s to %s. val:%s convert failed.", pSql, DEFAULT_UNICODE_ENCODEC, tsCharset, (char*)pRes->tsrow[columnIndex]);
    pRes->tsrow[columnIndex] = NULL;
    pRes->length[columnIndex] = 0;
  }
}

2134 2135 2136 2137 2138 2139 2140 2141
static char *getArithemicInputSrc(void *param, const char *name, int32_t colId) {
  SArithmeticSupport *pSupport = (SArithmeticSupport *) param;

  int32_t index = -1;
  SSqlExpr* pExpr = NULL;
  
  for (int32_t i = 0; i < pSupport->numOfCols; ++i) {
    pExpr = taosArrayGetP(pSupport->exprList, i);
B
Bomin Zhang 已提交
2142
    if (strncmp(name, pExpr->aliasName, sizeof(pExpr->aliasName) - 1) == 0) {
2143 2144 2145 2146 2147 2148 2149 2150 2151
      index = i;
      break;
    }
  }

  assert(index >= 0 && index < pSupport->numOfCols);
  return pSupport->data[index] + pSupport->offset * pExpr->resBytes;
}

H
hjxilinx 已提交
2152 2153 2154
/*
 * Populate pRes->tsrow for the row at pRes->row, then advance pRes->row by one.
 *
 * For every output field:
 *   - fields backed by an SQL expression are loaded via tscGetResultColumnChr();
 *   - non-NULL NCHAR values are converted by transferNcharData();
 *   - fields carrying an arithmetic expression are evaluated into pRes->buffer[i],
 *     which then becomes the column value.
 * Column 0 of an interval query is only loaded; the NCHAR and arithmetic steps
 * are skipped for it.
 *
 * @param pSql         query object; pSql->cmd selects the query info, pSql->res is updated
 * @param finalResult  not used by this function
 * @return pRes->tsrow. Once every buffered row has been consumed, tsrow is
 *         released first and the (presumably NULL after taosTFree - confirm)
 *         pointer is returned.
 */
void **doSetResultRowData(SSqlObj *pSql, bool finalResult) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  assert(pRes->row >= 0 && pRes->row <= pRes->numOfRows);
  if (pRes->row >= pRes->numOfRows) {  // all buffered rows have already been returned to the invoker
    taosTFree(pRes->tsrow);
    return pRes->tsrow;
  }

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  // signed bound so the loop index is not compared against a size_t
  int32_t numOfFields = (int32_t)tscNumOfFields(pQueryInfo);
  for (int32_t i = 0; i < numOfFields; ++i) {
    SInternalField *pSup = TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i);
    if (pSup->pSqlExpr != NULL) {
      tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, i);
    }

    // primary key column cannot be null in interval query, no need to check
    if (i == 0 && pQueryInfo->interval.interval > 0) {
      continue;
    }

    // NOTE(review): reads the internal-field element as a TAOS_FIELD; presumably
    // SInternalField starts with its TAOS_FIELD member - confirm against the struct.
    TAOS_FIELD *pField = TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i);
    if (pRes->tsrow[i] != NULL && pField->type == TSDB_DATA_TYPE_NCHAR) {
      transferNcharData(pSql, i, pField);
    }

    if (pSup->pArithExprInfo == NULL) {
      continue;
    }

    // the value of this column is calculated from several other columns
    if (pRes->pArithSup == NULL) {
      pRes->pArithSup = (SArithmeticSupport *)calloc(1, sizeof(SArithmeticSupport));
      if (pRes->pArithSup == NULL) {
        // NOTE(review): out of memory; the column is reported as NULL instead of
        // dereferencing a null pointer. Consider raising an error code here.
        pRes->tsrow[i] = NULL;
        continue;
      }
    }

    SArithmeticSupport *pArith = pRes->pArithSup;

    pArith->offset     = 0;
    pArith->pArithExpr = pSup->pArithExprInfo;
    pArith->numOfCols  = (int32_t)tscSqlExprNumOfExprs(pQueryInfo);
    pArith->exprList   = pQueryInfo->exprList;

    // The pointer array is rebuilt for every arithmetic column of every row;
    // release the previous one first, otherwise each call leaks an array.
    taosTFree(pArith->data);
    pArith->data = calloc(pArith->numOfCols, POINTER_BYTES);

    if (pRes->buffer[i] == NULL) {
      TAOS_FIELD *field = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
      pRes->buffer[i] = malloc(field->bytes);
    }

    if ((pArith->data == NULL && pArith->numOfCols > 0) || pRes->buffer[i] == NULL) {
      // NOTE(review): out of memory; same fallback as above.
      pRes->tsrow[i] = NULL;
      continue;
    }

    // input k: start at pRes->data + numOfRows * offset, then step to the current row
    for (int32_t k = 0; k < pArith->numOfCols; ++k) {
      SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, k);
      pArith->data[k] = (pRes->data + pRes->numOfRows * pExpr->offset) + pRes->row * pExpr->resBytes;
    }

    tExprTreeCalcTraverse(pArith->pArithExpr->pExpr, 1, pRes->buffer[i], pArith,
        TSDB_ORDER_ASC, getArithemicInputSrc);
    pRes->tsrow[i] = pRes->buffer[i];
  }

  pRes->row++;  // index increase one-step
  return pRes->tsrow;
}

H
Haojun Liao 已提交
2213
/*
 * Report whether the sub-queries of pSql may still yield rows.
 *
 * Two policies, selected by tscNonOrderedProjectionQueryOnSTable() on the
 * parent query:
 *   - non-ordered projection on a super table: true as soon as one sub-query
 *     still has a vgroup index inside its vgroup list, unread rows, and has
 *     not reached its limit; false when none qualifies;
 *   - otherwise (e.g. inner join): false as soon as one sub-query is exhausted,
 *     true when none is.
 * NULL entries of pSql->pSubs are ignored by both policies.
 */
static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) {
  SSqlCmd    *pParentCmd = &pSql->cmd;
  SQueryInfo *pParentQuery = tscGetQueryInfoDetail(pParentCmd, pParentCmd->clauseIndex);

  if (tscNonOrderedProjectionQueryOnSTable(pParentQuery, 0)) {
    for (int32_t idx = 0; idx < pSql->subState.numOfSub; ++idx) {
      SSqlObj *pSub = pSql->pSubs[idx];
      if (pSub == NULL) {
        continue;
      }

      SSqlRes *pSubRes = &pSub->res;
      SSqlCmd *pSubCmd = &pSub->cmd;

      SQueryInfo *pSubQuery = tscGetQueryInfoDetail(pSubCmd, pSubCmd->clauseIndex);
      assert(pSubQuery->numOfTables == 1);

      STableMetaInfo *pMeta = tscGetMetaInfo(pSubQuery, 0);

      /*
       * this sub-query can still deliver: its vgroup index is inside the vgroup
       * list, buffered rows are left, and the limit has not been reached
       */
      if (pMeta->vgroupIndex < pMeta->vgroupList->numOfVgroups && pSubRes->row < pSubRes->numOfRows &&
          !tscHasReachLimitation(pSubQuery, pSubRes)) {
        return true;
      }
    }

    // no sub-query qualified
    return false;
  }

  // otherwise, in case of inner join, a single exhausted sub-query completes the query
  for (int32_t idx = 0; idx < pSql->subState.numOfSub; ++idx) {
    SSqlObj *pSub = pSql->pSubs[idx];
    if (pSub == NULL) {
      continue;
    }

    SSqlRes    *pSubRes = &pSub->res;
    SQueryInfo *pSubQuery = tscGetQueryInfoDetail(&pSub->cmd, 0);

    bool drained = pSubRes->row >= pSubRes->numOfRows && tscHasReachLimitation(pSubQuery, pSubRes) &&
                   tscIsProjectionQuery(pSubQuery);
    if (drained || pSubRes->numOfRows == 0) {
      return false;
    }
  }

  return true;
}