diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index f64287bdc4f2bfbe326d3a926673761e265c087d..bc3c33423f85fcc9c81e122b9a4e3904f598410a 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -413,13 +413,13 @@ static int32_t tscFlushTmpBufferImpl(tExtMemBuffer *pMemoryBuf, tOrderDescriptor } int32_t tscFlushTmpBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePage *pPage, int32_t orderType) { - int32_t ret = tscFlushTmpBufferImpl(pMemoryBuf, pDesc, pPage, orderType); - if (ret != 0) { - return -1; + int32_t ret = 0; + if ((ret = tscFlushTmpBufferImpl(pMemoryBuf, pDesc, pPage, orderType)) != 0) { + return ret; } - if (!tExtMemBufferFlush(pMemoryBuf)) { - return -1; + if ((ret = tExtMemBufferFlush(pMemoryBuf)) != 0) { + return ret; } return 0; @@ -440,9 +440,9 @@ int32_t saveToBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePa // current buffer is full, need to flushed to disk assert(pPage->num == pModel->capacity); - int32_t ret = tscFlushTmpBuffer(pMemoryBuf, pDesc, pPage, orderType); - if (ret != 0) { - return -1; + int32_t code = tscFlushTmpBuffer(pMemoryBuf, pDesc, pPage, orderType); + if (code != 0) { + return code; } int32_t remain = numOfRows - numOfRemainEntries; @@ -458,8 +458,8 @@ int32_t saveToBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePa tColModelAppend(pModel, pPage, data, numOfRows - remain, numOfWriteElems, numOfRows); if (pPage->num == pModel->capacity) { - if (tscFlushTmpBuffer(pMemoryBuf, pDesc, pPage, orderType) != TSDB_CODE_SUCCESS) { - return -1; + if ((code = tscFlushTmpBuffer(pMemoryBuf, pDesc, pPage, orderType)) != TSDB_CODE_SUCCESS) { + return code; } } else { pPage->num = numOfWriteElems; diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 788d53e00a0763ed09de4b6b7815adf3d8f735c3..6164c005b94aa4a5a6a20f5dd08f25f382d053a2 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -2622,7 +2622,7 @@ static int32_t doExtractColumnFilterInfo(SQueryInfo* pQueryInfo, SColumnFilterIn tVariantDump(&pRight->val, (char*)&pColumnFilter->upperBndd, colType, false); } else { // TK_GT,TK_GE,TK_EQ,TK_NE are based on the pColumn->lowerBndd if (colType == TSDB_DATA_TYPE_BINARY) { - pColumnFilter->pz = (int64_t)calloc(1, pRight->val.nLen + 1); + pColumnFilter->pz = (int64_t)calloc(1, pRight->val.nLen + TSDB_NCHAR_SIZE); pColumnFilter->len = pRight->val.nLen; tVariantDump(&pRight->val, (char*)pColumnFilter->pz, colType, false); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index e15a5e618ad0dbd3aac1751fa543c50692443f6c..1384dcecff04b197dae5bd8209cd767c2fd0d7cf 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -198,6 +198,8 @@ int tscSendMsgToServer(SSqlObj *pSql) { }; pSql->SRpcReqContext = rpcSendRequest(pObj->pDnodeConn, &pSql->ipList, &rpcMsg); + assert(pSql->SRpcReqContext != NULL); + return TSDB_CODE_SUCCESS; } @@ -412,7 +414,6 @@ void tscKillSTableQuery(SSqlObj *pSql) { for (int i = 0; i < pSql->numOfSubs; ++i) { SSqlObj *pSub = pSql->pSubs[i]; - if (pSub == NULL) { continue; } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 0be7db435d679eb3ce8452975c59f754fd2e5cbf..6d4337e03276bdb51b5c93031fc2e2c52246b9b5 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -607,7 +607,9 @@ int* taos_fetch_lengths(TAOS_RES *res) { char *taos_get_client_info() { return version; } void taos_stop_query(TAOS_RES *res) { - if (res == NULL) return; + if (res == NULL) { + return; + } SSqlObj *pSql = (SSqlObj *)res; SSqlCmd *pCmd = &pSql->cmd; diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 42a5d1d09bc16fba0cda1865cee5cbfbdbf4138c..a3e655a9711fe0df8a8bac95846a0869d2b1a6ca 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -1492,7 +1492,7 @@ static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES tscError("sub:%p failed to flush data to disk:reason:%s", tres, lpMsgBuf); LocalFree(lpMsgBuf); #else - tscError("sub:%p failed to flush data to disk:reason:%s", tres, strerror(errno)); + tscError("sub:%p failed to flush data to disk, reason:%s", tres, tstrerror(code)); #endif SSqlObj* pParentSql = trsupport->pParentSqlObj; @@ -1501,7 +1501,6 @@ static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; pthread_mutex_unlock(&trsupport->queryMutex); - tscHandleSubqueryError(trsupport, tres, pParentSql->res.code); } @@ -1630,10 +1629,9 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p // each result for a vnode is ordered as an independant list, // then used as an input of loser tree for disk-based merge routine - int32_t ret = tscFlushTmpBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer, - pQueryInfo->groupbyExpr.orderType); - if (ret != 0) { // set no disk space error info, and abort retry - return tscAbortFurtherRetryRetrieval(trsupport, pSql, TSDB_CODE_TSC_NO_DISKSPACE); + int32_t code = tscFlushTmpBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer, pQueryInfo->groupbyExpr.orderType); + if (code != 0) { // set no disk space error info, and abort retry + return tscAbortFurtherRetryRetrieval(trsupport, pSql, code); } int32_t remain = -1; @@ -1704,7 +1702,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR tscTrace("%p sub:%p retrieve numOfRows:%" PRId64 " totalNumOfRows:%" PRIu64 " from ip:%s, orderOfSub:%d", pPObj, pSql, pRes->numOfRows, pState->numOfRetrievedRows, pSql->ipList.fqdn[pSql->ipList.inUse], idx); - + if (num > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pQueryInfo, 0)) { tscError("%p sub:%p num of OrderedRes is too many, max allowed:%" PRId32 " , current:%" PRId64, pPObj, pSql, tsMaxNumOfOrderedResults, num); @@ -1729,7 +1727,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR int32_t ret = saveToBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer, pRes->data, pRes->numOfRows, pQueryInfo->groupbyExpr.orderType); - if (ret < 0) { // set no disk space error info, and abort retry + if (ret != 0) { // set no disk space error info, and abort retry tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_TSC_NO_DISKSPACE); pthread_mutex_unlock(&trsupport->queryMutex); @@ -1988,6 +1986,7 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) { SColumnIndex* pIndex = &pRes->pColumnIndex[i]; SSqlRes *pRes1 = &pSql->pSubs[pIndex->tableIndex]->res; pRes->tsrow[i] = pRes1->tsrow[pIndex->columnIndex]; + pRes->length[i] = pRes1->length[pIndex->columnIndex]; } pRes->numOfClauseTotal++; diff --git a/src/kit/shell/src/shellEngine.c b/src/kit/shell/src/shellEngine.c index f2ba5201ba0c3b4d454dc9de047ccd0ceb4a6884..e28c691a8986fce0bcb7a04b8bb1e5f7f5c6605c 100644 --- a/src/kit/shell/src/shellEngine.c +++ b/src/kit/shell/src/shellEngine.c @@ -276,6 +276,8 @@ void shellRunCommandOnServer(TAOS *con, char command[]) { st = taosGetTimestampUs(); TAOS_RES* pSql = taos_query(con, command); + result = pSql; // set it into the global variable + if (taos_errno(pSql)) { taos_error(pSql); return; @@ -284,7 +286,8 @@ void shellRunCommandOnServer(TAOS *con, char command[]) { if (regex_match(command, "^\\s*use\\s+[a-zA-Z0-9_]+\\s*;\\s*$", REG_EXTENDED | REG_ICASE)) { fprintf(stdout, "Database changed.\n\n"); fflush(stdout); - + + result = NULL; taos_free_result(pSql); return; } @@ -294,6 +297,7 @@ void shellRunCommandOnServer(TAOS *con, char command[]) { int error_no = 0; int numOfRows = shellDumpResult(pSql, fname, &error_no, printMode); if (numOfRows < 0) { + result = NULL; taos_free_result(pSql); return; } @@ -315,7 +319,8 @@ void shellRunCommandOnServer(TAOS *con, char command[]) { if (fname != NULL) { wordfree(&full_path); } - + + result = NULL; taos_free_result(pSql); } @@ -419,8 +424,8 @@ static void dumpFieldToFile(FILE* fp, const char* val, TAOS_FIELD* field, int32_ } } -static int dumpResultToFile(const char* fname, TAOS_RES* result) { - TAOS_ROW row = taos_fetch_row(result); +static int dumpResultToFile(const char* fname, TAOS_RES* tres) { + TAOS_ROW row = taos_fetch_row(tres); if (row == NULL) { return 0; } @@ -441,9 +446,9 @@ static int dumpResultToFile(const char* fname, TAOS_RES* result) { wordfree(&full_path); - int num_fields = taos_num_fields(result); - TAOS_FIELD *fields = taos_fetch_fields(result); - int precision = taos_result_precision(result); + int num_fields = taos_num_fields(tres); + TAOS_FIELD *fields = taos_fetch_fields(tres); + int precision = taos_result_precision(tres); for (int col = 0; col < num_fields; col++) { if (col > 0) { @@ -455,7 +460,7 @@ static int dumpResultToFile(const char* fname, TAOS_RES* result) { int numOfRows = 0; do { - int32_t* length = taos_fetch_lengths(result); + int32_t* length = taos_fetch_lengths(tres); for (int i = 0; i < num_fields; i++) { if (i > 0) { fputc(',', fp); @@ -465,10 +470,13 @@ static int dumpResultToFile(const char* fname, TAOS_RES* result) { fputc('\n', fp); numOfRows++; - row = taos_fetch_row(result); + row = taos_fetch_row(tres); } while( row != NULL); + result = NULL; + taos_free_result(tres); fclose(fp); + return numOfRows; } @@ -769,8 +777,7 @@ void write_history() { void taos_error(TAOS_RES *tres) { fprintf(stderr, "\nDB error: %s\n", taos_errstr(tres)); - - /* free local resouce: allocated memory/metric-meta refcnt */ + result = NULL; taos_free_result(tres); } @@ -845,9 +852,9 @@ void shellGetGrantInfo(void *con) { char sql[] = "show grants"; - result = taos_query(con, sql); + TAOS_RES* tres = taos_query(con, sql); - int code = taos_errno(result); + int code = taos_errno(tres); if (code != TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_COM_OPS_NOT_SUPPORT) { fprintf(stdout, "Server is Community Edition, version is %s\n\n", taos_get_server_info(con)); @@ -857,18 +864,18 @@ void shellGetGrantInfo(void *con) { return; } - int num_fields = taos_field_count(result); + int num_fields = taos_field_count(tres); if (num_fields == 0) { fprintf(stderr, "\nInvalid grant information.\n"); exit(0); } else { - if (result == NULL) { + if (tres == NULL) { fprintf(stderr, "\nGrant information is null.\n"); exit(0); } - TAOS_FIELD *fields = taos_fetch_fields(result); - TAOS_ROW row = taos_fetch_row(result); + TAOS_FIELD *fields = taos_fetch_fields(tres); + TAOS_ROW row = taos_fetch_row(tres); if (row == NULL) { fprintf(stderr, "\nFailed to get grant information from server. Abort.\n"); exit(0); @@ -888,8 +895,8 @@ void shellGetGrantInfo(void *con) { fprintf(stdout, "Server is Enterprise %s Edition, version is %s and will expire at %s.\n", serverVersion, taos_get_server_info(con), expiretime); } - taos_free_result(result); result = NULL; + taos_free_result(tres); } fprintf(stdout, "\n"); diff --git a/src/kit/shell/src/shellMain.c b/src/kit/shell/src/shellMain.c index 92474bdd03f56e6b5b0e0bde17a94a10606ac8d4..8481f498dd395cd449192d625d307ae812acaa30 100644 --- a/src/kit/shell/src/shellMain.c +++ b/src/kit/shell/src/shellMain.c @@ -15,7 +15,6 @@ #include "os.h" #include "shell.h" -#include "tsclient.h" pthread_t pid; @@ -23,14 +22,6 @@ pthread_t pid; void interruptHandler(int signum) { #ifdef LINUX taos_stop_query(result); - if (result != NULL) { - /* - * we need to free result in async model, in order to avoid free - * results while the master thread is waiting for server response. - */ - tscQueueAsyncFreeResult(result); - } - result = NULL; #else printf("\nReceive ctrl+c or other signal, quit shell.\n"); diff --git a/src/kit/taosdump/taosdump.c b/src/kit/taosdump/taosdump.c index 0b881864f2253de5caeebbe501c9be440d5b262d..7fa7ffa3a949176703b850c16b70e45c01454cdd 100644 --- a/src/kit/taosdump/taosdump.c +++ b/src/kit/taosdump/taosdump.c @@ -295,7 +295,6 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { static struct argp argp = {options, parse_opt, args_doc, doc}; TAOS *taos = NULL; -TAOS_RES *result = NULL; char *command = NULL; char *lcommand = NULL; char *buffer = NULL; @@ -465,10 +464,10 @@ int taosDumpOut(SDumpArguments *arguments) { taosDumpCharset(fp); sprintf(command, "show databases"); - result = taos_query(taos, command); + TAOS_RES* result = taos_query(taos, command); int32_t code = taos_errno(result); if (code != 0) { - fprintf(stderr, "failed to run command: %s, reason: %s\n", command, taos_errstr(taos)); + fprintf(stderr, "failed to run command: %s, reason: %s\n", command, taos_errstr(result)); taos_free_result(result); goto _exit_failure; } @@ -615,7 +614,7 @@ int taosDumpDb(SDbInfo *dbInfo, SDumpArguments *arguments, FILE *fp) { fprintf(fp, "USE %s\n\n", dbInfo->name); sprintf(command, "show tables"); - result = taos_query(taos,command); + TAOS_RES* result = taos_query(taos,command); int32_t code = taos_errno(result); if (code != 0) { fprintf(stderr, "failed to run command %s, error: %s\n", command, taos_errstr(result)); @@ -719,7 +718,7 @@ void taosDumpCreateMTableClause(STableDef *tableDes, char *metric, int numOfCols sprintf(command, "select %s from %s limit 1", tableDes->cols[counter].field, tableDes->name); - result = taos_query(taos, command); + TAOS_RES* result = taos_query(taos, command); int32_t code = taos_errno(result); if (code != 0) { fprintf(stderr, "failed to run command %s, error: %s\n", command, taos_errstr(result)); @@ -797,7 +796,7 @@ int taosGetTableDes(char *table, STableDef *tableDes) { sprintf(command, "describe %s", table); - result = taos_query(taos, command); + TAOS_RES* result = taos_query(taos, command); int32_t code = taos_errno(result); if (code != 0) { fprintf(stderr, "failed to run command %s, error: %s\n", command, taos_errstr(result)); @@ -877,7 +876,7 @@ int32_t taosDumpMetric(char *metric, SDumpArguments *arguments, FILE *fp) { tstrncpy(tableRecord.metric, metric, TSDB_TABLE_NAME_LEN); sprintf(command, "select tbname from %s", metric); - result = taos_query(taos, command); + TAOS_RES* result = taos_query(taos, command); int32_t code = taos_errno(result); if (code != 0) { fprintf(stderr, "failed to run command %s, error: %s\n", command, taos_errstr(result)); @@ -930,7 +929,7 @@ int taosDumpTableData(FILE *fp, char *tbname, SDumpArguments *arguments) { sprintf(command, "select * from %s where _c0 >= %" PRId64 " and _c0 <= %" PRId64 " order by _c0 asc", tbname, arguments->start_time, arguments->end_time); - result = taos_query(taos, command); + TAOS_RES* result = taos_query(taos, command); int32_t code = taos_errno(result); if (code != 0) { fprintf(stderr, "failed to run command %s, reason: %s\n", command, taos_errstr(result)); @@ -1179,9 +1178,13 @@ int taosDumpIn(SDumpArguments *arguments) { tcommand = command; } taosReplaceCtrlChar(tcommand); - if (taos_query(taos, tcommand) == NULL) + + TAOS_RES* result = taos_query(taos, tcommand); + if (taos_errno(result) != 0){ fprintf(stderr, "linenu: %" PRId64 " failed to run command %s reason:%s \ncontinue...\n", linenu, command, - taos_errstr(taos)); + taos_errstr(result)); + taos_free_result(result); + } pstr = command; pstr[0] = '\0'; @@ -1227,12 +1230,12 @@ int taosDumpIn(SDumpArguments *arguments) { tcommand = command; } taosReplaceCtrlChar(tcommand); - result = taos_query(taos, tcommand); + TAOS_RES* result = taos_query(taos, tcommand); int32_t code = taos_errno(result); if (code != 0) { fprintf(stderr, "linenu:%" PRId64 " failed to run command %s reason: %s \ncontinue...\n", linenu, command, - taos_errstr(taos)); + taos_errstr(result)); } taos_free_result(result); } diff --git a/src/query/inc/qextbuffer.h b/src/query/inc/qextbuffer.h index 23b67083e593f5eab88b11f1f960d6af7138f3e9..2cbef2b1bea66654b262a3fb37061477969960f2 100644 --- a/src/query/inc/qextbuffer.h +++ b/src/query/inc/qextbuffer.h @@ -149,7 +149,7 @@ int16_t tExtMemBufferPut(tExtMemBuffer *pMemBuffer, void *data, int32_t numOfRow * @param pMemBuffer * @return */ -bool tExtMemBufferFlush(tExtMemBuffer *pMemBuffer); +int32_t tExtMemBufferFlush(tExtMemBuffer *pMemBuffer); /** * diff --git a/src/query/src/qextbuffer.c b/src/query/src/qextbuffer.c index e0a90e04082706eb720960e88356df466dbd017f..35f5e22ed51b9261fa621520d5757a777e9973c8 100644 --- a/src/query/src/qextbuffer.c +++ b/src/query/src/qextbuffer.c @@ -245,30 +245,29 @@ static void tExtMemBufferClearFlushoutInfo(tExtMemBuffer *pMemBuffer) { memset(pFileMeta->flushoutData.pFlushoutInfo, 0, sizeof(tFlushoutInfo) * pFileMeta->flushoutData.nAllocSize); } -bool tExtMemBufferFlush(tExtMemBuffer *pMemBuffer) { +int32_t tExtMemBufferFlush(tExtMemBuffer *pMemBuffer) { + int32_t ret = 0; if (pMemBuffer->numOfTotalElems == 0) { - return true; + return ret; } if (pMemBuffer->file == NULL) { if ((pMemBuffer->file = fopen(pMemBuffer->path, "wb+")) == NULL) { - return false; + ret = TAOS_SYSTEM_ERROR(errno); + return ret; } } /* all data has been flushed to disk, ignore flush operation */ if (pMemBuffer->numOfElemsInBuffer == 0) { - return true; + return ret; } - bool ret = true; - tFilePagesItem *first = pMemBuffer->pHead; - while (first != NULL) { size_t retVal = fwrite((char *)&(first->item), pMemBuffer->pageSize, 1, pMemBuffer->file); if (retVal <= 0) { // failed to write to buffer, may be not enough space - ret = false; + ret = TAOS_SYSTEM_ERROR(errno); } pMemBuffer->fileMeta.numOfElemsInFile += first->item.num;