diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 9cb43a9f3666f330704d968528986b38246b7351..054b2894c5cff77b63f57b16b069aaf60e150ecc 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -357,7 +357,6 @@ typedef struct SSqlObj { char freed : 4; char listed : 4; tsem_t rspSem; - tsem_t emptyRspSem; SSqlCmd cmd; SSqlRes res; uint8_t numOfSubs; @@ -409,7 +408,7 @@ int tscProcessSql(SSqlObj *pSql); int tscRenewMeterMeta(SSqlObj *pSql, char *tableId); void tscQueueAsyncRes(SSqlObj *pSql); -void tscQueueAsyncError(void(*fp), void *param); +void tscQueueAsyncError(void(*fp), void *param, int32_t code); int tscProcessLocalCmd(SSqlObj *pSql); int tscCfgDynamicOptions(char *msg); @@ -450,7 +449,7 @@ void tscFreeSqlObj(SSqlObj *pObj); void tscCloseTscObj(STscObj *pObj); -void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, int32_t sqlLen); +void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, size_t sqlLen); void tscProcessMultiVnodesInsert(SSqlObj *pSql); void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql); diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index c1738e6801607703e51a6c6ef95edd6e8305f703..10878ee37fc8eb8b2717973657965f3e26b1d31d 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -40,7 +40,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows); static void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows); -void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, int32_t sqlLen) { +void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, size_t sqlLen) { SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; @@ -51,17 +51,15 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) { tscError("failed to malloc payload"); - tfree(pSql); - tscQueueAsyncError(fp, param); + tscQueueAsyncError(fp, param, TSDB_CODE_CLI_OUT_OF_MEMORY); return; } - pSql->sqlstr = malloc(sqlLen + 1); + pSql->sqlstr = realloc(pSql->sqlstr, sqlLen + 1); if (pSql->sqlstr == NULL) { tscError("%p failed to malloc sql string buffer", pSql); - tscQueueAsyncError(fp, param); + tscQueueAsyncError(fp, param, TSDB_CODE_CLI_OUT_OF_MEMORY); free(pCmd->payload); - free(pSql); return; } @@ -75,7 +73,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; if (code != TSDB_CODE_SUCCESS) { - pSql->res.code = (uint8_t)code; + pSql->res.code = code; tscQueueAsyncRes(pSql); return; } @@ -88,15 +86,16 @@ void taos_query_a(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *pa STscObj *pObj = (STscObj *)taos; if (pObj == NULL || pObj->signature != pObj) { tscError("bug!!! pObj:%p", pObj); - globalCode = TSDB_CODE_DISCONNECTED; - tscQueueAsyncError(fp, param); + terrno = TSDB_CODE_DISCONNECTED; + tscQueueAsyncError(fp, param, TSDB_CODE_DISCONNECTED); return; } int32_t sqlLen = strlen(sqlstr); if (sqlLen > tsMaxSQLStringLen) { tscError("sql string too long"); - tscQueueAsyncError(fp, param); + terrno = TSDB_CODE_INVALID_SQL; + tscQueueAsyncError(fp, param, TSDB_CODE_INVALID_SQL); return; } @@ -105,7 +104,8 @@ void taos_query_a(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *pa SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); if (pSql == NULL) { tscError("failed to malloc sqlObj"); - tscQueueAsyncError(fp, param); + terrno = TSDB_CODE_CLI_OUT_OF_MEMORY; + tscQueueAsyncError(fp, param, TSDB_CODE_CLI_OUT_OF_MEMORY); return; } @@ -170,7 +170,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo pRes->code = numOfRows; } - tscQueueAsyncError(pSql->fetchFp, param); + tscQueueAsyncError(pSql->fetchFp, param, pRes->code); return; } @@ -200,8 +200,8 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi SSqlObj *pSql = (SSqlObj *)taosa; if (pSql == NULL || pSql->signature != pSql) { tscError("sql object is NULL"); - globalCode = TSDB_CODE_DISCONNECTED; - tscQueueAsyncError(fp, param); +// globalCode = TSDB_CODE_DISCONNECTED; + tscQueueAsyncError(fp, param, TSDB_CODE_DISCONNECTED); return; } @@ -210,7 +210,7 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi if (pRes->qhandle == 0) { tscError("qhandle is NULL"); - tscQueueAsyncError(fp, param); + tscQueueAsyncError(fp, param, TSDB_CODE_INVALID_QHANDLE); return; } @@ -232,8 +232,8 @@ void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW), SSqlObj *pSql = (SSqlObj *)taosa; if (pSql == NULL || pSql->signature != pSql) { tscError("sql object is NULL"); - globalCode = TSDB_CODE_DISCONNECTED; - tscQueueAsyncError(fp, param); +// globalCode = TSDB_CODE_DISCONNECTED; + tscQueueAsyncError(fp, param, TSDB_CODE_DISCONNECTED); return; } @@ -242,7 +242,7 @@ void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW), if (pRes->qhandle == 0) { tscError("qhandle is NULL"); - tscQueueAsyncError(fp, param); + tscQueueAsyncError(fp, param, TSDB_CODE_INVALID_QHANDLE); return; } @@ -331,7 +331,7 @@ void tscProcessAsyncRes(SSchedMsg *pMsg) { // pCmd may be released, so cache pCmd->command int cmd = pCmd->command; - int code = pRes->code ? -pRes->code : pRes->numOfRows; + int code = pRes->code;// ? -pRes->code : pRes->numOfRows; // in case of async insert, restore the user specified callback function bool shouldFree = tscShouldFreeAsyncSqlObj(pSql); @@ -349,18 +349,20 @@ void tscProcessAsyncRes(SSchedMsg *pMsg) { } } -void tscProcessAsyncError(SSchedMsg *pMsg) { +static void tscProcessAsyncError(SSchedMsg *pMsg) { void (*fp)() = pMsg->ahandle; - - (*fp)(pMsg->thandle, NULL, -1); + (*fp)(pMsg->thandle, NULL, *(int32_t*)pMsg->msg); } -void tscQueueAsyncError(void(*fp), void *param) { +void tscQueueAsyncError(void(*fp), void *param, int32_t code) { + int32_t* c = malloc(sizeof(int32_t)); + *c = code; + SSchedMsg schedMsg; schedMsg.fp = tscProcessAsyncError; schedMsg.ahandle = fp; schedMsg.thandle = param; - schedMsg.msg = NULL; + schedMsg.msg = c; taosScheduleTask(tscQhandle, &schedMsg); } @@ -412,7 +414,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { if (code != 0) { pRes->code = code; tscTrace("%p failed to renew tableMeta", pSql); - tsem_post(&pSql->rspSem); +// tsem_post(&pSql->rspSem); } else { tscTrace("%p renew tableMeta successfully, command:%d, code:%d, retry:%d", pSql, pSql->cmd.command, pSql->res.code, pSql->retry); @@ -424,7 +426,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { code = tscSendMsgToServer(pSql); if (code != 0) { pRes->code = code; - tsem_post(&pSql->rspSem); +// tsem_post(&pSql->rspSem); } } diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c index 241f24a74777be21ffb557209f20d227ff4ef957..8b065fcf51e3115792a6312ea74e4e0a130206a9 100644 --- a/src/client/src/tscPrepare.c +++ b/src/client/src/tscPrepare.c @@ -488,7 +488,6 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) { } tsem_init(&pSql->rspSem, 0, 0); - tsem_init(&pSql->emptyRspSem, 0, 1); pSql->signature = pSql; pSql->pTscObj = pObj; diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index bf186051e840104ca1e908381fdf404680766d65..5696611387049b890eeaf726d465b67d6dbc3a4d 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -215,7 +215,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { if (pQueryInfo->numOfTables == 0) { pTableMetaInfo = tscAddEmptyMetaInfo(pQueryInfo); } else { - pTableMetaInfo = &pQueryInfo->pTableMetaInfo[0]; + pTableMetaInfo = pQueryInfo->pTableMetaInfo[0]; } pCmd->command = pInfo->type; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index e57f9cc4104d9dc7abd6f0131fc40a34c9bc5941..f651c3532414dd8d40cdc0a5620763268f217444 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -285,14 +285,16 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { pRes->rspType = rpcMsg->msgType; pRes->rspLen = rpcMsg->contLen; - char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen); - if (tmp == NULL) { - pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY; - } else { - pRes->pRsp = tmp; - if (pRes->rspLen) { + if (pRes->rspLen > 0) { + char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen); + if (tmp == NULL) { + pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY; + } else { + pRes->pRsp = tmp; memcpy(pRes->pRsp, rpcMsg->pCont, pRes->rspLen); } + } else { + pRes->pRsp = NULL; } // ignore the error information returned from mnode when set ignore flag in sql @@ -327,7 +329,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { void *taosres = tscKeepConn[pCmd->command] ? pSql : NULL; rpcMsg->code = pRes->code ? pRes->code : pRes->numOfRows; - tscTrace("%p Async SQL result:%s res:%p", pSql, tstrerror(pRes->code), taosres); + tscTrace("%p Async SQL result:%s res:%p", pSql, tstrerror(pRes->code), pSql); /* * Whether to free sqlObj or not should be decided before call the user defined function, since this SqlObj diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index bbf46a235365541de55bee314009fdd607253912..f33e589c82164ca2b09a63b478d83c66bc58e37d 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -155,6 +155,10 @@ static void syncConnCallback(void *param, TAOS_RES *tres, int code) { STscObj *pObj = (STscObj *)param; assert(pObj != NULL && pObj->pSql != NULL); + if (code < 0) { + pObj->pSql->res.code = code; + } + sem_post(&pObj->pSql->rspSem); } @@ -177,17 +181,17 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha sem_wait(&pSql->rspSem); if (pSql->res.code != TSDB_CODE_SUCCESS) { + terrno = pSql->res.code; taos_close(pObj); return NULL; } tscTrace("%p DB connection is opening", pObj); - + // version compare only requires the first 3 segments of the version string int code = taosCheckVersion(version, taos_get_server_info(pObj), 3); if (code != 0) { - pSql->res.code = code; - + terrno = code; taos_close(pObj); return NULL; } else { @@ -267,31 +271,29 @@ int taos_query_imp(STscObj *pObj, SSqlObj *pSql) { return pRes->code; } -static void syncQueryCallback(void *param, TAOS_RES *tres, int code) { - STscObj *pObj = (STscObj *)param; - assert(pObj != NULL && pObj->pSql != NULL); +static void waitForQueryRsp(void *param, TAOS_RES *tres, int code) { + assert(param != NULL); + SSqlObj *pSql = ((STscObj *)param)->pSql; - sem_post(&pObj->pSql->rspSem); + // valid error code is less than 0 + if (code < 0) { + pSql->res.code = code; + } + + sem_post(&pSql->rspSem); } int taos_query(TAOS *taos, const char *sqlstr) { STscObj *pObj = (STscObj *)taos; if (pObj == NULL || pObj->signature != pObj) { - globalCode = TSDB_CODE_DISCONNECTED; + terrno = TSDB_CODE_DISCONNECTED; return TSDB_CODE_DISCONNECTED; } - SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); - if (pSql == NULL) { - tscError("failed to malloc sqlObj"); - return TSDB_CODE_CLI_OUT_OF_MEMORY; - } - - pObj->pSql = pSql; - tsem_init(&pSql->rspSem, 0, 0); + SSqlObj* pSql = pObj->pSql; - int32_t sqlLen = strlen(sqlstr); - doAsyncQuery(pObj, pObj->pSql, syncQueryCallback, taos, sqlstr, sqlLen); + size_t sqlLen = strlen(sqlstr); + doAsyncQuery(pObj, pObj->pSql, waitForQueryRsp, taos, sqlstr, sqlLen); // wait for the callback function to post the semaphore sem_wait(&pSql->rspSem); @@ -649,12 +651,12 @@ static void **tscBuildResFromSubqueries(SSqlObj *pSql) { return pRes->tsrow; } -static void asyncFetchCallback(void *param, TAOS_RES *tres, int numOfRows) { +static void waitForRetrieveRsp(void *param, TAOS_RES *tres, int numOfRows) { SSqlObj* pSql = (SSqlObj*) tres; + if (numOfRows < 0) { // set the error code pSql->res.code = -numOfRows; } - sem_post(&pSql->rspSem); } @@ -677,7 +679,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { // current data are exhausted, fetch more data if (pRes->data == NULL || (pRes->data != NULL && pRes->row >= pRes->numOfRows && pRes->completed != true && (pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_METRIC || pCmd->command == TSDB_SQL_FETCH))) { - taos_fetch_rows_a(res, asyncFetchCallback, pSql->pTscObj); + taos_fetch_rows_a(res, waitForRetrieveRsp, pSql->pTscObj); sem_wait(&pSql->rspSem); } @@ -754,20 +756,18 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) { if (pRes == NULL || pRes->qhandle == 0) { /* Query rsp is not received from vnode, so the qhandle is NULL */ tscTrace("%p qhandle is null, abort free, fp:%p", pSql, pSql->fp); - if (pSql->fp != NULL) { - STscObj* pObj = pSql->pTscObj; - - if (pSql == pObj->pSql) { - pObj->pSql = NULL; - tscFreeSqlObj(pSql); - } - + + if (tscShouldFreeAsyncSqlObj(pSql)) { + tscFreeSqlObj(pSql); tscTrace("%p Async SqlObj is freed by app", pSql); - } else if (keepCmd) { - tscFreeSqlResult(pSql); } else { - tscFreeSqlObjPartial(pSql); + if (keepCmd) { + tscFreeSqlResult(pSql); + } else { + tscFreeSqlObjPartial(pSql); + } } + return; } @@ -793,7 +793,7 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) { * be executed, and the retry efforts may result in double free the resources, e.g.,SRetrieveSupport */ if (pRes->code != TSDB_CODE_QUERY_CANCELLED && - ((pRes->numOfRows > 0 && pCmd->command < TSDB_SQL_LOCAL) || + ((pRes->numOfRows > 0 && pCmd->command < TSDB_SQL_LOCAL && pRes->completed == false) || (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows == 0 && pCmd->command == TSDB_SQL_SELECT && pSql->pStream == NULL && pTableMetaInfo->pTableMeta != NULL))) { pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; @@ -836,39 +836,37 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) { } } else { // if no free resource msg is sent to vnode, we free this object immediately. - - if (pSql->fp) { + bool free = tscShouldFreeAsyncSqlObj(pSql); + if (free) { assert(pRes->numOfRows == 0 || (pCmd->command > TSDB_SQL_LOCAL)); + tscFreeSqlObj(pSql); tscTrace("%p Async sql result is freed by app", pSql); - } else if (keepCmd) { - tscFreeSqlResult(pSql); - tscTrace("%p sql result is freed while sql command is kept", pSql); } else { - tscFreeSqlObjPartial(pSql); - tscTrace("%p sql result is freed", pSql); + if (keepCmd) { + tscFreeSqlResult(pSql); + tscTrace("%p sql result is freed while sql command is kept", pSql); + } else { + tscFreeSqlObjPartial(pSql); + tscTrace("%p sql result is freed by app", pSql); + } } } } void taos_free_result(TAOS_RES *res) { taos_free_result_imp(res, 0); } +// todo should not be used in async query int taos_errno(TAOS *taos) { STscObj *pObj = (STscObj *)taos; - int code; - - if (pObj == NULL || pObj->signature != pObj) return globalCode; - if ((int8_t)(pObj->pSql->res.code) == -1) - code = TSDB_CODE_OTHERS; - else - code = pObj->pSql->res.code; + if (pObj == NULL || pObj->signature != pObj) { + return terrno; + } - return code; + return pObj->pSql->res.code; } -//static bool validErrorCode(int32_t code) { return code >= TSDB_CODE_SUCCESS && code < TSDB_CODE_MAX_ERROR_CODE; } - /* * In case of invalid sql error, additional information is attached to explain * why the sql is invalid @@ -888,13 +886,15 @@ static bool hasAdditionalErrorInfo(int32_t code, SSqlCmd *pCmd) { return z != NULL; } +// todo should not be used in async model char *taos_errstr(TAOS *taos) { STscObj *pObj = (STscObj *)taos; if (pObj == NULL || pObj->signature != pObj) - return (char*)tstrerror(globalCode); + return (char*)tstrerror(terrno); - SSqlObj *pSql = pObj->pSql; + SSqlObj* pSql = pObj->pSql; + if (hasAdditionalErrorInfo(pSql->res.code, &pSql->cmd)) { return pSql->cmd.payload; } else { diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index 616bcbba504df2d949a4cbb3aee7ef84b7f6e082..9d172d4ea5279c648b636df0d8895c1a20bda7cd 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -124,7 +124,6 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* pSql->sqlstr = sqlstr; tsem_init(&pSql->rspSem, 0, 0); - tsem_init(&pSql->emptyRspSem, 0, 1); SSqlRes *pRes = &pSql->res; pRes->numOfRows = 1; diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index b0c7b68ab4cfc263f6728d4ff8e95c224ec988cf..ca6ca369e422b448728d8761d7d5dc17e8377b0c 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -392,10 +392,10 @@ void freeSubqueryObj(SSqlObj* pSql) { static void doQuitSubquery(SSqlObj* pParentSql) { freeSubqueryObj(pParentSql); - tsem_wait(&pParentSql->emptyRspSem); - tsem_wait(&pParentSql->emptyRspSem); +// tsem_wait(&pParentSql->emptyRspSem); +// tsem_wait(&pParentSql->emptyRspSem); - tsem_post(&pParentSql->rspSem); +// tsem_post(&pParentSql->rspSem); } static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSubquerySupporter* pSupporter) { @@ -567,7 +567,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { freeSubqueryObj(pParentSql); } - tsem_post(&pParentSql->rspSem); +// tsem_post(&pParentSql->rspSem); } else { tscTrace("%p sub:%p completed, completed:%d, total:%d", pParentSql, tres, finished, numOfTotal); } @@ -662,7 +662,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { } // wait for all subquery completed - tsem_wait(&pSql->rspSem); +// tsem_wait(&pSql->rspSem); // update the records for each subquery for(int32_t i = 0; i < pSql->numOfSubs; ++i) { @@ -797,10 +797,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { tscProcessSql(pSql); } else { // first retrieve from vnode during the secondary stage sub-query if (pParentSql->fp == NULL) { - tsem_wait(&pParentSql->emptyRspSem); - tsem_wait(&pParentSql->emptyRspSem); - - tsem_post(&pParentSql->rspSem); +// tsem_post(&pParentSql->rspSem); } else { // set the command flag must be after the semaphore been correctly set. // pPObj->cmd.command = TSDB_SQL_RETRIEVE_METRIC; @@ -954,10 +951,7 @@ int32_t tscHandleMasterJoinQuery(SSqlObj* pSql) { } } - tsem_post(&pSql->emptyRspSem); - tsem_wait(&pSql->rspSem); - - tsem_post(&pSql->emptyRspSem); +// tsem_wait(&pSql->rspSem); if (pSql->numOfSubs <= 0) { pSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 7a585bfa68a3b82af98ae0ff358235f0ea9ed2a4..ead8c09dec3062b74745318b29c6eb3af11b96f8 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -40,15 +40,10 @@ void * tscQhandle; void * tscCheckDiskUsageTmr; int tsInsertHeadSize; -extern int tscEmbedded; -int tscNumOfThreads; -static pthread_once_t tscinit = PTHREAD_ONCE_INIT; -static pthread_mutex_t tscMutex; +int tscNumOfThreads; -extern int tsTscEnableRecordSql; -extern int tsNumOfLogLines; +static pthread_once_t tscinit = PTHREAD_ONCE_INIT; void taosInitNote(int numOfNoteLines, int maxNotes, char* lable); -void deltaToUtcInitOnce(); void tscCheckDiskUsage(void *para, void *unused) { taosGetDisk(); @@ -60,7 +55,6 @@ int32_t tscInitRpc(const char *user, const char *secret) { char secretEncrypt[32] = {0}; taosEncryptPass((uint8_t *)secret, strlen(secret), secretEncrypt); - pthread_mutex_lock(&tscMutex); if (pVnodeConn == NULL) { memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.localIp = tsLocalIp; @@ -78,7 +72,6 @@ int32_t tscInitRpc(const char *user, const char *secret) { pVnodeConn = rpcOpen(&rpcInit); if (pVnodeConn == NULL) { tscError("failed to init connection to vnode"); - pthread_mutex_unlock(&tscMutex); return -1; } } @@ -100,12 +93,10 @@ int32_t tscInitRpc(const char *user, const char *secret) { pTscMgmtConn = rpcOpen(&rpcInit); if (pTscMgmtConn == NULL) { tscError("failed to init connection to mgmt"); - pthread_mutex_unlock(&tscMutex); return -1; } } - pthread_mutex_unlock(&tscMutex); return 0; } @@ -113,7 +104,7 @@ void taos_init_imp() { char temp[128]; struct stat dirstat; - pthread_mutex_init(&tscMutex, NULL); + errno = TSDB_CODE_SUCCESS; srand(taosGetTimestampSec()); deltaToUtcInitOnce(); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index d003a72774abdba5cbafc1628656fe161e1aef2c..5c7d8789eaa54d52e34dcb1d4e4c83eae7d23bbe 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -469,7 +469,7 @@ void tscFreeSqlObjPartial(SSqlObj* pSql) { pSql->freed = 0; tscFreeSqlCmdData(pCmd); - tscTrace("%p free sqlObj partial completed", pSql); + tscTrace("%p partially free sqlObj completed", pSql); } void tscFreeSqlObj(SSqlObj* pSql) { @@ -487,8 +487,6 @@ void tscFreeSqlObj(SSqlObj* pSql) { tfree(pCmd->payload); pCmd->allocSize = 0; - - tsem_destroy(&pSql->rspSem); free(pSql); } @@ -820,7 +818,9 @@ void tscCloseTscObj(STscObj* pObj) { taosTmrStopA(&(pObj->pTimer)); tscFreeSqlObj(pSql); + sem_destroy(&pSql->rspSem); pthread_mutex_destroy(&pObj->mutex); + tscTrace("%p DB connection is closed", pObj); tfree(pObj); } @@ -842,10 +842,9 @@ int tscAllocPayload(SSqlCmd* pCmd, int size) { if (pCmd->payload == NULL) { assert(pCmd->allocSize == 0); - pCmd->payload = (char*)malloc(size); + pCmd->payload = (char*)calloc(1, size); if (pCmd->payload == NULL) return TSDB_CODE_CLI_OUT_OF_MEMORY; pCmd->allocSize = size; - memset(pCmd->payload, 0, pCmd->allocSize); } else { if (pCmd->allocSize < size) { char* b = realloc(pCmd->payload, size); @@ -853,6 +852,8 @@ int tscAllocPayload(SSqlCmd* pCmd, int size) { pCmd->payload = b; pCmd->allocSize = size; } + + memset(pCmd->payload, 0, pCmd->payloadLen); } //memset(pCmd->payload, 0, pCmd->allocSize); @@ -1742,7 +1743,7 @@ bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql) { } STscObj* pTscObj = pSql->pTscObj; - if (pSql->pStream != NULL || pTscObj->pHb == pSql) { + if (pSql->pStream != NULL || pTscObj->pHb == pSql || pTscObj->pSql == pSql) { return false; } @@ -1929,7 +1930,6 @@ STableMetaInfo* tscAddMeterMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST } pTableMetaInfo->pTableMeta = pTableMeta; -// pTableMetaInfo->pMetricMeta = pMetricMeta; pTableMetaInfo->numOfTags = numOfTags; if (tags != NULL) { @@ -1963,7 +1963,7 @@ void doRemoveMeterMetaInfo(SQueryInfo* pQueryInfo, int32_t index, bool removeFro } void tscRemoveAllMeterMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache) { - tscTrace("%p deref the metric/meter meta in cache, numOfTables:%d", address, pQueryInfo->numOfTables); + tscTrace("%p deref the table meta in cache, numOfTables:%d", address, pQueryInfo->numOfTables); int32_t index = pQueryInfo->numOfTables; while (index >= 0) { diff --git a/src/os/linux/src/tlinux.c b/src/os/linux/src/tlinux.c index bce4a8f13db7727cda101943529ebbd23481a32a..780e2903a064c45211ea3877c3fb2fef3a6c8bef 100644 --- a/src/os/linux/src/tlinux.c +++ b/src/os/linux/src/tlinux.c @@ -236,7 +236,7 @@ void *taosProcessAlarmSignal(void *tharg) { void (*callback)(int) = tharg; timer_t timerId; - struct sigevent sevent; + struct sigevent sevent = {0}; #ifdef _ALPINE sevent.sigev_notify = SIGEV_THREAD; diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index 75823c4e2e95f5cfc1d458ceece7d2b4acb2d193..932c04337745fb4e36cc2b04a4b6829703f4e6ed 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -5148,7 +5148,7 @@ static void singleTableQueryImpl(SQInfo* pQInfo) { int64_t st = taosGetTimestampUs(); // group by normal column, sliding window query, interval query are handled by interval query processor - if (pQuery->intervalTime != 0 || isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // interval (down sampling operation) + if (isIntervalQuery(pQuery) || isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // interval (down sampling operation) tableIntervalProcessor(pQInfo); } else { if (isFixedOutputQuery(pQuery)) { diff --git a/src/vnode/tsdb/src/tsdbFile.c b/src/vnode/tsdb/src/tsdbFile.c index ff8da1cdadd130dc28b6544f58620a50429c5643..bd6699eb8484b3ee9b1bbb9cb2415f10f1d7b8f3 100644 --- a/src/vnode/tsdb/src/tsdbFile.c +++ b/src/vnode/tsdb/src/tsdbFile.c @@ -147,6 +147,11 @@ void tsdbInitFileGroupIter(STsdbFileH *pFileH, SFileGroupIter *pIter, int direct } void tsdbSeekFileGroupIter(SFileGroupIter *pIter, int fid) { + if (pIter->numOfFGroups == 0) { + assert(pIter->pFileGroup == NULL); + return; + } + int flags = (pIter->direction == TSDB_FGROUP_ITER_FORWARD) ? TD_GE : TD_LE; void *ptr = taosbsearch(&fid, pIter->base, sizeof(SFileGroup), pIter->numOfFGroups, compFGroupKey, flags); if (ptr == NULL) { diff --git a/src/vnode/tsdb/src/tsdbRead.c b/src/vnode/tsdb/src/tsdbRead.c index c134ba257a7b252d56995d6ef5be44dc01120b87..599d8bd34738d047f2e3331d3a242f08976bf7b9 100644 --- a/src/vnode/tsdb/src/tsdbRead.c +++ b/src/vnode/tsdb/src/tsdbRead.c @@ -745,12 +745,23 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf int32_t index = -1; int32_t tid = pCheckInfo->tableId.tid; - SFile* pFile = &pCheckInfo->pFileGroup->files[TSDB_FILE_TYPE_DATA]; - while (1) { + while (pCheckInfo->pFileGroup != NULL) { if ((fid = getFileCompInfo(pCheckInfo, pCheckInfo->pFileGroup)) < 0) { break; } + + SFile* pFile = &pCheckInfo->pFileGroup->files[TSDB_FILE_TYPE_DATA]; + + // no data block in current file, try next + if (pCheckInfo->compIndex[tid].numOfSuperBlocks == 0) { + dTrace("QInfo:%p no data block in file, fid:%d, tid:%d, try next", pQueryHandle->qinfo, + pCheckInfo->pFileGroup->fileId, tid); + + pCheckInfo->pFileGroup = tsdbGetFileGroupNext(&pCheckInfo->fileIter); + + continue; + } index = binarySearchForBlockImpl(pCheckInfo->pCompInfo->blocks, pCheckInfo->compIndex[tid].numOfSuperBlocks, pQueryHandle->order, key); @@ -792,6 +803,7 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf pCheckInfo->pDataCols = tdNewDataCols(1000, 2, 4096); tdInitDataCols(pCheckInfo->pDataCols, pCheckInfo->pTableObj->schema); + SFile* pFile = &pCheckInfo->pFileGroup->files[TSDB_FILE_TYPE_DATA]; if (pFile->fd == FD_INITIALIZER) { pFile->fd = open(pFile->fname, O_RDONLY); }