tscStream.c 27.3 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
hjxilinx 已提交
16
#include <tschemautil.h>
S
slguan 已提交
17
#include "os.h"
H
hjxilinx 已提交
18
#include "taosmsg.h"
S
slguan 已提交
19
#include "tscLog.h"
H
hjxilinx 已提交
20 21
#include "tscUtil.h"
#include "tsched.h"
B
Bomin Zhang 已提交
22
#include "tcache.h"
H
hjxilinx 已提交
23
#include "tsclient.h"
H
hzcheng 已提交
24 25 26 27
#include "ttimer.h"
#include "tutil.h"

#include "tscProfile.h"
28
#include "tscSubquery.h"
H
hzcheng 已提交
29 30 31 32 33 34

/* Forward declarations of the stream callbacks and timer helpers that are defined later in this file. */
static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOfRows);
static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOfRows);
static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql);
static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer);

H
hjxilinx 已提交
35 36 37 38
/*
 * Returns (current timestamp in the stream's precision) + launchDelay - stream start time - 1.
 */
static int64_t getDelayValueAfterTimewindowClosed(SSqlStream* pStream, int64_t launchDelay) {
  int64_t now = taosGetTimestamp(pStream->precision);
  int64_t sinceStart = now + launchDelay - pStream->stime;
  return sinceStart - 1;
}

39
/*
 * Returns true when every output expression of the query uses the projection
 * function (TSDB_FUNC_PRJ); returns false as soon as any other function is found.
 */
static bool isProjectStream(SQueryInfo* pQueryInfo) {
  int32_t idx = 0;

  while (idx < pQueryInfo->fieldsInfo.numOfOutput) {
    SSqlExpr *expr = tscSqlExprGet(pQueryInfo, idx);
    if (expr->functionId != TSDB_FUNC_PRJ) {
      return false;
    }
    ++idx;
  }

  return true;
}

50
/*
 * Compute the delay to wait before retrying a failed stream computation.
 *
 * The base delay is tsRetryStreamCompDelay plus a random jitter in
 * [0, 0.3 * tsRetryStreamCompDelay), multiplied by 1000. For streams whose
 * interval unit is not a calendar unit ('n' or 'y'), the sliding time
 * (divided by 1000 when prec is TSDB_TIME_PRECISION_MICRO) is returned
 * instead when it is smaller than that delay.
 *
 * @param pStream     stream whose interval unit is inspected
 * @param slidingTime sliding time of the stream, in the given precision
 * @param prec        time precision of slidingTime
 * @return the retry delay
 */
static int64_t tscGetRetryDelayTime(SSqlStream* pStream, int64_t slidingTime, int16_t prec) {
  const float retryRangeFactor = 0.3f;

  // The jitter range truncates to 0 for small tsRetryStreamCompDelay values;
  // skip the modulo in that case because "x % 0" is undefined behavior.
  int64_t jitterRange = (int64_t)(tsRetryStreamCompDelay * retryRangeFactor);
  int64_t jitter = (jitterRange > 0) ? (rand() % jitterRange) : 0;
  int64_t retryDelta = (jitter + tsRetryStreamCompDelay) * 1000L;

  if (pStream->interval.intervalUnit != 'n' && pStream->interval.intervalUnit != 'y') {
    // change to ms
    if (prec == TSDB_TIME_PRECISION_MICRO) {
      slidingTime = slidingTime / 1000;
    }

    if (slidingTime < retryDelta) {
      return slidingTime;
    }
  }

  return retryDelta;
}

H
Haojun Liao 已提交
69 70 71 72 73
/*
 * Record the failure code on the stream's SQL object and arm the retry timer
 * with a delay obtained from tscGetRetryDelayTime().
 */
static void setRetryInfo(SSqlStream* pStream, int32_t code) {
  SSqlObj* pSql = pStream->pSql;
  pSql->res.code = code;

  int64_t retryDelayTime = tscGetRetryDelayTime(pStream, pStream->interval.sliding, pStream->precision);
  tscDebug("0x%"PRIx64" stream:%p, get table Meta failed, retry in %" PRId64 "ms", pSql->self, pStream, retryDelayTime);

  tscSetRetryTimer(pStream, pSql, retryDelayTime);
}

78 79
/*
 * Launch one execution of the stream query.
 *
 * The function installs itself as the fp/fetchFp callback of the SQL object
 * before fetching the table meta (and the vgroup list for super tables), and
 * returns early when those calls report TSDB_CODE_TSC_ACTION_IN_PROGRESS.
 * Once the meta is available the query is issued with
 * tscProcessStreamQueryCallback as the callback; on any failure a retry is
 * scheduled through setRetryInfo().
 *
 * @param param the SSqlStream object
 * @param tres  the SQL object of the stream (must equal pStream->pSql)
 * @param code  result code of the preceding step
 */
static void doLaunchQuery(void* param, TAOS_RES* tres, int32_t code) {
  SSqlStream *pStream = (SSqlStream *)param;
  assert(pStream->pSql == tres);

  SSqlObj* pSql = (SSqlObj*) tres;
  pSql->fp = doLaunchQuery;
  pSql->fetchFp = doLaunchQuery;
  pSql->res.completed = false;

  if (code != TSDB_CODE_SUCCESS) {
    setRetryInfo(pStream, code);
    return;
  }

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

  code = tscGetTableMeta(pSql, pTableMetaInfo);
  if (code == 0 && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
    code = tscGetSTableVgroupInfo(pSql, 0);
  }

  if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
    return;
  }

  // a super table without any vgroup information is treated as a failure
  if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo) && (pTableMetaInfo->pVgroupTables == NULL) &&
      (pTableMetaInfo->vgroupList == NULL || pTableMetaInfo->vgroupList->numOfVgroups <= 0)) {
    tscDebug("%p empty vgroup list", pSql);
    pTableMetaInfo->vgroupList = tscVgroupInfoClear(pTableMetaInfo->vgroupList);
    code = TSDB_CODE_TSC_APP_ERROR;
  }

  // failed to get table meta or vgroup list: schedule a retry
  if (code != TSDB_CODE_SUCCESS) {
    setRetryInfo(pStream, code);
    return;
  }

  tscTansformFuncForSTableQuery(pQueryInfo);

  tscDebug("0x%"PRIx64" stream:%p, start stream query on:%s QueryInfo->skey=%"PRId64" ekey=%"PRId64" ", pSql->self, pStream, tNameGetTableName(&pTableMetaInfo->name), pQueryInfo->window.skey, pQueryInfo->window.ekey);

  pQueryInfo->command = TSDB_SQL_SELECT;

  pSql->fp = tscProcessStreamQueryCallback;
  pSql->fetchFp = tscProcessStreamQueryCallback;
  tscDoQuery(pSql);
  tscIncStreamExecutionCount(pStream);
}

129 130 131 132 133
/* Scheduler entry point: unpack the stream from the message and launch its query. */
static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) {
  SSqlStream *stream = (SSqlStream *)pMsg->ahandle;
  TAOS_RES   *sqlObj = stream->pSql;

  doLaunchQuery(stream, sqlObj, 0);
}

H
hzcheng 已提交
134 135
static void tscProcessStreamTimer(void *handle, void *tmrId) {
  SSqlStream *pStream = (SSqlStream *)handle;
136 137 138 139
  if (pStream == NULL || pStream->pTimer != tmrId) {
    return;
  }

H
hzcheng 已提交
140 141 142 143
  pStream->pTimer = NULL;

  pStream->numOfRes = 0;  // reset the numOfRes.
  SSqlObj *pSql = pStream->pSql;
144 145 146 147
  // pSql ==  NULL  maybe killStream already called
  if(pSql == NULL) {
    return ;
  }
148
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
H
Haojun Liao 已提交
149
  tscDebug("0x%"PRIx64" add into timer", pSql->self);
H
hzcheng 已提交
150

151
  if (pStream->isProject) {
H
hzcheng 已提交
152
    /*
H
hjxilinx 已提交
153
     * pQueryInfo->window.ekey, which is the start time, does not change in case of
H
hzcheng 已提交
154 155
     * repeat first execution, once the first execution failed.
     */
H
hjxilinx 已提交
156
    pQueryInfo->window.skey = pStream->stime;  // start time
H
hzcheng 已提交
157

H
hjxilinx 已提交
158 159 160
    pQueryInfo->window.ekey = taosGetTimestamp(pStream->precision);  // end time
    if (pQueryInfo->window.ekey > pStream->etime) {
      pQueryInfo->window.ekey = pStream->etime;
H
hzcheng 已提交
161 162
    }
  } else {
163
    pQueryInfo->window.skey = pStream->stime;
164 165 166 167 168 169 170 171 172
    int64_t etime = taosGetTimestamp(pStream->precision);
    // delay to wait all data in last time window
    if (pStream->precision == TSDB_TIME_PRECISION_MICRO) {
      etime -= tsMaxStreamComputDelay * 1000l;
    } else {
      etime -= tsMaxStreamComputDelay;
    }
    if (etime > pStream->etime) {
      etime = pStream->etime;
B
Bomin Zhang 已提交
173
    } else if (pStream->interval.intervalUnit != 'y' && pStream->interval.intervalUnit != 'n') {
174 175 176 177 178
      if(pStream->stime == INT64_MIN) {
        etime = taosTimeTruncate(etime, &pStream->interval, pStream->precision);
      } else {
        etime = pStream->stime + (etime - pStream->stime) / pStream->interval.interval * pStream->interval.interval;
      }
179
    } else {
B
Bomin Zhang 已提交
180
      etime = taosTimeTruncate(etime, &pStream->interval, pStream->precision);
181 182
    }
    pQueryInfo->window.ekey = etime;
B
Bomin Zhang 已提交
183
    if (pQueryInfo->window.skey >= pQueryInfo->window.ekey) {
B
Bomin Zhang 已提交
184 185
      int64_t timer = pStream->interval.sliding;
      if (pStream->interval.intervalUnit == 'y' || pStream->interval.intervalUnit == 'n') {
186 187
        timer = 86400 * 1000l;
      } else if (pStream->precision == TSDB_TIME_PRECISION_MICRO) {
B
Bomin Zhang 已提交
188 189 190
        timer /= 1000l;
      }
      tscSetRetryTimer(pStream, pSql, timer);
B
Bomin Zhang 已提交
191 192
      return;
    }
H
hzcheng 已提交
193 194 195
  }

  // launch stream computing in a new thread
B
Bomin Zhang 已提交
196
  SSchedMsg schedMsg = { 0 };
H
hzcheng 已提交
197 198 199 200 201 202 203 204 205 206
  schedMsg.fp = tscProcessStreamLaunchQuery;
  schedMsg.ahandle = pStream;
  schedMsg.thandle = (void *)1;
  schedMsg.msg = NULL;
  taosScheduleTask(tscQhandle, &schedMsg);
}

/*
 * Callback for the stream query issued by doLaunchQuery().
 *
 * On success the asynchronous row fetch is started. On failure (tres is NULL
 * or numOfRows is negative) the cached meta entry of the queried table is
 * removed, the result, sub-objects and vgroup list of the stream's SQL object
 * are released, and a retry timer is armed.
 */
static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOfRows) {
  SSqlStream *pStream = (SSqlStream *)param;

  if (tres != NULL && numOfRows >= 0) {
    taos_fetch_rows_a(tres, tscProcessStreamRetrieveResult, param);
    return;
  }

  int64_t retryDelay = tscGetRetryDelayTime(pStream, pStream->interval.sliding, pStream->precision);
  SSqlObj *pSql = pStream->pSql;
  tscError("0x%"PRIx64" stream:%p, query data failed, code:0x%08x, retry in %" PRId64 "ms", pSql->self, pStream, numOfRows,
           retryDelay);

  STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);

  // drop the cached table meta entry keyed by the full table name
  char fullName[TSDB_TABLE_FNAME_LEN] = {0};
  tNameExtractFullName(&pTableMetaInfo->name, fullName);
  taosHashRemove(tscTableMetaInfo, fullName, strnlen(fullName, TSDB_TABLE_FNAME_LEN));

  tfree(pTableMetaInfo->pTableMeta);

  tscFreeSqlResult(pSql);
  tscFreeSubobj(pSql);
  tfree(pSql->pSubs);
  pSql->subState.numOfSub = 0;

  pTableMetaInfo->vgroupList = tscVgroupInfoClear(pTableMetaInfo->vgroupList);

  tscSetRetryTimer(pStream, pSql, retryDelay);
}

234 235 236 237 238 239 240 241 242
/*
 * Intentionally a no-op: filling the time gap is already done in the query,
 * so nothing has to be padded here.
 *
 * The former implementation was disabled with "#if 0" and had gone stale (it
 * referenced a stream field that is not used anywhere else in this file and
 * wrote through an uninitialized row pointer), so it has been removed rather
 * than kept as dead code. The function is kept so existing call sites compile.
 *
 * @param pStream stream being processed (unused)
 * @param ts      timestamp of the row just fetched (unused)
 */
static void tscStreamFillTimeGap(SSqlStream* pStream, TSKEY ts) {
  (void)pStream;
  (void)ts;
}

/*
 * Callback of the asynchronous row fetch of a stream query.
 *
 * @param param     the SSqlStream object
 * @param res       the SQL object that produced the rows; may be NULL on failure
 * @param numOfRows number of rows available; negative on failure, 0 when all
 *                  rows have been retrieved
 *
 * - failure:        arm the retry timer.
 * - numOfRows > 0:  pass every row to the user callback, advance the stream
 *                   start time and request the next batch.
 * - numOfRows == 0: a project stream that produced no rows is retried after
 *                   tsProjectExecInterval; otherwise the per-execution
 *                   resources are released and the next launch is scheduled.
 */
static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOfRows) {
  SSqlStream *    pStream = (SSqlStream *)param;
  SSqlObj *       pSql = (SSqlObj *)res;

  if (pSql == NULL || numOfRows < 0) {
    int64_t retryDelayTime = tscGetRetryDelayTime(pStream, pStream->interval.sliding, pStream->precision);
    // pSql may be NULL in this branch, so the id is taken from the stream's own SQL object
    tscError("0x%"PRIx64" stream:%p, retrieve data failed, code:0x%08x, retry in %" PRId64 "ms", pStream->pSql->self, pStream, numOfRows, retryDelayTime);

    tscSetRetryTimer(pStream, pStream->pSql, retryDelayTime);
    return;
  }

  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);

  if (numOfRows > 0) { // when reaching here the first execution of stream computing is successful.
    for(int32_t i = 0; i < numOfRows; ++i) {
      TAOS_ROW row = taos_fetch_row(res);
      if (row != NULL) {
        tscDebug("0x%"PRIx64" stream:%p fetch result", pSql->self, pStream);
        tscStreamFillTimeGap(pStream, *(TSKEY*)row[0]);
        // the first column carries the row timestamp; remember it as the new start time
        pStream->stime = *(TSKEY *)row[0];
        // user callback function
        (*pStream->fp)(pStream->param, res, row);
        pStream->numOfRes++;
      }
    }

    if (!pStream->isProject) {
      // move the start time forward by one sliding step
      pStream->stime = taosTimeAdd(pStream->stime, pStream->interval.sliding, pStream->interval.slidingUnit, pStream->precision);
    }
    // actually only one row is returned. this following is not necessary
    taos_fetch_rows_a(res, tscProcessStreamRetrieveResult, pStream);
  } else {  // numOfRows == 0, all data has been retrieved
    pStream->useconds += pSql->res.useconds;
    if (pStream->numOfRes == 0) {
      if (pStream->isProject) {
        /* no results in the query range, retry */
        // todo set retry dynamic time
        int32_t retry = tsProjectExecInterval;
        tscError("0x%"PRIx64" stream:%p, retrieve no data, code:0x%08x, retry in %" PRId32 "ms", pSql->self, pStream, numOfRows, retry);

        tscSetRetryTimer(pStream, pStream->pSql, retry);
        return;
      }
    } else if (pStream->isProject) {
      // skip past the last timestamp already delivered
      pStream->stime += 1;
    }

    tscDebug("0x%"PRIx64" stream:%p, query on:%s, fetch result completed, fetched rows:%" PRId64, pSql->self, pStream, tNameGetTableName(&pTableMetaInfo->name),
             pStream->numOfRes);

    // release the per-execution resources before scheduling the next launch
    tfree(pTableMetaInfo->pTableMeta);

    tscFreeSqlResult(pSql);
    tscFreeSubobj(pSql);
    tfree(pSql->pSubs);
    pSql->subState.numOfSub = 0;
    pTableMetaInfo->vgroupList = tscVgroupInfoClear(pTableMetaInfo->vgroupList);
    tscSetNextLaunchTimer(pStream, pSql);
  }
}

/*
 * Arm the stream timer so that tscProcessStreamTimer() fires after `timer`.
 *
 * For a project stream whose end time lies further in the past than the
 * maximum retention window (tsMaxRetentWindow * 1000, and * 1000 again for
 * microsecond precision), the stream is stopped instead: the upper-level
 * callback is invoked, the stream is closed and no timer is armed. pStream
 * must not be used by the caller after that.
 *
 * @param pStream stream to schedule
 * @param pSql    SQL object of the stream; its command is reset to TSDB_SQL_SELECT
 * @param timer   delay before the next launch; truncated to int32_t for the timer API
 */
static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer) {
  int64_t delay = getDelayValueAfterTimewindowClosed(pStream, timer);

  if (pStream->isProject) {
    int64_t now = taosGetTimestamp(pStream->precision);
    int64_t etime = now > pStream->etime ? pStream->etime : now;

    // widen before multiplying so the product is computed in 64 bits
    int64_t maxRetent = (int64_t)tsMaxRetentWindow * 1000;
    if(pStream->precision == TSDB_TIME_PRECISION_MICRO) {
      maxRetent *= 1000;
    }

    if (pStream->etime < now && now - pStream->etime > maxRetent) {
      /*
       * the end time of the stream is older than the max retention window: stop the stream
       */
      tscDebug("0x%"PRIx64" stream:%p, etime:%" PRId64 " is too old, exceeds the max retention time window:%" PRId64 ", stop the stream",
               pStream->pSql->self, pStream, pStream->etime, maxRetent);
      // TODO : How to terminate stream here
      if (pStream->callback) {
        // Callback function from upper level
        pStream->callback(pStream->param);
      }
      taos_close_stream(pStream);
      return;
    }

    tscDebug("0x%"PRIx64" stream:%p, next start at %" PRId64 ", in %" PRId64 "ms. delay:%" PRId64 "ms qrange %" PRId64 "-%" PRId64, pStream->pSql->self, pStream,
             now + timer, timer, delay, pStream->stime, etime);
  } else {
    tscDebug("0x%"PRIx64" stream:%p, next start at %" PRId64 " - %" PRId64 " end, in %" PRId64 "ms. delay:%" PRId64 "ms qrange %" PRId64 "-%" PRId64, pStream->pSql->self, pStream,
             pStream->stime, pStream->etime, timer, delay, pStream->stime - pStream->interval.interval, pStream->stime - 1);
  }

  pSql->cmd.command = TSDB_SQL_SELECT;

  // start timer for next computing
  taosTmrReset(tscProcessStreamTimer, (int32_t)timer, pStream, tscTmr, &pStream->pTimer);
}

H
hjxilinx 已提交
376 377 378 379
static int64_t getLaunchTimeDelay(const SSqlStream* pStream) {
  int64_t maxDelay =
      (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMaxStreamComputDelay * 1000L : tsMaxStreamComputDelay;
  
380
  int64_t delayDelta = maxDelay;
B
Bomin Zhang 已提交
381 382
  if (pStream->interval.intervalUnit != 'n' && pStream->interval.intervalUnit != 'y') {
    delayDelta = (int64_t)(pStream->interval.sliding * tsStreamComputDelayRatio);
383 384 385
    if (delayDelta > maxDelay) {
      delayDelta = maxDelay;
    }
B
Bomin Zhang 已提交
386
    int64_t remainTimeWindow = pStream->interval.sliding - delayDelta;
387 388 389
    if (maxDelay > remainTimeWindow) {
      maxDelay = (int64_t)(remainTimeWindow / 1.5f);
    }
H
hjxilinx 已提交
390 391 392 393
  }
  
  int64_t currentDelay = (rand() % maxDelay);  // a random number
  currentDelay += delayDelta;
B
Bomin Zhang 已提交
394 395
  if (pStream->interval.intervalUnit != 'n' && pStream->interval.intervalUnit != 'y') {
    assert(currentDelay < pStream->interval.sliding);
396
  }
H
hjxilinx 已提交
397 398 399 400 401
  
  return currentDelay;
}


H
hzcheng 已提交
402 403
/*
 * Schedule the next execution of the stream after a completed one.
 *
 * When the stream has passed its end time the upper-level callback is invoked
 * and the stream is closed; otherwise the wait time is computed, a randomized
 * launch delay is added, and the timer is armed via tscSetRetryTimer().
 */
static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) {
  int64_t timer = 0;
  bool    pastEnd = false;

  if (pStream->isProject) {
    /*
     * for project query, no matter whether data was fetched successfully or not,
     * the next launch is issued after more than the sliding time window
     */
    timer = pStream->interval.sliding;
    pastEnd = (pStream->stime > pStream->etime);
  } else {
    int64_t windowStart = taosTimeTruncate(pStream->stime - 1, &pStream->interval, pStream->precision);
    pastEnd = (windowStart >= pStream->etime);

    if (!pastEnd && pStream->stime > 0) {
      // wait until the start time of the next window, never a negative amount
      int64_t wait = pStream->stime - taosGetTimestamp(pStream->precision);
      timer = (wait < 0) ? 0 : wait;
    }
  }

  if (pastEnd) {
    tscDebug("0x%"PRIx64" stream:%p, stime:%" PRId64 " is larger than end time: %" PRId64 ", stop the stream",
             pStream->pSql->self, pStream, pStream->stime, pStream->etime);
    // TODO : How to terminate stream here
    if (pStream->callback) {
      // Callback function from upper level
      pStream->callback(pStream->param);
    }
    taos_close_stream(pStream);
    return;
  }

  timer += getLaunchTimeDelay(pStream);

  // the value computed above is in the stream's precision; scale microseconds down by 1000
  if (pStream->precision == TSDB_TIME_PRECISION_MICRO) {
    timer = timer / 1000L;
  }

  tscSetRetryTimer(pStream, pSql, timer);
}

454
/*
 * Validate and normalize the interval/sliding settings of the stream query and
 * copy them into the stream object.
 *
 * - A non-project stream with a zero interval is rejected: a message is
 *   written to the command payload and -1 is returned.
 * - For non-calendar units, interval and sliding values below the configured
 *   minimums are raised to those minimums.
 * - A missing sliding value defaults to the interval; sliding is never larger
 *   than the interval.
 * - For project streams the interval and sliding of the query are cleared
 *   after they have been copied to the stream.
 *
 * @return TSDB_CODE_SUCCESS, or -1 when the interval is 0 for a non-project stream
 */
static int32_t tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
  const bool isMicro = (pStream->precision == TSDB_TIME_PRECISION_MICRO);

  int64_t minIntervalTime = isMicro ? tsMinIntervalTime * 1000L : tsMinIntervalTime;

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);

  if (!pStream->isProject && pQueryInfo->interval.interval == 0) {
    sprintf(pSql->cmd.payload, "the interval value is 0");
    return -1;
  }

  // the minimum limits only apply when the unit is neither month ('n') nor year ('y')
  const bool fixedUnit = (pQueryInfo->interval.intervalUnit != 'n' && pQueryInfo->interval.intervalUnit != 'y');

  if (fixedUnit && pQueryInfo->interval.interval < minIntervalTime) {
    tscWarn("0x%"PRIx64" stream:%p, original sample interval:%" PRId64 " too small, reset to:%" PRId64, pSql->self, pStream,
            (int64_t)pQueryInfo->interval.interval, minIntervalTime);
    pQueryInfo->interval.interval = minIntervalTime;
  }

  pStream->interval.intervalUnit = pQueryInfo->interval.intervalUnit;
  pStream->interval.interval = pQueryInfo->interval.interval;  // it shall be derived from sql string

  // no sliding given: slide by one whole interval
  if (pQueryInfo->interval.sliding <= 0) {
    pQueryInfo->interval.sliding = pQueryInfo->interval.interval;
    pQueryInfo->interval.slidingUnit = pQueryInfo->interval.intervalUnit;
  }

  int64_t minSlidingTime = isMicro ? tsMinSlidingTime * 1000L : tsMinSlidingTime;

  if (fixedUnit && pQueryInfo->interval.sliding < minSlidingTime) {
    tscWarn("0x%"PRIx64" stream:%p, original sliding value:%" PRId64 " too small, reset to:%" PRId64, pSql->self, pStream,
            pQueryInfo->interval.sliding, minSlidingTime);
    pQueryInfo->interval.sliding = minSlidingTime;
  }

  if (pQueryInfo->interval.sliding > pQueryInfo->interval.interval) {
    tscWarn("0x%"PRIx64" stream:%p, sliding value:%" PRId64 " can not be larger than interval range, reset to:%" PRId64, pSql->self, pStream,
            pQueryInfo->interval.sliding, pQueryInfo->interval.interval);
    pQueryInfo->interval.sliding = pQueryInfo->interval.interval;
  }

  pStream->interval.slidingUnit = pQueryInfo->interval.slidingUnit;
  pStream->interval.sliding = pQueryInfo->interval.sliding;

  if (pStream->isProject) {
    // clear the interval value to avoid the force time window split by query processor
    pQueryInfo->interval.interval = 0;
    pQueryInfo->interval.sliding = 0;
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Determine the start timestamp of the stream.
 *
 * @param stime start time requested by the caller; INT64_MIN means "not set"
 *
 * Project stream: interval and sliding of the stream are set to
 * tsProjectExecInterval; the result is stime + 1 when stime is set, otherwise
 * the start key of the query window.
 *
 * Aggregation stream: when stime is not set, the query window start key
 * (truncated to the interval) is used, or INT64_MIN is returned when that key
 * is not set either; when stime is set it is truncated to the interval and a
 * warning is logged if the truncation changed it.
 */
static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, int64_t stime) {
  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);

  if (pStream->isProject) {
    // no data in table, flush all data till now to destination meter, 10sec delay
    pStream->interval.interval = tsProjectExecInterval;
    pStream->interval.sliding = tsProjectExecInterval;

    if (stime == INT64_MIN) {
      return pQueryInfo->window.skey;
    }

    // first projection starts right after the latest event timestamp
    assert(stime >= pQueryInfo->window.skey);
    return stime + 1;  // exclude the last records from table
  }

  // timewindow based aggregation stream
  if (stime == INT64_MIN) {  // no data in meter till now
    if (pQueryInfo->window.skey == INT64_MIN) {
      return stime;
    }

    return taosTimeTruncate(pQueryInfo->window.skey, &pStream->interval, pStream->precision);
  }

  int64_t aligned = taosTimeTruncate(stime, &pStream->interval, pStream->precision);
  if (aligned != stime) {
    tscWarn("0x%"PRIx64" stream:%p, last timestamp:%" PRId64 ", reset to:%" PRId64, pSql->self, pStream, stime, aligned);
  }

  return aligned;
}
H
hzcheng 已提交
541

542
static int64_t tscGetLaunchTimestamp(const SSqlStream *pStream) {
543 544 545 546
  int64_t timer = 0, now = taosGetTimestamp(pStream->precision);
  if (pStream->stime > now) {
    timer = pStream->stime - now;
  }
H
hzcheng 已提交
547

548 549
  int64_t startDelay =
      (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsStreamCompStartDelay * 1000L : tsStreamCompStartDelay;
H
hjxilinx 已提交
550 551 552 553
  
  timer += getLaunchTimeDelay(pStream);
  timer += startDelay;
  
554 555
  return (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? timer / 1000L : timer;
}
556

B
Bomin Zhang 已提交
557 558 559 560 561 562
/*
 * Finish opening a stream once its SQL statement has been parsed.
 *
 * Fills in the stream properties (project flag, precision, creation and end
 * time, interval/sliding, start time), adds the stream to the stream list and
 * arms the timer for the first launch.
 *
 * On failure the error code is stored in pSql->res.code and the user callback
 * is invoked with a NULL result and NULL row.
 *
 * @param param the SSqlStream object
 * @param res   unused
 * @param code  result code of the preceding step
 */
static void tscCreateStream(void *param, TAOS_RES *res, int code) {
  SSqlStream* pStream = (SSqlStream*)param;
  SSqlObj* pSql = pStream->pSql;
  SSqlCmd* pCmd = &pSql->cmd;

  if (code != TSDB_CODE_SUCCESS) {
    pSql->res.code = code;
    tscError("0x%"PRIx64" open stream failed, sql:%s, reason:%s, code:%s", pSql->self, pSql->sqlstr, pCmd->payload, tstrerror(code));

    pStream->fp(pStream->param, NULL, NULL);
    return;
  }

  SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
  STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);

  pStream->isProject = isProjectStream(pQueryInfo);
  pStream->precision = tinfo.precision;

  pStream->ctime = taosGetTimestamp(pStream->precision);
  pStream->etime = pQueryInfo->window.ekey;

  if (tscSetSlidingWindowInfo(pSql, pStream) != TSDB_CODE_SUCCESS) {
    // `code` is TSDB_CODE_SUCCESS at this point, so it must not be stored as
    // the result of this failure; record an explicit error code instead.
    pSql->res.code = TSDB_CODE_TSC_APP_ERROR;

    tscError("0x%"PRIx64" stream %p open failed, since the interval value is incorrect", pSql->self, pStream);
    pStream->fp(pStream->param, NULL, NULL);
    return;
  }

  pStream->stime = tscGetStreamStartTimestamp(pSql, pStream, pStream->stime);

  // set stime with ltime if ltime > stime
  const char* dstTable = pStream->dstTable? pStream->dstTable: "";
  tscDebug(" CQ table=%s ltime is %"PRId64, dstTable, pStream->ltime);
  if(pStream->ltime != INT64_MIN && pStream->ltime > pStream->stime) {
    tscWarn(" CQ set stream %s stime=%"PRId64" replace with ltime=%"PRId64" if ltime>0  ", dstTable, pStream->stime, pStream->ltime);
    pStream->stime = pStream->ltime;
  }

  int64_t starttime = tscGetLaunchTimestamp(pStream);
  pCmd->command = TSDB_SQL_SELECT;

  tscAddIntoStreamList(pStream);

  // starttime is truncated to int32_t for the timer API
  taosTmrReset(tscProcessStreamTimer, (int32_t)starttime, pStream, tscTmr, &pStream->pTimer);

  tscDebug("0x%"PRIx64" stream:%p is opened, query on:%s, interval:%" PRId64 ", sliding:%" PRId64 ", first launched in:%" PRId64 ", sql:%s", pSql->self,
           pStream, tNameGetTableName(&pTableMetaInfo->name), pStream->interval.interval, pStream->interval.sliding, starttime, pSql->sqlstr);
}

609 610 611 612
/*
 * Set the destination table name of the stream.
 * Only the pointer is stored; the string itself is not copied here.
 */
void tscSetStreamDestTable(SSqlStream* pStream, const char* dstTable) {
  pStream->dstTable = dstTable;
}

613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630
/*
 * fetchFp callback of the "last_row" query on the destination table.
 *
 * Reads the first column of the result row, when present, into pStream->ltime,
 * then unconditionally continues with the stream creation and releases the
 * result of the last-row query.
 */
void fetchFpStreamLastRow(void* param ,TAOS_RES* res, int num) {
  SSqlStream* pStream = (SSqlStream*)param;
  SSqlObj*    pLastRowSql = res;

  // read the row while holding ownership of the SQL object
  tscSetSqlOwner(pLastRowSql);
  TAOS_ROW row = doSetResultRowData(pLastRowSql);
  if (row != NULL && row[0] != NULL) {
    pStream->ltime = *((int64_t*)row[0]);
    tscDebug(" CQ stream table=%s last row time=%"PRId64" .", pStream->dstTable ? pStream->dstTable : "", pStream->ltime);
  }
  tscClearSqlOwner(pLastRowSql);

  // the stream is created whether or not a last row was found
  tscCreateStream(param, pStream->pSql, TSDB_CODE_SUCCESS);
  taos_free_result(res);
}

633 634 635 636 637 638 639 640 641 642 643
/*
 * fp callback of the "last_row" query on the destination table.
 *
 * When the query succeeded the row is fetched asynchronously; when it failed
 * the stream is still created (with TSDB_CODE_SUCCESS) and the result of the
 * failed query is released.
 */
void fpStreamLastRow(void* param ,TAOS_RES* res, int code) {
  if (code == TSDB_CODE_SUCCESS) {
    // asynchronous fetch last row data
    taos_fetch_rows_a(res, fetchFpStreamLastRow, param);
    return;
  }

  tscCreateStream(param, res, TSDB_CODE_SUCCESS);
  taos_free_result(res);
}

646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672
/*
 * Callback invoked once the stream SQL statement has been parsed.
 *
 * On parse failure the code is stored in the SQL object and the user callback
 * is invoked with a NULL result and NULL row. Without a destination table the
 * stream is created right away; otherwise the last row of the destination
 * table is queried asynchronously and the stream is created from that query's
 * callbacks.
 *
 * @param param the SSqlStream object
 * @param res   result object handed over by the caller
 * @param code  result code of the parsing step
 */
void cbParseSql(void* param, TAOS_RES* res, int code) {
  // check result successful
  SSqlStream* pStream = (SSqlStream*)param;
  SSqlObj* pSql = pStream->pSql;
  SSqlCmd* pCmd = &pSql->cmd;
  if (code != TSDB_CODE_SUCCESS) {
    pSql->res.code = code;
    tscDebug("0x%"PRIx64" open stream parse sql failed, sql:%s, reason:%s, code:%s", pSql->self, pSql->sqlstr, pCmd->payload, tstrerror(code));
    pStream->fp(pStream->param, NULL, NULL);
    return;
  }

  // check dstTable valid
  if(pStream->dstTable == NULL || pStream->dstTable[0] == '\0') {
    tscDebug(" cbParseSql dstTable is empty.");
    tscCreateStream(param, res, code);
    return ;
  }

  // query stream last row time async
  // The table name comes from the caller, so the statement is built with a
  // bounded write into a buffer sized for a full table name.
  char sql[TSDB_TABLE_FNAME_LEN + 32] = "";
  int  len = snprintf(sql, sizeof(sql), "select last_row(*) from %s;", pStream->dstTable);
  if (len < 0 || (size_t)len >= sizeof(sql)) {
    // The name does not fit: skip the last-row lookup and create the stream
    // directly, as is done when the last-row query itself fails.
    tscError("0x%"PRIx64" dstTable name is too long to query its last row, sql:%s", pSql->self, pSql->sqlstr);
    tscCreateStream(param, res, code);
    return;
  }

  taos_query_a(pSql->pTscObj, sql, fpStreamLastRow, param);
}

TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
673
                              int64_t stime, void *param, void (*callback)(void *), void* cqhandle) {
674 675 676
  STscObj *pObj = (STscObj *)taos;
  if (pObj == NULL || pObj->signature != pObj) return NULL;

677 678 679 680 681
  if(fp == NULL){
    tscError(" taos_open_stream api fp param must not NULL.");
    return NULL;
  }

682
  SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
S
slguan 已提交
683
  if (pSql == NULL) {
684 685 686 687 688
    return NULL;
  }

  pSql->signature = pSql;
  pSql->pTscObj = pObj;
B
Bomin Zhang 已提交
689

690 691 692
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

693 694
  SSqlStream *pStream = (SSqlStream *)calloc(1, sizeof(SSqlStream));
  if (pStream == NULL) {
D
dapan1121 已提交
695
    tscError("0x%"PRIx64" open stream failed, sql:%s, reason:%s, code:0x%08x", pSql->self, sqlstr, pCmd->payload, pRes->code);
696 697 698
    tscFreeSqlObj(pSql);
    return NULL;
  }
B
Bomin Zhang 已提交
699

T
tickduan 已提交
700
  pStream->ltime = INT64_MIN;
B
Bomin Zhang 已提交
701 702 703 704 705
  pStream->stime = stime;
  pStream->fp = fp;
  pStream->callback = callback;
  pStream->param = param;
  pStream->pSql = pSql;
706
  pStream->cqhandle = cqhandle;
707
  pSql->pStream = pStream;
B
Bomin Zhang 已提交
708
  pSql->param = pStream;
709
  pSql->maxRetry = TSDB_MAX_REPLICA;
710
  tscSetStreamDestTable(pStream, dstTable);
711 712 713

  pSql->sqlstr = calloc(1, strlen(sqlstr) + 1);
  if (pSql->sqlstr == NULL) {
D
dapan1121 已提交
714
    tscError("0x%"PRIx64" failed to malloc sql string buffer", pSql->self);
715
    tscFreeSqlObj(pSql);
716
    free(pStream);
717
    return NULL;
718
  }
719

720 721
  strtolower(pSql->sqlstr, sqlstr);

S
Shengliang Guan 已提交
722
  tscDebugL("%p SQL: %s", pSql, pSql->sqlstr);
S
slguan 已提交
723
  tsem_init(&pSql->rspSem, 0, 0);
H
Haojun Liao 已提交
724

725 726
  pSql->fp      = cbParseSql;
  pSql->fetchFp = cbParseSql;
D
fix bug  
dapan1121 已提交
727 728

  registerSqlObj(pSql);
729

H
Haojun Liao 已提交
730
  int32_t code = tsParseSql(pSql, true);
B
Bomin Zhang 已提交
731
  if (code == TSDB_CODE_SUCCESS) {
732
    cbParseSql(pStream, pSql, code);
733
  } else if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
734
     tscDebug(" CQ taso_open_stream IN Process. sql=%s", sqlstr);
735
  } else {
D
dapan1121 已提交
736
    tscError("0x%"PRIx64" open stream failed, sql:%s, code:%s", pSql->self, sqlstr, tstrerror(code));
D
fix bug  
dapan1121 已提交
737
    taosReleaseRef(tscObjRef, pSql->self);
B
Bomin Zhang 已提交
738
    free(pStream);
739 740 741
    return NULL;
  }

H
hzcheng 已提交
742 743 744
  return pStream;
}

745 746
/*
 * Open a stream without a destination table name and without a CQ handle.
 * Forwards to taos_open_stream_withname() with an empty table name and a NULL handle.
 */
TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
                              int64_t stime, void *param, void (*callback)(void *)) {
  const char *noDstTable = "";
  void       *noCqHandle = NULL;

  return taos_open_stream_withname(taos, noDstTable, sqlstr, fp, stime, param, callback, noCqHandle);
}

H
hzcheng 已提交
750 751 752
/*
 * Close a stream and release its resources.
 *
 * The SQL object is detached from the stream with an atomic exchange, so only
 * the first of several close calls proceeds past that point. The user callback
 * is invoked one last time with a NULL result and NULL row before the result
 * and the stream object are freed.
 *
 * @param handle stream handle returned by taos_open_stream(); NULL is ignored
 */
void taos_close_stream(TAOS_STREAM *handle) {
  SSqlStream *pStream = (SSqlStream *)handle;
  if (pStream == NULL) {
    return;
  }

  SSqlObj *pSql = (SSqlObj *)atomic_exchange_ptr(&pStream->pSql, 0);
  if (pSql == NULL) {
    return;
  }

  /*
   * stream may be closed twice, 1. drop dst table, 2. kill stream
   * Here, we need a check before release memory
   */
  if (pSql->signature == pSql) {
    tscRemoveFromStreamList(pStream, pSql);

    taosTmrStopA(&(pStream->pTimer));

    tscDebug("0x%"PRIx64" stream:%p is closed", pSql->self, pStream);
    // notify CQ to release the pStream object
    pStream->fp(pStream->param, NULL, NULL);

    // pStream->pSql was already cleared by the exchange above
    taos_free_result(pSql);
    tfree(pStream);
  }
}