diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 9cb43a9f3666f330704d968528986b38246b7351..fd82cbb8c506861d2efe17d5fb81dd8cce8408a7 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -357,7 +357,6 @@ typedef struct SSqlObj { char freed : 4; char listed : 4; tsem_t rspSem; - tsem_t emptyRspSem; SSqlCmd cmd; SSqlRes res; uint8_t numOfSubs; diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index c1738e6801607703e51a6c6ef95edd6e8305f703..f05d2699258ad8518637c8e18851560be8b6589c 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -51,17 +51,15 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) { tscError("failed to malloc payload"); - tfree(pSql); tscQueueAsyncError(fp, param); return; } - pSql->sqlstr = malloc(sqlLen + 1); + pSql->sqlstr = realloc(pSql->sqlstr, sqlLen + 1); if (pSql->sqlstr == NULL) { tscError("%p failed to malloc sql string buffer", pSql); tscQueueAsyncError(fp, param); free(pCmd->payload); - free(pSql); return; } @@ -412,7 +410,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { if (code != 0) { pRes->code = code; tscTrace("%p failed to renew tableMeta", pSql); - tsem_post(&pSql->rspSem); +// tsem_post(&pSql->rspSem); } else { tscTrace("%p renew tableMeta successfully, command:%d, code:%d, retry:%d", pSql, pSql->cmd.command, pSql->res.code, pSql->retry); @@ -424,7 +422,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { code = tscSendMsgToServer(pSql); if (code != 0) { pRes->code = code; - tsem_post(&pSql->rspSem); +// tsem_post(&pSql->rspSem); } } diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c index 241f24a74777be21ffb557209f20d227ff4ef957..8b065fcf51e3115792a6312ea74e4e0a130206a9 100644 --- a/src/client/src/tscPrepare.c +++ b/src/client/src/tscPrepare.c @@ -488,7 +488,6 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) { } tsem_init(&pSql->rspSem, 0, 0); - tsem_init(&pSql->emptyRspSem, 0, 1); pSql->signature = pSql; pSql->pTscObj = pObj; diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index bf186051e840104ca1e908381fdf404680766d65..5696611387049b890eeaf726d465b67d6dbc3a4d 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -215,7 +215,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { if (pQueryInfo->numOfTables == 0) { pTableMetaInfo = tscAddEmptyMetaInfo(pQueryInfo); } else { - pTableMetaInfo = &pQueryInfo->pTableMetaInfo[0]; + pTableMetaInfo = pQueryInfo->pTableMetaInfo[0]; } pCmd->command = pInfo->type; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index e57f9cc4104d9dc7abd6f0131fc40a34c9bc5941..591b41d9488c77094f293221b3285ca51c5cc476 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -285,14 +285,16 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { pRes->rspType = rpcMsg->msgType; pRes->rspLen = rpcMsg->contLen; - char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen); - if (tmp == NULL) { - pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY; - } else { - pRes->pRsp = tmp; - if (pRes->rspLen) { + if (pRes->rspLen > 0) { + char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen); + if (tmp == NULL) { + pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY; + } else { + pRes->pRsp = tmp; memcpy(pRes->pRsp, rpcMsg->pCont, pRes->rspLen); } + } else { + pRes->pRsp = NULL; } // ignore the error information returned from mnode when set ignore flag in sql diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index bbf46a235365541de55bee314009fdd607253912..0fe281859b23172ae9c924bd123a61624be6b4a6 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -182,7 +182,7 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha } tscTrace("%p DB connection is opening", pObj); - + // version compare only requires the first 3 segments of the version string int code = taosCheckVersion(version, taos_get_server_info(pObj), 3); if (code != 0) { @@ -268,9 +268,10 @@ int taos_query_imp(STscObj *pObj, SSqlObj *pSql) { } static void syncQueryCallback(void *param, TAOS_RES *tres, int code) { + assert(param != NULL); STscObj *pObj = (STscObj *)param; - assert(pObj != NULL && pObj->pSql != NULL); + assert(pObj->pSql != NULL); sem_post(&pObj->pSql->rspSem); } @@ -281,14 +282,7 @@ int taos_query(TAOS *taos, const char *sqlstr) { return TSDB_CODE_DISCONNECTED; } - SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); - if (pSql == NULL) { - tscError("failed to malloc sqlObj"); - return TSDB_CODE_CLI_OUT_OF_MEMORY; - } - - pObj->pSql = pSql; - tsem_init(&pSql->rspSem, 0, 0); + SSqlObj* pSql = pObj->pSql; int32_t sqlLen = strlen(sqlstr); doAsyncQuery(pObj, pObj->pSql, syncQueryCallback, taos, sqlstr, sqlLen); @@ -651,10 +645,10 @@ static void **tscBuildResFromSubqueries(SSqlObj *pSql) { static void asyncFetchCallback(void *param, TAOS_RES *tres, int numOfRows) { SSqlObj* pSql = (SSqlObj*) tres; + if (numOfRows < 0) { // set the error code pSql->res.code = -numOfRows; } - sem_post(&pSql->rspSem); } @@ -754,20 +748,18 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) { if (pRes == NULL || pRes->qhandle == 0) { /* Query rsp is not received from vnode, so the qhandle is NULL */ tscTrace("%p qhandle is null, abort free, fp:%p", pSql, pSql->fp); - if (pSql->fp != NULL) { - STscObj* pObj = pSql->pTscObj; - - if (pSql == pObj->pSql) { - pObj->pSql = NULL; - tscFreeSqlObj(pSql); - } - + + if (tscShouldFreeAsyncSqlObj(pSql)) { + tscFreeSqlObj(pSql); tscTrace("%p Async SqlObj is freed by app", pSql); - } else if (keepCmd) { - tscFreeSqlResult(pSql); } else { - tscFreeSqlObjPartial(pSql); + if (keepCmd) { + tscFreeSqlResult(pSql); + } else { + tscFreeSqlObjPartial(pSql); + } } + return; } @@ -836,17 +828,20 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) { } } else { // if no free resource msg is sent to vnode, we free this object immediately. - - if (pSql->fp) { + bool free = tscShouldFreeAsyncSqlObj(pSql); + if (free) { assert(pRes->numOfRows == 0 || (pCmd->command > TSDB_SQL_LOCAL)); + tscFreeSqlObj(pSql); tscTrace("%p Async sql result is freed by app", pSql); - } else if (keepCmd) { - tscFreeSqlResult(pSql); - tscTrace("%p sql result is freed while sql command is kept", pSql); } else { - tscFreeSqlObjPartial(pSql); - tscTrace("%p sql result is freed", pSql); + if (keepCmd) { + tscFreeSqlResult(pSql); + tscTrace("%p sql result is freed while sql command is kept", pSql); + } else { + tscFreeSqlObjPartial(pSql); + tscTrace("%p sql result is freed by app", pSql); + } } } } @@ -892,9 +887,10 @@ char *taos_errstr(TAOS *taos) { STscObj *pObj = (STscObj *)taos; if (pObj == NULL || pObj->signature != pObj) - return (char*)tstrerror(globalCode); + return (char*)tstrerror(terrno); - SSqlObj *pSql = pObj->pSql; + SSqlObj* pSql = pObj->pSql; + if (hasAdditionalErrorInfo(pSql->res.code, &pSql->cmd)) { return pSql->cmd.payload; } else { diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index 616bcbba504df2d949a4cbb3aee7ef84b7f6e082..9d172d4ea5279c648b636df0d8895c1a20bda7cd 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -124,7 +124,6 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* pSql->sqlstr = sqlstr; tsem_init(&pSql->rspSem, 0, 0); - tsem_init(&pSql->emptyRspSem, 0, 1); SSqlRes *pRes = &pSql->res; pRes->numOfRows = 1; diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index b0c7b68ab4cfc263f6728d4ff8e95c224ec988cf..ca6ca369e422b448728d8761d7d5dc17e8377b0c 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -392,10 +392,10 @@ void freeSubqueryObj(SSqlObj* pSql) { static void doQuitSubquery(SSqlObj* pParentSql) { freeSubqueryObj(pParentSql); - tsem_wait(&pParentSql->emptyRspSem); - tsem_wait(&pParentSql->emptyRspSem); +// tsem_wait(&pParentSql->emptyRspSem); +// tsem_wait(&pParentSql->emptyRspSem); - tsem_post(&pParentSql->rspSem); +// tsem_post(&pParentSql->rspSem); } static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSubquerySupporter* pSupporter) { @@ -567,7 +567,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { freeSubqueryObj(pParentSql); } - tsem_post(&pParentSql->rspSem); +// tsem_post(&pParentSql->rspSem); } else { tscTrace("%p sub:%p completed, completed:%d, total:%d", pParentSql, tres, finished, numOfTotal); } @@ -662,7 +662,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { } // wait for all subquery completed - tsem_wait(&pSql->rspSem); +// tsem_wait(&pSql->rspSem); // update the records for each subquery for(int32_t i = 0; i < pSql->numOfSubs; ++i) { @@ -797,10 +797,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { tscProcessSql(pSql); } else { // first retrieve from vnode during the secondary stage sub-query if (pParentSql->fp == NULL) { - tsem_wait(&pParentSql->emptyRspSem); - tsem_wait(&pParentSql->emptyRspSem); - - tsem_post(&pParentSql->rspSem); +// tsem_post(&pParentSql->rspSem); } else { // set the command flag must be after the semaphore been correctly set. // pPObj->cmd.command = TSDB_SQL_RETRIEVE_METRIC; @@ -954,10 +951,7 @@ int32_t tscHandleMasterJoinQuery(SSqlObj* pSql) { } } - tsem_post(&pSql->emptyRspSem); - tsem_wait(&pSql->rspSem); - - tsem_post(&pSql->emptyRspSem); +// tsem_wait(&pSql->rspSem); if (pSql->numOfSubs <= 0) { pSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 7a585bfa68a3b82af98ae0ff358235f0ea9ed2a4..ead8c09dec3062b74745318b29c6eb3af11b96f8 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -40,15 +40,10 @@ void * tscQhandle; void * tscCheckDiskUsageTmr; int tsInsertHeadSize; -extern int tscEmbedded; -int tscNumOfThreads; -static pthread_once_t tscinit = PTHREAD_ONCE_INIT; -static pthread_mutex_t tscMutex; +int tscNumOfThreads; -extern int tsTscEnableRecordSql; -extern int tsNumOfLogLines; +static pthread_once_t tscinit = PTHREAD_ONCE_INIT; void taosInitNote(int numOfNoteLines, int maxNotes, char* lable); -void deltaToUtcInitOnce(); void tscCheckDiskUsage(void *para, void *unused) { taosGetDisk(); @@ -60,7 +55,6 @@ int32_t tscInitRpc(const char *user, const char *secret) { char secretEncrypt[32] = {0}; taosEncryptPass((uint8_t *)secret, strlen(secret), secretEncrypt); - pthread_mutex_lock(&tscMutex); if (pVnodeConn == NULL) { memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.localIp = tsLocalIp; @@ -78,7 +72,6 @@ int32_t tscInitRpc(const char *user, const char *secret) { pVnodeConn = rpcOpen(&rpcInit); if (pVnodeConn == NULL) { tscError("failed to init connection to vnode"); - pthread_mutex_unlock(&tscMutex); return -1; } } @@ -100,12 +93,10 @@ int32_t tscInitRpc(const char *user, const char *secret) { pTscMgmtConn = rpcOpen(&rpcInit); if (pTscMgmtConn == NULL) { tscError("failed to init connection to mgmt"); - pthread_mutex_unlock(&tscMutex); return -1; } } - pthread_mutex_unlock(&tscMutex); return 0; } @@ -113,7 +104,7 @@ void taos_init_imp() { char temp[128]; struct stat dirstat; - pthread_mutex_init(&tscMutex, NULL); + errno = TSDB_CODE_SUCCESS; srand(taosGetTimestampSec()); deltaToUtcInitOnce(); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index d003a72774abdba5cbafc1628656fe161e1aef2c..5c7d8789eaa54d52e34dcb1d4e4c83eae7d23bbe 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -469,7 +469,7 @@ void tscFreeSqlObjPartial(SSqlObj* pSql) { pSql->freed = 0; tscFreeSqlCmdData(pCmd); - tscTrace("%p free sqlObj partial completed", pSql); + tscTrace("%p partially free sqlObj completed", pSql); } void tscFreeSqlObj(SSqlObj* pSql) { @@ -487,8 +487,6 @@ void tscFreeSqlObj(SSqlObj* pSql) { tfree(pCmd->payload); pCmd->allocSize = 0; - - tsem_destroy(&pSql->rspSem); free(pSql); } @@ -820,7 +818,9 @@ void tscCloseTscObj(STscObj* pObj) { taosTmrStopA(&(pObj->pTimer)); tscFreeSqlObj(pSql); + sem_destroy(&pSql->rspSem); pthread_mutex_destroy(&pObj->mutex); + tscTrace("%p DB connection is closed", pObj); tfree(pObj); } @@ -842,10 +842,9 @@ int tscAllocPayload(SSqlCmd* pCmd, int size) { if (pCmd->payload == NULL) { assert(pCmd->allocSize == 0); - pCmd->payload = (char*)malloc(size); + pCmd->payload = (char*)calloc(1, size); if (pCmd->payload == NULL) return TSDB_CODE_CLI_OUT_OF_MEMORY; pCmd->allocSize = size; - memset(pCmd->payload, 0, pCmd->allocSize); } else { if (pCmd->allocSize < size) { char* b = realloc(pCmd->payload, size); @@ -853,6 +852,8 @@ int tscAllocPayload(SSqlCmd* pCmd, int size) { pCmd->payload = b; pCmd->allocSize = size; } + + memset(pCmd->payload, 0, pCmd->payloadLen); } //memset(pCmd->payload, 0, pCmd->allocSize); @@ -1742,7 +1743,7 @@ bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql) { } STscObj* pTscObj = pSql->pTscObj; - if (pSql->pStream != NULL || pTscObj->pHb == pSql) { + if (pSql->pStream != NULL || pTscObj->pHb == pSql || pTscObj->pSql == pSql) { return false; } @@ -1929,7 +1930,6 @@ STableMetaInfo* tscAddMeterMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST } pTableMetaInfo->pTableMeta = pTableMeta; -// pTableMetaInfo->pMetricMeta = pMetricMeta; pTableMetaInfo->numOfTags = numOfTags; if (tags != NULL) { @@ -1963,7 +1963,7 @@ void doRemoveMeterMetaInfo(SQueryInfo* pQueryInfo, int32_t index, bool removeFro } void tscRemoveAllMeterMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache) { - tscTrace("%p deref the metric/meter meta in cache, numOfTables:%d", address, pQueryInfo->numOfTables); + tscTrace("%p deref the table meta in cache, numOfTables:%d", address, pQueryInfo->numOfTables); int32_t index = pQueryInfo->numOfTables; while (index >= 0) {