提交 ac8c6e06 编写于 作者: B Bomin Zhang

fix some coverity issues

also enhance telemetry report interval
上级 d9c8642d
...@@ -220,15 +220,16 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf ...@@ -220,15 +220,16 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
if (numOfRows > 0) { // when reaching here the first execution of stream computing is successful. if (numOfRows > 0) { // when reaching here the first execution of stream computing is successful.
pStream->numOfRes += numOfRows;
for(int32_t i = 0; i < numOfRows; ++i) { for(int32_t i = 0; i < numOfRows; ++i) {
TAOS_ROW row = taos_fetch_row(res); TAOS_ROW row = taos_fetch_row(res);
tscDebug("%p stream:%p fetch result", pSql, pStream); if (row != NULL) {
tscStreamFillTimeGap(pStream, *(TSKEY*)row[0]); tscDebug("%p stream:%p fetch result", pSql, pStream);
pStream->stime = *(TSKEY *)row[0]; tscStreamFillTimeGap(pStream, *(TSKEY*)row[0]);
pStream->stime = *(TSKEY *)row[0];
// user callback function // user callback function
(*pStream->fp)(pStream->param, res, row); (*pStream->fp)(pStream->param, res, row);
pStream->numOfRes++;
}
} }
if (!pStream->isProject) { if (!pStream->isProject) {
......
...@@ -203,6 +203,7 @@ static void tscProcessSubscriptionTimer(void *handle, void *tmrId) { ...@@ -203,6 +203,7 @@ static void tscProcessSubscriptionTimer(void *handle, void *tmrId) {
static SArray* getTableList( SSqlObj* pSql ) { static SArray* getTableList( SSqlObj* pSql ) {
const char* p = strstr( pSql->sqlstr, " from " ); const char* p = strstr( pSql->sqlstr, " from " );
assert(p != NULL); // we are sure this is a 'select' statement
char* sql = alloca(strlen(p) + 32); char* sql = alloca(strlen(p) + 32);
sprintf(sql, "select tbid(tbname)%s", p); sprintf(sql, "select tbid(tbname)%s", p);
......
...@@ -33,7 +33,7 @@ static pthread_t tsTelemetryThread; ...@@ -33,7 +33,7 @@ static pthread_t tsTelemetryThread;
#define TELEMETRY_SERVER "telemetry.taosdata.com" #define TELEMETRY_SERVER "telemetry.taosdata.com"
#define TELEMETRY_PORT 80 #define TELEMETRY_PORT 80
#define REPORT_INTERVAL 86400 #define REPORT_INTERVAL 86400
static void beginObject(SBufferWriter* bw) { static void beginObject(SBufferWriter* bw) {
tbufWriteChar(bw, '{'); tbufWriteChar(bw, '{');
...@@ -220,28 +220,34 @@ static void sendTelemetryReport() { ...@@ -220,28 +220,34 @@ static void sendTelemetryReport() {
taosWriteSocket(fd, tbufGetData(&bw, false), contLen); taosWriteSocket(fd, tbufGetData(&bw, false), contLen);
tbufCloseWriter(&bw); tbufCloseWriter(&bw);
taosReadSocket(fd, buf, 10); // read something to avoid nginx error 499 // read something to avoid nginx error 499
if (taosReadSocket(fd, buf, 10) < 0) {
dTrace("failed to receive response, reason:%s", strerror(errno));
}
taosCloseSocket(fd); taosCloseSocket(fd);
} }
static void* telemetryThread(void* param) { static void* telemetryThread(void* param) {
int timeToWait = 0; struct timespec end = {0};
clock_gettime(CLOCK_REALTIME, &end);
end.tv_sec += 300; // wait 5 minutes to send first report
while (1) { while (1) {
if (timeToWait <= 0) { while (1) {
if (sdbIsMaster()) { if (sem_timedwait(&tsExitSem, &end) == 0) {
sendTelemetryReport(); return NULL;
}
struct timespec now = {0};
clock_gettime(CLOCK_REALTIME, &now);
if (now.tv_sec > end.tv_sec || (now.tv_sec == end.tv_sec && now.tv_nsec >= end.tv_nsec)) {
break;
} }
timeToWait = REPORT_INTERVAL;
} }
int startAt = taosGetTimestampSec(); if (sdbIsMaster()) {
struct timespec timeout = {.tv_sec = 0, .tv_nsec = 0}; sendTelemetryReport();
clock_gettime(CLOCK_REALTIME, &timeout);
timeout.tv_sec += timeToWait;
if (sem_timedwait(&tsExitSem, &timeout) == 0) {
break;
} }
timeToWait -= (taosGetTimestampSec() - startAt); end.tv_sec += REPORT_INTERVAL;
} }
return NULL; return NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册