tscSubquery.c 118.7 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
H
Hui Li 已提交
15
#define _GNU_SOURCE
16

H
Haojun Liao 已提交
17
#include "os.h"
H
hjxilinx 已提交
18

H
Haojun Liao 已提交
19
#include "texpr.h"
H
Haojun Liao 已提交
20
#include "qTsbuf.h"
H
Haojun Liao 已提交
21
#include "tcompare.h"
S
slguan 已提交
22
#include "tscLog.h"
H
Haojun Liao 已提交
23
#include "tscSubquery.h"
H
Haojun Liao 已提交
24
#include "qTableMeta.h"
25
#include "tsclient.h"
26
#include "qUtil.h"
H
Haojun Liao 已提交
27
#include "qPlan.h"
H
hjxilinx 已提交
28 29 30

/*
 * Context attached to an insert subquery: the SQL object it relates to and the
 * position of the subquery (presumably among the parent's subqueries — verify
 * against the callers).
 */
typedef struct SInsertSupporter {
  SSqlObj *pSql;   // SQL object this supporter is attached to
  int32_t  index;  // subquery position
} SInsertSupporter;

/* Helpers declared up front because they are used before their definitions. */
static void freeJoinSubqueryObj(SSqlObj *pSql);
static bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql);
H
hjxilinx 已提交
36

H
Haojun Liao 已提交
37 38 39 40 41
static int32_t tsCompare(int32_t order, int64_t left, int64_t right) {
  if (left == right) {
    return 0;
  }

42
  if (order == TSDB_ORDER_ASC) {
H
Haojun Liao 已提交
43
    return left < right? -1:1;
H
hjxilinx 已提交
44
  } else {
H
Haojun Liao 已提交
45
    return left > right? -1:1;
H
hjxilinx 已提交
46 47 48
  }
}

49
/*
 * Advance pTSBuf past every element whose tag equals `tag1`.
 *
 * Stops as soon as the current element carries a different tag (it is then the
 * first record of the next tag) or when the buffer has no further position.
 */
static void skipRemainValue(STSBuf* pTSBuf, tVariant* tag1) {
  for (STSElem cur = tsBufGetElem(pTSBuf); tVariantCompare(cur.tag, tag1) == 0; cur = tsBufGetElem(pTSBuf)) {
    if (!tsBufNextPos(pTSBuf)) {
      break;  // buffer exhausted while still on the same tag
    }
  }
}

67
/*
 * Store `state` for subquery `idx` while holding subState->mutex.
 * pSql is only used for the debug log line.
 */
static void subquerySetState(SSqlObj *pSql, SSubqueryState *subState, int idx, int8_t state) {
  assert(idx < subState->numOfSub && subState->states != NULL);
  tscDebug("subquery:0x%"PRIx64",%d state set to %d", pSql->self, idx, state);

  pthread_mutex_t *pLock = &subState->mutex;

  pthread_mutex_lock(pLock);
  subState->states[idx] = state;
  pthread_mutex_unlock(pLock);
}

D
fix bug  
dapan1121 已提交
76
/*
 * Report whether every subquery of pParentSql has a non-zero state.
 *
 * The caller must hold pParentSql->subState.mutex; this function does not lock.
 * pParentSql->pSubs[i] may be NULL (e.g. while a subquery object is being
 * replaced), so it is checked before being dereferenced for logging.
 */
static bool allSubqueryDone(SSqlObj *pParentSql) {
  SSubqueryState *subState = &pParentSql->subState;

  tscDebug("0x%"PRIx64" total subqueries: %d", pParentSql->self, subState->numOfSub);
  for (int i = 0; i < subState->numOfSub; i++) {
    SSqlObj* pSub = pParentSql->pSubs[i];

    if (0 == subState->states[i]) {
      // Guard the log the same way as the "finished" branch: pSub may be NULL.
      if (pSub != NULL) {
        tscDebug("0x%"PRIx64" subquery:0x%"PRIx64", index: %d NOT finished yet", pParentSql->self, pSub->self, i);
      } else {
        tscDebug("0x%"PRIx64" subquery:%p, index: %d NOT finished yet", pParentSql->self, pSub, i);
      }
      return false;
    }

    if (pSub != NULL) {
      tscDebug("0x%"PRIx64" subquery:0x%"PRIx64", index: %d finished", pParentSql->self, pSub->self, i);
    } else {
      tscDebug("0x%"PRIx64" subquery:%p, index: %d finished", pParentSql->self, pSub, i);
    }
  }

  return true;
}

H
Haojun Liao 已提交
100
/*
 * Mark subquery `idx` of pParentSql as finished (state 1) and report whether
 * all of the parent's subqueries are now finished.
 *
 * The state update and the completeness check run under one acquisition of
 * pParentSql->subState.mutex. pSql is only used for logging.
 */
bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) {
  SSubqueryState *pState = &pParentSql->subState;
  assert(idx < pState->numOfSub);

  pthread_mutex_lock(&pState->mutex);

  tscDebug("0x%"PRIx64" subquery:0x%"PRIx64", index:%d state set to 1", pParentSql->self, pSql->self, idx);
  pState->states[idx] = 1;

  bool everyoneDone = allSubqueryDone(pParentSql);

  pthread_mutex_unlock(&pState->mutex);

  return everyoneDone;
}

H
hjxilinx 已提交
121

122

D
dapan1121 已提交
123
/*
 * Intersect the ts-comp buffers produced by the first-stage join subqueries.
 *
 * For every group of tables joined on timestamp
 * (pQueryInfo->tagCond.joinInfo.joinTables[tidx]->tsJoin), the input buffers
 * are walked tag by tag. A timestamp that is present, for the same tag, in all
 * tables of the group is appended to each table's new output buffer, which is
 * installed as that subquery's pSubQueryInfo->tsBuf. The exception is a
 * projection with a pending LIMIT offset (no interval, not a super-table
 * query); such matches only decrement the offset.
 *
 * win receives the smallest and largest appended timestamp. It stays at
 * INT64_MAX/INT64_MIN when nothing was appended or on an early return.
 *
 * On the normal path the input buffers (pSupporter->pTSBuf) are flushed into
 * the outputs and destroyed.
 *
 * Returns the number of elements in the first subquery's output buffer, or 0
 * when any input buffer is missing or empty.
 */
static int64_t doTSBlockIntersect(SSqlObj* pSql, STimeWindow * win) {
  SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);

  win->skey = INT64_MAX;
  win->ekey = INT64_MIN;

  SLimitVal* pLimit = &pQueryInfo->limit;
  int32_t    order = pQueryInfo->order.order;
  int32_t    joinNum = pSql->subState.numOfSub;
  SMergeTsCtx ctxlist[TSDB_MAX_JOIN_TABLE_NUM] = {{0}};
  SMergeTsCtx* ctxStack[TSDB_MAX_JOIN_TABLE_NUM] = {0};
  int32_t slot = 0;
  size_t tableNum = 0;
  int16_t* tableMIdx = 0;
  int32_t equalNum = 0;
  int32_t stackidx = 0;
  SMergeTsCtx* ctx = NULL;
  SMergeTsCtx* pctx = NULL;
  SMergeTsCtx* mainCtx = NULL;
  STSElem cur;
  STSElem prev;
  SArray*   tsCond = NULL;
  int32_t mergeDone = 0;

  // Create one output buffer per subquery and position every input at its first element.
  for (int32_t i = 0; i < joinNum; ++i) {
    STSBuf* output = tsBufCreate(true, pQueryInfo->order.order);
    SQueryInfo* pSubQueryInfo = tscGetQueryInfo(&pSql->pSubs[i]->cmd);

    pSubQueryInfo->tsBuf = output;

    SJoinSupporter* pSupporter = pSql->pSubs[i]->param;

    if (pSupporter->pTSBuf == NULL) {
      tscDebug("0x%"PRIx64" at least one ts-comp is empty, 0 for secondary query after ts blocks intersecting", pSql->self);
      return 0;
    }

    tsBufResetPos(pSupporter->pTSBuf);

    if (!tsBufNextPos(pSupporter->pTSBuf)) {
      tscDebug("0x%"PRIx64" input1 is empty, 0 for secondary query after ts blocks intersecting", pSql->self);
      return 0;
    }

    tscDebug("0x%"PRIx64" sub:0x%"PRIx64" table idx:%d, input group number:%d", pSql->self,
        pSql->pSubs[i]->self, i, pSupporter->pTSBuf->numOfGroups);

    ctxlist[i].p = pSupporter;
    ctxlist[i].res = output;
  }

  TSKEY st = taosGetTimestampUs();

  for (int16_t tidx = 0; tidx < joinNum; tidx++) {
    pctx = &ctxlist[tidx];
    if (pctx->compared) {
      continue;  // already merged as part of an earlier ts-join group
    }

    assert(pctx->numOfInput == 0);

    tsCond = pQueryInfo->tagCond.joinInfo.joinTables[tidx]->tsJoin;

    tableNum = taosArrayGetSize(tsCond);
    assert(tableNum >= 2);

    for (int32_t i = 0; i < tableNum; ++i) {
      tableMIdx = taosArrayGet(tsCond, i);
      SMergeTsCtx* tctx = &ctxlist[*tableMIdx];
      tctx->compared = 1;
    }

    // The first table of the group drives the merge.
    tableMIdx = taosArrayGet(tsCond, 0);
    pctx = &ctxlist[*tableMIdx];

    mainCtx = pctx;

    while (1) {
      pctx = mainCtx;

      prev = tsBufGetElem(pctx->p->pTSBuf);

      ctxStack[stackidx++] = pctx;

      if (!tsBufIsValidElem(&prev)) {
        break;
      }

      // Owned copy of the current tag; must be released with tVariantDestroy on every exit path.
      tVariant tag = {0};
      tVariantAssign(&tag, prev.tag);

      int32_t skipped = 0;

      // Every other table of this ts-join group must contain the current tag.
      for (int32_t i = 1; i < tableNum; ++i) {
        int16_t* pTableIdx = taosArrayGet(tsCond, i);
        SMergeTsCtx* tctx = &ctxlist[*pTableIdx];

        // find the data in supporter2 with the same tag value
        STSElem e2 = tsBufFindElemStartPosByTag(tctx->p->pTSBuf, &tag);

        if (!tsBufIsValidElem(&e2)) {
          skipRemainValue(pctx->p->pTSBuf, &tag);
          skipped = 1;
          break;
        }
      }

      if (skipped) {
        slot = 0;
        stackidx = 0;
        tVariantDestroy(&tag);
        continue;
      }

      tableMIdx = taosArrayGet(tsCond, ++slot);
      equalNum = 1;

      while (1) {
        ctx = &ctxlist[*tableMIdx];

        prev = tsBufGetElem(pctx->p->pTSBuf);
        cur = tsBufGetElem(ctx->p->pTSBuf);

        // data with current are exhausted
        if (!tsBufIsValidElem(&prev) || tVariantCompare(prev.tag, &tag) != 0) {
          break;
        }

        if (!tsBufIsValidElem(&cur) || tVariantCompare(cur.tag, &tag) != 0) { // ignore all records with the same tag
          break;
        }

        ctxStack[stackidx++] = ctx;

        int32_t ret = tsCompare(order, prev.ts, cur.ts);
        if (ret == 0) {
          if (++equalNum < tableNum) {
            // Still need to confirm the timestamp in the remaining tables of the group.
            pctx = ctx;

            if (++slot >= tableNum) {
              slot = 0;
            }

            tableMIdx = taosArrayGet(tsCond, slot);
            continue;
          }

          assert(stackidx == tableNum);

          if (pLimit->offset == 0 || pQueryInfo->interval.interval > 0 || QUERY_IS_STABLE_QUERY(pQueryInfo->type)) {
            if (win->skey > prev.ts) {
              win->skey = prev.ts;
            }

            if (win->ekey < prev.ts) {
              win->ekey = prev.ts;
            }

            for (int32_t i = 0; i < stackidx; ++i) {
              SMergeTsCtx* tctx = ctxStack[i];
              prev = tsBufGetElem(tctx->p->pTSBuf);

              tsBufAppend(tctx->res, prev.id, prev.tag, (const char*)&prev.ts, sizeof(prev.ts));
            }
          } else {
            pLimit->offset -= 1;//offset apply to projection?
          }

          for (int32_t i = 0; i < stackidx; ++i) {
            SMergeTsCtx* tctx = ctxStack[i];

            if (!tsBufNextPos(tctx->p->pTSBuf) && tctx == mainCtx) {
              mergeDone = 1;
            }
            tctx->numOfInput++;
          }

          if (mergeDone) {
            break;
          }

          stackidx = 0;
          equalNum = 1;

          ctxStack[stackidx++] = pctx;
        } else if (ret > 0) {
          // `cur` is behind `prev`: advance the table that holds `cur`.
          if (!tsBufNextPos(ctx->p->pTSBuf) && ctx == mainCtx) {
            mergeDone = 1;
            break;
          }

          ctx->numOfInput++;
          stackidx--;
        } else {
          // `prev` is behind `cur`: advance every table matched so far.
          stackidx--;

          for (int32_t i = 0; i < stackidx; ++i) {
            SMergeTsCtx* tctx = ctxStack[i];

            if (!tsBufNextPos(tctx->p->pTSBuf) && tctx == mainCtx) {
              mergeDone = 1;
            }
            tctx->numOfInput++;
          }

          if (mergeDone) {
            break;
          }

          stackidx = 0;
          equalNum = 1;

          ctxStack[stackidx++] = pctx;
        }
      }

      if (mergeDone) {
        tVariantDestroy(&tag);
        break;
      }

      slot = 0;
      stackidx = 0;

      skipRemainValue(mainCtx->p->pTSBuf, &tag);
      tVariantDestroy(&tag);
    }

    stackidx = 0;
    slot = 0;
    mergeDone = 0;
  }

  /*
   * failed to set the correct ts order yet in two cases:
   * 1. only one element
   * 2. only one element for each tag.
   */
  if (ctxlist[0].res->tsOrder == -1) {
    for (int32_t i = 0; i < joinNum; ++i) {
      ctxlist[i].res->tsOrder = TSDB_ORDER_ASC;
    }
  }

  for (int32_t i = 0; i < joinNum; ++i) {
    tsBufFlush(ctxlist[i].res);

    tsBufDestroy(ctxlist[i].p->pTSBuf);
    ctxlist[i].p->pTSBuf = NULL;
  }

  TSKEY et = taosGetTimestampUs();

  for (int32_t i = 0; i < joinNum; ++i) {
    tscDebug("0x%"PRIx64" sub:0x%"PRIx64" tblidx:%d, input:%" PRId64 ", final:%" PRId64 " in %d vnodes for secondary query after ts blocks "
             "intersecting, skey:%" PRId64 ", ekey:%" PRId64 ", numOfVnode:%d, elapsed time:%" PRId64 " us",
             pSql->self, pSql->pSubs[i]->self, i, ctxlist[i].numOfInput, ctxlist[i].res->numOfTotal, ctxlist[i].res->numOfGroups, win->skey, win->ekey,
             tsBufGetNumOfGroup(ctxlist[i].res), et - st);
  }

  return ctxlist[0].res->numOfTotal;
}

D
dapan1121 已提交
383

H
hjxilinx 已提交
384
// todo handle failed to create sub query
H
Haojun Liao 已提交
385
SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, int32_t index) {
386
  SJoinSupporter* pSupporter = calloc(1, sizeof(SJoinSupporter));
H
hjxilinx 已提交
387 388 389 390 391 392 393
  if (pSupporter == NULL) {
    return NULL;
  }

  pSupporter->pObj = pSql;

  pSupporter->subqueryIndex = index;
H
Haojun Liao 已提交
394
  SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
H
hjxilinx 已提交
395
  
396
  memcpy(&pSupporter->interval, &pQueryInfo->interval, sizeof(pSupporter->interval));
H
hjxilinx 已提交
397 398
  pSupporter->limit = pQueryInfo->limit;

H
Haojun Liao 已提交
399
  STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, index);
400
  pSupporter->uid = pTableMetaInfo->pTableMeta->id.uid;
H
hjxilinx 已提交
401 402
  assert (pSupporter->uid != 0);

S
slguan 已提交
403
  taosGetTmpfilePath("join-", pSupporter->path);
H
hjxilinx 已提交
404

D
fix bug  
dapan1121 已提交
405 406
  // do NOT create file here to reduce crash generated file left issue
  pSupporter->f = NULL;
H
hjxilinx 已提交
407 408 409 410

  return pSupporter;
}

H
Haojun Liao 已提交
411
/*
 * Release a join supporter and every resource it still references: expression
 * list, column list, ts-comp buffer, temporary file (closed, then removed from
 * `path`), vgroup table list, id/tag list and tag condition. NULL is accepted
 * and ignored.
 *
 * Members whose ownership was handed to another object must be reset to NULL
 * beforehand; otherwise they are released here too.
 */
static void tscDestroyJoinSupporter(SJoinSupporter* pSupporter) {
  if (pSupporter == NULL) {
    return;
  }

  if (pSupporter->exprList != NULL) {
    tscExprDestroy(pSupporter->exprList);
    pSupporter->exprList = NULL;
  }

  if (pSupporter->colList != NULL) {
    tscColumnListDestroy(pSupporter->colList);
    pSupporter->colList = NULL;
  }

  // NOTE: fieldsInfo is not cleared here (the tscFieldInfoClear call is disabled).

  if (pSupporter->pTSBuf != NULL) {
    tsBufDestroy(pSupporter->pTSBuf);
    pSupporter->pTSBuf = NULL;
  }

  // Close the stream before removing the temp file. Presumably `f` was opened on
  // `path`, and deleting a file that is still open fails on Windows.
  if (pSupporter->f != NULL) {
    fclose(pSupporter->f);
    pSupporter->f = NULL;
  }

  unlink(pSupporter->path);

  if (pSupporter->pVgroupTables != NULL) {
    taosArrayDestroy(pSupporter->pVgroupTables);
    pSupporter->pVgroupTables = NULL;
  }

  tfree(pSupporter->pIdTagList);
  tscTagCondRelease(&pSupporter->tagCond);
  free(pSupporter);
}

H
Haojun Liao 已提交
449
/*
 * Keep only those entries of pVgroupTables whose vgId appears in the group-id
 * list of pQueryInfo->tsBuf, then mark the query as a multi-table query.
 *
 * A vnode none of whose tables survived the timestamp intersection is dropped,
 * so no next-stage query is sent to it.
 * TODO: also discard disqualified tables inside a vnode that is kept.
 */
static void filterVgroupTables(SQueryInfo* pQueryInfo, SArray* pVgroupTables) {
  int32_t  numOfVgIds = 0;
  int32_t* pVgIds = NULL;
  tsBufGetGroupIdList(pQueryInfo->tsBuf, &numOfVgIds, &pVgIds);

  int32_t k = 0;
  while (k < taosArrayGetSize(pVgroupTables)) {
    SVgroupTableInfo* pGroup = taosArrayGet(pVgroupTables, k);

    int32_t pos = 0;
    while (pos < numOfVgIds && pGroup->vgInfo.vgId != pVgIds[pos]) {
      ++pos;
    }

    if (pos == numOfVgIds) {
      // Not in the ts result: removing it shifts the next entry into slot k.
      tscRemoveVgroupTableGroup(pVgroupTables, k);
    } else {
      ++k;
    }
  }

  assert(taosArrayGetSize(pVgroupTables) > 0);
  TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_MULTITABLE_QUERY);

  tfree(pVgIds);
}

H
Haojun Liao 已提交
481
/*
 * Build a new vgroup-table array holding a copy (via tscVgroupTableCopy) of
 * each entry of pVgroupTables whose vgId appears in the group-id list of
 * pQueryInfo->tsBuf. The output follows that list's order. The query is marked
 * as a multi-table query.
 *
 * A group id with no matching entry is skipped. Previously a stale or
 * uninitialized `info` was pushed in that case, duplicating the previous
 * entry's itemList pointer.
 *
 * Returns the new array; pVgroupTables is left untouched.
 */
static SArray* buildVgroupTableByResult(SQueryInfo* pQueryInfo, SArray* pVgroupTables) {
  int32_t  num = 0;
  int32_t* list = NULL;
  tsBufGetGroupIdList(pQueryInfo->tsBuf, &num, &list);

  size_t numOfGroups = taosArrayGetSize(pVgroupTables);

  SArray* pNew = taosArrayInit(num, sizeof(SVgroupTableInfo));

  for (int32_t i = 0; i < num; ++i) {
    int32_t vnodeId = list[i];

    for (size_t j = 0; j < numOfGroups; ++j) {
      SVgroupTableInfo* p1 = taosArrayGet(pVgroupTables, j);
      if (p1->vgInfo.vgId == vnodeId) {
        SVgroupTableInfo info = {{0}};
        tscVgroupTableCopy(&info, p1);
        taosArrayPush(pNew, &info);
        break;
      }
    }
  }

  tfree(list);
  TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_MULTITABLE_QUERY);

  return pNew;
}

H
hjxilinx 已提交
511 512 513
/*
 * launch secondary stage query to fetch the result that contains timestamp in set
 */
H
Haojun Liao 已提交
514
static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
H
hjxilinx 已提交
515
  int32_t         numOfSub = 0;
516
  SJoinSupporter* pSupporter = NULL;
H
hjxilinx 已提交
517
  
H
Haojun Liao 已提交
518
  //If the columns are not involved in the final select clause, the corresponding query will not be issued.
H
Haojun Liao 已提交
519
  for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
H
hjxilinx 已提交
520
    pSupporter = pSql->pSubs[i]->param;
H
hjxilinx 已提交
521
    if (taosArrayGetSize(pSupporter->exprList) > 0) {
H
hjxilinx 已提交
522 523 524 525 526 527 528
      ++numOfSub;
    }
  }
  
  assert(numOfSub > 0);
  
  // scan all subquery, if one sub query has only ts, ignore it
H
Haojun Liao 已提交
529
  tscDebug("0x%"PRIx64" start to launch secondary subqueries, %d out of %d needs to query", pSql->self, numOfSub, pSql->subState.numOfSub);
H
hjxilinx 已提交
530 531 532

  bool success = true;
  
H
Haojun Liao 已提交
533
  for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
H
hjxilinx 已提交
534 535 536 537 538
    SSqlObj *pPrevSub = pSql->pSubs[i];
    pSql->pSubs[i] = NULL;
    
    pSupporter = pPrevSub->param;
  
H
hjxilinx 已提交
539
    if (taosArrayGetSize(pSupporter->exprList) == 0) {
H
Haojun Liao 已提交
540
      tscDebug("0x%"PRIx64" subIndex: %d, no need to launch query, ignore it", pSql->self, i);
H
hjxilinx 已提交
541 542
    
      tscDestroyJoinSupporter(pSupporter);
543
      taos_free_result(pPrevSub);
H
hjxilinx 已提交
544 545 546 547 548
    
      pSql->pSubs[i] = NULL;
      continue;
    }
  
H
Haojun Liao 已提交
549
    SQueryInfo *pSubQueryInfo = tscGetQueryInfo(&pPrevSub->cmd);
H
Haojun Liao 已提交
550
    STSBuf     *pTsBuf = pSubQueryInfo->tsBuf;
H
hjxilinx 已提交
551 552 553
    pSubQueryInfo->tsBuf = NULL;
  
    // free result for async object will also free sqlObj
H
Haojun Liao 已提交
554
    assert(tscNumOfExprs(pSubQueryInfo) == 1); // ts_comp query only requires one result columns
H
hjxilinx 已提交
555 556
    taos_free_result(pPrevSub);
  
557
    SSqlObj *pNew = createSubqueryObj(pSql, (int16_t) i, tscJoinQueryCallback, pSupporter, TSDB_SQL_SELECT, NULL);
H
hjxilinx 已提交
558 559 560 561 562
    if (pNew == NULL) {
      tscDestroyJoinSupporter(pSupporter);
      success = false;
      break;
    }
B
Bomin Zhang 已提交
563

H
hjxilinx 已提交
564 565 566
    tscClearSubqueryInfo(&pNew->cmd);
    pSql->pSubs[i] = pNew;
  
H
Haojun Liao 已提交
567
    SQueryInfo *pQueryInfo = tscGetQueryInfo(&pNew->cmd);
H
Haojun Liao 已提交
568
    pQueryInfo->tsBuf = pTsBuf;  // transfer the ownership of timestamp comp-z data to the new created object
B
Bomin Zhang 已提交
569

H
hjxilinx 已提交
570
    // set the second stage sub query for join process
H
hjxilinx 已提交
571
    TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE);
572
    memcpy(&pQueryInfo->interval, &pSupporter->interval, sizeof(pQueryInfo->interval));
B
Bomin Zhang 已提交
573

H
hjxilinx 已提交
574
    tscTagCondCopy(&pQueryInfo->tagCond, &pSupporter->tagCond);
B
Bomin Zhang 已提交
575

H
Haojun Liao 已提交
576 577 578
    pQueryInfo->colList     = pSupporter->colList;
    pQueryInfo->exprList    = pSupporter->exprList;
    pQueryInfo->fieldsInfo  = pSupporter->fieldsInfo;
H
Haojun Liao 已提交
579
    pQueryInfo->groupbyExpr = pSupporter->groupInfo;
H
Haojun Liao 已提交
580
    pQueryInfo->pUpstream   = taosArrayInit(4, sizeof(POINTER_BYTES));
H
Haojun Liao 已提交
581

H
Haojun Liao 已提交
582
    assert(pNew->subState.numOfSub == 0 && pQueryInfo->numOfTables == 1);
H
hjxilinx 已提交
583
  
584
    tscFieldInfoUpdateOffset(pQueryInfo);
H
hjxilinx 已提交
585
  
586
    STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
H
Haojun Liao 已提交
587
    pTableMetaInfo->pVgroupTables = pSupporter->pVgroupTables;
H
Haojun Liao 已提交
588 589

    pSupporter->exprList = NULL;
590
    pSupporter->colList  = NULL;
591
    pSupporter->pVgroupTables = NULL;
H
Haojun Liao 已提交
592
    memset(&pSupporter->fieldsInfo, 0, sizeof(SFieldInfo));
H
Haojun Liao 已提交
593
    memset(&pSupporter->groupInfo, 0, sizeof(SGroupbyExpr));
H
Haojun Liao 已提交
594

H
hjxilinx 已提交
595 596 597 598 599
    /*
     * When handling the projection query, the offset value will be modified for table-table join, which is changed
     * during the timestamp intersection.
     */
    pSupporter->limit = pQueryInfo->limit;
H
Haojun Liao 已提交
600 601 602
    SColumnIndex index = {.tableIndex = 0, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
    SSchema* s = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, 0);

H
Haojun Liao 已提交
603
    SExprInfo* pExpr = tscExprGet(pQueryInfo, 0);
H
Haojun Liao 已提交
604
    int16_t funcId = pExpr->base.functionId;
H
Haojun Liao 已提交
605

H
Haojun Liao 已提交
606
    // add the invisible timestamp column
H
Haojun Liao 已提交
607
    if ((pExpr->base.colInfo.colId != PRIMARYKEY_TIMESTAMP_COL_INDEX) ||
H
Haojun Liao 已提交
608 609 610 611
        (funcId != TSDB_FUNC_TS && funcId != TSDB_FUNC_TS_DUMMY && funcId != TSDB_FUNC_PRJ)) {

      int16_t functionId = tscIsProjectionQuery(pQueryInfo)? TSDB_FUNC_PRJ : TSDB_FUNC_TS;

H
Haojun Liao 已提交
612
      tscAddFuncInSelectClause(pQueryInfo, 0, functionId, &index, s, TSDB_COL_NORMAL, getNewResColId(&pNew->cmd));
H
Haojun Liao 已提交
613
      tscPrintSelNodeList(pNew, 0);
614
      tscFieldInfoUpdateOffset(pQueryInfo);
H
Haojun Liao 已提交
615

H
Haojun Liao 已提交
616
      pExpr = tscExprGet(pQueryInfo, 0);
H
Haojun Liao 已提交
617 618
    }

619
    // set the join condition tag column info, todo extract method
H
Haojun Liao 已提交
620 621
    if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
      assert(pQueryInfo->tagCond.joinInfo.hasJoin);
622
      int16_t colId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid);
H
Haojun Liao 已提交
623

624
      // set the tag column id for executor to extract correct tag value
H
Haojun Liao 已提交
625
#ifndef _TD_NINGSI_60
H
Haojun Liao 已提交
626
      pExpr->base.param[0] = (tVariant) {.i64 = colId, .nType = TSDB_DATA_TYPE_BIGINT, .nLen = sizeof(int64_t)};
L
lichuang 已提交
627 628 629 630 631
#else
      pExpr->base.param[0].i64 = colId;
      pExpr->base.param[0].nType = TSDB_DATA_TYPE_BIGINT;
      pExpr->base.param[0].nLen = sizeof(int64_t);
#endif
H
Haojun Liao 已提交
632
      pExpr->base.numOfParams = 1;
H
Haojun Liao 已提交
633 634
    }

635 636 637 638 639 640 641 642
    if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
      assert(pTableMetaInfo->pVgroupTables != NULL);
      if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
        SArray* p = buildVgroupTableByResult(pQueryInfo, pTableMetaInfo->pVgroupTables);
        tscFreeVgroupTableInfo(pTableMetaInfo->pVgroupTables);
        pTableMetaInfo->pVgroupTables = p;
      } else {
        filterVgroupTables(pQueryInfo, pTableMetaInfo->pVgroupTables);
H
Haojun Liao 已提交
643 644 645
      }
    }

646
    subquerySetState(pNew, &pSql->subState, i, 0);
D
fix bug  
dapan1121 已提交
647
    
648
    size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
649 650
    tscDebug("0x%"PRIx64" subquery:0x%"PRIx64" tableIndex:%d, vgroupIndex:%d, type:%d, exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, name:%s",
             pSql->self, pNew->self, 0, pTableMetaInfo->vgroupIndex, pQueryInfo->type, taosArrayGetSize(pQueryInfo->exprList),
651
             numOfCols, pQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pTableMetaInfo->name));
H
hjxilinx 已提交
652 653 654 655
  }
  
  //prepare the subqueries object failed, abort
  if (!success) {
656
    pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
D
dapan1121 已提交
657
    tscError("0x%"PRIx64" failed to prepare subqueries objs for secondary phase query, numOfSub:%d, code:%d", pSql->self,
H
Haojun Liao 已提交
658
        pSql->subState.numOfSub, pSql->res.code);
H
hjxilinx 已提交
659
    freeJoinSubqueryObj(pSql);
H
hjxilinx 已提交
660 661 662 663
    
    return pSql->res.code;
  }
  
H
Haojun Liao 已提交
664
  for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
H
Haojun Liao 已提交
665
    if (pSql->pSubs[i] == NULL) {
H
hjxilinx 已提交
666 667
      continue;
    }
H
Haojun Liao 已提交
668

H
Haojun Liao 已提交
669
    SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->pSubs[i]->cmd);
H
Haojun Liao 已提交
670
    executeQuery(pSql->pSubs[i], pQueryInfo);
H
hjxilinx 已提交
671 672 673 674 675
  }

  return TSDB_CODE_SUCCESS;
}

H
hjxilinx 已提交
676
/*
 * Free every subquery object of pSql together with its join supporter, then
 * tear down the subquery state (mutex and states array) and reset numOfSub to 0.
 */
void freeJoinSubqueryObj(SSqlObj* pSql) {
  SSubqueryState* pState = &pSql->subState;

  for (int32_t i = 0; i < pState->numOfSub; ++i) {
    SSqlObj* pSub = pSql->pSubs[i];
    if (pSub != NULL) {
      tscDestroyJoinSupporter((SJoinSupporter*) pSub->param);
      taos_free_result(pSub);
      pSql->pSubs[i] = NULL;
    }
  }

  // NOTE(review): presumably the mutex is only initialized when `states` was allocated — verify.
  if (pState->states) {
    pthread_mutex_destroy(&pState->mutex);
  }

  tfree(pState->states);
  pState->numOfSub = 0;
}

D
fix bug  
dapan1121 已提交
698
/*
 * Mark the failed subquery pSqlSub as finished.
 *
 * If that makes every subquery of pSqlObj finished, log the parent's error code
 * and free all subquery objects, then return 0. Otherwise return 1 and leave
 * everything in place.
 */
static int32_t quitAllSubquery(SSqlObj* pSqlSub, SSqlObj* pSqlObj, SJoinSupporter* pSupporter) {
  bool lastOneOut = subAndCheckDone(pSqlSub, pSqlObj, pSupporter->subqueryIndex);
  if (!lastOneOut) {
    return 1;
  }

  tscError("0x%"PRIx64" all subquery return and query failed, global code:%s", pSqlObj->self, tstrerror(pSqlObj->res.code));
  freeJoinSubqueryObj(pSqlObj);
  return 0;
}

// update the query time range according to the join results on timestamp
H
Haojun Liao 已提交
710
static void updateQueryTimeRange(SQueryInfo* pQueryInfo, STimeWindow* win) {
H
Haojun Liao 已提交
711 712
  assert(pQueryInfo->window.skey <= win->skey && pQueryInfo->window.ekey >= win->ekey);
  pQueryInfo->window = *win;
H
hjxilinx 已提交
713 714
}

H
Haojun Liao 已提交
715 716 717
int32_t tidTagsCompar(const void* p1, const void* p2) {
  const STidTags* t1 = (const STidTags*) (p1);
  const STidTags* t2 = (const STidTags*) (p2);
718 719
  
  if (t1->vgId != t2->vgId) {
weixin_48148422's avatar
weixin_48148422 已提交
720 721
    return (t1->vgId > t2->vgId) ? 1 : -1;
  }
722

H
Haojun Liao 已提交
723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741
  tstr* tag1 = (tstr*) t1->tag;
  tstr* tag2 = (tstr*) t2->tag;

  if (tag1->len != tag2->len) {
    return (tag1->len > tag2->len)? 1: -1;
  }

  return strncmp(tag1->data, tag2->data, tag1->len);
}

int32_t tagValCompar(const void* p1, const void* p2) {
  const STidTags* t1 = (const STidTags*) varDataVal(p1);
  const STidTags* t2 = (const STidTags*) varDataVal(p2);

  tstr* tag1 = (tstr*) t1->tag;
  tstr* tag2 = (tstr*) t2->tag;

  if (tag1->len != tag2->len) {
    return (tag1->len > tag2->len)? 1: -1;
742
  }
H
Haojun Liao 已提交
743

H
Haojun Liao 已提交
744
  return memcmp(tag1->data, tag2->data, tag1->len);
745 746
}

H
Haojun Liao 已提交
747
/*
 * Groups the STidTags tuples in `tables` by vgroup and stores the result in pTableMetaInfo->pVgroupTables.
 *
 * The result is an SArray of SVgroupTableInfo with one entry per run of consecutive tuples sharing a vgId.
 * Each entry's itemList is an SArray of STableIdInfo {uid, tid, key = INT64_MIN}. Runs are formed from
 * adjacent tuples only, so `tables` is expected to be ordered by vgId (e.g. sorted with tidTagsCompar).
 * Every vgId must be present in pTableMetaInfo->vgroupList (asserted via numOfEps).
 *
 * Always resets vgroupIndex to 0. pVgroupTables becomes NULL when `tables` is empty.
 * pSql is used only for logging.
 */
void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables) {
  SArray*   groups  = taosArrayInit(4, sizeof(SVgroupTableInfo));
  SArray*   curList = NULL;
  STidTags* pLast   = NULL;

  size_t total = taosArrayGetSize(tables);
  for (size_t idx = 0; idx < total; ++idx) {
    STidTags* pCur = taosArrayGet(tables, idx);

    // a new vgroup run begins: resolve its endpoints and open a fresh table list
    if (pLast == NULL || pCur->vgId != pLast->vgId) {
      SVgroupsInfo*    pVgList = pTableMetaInfo->vgroupList;
      SVgroupTableInfo info = {{0}};

      int32_t k = 0;
      while (k < pVgList->numOfVgroups && pVgList->vgroups[k].vgId != pCur->vgId) {
        ++k;
      }

      if (k < pVgList->numOfVgroups) {
        tscSVgroupInfoCopy(&info.vgInfo, &pVgList->vgroups[k]);
      }
      assert(info.vgInfo.numOfEps != 0);

      curList = taosArrayInit(4, sizeof(STableIdInfo));
      info.itemList = curList;

      size_t builtGroups = taosArrayGetSize(groups);
      if (builtGroups > 0) {
        SVgroupTableInfo* pClosed = taosArrayGet(groups, builtGroups - 1);
        tscDebug("0x%"PRIx64" vgId:%d, tables:%"PRIzu, pSql->self, pClosed->vgInfo.vgId, taosArrayGetSize(pClosed->itemList));
      }

      taosArrayPush(groups, &info);
    }

    STableIdInfo item = {.uid = pCur->uid, .tid = pCur->tid, .key = INT64_MIN};
    taosArrayPush(curList, &item);

    tscTrace("0x%"PRIx64" tid:%d, uid:%"PRIu64",vgId:%d added", pSql->self, pCur->tid, pCur->uid, pCur->vgId);
    pLast = pCur;
  }

  pTableMetaInfo->vgroupIndex = 0;

  size_t numOfGroups = taosArrayGetSize(groups);
  if (numOfGroups == 0) {
    pTableMetaInfo->pVgroupTables = NULL;
    taosArrayDestroy(groups);
    return;
  }

  pTableMetaInfo->pVgroupTables = groups;

  SVgroupTableInfo* pTail = taosArrayGet(groups, numOfGroups - 1);
  tscDebug("0x%"PRIx64" vgId:%d, tables:%"PRIzu, pSql->self, pTail->vgInfo.vgId, taosArrayGetSize(pTail->itemList));
}

799
/*
 * Re-arms join subquery pSql for the ts_comp (second) join stage and sends it.
 *
 * Steps:
 *  - Clear its subquery info and previous result.
 *  - Re-initialise its query info, saving and restoring the time window around tscInitQueryInfo.
 *  - Switch it from a tag-filter query to a multi-table SELECT completed by tscJoinQueryCallback.
 *  - Add a TSDB_FUNC_TS_COMP expression on the primary timestamp column. For super tables, this expression
 *    gets the join tag column id as its single BIGINT parameter.
 *  - Copy back the filtered columns from pSupporter->colList.
 *
 * pParent is used only for logging.
 */
static void issueTsCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj* pParent) {
  SSqlCmd* pSubCmd = &pSql->cmd;
  tscClearSubqueryInfo(pSubCmd);
  tscFreeSqlResult(pSql);

  SQueryInfo* pQInfo = tscGetQueryInfo(pSubCmd);
  assert(pQInfo->numOfTables == 1);

  STableMetaInfo* pMetaInfo = tscGetMetaInfo(pQInfo, 0);

  // keep the time window across the re-initialisation of the query info
  STimeWindow savedWindow = pQInfo->window;
  tscInitQueryInfo(pQInfo);
  pQInfo->window = savedWindow;

  TSDB_QUERY_CLEAR_TYPE(pQInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY);
  TSDB_QUERY_SET_TYPE(pQInfo->type, TSDB_QUERY_TYPE_MULTITABLE_QUERY);

  pSubCmd->command = TSDB_SQL_SELECT;
  pSql->fp = tscJoinQueryCallback;

  SSchema      tsCompSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1};
  SColumnIndex tsColIndex   = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX};
  tscAddFuncInSelectClause(pQInfo, 0, TSDB_FUNC_TS_COMP, &tsColIndex, &tsCompSchema, TSDB_COL_NORMAL, getNewResColId(pSubCmd));

  // on a super table, ts_comp needs to know which tag column the join is on
  if (UTIL_TABLE_IS_SUPER_TABLE(pMetaInfo)) {
    SExprInfo* pTsCompExpr = tscExprGet(pQInfo, 0);
    int16_t    joinTagColId = tscGetJoinTagColIdByUid(&pSupporter->tagCond, pMetaInfo->pTableMeta->id.uid);

    pTsCompExpr->base.param[0].i64   = joinTagColId;
    pTsCompExpr->base.param[0].nLen  = sizeof(int64_t);
    pTsCompExpr->base.param[0].nType = TSDB_DATA_TYPE_BIGINT;
    pTsCompExpr->base.numOfParams    = 1;
  }

  // re-add only the columns that carry filters
  if (pSupporter->colList != NULL) {
    size_t numOfSrcCols = taosArrayGetSize(pSupporter->colList);

    for (size_t j = 0; j < numOfSrcCols; ++j) {
      SColumn* pSrcCol = taosArrayGetP(pSupporter->colList, j);
      if (pSrcCol->info.flist.numOfFilters <= 0) {
        continue;
      }

      SColumn* pCopy = tscColumnClone(pSrcCol);
      taosArrayPush(pQInfo->colList, &pCopy);
    }
  }

  size_t colNum = taosArrayGetSize(pQInfo->colList);

  tscDebug(
      "0x%"PRIx64" subquery:0x%"PRIx64" tableIndex:%d, vgroupIndex:%d, numOfVgroups:%d, type:%d, ts_comp query to retrieve timestamps, "
      "numOfExpr:%" PRIzu ", colList:%" PRIzu ", numOfOutputFields:%d, name:%s",
      pParent->self, pSql->self, 0, pMetaInfo->vgroupIndex, pMetaInfo->vgroupList->numOfVgroups, pQInfo->type,
      tscNumOfExprs(pQInfo), colNum, pQInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pMetaInfo->name));

  tscBuildAndSendRequest(pSql, NULL);
}

H
Haojun Liao 已提交
858
/*
 * Checks one subquery's <tid, tag> list for two adjacent entries whose join-tag values are equal.
 *
 * Tags are compared with doCompare using the join column's type and width. Only neighbours are compared, so
 * the list must already be sorted by tag value; getIntersectionOfTableTuple sorts it with tagValCompar before
 * calling this.
 *
 * On a duplicate: logs an error, sets pPSqlObj->res.code to TSDB_CODE_QRY_DUP_JOIN_KEY and returns false.
 * Otherwise returns true.
 */
static bool checkForDuplicateTagVal(SSchema* pColSchema, SJoinSupporter* p1, SSqlObj* pPSqlObj) {
  const int32_t stride = p1->tagSize;

  for (int32_t k = 1; k < p1->num; ++k) {
    char* pRow     = p1->pIdTagList + k * stride;
    char* pPrevRow = pRow - stride;

    STidTags* pBefore  = (STidTags*) varDataVal(pPrevRow);
    STidTags* pCurrent = (STidTags*) varDataVal(pRow);

    // every tuple is expected to carry a valid vgroup id
    assert(pBefore->vgId >= 1 && pCurrent->vgId >= 1);

    if (doCompare(pBefore->tag, pCurrent->tag, pColSchema->type, pColSchema->bytes) != 0) {
      continue;
    }

    tscError("0x%"PRIx64" join tags have same value for different table, free all sub SqlObj and quit", pPSqlObj->self);
    pPSqlObj->res.code = TSDB_CODE_QRY_DUP_JOIN_KEY;
    return false;
  }

  return true;
}


D
dapan1121 已提交
875 876
/*
 * Tag-value intersection for a multi-table join (first join stage).
 *
 * For every join subquery i, pParentSql->pSubs[i]->param is an SJoinSupporter whose pIdTagList holds the
 * <tid, tag> tuples that subquery retrieved. The tables joined on the tag are described by
 * pQueryInfo->tagCond.joinInfo.joinTables[*]->tagJoin; each is an array of int16_t indices into the
 * subqueries, forming a tag-join group. A tuple is kept only if its tag value is present in every member of
 * its group.
 *
 * pQueryInfo: its first table supplies the join tag column (schema and column id) and the tag-join groups.
 * resList:    on success, receives one SArray* of matched STidTags per subquery, in subquery order. Each array
 *             is sorted with tidTagsCompar (vgroup id, then tag). The arrays are not freed here; the caller
 *             owns them.
 *
 * Returns TSDB_CODE_SUCCESS. Returns TSDB_CODE_QRY_DUP_JOIN_KEY when one subquery has two tuples with the
 * same tag value; in that case checkForDuplicateTagVal also stores that code in pParentSql->res.code, the
 * result arrays created so far are destroyed, and nothing is pushed to resList.
 */
static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pParentSql, SArray* resList) {
  int16_t joinNum = pParentSql->subState.numOfSub;
H
Haojun Liao 已提交
877
  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
878
  int16_t tagColId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid);
D
dapan1121 已提交
879
  SJoinSupporter* p0 = pParentSql->pSubs[0]->param;
D
dapan1121 已提交
880
  // per-subquery merge state: source supporter (p), read cursor (idx), matched tuples (res),
  // and a flag (compared) set once the subquery's tag-join group has been processed
  SMergeCtx ctxlist[TSDB_MAX_JOIN_TABLE_NUM] = {{0}};
D
dapan1121 已提交
881 882 883 884
  // contexts currently in the chain of equal tag values built by the merge below
  SMergeCtx* ctxStack[TSDB_MAX_JOIN_TABLE_NUM] = {0};

  // int16_t for padding: each pIdTagList entry is tagSize bytes and its STidTags record (read through
  // varDataVal) is the remaining tagSize - sizeof(int16_t) bytes; `size` is the element size of the results
  int32_t size = p0->tagSize - sizeof(int16_t);
H
Haojun Liao 已提交
885

H
Haojun Liao 已提交
886
  // schema of the join tag column: its type and width drive every doCompare call below
  SSchema* pColSchema = tscGetColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId);
D
dapan1121 已提交
887
  
H
Haojun Liao 已提交
888
  tscDebug("0x%"PRIx64" all subquery retrieve <tid, tags> complete, do tags match", pParentSql->self);
H
Haojun Liao 已提交
889

D
dapan1121 已提交
890 891
  for (int32_t i = 0; i < joinNum; i++) {
    SJoinSupporter* p = pParentSql->pSubs[i]->param;
H
Haojun Liao 已提交
892

D
dapan1121 已提交
893
    ctxlist[i].p = p;
D
fix bug  
dapan1121 已提交
894
    ctxlist[i].res = taosArrayInit(p->num, size);
H
Haojun Liao 已提交
895

D
dapan1121 已提交
896
    tscDebug("Join %d - num:%d", i, p->num);
H
Haojun Liao 已提交
897

D
dapan1121 已提交
898 899
    // sort according to the tag value (tagValCompar), so duplicates are adjacent and the lists can be merged
    qsort(p->pIdTagList, p->num, p->tagSize, tagValCompar);
H
Haojun Liao 已提交
900

D
dapan1121 已提交
901 902 903 904 905
    // a tag value repeated inside one subquery is rejected; free the result arrays created so far (0..i)
    if (!checkForDuplicateTagVal(pColSchema, p, pParentSql)) {
      for (int32_t j = 0; j <= i; j++) {
        taosArrayDestroy(ctxlist[j].res);
      }
      return TSDB_CODE_QRY_DUP_JOIN_KEY;
H
Haojun Liao 已提交
906 907
    }
  }
908

D
dapan1121 已提交
909
  // Merge state, reset for every tag-join group:
  //   tagCond   - the group's array of int16_t ctxlist indices; tableNum is its length
  //   slot      - position in tagCond of the context to compare next; tableMIdx points at tagCond[slot]
  //   prev/pctx - tuple/context at the tail of the chain of equal tag values
  //   cur/ctx   - tuple/context being compared against prev
  //   ctxStack/stackidx - contexts in the chain (ctx is pushed before its comparison); equalNum - chain length
  //   mergeDone - set when a list runs out, which ends the group's merge
  int32_t slot = 0;
D
dapan1121 已提交
910
  size_t tableNum = 0;
D
dapan1121 已提交
911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931
  int16_t* tableMIdx = 0;
  int32_t equalNum = 0;
  int32_t stackidx = 0;
  int32_t mergeDone = 0;
  SMergeCtx* ctx = NULL;
  SMergeCtx* pctx = NULL;
  STidTags* cur = NULL;
  STidTags* prev = NULL;
  SArray*   tagCond = NULL;

  // each not-yet-processed subquery introduces its tag-join group; every member of the group is merged at once
  for (int16_t tidx = 0; tidx < joinNum; tidx++) {
    pctx = &ctxlist[tidx];
    if (pctx->compared) {
      continue;
    }

    assert(pctx->idx == 0 && taosArrayGetSize(pctx->res) == 0);

    tagCond = pQueryInfo->tagCond.joinInfo.joinTables[tidx]->tagJoin;

    tableNum = taosArrayGetSize(tagCond);
D
fix bug  
dapan1121 已提交
932 933 934 935 936 937 938
    assert(tableNum >= 2);

    // mark every member so the outer loop does not process this group again
    for (int32_t i = 0; i < tableNum; ++i) {
      tableMIdx = taosArrayGet(tagCond, i);
      SMergeCtx* tctx = &ctxlist[*tableMIdx];
      tctx->compared = 1;
    }
D
dapan1121 已提交
939 940 941 942 943 944 945 946 947 948 949 950 951 952

    // if any member has no tuples, the group's intersection is empty
    for (int32_t i = 0; i < tableNum; ++i) {
      tableMIdx = taosArrayGet(tagCond, i);
      SMergeCtx* tctx = &ctxlist[*tableMIdx];
      if (tctx->p->num <= 0 || tctx->p->pIdTagList == NULL) {
        mergeDone = 1;
        break;
      }
    }

    if (mergeDone) {
      mergeDone = 0;
      continue;
    }
H
Haojun Liao 已提交
953

D
fix bug  
dapan1121 已提交
954
    // start the chain with the member at slot 0 ...
    tableMIdx = taosArrayGet(tagCond, slot);
H
Haojun Liao 已提交
955

D
fix bug  
dapan1121 已提交
956 957
    pctx = &ctxlist[*tableMIdx];

D
dapan1121 已提交
958 959 960
    prev = (STidTags*) varDataVal(pctx->p->pIdTagList + pctx->idx * pctx->p->tagSize);

    ctxStack[stackidx++] = pctx;
H
Haojun Liao 已提交
961

D
dapan1121 已提交
962 963 964
    // ... and compare it against the member at slot 1
    tableMIdx = taosArrayGet(tagCond, ++slot);

    equalNum = 1;
H
Haojun Liao 已提交
965

D
dapan1121 已提交
966 967
    // NOTE(review): the lists were sorted with tagValCompar but are merged with doCompare; the merge is only
    // correct if both orderings agree for the join tag type — confirm for non-binary tag types.
    while (1) {
      ctx = &ctxlist[*tableMIdx];
H
Haojun Liao 已提交
968

D
dapan1121 已提交
969 970 971 972 973 974 975 976 977 978 979
      cur = (STidTags*) varDataVal(ctx->p->pIdTagList + ctx->idx * ctx->p->tagSize);

      assert(cur->tid != 0 && prev->tid != 0);

      ctxStack[stackidx++] = ctx;

      int32_t ret = doCompare(prev->tag, cur->tag, pColSchema->type, pColSchema->bytes);
      if (ret == 0) {
        // equal tags: extend the chain to the next slot (wrapping around) until all tableNum members agree
        if (++equalNum < tableNum) {
          prev = cur;
          pctx = ctx;
H
Haojun Liao 已提交
980

D
dapan1121 已提交
981 982 983 984 985 986 987 988
          if (++slot >= tableNum) {
            slot = 0;
          }

          tableMIdx = taosArrayGet(tagCond, slot);
          continue;
        }
        
H
Haojun Liao 已提交
989
        // every member of the group has this tag value (val is printed as an int whatever the tag type)
        tscDebug("0x%"PRIx64" tag matched, vgId:%d, val:%d, tid:%d, uid:%"PRIu64", tid:%d, uid:%"PRIu64, pParentSql->self, prev->vgId,
D
dapan1121 已提交
990 991 992
                 *(int*) prev->tag, prev->tid, prev->uid, cur->tid, cur->uid);

        assert(stackidx == tableNum);
H
Haojun Liao 已提交
993

D
dapan1121 已提交
994 995 996 997
        // record each member's current tuple in its result array
        for (int32_t i = 0; i < stackidx; ++i) {
          SMergeCtx* tctx = ctxStack[i];
          prev = (STidTags*) varDataVal(tctx->p->pIdTagList + tctx->idx * tctx->p->tagSize);

D
fix bug  
dapan1121 已提交
998
          taosArrayPush(tctx->res, prev);
D
dapan1121 已提交
999 1000 1001 1002
        }

        // move every member past the matched tuple; once any list is exhausted no further match is possible
        for (int32_t i = 0; i < stackidx; ++i) {
          SMergeCtx* tctx = ctxStack[i];
H
Haojun Liao 已提交
1003

D
dapan1121 已提交
1004 1005 1006 1007 1008 1009 1010 1011 1012
          if (++tctx->idx >= tctx->p->num) {
            mergeDone = 1;
            break;
          }
        }

        if (mergeDone) {
          break;
        }
H
Haojun Liao 已提交
1013

D
dapan1121 已提交
1014 1015
        stackidx = 0;
        equalNum = 1;
H
Haojun Liao 已提交
1016

D
dapan1121 已提交
1017 1018 1019 1020
        // restart the chain from pctx's next tuple; tableMIdx still refers to ctx, which is compared next
        prev = (STidTags*) varDataVal(pctx->p->pIdTagList + pctx->idx * pctx->p->tagSize);

        ctxStack[stackidx++] = pctx;
      } else if (ret > 0) {
D
fix bug  
dapan1121 已提交
1021
        // prev > cur: drop ctx from the chain and advance it; an exhausted list ends this group's merge
        stackidx--;
H
Haojun Liao 已提交
1022

D
dapan1121 已提交
1023 1024 1025 1026
        if (++ctx->idx >= ctx->p->num) {
          break;
        }
      } else {
D
fix bug  
dapan1121 已提交
1027
        // prev < cur: with sorted lists the chain's tag value can no longer appear in ctx's list, so drop ctx,
        // advance every remaining chain member and restart the chain from pctx
        stackidx--;
H
Haojun Liao 已提交
1028

D
dapan1121 已提交
1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040
        for (int32_t i = 0; i < stackidx; ++i) {
          SMergeCtx* tctx = ctxStack[i];
          if (++tctx->idx >= tctx->p->num) {
            mergeDone = 1;
            break;
          }
        }

        if (mergeDone) {
          break;
        }

H
Haojun Liao 已提交
1041
        stackidx = 0;
D
dapan1121 已提交
1042
        equalNum = 1;
H
Haojun Liao 已提交
1043

D
dapan1121 已提交
1044 1045 1046 1047
        prev = (STidTags*) varDataVal(pctx->p->pIdTagList + pctx->idx * pctx->p->tagSize);
        ctxStack[stackidx++] = pctx;
      }

H
Haojun Liao 已提交
1048
    }
D
dapan1121 已提交
1049 1050 1051

    // reset the per-group merge state before the next group
    slot = 0;
    mergeDone = 0;
D
fix bug  
dapan1121 已提交
1052
    stackidx = 0;
H
Haojun Liao 已提交
1053 1054
  }

D
dapan1121 已提交
1055 1056 1057 1058
  for (int32_t i = 0; i < joinNum; ++i) {
    // reorganize the tid-tag value according to both the vgroup id and tag values:
    // sort by vgroup id first, then by tag value (tidTagsCompar)
    size_t num = taosArrayGetSize(ctxlist[i].res);
H
Haojun Liao 已提交
1059

D
dapan1121 已提交
1060
    qsort((ctxlist[i].res)->pData, num, size, tidTagsCompar);
H
Haojun Liao 已提交
1061

D
dapan1121 已提交
1062
    // the result array is handed over to resList; the caller frees it
    taosArrayPush(resList, &ctxlist[i].res);
H
Haojun Liao 已提交
1063

H
Haojun Liao 已提交
1064
    tscDebug("0x%"PRIx64" tags match complete, result num: %"PRIzu, pParentSql->self, num);
H
Haojun Liao 已提交
1065 1066
  }

1067
  return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
1068 1069
}

D
dapan1121 已提交
1070
/*
 * Returns true when a tag-match result cannot drive the join. That is the case when resList does not hold
 * exactly `size` entries, or when any of its entries (each an SArray* of matched tuples) is empty.
 * Returns false otherwise.
 */
bool emptyTagList(SArray* resList, int32_t size) {
  if (taosArrayGetSize(resList) != size) {
    return true;
  }

  for (int32_t k = 0; k < size; ++k) {
    SArray* pMatched = *(SArray**) taosArrayGet(resList, k);
    if (taosArrayGetSize(pMatched) == 0) {
      return true;
    }
  }

  return false;
}

H
Haojun Liao 已提交
1086
/*
 * Fetch callback for the first join stage: collects the <tid, tag> tuples returned by one join subquery.
 *
 * Collection:
 *  - Rows of every fetch are appended to pSupporter->pIdTagList.
 *  - Further fetches, and then the next vgroups, are requested until this subquery is exhausted.
 *
 * When the last subquery finishes:
 *  - The per-subquery tuple sets are intersected by tag value (getIntersectionOfTableTuple).
 *  - An empty intersection completes the parent with an empty result.
 *  - Otherwise each subquery's vgroup/table list is rebuilt from its matched tuples, and all subqueries are
 *    re-issued as ts_comp queries.
 *
 * Errors reported by this subquery, a failure already recorded by another subquery (pParentSql->res.code),
 * and allocation failures are all reported on the parent once every subquery has returned (quitAllSubquery).
 */
static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRows) {
  SJoinSupporter* pSupporter = (SJoinSupporter*)param;

  SSqlObj* pParentSql = pSupporter->pObj;

  SSqlObj* pSql = (SSqlObj*)tres;
  SSqlCmd* pCmd = &pSql->cmd;
  SSqlRes* pRes = &pSql->res;

  SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);

  // todo, the type may not include TSDB_QUERY_TYPE_TAG_FILTER_QUERY
  assert(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY));

  if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
    tscError("0x%"PRIx64" abort query due to other subquery failure. code:%d, global code:%d", pSql->self, numOfRows, pParentSql->res.code);
    if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
      return;
    }

    tscAsyncResultOnError(pParentSql);
    return;
  }

  // check for the error code firstly
  if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
    // todo retry if other subqueries are not failed
    assert(numOfRows < 0 && numOfRows == taos_errno(pSql));
    tscError("0x%"PRIx64" sub query failed, code:%s, index:%d", pSql->self, tstrerror(numOfRows), pSupporter->subqueryIndex);

    pParentSql->res.code = numOfRows;
    if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
      return;
    }

    tscAsyncResultOnError(pParentSql);
    return;
  }

  // keep the results in memory
  if (numOfRows > 0) {
    size_t validLen = (size_t)(pSupporter->tagSize * pRes->numOfRows);
    size_t length = pSupporter->totalLen + validLen;

    char* tmp = realloc(pSupporter->pIdTagList, length);
    if (tmp == NULL) {
      // capture errno before logging, which may overwrite it
      int32_t err = errno;
      tscError("0x%"PRIx64" failed to malloc memory", pSql->self);

      pParentSql->res.code = TAOS_SYSTEM_ERROR(err);
      if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
        return;
      }

      tscAsyncResultOnError(pParentSql);
      return;
    }

    pSupporter->pIdTagList = tmp;

    memcpy(pSupporter->pIdTagList + pSupporter->totalLen, pRes->data, validLen);
    pSupporter->totalLen += (int32_t)validLen;
    pSupporter->num += (int32_t)pRes->numOfRows;

    // query not completed, continue to retrieve tid + tag tuples
    if (!pRes->completed) {
      taos_fetch_rows_a(tres, tidTagRetrieveCallback, param);
      return;
    }
  }

  // data in current vnode has all returned to client, try next vnode if exits
  // <tid + tag> tuples have been retrieved to client, try <tid + tag> tuples from the next vnode
  if (hasMoreVnodesToTry(pSql)) {
    STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

    int32_t totalVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
    pTableMetaInfo->vgroupIndex += 1;
    assert(pTableMetaInfo->vgroupIndex < totalVgroups);

    tscDebug("0x%"PRIx64" tid_tag from vgroup index:%d completed, try next vgroup:%d. total vgroups:%d. current numOfRes:%d",
             pSql->self, pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVgroups, pSupporter->num);

    pCmd->command = TSDB_SQL_SELECT;
    tscResetForNextRetrieve(&pSql->res);

    // set the callback function
    pSql->fp = tscJoinQueryCallback;
    tscBuildAndSendRequest(pSql, NULL);
    return;
  }

  // no data exists in next vnode, mark the <tid, tags> query completed
  // only when there is no subquery exits any more, proceeds to get the intersect of the <tid, tags> tuple sets.
  if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
    tscDebug("0x%"PRIx64" tagRetrieve:%p,%d completed, total:%d", pParentSql->self, tres, pSupporter->subqueryIndex, pParentSql->subState.numOfSub);
    return;
  }

  SArray* resList = taosArrayInit(pParentSql->subState.numOfSub, sizeof(SArray *));

  int32_t code = getIntersectionOfTableTuple(pQueryInfo, pParentSql, resList);
  if (code != TSDB_CODE_SUCCESS) {
    freeJoinSubqueryObj(pParentSql);
    pParentSql->res.code = code;
    tscAsyncResultOnError(pParentSql);

    taosArrayDestroy(resList);
    return;
  }

  if (emptyTagList(resList, pParentSql->subState.numOfSub)) {  // no results,return.
    assert(pParentSql->fp != tscJoinQueryCallback);

    tscDebug("0x%"PRIx64" tag intersect does not generated qualified tables for join, free all sub SqlObj and quit",
        pParentSql->self);
    freeJoinSubqueryObj(pParentSql);

    // set no result command
    pParentSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
    (*pParentSql->fp)(pParentSql->param, pParentSql, 0);
  } else {
    int32_t numOfSub = pParentSql->subState.numOfSub;

    // 1. prepare the vgroup/table list of every subquery before any ts_comp query is sent
    for (int32_t m = 0; m < numOfSub; ++m) {
      SSqlObj* psub = pParentSql->pSubs[m];
      SArray** s = taosArrayGet(resList, m);

      SQueryInfo*     pQueryInfo1 = tscGetQueryInfo(&psub->cmd);
      STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo1, 0);
      tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo, *s);

      ((SJoinSupporter*)psub->param)->pVgroupTables = tscVgroupTableInfoDup(pTableMetaInfo->pVgroupTables);
    }

    // 2. reset the completion states exactly once, before any ts_comp query is in flight. Resetting them
    //    between issuing queries (as earlier code did) could erase the completion of a subquery that had
    //    already finished, so the join would never see all subqueries done.
    memset(pParentSql->subState.states, 0, sizeof(pParentSql->subState.states[0]) * pParentSql->subState.numOfSub);
    tscDebug("0x%"PRIx64" reset all sub states to 0", pParentSql->self);

    // 3. proceed to for ts_comp query
    for (int32_t m = 0; m < numOfSub; ++m) {
      SSqlObj* psub = pParentSql->pSubs[m];
      issueTsCompQuery(psub, psub->param, pParentSql);
    }
  }

  // the matched tuple arrays were copied by tscBuildVgroupTableInfo; release them
  size_t rsize = taosArrayGetSize(resList);
  for (size_t i = 0; i < rsize; ++i) {
    SArray** s = taosArrayGet(resList, i);
    if (*s) {
      taosArrayDestroy(*s);
    }
  }

  taosArrayDestroy(resList);
}
H
Haojun Liao 已提交
1241

H
Haojun Liao 已提交
1242 1243
/*
 * Fetch callback for the ts_comp (second) join stage: stores the compressed timestamp blocks returned by one
 * join subquery.
 *
 * Collection:
 *  - Each non-empty fetch is written to the temp file at pSupporter->path.
 *  - The file is loaded as an STSBuf, which becomes pSupporter->pTSBuf or is merged into it.
 *  - Further fetches, and then the next vgroups, are requested until the subquery is exhausted.
 *
 * When the last subquery finishes:
 *  - The timestamp sets are intersected (doTSBlockIntersect).
 *  - An empty intersection completes the parent with an empty result.
 *  - Otherwise the parent's time window is narrowed to the intersection and the real data subqueries are
 *    launched.
 *
 * Temp-file invariant maintained here: pSupporter->f is either NULL or an open file that nothing has been
 * written to yet. Every write is immediately followed by fclose and f = NULL.
 */
static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRows) {
  SJoinSupporter* pSupporter = (SJoinSupporter*)param;

  SSqlObj* pParentSql = pSupporter->pObj;

  SSqlObj* pSql = (SSqlObj*)tres;
  SSqlCmd* pCmd = &pSql->cmd;
  SSqlRes* pRes = &pSql->res;

  SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
  assert(!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE));

  if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
    tscError("0x%"PRIx64" abort query due to other subquery failure. code:%d, global code:%d", pSql->self, numOfRows, pParentSql->res.code);
    if (quitAllSubquery(pSql, pParentSql, pSupporter)){
      return;
    }

    tscAsyncResultOnError(pParentSql);
    return;
  }

  // check for the error code firstly
  if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
    // todo retry if other subqueries are not failed yet
    assert(numOfRows < 0 && numOfRows == taos_errno(pSql));
    tscError("0x%"PRIx64" sub query failed, code:%s, index:%d", pSql->self, tstrerror(numOfRows), pSupporter->subqueryIndex);

    pParentSql->res.code = numOfRows;
    if (quitAllSubquery(pSql, pParentSql, pSupporter)){
      return;
    }

    tscAsyncResultOnError(pParentSql);
    return;
  }

  if (numOfRows > 0) {  // write the compressed timestamp to disk file
    // an earlier eager open may have failed; retry it here before writing
    if (pSupporter->f == NULL) {
      pSupporter->f = fopen(pSupporter->path, "wb");

      if (pSupporter->f == NULL) {
        // capture errno before logging, which may overwrite it
        int32_t err = errno;
        tscError("0x%"PRIx64" failed to create tmp file:%s, reason:%s", pSql->self, pSupporter->path, strerror(err));
        pParentSql->res.code = TAOS_SYSTEM_ERROR(err);

        if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
          return;
        }

        tscAsyncResultOnError(pParentSql);
        return;
      }
    }

    fwrite(pRes->data, (size_t)pRes->numOfRows, 1, pSupporter->f);
    fclose(pSupporter->f);
    pSupporter->f = NULL;

    STSBuf* pBuf = tsBufCreateFromFile(pSupporter->path, true);
    if (pBuf == NULL) {  // the temp file is already closed; abort this subquery
      int32_t err = errno;
      tscError("0x%"PRIx64" invalid ts comp file from vnode, abort subquery, file size:%d", pSql->self, numOfRows);

      pParentSql->res.code = TAOS_SYSTEM_ERROR(err);
      if (quitAllSubquery(pSql, pParentSql, pSupporter)){
        return;
      }

      tscAsyncResultOnError(pParentSql);
      return;
    }

    if (pSupporter->pTSBuf == NULL) {
      tscDebug("0x%"PRIx64" create tmp file for ts block:%s, size:%d bytes", pSql->self, pBuf->path, numOfRows);
      pSupporter->pTSBuf = pBuf;
    } else {
      assert(pQueryInfo->numOfTables == 1);  // for subquery, only one
      tsBufMerge(pSupporter->pTSBuf, pBuf);
      tsBufDestroy(pBuf);
    }

    // continue to retrieve ts-comp data from vnode
    if (!pRes->completed) {
      // the previous file now belongs to the ts buffer; the next fetch needs a fresh path
      taosGetTmpfilePath("ts-join", pSupporter->path);
      pSupporter->f = fopen(pSupporter->path, "wb");
      pRes->row = pRes->numOfRows;

      taos_fetch_rows_a(tres, tsCompRetrieveCallback, param);
      return;
    }
  }

  if (hasMoreVnodesToTry(pSql)) {
    STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

    int32_t totalVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
    pTableMetaInfo->vgroupIndex += 1;
    assert(pTableMetaInfo->vgroupIndex < totalVgroups);

    tscDebug("0x%"PRIx64" results from vgroup index:%d completed, try next vgroup:%d. total vgroups:%d. current numOfRes:%" PRId64,
             pSql->self, pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVgroups,
             pRes->numOfClauseTotal);

    pCmd->command = TSDB_SQL_SELECT;
    tscResetForNextRetrieve(&pSql->res);

    // A file opened ahead of a fetch that then returned no rows is still open and empty: reuse it. Opening a
    // new one here would leak it (previously this case failed an assert).
    if (pSupporter->f == NULL) {
      taosGetTmpfilePath("ts-join", pSupporter->path);

      // a failed open is retried lazily before the next write above
      pSupporter->f = fopen(pSupporter->path, "wb");
    }
    pRes->row = pRes->numOfRows;

    // set the callback function
    pSql->fp = tscJoinQueryCallback;
    tscBuildAndSendRequest(pSql, NULL);
    return;
  }

  if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
    return;
  }

  tscDebug("0x%"PRIx64" all subquery retrieve ts complete, do ts block intersect", pParentSql->self);

  STimeWindow win = TSWINDOW_INITIALIZER;
  int64_t num = doTSBlockIntersect(pParentSql, &win);
  if (num <= 0) {  // no result during ts intersect
    tscDebug("0x%"PRIx64" no results generated in ts intersection, free all sub SqlObj and quit", pParentSql->self);
    freeJoinSubqueryObj(pParentSql);

    // set no result command
    pParentSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
    (*pParentSql->fp)(pParentSql->param, pParentSql, 0);
    return;
  }

  // launch the query the retrieve actual results from vnode along with the filtered timestamp
  SQueryInfo* pPQueryInfo = tscGetQueryInfo(&pParentSql->cmd);
  updateQueryTimeRange(pPQueryInfo, &win);

  //update the vgroup that involved in real data query
  tscLaunchRealSubqueries(pParentSql);
}
H
Haojun Liao 已提交
1388

H
Haojun Liao 已提交
1389 1390
/*
 * Callback of a second-stage join sub-query, i.e. the retrieval of actual row
 * data from a vnode after the timestamp intersection has been computed.
 *
 * param     - the SJoinSupporter attached to the sub-query
 * tres      - the sub-query SSqlObj
 * numOfRows - number of retrieved rows; on failure it carries the error code
 *             (asserted equal to taos_errno(tres))
 *
 * Once every sub-query has reported (subAndCheckDone), the joined result is
 * assembled on the parent object with tscBuildResFromSubqueries().
 */
static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfRows) {
  SJoinSupporter* pSupporter = (SJoinSupporter*)param;
  SSqlObj*        pParentSql = pSupporter->pObj;

  SSqlObj* pSql = (SSqlObj*)tres;
  SSqlCmd* pCmd = &pSql->cmd;
  SSqlRes* pRes = &pSql->res;

  SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);

  // another sub-query has already failed: abandon this one too
  if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
    tscError("0x%"PRIx64" abort query due to other subquery failure. code:%d, global code:%d", pSql->self, numOfRows, pParentSql->res.code);
    if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
      return;
    }

    tscAsyncResultOnError(pParentSql);
    return;
  }

  if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
    assert(numOfRows == taos_errno(pSql));

    pParentSql->res.code = numOfRows;
    tscError("0x%"PRIx64" retrieve failed, index:%d, code:%s", pSql->self, pSupporter->subqueryIndex, tstrerror(numOfRows));

    tscAsyncResultOnError(pParentSql);
    return;
  }

  if (numOfRows >= 0) {
    pRes->numOfTotal += pRes->numOfRows;
  }

  SSubqueryState* pState = &pParentSql->subState;
  if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && numOfRows == 0) {
    STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
    assert(pQueryInfo->numOfTables == 1);

    // for projection query, need to try next vnode if current vnode is exhausted
    int32_t numOfVgroups = 0;
    if (pTableMetaInfo->pVgroupTables != NULL) {
      numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
    } else {
      numOfVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
    }

    if ((++pTableMetaInfo->vgroupIndex) < numOfVgroups) {
      tscDebug("0x%"PRIx64" no result in current vnode anymore, try next vnode, vgIndex:%d", pSql->self, pTableMetaInfo->vgroupIndex);
      pSql->cmd.command = TSDB_SQL_SELECT;
      pSql->fp = tscJoinQueryCallback;

      tscBuildAndSendRequest(pSql, NULL);
      return;
    }

    tscDebug("0x%"PRIx64" no result in current subquery anymore", pSql->self);
  }

  if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
    tscDebug("0x%"PRIx64" sub:0x%"PRIx64",%d completed, total:%d", pParentSql->self, pSql->self, pSupporter->subqueryIndex, pState->numOfSub);
    return;
  }

  tscDebug("0x%"PRIx64" all %d secondary subqueries retrieval completed, code:%d", pSql->self, pState->numOfSub, pParentSql->res.code);

  // Evaluate this BEFORE freeJoinSubqueryObj(): pQueryInfo lives inside pSql's
  // command object, and freeJoinSubqueryObj() is expected to release the
  // sub-query objects (pSql included), which would leave pQueryInfo dangling.
  bool stableQuery = tscIsTwoStageSTableQuery(pQueryInfo, 0);

  if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
    freeJoinSubqueryObj(pParentSql);
    pParentSql->res.completed = true;
  }

  // update the records for each subquery in parent sql object.
  for (int32_t i = 0; i < pState->numOfSub; ++i) {
    if (pParentSql->pSubs[i] == NULL) {
      tscDebug("0x%"PRIx64" %p sub:%d not retrieve data", pParentSql->self, NULL, i);
      continue;
    }

    SSqlRes* pRes1 = &pParentSql->pSubs[i]->res;

    pParentSql->res.precision = pRes1->precision;

    if (pRes1->row > 0 && pRes1->numOfRows > 0) {
      tscDebug("0x%"PRIx64" sub:0x%"PRIx64" index:%d numOfRows:%d total:%"PRId64 " (not retrieve)", pParentSql->self,
          pParentSql->pSubs[i]->self, i, pRes1->numOfRows, pRes1->numOfTotal);
      assert(pRes1->row < pRes1->numOfRows);
    } else {
      if (!stableQuery) {
        pRes1->numOfClauseTotal += pRes1->numOfRows;
      }

      tscDebug("0x%"PRIx64" sub:0x%"PRIx64" index:%d numOfRows:%d total:%"PRId64, pParentSql->self,
          pParentSql->pSubs[i]->self, i, pRes1->numOfRows, pRes1->numOfTotal);
    }
  }

  // data has retrieved to client, build the join results
  tscBuildResFromSubqueries(pParentSql);
}

1492
/*
 * Marks the parent join query as finished, releases its join sub-queries and
 * reports to the application: the parent's callback on success, otherwise
 * tscAsyncResultOnError().
 */
static void joinFetchNotifyParentDone(SSqlObj* pSql) {
  pSql->res.completed = true;
  freeJoinSubqueryObj(pSql);

  if (pSql->res.code != TSDB_CODE_SUCCESS) {
    tscAsyncResultOnError(pSql);
  } else {
    (*pSql->fp)(pSql->param, pSql, 0);
  }
}

/*
 * For non-ordered projection queries on super tables, re-issues every drained
 * and completed sub-query against its next vgroup.
 * Returns true if at least one such request was sent.
 */
static bool joinFetchMoveToNextVnode(SSqlObj* pSql) {
  // does any participating sub-query run a non-ordered projection on a super table?
  bool prjOnSTable = false;
  for (int32_t idx = 0; idx < pSql->subState.numOfSub && !prjOnSTable; ++idx) {
    SSqlObj* pSub = pSql->pSubs[idx];
    if (pSub != NULL) {
      prjOnSTable = tscNonOrderedProjectionQueryOnSTable(tscGetQueryInfo(&pSub->cmd), 0);
    }
  }

  if (prjOnSTable) {
    for (int32_t idx = 0; idx < pSql->subState.numOfSub; ++idx) {
      SSqlObj* pSub = pSql->pSubs[idx];
      if (pSub != NULL && pSub->res.row >= pSub->res.numOfRows && pSub->res.completed) {
        subquerySetState(pSub, &pSql->subState, idx, 0);
      }
    }
  }

  bool switched = false;
  for (int32_t idx = 0; idx < pSql->subState.numOfSub; ++idx) {
    SSqlObj* pSub = pSql->pSubs[idx];
    if (pSub == NULL) {
      continue;
    }

    SQueryInfo* pSubQueryInfo = tscGetQueryInfo(&pSub->cmd);
    if (!tscNonOrderedProjectionQueryOnSTable(pSubQueryInfo, 0) ||
        pSub->res.row < pSub->res.numOfRows || !pSub->res.completed) {
      continue;
    }

    STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pSubQueryInfo, 0);
    assert(pSubQueryInfo->numOfTables == 1);

    // for projection query, need to try next vnode if current vnode is exhausted
    int32_t numOfVgroups = (pTableMetaInfo->pVgroupTables != NULL)
                               ? (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables)
                               : pTableMetaInfo->vgroupList->numOfVgroups;

    pTableMetaInfo->vgroupIndex += 1;
    if (pTableMetaInfo->vgroupIndex >= numOfVgroups) {
      tscDebug("0x%"PRIx64" no result in current subquery anymore", pSub->self);
      continue;
    }

    tscDebug("0x%"PRIx64" no result in current vnode anymore, try next vnode, vgIndex:%d", pSub->self,
             pTableMetaInfo->vgroupIndex);
    pSub->cmd.command = TSDB_SQL_SELECT;
    pSub->fp = tscJoinQueryCallback;

    tscBuildAndSendRequest(pSub, NULL);
    switched = true;
  }

  return switched;
}

/*
 * Drives the client-side join when the application asks for more rows.
 *
 * - If every sub-query still has buffered rows, join them right away.
 * - If a drained sub-query hit its limit, the whole join is finished.
 * - If drained sub-queries are all completed, try the next vgroup (projection
 *   queries on super tables) or finish.
 * - Otherwise fetch the next block for each drained sub-query.
 */
void tscFetchDatablockForSubquery(SSqlObj* pSql) {
  assert(pSql->subState.numOfSub >= 1);

  int32_t pending    = 0;      // drained sub-queries that are not completed yet
  bool    anyDrained = false;  // some sub-query has no buffered row left
  bool    limitHit   = false;  // a drained sub-query has reached its limit

  for (int32_t idx = 0; idx < pSql->subState.numOfSub; ++idx) {
    SSqlObj* pSub = pSql->pSubs[idx];
    if (pSub == NULL) {
      continue;  // a NULL sub-query takes no part in building the final result
    }

    SSqlRes*    pSubRes       = &pSub->res;
    SQueryInfo* pSubQueryInfo = tscGetQueryInfo(&pSub->cmd);
    bool        limitReached  = tscHasReachLimitation(pSubQueryInfo, pSubRes);

    if (pSubRes->row < pSubRes->numOfRows) {
      continue;
    }

    anyDrained = true;
    if (limitReached) {
      limitHit = true;
      break;
    }

    if (!pSubRes->completed) {
      ++pending;
    }
  }

  // buffered data remains on the client side for every sub-query
  if (!anyDrained) {
    tscBuildResFromSubqueries(pSql);
    return;
  }

  if (limitHit) {
    joinFetchNotifyParentDone(pSql);
    return;
  }

  if (pending <= 0) {
    if (!joinFetchMoveToNextVnode(pSql)) {
      joinFetchNotifyParentDone(pSql);
    }
    return;
  }

  // TODO multi-vnode retrieve for projection query with limitation has bugs, since the global limiation is not handled
  tscDebug("0x%"PRIx64" retrieve data from %d subqueries", pSql->self, pending);

  // reset the state of every drained sub-query before any request is sent out
  for (int32_t idx = 0; idx < pSql->subState.numOfSub; ++idx) {
    SSqlObj* pSql1 = pSql->pSubs[idx];
    if (pSql1 != NULL && pSql1->res.row >= pSql1->res.numOfRows) {
      subquerySetState(pSql1, &pSql->subState, idx, 0);
    }
  }

  for (int32_t idx = 0; idx < pSql->subState.numOfSub; ++idx) {
    SSqlObj* pSql1 = pSql->pSubs[idx];
    if (pSql1 == NULL) {
      continue;
    }

    SSqlRes*    pRes1         = &pSql1->res;
    SSqlCmd*    pCmd1         = &pSql1->cmd;
    SQueryInfo* pSubQueryInfo = tscGetQueryInfo(pCmd1);
    assert(pRes1->numOfRows >= 0 && pSubQueryInfo->numOfTables == 1);

    if (pRes1->row < pRes1->numOfRows) {
      continue;
    }

    SJoinSupporter* pSupporter     = (SJoinSupporter*)pSql1->param;
    STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pSubQueryInfo, 0);
    tscDebug("0x%"PRIx64" subquery:0x%"PRIx64" retrieve data from vnode, subquery:%d, vgroupIndex:%d", pSql->self, pSql1->self,
             pSupporter->subqueryIndex, pTableMetaInfo->vgroupIndex);

    tscResetForNextRetrieve(pRes1);
    pSql1->fp = joinRetrieveFinalResCallback;

    if (pCmd1->command < TSDB_SQL_LOCAL) {
      pCmd1->command = (pCmd1->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
    }

    tscBuildAndSendRequest(pSql1, NULL);
  }
}

/*
 * Called once all join sub-queries have returned. Builds pRes->pColumnIndex,
 * which maps each output expression of the parent query to a
 * (sub-query index, expression index inside that sub-query) pair.
 *
 * The mapping is built only once; later calls return immediately.
 * On allocation failure pRes->code is set to TSDB_CODE_TSC_OUT_OF_MEMORY.
 */
void tscSetupOutputColumnIndex(SSqlObj* pSql) {
  SSqlCmd* pCmd = &pSql->cmd;
  SSqlRes* pRes = &pSql->res;

  // the column transfer support struct has been built
  if (pRes->pColumnIndex != NULL) {
    return;
  }

  SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);

  int32_t numOfExprs = (int32_t)tscNumOfExprs(pQueryInfo);

  // calloc(n, size) instead of calloc(1, n * size): the library checks the
  // element-count multiplication for overflow.
  pRes->pColumnIndex = calloc(numOfExprs, sizeof(SColumnIndex));
  if (pRes->pColumnIndex == NULL) {
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return;
  }

  for (int32_t i = 0; i < numOfExprs; ++i) {
    SExprInfo* pExpr = tscExprGet(pQueryInfo, i);

    // locate the table (and thus the sub-query) this expression belongs to
    int32_t tableIndexOfSub = -1;
    for (int32_t j = 0; j < pQueryInfo->numOfTables; ++j) {
      STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, j);
      if (pTableMetaInfo->pTableMeta->id.uid == pExpr->base.uid) {
        tableIndexOfSub = j;
        break;
      }
    }

    assert(tableIndexOfSub >= 0 && tableIndexOfSub < pQueryInfo->numOfTables);

    SSqlCmd*    pSubCmd       = &pSql->pSubs[tableIndexOfSub]->cmd;
    SQueryInfo* pSubQueryInfo = tscGetQueryInfo(pSubCmd);

    // find the matching expression (same function on the same column) in the sub-query
    int32_t numOfSubExpr = (int32_t)taosArrayGetSize(pSubQueryInfo->exprList);
    for (int32_t k = 0; k < numOfSubExpr; ++k) {
      SExprInfo* pSubExpr = tscExprGet(pSubQueryInfo, k);
      if (pExpr->base.functionId == pSubExpr->base.functionId && pExpr->base.colInfo.colId == pSubExpr->base.colInfo.colId) {
        pRes->pColumnIndex[i] = (SColumnIndex){.tableIndex = tableIndexOfSub, .columnIndex = k};
        break;
      }
    }
  }
}

/*
 * Query callback shared by all join sub-queries.
 *
 * Depending on the sub-query type it continues with the tid/tag retrieval,
 * the ts_comp retrieval, or (second stage) the actual data retrieval. Errors
 * from this or any sibling sub-query abort the whole join.
 */
void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
  SSqlObj*        pSql       = (SSqlObj*)tres;
  SJoinSupporter* pSupporter = (SJoinSupporter*)param;
  SSqlObj*        pParentSql = pSupporter->pObj;

  // There is only one subquery and table for each subquery.
  SQueryInfo*     pQueryInfo     = tscGetQueryInfo(&pSql->cmd);
  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  assert(pQueryInfo->numOfTables == 1);

  // a sibling sub-query has already failed
  if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
    tscError("0x%"PRIx64" abort query due to other subquery failure. code:%d, global code:%d", pSql->self, code, pParentSql->res.code);
    if (!quitAllSubquery(pSql, pParentSql, pSupporter)) {
      tscAsyncResultOnError(pParentSql);
    }
    return;
  }

  // TODO here retry is required, not directly returns to client
  if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
    assert(taos_errno(pSql) == code);

    tscError("0x%"PRIx64" abort query, code:%s, global code:%s", pSql->self, tstrerror(code), tstrerror(pParentSql->res.code));
    pParentSql->res.code = code;

    if (!quitAllSubquery(pSql, pParentSql, pSupporter)) {
      tscAsyncResultOnError(pParentSql);
    }
    return;
  }

  // first stage, super table: fetch the <tid, tag> tuples
  if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) {
    pSql->fp          = tidTagRetrieveCallback;
    pSql->cmd.command = TSDB_SQL_FETCH;
    tscBuildAndSendRequest(pSql, NULL);
    return;
  }

  // first stage: fetch the ts_comp block
  if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
    pSql->fp          = tsCompRetrieveCallback;
    pSql->cmd.command = TSDB_SQL_FETCH;
    tscBuildAndSendRequest(pSql, NULL);
    return;
  }

  // A projection query continuing on a later vgroup does not wait for its siblings.
  bool continuedPrjQuery =
      (pTableMetaInfo->vgroupIndex > 0) && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0);

  if (!continuedPrjQuery && !subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
    return;
  }

  tscSetupOutputColumnIndex(pParentSql);

  if (continuedPrjQuery) {
    // keep retrieving data instead of returning to the invoker
    pSql->fp          = joinRetrieveFinalResCallback;
    pSql->cmd.command = TSDB_SQL_FETCH;

    tscBuildAndSendRequest(pSql, NULL);
    return;
  }

  // first retrieve from vnode during the secondary stage sub-query
  if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
    tscAsyncResultOnError(pParentSql);
  } else {
    (*pParentSql->fp)(pParentSql->param, pParentSql, 0);
  }
}

/////////////////////////////////////////////////////////////////////////////////////////
static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code);

1817
static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj);
H
hjxilinx 已提交
1818

H
Haojun Liao 已提交
1819
/*
 * Creates the first-stage join sub-query for table `tableIndex` of pSql and
 * stores it in pSql->pSubs[tableIndex] (allocating pSubs on first use).
 *
 * The column list, expressions, fields, tag condition, group-by info and limit
 * of the new sub-query are moved into pSupporter. The sub-query is then turned into:
 *   - a TID_TAG query (retrieve table ids and join tags) for a super table, or
 *   - a TS_COMP query (retrieve compressed timestamps) otherwise.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY on allocation failure.
 */
int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter *pSupporter) {
  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);

  pSql->res.qId = 0x1;
  assert(pSql->res.numOfRows == 0);

  if (pSql->pSubs == NULL) {
    pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES);
    if (pSql->pSubs == NULL) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
  }

  SSqlObj *pNew = createSubqueryObj(pSql, tableIndex, tscJoinQueryCallback, pSupporter, TSDB_SQL_SELECT, NULL);
  if (pNew == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  pSql->pSubs[tableIndex] = pNew;

  if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) {
    addGroupInfoForSubquery(pSql, pNew, 0, tableIndex);

    // refactor as one method
    SQueryInfo *pNewQueryInfo = tscGetQueryInfo(&pNew->cmd);
    assert(pNewQueryInfo != NULL);

    // move the data into the support struct; ownership leaves the sub-query
    pSupporter->colList = pNewQueryInfo->colList;
    pNewQueryInfo->colList = NULL;

    pSupporter->exprList = pNewQueryInfo->exprList;
    pNewQueryInfo->exprList = NULL;

    pSupporter->fieldsInfo = pNewQueryInfo->fieldsInfo;
    memset(&pNewQueryInfo->fieldsInfo, 0, sizeof(SFieldInfo));

    if (tscTagCondCopy(&pSupporter->tagCond, &pNewQueryInfo->tagCond) != 0) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }

    pSupporter->groupInfo = pNewQueryInfo->groupbyExpr;
    memset(&pNewQueryInfo->groupbyExpr, 0, sizeof(SGroupbyExpr));

    pNew->cmd.numOfCols = 0;
    pNewQueryInfo->interval.interval = 0;
    pSupporter->limit = pNewQueryInfo->limit;

    pNewQueryInfo->limit.limit = -1;
    pNewQueryInfo->limit.offset = 0;

    // do not leave a dangling pointer behind until tscInitQueryInfo() runs
    taosArrayDestroy(pNewQueryInfo->pUpstream);
    pNewQueryInfo->pUpstream = NULL;

    pNewQueryInfo->order.orderColId = INT32_MIN;

    // reinitialize the query info but keep the time window
    STimeWindow range = pNewQueryInfo->window;
    tscInitQueryInfo(pNewQueryInfo);
    pNewQueryInfo->window = range;

    STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0);

    if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { // return the tableId & tag
      SColumnIndex colIndex = {0};

      STagCond* pTagCond = &pSupporter->tagCond;
      assert(pTagCond->joinInfo.hasJoin);

      int32_t tagColId = tscGetJoinTagColIdByUid(pTagCond, pTableMetaInfo->pTableMeta->id.uid);
      SSchema* s = tscGetColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId);

      colIndex.columnIndex = tscGetTagColIndexById(pTableMetaInfo->pTableMeta, tagColId);

      int16_t bytes = 0;
      int16_t type  = 0;
      int32_t inter = 0;

      getResultDataInfo(s->type, s->bytes, TSDB_FUNC_TID_TAG, 0, &type, &bytes, &inter, 0, 0);

      SSchema s1 = {.colId = s->colId, .type = (uint8_t)type, .bytes = bytes};
      pSupporter->tagSize = s1.bytes;
      assert(isValidDataType(s1.type) && s1.bytes > 0);

      // set get tags query type
      TSDB_QUERY_SET_TYPE(pNewQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY);
      tscAddFuncInSelectClause(pNewQueryInfo, 0, TSDB_FUNC_TID_TAG, &colIndex, &s1, TSDB_COL_TAG, getNewResColId(pCmd));
      size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList);

      tscDebug(
          "%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%u, transfer to tid_tag query to retrieve (tableId, tags), "
          "exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, tagIndex:%d, name:%s",
          pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscNumOfExprs(pNewQueryInfo),
          numOfCols, pNewQueryInfo->fieldsInfo.numOfOutput, colIndex.columnIndex, tNameGetTableName(&pNewQueryInfo->pTableMetaInfo[0]->name));
    } else {
      SSchema      colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1};
      SColumnIndex colIndex = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX};
      tscAddFuncInSelectClause(pNewQueryInfo, 0, TSDB_FUNC_TS_COMP, &colIndex, &colSchema, TSDB_COL_NORMAL, getNewResColId(pCmd));

      // NOTE: the former "set the tags value for ts_comp function" step was guarded by
      // UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo). That can never hold inside this
      // else-branch, so the unreachable code was dropped.

      // add the filter tag column
      if (pSupporter->colList != NULL) {
        size_t s = taosArrayGetSize(pSupporter->colList);

        for (size_t i = 0; i < s; ++i) {
          SColumn *pCol = taosArrayGetP(pSupporter->colList, i);

          if (pCol->info.flist.numOfFilters > 0) {  // copy to the pNew->cmd.colList if it is filtered.
            SColumn *p = tscColumnClone(pCol);
            taosArrayPush(pNewQueryInfo->colList, &p);
          }
        }
      }

      size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList);

      tscDebug(
          "%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%u, transfer to ts_comp query to retrieve timestamps, "
          "exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, name:%s",
          pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscNumOfExprs(pNewQueryInfo),
          numOfCols, pNewQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pNewQueryInfo->pTableMetaInfo[0]->name));
    }
  } else {
    assert(0);
    SQueryInfo *pNewQueryInfo = tscGetQueryInfo(&pNew->cmd);
    pNewQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY;
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
1959
/*
 * Entry point of a multi-table join. Creates one join sub-query per table,
 * sends them all, and switches the parent command to
 * TSDB_SQL_TABLE_JOIN_RETRIEVE once every request went out.
 *
 * If some super table has no vgroup, the result is known to be empty: the
 * sub-queries are released and the parent's callback is invoked directly.
 * On failure the error is stored in pSql->res.code and reported
 * asynchronously.
 */
void tscHandleMasterJoinQuery(SSqlObj* pSql) {
  SSqlCmd*    pCmd       = &pSql->cmd;
  SSqlRes*    pRes       = &pSql->res;
  SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
  assert((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0);

  int32_t code = TSDB_CODE_SUCCESS;
  pSql->subState.numOfSub = pQueryInfo->numOfTables;

  if (pSql->subState.states == NULL) {
    pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(*pSql->subState.states));
    if (pSql->subState.states == NULL) {
      code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      goto _error;
    }

    pthread_mutex_init(&pSql->subState.mutex, NULL);
  }

  memset(pSql->subState.states, 0, sizeof(*pSql->subState.states) * pSql->subState.numOfSub);
  tscDebug("0x%"PRIx64" reset all sub states to 0, start subquery, total:%d", pSql->self, pQueryInfo->numOfTables);

  for (int32_t tableIdx = 0; tableIdx < pQueryInfo->numOfTables; ++tableIdx) {
    SJoinSupporter* pSupporter = tscCreateJoinSupporter(pSql, tableIdx);
    if (pSupporter == NULL) {  // failed to create support struct, abort current query
      tscError("0x%"PRIx64" tableIndex:%d, failed to allocate join support object, abort further query", pSql->self, tableIdx);
      code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      goto _error;
    }

    code = tscCreateJoinSubquery(pSql, tableIdx, pSupporter);
    if (code != TSDB_CODE_SUCCESS) {  // failed to create subquery object, quit query
      tscDestroyJoinSupporter(pSupporter);
      goto _error;
    }

    STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->pSubs[tableIdx]->cmd, 0);
    if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo) && (pTableMetaInfo->vgroupList->numOfVgroups == 0)) {
      pSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
      break;
    }
  }

  // at least one subquery is empty, do nothing and return
  if (pSql->cmd.command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
    freeJoinSubqueryObj(pSql);
    (*pSql->fp)(pSql->param, pSql, 0);
    return;
  }

  // After the first send failure, the remaining sub-queries are not sent but
  // their callbacks are still invoked so they can observe the failure.
  bool failed = false;
  for (int32_t subIdx = 0; subIdx < pSql->subState.numOfSub; ++subIdx) {
    SSqlObj* pSub = pSql->pSubs[subIdx];

    if (!failed) {
      code = tscBuildAndSendRequest(pSub, NULL);
      if (code == TSDB_CODE_SUCCESS) {
        continue;
      }

      pRes->code = code;
      failed = true;
    }

    (*pSub->fp)(pSub->param, pSub, 0);
  }

  if (failed) {
    return;
  }

  pSql->cmd.command = TSDB_SQL_TABLE_JOIN_RETRIEVE;
  return;

  _error:
  pRes->code = code;
  tscAsyncResultOnError(pSql);
}

H
Haojun Liao 已提交
2037 2038
/*
 * Releases the first numOfSubs sub-queries of pSql. For each one this frees
 * its SRetrieveSupport (and that support's local buffer) and then the
 * sub-query object itself. Every one of those sub-queries must be non-NULL.
 */
static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs) {
  assert(numOfSubs <= pSql->subState.numOfSub && numOfSubs >= 0);

  int32_t idx = 0;
  while (idx < numOfSubs) {
    SSqlObj *pChild = pSql->pSubs[idx++];
    assert(pChild != NULL);

    SRetrieveSupport *pTrs = pChild->param;
    tfree(pTrs->localBuffer);
    tfree(pTrs);

    taos_free_result(pChild);
  }
}

D
TD-2516  
dapan1121 已提交
2053 2054 2055 2056 2057 2058 2059 2060 2061 2062 2063 2064 2065 2066 2067 2068 2069
/*
 * Spin-lock keyed by thread id: *lockedBy is 0 while the lock is free and
 * holds the owning thread's id while taken.
 *
 * Acquire: spin on compare-and-swap (0 -> own tid), yielding the CPU every
 * 100 failed attempts.
 */
void tscLockByThread(int64_t *lockedBy) {
  int64_t  tid   = taosGetSelfPthreadId();
  uint32_t spins = 0;

  while (atomic_val_compare_exchange_64(lockedBy, 0, tid) != 0) {
    // Reset the counter when yielding. The previous `++i % 100` on a plain
    // int could overflow (undefined behaviour) under a long-held lock.
    if (++spins == 100) {
      spins = 0;
      sched_yield();
    }
  }
}

/*
 * Release: swap own tid back to 0. A caller that does not own the lock leaves
 * *lockedBy untouched and trips the assert (debug builds only).
 */
void tscUnlockByThread(int64_t *lockedBy) {
  int64_t tid = taosGetSelfPthreadId();
  if (atomic_val_compare_exchange_64(lockedBy, tid, 0) != tid) {
    assert(false);
  }
}

2070 2071 2072 2073 2074 2075 2076 2077 2078 2079 2080 2081
// Context carried by a "first round" query issued for pParent (name-based reading;
// the code that fills these fields is not in this chunk -- NOTE(review): confirm).
// destroySup() releases pResult, pColsInfo and the struct itself; it does NOT free
// pTagCols or buf, so their ownership must be handled elsewhere -- verify.
typedef struct SFirstRoundQuerySup {
  SSqlObj  *pParent;   // presumably the SQL object that issued the first-round query
  int32_t   numOfRows;
  SArray   *pColsInfo; // released with taosArrayDestroy() in destroySup()
  int32_t   tagLen;
  STColumn *pTagCols;
  SArray   *pResult;   // SArray<SInterResult>
  int64_t   interval;
  char*     buf;
  int32_t   bufLen;    // presumably the byte length of buf -- TODO confirm
} SFirstRoundQuerySup;

H
Haojun Liao 已提交
2082
void doAppendData(SInterResult* pInterResult, TAOS_ROW row, int32_t numOfCols, SQueryInfo* pQueryInfo) {
2083 2084
  TSKEY key = INT64_MIN;
  for(int32_t i = 0; i < numOfCols; ++i) {
H
Haojun Liao 已提交
2085
    SExprInfo* pExpr = tscExprGet(pQueryInfo, i);
H
Haojun Liao 已提交
2086
    if (TSDB_COL_IS_TAG(pExpr->base.colInfo.flag) || pExpr->base.functionId == TSDB_FUNC_PRJ) {
2087 2088 2089
      continue;
    }

H
Haojun Liao 已提交
2090
    if (pExpr->base.colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
2091 2092 2093
      key = *(TSKEY*) row[i];
      continue;
    }
D
TD-2516  
dapan1121 已提交
2094

2095 2096 2097 2098 2099 2100 2101
    double v = 0;
    if (row[i] != NULL) {
      v = *(double*) row[i];
    } else {
      SET_DOUBLE_NULL(&v);
    }

H
Haojun Liao 已提交
2102
    int32_t id = pExpr->base.colInfo.colId;
H
Haojun Liao 已提交
2103
    int32_t numOfQueriedCols = (int32_t) taosArrayGetSize(pInterResult->pResult);
2104 2105 2106 2107 2108 2109 2110 2111 2112 2113

    SArray* p = NULL;
    for(int32_t j = 0; j < numOfQueriedCols; ++j) {
      SStddevInterResult* pColRes = taosArrayGet(pInterResult->pResult, j);
      if (pColRes->colId == id) {
        p = pColRes->pResult;
        break;
      }
    }

D
fix bug  
dapan1121 已提交
2114 2115 2116 2117 2118 2119 2120
    if (p && taosArrayGetSize(p) > 0) {
      SResPair *l = taosArrayGetLast(p);
      if (l->key == key && key == INT64_MIN) {
        continue;
      }
    }

2121 2122 2123 2124 2125 2126 2127 2128 2129 2130 2131 2132
    //append a new column
    if (p == NULL) {
      SStddevInterResult t = {.colId = id, .pResult = taosArrayInit(10, sizeof(SResPair)),};
      taosArrayPush(pInterResult->pResult, &t);
      p = t.pResult;
    }

    SResPair pair = {.avg = v, .key = key};
    taosArrayPush(p, &pair);
  }
}

2133 2134 2135 2136 2137 2138
/* Free a first-round support object together with the two arrays it owns. */
static void destroySup(SFirstRoundQuerySup* pSup) {
  SArray* pGroups   = pSup->pResult;
  SArray* pColumnId = pSup->pColsInfo;

  tfree(pSup);

  taosArrayDestroyEx(pGroups, freeInterResult);
  taosArrayDestroy(pColumnId);
}

2139 2140 2141 2142 2143 2144
void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
  SSqlObj* pSql = (SSqlObj*)tres;
  SSqlRes* pRes = &pSql->res;

  SFirstRoundQuerySup* pSup = param;

2145
  SSqlObj*     pParent = pSup->pParent;
H
Haojun Liao 已提交
2146
  SQueryInfo*  pQueryInfo = tscGetQueryInfo(&pSql->cmd);
2147 2148 2149 2150 2151 2152 2153 2154 2155 2156 2157

  int32_t code = taos_errno(pSql);
  if (code != TSDB_CODE_SUCCESS) {
    destroySup(pSup);
    taos_free_result(pSql);
    pParent->res.code = code;
    tscAsyncResultOnError(pParent);
    return;
  }

  if (numOfRows > 0) {  // the number is not correct for group by column in super table query
2158 2159 2160 2161 2162 2163 2164 2165 2166
    TAOS_ROW row = NULL;
    int32_t  numOfCols = taos_field_count(tres);

    if (pSup->tagLen == 0) {  // no tags, all rows belong to one group
      SInterResult interResult = {.tags = NULL, .pResult = taosArrayInit(4, sizeof(SStddevInterResult))};
      taosArrayPush(pSup->pResult, &interResult);

      while ((row = taos_fetch_row(tres)) != NULL) {
        doAppendData(&interResult, row, numOfCols, pQueryInfo);
2167
        pSup->numOfRows += 1;
2168 2169 2170 2171 2172 2173 2174 2175 2176 2177
      }
    } else {  // tagLen > 0
      char* p = calloc(1, pSup->tagLen);

      while ((row = taos_fetch_row(tres)) != NULL) {
        int32_t* length = taos_fetch_lengths(tres);
        memset(p, 0, pSup->tagLen);

        int32_t offset = 0;
        for (int32_t i = 0; i < numOfCols && offset < pSup->tagLen; ++i) {
H
Haojun Liao 已提交
2178
          SExprInfo* pExpr = tscExprGet(pQueryInfo, i);
2179 2180

          // tag or group by column
H
Haojun Liao 已提交
2181
          if (TSDB_COL_IS_TAG(pExpr->base.colInfo.flag) || pExpr->base.functionId == TSDB_FUNC_PRJ) {
D
fix bug  
dapan1121 已提交
2182
            if (row[i] == NULL) {
H
Haojun Liao 已提交
2183
              setNull(p + offset, pExpr->base.resType, pExpr->base.resBytes);
D
fix bug  
dapan1121 已提交
2184 2185 2186
            } else {
              memcpy(p + offset, row[i], length[i]);
            }
H
Haojun Liao 已提交
2187
            offset += pExpr->base.resBytes;
2188 2189 2190 2191 2192 2193 2194 2195 2196 2197 2198 2199 2200 2201 2202 2203 2204 2205 2206 2207 2208 2209 2210 2211 2212 2213
          }
        }

        assert(offset == pSup->tagLen);
        size_t size = taosArrayGetSize(pSup->pResult);

        if (size > 0) {
          SInterResult* pInterResult = taosArrayGetLast(pSup->pResult);
          if (memcmp(pInterResult->tags, p, pSup->tagLen) == 0) {  // belongs to the same group
            doAppendData(pInterResult, row, numOfCols, pQueryInfo);
          } else {
            char* tags = malloc( pSup->tagLen);
            memcpy(tags, p, pSup->tagLen);

            SInterResult interResult = {.tags = tags, .pResult = taosArrayInit(4, sizeof(SStddevInterResult))};
            taosArrayPush(pSup->pResult, &interResult);
            doAppendData(&interResult, row, numOfCols, pQueryInfo);
          }
        } else {
          char* tags = malloc(pSup->tagLen);
          memcpy(tags, p, pSup->tagLen);

          SInterResult interResult = {.tags = tags, .pResult = taosArrayInit(4, sizeof(SStddevInterResult))};
          taosArrayPush(pSup->pResult, &interResult);
          doAppendData(&interResult, row, numOfCols, pQueryInfo);
        }
2214 2215

        pSup->numOfRows += 1;
2216 2217 2218 2219 2220 2221
      }

      tfree(p);
    }
  }

2222
  if (!pRes->completed && numOfRows > 0) {
2223 2224 2225 2226 2227 2228
    taos_fetch_rows_a(tres, tscFirstRoundRetrieveCallback, param);
    return;
  }

  // set the parameters for the second round query process
  SSqlCmd    *pPCmd   = &pParent->cmd;
H
Haojun Liao 已提交
2229
  SQueryInfo *pQueryInfo1 = tscGetQueryInfo(pPCmd);
D
fix bug  
dapan1121 已提交
2230 2231
  int32_t resRows = pSup->numOfRows;
  
2232 2233 2234 2235
  if (pSup->numOfRows > 0) {
    SBufferWriter bw = tbufInitWriter(NULL, false);
    interResToBinary(&bw, pSup->pResult, pSup->tagLen);

H
Haojun Liao 已提交
2236
    pQueryInfo1->bufLen = (int32_t) tbufTell(&bw);
2237 2238 2239 2240 2241 2242 2243 2244 2245 2246 2247 2248
    pQueryInfo1->buf = tbufGetData(&bw, true);

    // set the serialized binary string as the parameter of arithmetic expression
    tbufCloseWriter(&bw);
  }

  taosArrayDestroyEx(pSup->pResult, freeInterResult);
  taosArrayDestroy(pSup->pColsInfo);
  tfree(pSup);

  taos_free_result(pSql);

D
fix bug  
dapan1121 已提交
2249 2250 2251 2252 2253 2254
  if (resRows == 0) {
    pParent->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
    (*pParent->fp)(pParent->param, pParent, 0);
    return;
  }

2255
  pQueryInfo1->round = 1;
H
Haojun Liao 已提交
2256
  executeQuery(pParent, pQueryInfo1);
2257 2258 2259
}

/*
 * Query callback of the first-round subquery.
 *
 * On success, fetching starts with tscFirstRoundRetrieveCallback(). On failure, the
 * support object and the subquery are released and the subquery's error code is
 * reported through the parent query.
 */
void tscFirstRoundCallback(void* param, TAOS_RES* tres, int code) {
  SFirstRoundQuerySup* pSup = (SFirstRoundQuerySup*) param;
  SSqlObj*             pSql = (SSqlObj*) tres;

  int32_t c = taos_errno(pSql);
  if (c != TSDB_CODE_SUCCESS) {
    SSqlObj* parent = pSup->pParent;

    destroySup(pSup);
    taos_free_result(pSql);

    // report the error that was actually checked above, so that the parent never
    // goes down the error path with a success code
    parent->res.code = c;
    tscAsyncResultOnError(parent);
    return;
  }

  (void) code;  // the subquery's own result code (taos_errno) is authoritative
  taos_fetch_rows_a(tres, tscFirstRoundRetrieveCallback, param);
}

/*
 * Launch the first round of a two-round super table query.
 *
 * A subquery is derived from pSql that selects only what the first round needs:
 *   - the primary timestamp, when the query has an interval;
 *   - an AVG over the source column of every STDDEV_DST expression;
 *   - the tag columns and the group-by (projection) columns, which form the group key.
 * The group-by info, tag filter, time window, interval and session window are copied
 * from the parent, as are the normal columns that carry filters. Results are collected
 * by tscFirstRoundCallback() / tscFirstRoundRetrieveCallback() into an SFirstRoundQuerySup.
 *
 * Returns TSDB_CODE_SUCCESS once the subquery has been handed to
 * tscHandleMasterSTableQuery(). On failure, the error is reported through pSql
 * (tscAsyncResultOnError) and the error code is returned.
 */
int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
  SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
  STableMetaInfo* pTableMetaInfo1 = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0);

  int32_t code = TSDB_CODE_SUCCESS;

  SFirstRoundQuerySup *pSup = calloc(1, sizeof(SFirstRoundQuerySup));
  if (pSup == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    pSql->res.code = terrno;
    tscAsyncResultOnError(pSql);
    return terrno;
  }

  pSup->pParent  = pSql;
  pSup->interval = pQueryInfo->interval.interval;
  pSup->pResult  = taosArrayInit(6, sizeof(SInterResult));  // the elements pushed into it are SInterResult
  pSup->pColsInfo = taosArrayInit(6, sizeof(int16_t)); // result column id

  SSqlObj *pNew = createSubqueryObj(pSql, 0, tscFirstRoundCallback, pSup, TSDB_SQL_SELECT, NULL);
  if (pNew == NULL) {
    code = (terrno != TSDB_CODE_SUCCESS)? terrno : TSDB_CODE_TSC_OUT_OF_MEMORY;
    destroySup(pSup);

    terrno = code;
    pSql->res.code = code;
    tscAsyncResultOnError(pSql);
    return code;
  }

  SSqlCmd *pCmd = &pNew->cmd;

  SQueryInfo* pNewQueryInfo = tscGetQueryInfo(pCmd);
  assert(pQueryInfo->numOfTables == 1);

  // Detach the column list so that it survives the reset below; its filtered columns are
  // added back later. From here on it must be released explicitly, including on error.
  SArray* pColList = pNewQueryInfo->colList;
  pNewQueryInfo->colList = NULL;

  tscClearSubqueryInfo(pCmd);
  tscFreeSqlResult(pSql);

  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0);

  tscInitQueryInfo(pNewQueryInfo);

  // add the group cond
  pNewQueryInfo->groupbyExpr = pQueryInfo->groupbyExpr;
  if (pQueryInfo->groupbyExpr.columnInfo != NULL) {
    pNewQueryInfo->groupbyExpr.columnInfo = taosArrayDup(pQueryInfo->groupbyExpr.columnInfo);
    if (pNewQueryInfo->groupbyExpr.columnInfo == NULL) {
      code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      goto _error;
    }
  }

  // add the tag filter cond
  if (tscTagCondCopy(&pNewQueryInfo->tagCond, &pQueryInfo->tagCond) != 0) {
    code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto _error;
  }

  pNewQueryInfo->window   = pQueryInfo->window;
  pNewQueryInfo->interval = pQueryInfo->interval;
  pNewQueryInfo->sessionWindow = pQueryInfo->sessionWindow;

  pCmd->command = TSDB_SQL_SELECT;
  pNew->fp = tscFirstRoundCallback;

  int32_t numOfExprs = (int32_t) tscNumOfExprs(pQueryInfo);

  int32_t index = 0;
  for(int32_t i = 0; i < numOfExprs; ++i) {
    SExprInfo* pExpr = tscExprGet(pQueryInfo, i);
    if (pExpr->base.functionId == TSDB_FUNC_TS && pQueryInfo->interval.interval > 0) {
      taosArrayPush(pSup->pColsInfo, &pExpr->base.resColId);

      SColumnIndex colIndex = {.tableIndex = 0, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
      SSchema* schema = tscGetColumnSchemaById(pTableMetaInfo1->pTableMeta, pExpr->base.colInfo.colId);

      SExprInfo* p = tscAddFuncInSelectClause(pNewQueryInfo, index++, TSDB_FUNC_TS, &colIndex, schema, TSDB_COL_NORMAL, getNewResColId(pCmd));
      p->base.resColId = pExpr->base.resColId;  // update the result column id
    } else if (pExpr->base.functionId == TSDB_FUNC_STDDEV_DST) {
      // the first round computes the average of the source column
      taosArrayPush(pSup->pColsInfo, &pExpr->base.resColId);

      SColumnIndex colIndex = {.tableIndex = 0, .columnIndex = pExpr->base.colInfo.colIndex};
      SSchema schema = {.type = TSDB_DATA_TYPE_DOUBLE, .bytes = sizeof(double)};
      tstrncpy(schema.name, pExpr->base.aliasName, tListLen(schema.name));

      SExprInfo* p = tscAddFuncInSelectClause(pNewQueryInfo, index++, TSDB_FUNC_AVG, &colIndex, &schema, TSDB_COL_NORMAL, getNewResColId(pCmd));
      p->base.resColId = pExpr->base.resColId;  // update the result column id
    } else if (pExpr->base.functionId == TSDB_FUNC_TAG) {
      pSup->tagLen += pExpr->base.resBytes;
      SColumnIndex colIndex = {.tableIndex = 0, .columnIndex = pExpr->base.colInfo.colIndex};

      SSchema* schema = NULL;
      if (pExpr->base.colInfo.colId != TSDB_TBNAME_COLUMN_INDEX) {
        schema = tscGetColumnSchemaById(pTableMetaInfo1->pTableMeta, pExpr->base.colInfo.colId);
      } else {
        schema = tGetTbnameColumnSchema();
      }

      SExprInfo* p = tscAddFuncInSelectClause(pNewQueryInfo, index++, TSDB_FUNC_TAG, &colIndex, schema, TSDB_COL_TAG, getNewResColId(pCmd));
      p->base.resColId = pExpr->base.resColId;
    } else if (pExpr->base.functionId == TSDB_FUNC_PRJ) {
      // only projections of group-by columns are kept; they are part of the group key
      int32_t num = (int32_t) taosArrayGetSize(pNewQueryInfo->groupbyExpr.columnInfo);
      for(int32_t k = 0; k < num; ++k) {
        SColIndex* pIndex = taosArrayGet(pNewQueryInfo->groupbyExpr.columnInfo, k);
        if (pExpr->base.colInfo.colId == pIndex->colId) {
          pSup->tagLen += pExpr->base.resBytes;
          taosArrayPush(pSup->pColsInfo, &pExpr->base.resColId);

          SColumnIndex colIndex = {.tableIndex = 0, .columnIndex = pIndex->colIndex};
          SSchema* schema = tscGetColumnSchemaById(pTableMetaInfo1->pTableMeta, pExpr->base.colInfo.colId);

          //doLimitOutputNormalColOfGroupby
          SExprInfo* p = tscAddFuncInSelectClause(pNewQueryInfo, index++, TSDB_FUNC_PRJ, &colIndex, schema, TSDB_COL_NORMAL, getNewResColId(pCmd));
          p->base.numOfParams = 1;
          p->base.param[0].i64 = 1;
          p->base.param[0].nType = TSDB_DATA_TYPE_INT;
          p->base.resColId = pExpr->base.resColId;  // update the result column id
        }
      }
    }
  }

  // add the normal column filter cond
  if (pColList != NULL) {
    size_t s = taosArrayGetSize(pColList);
    for (size_t i = 0; i < s; ++i) {
      SColumn *pCol = taosArrayGetP(pColList, i);

      if (pCol->info.flist.numOfFilters > 0) {  // copy to the pNew->cmd.colList if it is filtered.
        SColumn *p = tscColumnClone(pCol);
        taosArrayPush(pNewQueryInfo->colList, &p);
      }
    }

    tscColumnListDestroy(pColList);
    pColList = NULL;
  }

  tscInsertPrimaryTsSourceColumn(pNewQueryInfo, pTableMetaInfo->pTableMeta->id.uid);
  tscTansformFuncForSTableQuery(pNewQueryInfo);

  tscDebug(
      "0x%"PRIx64" first round subquery:0x%"PRIx64" tableIndex:%d, vgroupIndex:%d, numOfVgroups:%d, type:%d, query to retrieve timestamps, "
      "numOfExpr:%" PRIzu ", colList:%d, numOfOutputFields:%d, name:%s",
      pSql->self, pNew->self, 0, pTableMetaInfo->vgroupIndex, pTableMetaInfo->vgroupList->numOfVgroups, pNewQueryInfo->type,
      tscNumOfExprs(pNewQueryInfo), index+1, pNewQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pTableMetaInfo->name));

  tscHandleMasterSTableQuery(pNew);
  return TSDB_CODE_SUCCESS;

  _error:
  // the detached column list is not owned by pNew any more, so release it here
  if (pColList != NULL) {
    tscColumnListDestroy(pColList);
  }

  destroySup(pSup);
  taos_free_result(pNew);

  // use the captured code: taos_free_result() above may touch terrno
  terrno = code;
  pSql->res.code = code;
  tscAsyncResultOnError(pSql);
  return code;
}
D
TD-2516  
dapan1121 已提交
2419

H
hjxilinx 已提交
2420 2421 2422 2423 2424
/*
 * Launch the sub-queries of a two-stage super table query.
 *
 * One sub-query is created per vgroup, or per entry of pVgroupTables when that list is
 * present. The global merger environment (external memory buffers and order
 * descriptor) that merges their results is created first. Each sub-query gets its own
 * SRetrieveSupport with a 64KB local page buffer.
 *
 * Returns TSDB_CODE_SUCCESS after all sub-queries have been sent. On failure,
 * pSql->res.code is set and the resources created here are released.
 */
int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
  
  // pRes->code check only serves in launching metric sub-queries
  if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
    pCmd->command = TSDB_SQL_RETRIEVE_GLOBALMERGE;  // enable the abort of kill super table function.
    return pRes->code;
  }
  
  tExtMemBuffer   **pMemoryBuf = NULL;
  tOrderDescriptor *pDesc  = NULL;

  pRes->qId = 0x1;  // hack the qhandle check
  
  const uint32_t nBufferSize = (1u << 16u);  // 64KB
  
  SQueryInfo     *pQueryInfo = tscGetQueryInfo(pCmd);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  SSubqueryState *pState = &pSql->subState;

  pState->numOfSub = 0;
  if (pTableMetaInfo->pVgroupTables == NULL) {
    pState->numOfSub = pTableMetaInfo->vgroupList->numOfVgroups;
  } else {
    pState->numOfSub = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
  }

  assert(pState->numOfSub > 0);
  
  int32_t ret = tscCreateGlobalMergerEnv(pQueryInfo, &pMemoryBuf, pSql->subState.numOfSub, &pDesc, nBufferSize, pSql->self);
  if (ret != 0) {
    pRes->code = ret;
    tscAsyncResultOnError(pSql);
    tfree(pMemoryBuf);
    return ret;
  }

  tscDebug("0x%"PRIx64" retrieved query data from %d vnode(s)", pSql->self, pState->numOfSub);
  pSql->pSubs = calloc(pState->numOfSub, POINTER_BYTES);
  if (pSql->pSubs == NULL) {
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    tscDestroyGlobalMergerEnv(pMemoryBuf, pDesc, pState->numOfSub);

    tscAsyncResultOnError(pSql);
    // NOTE(review): ret is TSDB_CODE_SUCCESS here although the launch failed; the error is
    // reported via tscAsyncResultOnError(). Confirm whether callers expect a non-zero code.
    return ret;
  }

  if (pState->states == NULL) {
    pState->states = calloc(pState->numOfSub, sizeof(*pState->states));
    if (pState->states == NULL) {
      pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;

      // release the whole merger environment (per-vnode buffers and order descriptor),
      // not just the pointer array, exactly like the pSubs allocation failure above
      tscDestroyGlobalMergerEnv(pMemoryBuf, pDesc, pState->numOfSub);

      tscAsyncResultOnError(pSql);
      // NOTE(review): ret is TSDB_CODE_SUCCESS here as well; see the note above.
      return ret;
    }

    pthread_mutex_init(&pState->mutex, NULL);
  }

  memset(pState->states, 0, sizeof(*pState->states) * pState->numOfSub);
  tscDebug("0x%"PRIx64" reset all sub states to 0", pSql->self);
  
  pRes->code = TSDB_CODE_SUCCESS;
  
  int32_t i = 0;
  for (; i < pState->numOfSub; ++i) {
    SRetrieveSupport *trs = (SRetrieveSupport *)calloc(1, sizeof(SRetrieveSupport));
    if (trs == NULL) {
      tscError("0x%"PRIx64" failed to malloc buffer for SRetrieveSupport, orderOfSub:%d, reason:%s", pSql->self, i, strerror(errno));
      break;
    }
    
    trs->pExtMemBuffer = pMemoryBuf;
    trs->pOrderDescriptor = pDesc;

    trs->localBuffer = (tFilePage *)calloc(1, nBufferSize + sizeof(tFilePage));
    if (trs->localBuffer == NULL) {
      tscError("0x%"PRIx64" failed to malloc buffer for local buffer, orderOfSub:%d, reason:%s", pSql->self, i, strerror(errno));
      tfree(trs);
      break;
    }
    
    trs->subqueryIndex  = i;
    trs->pParentSql     = pSql;

    SSqlObj *pNew = tscCreateSTableSubquery(pSql, trs, NULL);
    if (pNew == NULL) {
      tscError("0x%"PRIx64" failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql->self, i, strerror(errno));
      tfree(trs->localBuffer);
      tfree(trs);
      break;
    }
    
    // todo handle multi-vnode situation
    if (pQueryInfo->tsBuf) {
      SQueryInfo *pNewQueryInfo = tscGetQueryInfo(&pNew->cmd);
      pNewQueryInfo->tsBuf = tsBufClone(pQueryInfo->tsBuf);
      assert(pNewQueryInfo->tsBuf != NULL);
    }
    
    tscDebug("0x%"PRIx64" sub:0x%"PRIx64" create subquery success. orderOfSub:%d", pSql->self, pNew->self,
        trs->subqueryIndex);
  }
  
  if (i < pState->numOfSub) {
    tscError("0x%"PRIx64" failed to prepare subquery structure and launch subqueries", pSql->self);
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    
    tscDestroyGlobalMergerEnv(pMemoryBuf, pDesc, pState->numOfSub);
    doCleanupSubqueries(pSql, i);
    return pRes->code;   // free all allocated resource
  }
  
  if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
    tscDestroyGlobalMergerEnv(pMemoryBuf, pDesc, pState->numOfSub);
    doCleanupSubqueries(pSql, i);
    return pRes->code;
  }
  
  for(int32_t j = 0; j < pState->numOfSub; ++j) {
    SSqlObj* pSub = pSql->pSubs[j];
    SRetrieveSupport* pSupport = pSub->param;
    
    tscDebug("0x%"PRIx64" sub:%p launch subquery, orderOfSub:%d.", pSql->self, pSub, pSupport->subqueryIndex);
    tscBuildAndSendRequest(pSub, NULL);
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
2552 2553
/*
 * Detach and free the SRetrieveSupport attached to a super-table subquery.
 * pSql->param is cleared with an atomic compare-and-swap, so if several callers
 * race, only the one whose swap succeeds frees the support object and its buffer.
 */
static void tscFreeRetrieveSup(SSqlObj *pSql) {
  SRetrieveSupport *pSupport = pSql->param;

  if (atomic_val_compare_exchange_ptr(&pSql->param, pSupport, 0) == NULL) {
    tscDebug("0x%"PRIx64" retrieve supp already released", pSql->self);
    return;
  }

  tscDebug("0x%"PRIx64" start to free subquery supp obj:%p", pSql->self, pSupport);

  tfree(pSupport->localBuffer);
  tfree(pSupport);
}

2566
static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfRows);
2567
static void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows);
H
hjxilinx 已提交
2568

H
Haojun Liao 已提交
2569
static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES *tres, int32_t code) {
H
hjxilinx 已提交
2570
// set no disk space error info
D
dapan1121 已提交
2571
  tscError("sub:0x%"PRIx64" failed to flush data to disk, reason:%s", ((SSqlObj *)tres)->self, tstrerror(code));
H
Haojun Liao 已提交
2572
  SSqlObj* pParentSql = trsupport->pParentSql;
H
Haojun Liao 已提交
2573 2574

  pParentSql->res.code = code;
H
hjxilinx 已提交
2575
  trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
H
Haojun Liao 已提交
2576
  tscHandleSubqueryError(trsupport, tres, pParentSql->res.code);
H
hjxilinx 已提交
2577 2578
}

H
Haojun Liao 已提交
2579 2580 2581 2582
/*
 * current query failed, and the retry count is less than the available
 * count, retry query clear previous retrieved data, then launch a new sub query
 */
D
fix bug  
dapan1121 已提交
2583 2584 2585
static int32_t tscReissueSubquery(SRetrieveSupport *oriTrs, SSqlObj *pSql, int32_t code, int32_t *sent) {
  *sent = 0;
  
D
fix bug  
dapan1121 已提交
2586 2587 2588 2589 2590 2591 2592 2593 2594 2595
  SRetrieveSupport *trsupport = malloc(sizeof(SRetrieveSupport));
  if (trsupport == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  memcpy(trsupport, oriTrs, sizeof(*trsupport));

  const uint32_t nBufferSize = (1u << 16u);  // 64KB
  trsupport->localBuffer = (tFilePage *)calloc(1, nBufferSize + sizeof(tFilePage));
  if (trsupport->localBuffer == NULL) {
D
dapan1121 已提交
2596
    tscError("0x%"PRIx64" failed to malloc buffer for local buffer, reason:%s", pSql->self, strerror(errno));
D
fix bug  
dapan1121 已提交
2597 2598 2599 2600
    tfree(trsupport);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  
H
Haojun Liao 已提交
2601 2602 2603
  SSqlObj *pParentSql = trsupport->pParentSql;
  int32_t  subqueryIndex = trsupport->subqueryIndex;

H
Haojun Liao 已提交
2604
  STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0);
S
TD-1732  
Shengliang Guan 已提交
2605
  SVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[0];
H
Haojun Liao 已提交
2606 2607 2608 2609 2610

  tExtMemBufferClear(trsupport->pExtMemBuffer[subqueryIndex]);

  // clear local saved number of results
  trsupport->localBuffer->num = 0;
D
dapan1121 已提交
2611
  tscError("0x%"PRIx64" sub:0x%"PRIx64" retrieve/query failed, code:%s, orderOfSub:%d, retry:%d", trsupport->pParentSql->self, pSql->self,
H
Haojun Liao 已提交
2612 2613
           tstrerror(code), subqueryIndex, trsupport->numOfRetry);

2614
  SSqlObj *pNew = tscCreateSTableSubquery(trsupport->pParentSql, trsupport, pSql);
H
Haojun Liao 已提交
2615
  if (pNew == NULL) {
D
dapan1121 已提交
2616 2617
    tscError("0x%"PRIx64" sub:0x%"PRIx64" failed to create new subquery due to error:%s, abort retry, vgId:%d, orderOfSub:%d",
             oriTrs->pParentSql->self, pSql->self, tstrerror(terrno), pVgroup->vgId, oriTrs->subqueryIndex);
H
Haojun Liao 已提交
2618

2619
    pParentSql->res.code = terrno;
D
fix bug  
dapan1121 已提交
2620
    oriTrs->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
H
Haojun Liao 已提交
2621

D
fix bug  
dapan1121 已提交
2622
    tfree(trsupport);
H
Haojun Liao 已提交
2623 2624 2625
    return pParentSql->res.code;
  }

2626
  int32_t ret = tscBuildAndSendRequest(pNew, NULL);
2627

D
fix bug  
dapan1121 已提交
2628 2629
  *sent = 1;
  
2630 2631
  // if failed to process sql, let following code handle the pSql
  if (ret == TSDB_CODE_SUCCESS) {
D
fix bug  
dapan1121 已提交
2632
    tscFreeRetrieveSup(pSql);
2633
    taos_free_result(pSql);
H
Haojun Liao 已提交
2634
    return ret;
D
fix bug  
dapan1121 已提交
2635
  } else {    
2636
    pParentSql->pSubs[trsupport->subqueryIndex] = pSql;
D
fix bug  
dapan1121 已提交
2637 2638
    tscFreeRetrieveSup(pNew);
    taos_free_result(pNew);
H
Haojun Liao 已提交
2639
    return ret;
2640
  }
H
Haojun Liao 已提交
2641 2642
}

2643
/*
 * Handle a super-table subquery that failed, or that finished while the parent query
 * had already failed.
 *
 * numOfRows < 0 carries the subquery's error code. Such a subquery is retried through
 * tscReissueSubquery() while the retry budget lasts and the parent has not failed.
 * Otherwise the error is recorded on the parent (first error wins).
 * numOfRows >= 0 means the subquery itself succeeded, but the parent failed, so further
 * retrieval is abandoned.
 *
 * When this is the last outstanding subquery (subAndCheckDone), the global merger
 * resources are released and the parent is notified (see the final branch).
 */
static void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows) {
  // it has been freed already
  if (pSql->param != trsupport || pSql->param == NULL) {
    return;
  }

  SSqlObj *pParentSql = trsupport->pParentSql;
  int32_t  subqueryIndex = trsupport->subqueryIndex;
  
  assert(pSql != NULL);

  SSubqueryState* pState = &pParentSql->subState;

  // retrieved in subquery failed. OR query cancelled in retrieve phase.
  if (taos_errno(pSql) == TSDB_CODE_SUCCESS && pParentSql->res.code != TSDB_CODE_SUCCESS) {
    /*
     * kill current sub-query connection, which may retrieve data from vnodes;
     * Here we get: pPObj->res.code == TSDB_CODE_TSC_QUERY_CANCELLED
     */
    pSql->res.numOfRows = 0;
    trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;  // disable retry efforts
    tscDebug("0x%"PRIx64" query is cancelled, sub:%p, orderOfSub:%d abort retrieve, code:%s", pParentSql->self, pSql,
             subqueryIndex, tstrerror(pParentSql->res.code));
  }

  if (numOfRows >= 0) {  // current query is successful, but other sub query failed, still abort current query.
    tscDebug("0x%"PRIx64" sub:0x%"PRIx64" retrieve numOfRows:%d,orderOfSub:%d", pParentSql->self, pSql->self, numOfRows, subqueryIndex);
    tscError("0x%"PRIx64" sub:0x%"PRIx64" abort further retrieval due to other queries failure,orderOfSub:%d,code:%s", pParentSql->self, pSql->self,
             subqueryIndex, tstrerror(pParentSql->res.code));
  } else {
    if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && pParentSql->res.code == TSDB_CODE_SUCCESS) {
      int32_t sent = 0;
      int32_t ret = tscReissueSubquery(trsupport, pSql, numOfRows, &sent);
      if (sent) {
        return;
      }

      // The retry could not even be prepared. Fail the parent query (first error wins)
      // instead of letting it complete successfully without this subquery's data.
      atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, ret);
    } else {  // reach the maximum retry count, abort
      atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, numOfRows);
      tscError("0x%"PRIx64" sub:0x%"PRIx64" retrieve failed,code:%s,orderOfSub:%d failed.no more retry,set global code:%s", pParentSql->self, pSql->self,
               tstrerror(numOfRows), subqueryIndex, tstrerror(pParentSql->res.code));
    }
  }

  if (!subAndCheckDone(pSql, pParentSql, subqueryIndex)) {
    tscDebug("0x%"PRIx64" sub:0x%"PRIx64",%d freed, not finished, total:%d", pParentSql->self,
        pSql->self, trsupport->subqueryIndex, pState->numOfSub);

    tscFreeRetrieveSup(pSql);
    return;
  }
  
  // all subqueries are failed
  tscError("0x%"PRIx64" retrieve from %d vnode(s) completed,code:%s.FAILED.", pParentSql->self, pState->numOfSub,
      tstrerror(pParentSql->res.code));

  // release allocated resource
  tscDestroyGlobalMergerEnv(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor,
                            pState->numOfSub);
  
  tscFreeRetrieveSup(pSql);

  SQueryInfo *pQueryInfo = tscGetQueryInfo(&pParentSql->cmd);

  if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
    // not a second-stage join subquery: invoke the parent's callback directly
    (*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.code);
  } else {
    // second-stage join subquery: report only a failure, through the async result queue
    if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
      tscAsyncResultOnError(pParentSql);
    }
  }
}

2718 2719
/*
 * Called when one vnode subquery of a two-stage super table query has returned all of its rows.
 *
 * The rows still held in trsupport->localBuffer are flushed into this subquery's external memory buffer.
 * If other subqueries are still outstanding (subAndCheckDone), this subquery's retrieve support is released
 * and the function returns. Otherwise a global merger is built over the external buffers of all subqueries,
 * the parent result is reset, and the parent callback (or the async error path) is invoked.
 */
static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* pSql) {
  int32_t           idx        = trsupport->subqueryIndex;
  SSqlObj *         pParentSql = trsupport->pParentSql;
  tOrderDescriptor *pDesc      = trsupport->pOrderDescriptor;

  SSubqueryState* pState = &pParentSql->subState;
  SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd);

  STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[0];

  // data in from current vnode is stored in cache and disk
  uint32_t numOfRowsFromSubquery = (uint32_t)(trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->num);

  // A stable subquery is bound to vgroup `subqueryIndex` (tscCreateSTableSubquery sets vgroupIndex to it,
  // and tscRetrieveDataRes indexes the list the same way). Logging vgroups[0] reported the wrong vnode.
  SVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[idx];
  tscDebug("0x%"PRIx64" sub:0x%"PRIx64" all data retrieved from ep:%s, vgId:%d, numOfRows:%u, orderOfSub:%d", pParentSql->self,
      pSql->self, pVgroup->epAddr[0].fqdn, pVgroup->vgId, numOfRowsFromSubquery, idx);

  tColModelCompact(pDesc->pColumnModel, trsupport->localBuffer, pDesc->pColumnModel->capacity);

#ifdef _DEBUG_VIEW
  printf("%" PRIu64 " rows data flushed to disk:\n", trsupport->localBuffer->num);
  SSrcColumnInfo colInfo[256] = {0};
  tscGetSrcColumnInfo(colInfo, pQueryInfo);
  tColModelDisplayEx(pDesc->pColumnModel, trsupport->localBuffer->data, trsupport->localBuffer->num,
                     trsupport->localBuffer->num, colInfo);
#endif

  // stop when the client temporary directory has less free space than the configured reserve
  if (tsTotalTmpDirGB != 0 && tsAvailTmpDirectorySpace < tsReservedTmpDirectorySpace) {
    tscError("0x%"PRIx64" sub:0x%"PRIx64" client disk space remain %.3f GB, need at least %.3f GB, stop query", pParentSql->self, pSql->self,
             tsAvailTmpDirectorySpace, tsReservedTmpDirectorySpace);
    tscAbortFurtherRetryRetrieval(trsupport, pSql, TSDB_CODE_TSC_NO_DISKSPACE);
    return;
  }

  // each result for a vnode is ordered as an independent list,
  // then used as an input of loser tree for disk-based merge
  int32_t code = tscFlushTmpBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer, pQueryInfo->groupbyExpr.orderType);
  if (code != 0) { // set no disk space error info, and abort retry
    tscAbortFurtherRetryRetrieval(trsupport, pSql, code);
    return;
  }

  // not the last subquery to finish: release this subquery's retrieve support and return
  if (!subAndCheckDone(pSql, pParentSql, idx)) {
    tscDebug("0x%"PRIx64" sub:0x%"PRIx64" orderOfSub:%d freed, not finished", pParentSql->self, pSql->self,
        trsupport->subqueryIndex);

    tscFreeRetrieveSup(pSql);
    return;
  }

  // all sub-queries are returned, start to local merge process
  pDesc->pColumnModel->capacity = trsupport->pExtMemBuffer[idx]->numOfElemsPerPage;

  tscDebug("0x%"PRIx64" retrieve from %d vnodes completed.final NumOfRows:%" PRId64 ",start to build loser tree",
      pParentSql->self, pState->numOfSub, pState->numOfRetrievedRows);

  SQueryInfo *pPQueryInfo = tscGetQueryInfo(&pParentSql->cmd);
  tscClearInterpInfo(pPQueryInfo);

  code = tscCreateGlobalMerger(trsupport->pExtMemBuffer, pState->numOfSub, pDesc, pPQueryInfo, &pParentSql->res.pMerger, pParentSql->self);
  pParentSql->res.code = code;

  if (code == TSDB_CODE_SUCCESS && trsupport->pExtMemBuffer == NULL) {
    pParentSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; // no result, set the result empty
  } else {
    pParentSql->cmd.command = TSDB_SQL_RETRIEVE_GLOBALMERGE;
  }

  tscCreateResPointerInfo(&pParentSql->res, pPQueryInfo);

  tscDebug("0x%"PRIx64" build loser tree completed", pParentSql->self);

  pParentSql->res.precision = pSql->res.precision;
  pParentSql->res.numOfRows = 0;
  pParentSql->res.row = 0;
  pParentSql->res.numOfGroups = 0;

  // tscFreeRetrieveSup presumably releases trsupport; only the local pParentSql is used below
  tscFreeRetrieveSup(pSql);

  if (pParentSql->res.code == TSDB_CODE_SUCCESS) {
    (*pParentSql->fp)(pParentSql->param, pParentSql, 0);
  } else {
    tscAsyncResultOnError(pParentSql);
  }
}

/*
 * Fetch callback for one vnode subquery of a two-stage super table query.
 *
 * numOfRows is the number of rows just fetched (> 0), 0 when nothing more is returned from this vnode,
 * or the error code of a failed fetch (asserted to equal taos_errno(pSql)). Fetched rows are appended to
 * the subquery's external memory buffer and the next fetch is issued until the result is completed, after
 * which tscAllDataRetrievedFromDnode takes over. Failures are retried up to MAX_NUM_OF_SUBQUERY_RETRY times.
 */
static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
  SSqlObj *pSql = (SSqlObj *)tres;
  assert(pSql != NULL);

  // this query has been freed already
  SRetrieveSupport *trsupport = (SRetrieveSupport *)param;
  if (pSql->param == NULL || param == NULL) {
    tscDebug("0x%"PRIx64" already freed in dnodecallback", pSql->self);
    return;
  }

  tOrderDescriptor *pDesc      = trsupport->pOrderDescriptor;
  int32_t           idx        = trsupport->subqueryIndex;
  SSqlObj *         pParentSql = trsupport->pParentSql;

  SSubqueryState* pState = &pParentSql->subState;

  // The subquery is bound to vgroup `subqueryIndex` (its vgroupIndex), as tscRetrieveDataRes also assumes.
  // vgroups[0] reported the wrong vgId for every other subquery.
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0);
  SVgroupInfo  *pVgroup = &pTableMetaInfo->vgroupList->vgroups[idx];

  // the super table query was cancelled or another subquery failed
  if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
    trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
    tscDebug("0x%"PRIx64" query cancelled/failed, sub:%p, vgId:%d, orderOfSub:%d, code:%s, global code:%s",
             pParentSql->self, pSql, pVgroup->vgId, trsupport->subqueryIndex, tstrerror(numOfRows), tstrerror(pParentSql->res.code));

    tscHandleSubqueryError(param, tres, numOfRows);
    return;
  }

  // the fetch itself failed: numOfRows carries the error code
  if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
    assert(numOfRows == taos_errno(pSql));

    // a cancelled query is never retried
    if (numOfRows == TSDB_CODE_TSC_QUERY_CANCELLED) {
      trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
    }

    if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY) {
      tscError("0x%"PRIx64" sub:0x%"PRIx64" failed code:%s, retry:%d", pParentSql->self, pSql->self, tstrerror(numOfRows), trsupport->numOfRetry);

      int32_t sent = 0;
      tscReissueSubquery(trsupport, pSql, numOfRows, &sent);
      if (sent) {
        return;
      }
    } else {
      tscDebug("0x%"PRIx64" sub:%p reach the max retry times, set global code:%s", pParentSql->self, pSql, tstrerror(numOfRows));
      atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, numOfRows);  // set global code and abort
    }

    tscHandleSubqueryError(param, tres, numOfRows);
    return;
  }

  SSqlRes *   pRes = &pSql->res;
  SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd);

  if (numOfRows > 0) {
    assert(pRes->numOfRows == numOfRows);
    int64_t num = atomic_add_fetch_64(&pState->numOfRetrievedRows, numOfRows);

    // Log the (signed) value produced by the atomic add. Re-reading the shared counter could race with
    // other subqueries, and it is not an unsigned value.
    tscDebug("0x%"PRIx64" sub:%p retrieve numOfRows:%d totalNumOfRows:%" PRId64 " from ep:%s, orderOfSub:%d",
        pParentSql->self, pSql, pRes->numOfRows, num, pSql->epSet.fqdn[pSql->epSet.inUse], idx);

    if (num > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pQueryInfo, 0) && !(tscGetQueryInfo(&pParentSql->cmd)->distinctTag)) {
      tscError("0x%"PRIx64" sub:0x%"PRIx64" num of OrderedRes is too many, max allowed:%" PRId32 " , current:%" PRId64,
               pParentSql->self, pSql->self, tsMaxNumOfOrderedResults, num);
      tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_TSC_SORTED_RES_TOO_MANY);
      return;
    }

#ifdef _DEBUG_VIEW
    printf("received data from vnode: %d rows\n", pRes->numOfRows);
    SSrcColumnInfo colInfo[256] = {0};

    tscGetSrcColumnInfo(colInfo, pQueryInfo);
    tColModelDisplayEx(pDesc->pColumnModel, pRes->data, pRes->numOfRows, pRes->numOfRows, colInfo);
#endif

    // no disk space for tmp directory
    if (tsTotalTmpDirGB != 0 && tsAvailTmpDirectorySpace < tsReservedTmpDirectorySpace) {
      tscError("0x%"PRIx64" sub:0x%"PRIx64" client disk space remain %.3f GB, need at least %.3f GB, stop query", pParentSql->self, pSql->self,
               tsAvailTmpDirectorySpace, tsReservedTmpDirectorySpace);
      tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_TSC_NO_DISKSPACE);
      return;
    }

    int32_t ret = saveToBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer, pRes->data,
                               pRes->numOfRows, pQueryInfo->groupbyExpr.orderType);
    if (ret != 0) { // set no disk space error info, and abort retry
      tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_TSC_NO_DISKSPACE);
    } else if (pRes->completed) {
      tscAllDataRetrievedFromDnode(trsupport, pSql);
    } else { // continue fetch data from dnode
      taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param);
    }
  } else { // all data has been retrieved to client
    tscAllDataRetrievedFromDnode(trsupport, pSql);
  }
}

2905
/*
 * Creates the subquery of a two-stage super table query for vgroup trsupport->subqueryIndex.
 *
 * The new object uses tscRetrieveDataRes as its callback with trsupport as param. It is tagged as a stable
 * subquery, has its limit/offset cleared, and is stored in pSql->pSubs[subqueryIndex].
 * prevSqlObj is only forwarded to createSubqueryObj.
 * Returns the subquery, or NULL when createSubqueryObj fails.
 */
static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj) {
  const int32_t tableIdx = 0;

  SSqlObj *pSub = createSubqueryObj(pSql, tableIdx, tscRetrieveDataRes, trsupport, TSDB_SQL_SELECT, prevSqlObj);
  if (pSub == NULL) {
    return NULL;
  }

  // the sub query of two-stage super table query
  SQueryInfo *pSubQueryInfo = tscGetQueryInfo(&pSub->cmd);
  pSub->cmd.active = pSubQueryInfo;
  pSubQueryInfo->type |= TSDB_QUERY_TYPE_STABLE_SUBQUERY;

  // limit/offset are applied after the merge on the client, so the vnode must not see them
  pSubQueryInfo->limit.limit  = -1;
  pSubQueryInfo->limit.offset = 0;

  assert(trsupport->subqueryIndex < pSql->subState.numOfSub);

  // one subquery per vnode: the subquery index doubles as the vgroup index
  STableMetaInfo *pSubMetaInfo = tscGetMetaInfo(pSubQueryInfo, tableIdx);
  pSubMetaInfo->vgroupIndex = trsupport->subqueryIndex;

  pSql->pSubs[trsupport->subqueryIndex] = pSub;
  return pSub;
}

2931
// todo there is are race condition in this function, while cancel is called by user.
H
hjxilinx 已提交
2932
void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
2933 2934 2935 2936 2937 2938 2939
  // the param may be null, since it may be done by other query threads. and the asyncOnError may enter in this
  // function while kill query by a user.
  if (param == NULL) {
    assert(code != TSDB_CODE_SUCCESS);
    return;
  }

2940
  SRetrieveSupport *trsupport = (SRetrieveSupport *) param;
H
hjxilinx 已提交
2941
  
H
Haojun Liao 已提交
2942
  SSqlObj*  pParentSql = trsupport->pParentSql;
2943
  SSqlObj*  pSql = (SSqlObj *) tres;
2944

H
Haojun Liao 已提交
2945
  SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
H
Haojun Liao 已提交
2946
  assert(pQueryInfo->numOfTables == 1);
H
hjxilinx 已提交
2947
  
H
Haojun Liao 已提交
2948
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0);
S
TD-1732  
Shengliang Guan 已提交
2949
  SVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[trsupport->subqueryIndex];
H
Haojun Liao 已提交
2950

H
Haojun Liao 已提交
2951
  // stable query killed or other subquery failed, all query stopped
H
Haojun Liao 已提交
2952
  if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
H
hjxilinx 已提交
2953
    trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
D
dapan1121 已提交
2954 2955
    tscError("0x%"PRIx64" query cancelled or failed, sub:0x%"PRIx64", vgId:%d, orderOfSub:%d, code:%s, global code:%s",
        pParentSql->self, pSql->self, pVgroup->vgId, trsupport->subqueryIndex, tstrerror(code), tstrerror(pParentSql->res.code));
H
Haojun Liao 已提交
2956 2957 2958

    tscHandleSubqueryError(param, tres, code);
    return;
H
hjxilinx 已提交
2959 2960 2961
  }
  
  /*
H
Haojun Liao 已提交
2962
   * if a subquery on a vnode failed, all retrieve operations from vnode that occurs later
2963
   * than this one are actually not necessary, we simply call the tscRetrieveFromDnodeCallBack
H
hjxilinx 已提交
2964 2965
   * function to abort current and remain retrieve process.
   *
2966
   * NOTE: thread safe is required.
H
hjxilinx 已提交
2967
   */
H
Haojun Liao 已提交
2968 2969
  if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
    assert(code == taos_errno(pSql));
2970

H
Haojun Liao 已提交
2971
    if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY) {
D
dapan1121 已提交
2972
      tscError("0x%"PRIx64" sub:0x%"PRIx64" failed code:%s, retry:%d", pParentSql->self, pSql->self, tstrerror(code), trsupport->numOfRetry);
D
fix bug  
dapan1121 已提交
2973 2974 2975 2976
      
      int32_t sent = 0;
      tscReissueSubquery(trsupport, pSql, code, &sent);
      if (sent) {
H
hjxilinx 已提交
2977 2978
        return;
      }
2979
    } else {
D
dapan1121 已提交
2980
      tscError("0x%"PRIx64" sub:0x%"PRIx64" reach the max retry times, set global code:%s", pParentSql->self, pSql->self, tstrerror(code));
H
Haojun Liao 已提交
2981
      atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, code);  // set global code and abort
2982
    }
H
Haojun Liao 已提交
2983 2984 2985 2986 2987

    tscHandleSubqueryError(param, tres, pParentSql->res.code);
    return;
  }

H
Haojun Liao 已提交
2988 2989
  tscDebug("0x%"PRIx64" sub:0x%"PRIx64" query complete, ep:%s, vgId:%d, orderOfSub:%d, retrieve data", trsupport->pParentSql->self,
      pSql->self, pVgroup->epAddr[pSql->epSet.inUse].fqdn, pVgroup->vgId, trsupport->subqueryIndex);
H
Haojun Liao 已提交
2990

H
Haojun Liao 已提交
2991
  if (pSql->res.qId == 0) { // qhandle is NULL, code is TSDB_CODE_SUCCESS means no results generated from this vnode
H
Haojun Liao 已提交
2992 2993 2994
    tscRetrieveFromDnodeCallBack(param, pSql, 0);
  } else {
    taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param);
H
hjxilinx 已提交
2995 2996 2997
  }
}

2998 2999
/*
 * Decides whether a multi-vnode insert, all of whose sub-inserts have returned, may be retried.
 *
 * Returns false once pParentObj->retry exceeds maxRetry. Also returns false as soon as a sub-insert failed
 * with a code outside the retriable set below; that code is then stored in pParentObj->res.code.
 * Otherwise returns true.
 */
static bool needRetryInsert(SSqlObj* pParentObj, int32_t numOfSub) {
  if (pParentObj->retry > pParentObj->maxRetry) {
    tscError("0x%"PRIx64" max retry reached, abort the retry effort", pParentObj->self);
    return false;
  }

  int32_t i = 0;
  while (i < numOfSub) {
    int32_t subCode = pParentObj->pSubs[i++]->res.code;

    // codes for which the insert is re-parsed and resent
    bool retriable = subCode == TSDB_CODE_TDB_TABLE_RECONFIGURE || subCode == TSDB_CODE_TDB_INVALID_TABLE_ID ||
                     subCode == TSDB_CODE_VND_INVALID_VGROUP_ID || subCode == TSDB_CODE_RPC_NETWORK_UNAVAIL ||
                     subCode == TSDB_CODE_APP_NOT_READY;

    if (subCode != TSDB_CODE_SUCCESS && !retriable) {
      pParentObj->res.code = subCode;
      return false;
    }
  }

  return true;
}

H
Haojun Liao 已提交
3021 3022 3023 3024 3025 3026 3027 3028 3029
/*
 * Releases (via tfree) the SInsertSupporter attached as `param` to every sub-insert of pSqlObj.
 * pSqlObj must be non-NULL and have at least one sub-insert (asserted).
 */
static void doFreeInsertSupporter(SSqlObj* pSqlObj) {
  assert(pSqlObj != NULL && pSqlObj->subState.numOfSub > 0);

  int32_t total = pSqlObj->subState.numOfSub;
  for (int32_t k = 0; k < total; k++) {
    tfree(pSqlObj->pSubs[k]->param);
  }
}

H
Haojun Liao 已提交
3030
/*
 * Callback of every sub-insert launched by tscHandleMultivnodeInsert.
 *
 * Adds the rows inserted by this sub-insert to the parent's count and copies a failing sub-insert's error
 * code (and schemaAttached flag) into the parent. After the last sub-insert has reported, one of three things
 * happens:
 *   - on success, the user callback is invoked with the total number of inserted rows;
 *   - when needRetryInsert refuses, the error is reported via tscAsyncResultOnError;
 *   - otherwise, the cached table meta of the inserted tables is dropped, the sql is re-parsed, and the
 *     failed sub-inserts are launched again.
 */
static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) {
  SInsertSupporter *pSupporter = (SInsertSupporter *)param;
  SSqlObj* pParentObj = pSupporter->pSql;

  // record the total inserted rows
  if (numOfRows > 0) {
    atomic_add_fetch_32(&pParentObj->res.numOfRows, numOfRows);
  }

  if (taos_errno(tres) != TSDB_CODE_SUCCESS) {
    SSqlObj* pSql = (SSqlObj*) tres;
    assert(pSql != NULL && pSql->res.code == numOfRows);

    pParentObj->res.code = pSql->res.code;

    // set the flag in the parent sqlObj
    if (pSql->cmd.insertParam.schemaAttached) {
      pParentObj->cmd.insertParam.schemaAttached = 1;
    }
  }

  if (!subAndCheckDone(tres, pParentObj, pSupporter->index)) {
    tscDebug("0x%"PRIx64" insert:%p,%d completed, total:%d", pParentObj->self, tres, pSupporter->index, pParentObj->subState.numOfSub);
    return;
  }

  // restore user defined fp
  pParentObj->fp = pParentObj->fetchFp;
  int32_t numOfSub = pParentObj->subState.numOfSub;

  // frees the param of every sub-insert, pSupporter included: it must not be used below
  doFreeInsertSupporter(pParentObj);

  if (pParentObj->res.code == TSDB_CODE_SUCCESS) {
    tscDebug("0x%"PRIx64" Async insertion completed, total inserted:%d", pParentObj->self, pParentObj->res.numOfRows);

    // todo remove this parameter in async callback function definition.
    // all data has been sent to vnode, call user function (res.code is known to be success here)
    (*pParentObj->fp)(pParentObj->param, pParentObj, (int32_t)pParentObj->res.numOfRows);
    return;
  }

  if (!needRetryInsert(pParentObj, numOfSub)) {
    tscAsyncResultOnError(pParentObj);
    return;
  }

  int32_t numOfFailed = 0;
  for (int32_t i = 0; i < numOfSub; ++i) {
    SSqlObj* pSql = pParentObj->pSubs[i];
    if (pSql->res.code != TSDB_CODE_SUCCESS) {
      numOfFailed += 1;

      // clean up tableMeta in cache
      tscFreeQueryInfo(&pSql->cmd, false);
      SQueryInfo* pQueryInfo = tscGetQueryInfoS(&pSql->cmd);
      STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pParentObj->cmd, 0);
      tscAddTableMetaInfo(pQueryInfo, &pMasterTableMetaInfo->name, NULL, NULL, NULL, NULL);

      // mark the failed sub-insert as not done, so that its retry is awaited again
      subquerySetState(pSql, &pParentObj->subState, i, 0);

      tscDebug("0x%"PRIx64", failed sub:%d, %p", pParentObj->self, i, pSql);
    }
  }

  tscError("0x%"PRIx64" Async insertion completed, total inserted:%d rows, numOfFailed:%d, numOfTotal:%d", pParentObj->self,
           pParentObj->res.numOfRows, numOfFailed, numOfSub);

  tscDebug("0x%"PRIx64" cleanup %d tableMeta in hashTable before reparse sql", pParentObj->self, pParentObj->cmd.insertParam.numOfTables);
  for (int32_t i = 0; i < pParentObj->cmd.insertParam.numOfTables; ++i) {
    char name[TSDB_TABLE_FNAME_LEN] = {0};
    tNameExtractFullName(pParentObj->cmd.insertParam.pTableNameList[i], name);
    taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
  }

  pParentObj->res.code = TSDB_CODE_SUCCESS;

  tscResetSqlCmd(&pParentObj->cmd, false);

  // in case of insert, redo parsing the sql string and build new submit data block for two reasons:
  // 1. the table Id(tid & uid) may have been update, the submit block needs to be updated accordingly.
  // 2. vnode may need the schema information along with submit block to update its local table schema.
  tscDebug("0x%"PRIx64" re-parse sql to generate submit data, retry:%d", pParentObj->self, pParentObj->retry);
  pParentObj->retry++;

  int32_t code = tsParseSql(pParentObj, true);
  if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return;

  if (code != TSDB_CODE_SUCCESS) {
    pParentObj->res.code = code;
    tscAsyncResultOnError(pParentObj);
    return;
  }

  // tscHandleMultivnodeInsert only fails before any sub-insert has been (re)launched. In that case no
  // sub-insert callback will ever complete this request, so report the failure to the user here instead of
  // leaving the request hanging.
  code = tscHandleMultivnodeInsert(pParentObj);
  if (code != TSDB_CODE_SUCCESS) {
    pParentObj->res.code = code;
    tscAsyncResultOnError(pParentObj);
  }
}

/**
 * it is a subquery, so after parse the sql string, copy the submit block to payload of itself
 * @param pSql
 * @return
 */
3131
int32_t tscHandleInsertRetry(SSqlObj* pParent, SSqlObj* pSql) {
3132 3133 3134 3135
  assert(pSql != NULL && pSql->param != NULL);
  SSqlRes* pRes = &pSql->res;

  SInsertSupporter* pSupporter = (SInsertSupporter*) pSql->param;
H
Haojun Liao 已提交
3136
  assert(pSupporter->index < pSupporter->pSql->subState.numOfSub);
3137

H
Haojun Liao 已提交
3138
  STableDataBlocks* pTableDataBlock = taosArrayGetP(pParent->cmd.insertParam.pDataBlocks, pSupporter->index);
H
Haojun Liao 已提交
3139 3140 3141
  int32_t code = tscCopyDataBlockToPayload(pSql, pTableDataBlock);

  if ((pRes->code = code)!= TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
3142
    tscAsyncResultOnError(pSql);
H
Haojun Liao 已提交
3143
    return code;  // here the pSql may have been released already.
3144 3145
  }

3146
  return tscBuildAndSendRequest(pSql, NULL);
H
hjxilinx 已提交
3147 3148 3149 3150
}

/*
 * Sends the submit data blocks of pSql to their vnodes, one sub-insert per data block.
 *
 * When pSql->pSubs already exists, this is a retry (issued from multiVnodeInsertFinalize). Every sub-insert
 * gets a fresh SInsertSupporter, and only the sub-inserts whose last result was an error are sent again.
 *
 * Returns TSDB_CODE_SUCCESS once the sub-inserts have been launched. On failure it returns the error code,
 * which is also stored in pSql->res.code. Errors are only ever returned before any sub-insert is launched.
 */
int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  // it is the failure retry insert
  if (pSql->pSubs != NULL) {
    int32_t blockNum = (int32_t)taosArrayGetSize(pCmd->insertParam.pDataBlocks);
    if (pSql->subState.numOfSub != blockNum) {
      tscError("0x%"PRIx64" sub num:%d is not same with data block num:%d", pSql->self, pSql->subState.numOfSub, blockNum);
      pRes->code = TSDB_CODE_TSC_APP_ERROR;
      return pRes->code;
    }

    // Attach a supporter to every sub-insert before relaunching any of them. An allocation failure can then
    // still be reported to the caller while no request is in flight (the result was previously unchecked).
    for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
      SInsertSupporter* pSup = calloc(1, sizeof(SInsertSupporter));
      if (pSup == NULL) {
        for (int32_t j = 0; j < i; ++j) {
          tfree(pSql->pSubs[j]->param);
        }

        pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
        return pRes->code;
      }

      pSup->index = i;
      pSup->pSql  = pSql;
      pSql->pSubs[i]->param = pSup;
    }

    for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
      SSqlObj* pSub = pSql->pSubs[i];
      tscDebug("0x%"PRIx64" sub:0x%"PRIx64" launch sub insert, orderOfSub:%d", pSql->self, pSub->self, i);
      if (pSub->res.code != TSDB_CODE_SUCCESS) {
        tscHandleInsertRetry(pSql, pSub);
      }
    }

    return TSDB_CODE_SUCCESS;
  }

  pSql->subState.numOfSub = (uint16_t)taosArrayGetSize(pCmd->insertParam.pDataBlocks);
  assert(pSql->subState.numOfSub > 0);

  pRes->code = TSDB_CODE_SUCCESS;

  // the number of already initialized subqueries
  int32_t numOfSub = 0;

  if (pSql->subState.states == NULL) {
    pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(*pSql->subState.states));
    if (pSql->subState.states == NULL) {
      pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      goto _error;
    }

    pthread_mutex_init(&pSql->subState.mutex, NULL);
  }

  memset(pSql->subState.states, 0, sizeof(*pSql->subState.states) * pSql->subState.numOfSub);
  tscDebug("0x%"PRIx64" reset all sub states to 0", pSql->self);

  pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES);
  if (pSql->pSubs == NULL) {
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto _error;
  }

  tscDebug("0x%"PRIx64" submit data to %d vnode(s)", pSql->self, pSql->subState.numOfSub);

  while (numOfSub < pSql->subState.numOfSub) {
    SInsertSupporter* pSupporter = calloc(1, sizeof(SInsertSupporter));
    if (pSupporter == NULL) {
      pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      goto _error;
    }

    pSupporter->pSql   = pSql;
    pSupporter->index  = numOfSub;

    SSqlObj *pNew = createSimpleSubObj(pSql, multiVnodeInsertFinalize, pSupporter, TSDB_SQL_INSERT);
    if (pNew == NULL) {
      tscError("0x%"PRIx64" failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql->self, numOfSub, strerror(errno));
      // no sub object took ownership of pSupporter (assumes createSimpleSubObj does not free param on failure)
      tfree(pSupporter);
      pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      goto _error;
    }

    /*
     * assign the callback function to fetchFp to make sure that the error process function can restore
     * the callback function (multiVnodeInsertFinalize) correctly.
     */
    pNew->fetchFp = pNew->fp;
    pSql->pSubs[numOfSub] = pNew;

    STableDataBlocks* pTableDataBlock = taosArrayGetP(pCmd->insertParam.pDataBlocks, numOfSub);
    pRes->code = tscCopyDataBlockToPayload(pNew, pTableDataBlock);
    if (pRes->code != TSDB_CODE_SUCCESS) {
      tscDebug("0x%"PRIx64" prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%s", pSql->self, numOfSub,
               pSql->subState.numOfSub, tstrerror(pRes->code));
      goto _error;
    }

    tscDebug("0x%"PRIx64" sub:%p create subObj success. orderOfSub:%d", pSql->self, pNew, numOfSub);
    numOfSub++;
  }

  pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pCmd->insertParam.pDataBlocks);

  // use the local variable
  for (int32_t j = 0; j < numOfSub; ++j) {
    SSqlObj *pSub = pSql->pSubs[j];
    tscDebug("0x%"PRIx64" sub:%p launch sub insert, orderOfSub:%d", pSql->self, pSub, j);
    tscBuildAndSendRequest(pSub, NULL);
  }

  return TSDB_CODE_SUCCESS;

_error:
  // report the actual failure (e.g. a payload copy error), not always out-of-memory
  return pRes->code;
}
H
hjxilinx 已提交
3260

H
Haojun Liao 已提交
3261
/*
 * Returns a pointer to the value of column `columnIndex` at the current row (pRes->row) of a sub query result.
 * The column's result width is written to *bytes.
 *
 * When pRes->data is set, the values are read from the columnar block (column offset * numOfRows + row * width).
 * Otherwise they are read from the per-column pointer pRes->urow[columnIndex].
 */
static char* getResultBlockPosition(SSqlCmd* pCmd, SSqlRes* pRes, int32_t columnIndex, int16_t* bytes) {
  SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);

  SInternalField* pField = (SInternalField*) TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, columnIndex);
  assert(pField->pExpr->pExpr == NULL);

  int16_t width = pField->pExpr->base.resBytes;
  *bytes = width;

  if (pRes->data == NULL) {
    return ((char*)pRes->urow[columnIndex]) + pRes->row * width;
  }

  return pRes->data + pField->pExpr->base.offset * pRes->numOfRows + pRes->row * width;
}

/*
 * Assembles the next result block of a join query from the current position of every sub query.
 *
 * The block holds as many rows as the non-NULL sub query with the fewest remaining rows. When that is 0,
 * all join sub query objects are freed and nothing is built. Otherwise the columns are copied one after
 * another into pRes->pRsp (a tFilePage), every sub query's row cursor is advanced, and arithmetic
 * expressions are evaluated over the block. pRes->code is set to out-of-memory if the buffer cannot grow.
 */
static void doBuildResFromSubqueries(SSqlObj* pSql) {
  SSqlRes* pRes = &pSql->res;

  SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd);

  int32_t numOfRes = INT32_MAX;
  for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
    SSqlObj* pSub = pSql->pSubs[i];
    if (pSub == NULL) {
      continue;
    }

    int32_t remain = (int32_t)(pSub->res.numOfRows - pSub->res.row);
    numOfRes = (int32_t)(MIN(numOfRes, remain));
  }

  if (numOfRes == 0) {  // no result any more, free all subquery objects
    freeJoinSubqueryObj(pSql);
    return;
  }

  int32_t rowSize = tscGetResRowLength(pQueryInfo->exprList);
  assert(numOfRes > 0 && rowSize > 0);

  // Compute sizes in size_t: the int32_t product numOfRes * rowSize could overflow (undefined behavior).
  size_t payloadSize = (size_t)numOfRes * (size_t)rowSize;
  char* tmp = realloc(pRes->pRsp, payloadSize + sizeof(tFilePage));
  if (tmp == NULL) {
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return;
  }
  pRes->pRsp = tmp;

  tFilePage* pFilePage = (tFilePage*) pRes->pRsp;
  pFilePage->num = numOfRes;

  pRes->data = pFilePage->data;
  char* data = pRes->data;

  int16_t bytes = 0;

  tscRestoreFuncForSTableQuery(pQueryInfo);
  tscFieldInfoUpdateOffset(pQueryInfo);
  for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
    SSqlObj* pSub = pSql->pSubs[i];
    if (pSub == NULL) {
      continue;
    }

    SQueryInfo* pSubQueryInfo = pSub->cmd.pQueryInfo;
    tscRestoreFuncForSTableQuery(pSubQueryInfo);
    tscFieldInfoUpdateOffset(pSubQueryInfo);
  }

  // copy numOfRes values of every output column, column after column
  size_t numOfExprs = tscNumOfExprs(pQueryInfo);
  for (size_t i = 0; i < numOfExprs; ++i) {
    SColumnIndex* pIndex = &pRes->pColumnIndex[i];
    SSqlRes*      pRes1 = &pSql->pSubs[pIndex->tableIndex]->res;
    SSqlCmd*      pCmd1 = &pSql->pSubs[pIndex->tableIndex]->cmd;

    char* pData = getResultBlockPosition(pCmd1, pRes1, pIndex->columnIndex, &bytes);
    size_t columnSize = (size_t)bytes * (size_t)numOfRes;
    memcpy(data, pData, columnSize);

    data += columnSize;
  }

  // advance every sub query past the rows just consumed
  for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
    SSqlObj* pSub = pSql->pSubs[i];
    if (pSub == NULL) {
      continue;
    }

    pSub->res.row += numOfRes;
    assert(pSub->res.row <= pSub->res.numOfRows);
  }

  pRes->numOfRows = numOfRes;
  pRes->numOfClauseTotal += numOfRes;

  int32_t finalRowSize = 0;
  for (int32_t i = 0; i < tscNumOfFields(pQueryInfo); ++i) {
    TAOS_FIELD* pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
    finalRowSize += pField->bytes;
  }

  doArithmeticCalculate(pQueryInfo, pFilePage, rowSize, finalRowSize);

  pRes->data = pFilePage->data;
  tscSetResRawPtr(pRes, pQueryInfo);
}

H
hjxilinx 已提交
3366
/*
 * Builds the next result block of a join query from its sub queries and delivers it through pSql->fp
 * (with the number of rows in the block).
 *
 * On first use, the per-column pointer arrays of pSql->res (tsrow, urow, buffer, length) are allocated.
 * An error already stored in pSql->res.code, or an allocation failure, is reported via tscAsyncResultOnError.
 */
void tscBuildResFromSubqueries(SSqlObj *pSql) {
  SSqlRes* pRes = &pSql->res;

  if (pRes->code != TSDB_CODE_SUCCESS) {
    tscAsyncResultOnError(pSql);
    return;
  }

  if (pRes->tsrow == NULL) {
    SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
    pRes->numOfCols = (int16_t) tscNumOfExprs(pQueryInfo);

    pRes->tsrow  = calloc(pRes->numOfCols, POINTER_BYTES);
    pRes->urow   = calloc(pRes->numOfCols, POINTER_BYTES);
    pRes->buffer = calloc(pRes->numOfCols, POINTER_BYTES);
    pRes->length = calloc(pRes->numOfCols, sizeof(int32_t));

    // urow was previously not checked, although it is dereferenced when rows are read (e.g. doSetResultRowData)
    if (pRes->tsrow == NULL || pRes->urow == NULL || pRes->buffer == NULL || pRes->length == NULL) {
      pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      tscAsyncResultOnError(pSql);
      return;
    }
  }

  assert (pRes->row >= pRes->numOfRows);
  doBuildResFromSubqueries(pSql);
  if (pRes->code == TSDB_CODE_SUCCESS) {
    (*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
  } else {
    tscAsyncResultOnError(pSql);
  }
}

H
Haojun Liao 已提交
3399
char *getArithmeticInputSrc(void *param, const char *name, int32_t colId) {
3400 3401 3402
  SArithmeticSupport *pSupport = (SArithmeticSupport *) param;

  int32_t index = -1;
H
Haojun Liao 已提交
3403
  SExprInfo* pExpr = NULL;
3404 3405 3406
  
  for (int32_t i = 0; i < pSupport->numOfCols; ++i) {
    pExpr = taosArrayGetP(pSupport->exprList, i);
H
Haojun Liao 已提交
3407
    if (strncmp(name, pExpr->base.aliasName, sizeof(pExpr->base.aliasName) - 1) == 0) {
3408 3409 3410 3411 3412 3413
      index = i;
      break;
    }
  }

  assert(index >= 0 && index < pSupport->numOfCols);
H
Haojun Liao 已提交
3414
  return pSupport->data[index] + pSupport->offset * pExpr->base.resBytes;
3415 3416
}

3417
/*
 * Expose the current result row through pSql->res.tsrow and step to the next one.
 *
 * Only visible fields are emitted; they are packed densely into tsrow[]. For each visible field:
 *   - tsrow[k] is NULL when isNull() reports the cell as null, otherwise it points at the cell;
 *     for BINARY/NCHAR cells it points at varDataVal(cell) and length[k] receives varDataLen(cell);
 *   - that field's urow[] cursor is advanced by the field's byte width.
 * pRes->row is then incremented.
 *
 * When every row has already been returned, tsrow is freed and NULL is returned.
 */
TAOS_ROW doSetResultRowData(SSqlObj *pSql) {
  SSqlRes *pRes = &pSql->res;

  assert(pRes->row >= 0 && pRes->row <= pRes->numOfRows);

  if (pRes->row >= pRes->numOfRows) {
    // Nothing left for the caller: release the row array; tfree leaves it NULL, which is returned.
    tfree(pRes->tsrow);
    return pRes->tsrow;
  }

  SQueryInfo *pQueryInfo  = tscGetQueryInfo(&pSql->cmd);
  size_t      numOfFields = tscNumOfFields(pQueryInfo);
  char      **pCursor     = (char **) pRes->urow;
  int32_t     outCol      = 0;

  for (size_t col = 0; col < numOfFields; ++col) {
    SInternalField *pField = (SInternalField *) TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, col);
    if (pField->visible) {
      int32_t type      = pField->field.type;
      char   *pCell     = pCursor[col];
      bool    isVarType = (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR);
      bool    nullCell  = isNull(pCell, type);

      if (isVarType) {
        pRes->tsrow[outCol]  = nullCell ? NULL : varDataVal(pCell);
        pRes->length[outCol] = varDataLen(pCell);
      } else {
        pRes->tsrow[outCol] = nullCell ? NULL : pCell;
      }

      // move this column's cursor onto the same column of the next row
      pCursor[col] += pField->field.bytes;
      ++outCol;
    }
  }

  ++pRes->row;
  return pRes->tsrow;
}

H
Haojun Liao 已提交
3456
/*
 * Decide whether any subquery of pSql may still produce rows.
 *
 * For a non-ordered projection over a super table, data remains if at least one existing subquery
 * still has vgroups to visit, unread rows in its current result, and has not reached its limit.
 *
 * Otherwise (e.g. inner join), the query is finished as soon as any existing subquery has an empty
 * result, or is a projection query that has consumed its rows and reached its limit.
 */
static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) {
  SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd);

  if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
    for (int32_t k = 0; k < pSql->subState.numOfSub; ++k) {
      SSqlObj *pSub = pSql->pSubs[k];
      if (pSub == NULL) {
        continue;
      }

      SSqlRes    *pSubRes       = &pSub->res;
      SQueryInfo *pSubQueryInfo = tscGetQueryInfo(&pSub->cmd);
      assert(pSubQueryInfo->numOfTables == 1);

      STableMetaInfo *pMetaInfo = tscGetMetaInfo(pSubQueryInfo, 0);

      // one live subquery is enough to keep going
      if (pMetaInfo->vgroupIndex < pMetaInfo->vgroupList->numOfVgroups &&
          pSubRes->row < pSubRes->numOfRows &&
          !tscHasReachLimitation(pSubQueryInfo, pSubRes)) {
        return true;
      }
    }

    return false;
  }

  // inner join: a single exhausted subquery ends the whole query
  for (int32_t k = 0; k < pSql->subState.numOfSub; ++k) {
    SSqlObj *pSub = pSql->pSubs[k];
    if (pSub == NULL) {
      continue;
    }

    SSqlRes    *pSubRes       = &pSub->res;
    SQueryInfo *pSubQueryInfo = tscGetQueryInfo(&pSub->cmd);

    bool drainedProjection = pSubRes->row >= pSubRes->numOfRows &&
                             tscHasReachLimitation(pSubQueryInfo, pSubRes) &&
                             tscIsProjectionQuery(pSubQueryInfo);

    if (drainedProjection || pSubRes->numOfRows == 0) {
      return false;
    }
  }

  return true;
}
H
Haojun Liao 已提交
3509

H
Haojun Liao 已提交
3510
/*
 * Build a client-side SQInfo that executes pQueryInfo over the tables listed in pTableGroupInfo.
 *
 * - The query attributes are derived via tscCreateQueryFromQueryInfo().
 * - resultRowSize/tagLen are summed from the last populated expression stage (pExpr3, else pExpr2,
 *   else pExpr1).
 * - One STableQueryInfo per table is created inside a single pQInfo->pBuf allocation, grouped as in
 *   pTableGroupInfo->pGroupList.
 * - The operator plan comes from createExecOperatorPlan() when stage == MASTER_SCAN and from
 *   createGlobalMergePlan() otherwise. It is passed, together with pSourceOperator and merger, to
 *   initQInfo().
 *
 * `sql` is stored in pQInfo->sql without being copied. Returns the new SQInfo, or NULL when an
 * allocation fails. Anything built so far is released through freeQInfo().
 */
void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, STableGroupInfo* pTableGroupInfo, SOperatorInfo* pSourceOperator,
                               char* sql, void* merger, int32_t stage, uint64_t qId) {
  assert(pQueryInfo != NULL);

  SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo));
  if (pQInfo == NULL) {
    return NULL;  // nothing has been acquired yet, so there is nothing to release
  }

  // to make sure third party won't overwrite this structure
  pQInfo->signature = pQInfo;
  pQInfo->qId       = qId;

  SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
  SQueryAttr       *pQueryAttr  = &pQInfo->query;

  pRuntimeEnv->pQueryAttr = pQueryAttr;
  tscCreateQueryFromQueryInfo(pQueryInfo, pQueryAttr, NULL);

  pQueryAttr->tableGroupInfo = *pTableGroupInfo;

  // calculate the result row size from the last populated expression stage
  SExprInfo* pEx = NULL;
  int32_t num = 0;
  if (pQueryAttr->pExpr3 != NULL) {
    pEx = pQueryAttr->pExpr3;
    num = pQueryAttr->numOfExpr3;
  } else if (pQueryAttr->pExpr2 != NULL) {
    pEx = pQueryAttr->pExpr2;
    num = pQueryAttr->numOfExpr2;
  } else {
    pEx = pQueryAttr->pExpr1;
    num = pQueryAttr->numOfOutput;
  }

  // int32_t counter: an int16_t would overflow (and never terminate) for num > INT16_MAX
  for (int32_t col = 0; col < num; ++col) {
    pQueryAttr->resultRowSize += pEx[col].base.resBytes;

    // keep the tag length
    if (TSDB_COL_IS_TAG(pEx[col].base.colInfo.flag)) {
      pQueryAttr->tagLen += pEx[col].base.resBytes;
    }
  }

  size_t numOfGroups = 0;
  if (pTableGroupInfo->pGroupList != NULL) {
    numOfGroups = taosArrayGetSize(pTableGroupInfo->pGroupList);
    STableGroupInfo* pTableqinfo = &pQInfo->runtimeEnv.tableqinfoGroupInfo;

    pTableqinfo->pGroupList = taosArrayInit(numOfGroups, POINTER_BYTES);
    pTableqinfo->numOfTables = pTableGroupInfo->numOfTables;
    pTableqinfo->map = taosHashInit(pTableGroupInfo->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);

    // Both containers are filled in the group loop below. The map is only touched when at least one
    // table exists, so a NULL map is only treated as a failure in that case.
    if (pTableqinfo->pGroupList == NULL || (pTableqinfo->map == NULL && pTableGroupInfo->numOfTables > 0)) {
      goto _cleanup;
    }
  }

  // calloc(0, ...) may legitimately return NULL; that is only an error when tables must be stored.
  pQInfo->pBuf = calloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo));
  if (pQInfo->pBuf == NULL && pTableGroupInfo->numOfTables > 0) {
    goto _cleanup;
  }

  pQInfo->dataReady = QUERY_RESULT_NOT_READY;
  pQInfo->rspContext = NULL;
  pQInfo->sql = sql;

  pthread_mutex_init(&pQInfo->lock, NULL);
  tsem_init(&pQInfo->ready, 0, 0);

  int32_t index = 0;
  for(int32_t i = 0; i < numOfGroups; ++i) {
    SArray* pa = taosArrayGetP(pQueryAttr->tableGroupInfo.pGroupList, i);

    size_t s = taosArrayGetSize(pa);
    SArray* p1 = taosArrayInit(s, POINTER_BYTES);
    if (p1 == NULL) {
      goto _cleanup;
    }

    taosArrayPush(pRuntimeEnv->tableqinfoGroupInfo.pGroupList, &p1);

    STimeWindow window = pQueryAttr->window;
    for(int32_t j = 0; j < s; ++j) {
      STableKeyInfo* info = taosArrayGet(pa, j);
      window.skey = info->lastKey;

      // each table's query info lives in its own slot of the shared pBuf allocation
      void* buf = (char*) pQInfo->pBuf + index * sizeof(STableQueryInfo);
      STableQueryInfo* item = createTableQueryInfo(pQueryAttr, info->pTable, pQueryAttr->groupbyColumn, window, buf);
      if (item == NULL) {
        goto _cleanup;
      }

      item->groupIndex = i;
      taosArrayPush(p1, &item);

      // NOTE(review): every entry is stored under the same key (tid 0), so the map does not identify
      // individual tables here; presumably intentional on the client side — confirm.
      STableId id = {.tid = 0, .uid = 0};
      taosHashPut(pRuntimeEnv->tableqinfoGroupInfo.map, &id.tid, sizeof(id.tid), &item, POINTER_BYTES);
      index += 1;
    }
  }

  // todo refactor: filter should not be applied here.
  createFilterInfo(pQueryAttr, 0);
  pQueryAttr->numOfFilterCols = 0;

  SArray* pa = NULL;
  if (stage == MASTER_SCAN) {
    pa = createExecOperatorPlan(pQueryAttr);
  } else {
    pa = createGlobalMergePlan(pQueryAttr);
  }

  STsBufInfo bufInfo = {0};
  SQueryParam param = {.pOperator = pa};

  // NOTE(review): the return code of initQInfo is discarded, so a failed initialisation still hands
  // pQInfo back to the caller — confirm callers tolerate this before changing it.
  /*int32_t code = */initQInfo(&bufInfo, NULL, pSourceOperator, pQInfo, &param, NULL, 0, merger);
  taosArrayDestroy(pa);

  return pQInfo;

  _cleanup:
  freeQInfo(pQInfo);
  return NULL;
}