提交 f6d56d84 编写于 作者: H hjxilinx

fix bug in issue #984. [tbase-1384]

上级 f66963ca
...@@ -66,19 +66,24 @@ typedef struct SJoinSubquerySupporter { ...@@ -66,19 +66,24 @@ typedef struct SJoinSubquerySupporter {
char path[PATH_MAX]; // temporary file path char path[PATH_MAX]; // temporary file path
} SJoinSubquerySupporter; } SJoinSubquerySupporter;
void tscDestroyDataBlock(STableDataBlocks* pDataBlock); int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, const char* name,
STableDataBlocks* tscCreateDataBlock(size_t initialBufSize, int32_t rowSize, int32_t startOffset, const char* name); STableDataBlocks** dataBlocks);
void tscAppendDataBlock(SDataBlockList* pList, STableDataBlocks* pBlocks); void tscAppendDataBlock(SDataBlockList* pList, STableDataBlocks* pBlocks);
SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, short bytes, void tscDestroyDataBlock(STableDataBlocks* pDataBlock);
uint32_t offset);
SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, short bytes,
SDataBlockList* tscCreateBlockArrayList(); uint32_t offset);
void* tscDestroyBlockArrayList(SDataBlockList* pList);
int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock); SDataBlockList* tscCreateBlockArrayList();
void tscFreeUnusedDataBlocks(SDataBlockList* pList);
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pDataList); void* tscDestroyBlockArrayList(SDataBlockList* pList);
STableDataBlocks* tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int64_t id, int32_t size, int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock);
int32_t startOffset, int32_t rowSize, const char* tableId); void tscFreeUnusedDataBlocks(SDataBlockList* pList);
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pDataList);
int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int64_t id, int32_t size,
int32_t startOffset, int32_t rowSize, const char* tableId,
STableDataBlocks** dataBlocks);
SVnodeSidList* tscGetVnodeSidList(SMetricMeta* pMetricmeta, int32_t vnodeIdx); SVnodeSidList* tscGetVnodeSidList(SMetricMeta* pMetricmeta, int32_t vnodeIdx);
SMeterSidExtInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx); SMeterSidExtInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx);
......
...@@ -650,10 +650,13 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableHashList, char ...@@ -650,10 +650,13 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableHashList, char
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0); SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0);
SMeterMeta * pMeterMeta = pMeterMetaInfo->pMeterMeta; SMeterMeta * pMeterMeta = pMeterMetaInfo->pMeterMeta;
STableDataBlocks *dataBuf = STableDataBlocks *dataBuf = NULL;
tscGetDataBlockFromList(pTableHashList, pCmd->pDataBlocks, pMeterMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE, int32_t ret = tscGetDataBlockFromList(pTableHashList, pCmd->pDataBlocks, pMeterMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE,
sizeof(SShellSubmitBlock), pMeterMeta->rowSize, pMeterMetaInfo->name); sizeof(SShellSubmitBlock), pMeterMeta->rowSize, pMeterMetaInfo->name, &dataBuf);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
int32_t maxNumOfRows = tscAllocateMemIfNeed(dataBuf, pMeterMeta->rowSize); int32_t maxNumOfRows = tscAllocateMemIfNeed(dataBuf, pMeterMeta->rowSize);
if (0 == maxNumOfRows) { if (0 == maxNumOfRows) {
return TSDB_CODE_CLI_OUT_OF_MEMORY; return TSDB_CODE_CLI_OUT_OF_MEMORY;
...@@ -1059,9 +1062,13 @@ int doParserInsertSql(SSqlObj *pSql, char *str) { ...@@ -1059,9 +1062,13 @@ int doParserInsertSql(SSqlObj *pSql, char *str) {
strcpy(fname, full_path.we_wordv[0]); strcpy(fname, full_path.we_wordv[0]);
wordfree(&full_path); wordfree(&full_path);
STableDataBlocks *pDataBlock = tscCreateDataBlock(PATH_MAX, pMeterMetaInfo->pMeterMeta->rowSize, STableDataBlocks *pDataBlock = NULL;
sizeof(SShellSubmitBlock), pMeterMetaInfo->name); int32_t ret = tscCreateDataBlock(PATH_MAX, pMeterMetaInfo->pMeterMeta->rowSize, sizeof(SShellSubmitBlock),
pMeterMetaInfo->name, &pDataBlock);
if (ret != TSDB_CODE_SUCCESS) {
goto _error_clean;
}
tscAppendDataBlock(pCmd->pDataBlocks, pDataBlock); tscAppendDataBlock(pCmd->pDataBlocks, pDataBlock);
strcpy(pDataBlock->filename, fname); strcpy(pDataBlock->filename, fname);
} else if (sToken.type == TK_LP) { } else if (sToken.type == TK_LP) {
...@@ -1296,9 +1303,13 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) { ...@@ -1296,9 +1303,13 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) {
int32_t rowSize = pMeterMeta->rowSize; int32_t rowSize = pMeterMeta->rowSize;
pCmd->pDataBlocks = tscCreateBlockArrayList(); pCmd->pDataBlocks = tscCreateBlockArrayList();
STableDataBlocks *pTableDataBlock = tscCreateDataBlock(TSDB_PAYLOAD_SIZE, pMeterMeta->rowSize, STableDataBlocks *pTableDataBlock = NULL;
sizeof(SShellSubmitBlock), pMeterMetaInfo->name); int32_t ret = tscCreateDataBlock(TSDB_PAYLOAD_SIZE, pMeterMeta->rowSize, sizeof(SShellSubmitBlock),
pMeterMetaInfo->name, &pTableDataBlock);
if (ret != TSDB_CODE_SUCCESS) {
return -1;
}
tscAppendDataBlock(pCmd->pDataBlocks, pTableDataBlock); tscAppendDataBlock(pCmd->pDataBlocks, pTableDataBlock);
maxRows = tscAllocateMemIfNeed(pTableDataBlock, rowSize); maxRows = tscAllocateMemIfNeed(pTableDataBlock, rowSize);
......
...@@ -22,8 +22,8 @@ ...@@ -22,8 +22,8 @@
#include "tscJoinProcess.h" #include "tscJoinProcess.h"
#include "tscProfile.h" #include "tscProfile.h"
#include "tscSecondaryMerge.h" #include "tscSecondaryMerge.h"
#include "tscUtil.h"
#include "tschemautil.h" #include "tschemautil.h"
#include "tscUtil.h"
#include "tsclient.h" #include "tsclient.h"
#include "tsqldef.h" #include "tsqldef.h"
#include "ttimer.h" #include "ttimer.h"
...@@ -60,21 +60,20 @@ void tscGetMetricMetaCacheKey(SSqlCmd* pCmd, char* str, uint64_t uid) { ...@@ -60,21 +60,20 @@ void tscGetMetricMetaCacheKey(SSqlCmd* pCmd, char* str, uint64_t uid) {
} }
// estimate the buffer size // estimate the buffer size
size_t tbnameCondLen = pTagCond->tbnameCond.cond != NULL? strlen(pTagCond->tbnameCond.cond):0; size_t tbnameCondLen = pTagCond->tbnameCond.cond != NULL ? strlen(pTagCond->tbnameCond.cond) : 0;
size_t redundantLen = 20; size_t redundantLen = 20;
size_t bufSize = strlen(pMeterMetaInfo->name) + tbnameCondLen + strlen(join) + strlen(tagIdBuf); size_t bufSize = strlen(pMeterMetaInfo->name) + tbnameCondLen + strlen(join) + strlen(tagIdBuf);
if (cond != NULL) { if (cond != NULL) {
bufSize += strlen(cond->cond); bufSize += strlen(cond->cond);
} }
bufSize = (size_t) ((bufSize + redundantLen) * 1.5); bufSize = (size_t)((bufSize + redundantLen) * 1.5);
char* tmp = calloc(1, bufSize); char* tmp = calloc(1, bufSize);
int32_t keyLen = snprintf(tmp, bufSize, "%s,%s,%s,%d,%s,[%s],%d", pMeterMetaInfo->name, int32_t keyLen = snprintf(tmp, bufSize, "%s,%s,%s,%d,%s,[%s],%d", pMeterMetaInfo->name,
(cond != NULL ? cond->cond : NULL), (cond != NULL ? cond->cond : NULL), (tbnameCondLen > 0 ? pTagCond->tbnameCond.cond : NULL),
(tbnameCondLen > 0 ? pTagCond->tbnameCond.cond : NULL), pTagCond->relType, join, tagIdBuf, pCmd->groupbyExpr.orderType);
pTagCond->relType, join, tagIdBuf, pCmd->groupbyExpr.orderType);
assert(keyLen <= bufSize); assert(keyLen <= bufSize);
...@@ -84,7 +83,7 @@ void tscGetMetricMetaCacheKey(SSqlCmd* pCmd, char* str, uint64_t uid) { ...@@ -84,7 +83,7 @@ void tscGetMetricMetaCacheKey(SSqlCmd* pCmd, char* str, uint64_t uid) {
MD5_CTX ctx; MD5_CTX ctx;
MD5Init(&ctx); MD5Init(&ctx);
MD5Update(&ctx, (uint8_t*) tmp, keyLen); MD5Update(&ctx, (uint8_t*)tmp, keyLen);
char* pStr = base64_encode(ctx.digest, tListLen(ctx.digest)); char* pStr = base64_encode(ctx.digest, tListLen(ctx.digest));
strcpy(str, pStr); strcpy(str, pStr);
} }
...@@ -242,11 +241,11 @@ bool tscProjectionQueryOnMetric(SSqlCmd* pCmd) { ...@@ -242,11 +241,11 @@ bool tscProjectionQueryOnMetric(SSqlCmd* pCmd) {
return false; return false;
} }
//for project query, only the following two function is allowed // for project query, only the following two function is allowed
for (int32_t i = 0; i < pCmd->fieldsInfo.numOfOutputCols; ++i) { for (int32_t i = 0; i < pCmd->fieldsInfo.numOfOutputCols; ++i) {
int32_t functionId = tscSqlExprGet(pCmd, i)->functionId; int32_t functionId = tscSqlExprGet(pCmd, i)->functionId;
if (functionId != TSDB_FUNC_PRJ && functionId != TSDB_FUNC_TAGPRJ && if (functionId != TSDB_FUNC_PRJ && functionId != TSDB_FUNC_TAGPRJ && functionId != TSDB_FUNC_TAG &&
functionId != TSDB_FUNC_TAG && functionId != TSDB_FUNC_TS && functionId != TSDB_FUNC_ARITHM) { functionId != TSDB_FUNC_TS && functionId != TSDB_FUNC_ARITHM) {
return false; return false;
} }
} }
...@@ -261,7 +260,7 @@ bool tscProjectionQueryOnTable(SSqlCmd* pCmd) { ...@@ -261,7 +260,7 @@ bool tscProjectionQueryOnTable(SSqlCmd* pCmd) {
return false; return false;
} }
} }
return true; return true;
} }
...@@ -458,9 +457,9 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock) { ...@@ -458,9 +457,9 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock) {
tfree(pDataBlock->pData); tfree(pDataBlock->pData);
tfree(pDataBlock->params); tfree(pDataBlock->params);
// free the refcount for metermeta // free the refcount for metermeta
taosRemoveDataFromCache(tscCacheHandle, (void**) &(pDataBlock->pMeterMeta), false); taosRemoveDataFromCache(tscCacheHandle, (void**)&(pDataBlock->pMeterMeta), false);
tfree(pDataBlock); tfree(pDataBlock);
} }
...@@ -533,23 +532,23 @@ void* tscDestroyBlockArrayList(SDataBlockList* pList) { ...@@ -533,23 +532,23 @@ void* tscDestroyBlockArrayList(SDataBlockList* pList) {
} }
int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) { int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
assert(pDataBlock->pMeterMeta != NULL); assert(pDataBlock->pMeterMeta != NULL);
pCmd->count = pDataBlock->numOfMeters; pCmd->count = pDataBlock->numOfMeters;
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0); SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0);
//set the correct metermeta object, the metermeta has been locked in pDataBlocks, so it must be in the cache // set the correct metermeta object, the metermeta has been locked in pDataBlocks, so it must be in the cache
if (pMeterMetaInfo->pMeterMeta != pDataBlock->pMeterMeta) { if (pMeterMetaInfo->pMeterMeta != pDataBlock->pMeterMeta) {
strcpy(pMeterMetaInfo->name, pDataBlock->meterId); strcpy(pMeterMetaInfo->name, pDataBlock->meterId);
taosRemoveDataFromCache(tscCacheHandle, (void**) &(pMeterMetaInfo->pMeterMeta), false); taosRemoveDataFromCache(tscCacheHandle, (void**)&(pMeterMetaInfo->pMeterMeta), false);
pMeterMetaInfo->pMeterMeta = pDataBlock->pMeterMeta; pMeterMetaInfo->pMeterMeta = pDataBlock->pMeterMeta;
pDataBlock->pMeterMeta = NULL; // delegate the ownership of metermeta to pMeterMetaInfo pDataBlock->pMeterMeta = NULL; // delegate the ownership of metermeta to pMeterMetaInfo
} else { } else {
assert(strncmp(pMeterMetaInfo->name, pDataBlock->meterId, tListLen(pDataBlock->meterId)) == 0); assert(strncmp(pMeterMetaInfo->name, pDataBlock->meterId, tListLen(pDataBlock->meterId)) == 0);
} }
/* /*
* the submit message consists of : [RPC header|message body|digest] * the submit message consists of : [RPC header|message body|digest]
* the dataBlock only includes the RPC Header buffer and actual submit messsage body, space for digest needs * the dataBlock only includes the RPC Header buffer and actual submit messsage body, space for digest needs
...@@ -559,15 +558,15 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) { ...@@ -559,15 +558,15 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
if (TSDB_CODE_SUCCESS != ret) { if (TSDB_CODE_SUCCESS != ret) {
return ret; return ret;
} }
memcpy(pCmd->payload, pDataBlock->pData, pDataBlock->nAllocSize); memcpy(pCmd->payload, pDataBlock->pData, pDataBlock->nAllocSize);
/* /*
* the payloadLen should be actual message body size * the payloadLen should be actual message body size
* the old value of payloadLen is the allocated payload size * the old value of payloadLen is the allocated payload size
*/ */
pCmd->payloadLen = pDataBlock->nAllocSize - tsRpcHeadSize; pCmd->payloadLen = pDataBlock->nAllocSize - tsRpcHeadSize;
assert(pCmd->allocSize >= pCmd->payloadLen + tsRpcHeadSize + sizeof(STaosDigest)); assert(pCmd->allocSize >= pCmd->payloadLen + tsRpcHeadSize + sizeof(STaosDigest));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -587,13 +586,18 @@ void tscFreeUnusedDataBlocks(SDataBlockList* pList) { ...@@ -587,13 +586,18 @@ void tscFreeUnusedDataBlocks(SDataBlockList* pList) {
* @param rowSize * @param rowSize
* @param startOffset * @param startOffset
* @param name * @param name
* @param pMeterMeta the ownership of pMeterMeta should be transfer to STableDataBlocks * @param dataBlocks
* @return * @return
*/ */
STableDataBlocks* tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, const char* name) { int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, const char* name,
STableDataBlocks** dataBlocks) {
STableDataBlocks* dataBuf = (STableDataBlocks*)calloc(1, sizeof(STableDataBlocks)); STableDataBlocks* dataBuf = (STableDataBlocks*)calloc(1, sizeof(STableDataBlocks));
dataBuf->nAllocSize = (uint32_t) initialSize; if (dataBuf == NULL) {
tscError("failed to allocated memory, reason:%s", strerror(errno));
return TSDB_CODE_CLI_OUT_OF_MEMORY;
}
dataBuf->nAllocSize = (uint32_t)initialSize;
dataBuf->pData = calloc(1, dataBuf->nAllocSize); dataBuf->pData = calloc(1, dataBuf->nAllocSize);
dataBuf->ordered = true; dataBuf->ordered = true;
dataBuf->prevTS = INT64_MIN; dataBuf->prevTS = INT64_MIN;
...@@ -603,30 +607,44 @@ STableDataBlocks* tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_ ...@@ -603,30 +607,44 @@ STableDataBlocks* tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_
dataBuf->tsSource = -1; dataBuf->tsSource = -1;
strncpy(dataBuf->meterId, name, TSDB_METER_ID_LEN); strncpy(dataBuf->meterId, name, TSDB_METER_ID_LEN);
// sure that the metermeta must be in the local client cache /*
* The metermeta may be released since the metermeta cache are completed clean by other thread
* due to operation such as drop database.
*/
dataBuf->pMeterMeta = taosGetDataFromCache(tscCacheHandle, dataBuf->meterId); dataBuf->pMeterMeta = taosGetDataFromCache(tscCacheHandle, dataBuf->meterId);
assert(dataBuf->pMeterMeta != NULL && initialSize > 0); assert(initialSize > 0);
return dataBuf; if (dataBuf->pMeterMeta == NULL) {
tfree(dataBuf);
return TSDB_CODE_QUERY_CACHE_ERASED;
} else {
*dataBlocks = dataBuf;
return TSDB_CODE_SUCCESS;
}
} }
STableDataBlocks* tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int64_t id, int32_t size, int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int64_t id, int32_t size,
int32_t startOffset, int32_t rowSize, const char* tableId) { int32_t startOffset, int32_t rowSize, const char* tableId,
STableDataBlocks* dataBuf = NULL; STableDataBlocks** dataBlocks) {
*dataBlocks = NULL;
STableDataBlocks** t1 = (STableDataBlocks**)taosGetIntHashData(pHashList, id); STableDataBlocks** t1 = (STableDataBlocks**) taosGetIntHashData(pHashList, id);
if (t1 != NULL) { if (t1 != NULL) {
dataBuf = *t1; *dataBlocks = *t1;
} }
if (dataBuf == NULL) { if (*dataBlocks == NULL) {
dataBuf = tscCreateDataBlock((size_t)size, rowSize, startOffset, tableId); int32_t ret = tscCreateDataBlock((size_t) size, rowSize, startOffset, tableId, dataBlocks);
dataBuf = *(STableDataBlocks**)taosAddIntHash(pHashList, id, (char*)&dataBuf); if (ret != TSDB_CODE_SUCCESS) {
tscAppendDataBlock(pDataBlockList, dataBuf); return ret;
}
*dataBlocks = *(STableDataBlocks**)taosAddIntHash(pHashList, id, (char*)(*dataBlocks));
tscAppendDataBlock(pDataBlockList, *dataBlocks);
} }
return dataBuf; return TSDB_CODE_SUCCESS;
} }
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockList) { int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockList) {
...@@ -638,9 +656,15 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi ...@@ -638,9 +656,15 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi
for (int32_t i = 0; i < pTableDataBlockList->nSize; ++i) { for (int32_t i = 0; i < pTableDataBlockList->nSize; ++i) {
STableDataBlocks* pOneTableBlock = pTableDataBlockList->pData[i]; STableDataBlocks* pOneTableBlock = pTableDataBlockList->pData[i];
STableDataBlocks* dataBuf = STableDataBlocks* dataBuf = NULL;
tscGetDataBlockFromList(pVnodeDataBlockHashList, pVnodeDataBlockList, pOneTableBlock->vgid, TSDB_PAYLOAD_SIZE, int32_t ret = tscGetDataBlockFromList(pVnodeDataBlockHashList, pVnodeDataBlockList, pOneTableBlock->vgid,
tsInsertHeadSize, 0, pOneTableBlock->meterId); TSDB_PAYLOAD_SIZE, tsInsertHeadSize, 0, pOneTableBlock->meterId, &dataBuf);
if (ret != TSDB_CODE_SUCCESS) {
tscError("%p failed to allocate the data buffer block for merging table data", pSql);
tscDestroyBlockArrayList(pTableDataBlockList);
return ret;
}
int64_t destSize = dataBuf->size + pOneTableBlock->size; int64_t destSize = dataBuf->size + pOneTableBlock->size;
if (dataBuf->nAllocSize < destSize) { if (dataBuf->nAllocSize < destSize) {
...@@ -705,7 +729,7 @@ void tscCloseTscObj(STscObj* pObj) { ...@@ -705,7 +729,7 @@ void tscCloseTscObj(STscObj* pObj) {
} }
bool tscIsInsertOrImportData(char* sqlstr) { bool tscIsInsertOrImportData(char* sqlstr) {
int32_t index = 0; int32_t index = 0;
SSQLToken t0 = tStrGetToken(sqlstr, &index, false, 0, NULL); SSQLToken t0 = tStrGetToken(sqlstr, &index, false, 0, NULL);
return t0.type == TK_INSERT || t0.type == TK_IMPORT; return t0.type == TK_INSERT || t0.type == TK_IMPORT;
} }
...@@ -946,13 +970,13 @@ static void _exprEvic(SSqlExprInfo* pExprInfo, int32_t index) { ...@@ -946,13 +970,13 @@ static void _exprEvic(SSqlExprInfo* pExprInfo, int32_t index) {
SSqlExpr* tscSqlExprInsertEmpty(SSqlCmd* pCmd, int32_t index, int16_t functionId) { SSqlExpr* tscSqlExprInsertEmpty(SSqlCmd* pCmd, int32_t index, int16_t functionId) {
SSqlExprInfo* pExprInfo = &pCmd->exprsInfo; SSqlExprInfo* pExprInfo = &pCmd->exprsInfo;
_exprCheckSpace(pExprInfo, pExprInfo->numOfExprs + 1); _exprCheckSpace(pExprInfo, pExprInfo->numOfExprs + 1);
_exprEvic(pExprInfo, index); _exprEvic(pExprInfo, index);
SSqlExpr* pExpr = &pExprInfo->pExprs[index]; SSqlExpr* pExpr = &pExprInfo->pExprs[index];
pExpr->functionId = functionId; pExpr->functionId = functionId;
pExprInfo->numOfExprs++; pExprInfo->numOfExprs++;
return pExpr; return pExpr;
} }
...@@ -1155,7 +1179,7 @@ SColumnBase* tscColumnBaseInfoInsert(SSqlCmd* pCmd, SColumnIndex* pColIndex) { ...@@ -1155,7 +1179,7 @@ SColumnBase* tscColumnBaseInfoInsert(SSqlCmd* pCmd, SColumnIndex* pColIndex) {
} }
void tscColumnFilterInfoCopy(SColumnFilterInfo* dst, const SColumnFilterInfo* src) { void tscColumnFilterInfoCopy(SColumnFilterInfo* dst, const SColumnFilterInfo* src) {
assert (src != NULL && dst != NULL); assert(src != NULL && dst != NULL);
assert(src->filterOnBinary == 0 || src->filterOnBinary == 1); assert(src->filterOnBinary == 0 || src->filterOnBinary == 1);
if (src->lowerRelOptr == TSDB_RELATION_INVALID && src->upperRelOptr == TSDB_RELATION_INVALID) { if (src->lowerRelOptr == TSDB_RELATION_INVALID && src->upperRelOptr == TSDB_RELATION_INVALID) {
...@@ -1164,15 +1188,15 @@ void tscColumnFilterInfoCopy(SColumnFilterInfo* dst, const SColumnFilterInfo* sr ...@@ -1164,15 +1188,15 @@ void tscColumnFilterInfoCopy(SColumnFilterInfo* dst, const SColumnFilterInfo* sr
*dst = *src; *dst = *src;
if (dst->filterOnBinary) { if (dst->filterOnBinary) {
size_t len = (size_t) dst->len + 1; size_t len = (size_t)dst->len + 1;
char* pTmp = calloc(1, len); char* pTmp = calloc(1, len);
dst->pz = (int64_t) pTmp; dst->pz = (int64_t)pTmp;
memcpy((char*) dst->pz, (char*) src->pz, (size_t) len); memcpy((char*)dst->pz, (char*)src->pz, (size_t)len);
} }
} }
void tscColumnBaseCopy(SColumnBase* dst, const SColumnBase* src) { void tscColumnBaseCopy(SColumnBase* dst, const SColumnBase* src) {
assert (src != NULL && dst != NULL); assert(src != NULL && dst != NULL);
*dst = *src; *dst = *src;
...@@ -1230,7 +1254,7 @@ void tscColumnBaseInfoDestroy(SColumnBaseInfo* pColumnBaseInfo) { ...@@ -1230,7 +1254,7 @@ void tscColumnBaseInfoDestroy(SColumnBaseInfo* pColumnBaseInfo) {
assert(pColBase->filterInfo[j].filterOnBinary == 0 || pColBase->filterInfo[j].filterOnBinary == 1); assert(pColBase->filterInfo[j].filterOnBinary == 0 || pColBase->filterInfo[j].filterOnBinary == 1);
if (pColBase->filterInfo[j].filterOnBinary) { if (pColBase->filterInfo[j].filterOnBinary) {
free((char*) pColBase->filterInfo[j].pz); free((char*)pColBase->filterInfo[j].pz);
pColBase->filterInfo[j].pz = 0; pColBase->filterInfo[j].pz = 0;
} }
} }
...@@ -1390,7 +1414,7 @@ bool tscValidateColumnId(SSqlCmd* pCmd, int32_t colId) { ...@@ -1390,7 +1414,7 @@ bool tscValidateColumnId(SSqlCmd* pCmd, int32_t colId) {
void tscTagCondCopy(STagCond* dest, const STagCond* src) { void tscTagCondCopy(STagCond* dest, const STagCond* src) {
memset(dest, 0, sizeof(STagCond)); memset(dest, 0, sizeof(STagCond));
if (src->tbnameCond.cond != NULL) { if (src->tbnameCond.cond != NULL) {
dest->tbnameCond.cond = strdup(src->tbnameCond.cond); dest->tbnameCond.cond = strdup(src->tbnameCond.cond);
} }
...@@ -1403,7 +1427,7 @@ void tscTagCondCopy(STagCond* dest, const STagCond* src) { ...@@ -1403,7 +1427,7 @@ void tscTagCondCopy(STagCond* dest, const STagCond* src) {
if (src->cond[i].cond != NULL) { if (src->cond[i].cond != NULL) {
dest->cond[i].cond = strdup(src->cond[i].cond); dest->cond[i].cond = strdup(src->cond[i].cond);
} }
dest->cond[i].uid = src->cond[i].uid; dest->cond[i].uid = src->cond[i].uid;
} }
...@@ -1513,10 +1537,10 @@ bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql) { ...@@ -1513,10 +1537,10 @@ bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql) {
* data blocks have been submit to vnode. * data blocks have been submit to vnode.
*/ */
SDataBlockList* pDataBlocks = pCmd->pDataBlocks; SDataBlockList* pDataBlocks = pCmd->pDataBlocks;
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0); SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0);
assert(pSql->cmd.numOfTables == 1); assert(pSql->cmd.numOfTables == 1);
if (pDataBlocks == NULL || pMeterMetaInfo->vnodeIndex >= pDataBlocks->nSize) { if (pDataBlocks == NULL || pMeterMetaInfo->vnodeIndex >= pDataBlocks->nSize) {
tscTrace("%p object should be release since all data blocks have been submit", pSql); tscTrace("%p object should be release since all data blocks have been submit", pSql);
return true; return true;
...@@ -1632,7 +1656,7 @@ void tscResetForNextRetrieve(SSqlRes* pRes) { ...@@ -1632,7 +1656,7 @@ void tscResetForNextRetrieve(SSqlRes* pRes) {
} }
SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void* param, SSqlObj* pPrevSql) { SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void* param, SSqlObj* pPrevSql) {
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, tableIndex); SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, tableIndex);
SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj)); SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj));
...@@ -1677,7 +1701,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void ...@@ -1677,7 +1701,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
} }
tscColumnBaseInfoCopy(&pNew->cmd.colList, &pCmd->colList, (int16_t)tableIndex); tscColumnBaseInfoCopy(&pNew->cmd.colList, &pCmd->colList, (int16_t)tableIndex);
// set the correct query type // set the correct query type
if (pPrevSql != NULL) { if (pPrevSql != NULL) {
pNew->cmd.type = pPrevSql->cmd.type; pNew->cmd.type = pPrevSql->cmd.type;
...@@ -1707,14 +1731,14 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void ...@@ -1707,14 +1731,14 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
pNew->fp = fp; pNew->fp = fp;
pNew->param = param; pNew->param = param;
char key[TSDB_MAX_TAGS_LEN + 1] = {0}; char key[TSDB_MAX_TAGS_LEN + 1] = {0};
tscGetMetricMetaCacheKey(pCmd, key, uid); tscGetMetricMetaCacheKey(pCmd, key, uid);
#ifdef _DEBUG_VIEW #ifdef _DEBUG_VIEW
printf("the metricmeta key is:%s\n", key); printf("the metricmeta key is:%s\n", key);
#endif #endif
char* name = pMeterMetaInfo->name; char* name = pMeterMetaInfo->name;
SMeterMetaInfo* pFinalInfo = NULL; SMeterMetaInfo* pFinalInfo = NULL;
...@@ -1739,7 +1763,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void ...@@ -1739,7 +1763,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
} }
tscTrace("%p new subquery %p, tableIndex:%d, vnodeIdx:%d, type:%d", pSql, pNew, tableIndex, tscTrace("%p new subquery %p, tableIndex:%d, vnodeIdx:%d, type:%d", pSql, pNew, tableIndex,
pMeterMetaInfo->vnodeIndex, pNew->cmd.type); pMeterMetaInfo->vnodeIndex, pNew->cmd.type);
return pNew; return pNew;
} }
...@@ -1780,40 +1804,41 @@ bool tscIsUpdateQuery(STscObj* pObj) { ...@@ -1780,40 +1804,41 @@ bool tscIsUpdateQuery(STscObj* pObj) {
SSqlCmd* pCmd = &pObj->pSql->cmd; SSqlCmd* pCmd = &pObj->pSql->cmd;
return ((pCmd->command >= TSDB_SQL_INSERT && pCmd->command <= TSDB_SQL_DROP_DNODE) || return ((pCmd->command >= TSDB_SQL_INSERT && pCmd->command <= TSDB_SQL_DROP_DNODE) ||
TSDB_SQL_USE_DB == pCmd->command) ? 1 : 0; TSDB_SQL_USE_DB == pCmd->command)
? 1
: 0;
} }
int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *sql) { int32_t tscInvalidSQLErrMsg(char* msg, const char* additionalInfo, const char* sql) {
const char *msgFormat1 = "invalid SQL: %s"; const char* msgFormat1 = "invalid SQL: %s";
const char *msgFormat2 = "invalid SQL: syntax error near \"%s\" (%s)"; const char* msgFormat2 = "invalid SQL: syntax error near \"%s\" (%s)";
const char *msgFormat3 = "invalid SQL: syntax error near \"%s\""; const char* msgFormat3 = "invalid SQL: syntax error near \"%s\"";
const int32_t BACKWARD_CHAR_STEP = 0; const int32_t BACKWARD_CHAR_STEP = 0;
if (sql == NULL) { if (sql == NULL) {
assert(additionalInfo != NULL); assert(additionalInfo != NULL);
sprintf(msg, msgFormat1, additionalInfo); sprintf(msg, msgFormat1, additionalInfo);
return TSDB_CODE_INVALID_SQL; return TSDB_CODE_INVALID_SQL;
} }
char buf[64] = {0}; // only extract part of sql string char buf[64] = {0}; // only extract part of sql string
strncpy(buf, (sql - BACKWARD_CHAR_STEP), tListLen(buf) - 1); strncpy(buf, (sql - BACKWARD_CHAR_STEP), tListLen(buf) - 1);
if (additionalInfo != NULL) { if (additionalInfo != NULL) {
sprintf(msg, msgFormat2, buf, additionalInfo); sprintf(msg, msgFormat2, buf, additionalInfo);
} else { } else {
sprintf(msg, msgFormat3, buf); // no additional information for invalid sql error sprintf(msg, msgFormat3, buf); // no additional information for invalid sql error
} }
return TSDB_CODE_INVALID_SQL; return TSDB_CODE_INVALID_SQL;
} }
bool tscHasReachLimitation(SSqlObj* pSql) { bool tscHasReachLimitation(SSqlObj* pSql) {
assert(pSql != NULL && pSql->cmd.globalLimit != 0); assert(pSql != NULL && pSql->cmd.globalLimit != 0);
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res; SSqlRes* pRes = &pSql->res;
return (pCmd->globalLimit > 0 && pRes->numOfTotal >= pCmd->globalLimit); return (pCmd->globalLimit > 0 && pRes->numOfTotal >= pCmd->globalLimit);
} }
...@@ -137,8 +137,9 @@ extern "C" { ...@@ -137,8 +137,9 @@ extern "C" {
#define TSDB_CODE_INVALID_VNODE_STATUS 116 #define TSDB_CODE_INVALID_VNODE_STATUS 116
#define TSDB_CODE_FAILED_TO_LOCK_RESOURCES 117 #define TSDB_CODE_FAILED_TO_LOCK_RESOURCES 117
#define TSDB_CODE_TABLE_ID_MISMATCH 118 #define TSDB_CODE_TABLE_ID_MISMATCH 118
#define TSDB_CODE_QUERY_CACHE_ERASED 119
#define TSDB_CODE_MAX_ERROR_CODE 119 #define TSDB_CODE_MAX_ERROR_CODE 120
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -242,5 +242,6 @@ char *tsError[] = {"success", ...@@ -242,5 +242,6 @@ char *tsError[] = {"success",
"invalid table id", // 115 "invalid table id", // 115
"invalid vnode status", "invalid vnode status",
"failed to lock resources", "failed to lock resources",
"table id/uid mismatch", // 118 "table id/uid mismatch",
"client query cache erased", // 119
}; };
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册