diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 20b5e86090ca39f3fba4e7b9e0a1f90b1aacb2db..6235e0aeba415627352bc3fd864eb6e35eb6086e 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -220,15 +220,16 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); if (numOfRows > 0) { // when reaching here the first execution of stream computing is successful. - pStream->numOfRes += numOfRows; for(int32_t i = 0; i < numOfRows; ++i) { TAOS_ROW row = taos_fetch_row(res); - tscDebug("%p stream:%p fetch result", pSql, pStream); - tscStreamFillTimeGap(pStream, *(TSKEY*)row[0]); - pStream->stime = *(TSKEY *)row[0]; - - // user callback function - (*pStream->fp)(pStream->param, res, row); + if (row != NULL) { + tscDebug("%p stream:%p fetch result", pSql, pStream); + tscStreamFillTimeGap(pStream, *(TSKEY*)row[0]); + pStream->stime = *(TSKEY *)row[0]; + // user callback function + (*pStream->fp)(pStream->param, res, row); + pStream->numOfRes++; + } } if (!pStream->isProject) { diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index 9569987de007c30ba30a459792a8692c24767db8..81d73236a4e9972ac4401ebbb7a5815ebe0feb4b 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -203,6 +203,7 @@ static void tscProcessSubscriptionTimer(void *handle, void *tmrId) { static SArray* getTableList( SSqlObj* pSql ) { const char* p = strstr( pSql->sqlstr, " from " ); + assert(p != NULL); // we are sure this is a 'select' statement char* sql = alloca(strlen(p) + 32); sprintf(sql, "select tbid(tbname)%s", p); diff --git a/src/dnode/src/dnodeTelemetry.c b/src/dnode/src/dnodeTelemetry.c index 60277a275a331c3ecc9d8e17189e989112d0ebc1..22287f71fd10350cc5dcf80ccc32bed3c4f09822 100644 --- a/src/dnode/src/dnodeTelemetry.c +++ b/src/dnode/src/dnodeTelemetry.c @@ -33,7 +33,7 @@ static pthread_t tsTelemetryThread; #define TELEMETRY_SERVER "telemetry.taosdata.com" #define TELEMETRY_PORT 80 -#define REPORT_INTERVAL 86400 +#define REPORT_INTERVAL 86400 static void beginObject(SBufferWriter* bw) { tbufWriteChar(bw, '{'); @@ -220,28 +220,34 @@ static void sendTelemetryReport() { taosWriteSocket(fd, tbufGetData(&bw, false), contLen); tbufCloseWriter(&bw); - taosReadSocket(fd, buf, 10); // read something to avoid nginx error 499 + // read something to avoid nginx error 499 + if (taosReadSocket(fd, buf, 10) < 0) { + dTrace("failed to receive response, reason:%s", strerror(errno)); + } taosCloseSocket(fd); } static void* telemetryThread(void* param) { - int timeToWait = 0; + struct timespec end = {0}; + clock_gettime(CLOCK_REALTIME, &end); + end.tv_sec += 300; // wait 5 minutes to send first report + while (1) { - if (timeToWait <= 0) { - if (sdbIsMaster()) { - sendTelemetryReport(); + while (1) { + if (sem_timedwait(&tsExitSem, &end) == 0) { + return NULL; + } + struct timespec now = {0}; + clock_gettime(CLOCK_REALTIME, &now); + if (now.tv_sec > end.tv_sec || (now.tv_sec == end.tv_sec && now.tv_nsec >= end.tv_nsec)) { + break; } - timeToWait = REPORT_INTERVAL; } - int startAt = taosGetTimestampSec(); - struct timespec timeout = {.tv_sec = 0, .tv_nsec = 0}; - clock_gettime(CLOCK_REALTIME, &timeout); - timeout.tv_sec += timeToWait; - if (sem_timedwait(&tsExitSem, &timeout) == 0) { - break; + if (sdbIsMaster()) { + sendTelemetryReport(); } - timeToWait -= (taosGetTimestampSec() - startAt); + end.tv_sec += REPORT_INTERVAL; } return NULL;