diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index d79c9fc00ce255fd537536c68c0568a957e132e5..35d3184ccb898e8b11533909df254e0b373ea029 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -318,7 +318,6 @@ typedef struct SSqlObj { char freed : 4; char listed : 4; tsem_t rspSem; - pthread_mutex_t inUse; // make sure that one connection can only be utilized by one thread/process SSqlCmd cmd; SSqlRes res; uint16_t numOfSubs; diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index bbba6ec636da2f59c713465b67bf4b9bc7bddfe4..a4cbd7f7ec16e15f38dc29fac2b87413ae13896d 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -123,13 +123,6 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con tsem_init(&pSql->rspSem, 0, 0); - pthread_mutexattr_t mutexattr; - memset(&mutexattr, 0, sizeof(pthread_mutexattr_t)); - - pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_RECURSIVE_NP); - pthread_mutex_init(&pSql->inUse, &mutexattr); - pthread_mutexattr_destroy(&mutexattr); - pObj->pSql = pSql; pObj->pDnodeConn = pDnodeConn; @@ -291,23 +284,11 @@ int taos_query(TAOS *taos, const char *sqlstr) { } SSqlObj* pSql = pObj->pSql; - SSqlCmd* pCmd = &pSql->cmd; - - // now this TAOS_CONN object is in use by one thread - pthread_mutex_lock(&pSql->inUse); - - size_t sqlLen = strlen(sqlstr); + size_t sqlLen = strlen(sqlstr); doAsyncQuery(pObj, pSql, waitForQueryRsp, taos, sqlstr, sqlLen); // wait for the callback function to post the semaphore tsem_wait(&pSql->rspSem); - - if (pCmd->command != TSDB_SQL_SELECT && - pCmd->command != TSDB_SQL_SHOW && - pCmd->command != TSDB_SQL_DESCRIBE_TABLE) { - pthread_mutex_unlock(&pSql->inUse); - } - return pSql->res.code; } @@ -565,7 +546,6 @@ void taos_free_result(TAOS_RES *res) { tscFreeSqlObj(pSql); } else { tscPartiallyFreeSqlObj(pSql); - pthread_mutex_unlock(&pSql->inUse); // now this TAOS_CONN can be used by other threads } return; @@ -596,9 +576,8 @@ void taos_free_result(TAOS_RES *res) { if ((pCmd->command == TSDB_SQL_SELECT || pCmd->command == TSDB_SQL_SHOW || pCmd->command == TSDB_SQL_RETRIEVE || - pCmd->command == TSDB_SQL_FETCH) && pRes->code == TSDB_CODE_SUCCESS && - ((pCmd->command < TSDB_SQL_LOCAL && pRes->completed == false) || - (pCmd->command == TSDB_SQL_SELECT && pSql->pStream == NULL && pTableMetaInfo->pTableMeta != NULL))) { + pCmd->command == TSDB_SQL_FETCH) && pRes->code == TSDB_CODE_SUCCESS && pRes->completed == false && + (pCmd->command == TSDB_SQL_SELECT && pSql->pStream == NULL && pTableMetaInfo->pTableMeta != NULL)) { pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; tscTrace("%p send msg to free qhandle in vnode, code:%d, numOfRows:%d, command:%s", pSql, pRes->code, pRes->numOfRows, diff --git a/src/query/inc/qfill.h b/src/query/inc/qfill.h index 323ff7a8127f9310c7cfacd3ce6e6d0594faf16c..9ea9c8f7cf3df75c182f33ea5122d0752b097334 100644 --- a/src/query/inc/qfill.h +++ b/src/query/inc/qfill.h @@ -45,12 +45,13 @@ typedef struct SFillInfo { int32_t numOfCols; // number of columns, including the tags columns int32_t rowSize; // size of each row char ** pTags; // tags value for current interpolation - - int64_t slidingTime; // sliding value to determine the number of result for a given time window + int64_t slidingTime; // sliding value to determine the number of result for a given time window char * prevValues; // previous row of data, to generate the interpolation results char * nextValues; // next row of data + char** pData; // original result data block involved in filling data + int32_t capacityInRows; // data buffer size in rows + SFillColInfo* pFillCol; // column info for fill operations - char** pData; // original result data block involved in filling data } SFillInfo; typedef struct SPoint { diff --git a/src/query/src/qfill.c b/src/query/src/qfill.c index 36ffc433ce1d510dc4030ba34438014f42f57d20..8c8a50a3d86abf4d27439286a7a6d0918720efd3 100644 --- a/src/query/src/qfill.c +++ b/src/query/src/qfill.c @@ -79,7 +79,7 @@ SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_ int32_t rowsize = 0; for (int32_t i = 0; i < numOfCols; ++i) { int32_t bytes = pFillInfo->pFillCol[i].col.bytes; - pFillInfo->pData[i] = calloc(1, sizeof(tFilePage) + bytes * capacity); + pFillInfo->pData[i] = calloc(1, bytes * capacity); rowsize += bytes; } @@ -89,6 +89,8 @@ SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_ } pFillInfo->rowSize = rowsize; + pFillInfo->capacityInRows = capacity; + return pFillInfo; } @@ -119,6 +121,17 @@ void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey) pFillInfo->rowIdx = 0; pFillInfo->endKey = endKey; pFillInfo->numOfRows = numOfRows; + + // ensure the space + if (pFillInfo->capacityInRows < numOfRows) { + for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) { + char* tmp = realloc(pFillInfo->pData[i], numOfRows*pFillInfo->pFillCol[i].col.bytes); + assert(tmp != NULL); // todo handle error + + memset(tmp, 0, numOfRows*pFillInfo->pFillCol[i].col.bytes); + pFillInfo->pData[i] = tmp; + } + } } void taosFillCopyInputDataFromFilePage(SFillInfo* pFillInfo, tFilePage** pInput) { @@ -474,11 +487,11 @@ int32_t generateDataBlockImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t nu } int64_t taosGenerateDataBlock(SFillInfo* pFillInfo, tFilePage** output, int32_t capacity) { - int32_t remain = taosNumOfRemainRows(pFillInfo); // todo use iterator? - int32_t rows = taosGetNumOfResultWithFill(pFillInfo, remain, pFillInfo->endKey, capacity); + int32_t remain = taosNumOfRemainRows(pFillInfo); // todo use iterator? + int32_t rows = taosGetNumOfResultWithFill(pFillInfo, remain, pFillInfo->endKey, capacity); + + int32_t numOfRes = generateDataBlockImpl(pFillInfo, output, remain, rows, pFillInfo->pData); + assert(numOfRes == rows); - int32_t numOfRes = generateDataBlockImpl(pFillInfo, output, remain, rows, pFillInfo->pData); - assert(numOfRes == rows); - - return numOfRes; + return numOfRes; }