提交 313a07a7 编写于 作者: C Cary Xu

Merge branch '3.0' into feature/3.0_interval_hash_optimize

...@@ -121,6 +121,7 @@ extern SDiskCfg tsDiskCfg[]; ...@@ -121,6 +121,7 @@ extern SDiskCfg tsDiskCfg[];
// udf // udf
extern bool tsStartUdfd; extern bool tsStartUdfd;
extern char tsUdfdResFuncs[];
// schemaless // schemaless
extern char tsSmlChildTableName[]; extern char tsSmlChildTableName[];
......
...@@ -619,6 +619,8 @@ int32_t* taosGetErrno(); ...@@ -619,6 +619,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_RSMA_EMPTY_INFO TAOS_DEF_ERROR_CODE(0, 0x3156) #define TSDB_CODE_RSMA_EMPTY_INFO TAOS_DEF_ERROR_CODE(0, 0x3156)
#define TSDB_CODE_RSMA_INVALID_SCHEMA TAOS_DEF_ERROR_CODE(0, 0x3157) #define TSDB_CODE_RSMA_INVALID_SCHEMA TAOS_DEF_ERROR_CODE(0, 0x3157)
#define TSDB_CODE_RSMA_REGEX_MATCH TAOS_DEF_ERROR_CODE(0, 0x3158) #define TSDB_CODE_RSMA_REGEX_MATCH TAOS_DEF_ERROR_CODE(0, 0x3158)
#define TSDB_CODE_RSMA_STREAM_STATE_OPEN TAOS_DEF_ERROR_CODE(0, 0x3159)
#define TSDB_CODE_RSMA_STREAM_STATE_COMMIT TAOS_DEF_ERROR_CODE(0, 0x3160)
//index //index
#define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200) #define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200)
......
...@@ -164,6 +164,7 @@ int32_t tsTtlUnit = 86400; ...@@ -164,6 +164,7 @@ int32_t tsTtlUnit = 86400;
int32_t tsTtlPushInterval = 86400; int32_t tsTtlPushInterval = 86400;
int32_t tsGrantHBInterval = 60; int32_t tsGrantHBInterval = 60;
int32_t tsUptimeInterval = 300; // seconds int32_t tsUptimeInterval = 300; // seconds
char tsUdfdResFuncs[1024] = ""; // udfd resident funcs that teardown when udfd exits
#ifndef _STORAGE #ifndef _STORAGE
int32_t taosSetTfsCfg(SConfig *pCfg) { int32_t taosSetTfsCfg(SConfig *pCfg) {
...@@ -386,9 +387,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { ...@@ -386,9 +387,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfQnodeQueryThreads = TMAX(tsNumOfQnodeQueryThreads, 4); tsNumOfQnodeQueryThreads = TMAX(tsNumOfQnodeQueryThreads, 4);
if (cfgAddInt32(pCfg, "numOfQnodeQueryThreads", tsNumOfQnodeQueryThreads, 1, 1024, 0) != 0) return -1; if (cfgAddInt32(pCfg, "numOfQnodeQueryThreads", tsNumOfQnodeQueryThreads, 1, 1024, 0) != 0) return -1;
// tsNumOfQnodeFetchThreads = tsNumOfCores / 2; // tsNumOfQnodeFetchThreads = tsNumOfCores / 2;
// tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4); // tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4);
// if (cfgAddInt32(pCfg, "numOfQnodeFetchThreads", tsNumOfQnodeFetchThreads, 1, 1024, 0) != 0) return -1; // if (cfgAddInt32(pCfg, "numOfQnodeFetchThreads", tsNumOfQnodeFetchThreads, 1, 1024, 0) != 0) return -1;
tsNumOfSnodeSharedThreads = tsNumOfCores / 4; tsNumOfSnodeSharedThreads = tsNumOfCores / 4;
tsNumOfSnodeSharedThreads = TRANGE(tsNumOfSnodeSharedThreads, 2, 4); tsNumOfSnodeSharedThreads = TRANGE(tsNumOfSnodeSharedThreads, 2, 4);
...@@ -423,6 +424,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { ...@@ -423,6 +424,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "queryRsmaTolerance", tsQueryRsmaTolerance, 0, 900000, 0) != 0) return -1; if (cfgAddInt32(pCfg, "queryRsmaTolerance", tsQueryRsmaTolerance, 0, 900000, 0) != 0) return -1;
if (cfgAddBool(pCfg, "udf", tsStartUdfd, 0) != 0) return -1; if (cfgAddBool(pCfg, "udf", tsStartUdfd, 0) != 0) return -1;
if (cfgAddString(pCfg, "udfdResFuncs", tsUdfdResFuncs, 0) != 0) return -1;
GRANT_CFG_ADD; GRANT_CFG_ADD;
return 0; return 0;
} }
...@@ -529,15 +531,15 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) { ...@@ -529,15 +531,15 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
pItem->stype = stype; pItem->stype = stype;
} }
/* /*
pItem = cfgGetItem(tsCfg, "numOfQnodeFetchThreads"); pItem = cfgGetItem(tsCfg, "numOfQnodeFetchThreads");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
tsNumOfQnodeFetchThreads = numOfCores / 2; tsNumOfQnodeFetchThreads = numOfCores / 2;
tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4); tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4);
pItem->i32 = tsNumOfQnodeFetchThreads; pItem->i32 = tsNumOfQnodeFetchThreads;
pItem->stype = stype; pItem->stype = stype;
} }
*/ */
pItem = cfgGetItem(tsCfg, "numOfSnodeSharedThreads"); pItem = cfgGetItem(tsCfg, "numOfSnodeSharedThreads");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
...@@ -695,7 +697,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { ...@@ -695,7 +697,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsNumOfVnodeSyncThreads = cfgGetItem(pCfg, "numOfVnodeSyncThreads")->i32; tsNumOfVnodeSyncThreads = cfgGetItem(pCfg, "numOfVnodeSyncThreads")->i32;
tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32; tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32;
tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32; tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32;
// tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32; // tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32;
tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32; tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32;
tsNumOfSnodeUniqueThreads = cfgGetItem(pCfg, "numOfSnodeUniqueThreads")->i32; tsNumOfSnodeUniqueThreads = cfgGetItem(pCfg, "numOfSnodeUniqueThreads")->i32;
tsRpcQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64; tsRpcQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64;
...@@ -720,6 +722,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { ...@@ -720,6 +722,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsQueryRsmaTolerance = cfgGetItem(pCfg, "queryRsmaTolerance")->i32; tsQueryRsmaTolerance = cfgGetItem(pCfg, "queryRsmaTolerance")->i32;
tsStartUdfd = cfgGetItem(pCfg, "udf")->bval; tsStartUdfd = cfgGetItem(pCfg, "udf")->bval;
tstrncpy(tsUdfdResFuncs, cfgGetItem(pCfg, "udfdResFuncs")->str, sizeof(tsUdfdResFuncs));
if (tsQueryBufferSize >= 0) { if (tsQueryBufferSize >= 0) {
tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL; tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL;
...@@ -944,10 +947,10 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) { ...@@ -944,10 +947,10 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32; tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32;
} else if (strcasecmp("numOfQnodeQueryThreads", name) == 0) { } else if (strcasecmp("numOfQnodeQueryThreads", name) == 0) {
tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32; tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32;
/* /*
} else if (strcasecmp("numOfQnodeFetchThreads", name) == 0) { } else if (strcasecmp("numOfQnodeFetchThreads", name) == 0) {
tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32; tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32;
*/ */
} else if (strcasecmp("numOfSnodeSharedThreads", name) == 0) { } else if (strcasecmp("numOfSnodeSharedThreads", name) == 0) {
tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32; tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32;
} else if (strcasecmp("numOfSnodeUniqueThreads", name) == 0) { } else if (strcasecmp("numOfSnodeUniqueThreads", name) == 0) {
......
...@@ -51,7 +51,6 @@ target_sources( ...@@ -51,7 +51,6 @@ target_sources(
"src/tsdb/tsdbCacheRead.c" "src/tsdb/tsdbCacheRead.c"
"src/tsdb/tsdbRetention.c" "src/tsdb/tsdbRetention.c"
"src/tsdb/tsdbDiskData.c" "src/tsdb/tsdbDiskData.c"
"src/tsdb/tsdbCompress.c"
"src/tsdb/tsdbCompact.c" "src/tsdb/tsdbCompact.c"
"src/tsdb/tsdbMergeTree.c" "src/tsdb/tsdbMergeTree.c"
......
...@@ -146,6 +146,7 @@ struct SRSmaInfoItem { ...@@ -146,6 +146,7 @@ struct SRSmaInfoItem {
uint16_t nScanned; uint16_t nScanned;
int32_t maxDelay; // ms int32_t maxDelay; // ms
tmr_h tmrId; tmr_h tmrId;
void *pStreamState;
}; };
struct SRSmaInfo { struct SRSmaInfo {
...@@ -224,8 +225,10 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con ...@@ -224,8 +225,10 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type); int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type);
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash); int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash);
int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer); int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer);
void tdRSmaQTaskInfoGetFileName(int32_t vid, int64_t version, char *outputName); void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t version, char *outputName);
void tdRSmaQTaskInfoGetFullName(int32_t vid, int64_t version, const char *path, char *outputName); void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t version, const char *path, char *outputName);
void tdRSmaQTaskInfoGetFullPath(int32_t vgId, int8_t level, const char *path, char *outputName);
void tdRSmaQTaskInfoGetFullPathEx(int32_t vgId, tb_uid_t suid, int8_t level, const char *path, char *outputName);
static FORCE_INLINE void tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) { static FORCE_INLINE void tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) {
int32_t ref = T_REF_INC(pRSmaInfo); int32_t ref = T_REF_INC(pRSmaInfo);
......
...@@ -92,6 +92,18 @@ void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t version, const char *path, ...@@ -92,6 +92,18 @@ void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t version, const char *path,
tdGetVndFileName(vgId, path, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, version, outputName); tdGetVndFileName(vgId, path, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, version, outputName);
} }
void tdRSmaQTaskInfoGetFullPath(int32_t vgId, int8_t level, const char *path, char *outputName) {
tdGetVndDirName(vgId, path, VNODE_RSMA_DIR, true, outputName);
int32_t rsmaLen = strlen(outputName);
snprintf(outputName + rsmaLen, TSDB_FILENAME_LEN - rsmaLen, "%" PRIi8, level);
}
void tdRSmaQTaskInfoGetFullPathEx(int32_t vgId, tb_uid_t suid, int8_t level, const char *path, char *outputName) {
tdGetVndDirName(vgId, path, VNODE_RSMA_DIR, true, outputName);
int32_t rsmaLen = strlen(outputName);
snprintf(outputName + rsmaLen, TSDB_FILENAME_LEN - rsmaLen, "%" PRIi64 "%s%" PRIi8, suid, TD_DIRSEP, level);
}
static FORCE_INLINE int32_t tdRSmaQTaskInfoContLen(int32_t lenWithHead) { static FORCE_INLINE int32_t tdRSmaQTaskInfoContLen(int32_t lenWithHead) {
return lenWithHead - RSMA_QTASKINFO_HEAD_LEN; return lenWithHead - RSMA_QTASKINFO_HEAD_LEN;
} }
...@@ -130,6 +142,10 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) { ...@@ -130,6 +142,10 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) {
taosTmrStopA(&pItem->tmrId); taosTmrStopA(&pItem->tmrId);
} }
if (isDeepFree && pItem->pStreamState) {
streamStateClose(pItem->pStreamState);
}
if (isDeepFree && pInfo->taskInfo[i]) { if (isDeepFree && pInfo->taskInfo[i]) {
tdRSmaQTaskInfoFree(&pInfo->taskInfo[i], SMA_VID(pSma), i + 1); tdRSmaQTaskInfoFree(&pInfo->taskInfo[i], SMA_VID(pSma), i + 1);
} else { } else {
...@@ -290,12 +306,33 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat ...@@ -290,12 +306,33 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
SRetention *pRetention = SMA_RETENTION(pSma); SRetention *pRetention = SMA_RETENTION(pSma);
STsdbCfg *pTsdbCfg = SMA_TSDB_CFG(pSma); STsdbCfg *pTsdbCfg = SMA_TSDB_CFG(pSma);
SVnode *pVnode = pSma->pVnode; SVnode *pVnode = pSma->pVnode;
char taskInfDir[TSDB_FILENAME_LEN] = {0};
void *pStreamState = NULL;
// set the backend of stream state
tdRSmaQTaskInfoGetFullPathEx(TD_VID(pVnode), pRSmaInfo->suid, idx + 1, tfsGetPrimaryPath(pVnode->pTfs), taskInfDir);
if (!taosCheckExistFile(taskInfDir)) {
char *s = strdup(taskInfDir);
if (taosMulMkDir(taosDirName(s)) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
taosMemoryFree(s);
return TSDB_CODE_FAILED;
}
taosMemoryFree(s);
}
pStreamState = streamStateOpen(taskInfDir, NULL, true);
if (!pStreamState) {
terrno = TSDB_CODE_RSMA_STREAM_STATE_OPEN;
return TSDB_CODE_FAILED;
}
SReadHandle handle = { SReadHandle handle = {
.meta = pVnode->pMeta, .meta = pVnode->pMeta,
.vnode = pVnode, .vnode = pVnode,
.initTqReader = 1, .initTqReader = 1,
.pStateBackend = pStreamState,
}; };
pRSmaInfo->taskInfo[idx] = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle); pRSmaInfo->taskInfo[idx] = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle);
if (!pRSmaInfo->taskInfo[idx]) { if (!pRSmaInfo->taskInfo[idx]) {
terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE; terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE;
...@@ -303,6 +340,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat ...@@ -303,6 +340,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
} }
SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]); SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]);
pItem->triggerStat = TASK_TRIGGER_STAT_ACTIVE; // fetch the data when reboot pItem->triggerStat = TASK_TRIGGER_STAT_ACTIVE; // fetch the data when reboot
pItem->pStreamState = pStreamState;
if (param->maxdelay[idx] < TSDB_MIN_ROLLUP_MAX_DELAY) { if (param->maxdelay[idx] < TSDB_MIN_ROLLUP_MAX_DELAY) {
int64_t msInterval = int64_t msInterval =
convertTimeFromPrecisionToUnit(pRetention[idx + 1].freq, pTsdbCfg->precision, TIME_UNIT_MILLISECOND); convertTimeFromPrecisionToUnit(pRetention[idx + 1].freq, pTsdbCfg->precision, TIME_UNIT_MILLISECOND);
...@@ -322,7 +360,6 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat ...@@ -322,7 +360,6 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
pItem->fetchLevel = pItem->level; pItem->fetchLevel = pItem->level;
taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId); taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
smaInfo("vgId:%d, item:%p table:%" PRIi64 " level:%" PRIi8 " maxdelay:%" PRIi64 " watermark:%" PRIi64 smaInfo("vgId:%d, item:%p table:%" PRIi64 " level:%" PRIi8 " maxdelay:%" PRIi64 " watermark:%" PRIi64
", finally maxdelay:%" PRIi32, ", finally maxdelay:%" PRIi32,
...@@ -1226,16 +1263,17 @@ int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer) ...@@ -1226,16 +1263,17 @@ int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer)
if (tdRSmaRestoreQTaskInfoInit(pSma, &nTables) < 0) { if (tdRSmaRestoreQTaskInfoInit(pSma, &nTables) < 0) {
goto _err; goto _err;
} }
if (nTables <= 0) { if (nTables <= 0) {
smaDebug("vgId:%d, no need to restore rsma task %" PRIi8 " since no tables", SMA_VID(pSma), type); smaDebug("vgId:%d, no need to restore rsma task %" PRIi8 " since no tables", SMA_VID(pSma), type);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
#if 0
// step 2: retrieve qtaskinfo items from the persistence file(rsma/qtaskinfo) and restore // step 2: retrieve qtaskinfo items from the persistence file(rsma/qtaskinfo) and restore
if (tdRSmaRestoreQTaskInfoReload(pSma, type, qtaskFileVer) < 0) { if (tdRSmaRestoreQTaskInfoReload(pSma, type, qtaskFileVer) < 0) {
goto _err; goto _err;
} }
#endif
// step 3: reload ts data from checkpoint // step 3: reload ts data from checkpoint
if (tdRSmaRestoreTSDataReload(pSma) < 0) { if (tdRSmaRestoreTSDataReload(pSma) < 0) {
...@@ -1440,6 +1478,50 @@ static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, int8_t type, SRSmaQTaskInfoIte ...@@ -1440,6 +1478,50 @@ static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, int8_t type, SRSmaQTaskInfoIte
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
SSma *pSma = pRSmaStat->pSma;
SVnode *pVnode = pSma->pVnode;
int32_t vid = SMA_VID(pSma);
if (taosHashGetSize(pInfoHash) <= 0) {
return TSDB_CODE_SUCCESS;
}
int64_t fsMaxVer = tdRSmaFSMaxVer(pSma, pRSmaStat);
if (pRSmaStat->commitAppliedVer <= fsMaxVer) {
smaDebug("vgId:%d, rsma persist, no need as applied %" PRIi64 " not larger than fsMaxVer %" PRIi64, vid,
pRSmaStat->commitAppliedVer, fsMaxVer);
return TSDB_CODE_SUCCESS;
}
void *infoHash = NULL;
while ((infoHash = taosHashIterate(pInfoHash, infoHash))) {
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
continue;
}
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pRSmaInfo, i);
if (pItem && pItem->pStreamState) {
if (streamStateCommit(pItem->pStreamState) < 0) {
terrno = TSDB_CODE_RSMA_STREAM_STATE_COMMIT;
goto _err;
}
smaDebug("vgId:%d, rsma persist, stream state commit success, table %" PRIi64 " level %d", vid, pRSmaInfo->suid,
i + 1);
}
}
}
return TSDB_CODE_SUCCESS;
_err:
smaError("vgId:%d, rsma persist failed since %s", vid, terrstr());
return TSDB_CODE_FAILED;
}
#if 0
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
SSma *pSma = pRSmaStat->pSma; SSma *pSma = pRSmaStat->pSma;
SVnode *pVnode = pSma->pVnode; SVnode *pVnode = pSma->pVnode;
...@@ -1459,7 +1541,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { ...@@ -1459,7 +1541,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
int64_t fsMaxVer = tdRSmaFSMaxVer(pSma, pRSmaStat); int64_t fsMaxVer = tdRSmaFSMaxVer(pSma, pRSmaStat);
if (pRSmaStat->commitAppliedVer <= fsMaxVer) { if (pRSmaStat->commitAppliedVer <= fsMaxVer) {
smaDebug("vgId:%d, rsma persist, no need as applied %" PRIi64 " not larger than fsMaxVer %" PRIi64, vid, smaDebug("vgId:%d, rsma persist, no need as applied %" PRIi64 " not larger than fsMaxVer %" PRIi64, vid,
pRSmaStat->commitAppliedVer, fsMaxVer); pRSmaStat->commitAppliedVer, fsMaxVer);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1579,6 +1661,8 @@ _err: ...@@ -1579,6 +1661,8 @@ _err:
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
#endif
/** /**
* @brief trigger to get rsma result in async mode * @brief trigger to get rsma result in async mode
* *
...@@ -1891,7 +1975,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { ...@@ -1891,7 +1975,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
if ((pEnv->flag & SMA_ENV_FLG_CLOSE) && (atomic_load_64(&pRSmaStat->nBufItems) <= 0)) { if ((pEnv->flag & SMA_ENV_FLG_CLOSE) && (atomic_load_64(&pRSmaStat->nBufItems) <= 0)) {
smaDebug("vgId:%d, exec task end, flag:%" PRIi8 ", nBufItems:%" PRIi64, SMA_VID(pSma), pEnv->flag, smaDebug("vgId:%d, exec task end, flag:%" PRIi8 ", nBufItems:%" PRIi64, SMA_VID(pSma), pEnv->flag,
atomic_load_64(&pRSmaStat->nBufItems)); atomic_load_64(&pRSmaStat->nBufItems));
break; break;
} }
} }
......
...@@ -141,11 +141,8 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con ...@@ -141,11 +141,8 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
ASSERT(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum); ASSERT(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
ASSERT(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum); ASSERT(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);
if (pRsp->withSchema) { ASSERT(!pRsp->withSchema);
ASSERT(taosArrayGetSize(pRsp->blockSchema) == pRsp->blockNum); ASSERT(taosArrayGetSize(pRsp->blockSchema) == 0);
} else {
ASSERT(taosArrayGetSize(pRsp->blockSchema) == 0);
}
if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) { if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
if (pRsp->blockNum > 0) { if (pRsp->blockNum > 0) {
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdb.h"
// Integer =====================================================
typedef struct {
int8_t rawCopy;
int64_t prevVal;
int32_t nVal;
int32_t nBuf;
uint8_t *pBuf;
} SIntCompressor;
#define I64_SAFE_ADD(a, b) (((a) >= 0 && (b) <= INT64_MAX - (b)) || ((a) < 0 && (b) >= INT64_MIN - (a)))
#define SIMPLE8B_MAX ((uint64_t)1152921504606846974LL)
static int32_t tsdbCmprI64(SIntCompressor *pCompressor, int64_t val) {
int32_t code = 0;
// raw copy
if (pCompressor->rawCopy) {
memcpy(pCompressor->pBuf + pCompressor->nBuf, &val, sizeof(val));
pCompressor->nBuf += sizeof(val);
pCompressor->nVal++;
goto _exit;
}
if (!I64_SAFE_ADD(val, pCompressor->prevVal)) {
pCompressor->rawCopy = 1;
// TODO: decompress and copy
pCompressor->nVal++;
goto _exit;
}
int64_t diff = val - pCompressor->prevVal;
uint8_t zigzag = ZIGZAGE(int64_t, diff);
if (zigzag >= SIMPLE8B_MAX) {
pCompressor->rawCopy = 1;
// TODO: decompress and copy
pCompressor->nVal++;
goto _exit;
}
_exit:
return code;
}
// Timestamp =====================================================
// Float =====================================================
\ No newline at end of file
...@@ -462,6 +462,7 @@ typedef struct SPartitionDataInfo { ...@@ -462,6 +462,7 @@ typedef struct SPartitionDataInfo {
typedef struct STimeWindowAggSupp { typedef struct STimeWindowAggSupp {
int8_t calTrigger; int8_t calTrigger;
int64_t waterMark; int64_t waterMark;
int64_t deleteMark;
TSKEY maxTs; TSKEY maxTs;
TSKEY minTs; TSKEY minTs;
SColumnInfoData timeWindowData; // query time window info for scalar function execution. SColumnInfoData timeWindowData; // query time window info for scalar function execution.
...@@ -1090,7 +1091,7 @@ int32_t setOutputBuf(STimeWindow* win, SResultRow** pResult, int64_t tableGroupI ...@@ -1090,7 +1091,7 @@ int32_t setOutputBuf(STimeWindow* win, SResultRow** pResult, int64_t tableGroupI
int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup,
SExecTaskInfo* pTaskInfo); SExecTaskInfo* pTaskInfo);
int32_t releaseOutputBuf(SExecTaskInfo* pTaskInfo, SWinKey* pKey, SResultRow* pResult); int32_t releaseOutputBuf(SExecTaskInfo* pTaskInfo, SWinKey* pKey, SResultRow* pResult);
int32_t saveOutput(SExecTaskInfo* pTaskInfo, SWinKey* pKey, SResultRow* pResult, int32_t resSize); int32_t saveOutputBuf(SExecTaskInfo* pTaskInfo, SWinKey* pKey, SResultRow* pResult, int32_t resSize);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -4166,9 +4166,8 @@ int32_t setOutputBuf(STimeWindow* win, SResultRow** pResult, int64_t tableGroupI ...@@ -4166,9 +4166,8 @@ int32_t setOutputBuf(STimeWindow* win, SResultRow** pResult, int64_t tableGroupI
}; };
char* value = NULL; char* value = NULL;
int32_t size = pAggSup->resultRowSize; int32_t size = pAggSup->resultRowSize;
/*if (streamStateGet(pTaskInfo->streamInfo.pState, &key, (void**)&value, &size) < 0) {*/
/*value = taosMemoryCalloc(1, size);*/ tSimpleHashPut(pAggSup->pResultRowHashTable, &key, sizeof(SWinKey), NULL, 0);
/*}*/
if (streamStateAddIfNotExist(pTaskInfo->streamInfo.pState, &key, (void**)&value, &size) < 0) { if (streamStateAddIfNotExist(pTaskInfo->streamInfo.pState, &key, (void**)&value, &size) < 0) {
return TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY;
} }
...@@ -4186,7 +4185,7 @@ int32_t releaseOutputBuf(SExecTaskInfo* pTaskInfo, SWinKey* pKey, SResultRow* pR ...@@ -4186,7 +4185,7 @@ int32_t releaseOutputBuf(SExecTaskInfo* pTaskInfo, SWinKey* pKey, SResultRow* pR
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t saveOutput(SExecTaskInfo* pTaskInfo, SWinKey* pKey, SResultRow* pResult, int32_t resSize) { int32_t saveOutputBuf(SExecTaskInfo* pTaskInfo, SWinKey* pKey, SResultRow* pResult, int32_t resSize) {
streamStatePut(pTaskInfo->streamInfo.pState, pKey, pResult, resSize); streamStatePut(pTaskInfo->streamInfo.pState, pKey, pResult, resSize);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -4259,8 +4258,9 @@ int32_t buildDataBlockFromGroupRes(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock ...@@ -4259,8 +4258,9 @@ int32_t buildDataBlockFromGroupRes(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock
} }
} }
} }
releaseOutputBuf(pTaskInfo, &key, pRow);
pBlock->info.rows += pRow->numOfRows; pBlock->info.rows += pRow->numOfRows;
releaseOutputBuf(pTaskInfo, &key, pRow);
} }
blockDataUpdateTsWindow(pBlock, 0); blockDataUpdateTsWindow(pBlock, 0);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -867,6 +867,10 @@ static int32_t saveWinResultRow(SResultRow* result, uint64_t groupId, SHashObj* ...@@ -867,6 +867,10 @@ static int32_t saveWinResultRow(SResultRow* result, uint64_t groupId, SHashObj*
return saveWinResult(result->win.skey, result->pageId, result->offset, groupId, pUpdatedMap); return saveWinResult(result->win.skey, result->pageId, result->offset, groupId, pUpdatedMap);
} }
static int32_t saveWinResultInfo(TSKEY ts, uint64_t groupId, SHashObj* pUpdatedMap) {
return saveWinResult(ts, -1, -1, groupId, pUpdatedMap);
}
static int32_t saveResultRow(SResultRow* result, uint64_t groupId, SArray* pUpdated) { static int32_t saveResultRow(SResultRow* result, uint64_t groupId, SArray* pUpdated) {
return saveResult(result->win.skey, result->pageId, result->offset, groupId, pUpdated); return saveResult(result->win.skey, result->pageId, result->offset, groupId, pUpdated);
} }
...@@ -918,12 +922,16 @@ static void removeDeleteResults(SHashObj* pUpdatedMap, SArray* pDelWins) { ...@@ -918,12 +922,16 @@ static void removeDeleteResults(SHashObj* pUpdatedMap, SArray* pDelWins) {
} }
} }
bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup) { bool isOverdue(TSKEY ekey, STimeWindowAggSupp* pTwSup) {
ASSERT(pSup->maxTs == INT64_MIN || pSup->maxTs > 0); ASSERT(pTwSup->maxTs == INT64_MIN || pTwSup->maxTs > 0);
return pSup->maxTs != INT64_MIN && ts < pSup->maxTs - pSup->waterMark; return pTwSup->maxTs != INT64_MIN && ekey < pTwSup->maxTs - pTwSup->waterMark;
} }
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup) { return isOverdue(pWin->ekey, pSup); } bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pTwSup) { return isOverdue(pWin->ekey, pTwSup); }
bool needDeleteWindowBuf(STimeWindow* pWin, STimeWindowAggSupp* pTwSup) {
return pTwSup->maxTs != INT64_MIN && pWin->ekey < pTwSup->maxTs - pTwSup->deleteMark;
}
static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock, static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
int32_t scanFlag) { int32_t scanFlag) {
...@@ -1374,6 +1382,41 @@ static bool doClearWindow(SAggSupporter* pAggSup, SExprSupp* pSup, char* pData, ...@@ -1374,6 +1382,41 @@ static bool doClearWindow(SAggSupporter* pAggSup, SExprSupp* pSup, char* pData,
return true; return true;
} }
static bool doDeleteWindow(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId, int32_t numOfOutput) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
SWinKey key = {.ts = ts, .groupId = groupId};
tSimpleHashRemove(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey));
streamStateDel(pOperator->pTaskInfo->streamInfo.pState, &key);
return true;
}
static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, int32_t numOfOutput, SSDataBlock* pBlock,
SArray* pUpWins, SHashObj* pUpdatedMap) {
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
TSKEY* startTsCols = (TSKEY*)pStartTsCol->pData;
SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
TSKEY* endTsCols = (TSKEY*)pEndTsCol->pData;
SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
uint64_t* pGpDatas = (uint64_t*)pGpCol->pData;
for (int32_t i = 0; i < pBlock->info.rows; i++) {
SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTsCols[i], pInterval, TSDB_ORDER_ASC);
while (win.ekey <= endTsCols[i]) {
uint64_t winGpId = pGpDatas[i];
bool res = doDeleteWindow(pOperator, win.skey, winGpId, numOfOutput);
SWinKey winRes = {.ts = win.skey, .groupId = winGpId};
if (pUpWins && res) {
taosArrayPush(pUpWins, &winRes);
}
if (pUpdatedMap) {
taosHashRemove(pUpdatedMap, &winRes, sizeof(SWinKey));
}
getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win);
}
}
}
bool doDeleteIntervalWindow(SAggSupporter* pAggSup, TSKEY ts, uint64_t groupId) { bool doDeleteIntervalWindow(SAggSupporter* pAggSup, TSKEY ts, uint64_t groupId) {
size_t bytes = sizeof(TSKEY); size_t bytes = sizeof(TSKEY);
SET_RES_WINDOW_KEY(pAggSup->keyBuf, &ts, bytes, groupId); SET_RES_WINDOW_KEY(pAggSup->keyBuf, &ts, bytes, groupId);
...@@ -1383,8 +1426,6 @@ bool doDeleteIntervalWindow(SAggSupporter* pAggSup, TSKEY ts, uint64_t groupId) ...@@ -1383,8 +1426,6 @@ bool doDeleteIntervalWindow(SAggSupporter* pAggSup, TSKEY ts, uint64_t groupId)
// window has been closed // window has been closed
return false; return false;
} }
// SFilePage* bufPage = getBufPage(pAggSup->pResultBuf, p1->pageId);
// dBufSetBufPageRecycled(pAggSup->pResultBuf, bufPage);
tSimpleHashRemove(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); tSimpleHashRemove(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
return true; return true;
} }
...@@ -1512,6 +1553,49 @@ static int32_t closeIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pSup ...@@ -1512,6 +1553,49 @@ static int32_t closeIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pSup
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t closeStreamIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SInterval* pInterval,
SHashObj* pPullDataMap, SHashObj* closeWins, SOperatorInfo* pOperator) {
qDebug("===stream===close interval window");
void* pIte = NULL;
size_t keyLen = 0;
int32_t iter = 0;
while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
void* key = tSimpleHashGetKey(pIte, &keyLen);
SWinKey* pWinKey = (SWinKey*)key;
void* chIds = taosHashGet(pPullDataMap, pWinKey, sizeof(SWinKey));
STimeWindow win = {
.skey = pWinKey->ts,
.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1,
};
if (isCloseWindow(&win, pTwSup)) {
if (chIds && pPullDataMap) {
SArray* chAy = *(SArray**)chIds;
int32_t size = taosArrayGetSize(chAy);
qDebug("===stream===window %" PRId64 " wait child size:%d", pWinKey->ts, size);
for (int32_t i = 0; i < size; i++) {
qDebug("===stream===window %" PRId64 " wait child id:%d", pWinKey->ts, *(int32_t*)taosArrayGet(chAy, i));
}
continue;
} else if (pPullDataMap) {
qDebug("===stream===close window %" PRId64, pWinKey->ts);
}
if (pTwSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
int32_t code = saveWinResultInfo(pWinKey->ts, pWinKey->groupId, closeWins);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
tSimpleHashIterateRemove(pHashMap, pWinKey, sizeof(SWinKey), &pIte, &iter);
if (needDeleteWindowBuf(&win, pTwSup)) {
streamStateDel(pOperator->pTaskInfo->streamInfo.pState, pWinKey);
}
}
}
return TSDB_CODE_SUCCESS;
}
static void closeChildIntervalWindow(SArray* pChildren, TSKEY maxTs) { static void closeChildIntervalWindow(SArray* pChildren, TSKEY maxTs) {
int32_t size = taosArrayGetSize(pChildren); int32_t size = taosArrayGetSize(pChildren);
for (int32_t i = 0; i < size; i++) { for (int32_t i = 0; i < size; i++) {
...@@ -4918,8 +5002,8 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR ...@@ -4918,8 +5002,8 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
SExprSupp* pSup = &pOperatorInfo->exprSupp; SExprSupp* pSup = &pOperatorInfo->exprSupp;
SInterval* pInterval = &iaInfo->interval; SInterval* pInterval = &iaInfo->interval;
int32_t startPos = 0; int32_t startPos = 0;
int64_t* tsCols = extractTsCol(pBlock, iaInfo); int64_t* tsCols = extractTsCol(pBlock, iaInfo);
TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols); TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);
...@@ -4938,7 +5022,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR ...@@ -4938,7 +5022,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
win.skey = miaInfo->curTs; win.skey = miaInfo->curTs;
win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1; win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
int32_t ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup); int32_t ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) { if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
T_LONG_JMP(pTaskInfo->env, ret); T_LONG_JMP(pTaskInfo->env, ret);
} }
...@@ -4963,7 +5047,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR ...@@ -4963,7 +5047,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
currWin.ekey = taosTimeAdd(currWin.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1; currWin.ekey = taosTimeAdd(currWin.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
startPos = currPos; startPos = currPos;
ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup); ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) { if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
T_LONG_JMP(pTaskInfo->env, ret); T_LONG_JMP(pTaskInfo->env, ret);
} }
...@@ -5032,7 +5116,7 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { ...@@ -5032,7 +5116,7 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
pMiaInfo->prefetchedBlock = pBlock; pMiaInfo->prefetchedBlock = pBlock;
cleanupAfterGroupResultGen(pMiaInfo, pRes); cleanupAfterGroupResultGen(pMiaInfo, pRes);
break; break;
} else { } else {
// continue // continue
} }
} }
...@@ -5197,7 +5281,7 @@ static int32_t finalizeWindowResult(SOperatorInfo* pOperatorInfo, uint64_t table ...@@ -5197,7 +5281,7 @@ static int32_t finalizeWindowResult(SOperatorInfo* pOperatorInfo, uint64_t table
SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet( SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(
iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
ASSERT(p1 != NULL); ASSERT(p1 != NULL);
// finalizeResultRows(iaInfo->aggSup.pResultBuf, p1, pResultBlock, pTaskInfo); // finalizeResultRows(iaInfo->aggSup.pResultBuf, p1, pResultBlock, pTaskInfo);
tSimpleHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); tSimpleHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -5222,7 +5306,7 @@ static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t t ...@@ -5222,7 +5306,7 @@ static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t t
STimeWindow* prevWin = &prevGrpWin->window; STimeWindow* prevWin = &prevGrpWin->window;
if ((ascScan && newWin->skey > prevWin->ekey) || ((!ascScan) && newWin->skey < prevWin->ekey)) { if ((ascScan && newWin->skey > prevWin->ekey) || ((!ascScan) && newWin->skey < prevWin->ekey)) {
// finalizeWindowResult(pOperatorInfo, tableGroupId, prevWin, pResultBlock); // finalizeWindowResult(pOperatorInfo, tableGroupId, prevWin, pResultBlock);
tdListPopNode(miaInfo->groupIntervals, listNode); tdListPopNode(miaInfo->groupIntervals, listNode);
} }
} }
...@@ -5382,7 +5466,7 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { ...@@ -5382,7 +5466,7 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
if (listNode != NULL) { if (listNode != NULL) {
SGroupTimeWindow* grpWin = (SGroupTimeWindow*)(listNode->data); SGroupTimeWindow* grpWin = (SGroupTimeWindow*)(listNode->data);
// finalizeWindowResult(pOperator, grpWin->groupId, &grpWin->window, pRes); // finalizeWindowResult(pOperator, grpWin->groupId, &grpWin->window, pRes);
pRes->info.groupId = grpWin->groupId; pRes->info.groupId = grpWin->groupId;
} }
} }
...@@ -5591,7 +5675,7 @@ static void doStreamIntervalAggImpl2(SOperatorInfo* pOperatorInfo, SSDataBlock* ...@@ -5591,7 +5675,7 @@ static void doStreamIntervalAggImpl2(SOperatorInfo* pOperatorInfo, SSDataBlock*
forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL, forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL,
TSDB_ORDER_ASC); TSDB_ORDER_ASC);
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdatedMap) { if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdatedMap) {
saveWinResultRow(pResult, tableGroupId, pUpdatedMap); saveWinResultInfo(pResult->win.skey, tableGroupId, pUpdatedMap);
} }
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
...@@ -5600,7 +5684,7 @@ static void doStreamIntervalAggImpl2(SOperatorInfo* pOperatorInfo, SSDataBlock* ...@@ -5600,7 +5684,7 @@ static void doStreamIntervalAggImpl2(SOperatorInfo* pOperatorInfo, SSDataBlock*
.ts = nextWin.skey, .ts = nextWin.skey,
.groupId = tableGroupId, .groupId = tableGroupId,
}; };
saveOutput(pTaskInfo, &key, pResult, pInfo->aggSup.resultRowSize); saveOutputBuf(pTaskInfo, &key, pResult, pInfo->aggSup.resultRowSize);
releaseOutputBuf(pTaskInfo, &key, pResult); releaseOutputBuf(pTaskInfo, &key, pResult);
int32_t prevEndPos = (forwardRows - 1) * step + startPos; int32_t prevEndPos = (forwardRows - 1) * step + startPos;
ASSERT(pSDataBlock->info.window.skey > 0 && pSDataBlock->info.window.ekey > 0); ASSERT(pSDataBlock->info.window.skey > 0 && pSDataBlock->info.window.ekey > 0);
...@@ -5645,7 +5729,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -5645,7 +5729,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
return pInfo->pDelRes; return pInfo->pDelRes;
} }
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doBuildResult(pOperator, pInfo->binfo.pRes, &pInfo->groupResInfo);
// doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainResults(&pInfo->groupResInfo)) { if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainResults(&pInfo->groupResInfo)) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
qDebug("===stream===single interval is done"); qDebug("===stream===single interval is done");
...@@ -5671,13 +5756,14 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -5671,13 +5756,14 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
printDataBlock(pBlock, "single interval recv"); printDataBlock(pBlock, "single interval recv");
if (pBlock->info.type == STREAM_CLEAR) { if (pBlock->info.type == STREAM_CLEAR) {
doClearWindows(&pInfo->aggSup, &pOperator->exprSupp, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, doDeleteWindows(pOperator, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, NULL, NULL);
NULL);
qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo)); qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo));
continue; continue;
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) { } else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) {
doDeleteSpecifyIntervalWindow(&pInfo->aggSup, &pInfo->twAggSup, pBlock, pInfo->pDelWins, &pInfo->interval, // doDeleteSpecifyIntervalWindow(&pInfo->aggSup, &pInfo->twAggSup, pBlock, pInfo->pDelWins, &pInfo->interval,
pUpdatedMap); // pUpdatedMap);
doDeleteWindows(pOperator, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, pInfo->pDelWins,
pUpdatedMap);
continue; continue;
} else if (pBlock->info.type == STREAM_GET_ALL) { } else if (pBlock->info.type == STREAM_GET_ALL) {
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap); getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap);
...@@ -5704,9 +5790,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -5704,9 +5790,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
maxTs = TMAX(maxTs, pBlock->info.window.ekey); maxTs = TMAX(maxTs, pBlock->info.window.ekey);
minTs = TMIN(minTs, pBlock->info.window.skey); minTs = TMIN(minTs, pBlock->info.window.skey);
doStreamIntervalAggImpl(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdatedMap); // doStreamIntervalAggImpl(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdatedMap);
// new disc buf // new disc buf
/*doStreamIntervalAggImpl2(pOperator, pBlock, pBlock->info.groupId, pUpdatedMap);*/ doStreamIntervalAggImpl2(pOperator, pBlock, pBlock->info.groupId, pUpdatedMap);
} }
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, minTs); pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, minTs);
...@@ -5741,8 +5827,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -5741,8 +5827,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
#endif #endif
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdatedMap, closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdatedMap,
pInfo->pRecycledPages, pInfo->aggSup.pResultBuf); pOperator);
void* pIte = NULL; void* pIte = NULL;
while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) { while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) {
...@@ -5751,7 +5837,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -5751,7 +5837,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
taosArraySort(pUpdated, resultrowComparAsc); taosArraySort(pUpdated, resultrowComparAsc);
// new disc buf // new disc buf
finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pSup->rowEntryInfoOffset); // finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->aggSup.pResultBuf, pUpdated,
// pSup->rowEntryInfoOffset);
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
removeDeleteResults(pUpdatedMap, pInfo->pDelWins); removeDeleteResults(pUpdatedMap, pInfo->pDelWins);
...@@ -5762,9 +5849,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -5762,9 +5849,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
return pInfo->pDelRes; return pInfo->pDelRes;
} }
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); // doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
// new disc buf // new disc buf
// doBuildResult(pOperator, pInfo->binfo.pRes, &pInfo->groupResInfo); doBuildResult(pOperator, pInfo->binfo.pRes, &pInfo->groupResInfo);
printDataBlock(pInfo->binfo.pRes, "single interval"); printDataBlock(pInfo->binfo.pRes, "single interval");
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes; return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
} }
...@@ -5809,6 +5896,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -5809,6 +5896,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
.calTrigger = pIntervalPhyNode->window.triggerType, .calTrigger = pIntervalPhyNode->window.triggerType,
.maxTs = INT64_MIN, .maxTs = INT64_MIN,
.minTs = INT64_MAX, .minTs = INT64_MAX,
.deleteMark = INT64_MAX,
}; };
ASSERT(twAggSupp.calTrigger != STREAM_TRIGGER_MAX_DELAY); ASSERT(twAggSupp.calTrigger != STREAM_TRIGGER_MAX_DELAY);
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
......
...@@ -41,6 +41,8 @@ typedef struct SUdfdContext { ...@@ -41,6 +41,8 @@ typedef struct SUdfdContext {
uv_mutex_t udfsMutex; uv_mutex_t udfsMutex;
SHashObj * udfsHash; SHashObj * udfsHash;
SArray* residentFuncs;
bool printVersion; bool printVersion;
} SUdfdContext; } SUdfdContext;
...@@ -67,6 +69,7 @@ typedef struct SUdf { ...@@ -67,6 +69,7 @@ typedef struct SUdf {
EUdfState state; EUdfState state;
uv_mutex_t lock; uv_mutex_t lock;
uv_cond_t condReady; uv_cond_t condReady;
bool resident;
char name[TSDB_FUNC_NAME_LEN]; char name[TSDB_FUNC_NAME_LEN];
int8_t funcType; int8_t funcType;
...@@ -200,6 +203,14 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { ...@@ -200,6 +203,14 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
if (udf->initFunc) { if (udf->initFunc) {
udf->initFunc(); udf->initFunc();
} }
udf->resident = false;
for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) {
char* funcName = taosArrayGet(global.residentFuncs, i);
if (strcmp(setup->udfName, funcName) == 0) {
udf->resident = true;
break;
}
}
udf->state = UDF_STATE_READY; udf->state = UDF_STATE_READY;
uv_cond_broadcast(&udf->condReady); uv_cond_broadcast(&udf->condReady);
uv_mutex_unlock(&udf->lock); uv_mutex_unlock(&udf->lock);
...@@ -345,7 +356,7 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { ...@@ -345,7 +356,7 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
uv_mutex_lock(&global.udfsMutex); uv_mutex_lock(&global.udfsMutex);
udf->refCount--; udf->refCount--;
if (udf->refCount == 0) { if (udf->refCount == 0 && !udf->resident) {
unloadUdf = true; unloadUdf = true;
taosHashRemove(global.udfsHash, udf->name, strlen(udf->name)); taosHashRemove(global.udfsHash, udf->name, strlen(udf->name));
} }
...@@ -576,9 +587,9 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) { ...@@ -576,9 +587,9 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
uv_dlsym(&udf->lib, finishFuncName, (void **)(&udf->aggFinishFunc)); uv_dlsym(&udf->lib, finishFuncName, (void **)(&udf->aggFinishFunc));
char mergeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0}; char mergeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0};
char *mergeSuffix = "_merge"; char *mergeSuffix = "_merge";
strncpy(finishFuncName, processFuncName, sizeof(finishFuncName)); strncpy(mergeFuncName, processFuncName, sizeof(mergeFuncName));
strncat(finishFuncName, mergeSuffix, strlen(mergeSuffix)); strncat(mergeFuncName, mergeSuffix, strlen(mergeSuffix));
uv_dlsym(&udf->lib, finishFuncName, (void **)(&udf->aggMergeFunc)); uv_dlsym(&udf->lib, mergeFuncName, (void **)(&udf->aggMergeFunc));
} }
return 0; return 0;
} }
...@@ -919,8 +930,6 @@ static int32_t udfdRun() { ...@@ -919,8 +930,6 @@ static int32_t udfdRun() {
uv_run(global.loop, UV_RUN_DEFAULT); uv_run(global.loop, UV_RUN_DEFAULT);
uv_loop_close(global.loop); uv_loop_close(global.loop);
uv_mutex_destroy(&global.udfsMutex);
taosHashCleanup(global.udfsHash);
return 0; return 0;
} }
...@@ -941,6 +950,47 @@ void udfdConnectMnodeThreadFunc(void *args) { ...@@ -941,6 +950,47 @@ void udfdConnectMnodeThreadFunc(void *args) {
} }
} }
int32_t udfdInitResidentFuncs() {
if (strlen(tsUdfdResFuncs) == 0) {
return TSDB_CODE_SUCCESS;
}
global.residentFuncs = taosArrayInit(2, TSDB_FUNC_NAME_LEN);
char* pSave = tsUdfdResFuncs;
char* token;
while ((token = strtok_r(pSave, ",", &pSave)) != NULL) {
char func[TSDB_FUNC_NAME_LEN] = {0};
strncpy(func, token, strlen(token));
taosArrayPush(global.residentFuncs, func);
}
return TSDB_CODE_SUCCESS;
}
int32_t udfdDeinitResidentFuncs() {
for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) {
char* funcName = taosArrayGet(global.residentFuncs, i);
SUdf** udfInHash = taosHashGet(global.udfsHash, funcName, strlen(funcName));
if (udfInHash) {
taosHashRemove(global.udfsHash, funcName, strlen(funcName));
SUdf* udf = *udfInHash;
if (udf->destroyFunc) {
(udf->destroyFunc)();
}
uv_dlclose(&udf->lib);
taosMemoryFree(udf);
}
}
taosArrayDestroy(global.residentFuncs);
return TSDB_CODE_SUCCESS;
}
int32_t udfdCleanup() {
uv_mutex_destroy(&global.udfsMutex);
taosHashCleanup(global.udfsHash);
return 0;
}
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
if (!taosCheckSystemIsLittleEnd()) { if (!taosCheckSystemIsLittleEnd()) {
printf("failed to start since on non-little-end machines\n"); printf("failed to start since on non-little-end machines\n");
...@@ -978,6 +1028,8 @@ int main(int argc, char *argv[]) { ...@@ -978,6 +1028,8 @@ int main(int argc, char *argv[]) {
return -5; return -5;
} }
udfdInitResidentFuncs();
uv_thread_t mnodeConnectThread; uv_thread_t mnodeConnectThread;
uv_thread_create(&mnodeConnectThread, udfdConnectMnodeThreadFunc, NULL); uv_thread_create(&mnodeConnectThread, udfdConnectMnodeThreadFunc, NULL);
...@@ -986,5 +1038,7 @@ int main(int argc, char *argv[]) { ...@@ -986,5 +1038,7 @@ int main(int argc, char *argv[]) {
removeListeningPipe(); removeListeningPipe();
udfdCloseClientRpc(); udfdCloseClientRpc();
udfdDeinitResidentFuncs();
udfdCleanup();
return 0; return 0;
} }
...@@ -915,20 +915,30 @@ static int32_t stbSplSplitSortNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) ...@@ -915,20 +915,30 @@ static int32_t stbSplSplitSortNode(SSplitContext* pCxt, SStableSplitInfo* pInfo)
} }
static int32_t stbSplSplitScanNodeWithoutPartTags(SSplitContext* pCxt, SStableSplitInfo* pInfo) { static int32_t stbSplSplitScanNodeWithoutPartTags(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
int32_t code = splCreateExchangeNodeForSubplan(pCxt, pInfo->pSubplan, pInfo->pSplitNode, SUBPLAN_TYPE_MERGE); SLogicNode* pSplitNode = pInfo->pSplitNode;
if (NULL != pInfo->pSplitNode->pParent && QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pInfo->pSplitNode->pParent) &&
NULL == pInfo->pSplitNode->pParent->pLimit && NULL == pInfo->pSplitNode->pParent->pSlimit) {
pSplitNode = pInfo->pSplitNode->pParent;
}
int32_t code = splCreateExchangeNodeForSubplan(pCxt, pInfo->pSubplan, pSplitNode, SUBPLAN_TYPE_MERGE);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
(SNode*)splCreateScanSubplan(pCxt, pInfo->pSplitNode, SPLIT_FLAG_STABLE_SPLIT)); (SNode*)splCreateScanSubplan(pCxt, pSplitNode, SPLIT_FLAG_STABLE_SPLIT));
} }
++(pCxt->groupId); ++(pCxt->groupId);
return code; return code;
} }
static int32_t stbSplSplitScanNodeWithPartTags(SSplitContext* pCxt, SStableSplitInfo* pInfo) { static int32_t stbSplSplitScanNodeWithPartTags(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
int32_t code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, NULL, pInfo->pSplitNode, true); SLogicNode* pSplitNode = pInfo->pSplitNode;
if (NULL != pInfo->pSplitNode->pParent && QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pInfo->pSplitNode->pParent) &&
NULL == pInfo->pSplitNode->pParent->pLimit && NULL == pInfo->pSplitNode->pParent->pSlimit) {
pSplitNode = pInfo->pSplitNode->pParent;
}
int32_t code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pSplitNode, NULL, pSplitNode, true);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
(SNode*)splCreateScanSubplan(pCxt, pInfo->pSplitNode, SPLIT_FLAG_STABLE_SPLIT)); (SNode*)splCreateScanSubplan(pCxt, pSplitNode, SPLIT_FLAG_STABLE_SPLIT));
} }
pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE; pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
++(pCxt->groupId); ++(pCxt->groupId);
......
...@@ -140,15 +140,9 @@ int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void* ...@@ -140,15 +140,9 @@ int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void*
if (streamStateGet(pState, key, pVal, pVLen) == 0) { if (streamStateGet(pState, key, pVal, pVLen) == 0) {
return 0; return 0;
} }
void* tmp = taosMemoryCalloc(1, size); *pVal = tdbRealloc(NULL, size);
if (streamStatePut(pState, key, &tmp, size) == 0) { memset(*pVal, 0, size);
taosMemoryFree(tmp); return 0;
int32_t code = streamStateGet(pState, key, pVal, pVLen);
ASSERT(code == 0);
return code;
}
taosMemoryFree(tmp);
return -1;
} }
int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pVal) { int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pVal) {
...@@ -196,9 +190,14 @@ SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key ...@@ -196,9 +190,14 @@ SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key
if (pCur == NULL) { if (pCur == NULL) {
return NULL; return NULL;
} }
if (tdbTbcOpen(pState->pStateDb, &pCur->pCur, NULL) < 0) {
taosMemoryFree(pCur);
return NULL;
}
int32_t c; int32_t c;
if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) { if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) {
tdbTbcClose(pCur->pCur);
taosMemoryFree(pCur); taosMemoryFree(pCur);
return NULL; return NULL;
} }
...@@ -217,9 +216,14 @@ SStreamStateCur* streamStateSeekKeyPrev(SStreamState* pState, const SWinKey* key ...@@ -217,9 +216,14 @@ SStreamStateCur* streamStateSeekKeyPrev(SStreamState* pState, const SWinKey* key
if (pCur == NULL) { if (pCur == NULL) {
return NULL; return NULL;
} }
if (tdbTbcOpen(pState->pStateDb, &pCur->pCur, NULL) < 0) {
taosMemoryFree(pCur);
return NULL;
}
int32_t c; int32_t c;
if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) { if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) {
tdbTbcClose(pCur->pCur);
taosMemoryFree(pCur); taosMemoryFree(pCur);
return NULL; return NULL;
} }
......
...@@ -248,6 +248,7 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) { ...@@ -248,6 +248,7 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
return 0; return 0;
} }
// loop to write the dirty pages to file
SRBTreeIter iter = tRBTreeIterCreate(&pPager->rbt, 1); SRBTreeIter iter = tRBTreeIterCreate(&pPager->rbt, 1);
SRBTreeNode *pNode = NULL; SRBTreeNode *pNode = NULL;
while ((pNode = tRBTreeIterNext(&iter)) != NULL) { while ((pNode = tRBTreeIterNext(&iter)) != NULL) {
...@@ -257,37 +258,23 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) { ...@@ -257,37 +258,23 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
ASSERT(0); ASSERT(0);
return -1; return -1;
} }
pPage->isDirty = 0;
tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage);
tdbPCacheRelease(pPager->pCache, pPage, pTxn);
} }
tRBTreeCreate(&pPager->rbt, pageCmpFn); tdbTrace("tdbttl commit:%p, %d/%d", pPager, pPager->dbOrigSize, pPager->dbFileSize);
/* pPager->dbOrigSize = pPager->dbFileSize;
// loop to write the dirty pages to file
for (pPage = pPager->pDirty; pPage; pPage = pPage->pDirtyNext) {
// TODO: update the page footer
ret = tdbPagerWritePageToDB(pPager, pPage);
if (ret < 0) {
ASSERT(0);
return -1;
}
}
// release the page // release the page
for (pPage = pPager->pDirty; pPage; pPage = pPager->pDirty) { iter = tRBTreeIterCreate(&pPager->rbt, 1);
pPager->pDirty = pPage->pDirtyNext; while ((pNode = tRBTreeIterNext(&iter)) != NULL) {
pPage->pDirtyNext = NULL; pPage = (SPage *)pNode;
pPage->isDirty = 0; pPage->isDirty = 0;
tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage);
tdbPCacheRelease(pPager->pCache, pPage, pTxn); tdbPCacheRelease(pPager->pCache, pPage, pTxn);
} }
*/
tdbTrace("tdbttl commit:%p, %d", pPager, pPager->dbOrigSize); tRBTreeCreate(&pPager->rbt, pageCmpFn);
pPager->dbOrigSize = pPager->dbFileSize;
// sync the db file // sync the db file
tdbOsFSync(pPager->fd); tdbOsFSync(pPager->fd);
......
...@@ -424,6 +424,7 @@ int walLoadMeta(SWal* pWal) { ...@@ -424,6 +424,7 @@ int walLoadMeta(SWal* pWal) {
// find existing meta file // find existing meta file
int metaVer = walFindCurMetaVer(pWal); int metaVer = walFindCurMetaVer(pWal);
if (metaVer == -1) { if (metaVer == -1) {
wDebug("vgId:%d wal find meta ver %d", pWal->cfg.vgId, metaVer);
return -1; return -1;
} }
char fnameStr[WAL_FILE_LEN]; char fnameStr[WAL_FILE_LEN];
......
...@@ -50,6 +50,7 @@ ...@@ -50,6 +50,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "tcompression.h" #include "tcompression.h"
#include "lz4.h" #include "lz4.h"
#include "tRealloc.h"
#include "tlog.h" #include "tlog.h"
#ifdef TD_TSZ #ifdef TD_TSZ
...@@ -814,24 +815,24 @@ int32_t tsCompressFloatImp(const char *const input, const int32_t nelements, cha ...@@ -814,24 +815,24 @@ int32_t tsCompressFloatImp(const char *const input, const int32_t nelements, cha
uint32_t predicted = prev_value; uint32_t predicted = prev_value;
uint32_t diff = curr.bits ^ predicted; uint32_t diff = curr.bits ^ predicted;
int32_t leading_zeros = FLOAT_BYTES * BITS_PER_BYTE; int32_t clz = FLOAT_BYTES * BITS_PER_BYTE;
int32_t trailing_zeros = leading_zeros; int32_t ctz = clz;
if (diff) { if (diff) {
trailing_zeros = BUILDIN_CTZ(diff); ctz = BUILDIN_CTZ(diff);
leading_zeros = BUILDIN_CLZ(diff); clz = BUILDIN_CLZ(diff);
} }
uint8_t nbytes = 0; uint8_t nbytes = 0;
uint8_t flag; uint8_t flag;
if (trailing_zeros > leading_zeros) { if (ctz > clz) {
nbytes = (uint8_t)(FLOAT_BYTES - trailing_zeros / BITS_PER_BYTE); nbytes = (uint8_t)(FLOAT_BYTES - ctz / BITS_PER_BYTE);
if (nbytes > 0) nbytes--; if (nbytes > 0) nbytes--;
flag = ((uint8_t)1 << 3) | nbytes; flag = ((uint8_t)1 << 3) | nbytes;
} else { } else {
nbytes = (uint8_t)(FLOAT_BYTES - leading_zeros / BITS_PER_BYTE); nbytes = (uint8_t)(FLOAT_BYTES - clz / BITS_PER_BYTE);
if (nbytes > 0) nbytes--; if (nbytes > 0) nbytes--;
flag = nbytes; flag = nbytes;
} }
...@@ -994,3 +995,621 @@ int32_t tsDecompressDoubleLossyImp(const char *input, int32_t compressedSize, co ...@@ -994,3 +995,621 @@ int32_t tsDecompressDoubleLossyImp(const char *input, int32_t compressedSize, co
return tdszDecompress(SZ_DOUBLE, input + 1, compressedSize - 1, nelements, output); return tdszDecompress(SZ_DOUBLE, input + 1, compressedSize - 1, nelements, output);
} }
#endif #endif
/*************************************************************************
* STREAM COMPRESSION
*************************************************************************/
#define I64_SAFE_ADD(a, b) (((a) >= 0 && (b) <= INT64_MAX - (b)) || ((a) < 0 && (b) >= INT64_MIN - (a)))
typedef struct SCompressor SCompressor;
static int32_t tCompBool(SCompressor *pCmprsor, const void *pData, int32_t nData);
static int32_t tCompInt(SCompressor *pCmprsor, const void *pData, int32_t nData);
static int32_t tCompFloat(SCompressor *pCmprsor, const void *pData, int32_t nData);
static int32_t tCompDouble(SCompressor *pCmprsor, const void *pData, int32_t nData);
static int32_t tCompTimestamp(SCompressor *pCmprsor, const void *pData, int32_t nData);
static int32_t tCompBinary(SCompressor *pCmprsor, const void *pData, int32_t nData);
static struct {
int8_t type;
int32_t bytes;
int8_t isVarLen;
int32_t (*cmprFn)(SCompressor *, const void *, int32_t nData);
} DATA_TYPE_INFO[] = {
{TSDB_DATA_TYPE_NULL, 0, 0, NULL}, // TSDB_DATA_TYPE_NULL
{TSDB_DATA_TYPE_BOOL, 1, 0, tCompBool}, // TSDB_DATA_TYPE_BOOL
{TSDB_DATA_TYPE_TINYINT, 1, 0, tCompInt}, // TSDB_DATA_TYPE_TINYINT
{TSDB_DATA_TYPE_SMALLINT, 2, 0, tCompInt}, // TSDB_DATA_TYPE_SMALLINT
{TSDB_DATA_TYPE_INT, 4, 0, tCompInt}, // TSDB_DATA_TYPE_INT
{TSDB_DATA_TYPE_BIGINT, 8, 0, tCompInt}, // TSDB_DATA_TYPE_BIGINT
{TSDB_DATA_TYPE_FLOAT, 4, 0, tCompFloat}, // TSDB_DATA_TYPE_FLOAT
{TSDB_DATA_TYPE_DOUBLE, 8, 0, tCompDouble}, // TSDB_DATA_TYPE_DOUBLE
{TSDB_DATA_TYPE_VARCHAR, 1, 1, tCompBinary}, // TSDB_DATA_TYPE_VARCHAR
{TSDB_DATA_TYPE_TIMESTAMP, 8, 0, tCompTimestamp}, // pTSDB_DATA_TYPE_TIMESTAMP
{TSDB_DATA_TYPE_NCHAR, 1, 1, tCompBinary}, // TSDB_DATA_TYPE_NCHAR
{TSDB_DATA_TYPE_UTINYINT, 1, 0, tCompInt}, // TSDB_DATA_TYPE_UTINYINT
{TSDB_DATA_TYPE_USMALLINT, 2, 0, tCompInt}, // TSDB_DATA_TYPE_USMALLINT
{TSDB_DATA_TYPE_UINT, 4, 0, tCompInt}, // TSDB_DATA_TYPE_UINT
{TSDB_DATA_TYPE_UBIGINT, 8, 0, tCompInt}, // TSDB_DATA_TYPE_UBIGINT
{TSDB_DATA_TYPE_JSON, 1, 1, tCompBinary}, // TSDB_DATA_TYPE_JSON
{TSDB_DATA_TYPE_VARBINARY, 1, 1, tCompBinary}, // TSDB_DATA_TYPE_VARBINARY
{TSDB_DATA_TYPE_DECIMAL, 1, 1, tCompBinary}, // TSDB_DATA_TYPE_DECIMAL
{TSDB_DATA_TYPE_BLOB, 1, 1, tCompBinary}, // TSDB_DATA_TYPE_BLOB
{TSDB_DATA_TYPE_MEDIUMBLOB, 1, 1, tCompBinary}, // TSDB_DATA_TYPE_MEDIUMBLOB
};
struct SCompressor {
int8_t type;
int8_t cmprAlg;
int8_t autoAlloc;
int32_t nVal;
uint8_t *aBuf[2];
int64_t nBuf[2];
union {
// Timestamp ----
struct {
int64_t ts_prev_val;
int64_t ts_prev_delta;
uint8_t *ts_flag_p;
};
// Integer ----
struct {
int64_t i_prev;
int32_t i_selector;
int32_t i_start;
int32_t i_end;
uint64_t i_aZigzag[241];
int8_t i_aBitN[241];
};
// Float ----
struct {
uint32_t f_prev;
uint8_t *f_flag_p;
};
// Double ----
struct {
uint64_t d_prev;
uint8_t *d_flag_p;
};
};
};
// Timestamp =====================================================
static int32_t tCompSetCopyMode(SCompressor *pCmprsor) {
int32_t code = 0;
if (pCmprsor->nVal) {
if (pCmprsor->autoAlloc) {
code = tRealloc(&pCmprsor->aBuf[1], sizeof(int64_t) * pCmprsor->nVal);
if (code) return code;
}
pCmprsor->nBuf[1] = 0;
int64_t n = 1;
int64_t value;
int64_t delta;
uint64_t vZigzag;
while (n < pCmprsor->nBuf[0]) {
uint8_t aN[2];
aN[0] = pCmprsor->aBuf[0][n] & 0xf;
aN[1] = pCmprsor->aBuf[0][n] >> 4;
n++;
for (int32_t i = 0; i < 2; i++) {
vZigzag = 0;
for (uint8_t j = 0; j < aN[i]; j++) {
vZigzag |= (((uint64_t)pCmprsor->aBuf[0][n]) << (8 * j));
n++;
}
int64_t delta_of_delta = ZIGZAG_DECODE(int64_t, vZigzag);
if (pCmprsor->nBuf[1] == 0) {
delta = 0;
value = delta_of_delta;
} else {
delta = delta_of_delta + delta;
value = delta + value;
}
memcpy(pCmprsor->aBuf[1] + pCmprsor->nBuf[1], &value, sizeof(int64_t));
pCmprsor->nBuf[1] += sizeof(int64_t);
if (n >= pCmprsor->nBuf[0]) break;
}
}
ASSERT(n == pCmprsor->nBuf[0]);
if (pCmprsor->autoAlloc) {
code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[1] + 1);
if (code) return code;
}
memcpy(pCmprsor->aBuf[0] + 1, pCmprsor->aBuf[1], pCmprsor->nBuf[1]);
pCmprsor->nBuf[0] = 1 + pCmprsor->nBuf[1];
}
pCmprsor->aBuf[0][0] = 0;
return code;
}
static int32_t tCompTimestamp(SCompressor *pCmprsor, const void *pData, int32_t nData) {
int32_t code = 0;
int64_t ts = *(int64_t *)pData;
ASSERT(pCmprsor->type == TSDB_DATA_TYPE_TIMESTAMP);
ASSERT(nData == 8);
if (pCmprsor->aBuf[0][0] == 1) {
if (pCmprsor->nVal == 0) {
pCmprsor->ts_prev_val = ts;
pCmprsor->ts_prev_delta = -ts;
}
if (!I64_SAFE_ADD(ts, -pCmprsor->ts_prev_val)) {
code = tCompSetCopyMode(pCmprsor);
if (code) return code;
goto _copy_cmpr;
}
int64_t delta = ts - pCmprsor->ts_prev_val;
if (!I64_SAFE_ADD(delta, -pCmprsor->ts_prev_delta)) {
code = tCompSetCopyMode(pCmprsor);
if (code) return code;
goto _copy_cmpr;
}
int64_t delta_of_delta = delta - pCmprsor->ts_prev_delta;
uint64_t vZigzag = ZIGZAG_ENCODE(int64_t, delta_of_delta);
pCmprsor->ts_prev_val = ts;
pCmprsor->ts_prev_delta = delta;
if ((pCmprsor->nVal & 0x1) == 0) {
if (pCmprsor->autoAlloc) {
code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 17);
if (code) return code;
}
pCmprsor->ts_flag_p = pCmprsor->aBuf[0] + pCmprsor->nBuf[0];
pCmprsor->nBuf[0]++;
pCmprsor->ts_flag_p[0] = 0;
while (vZigzag) {
pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = (vZigzag & 0xff);
pCmprsor->nBuf[0]++;
pCmprsor->ts_flag_p[0]++;
vZigzag >>= 8;
}
} else {
while (vZigzag) {
pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = (vZigzag & 0xff);
pCmprsor->nBuf[0]++;
pCmprsor->ts_flag_p[0] += 0x10;
vZigzag >>= 8;
}
}
} else {
_copy_cmpr:
if (pCmprsor->autoAlloc) {
code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + sizeof(ts));
if (code) return code;
}
memcpy(pCmprsor->aBuf[0] + pCmprsor->nBuf[0], &ts, sizeof(ts));
pCmprsor->nBuf[0] += sizeof(ts);
}
pCmprsor->nVal++;
return code;
}
// Integer =====================================================
#define SIMPLE8B_MAX ((uint64_t)1152921504606846974LL)
static const uint8_t BIT_PER_INTEGER[] = {0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 15, 20, 30, 60};
static const int32_t SELECTOR_TO_ELEMS[] = {240, 120, 60, 30, 20, 15, 12, 10, 8, 7, 6, 5, 4, 3, 2, 1};
static const uint8_t BIT_TO_SELECTOR[] = {0, 2, 3, 4, 5, 6, 7, 8, 9, 10, 10, 11, 11, 12, 12, 12,
13, 13, 13, 13, 13, 14, 14, 14, 14, 14, 14, 14, 14, 14, 14, 15,
15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15,
15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15};
static int32_t tCompInt(SCompressor *pCmprsor, const void *pData, int32_t nData) {
int32_t code = 0;
ASSERT(nData == DATA_TYPE_INFO[pCmprsor->type].bytes);
if (pCmprsor->aBuf[0][0] == 0) {
int64_t val;
switch (pCmprsor->type) {
case TSDB_DATA_TYPE_TINYINT:
val = *(int8_t *)pData;
break;
case TSDB_DATA_TYPE_SMALLINT:
val = *(int16_t *)pData;
break;
case TSDB_DATA_TYPE_INT:
val = *(int32_t *)pData;
break;
case TSDB_DATA_TYPE_BIGINT:
val = *(int64_t *)pData;
break;
case TSDB_DATA_TYPE_UTINYINT:
val = *(uint8_t *)pData;
break;
case TSDB_DATA_TYPE_USMALLINT:
val = *(uint16_t *)pData;
break;
case TSDB_DATA_TYPE_UINT:
val = *(uint32_t *)pData;
break;
case TSDB_DATA_TYPE_UBIGINT:
val = *(int64_t *)pData;
break;
default:
ASSERT(0);
break;
}
if (!I64_SAFE_ADD(val, -pCmprsor->i_prev)) {
// TODO
goto _copy_cmpr;
}
int64_t diff = val - pCmprsor->i_prev;
uint64_t vZigzag = ZIGZAG_ENCODE(int64_t, diff);
if (vZigzag >= SIMPLE8B_MAX) {
// TODO
goto _copy_cmpr;
}
int8_t nBit = (vZigzag) ? (64 - BUILDIN_CLZL(vZigzag)) : 0;
pCmprsor->i_prev = val;
while (1) {
int32_t nEle = (pCmprsor->i_end + 241 - pCmprsor->i_start) % 241;
if (nEle + 1 <= SELECTOR_TO_ELEMS[pCmprsor->i_selector] && nEle + 1 <= SELECTOR_TO_ELEMS[BIT_TO_SELECTOR[nBit]]) {
if (pCmprsor->i_selector < BIT_TO_SELECTOR[nBit]) {
pCmprsor->i_selector = BIT_TO_SELECTOR[nBit];
}
pCmprsor->i_end = (pCmprsor->i_end + 1) % 241;
pCmprsor->i_aZigzag[pCmprsor->i_end] = vZigzag;
pCmprsor->i_aBitN[pCmprsor->i_end] = nBit;
break;
} else {
while (nEle < SELECTOR_TO_ELEMS[pCmprsor->i_selector]) {
pCmprsor->i_selector++;
}
nEle = SELECTOR_TO_ELEMS[pCmprsor->i_selector];
if (pCmprsor->autoAlloc) {
code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + sizeof(uint64_t));
if (code) return code;
}
uint64_t *bp = (uint64_t *)(pCmprsor->aBuf[0] + pCmprsor->nBuf[0]);
pCmprsor->nBuf[0] += sizeof(uint64_t);
bp[0] = pCmprsor->i_selector;
uint8_t bits = BIT_PER_INTEGER[pCmprsor->i_selector];
for (int32_t iVal = 0; iVal < nEle; iVal++) {
bp[0] |= ((pCmprsor->i_aZigzag[pCmprsor->i_start] & ((((uint64_t)1) << bits) - 1)) << (bits * iVal + 4));
pCmprsor->i_start = (pCmprsor->i_start + 1) % 241;
}
// reset and continue
pCmprsor->i_selector = 0;
for (int32_t iVal = pCmprsor->i_start; iVal < pCmprsor->i_end; iVal = (iVal + 1) % 241) {
if (pCmprsor->i_selector < BIT_TO_SELECTOR[pCmprsor->i_aBitN[iVal]]) {
pCmprsor->i_selector = BIT_TO_SELECTOR[pCmprsor->i_aBitN[iVal]];
}
}
}
}
} else {
_copy_cmpr:
code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + nData);
if (code) return code;
memcpy(pCmprsor->aBuf[0] + pCmprsor->nBuf[0], pData, nData);
pCmprsor->nBuf[0] += nData;
}
pCmprsor->nVal++;
return code;
}
// Float =====================================================
static int32_t tCompFloat(SCompressor *pCmprsor, const void *pData, int32_t nData) {
int32_t code = 0;
ASSERT(nData == sizeof(float));
union {
float f;
uint32_t u;
} val = {.f = *(float *)pData};
uint32_t diff = val.u ^ pCmprsor->f_prev;
pCmprsor->f_prev = val.u;
int32_t clz, ctz;
if (diff) {
clz = BUILDIN_CLZ(diff);
ctz = BUILDIN_CTZ(diff);
} else {
clz = 32;
ctz = 32;
}
uint8_t nBytes;
if (clz < ctz) {
nBytes = sizeof(uint32_t) - ctz / BITS_PER_BYTE;
if (nBytes) diff >>= (32 - nBytes * BITS_PER_BYTE);
} else {
nBytes = sizeof(uint32_t) - clz / BITS_PER_BYTE;
}
if (nBytes == 0) nBytes++;
if ((pCmprsor->nVal & 0x1) == 0) {
if (pCmprsor->autoAlloc) {
code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 9);
if (code) return code;
}
pCmprsor->f_flag_p = &pCmprsor->aBuf[0][pCmprsor->nBuf[0]];
pCmprsor->nBuf[0]++;
if (clz < ctz) {
pCmprsor->f_flag_p[0] = (0x08 | (nBytes - 1));
} else {
pCmprsor->f_flag_p[0] = nBytes - 1;
}
} else {
if (clz < ctz) {
pCmprsor->f_flag_p[0] |= ((0x08 | (nBytes - 1)) << 4);
} else {
pCmprsor->f_flag_p[0] |= ((nBytes - 1) << 4);
}
}
for (; nBytes; nBytes--) {
pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = (diff & 0xff);
pCmprsor->nBuf[0]++;
diff >>= BITS_PER_BYTE;
}
pCmprsor->nVal++;
return code;
}
// Double =====================================================
static int32_t tCompDouble(SCompressor *pCmprsor, const void *pData, int32_t nData) {
int32_t code = 0;
ASSERT(nData == sizeof(double));
union {
double d;
uint64_t u;
} val = {.d = *(double *)pData};
uint64_t diff = val.u ^ pCmprsor->d_prev;
pCmprsor->d_prev = val.u;
int32_t clz, ctz;
if (diff) {
clz = BUILDIN_CLZL(diff);
ctz = BUILDIN_CTZL(diff);
} else {
clz = 64;
ctz = 64;
}
uint8_t nBytes;
if (clz < ctz) {
nBytes = sizeof(uint64_t) - ctz / BITS_PER_BYTE;
if (nBytes) diff >>= (64 - nBytes * BITS_PER_BYTE);
} else {
nBytes = sizeof(uint64_t) - clz / BITS_PER_BYTE;
}
if (nBytes == 0) nBytes++;
if ((pCmprsor->nVal & 0x1) == 0) {
if (pCmprsor->autoAlloc) {
code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 17);
if (code) return code;
}
pCmprsor->d_flag_p = &pCmprsor->aBuf[0][pCmprsor->nBuf[0]];
pCmprsor->nBuf[0]++;
if (clz < ctz) {
pCmprsor->d_flag_p[0] = (0x08 | (nBytes - 1));
} else {
pCmprsor->d_flag_p[0] = nBytes - 1;
}
} else {
if (clz < ctz) {
pCmprsor->d_flag_p[0] |= ((0x08 | (nBytes - 1)) << 4);
} else {
pCmprsor->d_flag_p[0] |= ((nBytes - 1) << 4);
}
}
for (; nBytes; nBytes--) {
pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = (diff & 0xff);
pCmprsor->nBuf[0]++;
diff >>= BITS_PER_BYTE;
}
pCmprsor->nVal++;
return code;
}
// Binary =====================================================
static int32_t tCompBinary(SCompressor *pCmprsor, const void *pData, int32_t nData) {
int32_t code = 0;
if (nData) {
if (pCmprsor->autoAlloc) {
code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + nData);
if (code) return code;
}
memcpy(pCmprsor->aBuf[0] + pCmprsor->nBuf[0], pData, nData);
pCmprsor->nBuf[0] += nData;
}
pCmprsor->nVal++;
return code;
}
// Bool =====================================================
static const uint8_t BOOL_CMPR_TABLE[] = {0b01, 0b0100, 0b010000, 0b01000000};
static int32_t tCompBool(SCompressor *pCmprsor, const void *pData, int32_t nData) {
int32_t code = 0;
bool vBool = *(int8_t *)pData;
int32_t mod4 = pCmprsor->nVal & 3;
if (mod4 == 0) {
pCmprsor->nBuf[0]++;
if (pCmprsor->autoAlloc) {
code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0]);
if (code) return code;
}
pCmprsor->aBuf[0][pCmprsor->nBuf[0] - 1] = 0;
}
if (vBool) {
pCmprsor->aBuf[0][pCmprsor->nBuf[0] - 1] |= BOOL_CMPR_TABLE[mod4];
}
pCmprsor->nVal++;
return code;
}
// SCompressor =====================================================
int32_t tCompressorCreate(SCompressor **ppCmprsor) {
int32_t code = 0;
*ppCmprsor = (SCompressor *)taosMemoryCalloc(1, sizeof(SCompressor));
if ((*ppCmprsor) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
code = tRealloc(&(*ppCmprsor)->aBuf[0], 1024);
if (code) {
taosMemoryFree(*ppCmprsor);
*ppCmprsor = NULL;
goto _exit;
}
_exit:
return code;
}
int32_t tCompressorDestroy(SCompressor *pCmprsor) {
int32_t code = 0;
if (pCmprsor) {
int32_t nBuf = sizeof(pCmprsor->aBuf) / sizeof(pCmprsor->aBuf[0]);
for (int32_t iBuf = 0; iBuf < nBuf; iBuf++) {
tFree(pCmprsor->aBuf[iBuf]);
}
taosMemoryFree(pCmprsor);
}
return code;
}
int32_t tCompressorReset(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg, int8_t autoAlloc) {
int32_t code = 0;
pCmprsor->type = type;
pCmprsor->cmprAlg = cmprAlg;
pCmprsor->autoAlloc = autoAlloc;
pCmprsor->nVal = 0;
switch (type) {
case TSDB_DATA_TYPE_TIMESTAMP:
pCmprsor->ts_prev_val = 0;
pCmprsor->ts_prev_delta = 0;
pCmprsor->ts_flag_p = NULL;
pCmprsor->aBuf[0][0] = 1; // For timestamp, 1 means compressed, 0 otherwise
pCmprsor->nBuf[0] = 1;
break;
case TSDB_DATA_TYPE_BOOL:
pCmprsor->nBuf[0] = 0;
break;
case TSDB_DATA_TYPE_BINARY:
pCmprsor->nBuf[0] = 0;
break;
case TSDB_DATA_TYPE_FLOAT:
pCmprsor->f_prev = 0;
pCmprsor->f_flag_p = NULL;
pCmprsor->aBuf[0][0] = 0; // 0 means compressed, 1 otherwise (for backward compatibility)
pCmprsor->nBuf[0] = 1;
break;
case TSDB_DATA_TYPE_DOUBLE:
pCmprsor->d_prev = 0;
pCmprsor->d_flag_p = NULL;
pCmprsor->aBuf[0][0] = 0; // 0 means compressed, 1 otherwise (for backward compatibility)
pCmprsor->nBuf[0] = 1;
break;
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_UTINYINT:
case TSDB_DATA_TYPE_USMALLINT:
case TSDB_DATA_TYPE_UINT:
case TSDB_DATA_TYPE_UBIGINT:
pCmprsor->i_prev = 0;
pCmprsor->i_selector = 0;
pCmprsor->i_start = 0;
pCmprsor->i_end = 0;
pCmprsor->aBuf[0][0] = 0; // 0 means compressed, 1 otherwise (for backward compatibility)
pCmprsor->nBuf[0] = 1;
break;
default:
break;
}
return code;
}
int32_t tCompGen(SCompressor *pCmprsor, const uint8_t **ppData, int64_t *nData) {
int32_t code = 0;
if (pCmprsor->nVal == 0) {
*ppData = NULL;
*nData = 0;
return code;
}
if (pCmprsor->cmprAlg == TWO_STAGE_COMP /*|| IS_VAR_DATA_TYPE(pCmprsor->type)*/) {
code = tRealloc(&pCmprsor->aBuf[1], pCmprsor->nBuf[0] + 1);
if (code) return code;
int64_t ret = LZ4_compress_default(pCmprsor->aBuf[0], pCmprsor->aBuf[1] + 1, pCmprsor->nBuf[0], pCmprsor->nBuf[0]);
if (ret) {
pCmprsor->aBuf[1][0] = 0;
pCmprsor->nBuf[1] = ret + 1;
} else {
pCmprsor->aBuf[1][0] = 1;
memcpy(pCmprsor->aBuf[1] + 1, pCmprsor->aBuf[0], pCmprsor->nBuf[0]);
pCmprsor->nBuf[1] = pCmprsor->nBuf[0] + 1;
}
*ppData = pCmprsor->aBuf[1];
*nData = pCmprsor->nBuf[1];
} else {
*ppData = pCmprsor->aBuf[0];
*nData = pCmprsor->nBuf[0];
}
return code;
}
int32_t tCompress(SCompressor *pCmprsor, const void *pData, int64_t nData) {
return DATA_TYPE_INFO[pCmprsor->type].cmprFn(pCmprsor, pData, nData);
}
\ No newline at end of file
...@@ -621,6 +621,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP, "Rsma fetch msg is m ...@@ -621,6 +621,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP, "Rsma fetch msg is m
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_EMPTY_INFO, "Rsma info is empty") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_EMPTY_INFO, "Rsma info is empty")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_SCHEMA, "Rsma invalid schema") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_SCHEMA, "Rsma invalid schema")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_REGEX_MATCH, "Rsma regex match") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_REGEX_MATCH, "Rsma regex match")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_STREAM_STATE_OPEN, "Rsma stream state open")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_STREAM_STATE_COMMIT, "Rsma stream state commit")
//index //index
TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding") TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding")
......
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#ifdef LINUX
#include <unistd.h>
#endif
#ifdef WINDOWS
#include <windows.h>
#endif
#include "taosudf.h"
TAOS* taos = NULL;
DLL_EXPORT int32_t gpd_init() {
taos = taos_connect("localhost", "root", "taosdata", "", 7100);
return 0;
}
DLL_EXPORT int32_t gpd_destroy() {
taos_close(taos);
taos_cleanup();
return 0;
}
DLL_EXPORT int32_t gpd(SUdfDataBlock* block, SUdfColumn *resultCol) {
SUdfColumnMeta *meta = &resultCol->colMeta;
meta->bytes = 4;
meta->type = TSDB_DATA_TYPE_INT;
meta->scale = 0;
meta->precision = 0;
SUdfColumnData *resultData = &resultCol->colData;
resultData->numOfRows = block->numOfRows;
for (int32_t i = 0; i < resultData->numOfRows; ++i) {
int j = 0;
for (; j < block->numOfCols; ++j) {
if (udfColDataIsNull(block->udfCols[j], i)) {
udfColDataSetNull(resultCol, i);
break;
}
}
if ( j == block->numOfCols) {
int32_t luckyNum = 88;
udfColDataSet(resultCol, i, (char *)&luckyNum, false);
}
}
TAOS_RES* res = taos_query(taos, "create database if not exists gpd");
if (taos_errno(res) != 0) {
char* errstr = taos_errstr(res);
}
res = taos_query(taos, "create table gpd.st (ts timestamp, f int) tags(t int)");
if (taos_errno(res) != 0) {
char* errstr = taos_errstr(res);
}
taos_query(taos, "insert into gpd.t using gpd.st tags(1) values(now, 1) ");
if (taos_errno(res) != 0) {
char* errstr = taos_errstr(res);
}
taos_query(taos, "select * from gpd.t");
if (taos_errno(res) != 0) {
char* errstr = taos_errstr(res);
}
//to simulate actual processing delay by udf
#ifdef LINUX
usleep(1 * 1000); // usleep takes sleep time in us (1 millionth of a second)
#endif
#ifdef WINDOWS
Sleep(1);
#endif
return 0;
}
...@@ -144,18 +144,18 @@ if $data20 != 8.000000000 then ...@@ -144,18 +144,18 @@ if $data20 != 8.000000000 then
return -1 return -1
endi endi
sql drop function bit_and; #sql drop function bit_and;
sql show functions; #sql show functions;
if $rows != 1 then #if $rows != 1 then
return -1 # return -1
endi #endi
if $data00 != @l2norm@ then #if $data00 != @l2norm@ then
return -1 # return -1
endi # endi
sql drop function l2norm; #sql drop function l2norm;
sql show functions; #sql show functions;
if $rows != 0 then #if $rows != 0 then
return -1 # return -1
endi #endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册