提交 9768a879 编写于 作者: H Hongze Cheng

Merge branch 'develop' into feature/2.0tsdb

...@@ -5420,7 +5420,7 @@ int32_t doLocalQueryProcess(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) { ...@@ -5420,7 +5420,7 @@ int32_t doLocalQueryProcess(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) {
tDataTypeDesc[TSDB_DATA_TYPE_INT].nSize, tDataTypeDesc[TSDB_DATA_TYPE_INT].nSize, false); tDataTypeDesc[TSDB_DATA_TYPE_INT].nSize, tDataTypeDesc[TSDB_DATA_TYPE_INT].nSize, false);
const char* name = (pExprList->a[0].aliasName != NULL)? pExprList->a[0].aliasName:functionsInfo[index].name; const char* name = (pExprList->a[0].aliasName != NULL)? pExprList->a[0].aliasName:functionsInfo[index].name;
strncpy(pExpr1->aliasName, name, tListLen(pExpr1->aliasName)); tstrncpy(pExpr1->aliasName, name, tListLen(pExpr1->aliasName));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -324,7 +324,6 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd ...@@ -324,7 +324,6 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
tfree(pReducer->discardData); tfree(pReducer->discardData);
tfree(pReducer->pResultBuf); tfree(pReducer->pResultBuf);
tfree(pReducer->pFinalRes); tfree(pReducer->pFinalRes);
// tfree(pReducer->pBufForInterpo);
tfree(pReducer->prevRowOfInput); tfree(pReducer->prevRowOfInput);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
...@@ -363,7 +362,8 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd ...@@ -363,7 +362,8 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
if (pQueryInfo->fillType != TSDB_FILL_NONE) { if (pQueryInfo->fillType != TSDB_FILL_NONE) {
SFillColInfo* pFillCol = createFillColInfo(pQueryInfo); SFillColInfo* pFillCol = createFillColInfo(pQueryInfo);
pReducer->pFillInfo = taosInitFillInfo(pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols, pReducer->pFillInfo = taosInitFillInfo(pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols,
4096, numOfCols, pQueryInfo->slidingTime, pQueryInfo->fillType, pFillCol); 4096, numOfCols, pQueryInfo->slidingTime, pQueryInfo->slidingTimeUnit,
tinfo.precision, pQueryInfo->fillType, pFillCol);
} }
int32_t startIndex = pQueryInfo->fieldsInfo.numOfOutput - pQueryInfo->groupbyExpr.numOfGroupCols; int32_t startIndex = pQueryInfo->fieldsInfo.numOfOutput - pQueryInfo->groupbyExpr.numOfGroupCols;
...@@ -494,7 +494,7 @@ void tscDestroyLocalReducer(SSqlObj *pSql) { ...@@ -494,7 +494,7 @@ void tscDestroyLocalReducer(SSqlObj *pSql) {
tscTrace("%p waiting for delete procedure, status: %d", pSql, status); tscTrace("%p waiting for delete procedure, status: %d", pSql, status);
} }
taosDestoryFillInfo(pLocalReducer->pFillInfo); pLocalReducer->pFillInfo = taosDestoryFillInfo(pLocalReducer->pFillInfo);
if (pLocalReducer->pCtx != NULL) { if (pLocalReducer->pCtx != NULL) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
...@@ -980,8 +980,7 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO ...@@ -980,8 +980,7 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO
} }
/* all output for current group are completed */ /* all output for current group are completed */
int32_t totalRemainRows = int32_t totalRemainRows = getFilledNumOfRes(pFillInfo, actualETime, pLocalReducer->resColModel->capacity);
taosGetNumOfResultWithFill(pFillInfo, rpoints, pFillInfo->slidingTime, actualETime);
if (totalRemainRows <= 0) { if (totalRemainRows <= 0) {
break; break;
} }
...@@ -1267,13 +1266,7 @@ bool doGenerateFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool no ...@@ -1267,13 +1266,7 @@ bool doGenerateFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool no
SFillInfo* pFillInfo = pLocalReducer->pFillInfo; SFillInfo* pFillInfo = pLocalReducer->pFillInfo;
if (pFillInfo != NULL) { if (pFillInfo != NULL) {
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); taosFillSetStartInfo(pFillInfo, pResBuf->num, pQueryInfo->window.ekey);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
TSKEY ekey = taosGetRevisedEndKey(pQueryInfo->window.ekey, pFillInfo->order, pFillInfo->slidingTime,
pQueryInfo->slidingTimeUnit, tinfo.precision);
taosFillSetStartInfo(pFillInfo, pResBuf->num, ekey);
taosFillCopyInputDataFromOneFilePage(pFillInfo, pResBuf); taosFillCopyInputDataFromOneFilePage(pFillInfo, pResBuf);
} }
...@@ -1327,23 +1320,15 @@ static bool doBuildFilledResultForGroup(SSqlObj *pSql) { ...@@ -1327,23 +1320,15 @@ static bool doBuildFilledResultForGroup(SSqlObj *pSql) {
SLocalReducer *pLocalReducer = pRes->pLocalReducer; SLocalReducer *pLocalReducer = pRes->pLocalReducer;
SFillInfo *pFillInfo = pLocalReducer->pFillInfo; SFillInfo *pFillInfo = pLocalReducer->pFillInfo;
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
int8_t p = tinfo.precision;
if (pFillInfo != NULL && taosNumOfRemainRows(pFillInfo) > 0) { if (pFillInfo != NULL && taosNumOfRemainRows(pFillInfo) > 0) {
assert(pQueryInfo->fillType != TSDB_FILL_NONE); assert(pQueryInfo->fillType != TSDB_FILL_NONE);
tFilePage *pFinalDataBuf = pLocalReducer->pResultBuf; tFilePage *pFinalDataBuf = pLocalReducer->pResultBuf;
int64_t etime = *(int64_t *)(pFinalDataBuf->data + TSDB_KEYSIZE * (pFillInfo->numOfRows - 1)); int64_t etime = *(int64_t *)(pFinalDataBuf->data + TSDB_KEYSIZE * (pFillInfo->numOfRows - 1));
int32_t remain = taosNumOfRemainRows(pFillInfo);
TSKEY ekey = taosGetRevisedEndKey(etime, pQueryInfo->order.order, pQueryInfo->slidingTime, pQueryInfo->slidingTimeUnit, p);
// the first column must be the timestamp column // the first column must be the timestamp column
int32_t rows = taosGetNumOfResultWithFill(pFillInfo, remain, ekey, pLocalReducer->resColModel->capacity); int32_t rows = getFilledNumOfRes(pFillInfo, etime, pLocalReducer->resColModel->capacity);
if (rows > 0) { // do interpo if (rows > 0) { // do fill gap
doFillResult(pSql, pLocalReducer, false); doFillResult(pSql, pLocalReducer, false);
} }
...@@ -1362,10 +1347,7 @@ static bool doHandleLastRemainData(SSqlObj *pSql) { ...@@ -1362,10 +1347,7 @@ static bool doHandleLastRemainData(SSqlObj *pSql) {
bool prevGroupCompleted = (!pLocalReducer->discard) && pLocalReducer->hasUnprocessedRow; bool prevGroupCompleted = (!pLocalReducer->discard) && pLocalReducer->hasUnprocessedRow;
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
if ((isAllSourcesCompleted(pLocalReducer) && !pLocalReducer->hasPrevRow) || pLocalReducer->pLocalDataSrc[0] == NULL || if ((isAllSourcesCompleted(pLocalReducer) && !pLocalReducer->hasPrevRow) || pLocalReducer->pLocalDataSrc[0] == NULL ||
prevGroupCompleted) { prevGroupCompleted) {
...@@ -1373,9 +1355,8 @@ static bool doHandleLastRemainData(SSqlObj *pSql) { ...@@ -1373,9 +1355,8 @@ static bool doHandleLastRemainData(SSqlObj *pSql) {
if (pQueryInfo->fillType != TSDB_FILL_NONE) { if (pQueryInfo->fillType != TSDB_FILL_NONE) {
int64_t etime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.ekey : pQueryInfo->window.skey; int64_t etime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.ekey : pQueryInfo->window.skey;
etime = taosGetRevisedEndKey(etime, pQueryInfo->order.order, pQueryInfo->intervalTime, assert(pFillInfo->numOfRows == 0);
pQueryInfo->slidingTimeUnit, tinfo.precision); int32_t rows = getFilledNumOfRes(pFillInfo, etime, pLocalReducer->resColModel->capacity);
int32_t rows = taosGetNumOfResultWithFill(pFillInfo, 0, etime, pLocalReducer->resColModel->capacity);
if (rows > 0) { // do interpo if (rows > 0) { // do interpo
doFillResult(pSql, pLocalReducer, true); doFillResult(pSql, pLocalReducer, true);
} }
......
...@@ -1018,7 +1018,7 @@ int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1018,7 +1018,7 @@ int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SCMDropDnodeMsg *pDrop = (SCMDropDnodeMsg *)pCmd->payload; SCMDropDnodeMsg *pDrop = (SCMDropDnodeMsg *)pCmd->payload;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
strcpy(pDrop->ep, pTableMetaInfo->name); tstrncpy(pDrop->ep, pTableMetaInfo->name, sizeof(pDrop->ep));
pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DNODE; pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DNODE;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -1036,7 +1036,7 @@ int32_t tscBuildDropUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1036,7 +1036,7 @@ int32_t tscBuildDropUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SCMDropUserMsg *pDropMsg = (SCMDropUserMsg*)pCmd->payload; SCMDropUserMsg *pDropMsg = (SCMDropUserMsg*)pCmd->payload;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
strcpy(pDropMsg->user, pTableMetaInfo->name); tstrncpy(pDropMsg->user, pTableMetaInfo->name, sizeof(pDropMsg->user));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1091,9 +1091,9 @@ int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1091,9 +1091,9 @@ int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
size_t nameLen = strlen(pTableMetaInfo->name); size_t nameLen = strlen(pTableMetaInfo->name);
if (nameLen > 0) { if (nameLen > 0) {
strcpy(pShowMsg->db, pTableMetaInfo->name); // prefix is set here tstrncpy(pShowMsg->db, pTableMetaInfo->name, sizeof(pShowMsg->db)); // prefix is set here
} else { } else {
strcpy(pShowMsg->db, pObj->db); tstrncpy(pShowMsg->db, pObj->db, sizeof(pShowMsg->db));
} }
SShowInfo *pShowInfo = &pInfo->pDCLInfo->showOpt; SShowInfo *pShowInfo = &pInfo->pDCLInfo->showOpt;
...@@ -1300,7 +1300,7 @@ int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1300,7 +1300,7 @@ int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SCMAlterDbMsg *pAlterDbMsg = (SCMAlterDbMsg*)pCmd->payload; SCMAlterDbMsg *pAlterDbMsg = (SCMAlterDbMsg*)pCmd->payload;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
strcpy(pAlterDbMsg->db, pTableMetaInfo->name); tstrncpy(pAlterDbMsg->db, pTableMetaInfo->name, sizeof(pAlterDbMsg->db));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -2150,7 +2150,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) { ...@@ -2150,7 +2150,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
SCMConnectRsp *pConnect = (SCMConnectRsp *)pRes->pRsp; SCMConnectRsp *pConnect = (SCMConnectRsp *)pRes->pRsp;
strcpy(pObj->acctId, pConnect->acctId); // copy acctId from response tstrncpy(pObj->acctId, pConnect->acctId, sizeof(pObj->acctId)); // copy acctId from response
int32_t len = sprintf(temp, "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db); int32_t len = sprintf(temp, "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db);
assert(len <= sizeof(pObj->db)); assert(len <= sizeof(pObj->db));
...@@ -2172,7 +2172,7 @@ int tscProcessUseDbRsp(SSqlObj *pSql) { ...@@ -2172,7 +2172,7 @@ int tscProcessUseDbRsp(SSqlObj *pSql) {
STscObj * pObj = pSql->pTscObj; STscObj * pObj = pSql->pTscObj;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
strcpy(pObj->db, pTableMetaInfo->name); tstrncpy(pObj->db, pTableMetaInfo->name, sizeof(pObj->db));
return 0; return 0;
} }
......
...@@ -144,11 +144,11 @@ void taos_init_imp() { ...@@ -144,11 +144,11 @@ void taos_init_imp() {
} }
int64_t refreshTime = tsTableMetaKeepTimer; int64_t refreshTime = tsTableMetaKeepTimer;
refreshTime = refreshTime > 2 ? 2 : refreshTime; refreshTime = refreshTime > 10 ? 10 : refreshTime;
refreshTime = refreshTime < 1 ? 1 : refreshTime; refreshTime = refreshTime < 10 ? 10 : refreshTime;
if (tscCacheHandle == NULL) { if (tscCacheHandle == NULL) {
tscCacheHandle = taosCacheInit(tscTmr, refreshTime); tscCacheHandle = taosCacheInit(refreshTime);
} }
tscTrace("client is initialized successfully"); tscTrace("client is initialized successfully");
......
...@@ -593,7 +593,7 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff ...@@ -593,7 +593,7 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff
dataBuf->size = startOffset; dataBuf->size = startOffset;
dataBuf->tsSource = -1; dataBuf->tsSource = -1;
strncpy(dataBuf->tableId, name, TSDB_TABLE_ID_LEN); tstrncpy(dataBuf->tableId, name, sizeof(dataBuf->tableId));
/* /*
* The table meta may be released since the table meta cache are completed clean by other thread * The table meta may be released since the table meta cache are completed clean by other thread
...@@ -1666,7 +1666,7 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST ...@@ -1666,7 +1666,7 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST
assert(pTableMetaInfo != NULL); assert(pTableMetaInfo != NULL);
if (name != NULL) { if (name != NULL) {
strncpy(pTableMetaInfo->name, name, TSDB_TABLE_ID_LEN); tstrncpy(pTableMetaInfo->name, name, sizeof(pTableMetaInfo->name));
} }
pTableMetaInfo->pTableMeta = pTableMeta; pTableMetaInfo->pTableMeta = pTableMeta;
......
...@@ -273,8 +273,7 @@ void dataColSetNullAt(SDataCol *pCol, int index) { ...@@ -273,8 +273,7 @@ void dataColSetNullAt(SDataCol *pCol, int index) {
if (IS_VAR_DATA_TYPE(pCol->type)) { if (IS_VAR_DATA_TYPE(pCol->type)) {
pCol->dataOff[index] = pCol->len; pCol->dataOff[index] = pCol->len;
char *ptr = POINTER_SHIFT(pCol->pData, pCol->len); char *ptr = POINTER_SHIFT(pCol->pData, pCol->len);
varDataLen(ptr) = (pCol->type == TSDB_DATA_TYPE_BINARY) ? sizeof(char) : TSDB_NCHAR_SIZE; setVardataNull(ptr, pCol->type);
setNull(varDataVal(ptr), pCol->type, pCol->bytes);
pCol->len += varDataTLen(ptr); pCol->len += varDataTLen(ptr);
} else { } else {
setNull(POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * index), pCol->type, pCol->bytes); setNull(POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * index), pCol->type, pCol->bytes);
......
...@@ -73,8 +73,8 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) { ...@@ -73,8 +73,8 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
return NULL; return NULL;
} }
strcpy(pContext->user, pCfg->user); tstrncpy(pContext->user, pCfg->user, sizeof(pContext->user));
strcpy(pContext->pass, pCfg->pass); tstrncpy(pContext->pass, pCfg->pass, sizeof(pContext->pass));
const char* db = pCfg->db; const char* db = pCfg->db;
for (const char* p = db; *p != 0; p++) { for (const char* p = db; *p != 0; p++) {
if (*p == '.') { if (*p == '.') {
...@@ -82,7 +82,7 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) { ...@@ -82,7 +82,7 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
break; break;
} }
} }
strcpy(pContext->db, db); tstrncpy(pContext->db, db, sizeof(pContext->db));
pContext->vgId = pCfg->vgId; pContext->vgId = pCfg->vgId;
pContext->cqWrite = pCfg->cqWrite; pContext->cqWrite = pCfg->cqWrite;
pContext->ahandle = ahandle; pContext->ahandle = ahandle;
...@@ -215,7 +215,7 @@ void cqDrop(void *handle) { ...@@ -215,7 +215,7 @@ void cqDrop(void *handle) {
cTrace("vgId:%d, id:%d CQ:%s is dropped", pContext->vgId, pObj->tid, pObj->sqlStr); cTrace("vgId:%d, id:%d CQ:%s is dropped", pContext->vgId, pObj->tid, pObj->sqlStr);
free(pObj); free(pObj);
pthread_mutex_lock(&pContext->mutex); pthread_mutex_unlock(&pContext->mutex);
} }
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
......
...@@ -296,7 +296,7 @@ typedef struct { ...@@ -296,7 +296,7 @@ typedef struct {
} SCMConnectMsg; } SCMConnectMsg;
typedef struct { typedef struct {
char acctId[TSDB_ACCT_LEN + 1]; char acctId[TSDB_ACCT_LEN];
char serverVersion[TSDB_VERSION_LEN]; char serverVersion[TSDB_VERSION_LEN];
int8_t writeAuth; int8_t writeAuth;
int8_t superAuth; int8_t superAuth;
......
...@@ -27,7 +27,7 @@ typedef struct { ...@@ -27,7 +27,7 @@ typedef struct {
int vgId; int vgId;
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
char pass[TSDB_PASSWORD_LEN]; char pass[TSDB_PASSWORD_LEN];
char db[TSDB_DB_NAME_LEN + 1]; char db[TSDB_DB_NAME_LEN];
FCqWrite cqWrite; FCqWrite cqWrite;
} SCqCfg; } SCqCfg;
......
...@@ -710,7 +710,7 @@ void *readTable(void *sarg) { ...@@ -710,7 +710,7 @@ void *readTable(void *sarg) {
int32_t code = taos_errno(pSql); int32_t code = taos_errno(pSql);
if (code != 0) { if (code != 0) {
fprintf(stderr, "Failed to query:%s\n", taos_errstr(taos)); fprintf(stderr, "Failed to query:%s\n", taos_errstr(pSql));
taos_free_result(pSql); taos_free_result(pSql);
taos_close(taos); taos_close(taos);
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
...@@ -756,7 +756,7 @@ void *readMetric(void *sarg) { ...@@ -756,7 +756,7 @@ void *readMetric(void *sarg) {
for (int j = 0; j < n; j++) { for (int j = 0; j < n; j++) {
char condition[BUFFER_SIZE - 30] = "\0"; char condition[BUFFER_SIZE - 30] = "\0";
char tempS[BUFFER_SIZE] = "\0"; char tempS[64] = "\0";
int m = 10 < num_of_tables ? 10 : num_of_tables; int m = 10 < num_of_tables ? 10 : num_of_tables;
...@@ -779,7 +779,7 @@ void *readMetric(void *sarg) { ...@@ -779,7 +779,7 @@ void *readMetric(void *sarg) {
int32_t code = taos_errno(pSql); int32_t code = taos_errno(pSql);
if (code != 0) { if (code != 0) {
fprintf(stderr, "Failed to query:%s\n", taos_errstr(taos)); fprintf(stderr, "Failed to query:%s\n", taos_errstr(pSql));
taos_free_result(pSql); taos_free_result(pSql);
taos_close(taos); taos_close(taos);
exit(1); exit(1);
...@@ -818,7 +818,9 @@ void queryDB(TAOS *taos, char *command) { ...@@ -818,7 +818,9 @@ void queryDB(TAOS *taos, char *command) {
} }
if (i == 0) { if (i == 0) {
fprintf(stderr, "Failed to run %s, reason: %s\n", command, taos_errstr(taos)); fprintf(stderr, "Failed to run %s, reason: %s\n", command, taos_errstr(pSql));
taos_free_result(pSql);
taos_close(taos); taos_close(taos);
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
...@@ -914,7 +916,7 @@ void callBack(void *param, TAOS_RES *res, int code) { ...@@ -914,7 +916,7 @@ void callBack(void *param, TAOS_RES *res, int code) {
int64_t tmp_time = tb_info->timestamp; int64_t tmp_time = tb_info->timestamp;
if (code < 0) { if (code < 0) {
fprintf(stderr, "failed to insert data %d:reason; %s\n", code, taos_errstr(tb_info->taos)); fprintf(stderr, "failed to insert data %d:reason; %s\n", code, taos_errstr(res));
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
......
...@@ -643,6 +643,8 @@ int taosDumpDb(SDbInfo *dbInfo, SDumpArguments *arguments, FILE *fp) { ...@@ -643,6 +643,8 @@ int taosDumpDb(SDbInfo *dbInfo, SDumpArguments *arguments, FILE *fp) {
lseek(fd, 0, SEEK_SET); lseek(fd, 0, SEEK_SET);
while (read(fd, &tableRecord, sizeof(STableRecord)) > 0) { while (read(fd, &tableRecord, sizeof(STableRecord)) > 0) {
tableRecord.name[sizeof(tableRecord.name) - 1] = 0;
tableRecord.metric[sizeof(tableRecord.metric) - 1] = 0;
taosDumpTable(tableRecord.name, tableRecord.metric, arguments, fp); taosDumpTable(tableRecord.name, tableRecord.metric, arguments, fp);
} }
...@@ -902,6 +904,8 @@ int32_t taosDumpMetric(char *metric, SDumpArguments *arguments, FILE *fp) { ...@@ -902,6 +904,8 @@ int32_t taosDumpMetric(char *metric, SDumpArguments *arguments, FILE *fp) {
lseek(fd, 0, SEEK_SET); lseek(fd, 0, SEEK_SET);
while (read(fd, &tableRecord, sizeof(STableRecord)) > 0) { while (read(fd, &tableRecord, sizeof(STableRecord)) > 0) {
tableRecord.name[sizeof(tableRecord.name) - 1] = 0;
tableRecord.metric[sizeof(tableRecord.metric) - 1] = 0;
taosDumpTable(tableRecord.name, tableRecord.metric, arguments, fp); taosDumpTable(tableRecord.name, tableRecord.metric, arguments, fp);
} }
......
...@@ -32,8 +32,8 @@ struct SMnodeObj; ...@@ -32,8 +32,8 @@ struct SMnodeObj;
typedef struct SDnodeObj { typedef struct SDnodeObj {
int32_t dnodeId; int32_t dnodeId;
uint16_t dnodePort; uint16_t dnodePort;
char dnodeFqdn[TSDB_FQDN_LEN + 1]; char dnodeFqdn[TSDB_FQDN_LEN];
char dnodeEp[TSDB_EP_LEN + 1]; char dnodeEp[TSDB_EP_LEN];
int64_t createdTime; int64_t createdTime;
uint32_t lastAccess; uint32_t lastAccess;
int32_t openVnodes; int32_t openVnodes;
......
...@@ -67,7 +67,7 @@ int32_t mnodeInitProfile() { ...@@ -67,7 +67,7 @@ int32_t mnodeInitProfile() {
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_STREAM, mnodeProcessKillStreamMsg); mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_STREAM, mnodeProcessKillStreamMsg);
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_CONN, mnodeProcessKillConnectionMsg); mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_CONN, mnodeProcessKillConnectionMsg);
tsMnodeConnCache = taosCacheInitWithCb(tsMnodeTmr, CONN_CHECK_TIME, mnodeFreeConn); tsMnodeConnCache = taosCacheInitWithCb(CONN_CHECK_TIME, mnodeFreeConn);
return 0; return 0;
} }
......
...@@ -65,7 +65,7 @@ int32_t mnodeInitShow() { ...@@ -65,7 +65,7 @@ int32_t mnodeInitShow() {
mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_CONNECT, mnodeProcessConnectMsg); mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_CONNECT, mnodeProcessConnectMsg);
mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_USE_DB, mnodeProcessUseMsg); mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_USE_DB, mnodeProcessUseMsg);
tsMnodeShowCache = taosCacheInitWithCb(tsMnodeTmr, 10, mnodeFreeShowObj); tsMnodeShowCache = taosCacheInitWithCb(10, mnodeFreeShowObj);
return 0; return 0;
} }
......
...@@ -1769,8 +1769,8 @@ static int32_t mnodeAutoCreateChildTable(SMnodeMsg *pMsg) { ...@@ -1769,8 +1769,8 @@ static int32_t mnodeAutoCreateChildTable(SMnodeMsg *pMsg) {
return TSDB_CODE_MND_OUT_OF_MEMORY; return TSDB_CODE_MND_OUT_OF_MEMORY;
} }
strncpy(pCreateMsg->tableId, pInfo->tableId, tListLen(pInfo->tableId)); tstrncpy(pCreateMsg->tableId, pInfo->tableId, sizeof(pInfo->tableId));
strcpy(pCreateMsg->db, pMsg->pDb->name); tstrncpy(pCreateMsg->db, pMsg->pDb->name, sizeof(pCreateMsg->db));
pCreateMsg->igExists = 1; pCreateMsg->igExists = 1;
pCreateMsg->getMeta = 1; pCreateMsg->getMeta = 1;
pCreateMsg->contLen = htonl(contLen); pCreateMsg->contLen = htonl(contLen);
......
...@@ -160,7 +160,7 @@ static void taosGetSystemTimezone() { ...@@ -160,7 +160,7 @@ static void taosGetSystemTimezone() {
/* load time zone string from /etc/timezone */ /* load time zone string from /etc/timezone */
FILE *f = fopen("/etc/timezone", "r"); FILE *f = fopen("/etc/timezone", "r");
char buf[64] = {0}; char buf[65] = {0};
if (f != NULL) { if (f != NULL) {
fread(buf, 64, 1, f); fread(buf, 64, 1, f);
fclose(f); fclose(f);
......
...@@ -88,8 +88,8 @@ bool httpParseTaosdAuthToken(HttpContext *pContext, char *token, int len) { ...@@ -88,8 +88,8 @@ bool httpParseTaosdAuthToken(HttpContext *pContext, char *token, int len) {
free(base64); free(base64);
return false; return false;
} else { } else {
strncpy(pContext->user, descrypt, TSDB_USER_LEN); tstrncpy(pContext->user, descrypt, sizeof(pContext->user));
strncpy(pContext->pass, descrypt + TSDB_USER_LEN, TSDB_PASSWORD_LEN); tstrncpy(pContext->pass, descrypt + TSDB_USER_LEN, TSDB_PASSWORD_LEN);
httpTrace("context:%p, fd:%d, ip:%s, taosd token:%s parsed success, user:%s", pContext, pContext->fd, httpTrace("context:%p, fd:%d, ip:%s, taosd token:%s parsed success, user:%s", pContext, pContext->fd,
pContext->ipstr, token, pContext->user); pContext->ipstr, token, pContext->user);
......
...@@ -156,7 +156,7 @@ static void dnodeBuildMonitorSql(char *sql, int32_t cmd) { ...@@ -156,7 +156,7 @@ static void dnodeBuildMonitorSql(char *sql, int32_t cmd) {
", io_read float, io_write float" ", io_read float, io_write float"
", req_http int, req_select int, req_insert int" ", req_http int, req_select int, req_insert int"
") tags (dnodeid int, fqdn binary(%d))", ") tags (dnodeid int, fqdn binary(%d))",
tsMonitorDbName, TSDB_FQDN_LEN + 1); tsMonitorDbName, TSDB_FQDN_LEN);
} else if (cmd == MONITOR_CMD_CREATE_TB_DN) { } else if (cmd == MONITOR_CMD_CREATE_TB_DN) {
snprintf(sql, SQL_LENGTH, "create table if not exists %s.dn%d using %s.dn tags(%d, '%s')", tsMonitorDbName, snprintf(sql, SQL_LENGTH, "create table if not exists %s.dn%d using %s.dn tags(%d, '%s')", tsMonitorDbName,
dnodeGetDnodeId(), tsMonitorDbName, dnodeGetDnodeId(), tsLocalEp); dnodeGetDnodeId(), tsMonitorDbName, dnodeGetDnodeId(), tsLocalEp);
......
...@@ -50,7 +50,8 @@ typedef struct SFillInfo { ...@@ -50,7 +50,8 @@ typedef struct SFillInfo {
char * nextValues; // next row of data char * nextValues; // next row of data
char** pData; // original result data block involved in filling data char** pData; // original result data block involved in filling data
int32_t capacityInRows; // data buffer size in rows int32_t capacityInRows; // data buffer size in rows
int8_t slidingUnit; // sliding time unit
int8_t precision; // time resoluation
SFillColInfo* pFillCol; // column info for fill operations SFillColInfo* pFillCol; // column info for fill operations
} SFillInfo; } SFillInfo;
...@@ -61,12 +62,13 @@ typedef struct SPoint { ...@@ -61,12 +62,13 @@ typedef struct SPoint {
int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, char timeUnit, int16_t precision); int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, char timeUnit, int16_t precision);
SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
int32_t numOfCols, int64_t slidingTime, int32_t fillType, SFillColInfo* pFillCol); int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType,
SFillColInfo* pFillCol);
void taosResetFillInfo(SFillInfo* pFillInfo, TSKEY startTimestamp); void taosResetFillInfo(SFillInfo* pFillInfo, TSKEY startTimestamp);
void taosDestoryFillInfo(SFillInfo *pFillInfo); void* taosDestoryFillInfo(SFillInfo *pFillInfo);
void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey); void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey);
...@@ -74,9 +76,7 @@ void taosFillCopyInputDataFromFilePage(SFillInfo* pFillInfo, tFilePage** pInput) ...@@ -74,9 +76,7 @@ void taosFillCopyInputDataFromFilePage(SFillInfo* pFillInfo, tFilePage** pInput)
void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, tFilePage* pInput); void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, tFilePage* pInput);
TSKEY taosGetRevisedEndKey(TSKEY ekey, int32_t order, int64_t timeInterval, int8_t slidingTimeUnit, int8_t precision); int64_t getFilledNumOfRes(SFillInfo* pFillInfo, int64_t ekey, int32_t maxNumOfRows);
int64_t taosGetNumOfResultWithFill(SFillInfo* pFillInfo, int32_t numOfRows, int64_t ekey, int32_t maxNumOfRows);
int32_t taosNumOfRemainRows(SFillInfo *pFillInfo); int32_t taosNumOfRemainRows(SFillInfo *pFillInfo);
......
...@@ -1466,7 +1466,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -1466,7 +1466,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
tfree(pRuntimeEnv->pCtx); tfree(pRuntimeEnv->pCtx);
} }
taosDestoryFillInfo(pRuntimeEnv->pFillInfo); pRuntimeEnv->pFillInfo = taosDestoryFillInfo(pRuntimeEnv->pFillInfo);
destroyResultBuf(pRuntimeEnv->pResultBuf, pQInfo); destroyResultBuf(pRuntimeEnv->pResultBuf, pQInfo);
tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle); tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle);
...@@ -3557,9 +3557,7 @@ bool queryHasRemainResults(SQueryRuntimeEnv* pRuntimeEnv) { ...@@ -3557,9 +3557,7 @@ bool queryHasRemainResults(SQueryRuntimeEnv* pRuntimeEnv) {
* first result row in the actual result set will fill nothing. * first result row in the actual result set will fill nothing.
*/ */
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
TSKEY ekey = taosGetRevisedEndKey(pQuery->window.ekey, pQuery->order.order, pQuery->slidingTime, int32_t numOfTotal = getFilledNumOfRes(pFillInfo, pQuery->window.ekey, pQuery->rec.capacity);
pQuery->slidingTimeUnit, pQuery->precision);
int32_t numOfTotal = taosGetNumOfResultWithFill(pFillInfo, remain, ekey, pQuery->rec.capacity);
return numOfTotal > 0; return numOfTotal > 0;
} }
...@@ -3601,7 +3599,7 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data ...@@ -3601,7 +3599,7 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
} }
} }
int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst, int32_t numOfRows, int32_t *numOfInterpo) { int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst, int32_t *numOfInterpo) {
SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv); SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv);
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
SFillInfo* pFillInfo = pRuntimeEnv->pFillInfo; SFillInfo* pFillInfo = pRuntimeEnv->pFillInfo;
...@@ -4013,7 +4011,8 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool ...@@ -4013,7 +4011,8 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool
if (pQuery->fillType != TSDB_FILL_NONE && !isPointInterpoQuery(pQuery)) { if (pQuery->fillType != TSDB_FILL_NONE && !isPointInterpoQuery(pQuery)) {
SFillColInfo* pColInfo = taosCreateFillColInfo(pQuery); SFillColInfo* pColInfo = taosCreateFillColInfo(pQuery);
pRuntimeEnv->pFillInfo = taosInitFillInfo(pQuery->order.order, 0, 0, pQuery->rec.capacity, pQuery->numOfOutput, pRuntimeEnv->pFillInfo = taosInitFillInfo(pQuery->order.order, 0, 0, pQuery->rec.capacity, pQuery->numOfOutput,
pQuery->slidingTime, pQuery->fillType, pColInfo); pQuery->slidingTime, pQuery->slidingTimeUnit, pQuery->precision,
pQuery->fillType, pColInfo);
} }
// todo refactor // todo refactor
...@@ -4666,13 +4665,11 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { ...@@ -4666,13 +4665,11 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
limitResults(pRuntimeEnv); limitResults(pRuntimeEnv);
break; break;
} else { } else {
TSKEY ekey = taosGetRevisedEndKey(pQuery->window.ekey, pQuery->order.order, pQuery->slidingTime, taosFillSetStartInfo(pRuntimeEnv->pFillInfo, pQuery->rec.rows, pQuery->window.ekey);
pQuery->slidingTimeUnit, pQuery->precision);
taosFillSetStartInfo(pRuntimeEnv->pFillInfo, pQuery->rec.rows, ekey);
taosFillCopyInputDataFromFilePage(pRuntimeEnv->pFillInfo, (tFilePage**) pQuery->sdata); taosFillCopyInputDataFromFilePage(pRuntimeEnv->pFillInfo, (tFilePage**) pQuery->sdata);
numOfInterpo = 0; numOfInterpo = 0;
pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata, pQuery->rec.rows, &numOfInterpo); pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata, &numOfInterpo);
if (pQuery->rec.rows > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { if (pQuery->rec.rows > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
limitResults(pRuntimeEnv); limitResults(pRuntimeEnv);
break; break;
...@@ -4704,8 +4701,7 @@ static void tableQueryImpl(SQInfo *pQInfo) { ...@@ -4704,8 +4701,7 @@ static void tableQueryImpl(SQInfo *pQInfo) {
* So, we do keep in this procedure instead of launching retrieve procedure for next results. * So, we do keep in this procedure instead of launching retrieve procedure for next results.
*/ */
int32_t numOfInterpo = 0; int32_t numOfInterpo = 0;
int32_t remain = taosNumOfRemainRows(pRuntimeEnv->pFillInfo); pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata, &numOfInterpo);
pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata, remain, &numOfInterpo);
if (pQuery->rec.rows > 0) { if (pQuery->rec.rows > 0) {
limitResults(pRuntimeEnv); limitResults(pRuntimeEnv);
...@@ -5242,7 +5238,7 @@ static int32_t createFilterInfo(void *pQInfo, SQuery *pQuery) { ...@@ -5242,7 +5238,7 @@ static int32_t createFilterInfo(void *pQInfo, SQuery *pQuery) {
if (pQuery->colList[i].numOfFilters > 0) { if (pQuery->colList[i].numOfFilters > 0) {
SSingleColumnFilterInfo *pFilterInfo = &pQuery->pFilterInfo[j]; SSingleColumnFilterInfo *pFilterInfo = &pQuery->pFilterInfo[j];
memcpy(&pFilterInfo->info, &pQuery->colList[i], sizeof(SColumnInfoData)); memcpy(&pFilterInfo->info, &pQuery->colList[i], sizeof(SColumnInfo));
pFilterInfo->info = pQuery->colList[i]; pFilterInfo->info = pQuery->colList[i];
pFilterInfo->numOfFilters = pQuery->colList[i].numOfFilters; pFilterInfo->numOfFilters = pQuery->colList[i].numOfFilters;
......
...@@ -13,8 +13,8 @@ ...@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "qfill.h"
#include "os.h" #include "os.h"
#include "qfill.h"
#include "qextbuffer.h" #include "qextbuffer.h"
#include "taosdef.h" #include "taosdef.h"
#include "taosmsg.h" #include "taosmsg.h"
...@@ -58,7 +58,7 @@ int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, ch ...@@ -58,7 +58,7 @@ int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, ch
} }
SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols, SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
int64_t slidingTime, int32_t fillType, SFillColInfo* pFillCol) { int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType, SFillColInfo* pFillCol) {
if (fillType == TSDB_FILL_NONE) { if (fillType == TSDB_FILL_NONE) {
return NULL; return NULL;
} }
...@@ -72,8 +72,10 @@ SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_ ...@@ -72,8 +72,10 @@ SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_
pFillInfo->pFillCol = pFillCol; pFillInfo->pFillCol = pFillCol;
pFillInfo->numOfTags = numOfTags; pFillInfo->numOfTags = numOfTags;
pFillInfo->numOfCols = numOfCols; pFillInfo->numOfCols = numOfCols;
pFillInfo->precision = precision;
pFillInfo->slidingTime = slidingTime; pFillInfo->slidingTime = slidingTime;
pFillInfo->slidingUnit = slidingUnit;
pFillInfo->pData = malloc(POINTER_BYTES * numOfCols); pFillInfo->pData = malloc(POINTER_BYTES * numOfCols);
int32_t rowsize = 0; int32_t rowsize = 0;
...@@ -102,9 +104,9 @@ void taosResetFillInfo(SFillInfo* pFillInfo, TSKEY startTimestamp) { ...@@ -102,9 +104,9 @@ void taosResetFillInfo(SFillInfo* pFillInfo, TSKEY startTimestamp) {
pFillInfo->numOfTotal = 0; pFillInfo->numOfTotal = 0;
} }
void taosDestoryFillInfo(SFillInfo* pFillInfo) { void* taosDestoryFillInfo(SFillInfo* pFillInfo) {
if (pFillInfo == NULL) { if (pFillInfo == NULL) {
return; return NULL;
} }
tfree(pFillInfo->prevValues); tfree(pFillInfo->prevValues);
...@@ -119,6 +121,15 @@ void taosDestoryFillInfo(SFillInfo* pFillInfo) { ...@@ -119,6 +121,15 @@ void taosDestoryFillInfo(SFillInfo* pFillInfo) {
tfree(pFillInfo->pFillCol); tfree(pFillInfo->pFillCol);
tfree(pFillInfo); tfree(pFillInfo);
return NULL;
}
static TSKEY taosGetRevisedEndKey(TSKEY ekey, int32_t order, int64_t timeInterval, int8_t slidingTimeUnit, int8_t precision) {
if (order == TSDB_ORDER_ASC) {
return ekey;
} else {
return taosGetIntervalStartTimestamp(ekey, timeInterval, slidingTimeUnit, precision);
}
} }
void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey) { void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey) {
...@@ -126,8 +137,10 @@ void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey) ...@@ -126,8 +137,10 @@ void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey)
return; return;
} }
pFillInfo->endKey = taosGetRevisedEndKey(endKey, pFillInfo->order, pFillInfo->slidingTime, pFillInfo->slidingUnit,
pFillInfo->precision);
pFillInfo->rowIdx = 0; pFillInfo->rowIdx = 0;
pFillInfo->endKey = endKey;
pFillInfo->numOfRows = numOfRows; pFillInfo->numOfRows = numOfRows;
// ensure the space // ensure the space
...@@ -165,36 +178,29 @@ void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, tFilePage* pInpu ...@@ -165,36 +178,29 @@ void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, tFilePage* pInpu
} }
} }
TSKEY taosGetRevisedEndKey(TSKEY ekey, int32_t order, int64_t timeInterval, int8_t slidingTimeUnit, int8_t precision) { int64_t getFilledNumOfRes(SFillInfo* pFillInfo, TSKEY ekey, int32_t maxNumOfRows) {
if (order == TSDB_ORDER_ASC) { int64_t* tsList = (int64_t*) pFillInfo->pData[0];
return ekey;
} else {
return taosGetIntervalStartTimestamp(ekey, timeInterval, slidingTimeUnit, precision);
}
}
static int32_t taosGetTotalNumOfFilledRes(SFillInfo* pFillInfo, const TSKEY* tsArray, int32_t remain, int32_t numOfRows = taosNumOfRemainRows(pFillInfo);
int64_t nInterval, int64_t ekey) {
TSKEY ekey1 = taosGetRevisedEndKey(ekey, pFillInfo->order, pFillInfo->slidingTime, pFillInfo->slidingUnit,
if (remain > 0) { // still fill gap within current data block, not generating data after the result set. pFillInfo->precision);
TSKEY lastKey = tsArray[pFillInfo->numOfRows - 1];
int32_t total = (int32_t)(labs(lastKey - pFillInfo->start) / nInterval) + 1;
assert(total >= remain); int64_t numOfRes = -1;
return total; if (numOfRows > 0) { // still fill gap within current data block, not generating data after the result set.
TSKEY lastKey = tsList[pFillInfo->numOfRows - 1];
numOfRes = (int64_t)(labs(lastKey - pFillInfo->start) / pFillInfo->slidingTime) + 1;
assert(numOfRes >= numOfRows);
} else { // reach the end of data } else { // reach the end of data
if ((ekey < pFillInfo->start && FILL_IS_ASC_FILL(pFillInfo)) || if ((ekey1 < pFillInfo->start && FILL_IS_ASC_FILL(pFillInfo)) ||
(ekey > pFillInfo->start && !FILL_IS_ASC_FILL(pFillInfo))) { (ekey1 > pFillInfo->start && !FILL_IS_ASC_FILL(pFillInfo))) {
return 0; return 0;
} else { } else { // the numOfRes rows are all filled with specified policy
return (int32_t)(labs(ekey - pFillInfo->start) / nInterval) + 1; numOfRes = (labs(ekey1 - pFillInfo->start) / pFillInfo->slidingTime) + 1;
} }
} }
}
int64_t taosGetNumOfResultWithFill(SFillInfo* pFillInfo, int32_t numOfRows, int64_t ekey, int32_t maxNumOfRows) {
int32_t numOfRes = taosGetTotalNumOfFilledRes(pFillInfo, (int64_t*) pFillInfo->pData[0], numOfRows,
pFillInfo->slidingTime, ekey);
return (numOfRes > maxNumOfRows) ? maxNumOfRows : numOfRes; return (numOfRes > maxNumOfRows) ? maxNumOfRows : numOfRes;
} }
...@@ -496,8 +502,8 @@ int32_t generateDataBlockImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t nu ...@@ -496,8 +502,8 @@ int32_t generateDataBlockImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t nu
int64_t taosGenerateDataBlock(SFillInfo* pFillInfo, tFilePage** output, int32_t capacity) { int64_t taosGenerateDataBlock(SFillInfo* pFillInfo, tFilePage** output, int32_t capacity) {
int32_t remain = taosNumOfRemainRows(pFillInfo); // todo use iterator? int32_t remain = taosNumOfRemainRows(pFillInfo); // todo use iterator?
int32_t rows = taosGetNumOfResultWithFill(pFillInfo, remain, pFillInfo->endKey, capacity);
int32_t rows = getFilledNumOfRes(pFillInfo, pFillInfo->endKey, capacity);
int32_t numOfRes = generateDataBlockImpl(pFillInfo, output, remain, rows, pFillInfo->pData); int32_t numOfRes = generateDataBlockImpl(pFillInfo, output, remain, rows, pFillInfo->pData);
assert(numOfRes == rows); assert(numOfRes == rows);
......
...@@ -233,8 +233,6 @@ TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TsdbRepoT *tsdb, STsdbQueryCond* ...@@ -233,8 +233,6 @@ TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TsdbRepoT *tsdb, STsdbQueryCond*
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qinfo); STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qinfo);
pQueryHandle->type = TSDB_QUERY_TYPE_EXTERNAL; pQueryHandle->type = TSDB_QUERY_TYPE_EXTERNAL;
// pQueryHandle->outputCapacity = 2; // only allowed two rows to be loaded
changeQueryHandleForInterpQuery(pQueryHandle); changeQueryHandleForInterpQuery(pQueryHandle);
return pQueryHandle; return pQueryHandle;
} }
...@@ -618,54 +616,19 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo ...@@ -618,54 +616,19 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo){ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo){
SQueryFilePos* cur = &pQueryHandle->cur; SQueryFilePos* cur = &pQueryHandle->cur;
SDataBlockInfo binfo = getTrueDataBlockInfo(pCheckInfo, pBlock); SDataBlockInfo binfo = getTrueDataBlockInfo(pCheckInfo, pBlock);
/*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo); /*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo);
SDataRow row = getSDataRowInTableMem(pCheckInfo);
TSKEY k1 = TSKEY_INITIAL_VAL, k2 = TSKEY_INITIAL_VAL;
if (pCheckInfo->iter != NULL && tSkipListIterGet(pCheckInfo->iter) != NULL) { TSKEY key = (row != NULL)? dataRowKey(row):TSKEY_INITIAL_VAL;
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
SDataRow row = SL_GET_NODE_DATA(node);
k1 = dataRowKey(row);
if (k1 == binfo.window.skey) {
if (tSkipListIterNext(pCheckInfo->iter)) {
node = tSkipListIterGet(pCheckInfo->iter);
row = SL_GET_NODE_DATA(node);
k1 = dataRowKey(row);
} else {
k1 = TSKEY_INITIAL_VAL;
}
}
}
if (pCheckInfo->iiter != NULL && tSkipListIterGet(pCheckInfo->iiter) != NULL) {
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
SDataRow row = SL_GET_NODE_DATA(node);
k2 = dataRowKey(row);
if (k2 == binfo.window.skey) {
if (tSkipListIterNext(pCheckInfo->iiter)) {
node = tSkipListIterGet(pCheckInfo->iiter);
row = SL_GET_NODE_DATA(node);
k2 = dataRowKey(row);
} else {
k2 = TSKEY_INITIAL_VAL;
}
}
}
cur->pos = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:(binfo.rows-1); cur->pos = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:(binfo.rows-1);
if ((ASCENDING_TRAVERSE(pQueryHandle->order) && if ((ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key <= binfo.window.ekey)) ||
((k1 != TSKEY_INITIAL_VAL && k1 <= binfo.window.ekey) || (k2 != TSKEY_INITIAL_VAL && k2 <= binfo.window.ekey))) || (!ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key >= binfo.window.skey))) {
(!ASCENDING_TRAVERSE(pQueryHandle->order) &&
((k1 != TSKEY_INITIAL_VAL && k1 >= binfo.window.skey) || (k2 != TSKEY_INITIAL_VAL && k2 >= binfo.window.skey)))) {
if ((ASCENDING_TRAVERSE(pQueryHandle->order) && if ((ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key < binfo.window.skey)) ||
((k1 != TSKEY_INITIAL_VAL && k1 < binfo.window.skey) || (k2 != TSKEY_INITIAL_VAL && k2 < binfo.window.skey))) || (!ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key > binfo.window.ekey))) {
(!ASCENDING_TRAVERSE(pQueryHandle->order) &&
(((k1 != TSKEY_INITIAL_VAL && k1 > binfo.window.ekey) || (k2 != TSKEY_INITIAL_VAL && k2 > binfo.window.ekey))))) {
// do not load file block into buffer // do not load file block into buffer
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order) ? 1 : -1; int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order) ? 1 : -1;
...@@ -756,7 +719,7 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock ...@@ -756,7 +719,7 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
return pQueryHandle->realNumOfRows > 0; return pQueryHandle->realNumOfRows > 0;
} }
static int vnodeBinarySearchKey(char* pValue, int num, TSKEY key, int order) { static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
int firstPos, lastPos, midPos = -1; int firstPos, lastPos, midPos = -1;
int numOfRows; int numOfRows;
TSKEY* keyList; TSKEY* keyList;
...@@ -868,37 +831,63 @@ static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t cap ...@@ -868,37 +831,63 @@ static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t cap
return numOfRows + num; return numOfRows + num;
} }
static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, int32_t capacity, static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, SDataRow row,
int32_t numOfRows, SDataRow row, STSchema* pSchema) { STsdbMeta *pMeta, int32_t numOfCols, STable* pTable) {
int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns);
int32_t numOfTableCols = schemaNCols(pSchema);
char* pData = NULL; char* pData = NULL;
for (int32_t i = 0; i < numOfCols; ++i) {
// the schema version info is embeded in SDataRow
STSchema* pSchema = tsdbGetTableSchemaByVersion(pMeta, pTable, dataRowVersion(row));
int32_t numOfRowCols = schemaNCols(pSchema);
int32_t i = 0, j = 0;
while(i < numOfCols && j < numOfRowCols) {
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
if (pSchema->columns[j].colId < pColInfo->info.colId) {
j++;
continue;
}
if (ASCENDING_TRAVERSE(pQueryHandle->order)) { if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
pData = pColInfo->pData + numOfRows * pColInfo->info.bytes; pData = pColInfo->pData + numOfRows * pColInfo->info.bytes;
} else { } else {
pData = pColInfo->pData + (capacity - numOfRows - 1) * pColInfo->info.bytes; pData = pColInfo->pData + (capacity - numOfRows - 1) * pColInfo->info.bytes;
} }
int32_t offset = 0; if (pSchema->columns[j].colId == pColInfo->info.colId) {
for (int32_t j = 0; j < numOfTableCols; ++j) { void* value = tdGetRowDataOfCol(row, pColInfo->info.type, TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset);
if (pColInfo->info.colId == pSchema->columns[j].colId) { if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
offset = pSchema->columns[j].offset; memcpy(pData, value, varDataTLen(value));
break; } else {
memcpy(pData, value, pColInfo->info.bytes);
}
j++;
i++;
} else { // pColInfo->info.colId < pSchema->columns[j].colId, it is a NULL data
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
setVardataNull(pData, pColInfo->info.type);
} else {
setNull(pData, pColInfo->info.type, pColInfo->info.bytes);
} }
i++;
} }
}
assert(offset != -1); // todo handle error
void* value = tdGetRowDataOfCol(row, pColInfo->info.type, TD_DATA_ROW_HEAD_SIZE + offset); while (i < numOfCols) { // the remain columns are all null data
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
pData = pColInfo->pData + numOfRows * pColInfo->info.bytes;
} else {
pData = pColInfo->pData + (capacity - numOfRows - 1) * pColInfo->info.bytes;
}
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) { if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
memcpy(pData, value, varDataTLen(value)); setVardataNull(pData, pColInfo->info.type);
} else { } else {
memcpy(pData, value, pColInfo->info.bytes); setNull(pData, pColInfo->info.type, pColInfo->info.bytes);
} }
i++;
} }
} }
...@@ -911,7 +900,16 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -911,7 +900,16 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
initTableMemIterator(pQueryHandle, pCheckInfo); initTableMemIterator(pQueryHandle, pCheckInfo);
SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0]; SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
// for search the endPos, so the order needs to reverse
int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1;
int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns);
STsdbMeta* pMeta = tsdbGetMeta(pQueryHandle->pTsdb);
STable* pTable = pCheckInfo->pTableObj;
int32_t endPos = cur->pos; int32_t endPos = cur->pos;
if (ASCENDING_TRAVERSE(pQueryHandle->order) && pQueryHandle->window.ekey > blockInfo.window.ekey) { if (ASCENDING_TRAVERSE(pQueryHandle->order) && pQueryHandle->window.ekey > blockInfo.window.ekey) {
endPos = blockInfo.rows - 1; endPos = blockInfo.rows - 1;
...@@ -920,8 +918,8 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -920,8 +918,8 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
endPos = 0; endPos = 0;
cur->mixBlock = (cur->pos != blockInfo.rows - 1); cur->mixBlock = (cur->pos != blockInfo.rows - 1);
} else { } else {
int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC; assert(pCols->numOfRows > 0);
endPos = vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pQueryHandle->window.ekey, order); endPos = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pQueryHandle->window.ekey, order);
cur->mixBlock = true; cur->mixBlock = true;
} }
...@@ -933,8 +931,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -933,8 +931,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
int32_t numOfRows = 0; int32_t numOfRows = 0;
pQueryHandle->cur.win = TSWINDOW_INITIALIZER; pQueryHandle->cur.win = TSWINDOW_INITIALIZER;
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1;
// no data in buffer, load data from file directly // no data in buffer, load data from file directly
if (pCheckInfo->iiter == NULL && pCheckInfo->iter == NULL) { if (pCheckInfo->iiter == NULL && pCheckInfo->iter == NULL) {
int32_t start = cur->pos; int32_t start = cur->pos;
...@@ -950,12 +947,11 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -950,12 +947,11 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
// todo opt in case of no data in buffer // todo opt in case of no data in buffer
numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end); numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end);
// if the buffer is not full in case of descending order query, move the data in the front of the buffer // if the buffer is not full in case of descending order query, move the data in the front of the buffer
if (!ASCENDING_TRAVERSE(pQueryHandle->order) && numOfRows < pQueryHandle->outputCapacity) { if (!ASCENDING_TRAVERSE(pQueryHandle->order) && numOfRows < pQueryHandle->outputCapacity) {
int32_t emptySize = pQueryHandle->outputCapacity - numOfRows; int32_t emptySize = pQueryHandle->outputCapacity - numOfRows;
int32_t reqNumOfCols = taosArrayGetSize(pQueryHandle->pColumns);
for(int32_t i = 0; i < numOfCols; ++i) {
for(int32_t i = 0; i < reqNumOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
memmove(pColInfo->pData, pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes); memmove(pColInfo->pData, pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes);
} }
...@@ -969,20 +965,15 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -969,20 +965,15 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
pQueryHandle->realNumOfRows = numOfRows; pQueryHandle->realNumOfRows = numOfRows;
cur->rows = numOfRows; cur->rows = numOfRows;
return; return;
} else if (pCheckInfo->iter != NULL && pCheckInfo->iiter == NULL) { } else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) {
// } else if (pCheckInfo->iter == NULL && pCheckInfo->iiter != NULL) {
// } else { // iter and iiter are all not NULL, three-way merge data block
STSchema* pSchema = tsdbGetTableSchema(tsdbGetMeta(pQueryHandle->pTsdb), pCheckInfo->pTableObj);
SSkipListNode* node = NULL; SSkipListNode* node = NULL;
do { do {
node = tSkipListIterGet(pCheckInfo->iter); SDataRow row = getSDataRowInTableMem(pCheckInfo);
if (node == NULL) { if (row == NULL) {
break; break;
} }
SDataRow row = SL_GET_NODE_DATA(node); TSKEY key = dataRowKey(row);
TSKEY key = dataRowKey(row);
if ((key > pQueryHandle->window.ekey && ASCENDING_TRAVERSE(pQueryHandle->order)) || if ((key > pQueryHandle->window.ekey && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(key < pQueryHandle->window.ekey && !ASCENDING_TRAVERSE(pQueryHandle->order))) { (key < pQueryHandle->window.ekey && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
break; break;
...@@ -995,7 +986,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -995,7 +986,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
if ((key < tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) || if ((key < tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(key > tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) { (key > tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
copyOneRowFromMem(pQueryHandle, pCheckInfo, pQueryHandle->outputCapacity, numOfRows, row, pSchema); copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, pMeta, numOfCols, pTable);
numOfRows += 1; numOfRows += 1;
if (cur->win.skey == TSKEY_INITIAL_VAL) { if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = key; cur->win.skey = key;
...@@ -1005,17 +996,16 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -1005,17 +996,16 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
cur->lastKey = key + step; cur->lastKey = key + step;
cur->mixBlock = true; cur->mixBlock = true;
tSkipListIterNext(pCheckInfo->iter); moveToNextRow(pCheckInfo);
} else if (key == tsArray[pos]) { // data in buffer has the same timestamp of data in file block, ignore it } else if (key == tsArray[pos]) { // data in buffer has the same timestamp of data in file block, ignore it
tSkipListIterNext(pCheckInfo->iter); moveToNextRow(pCheckInfo);
} else if ((key > tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) || } else if ((key > tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(key < tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) { (key < tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
if (cur->win.skey == TSKEY_INITIAL_VAL) { if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = tsArray[pos]; cur->win.skey = tsArray[pos];
} }
int32_t order = ASCENDING_TRAVERSE(pQueryHandle->order) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC; int32_t end = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, key, order);
int32_t end = vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, key, order);
if (tsArray[end] == key) { // the value of key in cache equals to the end timestamp value, ignore it if (tsArray[end] == key) { // the value of key in cache equals to the end timestamp value, ignore it
tSkipListIterNext(pCheckInfo->iter); tSkipListIterNext(pCheckInfo->iter);
} }
...@@ -1093,9 +1083,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -1093,9 +1083,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
// if the buffer is not full in case of descending order query, move the data in the front of the buffer // if the buffer is not full in case of descending order query, move the data in the front of the buffer
if (numOfRows < pQueryHandle->outputCapacity) { if (numOfRows < pQueryHandle->outputCapacity) {
int32_t emptySize = pQueryHandle->outputCapacity - numOfRows; int32_t emptySize = pQueryHandle->outputCapacity - numOfRows;
for(int32_t i = 0; i < numOfCols; ++i) {
int32_t requiredNumOfCols = taosArrayGetSize(pQueryHandle->pColumns);
for(int32_t i = 0; i < requiredNumOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
memmove(pColInfo->pData, pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes); memmove(pColInfo->pData, pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes);
} }
...@@ -1567,9 +1555,6 @@ void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle) { ...@@ -1567,9 +1555,6 @@ void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle) {
for(int32_t i = 0; i < numOfTables; ++i) { for(int32_t i = 0; i < numOfTables; ++i) {
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i); STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
if (pCheckInfo->pTableObj->tableId.uid == 12094628167747) {
printf("abc\n");
}
if (pCheckInfo->pTableObj->lastKey > key) { if (pCheckInfo->pTableObj->lastKey > key) {
key = pCheckInfo->pTableObj->lastKey; key = pCheckInfo->pTableObj->lastKey;
index = i; index = i;
...@@ -1652,9 +1637,9 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int ...@@ -1652,9 +1637,9 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
*skey = TSKEY_INITIAL_VAL; *skey = TSKEY_INITIAL_VAL;
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
STSchema* pSchema = tsdbGetTableSchema(tsdbGetMeta(pQueryHandle->pTsdb), pCheckInfo->pTableObj); STsdbMeta* pMeta = tsdbGetMeta(pQueryHandle->pTsdb);
int32_t numOfTableCols = schemaNCols(pSchema); STable* pTable = pCheckInfo->pTableObj;
do { do {
SDataRow row = getSDataRowInTableMem(pCheckInfo); SDataRow row = getSDataRowInTableMem(pCheckInfo);
if (row == NULL) { if (row == NULL) {
...@@ -1662,10 +1647,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int ...@@ -1662,10 +1647,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
} }
TSKEY key = dataRowKey(row); TSKEY key = dataRowKey(row);
if ((key > maxKey && ASCENDING_TRAVERSE(pQueryHandle->order)) || (key < maxKey && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
if ((key > maxKey && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(key < maxKey && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
tsdbTrace("%p key:%"PRIu64" beyond qrange:%"PRId64" - %"PRId64", no more data in buffer", pQueryHandle, key, pQueryHandle->window.skey, tsdbTrace("%p key:%"PRIu64" beyond qrange:%"PRId64" - %"PRId64", no more data in buffer", pQueryHandle, key, pQueryHandle->window.skey,
pQueryHandle->window.ekey); pQueryHandle->window.ekey);
...@@ -1677,59 +1659,8 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int ...@@ -1677,59 +1659,8 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
} }
*ekey = key; *ekey = key;
char* pData = NULL; copyOneRowFromMem(pQueryHandle, maxRowsToRead, numOfRows, row, pMeta, numOfCols, pTable);
int32_t i = 0, j = 0;
while(i < numOfCols && j < numOfTableCols) {
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
if (pSchema->columns[j].colId < pColInfo->info.colId) {
j++;
continue;
}
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
pData = pColInfo->pData + numOfRows * pColInfo->info.bytes;
} else {
pData = pColInfo->pData + (maxRowsToRead - numOfRows - 1) * pColInfo->info.bytes;
}
if (pSchema->columns[j].colId == pColInfo->info.colId) {
void* value = tdGetRowDataOfCol(row, pColInfo->info.type, TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset);
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
memcpy(pData, value, varDataTLen(value));
} else {
memcpy(pData, value, pColInfo->info.bytes);
}
j++;
i++;
} else { // pColInfo->info.colId < pSchema->columns[j].colId, it is a NULL data
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
setVardataNull(pData, pColInfo->info.type);
} else {
setNull(pData, pColInfo->info.type, pColInfo->info.bytes);
}
i++;
}
}
while (i < numOfCols) { // the remain columns are all null data
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
pData = pColInfo->pData + numOfRows * pColInfo->info.bytes;
} else {
pData = pColInfo->pData + (maxRowsToRead - numOfRows - 1) * pColInfo->info.bytes;
}
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
setVardataNull(pData, pColInfo->info.type);
} else {
setNull(pData, pColInfo->info.type, pColInfo->info.bytes);
}
i++;
}
if (++numOfRows >= maxRowsToRead) { if (++numOfRows >= maxRowsToRead) {
moveToNextRow(pCheckInfo); moveToNextRow(pCheckInfo);
break; break;
......
...@@ -37,8 +37,8 @@ typedef struct SCacheDataNode { ...@@ -37,8 +37,8 @@ typedef struct SCacheDataNode {
uint64_t expiredTime; // expiredTime expiredTime when this element should be remove from cache uint64_t expiredTime; // expiredTime expiredTime when this element should be remove from cache
uint64_t signature; uint64_t signature;
uint32_t size; // allocated size for current SCacheDataNode uint32_t size; // allocated size for current SCacheDataNode
uint16_t keySize : 15; uint16_t keySize: 15;
bool inTrash : 1; // denote if it is in trash or not bool inTrashCan: 1;// denote if it is in trash or not
T_REF_DECLARE() T_REF_DECLARE()
char *key; char *key;
char data[]; char data[];
...@@ -50,46 +50,49 @@ typedef struct STrashElem { ...@@ -50,46 +50,49 @@ typedef struct STrashElem {
SCacheDataNode *pData; SCacheDataNode *pData;
} STrashElem; } STrashElem;
/*
* to accommodate the old data which has the same key value of new one in hashList
* when an new node is put into cache, if an existed one with the same key:
* 1. if the old one does not be referenced, update it.
* 2. otherwise, move the old one to pTrash, addedTime the new one.
*
* when the node in pTrash does not be referenced, it will be release at the expired expiredTime
*/
typedef struct { typedef struct {
int64_t totalSize; // total allocated buffer in this hash table, SCacheObj is not included. int64_t totalSize; // total allocated buffer in this hash table, SCacheObj is not included.
int64_t refreshTime; int64_t refreshTime;
STrashElem * pTrash;
/* void * tmrCtrl;
* to accommodate the old datanode which has the same key value of new one in hashList void * pTimer;
* when an new node is put into cache, if an existed one with the same key: SCacheStatis statistics;
* 1. if the old one does not be referenced, update it. SHashObj * pHashTable;
* 2. otherwise, move the old one to pTrash, addedTime the new one.
*
* when the node in pTrash does not be referenced, it will be release at the expired expiredTime
*/
STrashElem * pTrash;
void * tmrCtrl;
void * pTimer;
SCacheStatis statistics;
SHashObj * pHashTable;
_hash_free_fn_t freeFp; _hash_free_fn_t freeFp;
int numOfElemsInTrash; // number of element in trash uint32_t numOfElemsInTrash; // number of element in trash
int16_t deleting; // set the deleting flag to stop refreshing ASAP. uint8_t deleting; // set the deleting flag to stop refreshing ASAP.
T_REF_DECLARE() pthread_t refreshWorker;
#if defined(LINUX) #if defined(LINUX)
pthread_rwlock_t lock; pthread_rwlock_t lock;
#else #else
pthread_mutex_t lock; pthread_mutex_t lock;
#endif #endif
} SCacheObj; } SCacheObj;
/** /**
* * initialize the cache object
* @param maxSessions maximum slots available for hash elements
* @param tmrCtrl timer ctrl
* @param refreshTime refresh operation interval time, the maximum survival time when one element is expired and * @param refreshTime refresh operation interval time, the maximum survival time when one element is expired and
* not referenced by other objects * not referenced by other objects
* @return * @return
*/ */
SCacheObj *taosCacheInit(void *tmrCtrl, int64_t refreshTimeInSeconds); SCacheObj *taosCacheInit(int64_t refreshTimeInSeconds);
SCacheObj *taosCacheInitWithCb(void *tmrCtrl, int64_t refreshTimeInSeconds, void (*freeCb)(void *data));
/**
* initialize the cache object and set the free object callback function
* @param refreshTimeInSeconds
* @param freeCb
* @return
*/
SCacheObj *taosCacheInitWithCb(int64_t refreshTimeInSeconds, void (*freeCb)(void *data));
/** /**
* add data into cache * add data into cache
......
...@@ -77,31 +77,7 @@ static FORCE_INLINE void taosFreeNode(void *data) { ...@@ -77,31 +77,7 @@ static FORCE_INLINE void taosFreeNode(void *data) {
* @param lifespan total survial expiredTime from now * @param lifespan total survial expiredTime from now
* @return SCacheDataNode * @return SCacheDataNode
*/ */
static SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size, static SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size, uint64_t duration);
uint64_t duration) {
size_t totalSize = size + sizeof(SCacheDataNode) + keyLen + 1;
SCacheDataNode *pNewNode = calloc(1, totalSize);
if (pNewNode == NULL) {
uError("failed to allocate memory, reason:%s", strerror(errno));
return NULL;
}
memcpy(pNewNode->data, pData, size);
pNewNode->key = (char *)pNewNode + sizeof(SCacheDataNode) + size;
pNewNode->keySize = keyLen;
memcpy(pNewNode->key, key, keyLen);
pNewNode->addedTime = (uint64_t)taosGetTimestampMs();
pNewNode->expiredTime = pNewNode->addedTime + duration;
pNewNode->signature = (uint64_t)pNewNode;
pNewNode->size = (uint32_t)totalSize;
return pNewNode;
}
/** /**
* addedTime object node into trash, and this object is closed for referencing if it is addedTime to trash * addedTime object node into trash, and this object is closed for referencing if it is addedTime to trash
...@@ -109,50 +85,15 @@ static SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const ...@@ -109,50 +85,15 @@ static SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const
* @param pCacheObj Cache object * @param pCacheObj Cache object
* @param pNode Cache slot object * @param pNode Cache slot object
*/ */
static void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) { static void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode);
if (pNode->inTrash) { /* node is already in trash */
return; /**
} * remove node in trash can
* @param pCacheObj
STrashElem *pElem = calloc(1, sizeof(STrashElem)); * @param pElem
pElem->pData = pNode; */
static void taosRemoveFromTrashCan(SCacheObj *pCacheObj, STrashElem *pElem);
pElem->next = pCacheObj->pTrash;
if (pCacheObj->pTrash) {
pCacheObj->pTrash->prev = pElem;
}
pElem->prev = NULL;
pCacheObj->pTrash = pElem;
pNode->inTrash = true;
pCacheObj->numOfElemsInTrash++;
uTrace("key:%s %p move to trash, numOfElem in trash:%d", pNode->key, pNode, pCacheObj->numOfElemsInTrash);
}
static void taosRemoveFromTrash(SCacheObj *pCacheObj, STrashElem *pElem) {
if (pElem->pData->signature != (uint64_t)pElem->pData) {
uError("key:sig:%d %p data has been released, ignore", pElem->pData->signature, pElem->pData);
return;
}
pCacheObj->numOfElemsInTrash--;
if (pElem->prev) {
pElem->prev->next = pElem->next;
} else { /* pnode is the header, update header */
pCacheObj->pTrash = pElem->next;
}
if (pElem->next) {
pElem->next->prev = pElem->prev;
}
pElem->pData->signature = 0;
if (pCacheObj->freeFp) pCacheObj->freeFp(pElem->pData->data);
free(pElem->pData);
free(pElem);
}
/** /**
* remove nodes in trash with refCount == 0 in cache * remove nodes in trash with refCount == 0 in cache
* @param pNode * @param pNode
...@@ -160,42 +101,7 @@ static void taosRemoveFromTrash(SCacheObj *pCacheObj, STrashElem *pElem) { ...@@ -160,42 +101,7 @@ static void taosRemoveFromTrash(SCacheObj *pCacheObj, STrashElem *pElem) {
* @param force force model, if true, remove data in trash without check refcount. * @param force force model, if true, remove data in trash without check refcount.
* may cause corruption. So, forece model only applys before cache is closed * may cause corruption. So, forece model only applys before cache is closed
*/ */
static void taosTrashEmpty(SCacheObj *pCacheObj, bool force) { static void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force);
__cache_wr_lock(pCacheObj);
if (pCacheObj->numOfElemsInTrash == 0) {
if (pCacheObj->pTrash != NULL) {
uError("key:inconsistency data in cache, numOfElem in trash:%d", pCacheObj->numOfElemsInTrash);
}
pCacheObj->pTrash = NULL;
__cache_unlock(pCacheObj);
return;
}
STrashElem *pElem = pCacheObj->pTrash;
while (pElem) {
T_REF_VAL_CHECK(pElem->pData);
if (pElem->next == pElem) {
pElem->next = NULL;
}
if (force || (T_REF_VAL_GET(pElem->pData) == 0)) {
uTrace("key:%s %p removed from trash. numOfElem in trash:%d", pElem->pData->key, pElem->pData,
pCacheObj->numOfElemsInTrash - 1);
STrashElem *p = pElem;
pElem = pElem->next;
taosRemoveFromTrash(pCacheObj, p);
} else {
pElem = pElem->next;
}
}
assert(pCacheObj->numOfElemsInTrash >= 0);
__cache_unlock(pCacheObj);
}
/** /**
* release node * release node
...@@ -304,87 +210,20 @@ static FORCE_INLINE SCacheDataNode *taosAddToCacheImpl(SCacheObj *pCacheObj, con ...@@ -304,87 +210,20 @@ static FORCE_INLINE SCacheDataNode *taosAddToCacheImpl(SCacheObj *pCacheObj, con
return pNode; return pNode;
} }
static void doCleanupDataCache(SCacheObj *pCacheObj) { /**
__cache_wr_lock(pCacheObj); * do cleanup the taos cache
* @param pCacheObj
//if (taosHashGetSize(pCacheObj->pHashTable) > 0) { */
taosHashCleanup(pCacheObj->pHashTable); static void doCleanupDataCache(SCacheObj *pCacheObj);
//}
__cache_unlock(pCacheObj);
taosTrashEmpty(pCacheObj, true);
__cache_lock_destroy(pCacheObj);
memset(pCacheObj, 0, sizeof(SCacheObj));
free(pCacheObj);
}
/** /**
* refresh cache to remove data in both hash list and trash, if any nodes' refcount == 0, every pCacheObj->refreshTime * refresh cache to remove data in both hash list and trash, if any nodes' refcount == 0, every pCacheObj->refreshTime
* @param handle Cache object handle * @param handle Cache object handle
*/ */
static void taosCacheRefresh(void *handle, void *tmrId) { static void* taosCacheRefresh(void *handle);
SCacheObj *pCacheObj = (SCacheObj *)handle;
if (pCacheObj == NULL || T_REF_VAL_GET(pCacheObj) == 0) {
uTrace("object is destroyed. no refresh retry");
return;
}
int16_t ref = T_REF_INC(pCacheObj);
if (ref == 1) {
T_REF_DEC(pCacheObj);
return;
}
// todo add the ref before start the timer
int32_t num = taosHashGetSize(pCacheObj->pHashTable);
if (num == 0) {
ref = T_REF_DEC(pCacheObj);
if (ref == 0) {
doCleanupDataCache(pCacheObj);
} else {
taosTmrReset(taosCacheRefresh, pCacheObj->refreshTime, pCacheObj, pCacheObj->tmrCtrl, &pCacheObj->pTimer);
}
return;
}
uint64_t expiredTime = taosGetTimestampMs();
pCacheObj->statistics.refreshCount++;
SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable);
__cache_wr_lock(pCacheObj);
while (taosHashIterNext(pIter)) {
if (pCacheObj->deleting == 1) {
taosHashDestroyIter(pIter);
break;
}
SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter);
if (pNode->expiredTime <= expiredTime && T_REF_VAL_GET(pNode) <= 0) {
taosCacheReleaseNode(pCacheObj, pNode);
}
}
__cache_unlock(pCacheObj);
taosHashDestroyIter(pIter);
taosTrashEmpty(pCacheObj, false); SCacheObj *taosCacheInitWithCb(int64_t refreshTime, void (*freeCb)(void *data)) {
if (refreshTime <= 0) {
ref = T_REF_DEC(pCacheObj);
if (ref == 0) {
doCleanupDataCache(pCacheObj);
return;
} else {
taosTmrReset(taosCacheRefresh, pCacheObj->refreshTime, pCacheObj, pCacheObj->tmrCtrl, &pCacheObj->pTimer);
}
}
SCacheObj *taosCacheInitWithCb(void *tmrCtrl, int64_t refreshTime, void (*freeCb)(void *data)) {
if (tmrCtrl == NULL || refreshTime <= 0) {
return NULL; return NULL;
} }
...@@ -394,7 +233,7 @@ SCacheObj *taosCacheInitWithCb(void *tmrCtrl, int64_t refreshTime, void (*freeCb ...@@ -394,7 +233,7 @@ SCacheObj *taosCacheInitWithCb(void *tmrCtrl, int64_t refreshTime, void (*freeCb
return NULL; return NULL;
} }
pCacheObj->pHashTable = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false); pCacheObj->pHashTable = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false);
if (pCacheObj->pHashTable == NULL) { if (pCacheObj->pHashTable == NULL) {
free(pCacheObj); free(pCacheObj);
uError("failed to allocate memory, reason:%s", strerror(errno)); uError("failed to allocate memory, reason:%s", strerror(errno));
...@@ -406,25 +245,27 @@ SCacheObj *taosCacheInitWithCb(void *tmrCtrl, int64_t refreshTime, void (*freeCb ...@@ -406,25 +245,27 @@ SCacheObj *taosCacheInitWithCb(void *tmrCtrl, int64_t refreshTime, void (*freeCb
pCacheObj->freeFp = freeCb; pCacheObj->freeFp = freeCb;
pCacheObj->refreshTime = refreshTime * 1000; pCacheObj->refreshTime = refreshTime * 1000;
pCacheObj->tmrCtrl = tmrCtrl;
taosTmrReset(taosCacheRefresh, pCacheObj->refreshTime, pCacheObj, pCacheObj->tmrCtrl, &pCacheObj->pTimer);
if (__cache_lock_init(pCacheObj) != 0) { if (__cache_lock_init(pCacheObj) != 0) {
taosTmrStopA(&pCacheObj->pTimer);
taosHashCleanup(pCacheObj->pHashTable); taosHashCleanup(pCacheObj->pHashTable);
free(pCacheObj); free(pCacheObj);
uError("failed to init lock, reason:%s", strerror(errno)); uError("failed to init lock, reason:%s", strerror(errno));
return NULL; return NULL;
} }
T_REF_INC(pCacheObj); pthread_attr_t thattr = {0};
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
pthread_create(&pCacheObj->refreshWorker, &thattr, taosCacheRefresh, pCacheObj);
pthread_attr_destroy(&thattr);
return pCacheObj; return pCacheObj;
} }
SCacheObj *taosCacheInit(void *tmrCtrl, int64_t refreshTime) { SCacheObj *taosCacheInit(int64_t refreshTime) {
return taosCacheInitWithCb(tmrCtrl, refreshTime, NULL); return taosCacheInitWithCb(refreshTime, NULL);
} }
void *taosCachePut(SCacheObj *pCacheObj, const char *key, const void *pData, size_t dataSize, int duration) { void *taosCachePut(SCacheObj *pCacheObj, const char *key, const void *pData, size_t dataSize, int duration) {
...@@ -600,16 +441,188 @@ void taosCacheEmpty(SCacheObj *pCacheObj) { ...@@ -600,16 +441,188 @@ void taosCacheEmpty(SCacheObj *pCacheObj) {
__cache_unlock(pCacheObj); __cache_unlock(pCacheObj);
taosHashDestroyIter(pIter); taosHashDestroyIter(pIter);
taosTrashEmpty(pCacheObj, false); taosTrashCanEmpty(pCacheObj, false);
} }
void taosCacheCleanup(SCacheObj *pCacheObj) { void taosCacheCleanup(SCacheObj *pCacheObj) {
if (pCacheObj == NULL) { if (pCacheObj == NULL) {
return; return;
} }
int32_t ref = T_REF_DEC(pCacheObj); pCacheObj->deleting = 1;
if (ref == 0) { pthread_join(pCacheObj->refreshWorker, NULL);
doCleanupDataCache(pCacheObj);
doCleanupDataCache(pCacheObj);
}
SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size,
uint64_t duration) {
size_t totalSize = size + sizeof(SCacheDataNode) + keyLen + 1;
SCacheDataNode *pNewNode = calloc(1, totalSize);
if (pNewNode == NULL) {
uError("failed to allocate memory, reason:%s", strerror(errno));
return NULL;
}
memcpy(pNewNode->data, pData, size);
pNewNode->key = (char *)pNewNode + sizeof(SCacheDataNode) + size;
pNewNode->keySize = keyLen;
memcpy(pNewNode->key, key, keyLen);
pNewNode->addedTime = (uint64_t)taosGetTimestampMs();
pNewNode->expiredTime = pNewNode->addedTime + duration;
pNewNode->signature = (uint64_t)pNewNode;
pNewNode->size = (uint32_t)totalSize;
return pNewNode;
}
void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
if (pNode->inTrashCan) { /* node is already in trash */
return;
}
STrashElem *pElem = calloc(1, sizeof(STrashElem));
pElem->pData = pNode;
pElem->next = pCacheObj->pTrash;
if (pCacheObj->pTrash) {
pCacheObj->pTrash->prev = pElem;
}
pElem->prev = NULL;
pCacheObj->pTrash = pElem;
pNode->inTrashCan = true;
pCacheObj->numOfElemsInTrash++;
uTrace("key:%s %p move to trash, numOfElem in trash:%d", pNode->key, pNode, pCacheObj->numOfElemsInTrash);
}
void taosRemoveFromTrashCan(SCacheObj *pCacheObj, STrashElem *pElem) {
if (pElem->pData->signature != (uint64_t)pElem->pData) {
uError("key:sig:%d %p data has been released, ignore", pElem->pData->signature, pElem->pData);
return;
}
pCacheObj->numOfElemsInTrash--;
if (pElem->prev) {
pElem->prev->next = pElem->next;
} else { /* pnode is the header, update header */
pCacheObj->pTrash = pElem->next;
}
if (pElem->next) {
pElem->next->prev = pElem->prev;
}
pElem->pData->signature = 0;
if (pCacheObj->freeFp) pCacheObj->freeFp(pElem->pData->data);
free(pElem->pData);
free(pElem);
}
void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force) {
__cache_wr_lock(pCacheObj);
if (pCacheObj->numOfElemsInTrash == 0) {
if (pCacheObj->pTrash != NULL) {
uError("key:inconsistency data in cache, numOfElem in trash:%d", pCacheObj->numOfElemsInTrash);
}
pCacheObj->pTrash = NULL;
__cache_unlock(pCacheObj);
return;
} }
STrashElem *pElem = pCacheObj->pTrash;
while (pElem) {
T_REF_VAL_CHECK(pElem->pData);
if (pElem->next == pElem) {
pElem->next = NULL;
}
if (force || (T_REF_VAL_GET(pElem->pData) == 0)) {
uTrace("key:%s %p removed from trash. numOfElem in trash:%d", pElem->pData->key, pElem->pData,
pCacheObj->numOfElemsInTrash - 1);
STrashElem *p = pElem;
pElem = pElem->next;
taosRemoveFromTrashCan(pCacheObj, p);
} else {
pElem = pElem->next;
}
}
assert(pCacheObj->numOfElemsInTrash >= 0);
__cache_unlock(pCacheObj);
}
void doCleanupDataCache(SCacheObj *pCacheObj) {
__cache_wr_lock(pCacheObj);
taosHashCleanup(pCacheObj->pHashTable);
__cache_unlock(pCacheObj);
taosTrashCanEmpty(pCacheObj, true);
__cache_lock_destroy(pCacheObj);
memset(pCacheObj, 0, sizeof(SCacheObj));
free(pCacheObj);
} }
void* taosCacheRefresh(void *handle) {
SCacheObj *pCacheObj = (SCacheObj *)handle;
if (pCacheObj == NULL) {
uTrace("object is destroyed. no refresh retry");
return NULL;
}
const int32_t SLEEP_DURATION = 500; //500 ms
int64_t totalTick = pCacheObj->refreshTime / SLEEP_DURATION;
int64_t count = 0;
while(1) {
taosMsleep(500);
// check if current cache object will be deleted every 500ms.
if (pCacheObj->deleting) {
break;
}
if (++count < totalTick) {
continue;
}
// reset the count value
count = 0;
size_t num = taosHashGetSize(pCacheObj->pHashTable);
if (num == 0) {
continue;
}
uint64_t expiredTime = taosGetTimestampMs();
pCacheObj->statistics.refreshCount++;
SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable);
__cache_wr_lock(pCacheObj);
while (taosHashIterNext(pIter)) {
SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter);
if (pNode->expiredTime <= expiredTime && T_REF_VAL_GET(pNode) <= 0) {
taosCacheReleaseNode(pCacheObj, pNode);
}
}
__cache_unlock(pCacheObj);
taosHashDestroyIter(pIter);
taosTrashCanEmpty(pCacheObj, false);
}
return NULL;
}
\ No newline at end of file
...@@ -19,8 +19,7 @@ int32_t tsMaxMeterConnections = 200; ...@@ -19,8 +19,7 @@ int32_t tsMaxMeterConnections = 200;
// test cache // test cache
TEST(testCase, client_cache_test) { TEST(testCase, client_cache_test) {
const int32_t REFRESH_TIME_IN_SEC = 2; const int32_t REFRESH_TIME_IN_SEC = 2;
void* tscTmr = taosTmrInit (tsMaxMgmtConnections*2, 200, 6000, "TSC"); SCacheObj* tscCacheHandle = taosCacheInit(REFRESH_TIME_IN_SEC);
SCacheObj* tscCacheHandle = taosCacheInit(tscTmr, REFRESH_TIME_IN_SEC);
const char* key1 = "test1"; const char* key1 = "test1";
char data1[] = "test11"; char data1[] = "test11";
...@@ -106,9 +105,7 @@ TEST(testCase, client_cache_test) { ...@@ -106,9 +105,7 @@ TEST(testCase, client_cache_test) {
TEST(testCase, cache_resize_test) { TEST(testCase, cache_resize_test) {
const int32_t REFRESH_TIME_IN_SEC = 2; const int32_t REFRESH_TIME_IN_SEC = 2;
void* tscTmr = taosTmrInit (1000*2, 200, 6000, "TSC"); auto* pCache = taosCacheInit(REFRESH_TIME_IN_SEC);
auto* pCache = taosCacheInit(tscTmr, REFRESH_TIME_IN_SEC);
char key[256] = {0}; char key[256] = {0};
char data[1024] = "abcdefghijk"; char data[1024] = "abcdefghijk";
......
...@@ -70,7 +70,7 @@ run general/http/restful_insert.sim ...@@ -70,7 +70,7 @@ run general/http/restful_insert.sim
run general/http/restful_limit.sim run general/http/restful_limit.sim
run general/http/restful_full.sim run general/http/restful_full.sim
run general/http/prepare.sim run general/http/prepare.sim
run general/http/telegraf.sim # run general/http/telegraf.sim
# run general/http/grafana_bug.sim # run general/http/grafana_bug.sim
# run general/http/grafana.sim # run general/http/grafana.sim
run general/import/basic.sim run general/import/basic.sim
......
...@@ -29,7 +29,7 @@ int main(int argc, char *argv[]) { ...@@ -29,7 +29,7 @@ int main(int argc, char *argv[]) {
for (int i = 1; i < argc; ++i) { for (int i = 1; i < argc; ++i) {
if (strcmp(argv[i], "-c") == 0 && i < argc - 1) { if (strcmp(argv[i], "-c") == 0 && i < argc - 1) {
strncpy(configDir, argv[++i], MAX_FILE_NAME_LEN); tstrncpy(configDir, argv[++i], MAX_FILE_NAME_LEN);
} else if (strcmp(argv[i], "-f") == 0 && i < argc - 1) { } else if (strcmp(argv[i], "-f") == 0 && i < argc - 1) {
strcpy(scriptFile, argv[++i]); strcpy(scriptFile, argv[++i]);
} else if (strcmp(argv[i], "-a") == 0) { } else if (strcmp(argv[i], "-a") == 0) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册