diff --git a/include/client/taos.h b/include/client/taos.h index cf410a42daf1e9c401af767497a603aa12c7a536..a59e203644afe66fe166cdcc4c04afb539a1a289 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -225,6 +225,9 @@ DLL_EXPORT int taos_get_tables_vgId(TAOS *taos, const char *db, const char *tabl DLL_EXPORT int taos_load_table_info(TAOS *taos, const char *tableNameList); +// set heart beat thread quit mode , if quicByKill 1 then kill thread else quit from inner +DLL_EXPORT void taos_set_hb_quit(int8_t quitByKill); + /* --------------------------schemaless INTERFACE------------------------------- */ DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision); diff --git a/include/common/tcommon.h b/include/common/tcommon.h index a97c68be4920333f6a08f02fdb6753724db035a3..c9eaae74cd0dcd8ff78f38c605f18458dad43562 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -379,6 +379,8 @@ typedef struct STUidTagInfo { #define UD_GROUPID_COLUMN_INDEX 1 #define UD_TAG_COLUMN_INDEX 2 +int32_t taosGenCrashJsonMsg(int signum, char **pMsg, int64_t clusterId, int64_t startTime); + #ifdef __cplusplus } #endif diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index aade34e965a705bd201fcca59d341d152d989a7e..78fd9bed5ddff78b11383c096769e767f92a6648 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -273,6 +273,7 @@ typedef struct SStreamId { typedef struct SCheckpointInfo { int64_t id; int64_t version; // offset in WAL + int64_t currentVer;// current offset in WAL, not serialize it } SCheckpointInfo; typedef struct SStreamStatus { @@ -345,7 +346,7 @@ typedef struct SStreamMeta { FTaskExpand* expandFunc; int32_t vgId; SRWLatch lock; - int8_t walScan; + int32_t walScan; } SStreamMeta; int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); @@ -537,6 +538,7 @@ void streamTaskInputFail(SStreamTask* pTask); int32_t streamTryExec(SStreamTask* pTask); int32_t streamSchedExec(SStreamTask* pTask); int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock); +bool streamTaskShouldStop(const SStreamStatus* pStatus); int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz); diff --git a/include/util/tlog.h b/include/util/tlog.h index 0071b3d32cd72be79ab5adf8bdba20b92c18f4f7..541b7589b799f4c1821430b692c31aa179e669b6 100644 --- a/include/util/tlog.h +++ b/include/util/tlog.h @@ -102,7 +102,6 @@ bool taosAssertRelease(bool condition); void taosLogCrashInfo(char *nodeType, char *pMsg, int64_t msgLen, int signum, void *sigInfo); void taosReadCrashInfo(char *filepath, char **pMsg, int64_t *pMsgLen, TdFilePtr *pFd); void taosReleaseCrashLogFile(TdFilePtr pFile, bool truncateFile); -int32_t taosGenCrashJsonMsg(int signum, char **pMsg, int64_t clusterId, int64_t startTime); // clang-format off #define uFatal(...) { if (uDebugFlag & DEBUG_FATAL) { taosPrintLog("UTL FATAL", DEBUG_FATAL, tsLogEmbedded ? 255 : uDebugFlag, __VA_ARGS__); }} diff --git a/packaging/docker/bin/entrypoint.sh b/packaging/docker/bin/entrypoint.sh index f4be349c0de0ea0df382fc6fee033120c5c48007..f2811de7bd28c9cb324e64d0e7193f1c11869626 100755 --- a/packaging/docker/bin/entrypoint.sh +++ b/packaging/docker/bin/entrypoint.sh @@ -42,8 +42,9 @@ if [ "$DISABLE_ADAPTER" = "0" ]; then done fi -# if has mnode ep set or the host is first ep or not for cluster, just start. -if [ -f "$DATA_DIR/dnode/mnodeEpSet.json" ] || +# if dnode has been created or has mnode ep set or the host is first ep or not for cluster, just start. +if [ -f "$DATA_DIR/dnode/dnode.json" ] || + [ -f "$DATA_DIR/dnode/mnodeEpSet.json" ] || [ "$TAOS_FQDN" = "$FIRST_EP_HOST" ]; then $@ # others will first wait the first ep ready. diff --git a/packaging/tools/makeclient.sh b/packaging/tools/makeclient.sh index b473f3b52728714276b8493ea0bc96094e880e34..28dc770755a9af46bff1f429a7107194e41ae75d 100755 --- a/packaging/tools/makeclient.sh +++ b/packaging/tools/makeclient.sh @@ -197,7 +197,8 @@ if [[ $productName == "TDengine" ]]; then mkdir -p ${install_dir}/connector if [[ "$pagMode" != "lite" ]] && [[ "$cpuType" != "aarch32" ]]; then if [ "$osType" != "Darwin" ]; then - [ -f ${build_dir}/lib/*.jar ] && cp ${build_dir}/lib/*.jar ${install_dir}/connector || : + jars=$(ls ${build_dir}/lib/*.jar 2>/dev/null|wc -l) + [ "${jars}" != "0" ] && cp ${build_dir}/lib/*.jar ${install_dir}/connector || : fi git clone --depth 1 https://github.com/taosdata/driver-go ${install_dir}/connector/go rm -rf ${install_dir}/connector/go/.git ||: diff --git a/packaging/tools/makepkg.sh b/packaging/tools/makepkg.sh index e4df233d678f90d55ef50ba05e088397faa8e6b0..a59083525714fa25724c5d8d93ffd799c0685091 100755 --- a/packaging/tools/makepkg.sh +++ b/packaging/tools/makepkg.sh @@ -338,7 +338,20 @@ if [ "$verMode" == "cluster" ] || [ "$verMode" == "cloud" ]; then connector_dir="${code_dir}/connector" mkdir -p ${install_dir}/connector if [[ "$pagMode" != "lite" ]] && [[ "$cpuType" != "aarch32" ]]; then - [ -f ${build_dir}/lib/*.jar ] && cp ${build_dir}/lib/*.jar ${install_dir}/connector || : + tmp_pwd=`pwd` + cd ${install_dir}/connector + if [ ! -d taos-connector-jdbc ];then + git clone -b main --depth=1 https://github.com/taosdata/taos-connector-jdbc.git ||: + fi + cd taos-connector-jdbc + mvn clean package -Dmaven.test.skip=true + echo ${build_dir}/lib/ + cp target/*.jar ${build_dir}/lib/ + cd ${install_dir}/connector + rm -rf taos-connector-jdbc + cd ${tmp_pwd} + jars=$(ls ${build_dir}/lib/*.jar 2>/dev/null|wc -l) + [ "${jars}" != "0" ] && cp ${build_dir}/lib/*.jar ${install_dir}/connector || : git clone --depth 1 https://github.com/taosdata/driver-go ${install_dir}/connector/go rm -rf ${install_dir}/connector/go/.git ||: diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 41f87379a9c7766348c342b4f97273cb4704ab1a..46d44d744327d16d77bd4055644d4c797550e40e 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -80,6 +80,7 @@ typedef struct { int64_t appId; // ctl int8_t threadStop; + int8_t quitByKill; TdThread thread; TdThreadMutex lock; // used when app init and cleanup SHashObj* appSummary; diff --git a/source/client/inc/clientSml.h b/source/client/inc/clientSml.h index 92896e6f23ffd0b37e3387ac800417519f19dd6a..43a7e8ba45c9eb271cd07803ec38d4d22b1fe2b0 100644 --- a/source/client/inc/clientSml.h +++ b/source/client/inc/clientSml.h @@ -70,7 +70,7 @@ extern "C" { #define VALUE_LEN 6 #define OTD_JSON_FIELDS_NUM 4 -#define MAX_RETRY_TIMES 100 +#define MAX_RETRY_TIMES 10 typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType; typedef enum { diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index c9c2e7a5f82453035100f2e4a67d806c70a7ff57..8d082ab60b14e4da23bab139828392f0ba45f3e6 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -845,7 +845,12 @@ static void hbStopThread() { return; } - taosThreadJoin(clientHbMgr.thread, NULL); + // thread quit mode kill or inner exit from self-thread + if (clientHbMgr.quitByKill) { + taosThreadKill(clientHbMgr.thread, 0); + } else { + taosThreadJoin(clientHbMgr.thread, NULL); + } tscDebug("hb thread stopped"); } @@ -1037,3 +1042,8 @@ void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) { atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1); } + +// set heart beat thread quit mode , if quicByKill 1 then kill thread else quit from inner +void taos_set_hb_quit(int8_t quitByKill) { + clientHbMgr.quitByKill = quitByKill; +} diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 17150286e1407e65a38e040877da7e053ca2e946..763d4b69152a52a92781a556234318ebcc9a1fd7 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -535,7 +535,7 @@ static int32_t smlGenerateSchemaAction(SSchema *colField, SHashObj *colHash, SSm if (index) { if (colField[*index].type != kv->type) { uError("SML:0x%" PRIx64 " point type and db type mismatch. point type: %d, db type: %d, key: %s", info->id, colField[*index].type, kv->type, kv->key); - return TSDB_CODE_TSC_INVALID_VALUE; + return TSDB_CODE_SML_INVALID_DATA; } if ((colField[*index].type == TSDB_DATA_TYPE_VARCHAR && @@ -1494,8 +1494,8 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL do { code = smlModifyDBSchemas(info); - if (code == 0) break; - taosMsleep(500); + if (code == 0 || code == TSDB_CODE_SML_INVALID_DATA) break; + taosMsleep(100); uInfo("SML:0x%" PRIx64 " smlModifyDBSchemas retry code:%s, times:%d", info->id, tstrerror(code), retryNum); } while (retryNum++ < taosHashGetSize(info->superTables) * MAX_RETRY_TIMES); diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index d6ab974c6ced8305ddad0ebc86f072a08bf9c978..f379084cf558a6e9d4225f52163beb1cfea8a8ce 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -2503,9 +2503,11 @@ _exit: int32_t tColDataAddValueByBind(SColData *pColData, TAOS_MULTI_BIND *pBind) { int32_t code = 0; - ASSERT(pColData->type == pBind->buffer_type); - - if (IS_VAR_DATA_TYPE(pBind->buffer_type)) { // var-length data type + if (!(pBind->num == 1 && pBind->is_null && *pBind->is_null)) { + ASSERT(pColData->type == pBind->buffer_type); + } + + if (IS_VAR_DATA_TYPE(pColData->type)) { // var-length data type for (int32_t i = 0; i < pBind->num; ++i) { if (pBind->is_null && pBind->is_null[i]) { code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NULL](pColData, NULL, 0); diff --git a/source/common/src/tmisce.c b/source/common/src/tmisce.c index 59afce1bbbf342ac61392c8e26dd44628c9505de..c195f5387cb2e01fb128895465980a2569b4d04a 100644 --- a/source/common/src/tmisce.c +++ b/source/common/src/tmisce.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "tmisce.h" +#include "tjson.h" #include "tglobal.h" #include "tlog.h" #include "tname.h" @@ -87,3 +88,63 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet) { return ep; } + +int32_t taosGenCrashJsonMsg(int signum, char** pMsg, int64_t clusterId, int64_t startTime) { + SJson* pJson = tjsonCreateObject(); + if (pJson == NULL) return -1; + char tmp[4096] = {0}; + + tjsonAddDoubleToObject(pJson, "reportVersion", 1); + + tjsonAddIntegerToObject(pJson, "clusterId", clusterId); + tjsonAddIntegerToObject(pJson, "startTime", startTime); + + // Do NOT invoke the taosGetFqdn here. + // this function may be invoked when memory exception occurs,so we should assume that it is running in a memory locked + // environment. The lock operation by taosGetFqdn may cause this program deadlock. + tjsonAddStringToObject(pJson, "fqdn", tsLocalFqdn); + + tjsonAddIntegerToObject(pJson, "pid", taosGetPId()); + + taosGetAppName(tmp, NULL); + tjsonAddStringToObject(pJson, "appName", tmp); + + if (taosGetOsReleaseName(tmp, sizeof(tmp)) == 0) { + tjsonAddStringToObject(pJson, "os", tmp); + } + + float numOfCores = 0; + if (taosGetCpuInfo(tmp, sizeof(tmp), &numOfCores) == 0) { + tjsonAddStringToObject(pJson, "cpuModel", tmp); + tjsonAddDoubleToObject(pJson, "numOfCpu", numOfCores); + } else { + tjsonAddDoubleToObject(pJson, "numOfCpu", tsNumOfCores); + } + + snprintf(tmp, sizeof(tmp), "%" PRId64 " kB", tsTotalMemoryKB); + tjsonAddStringToObject(pJson, "memory", tmp); + + tjsonAddStringToObject(pJson, "version", version); + tjsonAddStringToObject(pJson, "buildInfo", buildinfo); + tjsonAddStringToObject(pJson, "gitInfo", gitinfo); + + tjsonAddIntegerToObject(pJson, "crashSig", signum); + tjsonAddIntegerToObject(pJson, "crashTs", taosGetTimestampUs()); + +#ifdef _TD_DARWIN_64 + taosLogTraceToBuf(tmp, sizeof(tmp), 4); +#elif !defined(WINDOWS) + taosLogTraceToBuf(tmp, sizeof(tmp), 3); +#else + taosLogTraceToBuf(tmp, sizeof(tmp), 8); +#endif + + tjsonAddStringToObject(pJson, "stackInfo", tmp); + + char* pCont = tjsonToString(pJson); + tjsonDelete(pJson); + + *pMsg = pCont; + + return TSDB_CODE_SUCCESS; +} diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 0244a4fd6e34c8f362047a9829ecc56285ff31fa..16e7ffc5367c857dad1414c788dc46d9f6263b42 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -119,6 +119,7 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal) pVnode->pFetchQ->threadId); while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10); + tqNotifyClose(pVnode->pImpl->pTq); dInfo("vgId:%d, wait for vnode stream queue:%p is empty", pVnode->vgId, pVnode->pStreamQ); while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(10); @@ -141,7 +142,6 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal) dInfo("vgId:%d, vnode is closed", pVnode->vgId); if (commitAndRemoveWal) { - char path[TSDB_FILENAME_LEN] = {0}; snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d%swal", TD_DIRSEP, pVnode->vgId, TD_DIRSEP); dInfo("vgId:%d, remove all wals, path:%s", pVnode->vgId, path); tfsRmdir(pMgmt->pTfs, path); diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index df220340688ae7b0230dae3a3242e0fc9ff6d2b9..e0b3e2bf7407d9b06bde081315f3428a87caff3b 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1437,7 +1437,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, return 0; } -static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb, SRpcMsg *pReq) { +static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb) { SSdb *pSdb = pMnode->pSdb; SVgObj *pVgroup = NULL; void *pIter = NULL; @@ -1459,7 +1459,7 @@ static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb, SRpcMsg *pReq) { pHead->vgId = htonl(pVgroup->vgId); tSerializeSVTrimDbReq((char *)pHead + sizeof(SMsgHead), contLen, &trimReq); - SRpcMsg rpcMsg = {.msgType = TDMT_VND_TRIM, .pCont = pHead, .contLen = contLen, .info = pReq->info}; + SRpcMsg rpcMsg = {.msgType = TDMT_VND_TRIM, .pCont = pHead, .contLen = contLen}; SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup); int32_t code = tmsgSendReq(&epSet, &rpcMsg); if (code != 0) { @@ -1495,7 +1495,7 @@ static int32_t mndProcessTrimDbReq(SRpcMsg *pReq) { goto _OVER; } - code = mndTrimDb(pMnode, pDb, pReq); + code = mndTrimDb(pMnode, pDb); _OVER: if (code != 0) { diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index db17e4f533128191474690d2ace5764cf6c4b3b2..acc0d29382a17a0fef955bdec489b50d4a7a2233 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -180,16 +180,9 @@ int32_t tqStreamTasksScanWal(STQ* pTq); // tq util char* createStreamTaskIdStr(int64_t streamId, int32_t taskId); -void createStreamTaskOffsetKey(char* dst, uint64_t streamId, uint32_t taskId); int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver); -int32_t launchTaskForWalBlock(SStreamTask* pTask, SFetchRet* pRet, STqOffset* pOffset); int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg); -void doSaveTaskOffset(STqOffsetStore* pOffsetStore, const char* pKey, int64_t ver); -void saveOffsetForAllTasks(STQ* pTq, int64_t ver); -void initOffsetForAllRestoreTasks(STQ* pTq); -int32_t transferToWalReadTask(SStreamMeta* pStreamMeta, SArray* pTaskList); - #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 81f7c3d52ae37a14bb95e636d3c94171ae27c1bb..416bc6cdc73a794b77eac85c724f6f21aec83a47 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -190,6 +190,7 @@ int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg); int tqInit(); void tqCleanUp(); STQ* tqOpen(const char* path, SVnode* pVnode); +void tqNotifyClose(STQ*); void tqClose(STQ*); int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); int tqRegisterPushHandle(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp, diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 36fcb35791e8819cba53f53ca586fae7e3958de8..a78239a4b5f6bf95d6ac3dc92078a3f4b478a1a9 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -154,6 +154,31 @@ void tqClose(STQ* pTq) { taosMemoryFree(pTq); } +void tqNotifyClose(STQ* pTq) { + if (pTq != NULL) { + taosWLockLatch(&pTq->pStreamMeta->lock); + + void* pIter = NULL; + while (1) { + pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter); + if (pIter == NULL) { + break; + } + + SStreamTask* pTask = *(SStreamTask**)pIter; + tqDebug("vgId:%d s-task:%s set dropping flag", pTq->pStreamMeta->vgId, pTask->id.idStr); + pTask->status.taskStatus = TASK_STATUS__STOP; + + int64_t st = taosGetTimestampMs(); + qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS); + int64_t el = taosGetTimestampMs() - st; + tqDebug("vgId:%d s-task:%s is closed in %" PRId64 "ms", pTq->pStreamMeta->vgId, pTask->id.idStr, el); + } + + taosWUnLockLatch(&pTq->pStreamMeta->lock); + } +} + static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId, int32_t type) { int32_t len = 0; @@ -573,6 +598,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->pMsgCb = &pTq->pVnode->msgCb; pTask->pMeta = pTq->pStreamMeta; pTask->chkInfo.version = ver; + pTask->chkInfo.currentVer = ver; // expand executor if (pTask->fillHistory) { @@ -966,14 +992,21 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { int32_t* pRef = taosMemoryMalloc(sizeof(int32_t)); *pRef = 1; + taosWLockLatch(&pTq->pStreamMeta->lock); + void* pIter = NULL; while (1) { pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter); - if (pIter == NULL) break; + if (pIter == NULL) { + break; + } + SStreamTask* pTask = *(SStreamTask**)pIter; - if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue; + if (pTask->taskLevel != TASK_LEVEL__SOURCE) { + continue; + } - qDebug("delete req enqueue stream task: %d, ver: %" PRId64, pTask->id.taskId, ver); + qDebug("s-task:%s delete req enqueue, ver: %" PRId64, pTask->id.idStr, ver); if (!failed) { SStreamRefDataBlock* pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0); @@ -983,15 +1016,13 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { atomic_add_fetch_32(pRefBlock->dataRef, 1); if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pRefBlock) < 0) { - qError("stream task input del failed, task id %d", pTask->id.taskId); - atomic_sub_fetch_32(pRef, 1); taosFreeQitem(pRefBlock); continue; } if (streamSchedExec(pTask) < 0) { - qError("stream task launch failed, task id %d", pTask->id.taskId); + qError("s-task:%s stream task launch failed", pTask->id.idStr); continue; } @@ -1000,8 +1031,9 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { } } + taosWUnLockLatch(&pTq->pStreamMeta->lock); + int32_t ref = atomic_sub_fetch_32(pRef, 1); - /*A(ref >= 0);*/ if (ref == 0) { blockDataDestroy(pDelBlock); taosMemoryFree(pRef); @@ -1032,23 +1064,9 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { } blockDataDestroy(pDelBlock); #endif - return 0; } -static int32_t addSubmitBlockNLaunchTask(STqOffsetStore* pOffsetStore, SStreamTask* pTask, SStreamDataSubmit2* pSubmit, - const char* key, int64_t ver) { - doSaveTaskOffset(pOffsetStore, key, ver); - int32_t code = tqAddInputBlockNLaunchTask(pTask, (SStreamQueueItem*)pSubmit, ver); - - // remove the offset, if all functions are completed successfully. - if (code == TSDB_CODE_SUCCESS) { - tqOffsetDelete(pOffsetStore, key); - } - - return code; -} - int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) { #if 0 void* pIter = NULL; @@ -1309,9 +1327,7 @@ int32_t tqStartStreamTasks(STQ* pTq) { return -1; } - tqInfo("vgId:%d start wal scan stream tasks, tasks:%d", vgId, numOfTasks); - initOffsetForAllRestoreTasks(pTq); - + tqDebug("vgId:%d start wal scan stream tasks, tasks:%d", vgId, numOfTasks); pRunReq->head.vgId = vgId; pRunReq->streamId = 0; pRunReq->taskId = WAL_READ_TASKS_ID; diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 25ab7209d2ca33c928ba5b648348bcc329189d1f..2cda12c0e10fb02bd5928b4f349f75b7d1fad12b 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -1023,6 +1023,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { } // update the table list handle for each stream scanner/wal reader + taosWLockLatch(&pTq->pStreamMeta->lock); while (1) { pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter); if (pIter == NULL) { @@ -1039,5 +1040,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { } } + taosWUnLockLatch(&pTq->pStreamMeta->lock); + return 0; } diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index cba51cdee4f868352ae2a339efd1c2eb817b58a6..c164d037e0bc6e124aa5edfbe6a3570502afbfcf 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -15,121 +15,125 @@ #include "tq.h" -static int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, bool* pScanIdle); -static int32_t transferToNormalTask(SStreamMeta* pStreamMeta, SArray* pTaskList); +static int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle); // this function should be executed by stream threads. // there is a case that the WAL increases more fast than the restore procedure, and this restore procedure // will not stop eventually. -int tqStreamTasksScanWal(STQ* pTq) { +int32_t tqStreamTasksScanWal(STQ* pTq) { int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; int64_t st = taosGetTimestampMs(); while (1) { - tqInfo("vgId:%d continue check if data in wal are available", vgId); + int32_t scan = pMeta->walScan; + tqDebug("vgId:%d continue check if data in wal are available, scan:%d", vgId, scan); // check all restore tasks - bool allFull = true; - streamTaskReplayWal(pTq->pStreamMeta, pTq->pOffsetStore, &allFull); + bool shouldIdle = true; + createStreamRunReq(pTq->pStreamMeta, &shouldIdle); int32_t times = 0; - if (allFull) { + if (shouldIdle) { taosWLockLatch(&pMeta->lock); pMeta->walScan -= 1; times = pMeta->walScan; + ASSERT(pMeta->walScan >= 0); + if (pMeta->walScan <= 0) { taosWUnLockLatch(&pMeta->lock); break; } taosWUnLockLatch(&pMeta->lock); - tqInfo("vgId:%d scan wal for stream tasks for %d times", vgId, times); + tqDebug("vgId:%d scan wal for stream tasks for %d times", vgId, times); } } - double el = (taosGetTimestampMs() - st) / 1000.0; - tqInfo("vgId:%d scan wal for stream tasks completed, elapsed time:%.2f sec", vgId, el); + int64_t el = (taosGetTimestampMs() - st); + tqDebug("vgId:%d scan wal for stream tasks completed, elapsed time:%"PRId64" ms", vgId, el); return 0; } -//int32_t transferToNormalTask(SStreamMeta* pStreamMeta, SArray* pTaskList) { -// int32_t numOfTask = taosArrayGetSize(pTaskList); -// if (numOfTask <= 0) { -// return TSDB_CODE_SUCCESS; -// } -// -// // todo: add lock -// for (int32_t i = 0; i < numOfTask; ++i) { -// SStreamTask* pTask = taosArrayGetP(pTaskList, i); -// tqDebug("vgId:%d transfer s-task:%s state restore -> ready, checkpoint:%" PRId64 " checkpoint id:%" PRId64, -// pStreamMeta->vgId, pTask->id.idStr, pTask->chkInfo.version, pTask->chkInfo.id); -// taosHashRemove(pStreamMeta->pWalReadTasks, &pTask->id.taskId, sizeof(pTask->id.taskId)); -// -// // NOTE: do not change the following order -// atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL); -// taosHashPut(pStreamMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, POINTER_BYTES); -// } -// -// return TSDB_CODE_SUCCESS; -//} - -int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, bool* pScanIdle) { +static SArray* extractTaskIdList(SStreamMeta* pStreamMeta, int32_t numOfTasks) { + SArray* pTaskIdList = taosArrayInit(numOfTasks, sizeof(int32_t)); void* pIter = NULL; - int32_t vgId = pStreamMeta->vgId; - - *pScanIdle = true; - - bool allWalChecked = true; - tqDebug("vgId:%d start to check wal to extract new submit block", vgId); - while (1) { + taosWLockLatch(&pStreamMeta->lock); + while(1) { pIter = taosHashIterate(pStreamMeta->pTasks, pIter); if (pIter == NULL) { break; } SStreamTask* pTask = *(SStreamTask**)pIter; - if (pTask->taskLevel != TASK_LEVEL__SOURCE) { + taosArrayPush(pTaskIdList, &pTask->id.taskId); + } + + taosWUnLockLatch(&pStreamMeta->lock); + return pTaskIdList; +} + +int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { + *pScanIdle = true; + bool noNewDataInWal = true; + int32_t vgId = pStreamMeta->vgId; + + int32_t numOfTasks = taosHashGetSize(pStreamMeta->pTasks); + if (numOfTasks == 0) { + return TSDB_CODE_SUCCESS; + } + + tqDebug("vgId:%d start to check wal to extract new submit block for %d tasks", vgId, numOfTasks); + SArray* pTaskIdList = extractTaskIdList(pStreamMeta, numOfTasks); + + // update the new task number + numOfTasks = taosArrayGetSize(pTaskIdList); + for (int32_t i = 0; i < numOfTasks; ++i) { + int32_t* pTaskId = taosArrayGet(pTaskIdList, i); + SStreamTask* pTask = streamMetaAcquireTask(pStreamMeta, *pTaskId); + if (pTask == NULL) { continue; } - int8_t status = pTask->status.taskStatus; - if (status == TASK_STATUS__RECOVER_PREPARE || status == TASK_STATUS__WAIT_DOWNSTREAM) { - tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr, - pTask->status.taskStatus); + int32_t status = pTask->status.taskStatus; + if (pTask->taskLevel != TASK_LEVEL__SOURCE) { + tqDebug("s-task:%s not source task, no need to start", pTask->id.idStr); + streamMetaReleaseTask(pStreamMeta, pTask); continue; } - // check if offset value exists - char key[128] = {0}; - createStreamTaskOffsetKey(key, pTask->id.streamId, pTask->id.taskId); + if (streamTaskShouldStop(&pTask->status) || status == TASK_STATUS__RECOVER_PREPARE || + status == TASK_STATUS__WAIT_DOWNSTREAM) { + tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr, status); + streamMetaReleaseTask(pStreamMeta, pTask); + continue; + } if (tInputQueueIsFull(pTask)) { tqDebug("vgId:%d s-task:%s input queue is full, do nothing", vgId, pTask->id.idStr); + streamMetaReleaseTask(pStreamMeta, pTask); continue; } *pScanIdle = false; - // check if offset value exists - STqOffset* pOffset = tqOffsetRead(pOffsetStore, key); - ASSERT(pOffset != NULL); - // seek the stored version and extract data from WAL - int32_t code = walReadSeekVer(pTask->exec.pWalReader, pOffset->val.version); + int32_t code = walReadSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer); if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit + streamMetaReleaseTask(pStreamMeta, pTask); continue; } // append the data for the stream - tqDebug("vgId:%d wal reader seek to ver:%" PRId64 " %s", vgId, pOffset->val.version, pTask->id.idStr); + tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer); SPackedData packData = {0}; code = extractSubmitMsgFromWal(pTask->exec.pWalReader, &packData); if (code != TSDB_CODE_SUCCESS) { // failed, continue + streamMetaReleaseTask(pStreamMeta, pTask); continue; } @@ -137,28 +141,32 @@ int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetSto if (p == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; tqError("%s failed to create data submit for stream since out of memory", pTask->id.idStr); + streamMetaReleaseTask(pStreamMeta, pTask); continue; } - allWalChecked = false; + noNewDataInWal = false; - tqDebug("s-task:%s submit data extracted from WAL", pTask->id.idStr); code = tqAddInputBlockNLaunchTask(pTask, (SStreamQueueItem*)p, packData.ver); if (code == TSDB_CODE_SUCCESS) { - pOffset->val.version = walReaderGetCurrentVer(pTask->exec.pWalReader); + pTask->chkInfo.currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader); tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr, - pOffset->val.version); + pTask->chkInfo.currentVer); } else { - // do nothing + tqError("s-task:%s append input queue failed, ver:%"PRId64, pTask->id.idStr, pTask->chkInfo.currentVer); } streamDataSubmitDestroy(p); taosFreeQitem(p); + streamMetaReleaseTask(pStreamMeta, pTask); } - if (allWalChecked) { + // all wal are checked, and no new data available in wal. + if (noNewDataInWal) { *pScanIdle = true; } + + taosArrayDestroy(pTaskIdList); return 0; } diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index ab1be15271cdaf87af15131e7d54931ec8afe02b..128ddedf6da01eeaf672cc993fea7f354eed50da 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -25,21 +25,6 @@ char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) { return taosStrdup(buf); } -// stream_task:stream_id:task_id -void createStreamTaskOffsetKey(char* dst, uint64_t streamId, uint32_t taskId) { - int32_t n = 12; - char* p = dst; - - memcpy(p, "stream_task:", n); - p += n; - - int32_t inc = tintToHex(streamId, p); - p += inc; - - *(p++) = ':'; - tintToHex(taskId, p); -} - int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver) { int32_t code = tAppendDataToInputQueue(pTask, pQueueItem); if (code < 0) { @@ -55,75 +40,6 @@ int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueI return TSDB_CODE_SUCCESS; } -void initOffsetForAllRestoreTasks(STQ* pTq) { - void* pIter = NULL; - - while(1) { - pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter); - if (pIter == NULL) { - break; - } - - SStreamTask* pTask = *(SStreamTask**)pIter; - if (pTask->taskLevel != TASK_LEVEL__SOURCE) { - continue; - } - - if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) { - tqDebug("s-task:%s skip push data, since not ready, status %d", pTask->id.idStr, pTask->status.taskStatus); - continue; - } - - char key[128] = {0}; - createStreamTaskOffsetKey(key, pTask->id.streamId, pTask->id.taskId); - - STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, key); - if (pOffset == NULL) { - doSaveTaskOffset(pTq->pOffsetStore, key, pTask->chkInfo.version); - } - } -} - -void saveOffsetForAllTasks(STQ* pTq, int64_t ver) { - void* pIter = NULL; - - while(1) { - pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter); - if (pIter == NULL) { - break; - } - - SStreamTask* pTask = *(SStreamTask**)pIter; - if (pTask->taskLevel != TASK_LEVEL__SOURCE) { - continue; - } - - if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) { - tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr, - pTask->status.taskStatus); - continue; - } - - char key[128] = {0}; - createStreamTaskOffsetKey(key, pTask->id.streamId, pTask->id.taskId); - - STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, key); - if (pOffset == NULL) { - doSaveTaskOffset(pTq->pOffsetStore, key, ver); - } - } -} - -void doSaveTaskOffset(STqOffsetStore* pOffsetStore, const char* pKey, int64_t ver) { - STqOffset offset = {0}; - tqOffsetResetToLog(&offset.val, ver); - - tstrncpy(offset.subKey, pKey, tListLen(offset.subKey)); - - // keep the offset info in the offset store - tqOffsetWrite(pOffsetStore, &offset); -} - static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t subType) { pRsp->reqOffset = pReq->reqOffset; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 1732ec04a77f54d10ab57a31aae663ea5e47e9d2..546cd18cdae64aa8725593417b8d685c5d5af1d9 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1108,6 +1108,11 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT pScanBaseInfo->dataReader = NULL; // let's seek to the next version in wal file + int64_t firstVer = walGetFirstVer(pInfo->tqReader->pWalReader->pWal); + if (pOffset->version + 1 < firstVer){ + pOffset->version = firstVer - 1; + } + if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, id) < 0) { qError("tqSeekVer failed ver:%" PRId64 ", %s", pOffset->version + 1, id); return -1; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 007a6f63d16d59da4b648f43803a058963a6c487..be2bd0e6e2ac0d8e099a24f8c077d54db868f102 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2541,6 +2541,20 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } while (1) { + if (isTaskKilled(pTaskInfo)) { + + if (pInfo->pUpdated != NULL) { + pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated); + } + + if (pInfo->pUpdatedMap != NULL) { + tSimpleHashCleanup(pInfo->pUpdatedMap); + pInfo->pUpdatedMap = NULL; + } + + T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); + } + SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { pOperator->status = OP_RES_TO_RETURN; @@ -2635,6 +2649,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { while ((pIte = tSimpleHashIterate(pInfo->pUpdatedMap, pIte, &iter)) != NULL) { taosArrayPush(pInfo->pUpdated, pIte); } + tSimpleHashCleanup(pInfo->pUpdatedMap); pInfo->pUpdatedMap = NULL; taosArraySort(pInfo->pUpdated, winKeyCmprImpl); diff --git a/source/libs/function/CMakeLists.txt b/source/libs/function/CMakeLists.txt index 9d11d7b376b0de0e3f8b8fb5ad533cab72c90c19..f23b4d3e8762951a791529930781f657294c7d13 100644 --- a/source/libs/function/CMakeLists.txt +++ b/source/libs/function/CMakeLists.txt @@ -79,6 +79,26 @@ ENDIF () target_link_libraries( udf1 PUBLIC os ${LINK_JEMALLOC}) + +add_library(udf1_dup STATIC MODULE test/udf1_dup.c) +target_include_directories( + udf1_dup + PUBLIC + "${TD_SOURCE_DIR}/include/libs/function" + "${TD_SOURCE_DIR}/include/util" + "${TD_SOURCE_DIR}/include/common" + "${TD_SOURCE_DIR}/include/client" + "${TD_SOURCE_DIR}/include/os" + PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" +) + +IF (TD_LINUX_64 AND JEMALLOC_ENABLED) + ADD_DEPENDENCIES(udf1_dup jemalloc) +ENDIF () + +target_link_libraries( + udf1_dup PUBLIC os ${LINK_JEMALLOC}) + add_library(udf2 STATIC MODULE test/udf2.c) target_include_directories( udf2 @@ -99,6 +119,26 @@ target_link_libraries( udf2 PUBLIC os ${LINK_JEMALLOC} ) +add_library(udf2_dup STATIC MODULE test/udf2_dup.c) +target_include_directories( + udf2_dup + PUBLIC + "${TD_SOURCE_DIR}/include/libs/function" + "${TD_SOURCE_DIR}/include/util" + "${TD_SOURCE_DIR}/include/common" + "${TD_SOURCE_DIR}/include/client" + "${TD_SOURCE_DIR}/include/os" + PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" +) + +IF (TD_LINUX_64 AND JEMALLOC_ENABLED) + ADD_DEPENDENCIES(udf2_dup jemalloc) +ENDIF () + +target_link_libraries( + udf2_dup PUBLIC os ${LINK_JEMALLOC} +) + #SET(EXECUTABLE_OUTPUT_PATH ${CMAKE_BINARY_DIR}/build/bin) add_executable(udfd src/udfd.c) target_include_directories( diff --git a/source/libs/function/test/udf1_dup.c b/source/libs/function/test/udf1_dup.c new file mode 100644 index 0000000000000000000000000000000000000000..c251192da34568741f0058c61999e5aebf1c56d2 --- /dev/null +++ b/source/libs/function/test/udf1_dup.c @@ -0,0 +1,42 @@ +#include +#include +#include +#ifdef LINUX +#include +#endif +#ifdef WINDOWS +#include +#endif +#include "taosudf.h" + + +DLL_EXPORT int32_t udf1_dup_init() { return 0; } + +DLL_EXPORT int32_t udf1_dup_destroy() { return 0; } + +DLL_EXPORT int32_t udf1_dup(SUdfDataBlock *block, SUdfColumn *resultCol) { + SUdfColumnData *resultData = &resultCol->colData; + for (int32_t i = 0; i < block->numOfRows; ++i) { + int j = 0; + for (; j < block->numOfCols; ++j) { + if (udfColDataIsNull(block->udfCols[j], i)) { + udfColDataSetNull(resultCol, i); + break; + } + } + if (j == block->numOfCols) { + int32_t luckyNum = 2; + udfColDataSet(resultCol, i, (char *)&luckyNum, false); + } + } + // to simulate actual processing delay by udf +#ifdef LINUX + usleep(1 * 1000); // usleep takes sleep time in us (1 millionth of a second) +#endif +#ifdef WINDOWS + Sleep(1); +#endif + resultData->numOfRows = block->numOfRows; + return 0; +} + diff --git a/source/libs/function/test/udf2_dup.c b/source/libs/function/test/udf2_dup.c new file mode 100644 index 0000000000000000000000000000000000000000..1a98190823c4510b24b1bac2d053dc3060973744 --- /dev/null +++ b/source/libs/function/test/udf2_dup.c @@ -0,0 +1,78 @@ +#include +#include +#include +#include + +#include "taosudf.h" + +DLL_EXPORT int32_t udf2_dup_init() { return 0; } + +DLL_EXPORT int32_t udf2_dup_destroy() { return 0; } + +DLL_EXPORT int32_t udf2_dup_start(SUdfInterBuf* buf) { + *(int64_t*)(buf->buf) = 0; + buf->bufLen = sizeof(double); + buf->numOfResult = 1; + return 0; +} + +DLL_EXPORT int32_t udf2_dup(SUdfDataBlock* block, SUdfInterBuf* interBuf, SUdfInterBuf* newInterBuf) { + double sumSquares = 0; + if (interBuf->numOfResult == 1) { + sumSquares = *(double*)interBuf->buf; + } + int8_t numNotNull = 0; + for (int32_t i = 0; i < block->numOfCols; ++i) { + SUdfColumn* col = block->udfCols[i]; + if (!(col->colMeta.type == TSDB_DATA_TYPE_INT || col->colMeta.type == TSDB_DATA_TYPE_DOUBLE)) { + return TSDB_CODE_UDF_INVALID_INPUT; + } + } + for (int32_t i = 0; i < block->numOfCols; ++i) { + for (int32_t j = 0; j < block->numOfRows; ++j) { + SUdfColumn* col = block->udfCols[i]; + if (udfColDataIsNull(col, j)) { + continue; + } + switch (col->colMeta.type) { + case TSDB_DATA_TYPE_INT: { + char* cell = udfColDataGetData(col, j); + int32_t num = *(int32_t*)cell; + sumSquares += (double)num * num; + break; + } + case TSDB_DATA_TYPE_DOUBLE: { + char* cell = udfColDataGetData(col, j); + double num = *(double*)cell; + sumSquares += num * num; + break; + } + default: + break; + } + ++numNotNull; + } + } + + *(double*)(newInterBuf->buf) = sumSquares; + newInterBuf->bufLen = sizeof(double); + + if (interBuf->numOfResult == 0 && numNotNull == 0) { + newInterBuf->numOfResult = 0; + } else { + newInterBuf->numOfResult = 1; + } + return 0; +} + +DLL_EXPORT int32_t udf2_dup_finish(SUdfInterBuf* buf, SUdfInterBuf* resultData) { + if (buf->numOfResult == 0) { + resultData->numOfResult = 0; + return 0; + } + double sumSquares = *(double*)(buf->buf); + *(double*)(resultData->buf) = sqrt(sumSquares) + 100; + resultData->bufLen = sizeof(double); + resultData->numOfResult = 1; + return 0; +} diff --git a/source/libs/parser/src/parInsertStmt.c b/source/libs/parser/src/parInsertStmt.c index 01a635e4b2d34bbb7982d45400429ac323e9547e..922a0f45ff38fba30fc4b766a3b6427026b49b36 100644 --- a/source/libs/parser/src/parInsertStmt.c +++ b/source/libs/parser/src/parInsertStmt.c @@ -251,7 +251,7 @@ int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, in goto _return; } - if (bind[c].buffer_type != pColSchema->type) { + if ((!(rowNum == 1 && bind[c].is_null && *bind[c].is_null)) && bind[c].buffer_type != pColSchema->type) { // for rowNum ==1 , connector may not set buffer_type code = buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type"); goto _return; } diff --git a/source/libs/parser/src/parTokenizer.c b/source/libs/parser/src/parTokenizer.c index 312584994fe593085a83dca11e763b9ef4d4410b..49da87dc1685f3c9d2d03d861eb3dce8e029fd38 100644 --- a/source/libs/parser/src/parTokenizer.c +++ b/source/libs/parser/src/parTokenizer.c @@ -692,7 +692,7 @@ SToken tStrGetToken(const char* str, int32_t* i, bool isPrevOptr, bool* pIgnoreC len = tGetToken(&str[*i + t0.n + 1], &type); // only id and string are valid - if ((TK_NK_STRING != t0.type) && (TK_NK_ID != t0.type)) { + if (((TK_NK_STRING != t0.type) && (TK_NK_ID != t0.type)) || ((TK_NK_STRING != type) && (TK_NK_ID != type))) { t0.type = TK_NK_ILLEGAL; t0.n = 0; diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 52bb03466c2800c93f663f706300eb2b7695d8ae..effbbc161e465e0fac6ced24136c318ccc30d2a5 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -2526,7 +2526,7 @@ static bool tbCntScanOptIsEligibleAggFuncs(SNodeList* pAggFuncs) { return false; } } - return true; + return LIST_LENGTH(pAggFuncs) > 0; } static bool tbCntScanOptIsEligibleAgg(SAggLogicNode* pAgg) { diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 0f000f1f5017e6bf1cd737ee424a4ed72654ac5f..86ba91f76de730b73d94535d38e1c31b67604714 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -16,7 +16,7 @@ #include "streamInc.h" #include "ttimer.h" -#define STREAM_TASK_INPUT_QUEUEU_CAPACITY 100000 +#define STREAM_TASK_INPUT_QUEUEU_CAPACITY 3000 int32_t streamInit() { int8_t old; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 9a6ff302efc3b470f9e71caf07e2d0ae9d70a13f..325d315262c5cfc64ee3b5e9b9ff0e77230405e5 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -17,6 +17,11 @@ #define STREAM_EXEC_MAX_BATCH_NUM 100 +bool streamTaskShouldStop(const SStreamStatus* pStatus) { + int32_t status = atomic_load_8((int8_t*) &pStatus->taskStatus); + return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING); +} + static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes) { int32_t code = TSDB_CODE_SUCCESS; void* pExecutor = pTask->exec.pExecutor; @@ -66,7 +71,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* // pExecutor while (1) { - if (pTask->status.taskStatus == TASK_STATUS__DROPPING) { + if (streamTaskShouldStop(&pTask->status)) { return 0; } @@ -106,7 +111,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* continue; } - qDebug("task %d(child %d) executed and get block", pTask->id.taskId, pTask->selfChildId); + qDebug("s-task:%s (child %d) executed and get block", pTask->id.idStr, pTask->selfChildId); SSDataBlock block = {0}; assignOneDataBlock(&block, output); @@ -134,7 +139,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { int32_t batchCnt = 0; while (1) { - if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) { + if (streamTaskShouldStop(&pTask->status)) { taosArrayDestroy(pRes); return 0; } @@ -270,7 +275,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { } } - if (pTask->status.taskStatus == TASK_STATUS__DROPPING) { + if (streamTaskShouldStop(&pTask->status)) { if (pInput) { streamFreeQitem(pInput); } @@ -301,7 +306,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { ", checkPoint id:%" PRId64 " -> %" PRId64, pTask->id.idStr, pTask->chkInfo.version, dataVer, pTask->chkInfo.id, ckId); - pTask->chkInfo = (SCheckpointInfo) {.version = dataVer, .id = ckId}; + pTask->chkInfo = (SCheckpointInfo) {.version = dataVer, .id = ckId, .currentVer = pTask->chkInfo.currentVer}; taosWLockLatch(&pTask->pMeta->lock); streamMetaSaveTask(pTask->pMeta, pTask); @@ -368,7 +373,7 @@ int32_t streamTryExec(SStreamTask* pTask) { atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); qDebug("s-task:%s exec completed", pTask->id.idStr); - if (!taosQueueEmpty(pTask->inputQueue->queue)) { + if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status))) { streamSchedExec(pTask); } } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index c9ea0c382aa4c0f93345e8683553253de91e5405..065e9d280f41834a35fff2409a9006e8a61bbd6f 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -84,11 +84,6 @@ void streamMetaClose(SStreamMeta* pMeta) { tdbClose(pMeta->db); void* pIter = NULL; -// while(pMeta->walScan) { -// qDebug("wait stream daemon quit"); -// taosMsleep(100); -// } - while (1) { pIter = taosHashIterate(pMeta->pTasks, pIter); if (pIter == NULL) { @@ -102,7 +97,6 @@ void streamMetaClose(SStreamMeta* pMeta) { } tFreeStreamTask(pTask); - /*streamMetaReleaseTask(pMeta, pTask);*/ } taosHashCleanup(pMeta->pTasks); @@ -197,10 +191,12 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) { taosRLockLatch(&pMeta->lock); SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); - if (ppTask != NULL && (atomic_load_8(&((*ppTask)->status.taskStatus)) != TASK_STATUS__DROPPING)) { - atomic_add_fetch_32(&(*ppTask)->refCnt, 1); - taosRUnLockLatch(&pMeta->lock); - return *ppTask; + if (ppTask != NULL) { + if (!streamTaskShouldStop(&(*ppTask)->status)) { + atomic_add_fetch_32(&(*ppTask)->refCnt, 1); + taosRUnLockLatch(&pMeta->lock); + return *ppTask; + } } taosRUnLockLatch(&pMeta->lock); @@ -211,7 +207,7 @@ void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) { int32_t left = atomic_sub_fetch_32(&pTask->refCnt, 1); ASSERT(left >= 0); if (left == 0) { - ASSERT(atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING); + ASSERT(streamTaskShouldStop(&pTask->status)); tFreeStreamTask(pTask); } } @@ -222,11 +218,8 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { SStreamTask* pTask = *ppTask; taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t)); tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn); - /*if (pTask->timer) { - * taosTmrStop(pTask->timer);*/ - /*pTask->timer = NULL;*/ - /*}*/ - atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING); + + atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__STOP); taosWLockLatch(&pMeta->lock); streamMetaReleaseTask(pMeta, pTask); diff --git a/source/os/src/osSocket.c b/source/os/src/osSocket.c index 7d2c8aa4e5454c4c9cf22fe0f7bdb40ab90690ac..2b2a0daf7b4d0bf7e69552c442a047bd9f32e0a5 100644 --- a/source/os/src/osSocket.c +++ b/source/os/src/osSocket.c @@ -1008,7 +1008,7 @@ int32_t taosGetFqdn(char *fqdn) { // hints.ai_family = AF_INET; strcpy(fqdn, hostname); strcpy(fqdn + strlen(hostname), ".local"); -#else // __APPLE__ +#else // linux struct addrinfo hints = {0}; struct addrinfo *result = NULL; hints.ai_flags = AI_CANONNAME; @@ -1020,7 +1020,7 @@ int32_t taosGetFqdn(char *fqdn) { } strcpy(fqdn, result->ai_canonname); freeaddrinfo(result); -#endif // __APPLE__ +#endif // linux return 0; } diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index a3d3c399abcd6040cf042dec045024549e2d3179..2a18f420cd0a74f6205f522784047c67c547a4b0 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -17,7 +17,6 @@ #include "tlog.h" #include "os.h" #include "tconfig.h" -#include "tutil.h" #include "tjson.h" #include "tglobal.h" @@ -781,65 +780,6 @@ bool taosAssertDebug(bool condition, const char *file, int32_t line, const char return true; } -int32_t taosGenCrashJsonMsg(int signum, char** pMsg, int64_t clusterId, int64_t startTime) { - SJson* pJson = tjsonCreateObject(); - if (pJson == NULL) return -1; - char tmp[4096] = {0}; - - tjsonAddDoubleToObject(pJson, "reportVersion", 1); - - tjsonAddIntegerToObject(pJson, "clusterId", clusterId); - tjsonAddIntegerToObject(pJson, "startTime", startTime); - - taosGetFqdn(tmp); - tjsonAddStringToObject(pJson, "fqdn", tmp); - - tjsonAddIntegerToObject(pJson, "pid", taosGetPId()); - - taosGetAppName(tmp, NULL); - tjsonAddStringToObject(pJson, "appName", tmp); - - if (taosGetOsReleaseName(tmp, sizeof(tmp)) == 0) { - tjsonAddStringToObject(pJson, "os", tmp); - } - - float numOfCores = 0; - if (taosGetCpuInfo(tmp, sizeof(tmp), &numOfCores) == 0) { - tjsonAddStringToObject(pJson, "cpuModel", tmp); - tjsonAddDoubleToObject(pJson, "numOfCpu", numOfCores); - } else { - tjsonAddDoubleToObject(pJson, "numOfCpu", tsNumOfCores); - } - - snprintf(tmp, sizeof(tmp), "%" PRId64 " kB", tsTotalMemoryKB); - tjsonAddStringToObject(pJson, "memory", tmp); - - tjsonAddStringToObject(pJson, "version", version); - tjsonAddStringToObject(pJson, "buildInfo", buildinfo); - tjsonAddStringToObject(pJson, "gitInfo", gitinfo); - - tjsonAddIntegerToObject(pJson, "crashSig", signum); - tjsonAddIntegerToObject(pJson, "crashTs", taosGetTimestampUs()); - -#ifdef _TD_DARWIN_64 - taosLogTraceToBuf(tmp, sizeof(tmp), 4); -#elif !defined(WINDOWS) - taosLogTraceToBuf(tmp, sizeof(tmp), 3); -#else - taosLogTraceToBuf(tmp, sizeof(tmp), 8); -#endif - - tjsonAddStringToObject(pJson, "stackInfo", tmp); - - char* pCont = tjsonToString(pJson); - tjsonDelete(pJson); - - *pMsg = pCont; - - return TSDB_CODE_SUCCESS; -} - - void taosLogCrashInfo(char* nodeType, char* pMsg, int64_t msgLen, int signum, void *sigInfo) { const char *flags = "UTL FATAL "; ELogLevel level = DEBUG_FATAL; diff --git a/tests/parallel_test/container_build.sh b/tests/parallel_test/container_build.sh index 80236cf604d63e4d9c8fdddfa81aeaf16fb64944..cfb5a26723602cee56f5e3332cc8a47bd4a3ec92 100755 --- a/tests/parallel_test/container_build.sh +++ b/tests/parallel_test/container_build.sh @@ -68,8 +68,8 @@ docker run \ -v ${REP_REAL_PATH}/community/contrib/libuv/:${REP_DIR}/community/contrib/libuv \ -v ${REP_REAL_PATH}/community/contrib/lz4/:${REP_DIR}/community/contrib/lz4 \ -v ${REP_REAL_PATH}/community/contrib/zlib/:${REP_DIR}/community/contrib/zlib \ + -v ${REP_REAL_PATH}/community/contrib/jemalloc/:${REP_DIR}/community/contrib/jemalloc \ --rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_TAOSX=true -DJEMALLOC_ENABLED=true;make -j || exit 1" - # -v ${REP_REAL_PATH}/community/contrib/jemalloc/:${REP_DIR}/community/contrib/jemalloc \ if [[ -d ${WORKDIR}/debugNoSan ]] ;then echo "delete ${WORKDIR}/debugNoSan" diff --git a/tests/script/tsim/query/tableCount.sim b/tests/script/tsim/query/tableCount.sim index d8d9bb9b03e227ce3e54d0d1c84071268b768774..ac5e23273af20afb9ff223b885e54c85cdce11ef 100644 --- a/tests/script/tsim/query/tableCount.sim +++ b/tests/script/tsim/query/tableCount.sim @@ -104,4 +104,9 @@ if $data62 != 5 then return -1 endi +sql select distinct db_name from information_schema.ins_tables; +print $rows +if $rows != 4 then + return -1 +endi system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/stream/basic1.sim b/tests/script/tsim/stream/basic1.sim index 15ca6bf7c924bb7cffba8f0607cb0a5c1fea95cb..65032817b31c6aa2060b7c681d87801d73f7c84b 100644 --- a/tests/script/tsim/stream/basic1.sim +++ b/tests/script/tsim/stream/basic1.sim @@ -37,7 +37,7 @@ if $loop_count == 20 then endi if $rows != 4 then - print =====rows=$rows, expect 4 + print =====rows=$rows expect 4 goto loop0 endi diff --git a/tests/system-test/0-others/TS-3131.tsql b/tests/system-test/0-others/TS-3131.tsql new file mode 100644 index 0000000000000000000000000000000000000000..6d4b4c53f4800a416020b9e0c6f829eee5a56909 --- /dev/null +++ b/tests/system-test/0-others/TS-3131.tsql @@ -0,0 +1,71 @@ +drop database if exists d0; +create database d0 replica 1 keep 365 minRows 100 maxRows 4096 comp 2 vgroups 2 precision 'ms'; +use d0; +create table if not exists almlog (starttime timestamp,endtime timestamp,durationtime int, alarmno int, alarmtext nchar(256),isactive nchar(64)) tags (mcid nchar(16)); +create table if not exists mplog (starttime timestamp,mpid int, paravalue nchar(256),mptype nchar(32)) tags (mcid nchar(16)); +create table if not exists mdlog (starttime timestamp,endtime timestamp,durationtime int, statuscode int, npcgmname nchar(256),attr int) tags (mcid nchar(16)); +create table if not exists nrglog (updatetime timestamp,energyvalue double,enerygyincrease double) tags (mcid nchar(16),enerygytype nchar(16)); + +create table almlog_m201 using almlog tags("m201"); +create table almlog_m0103 using almlog tags("m0103"); +create table almlog_m0103_20031 using almlog tags("m0103"); +create table almlog_m0103_20032 using almlog tags("m0103"); +create table almlog_m0103_3003 using almlog tags("m0103"); +create table almlog_m0103_20033 using almlog tags("m0103"); +create table almlog_m0103_30031 using almlog tags("m0103"); +create table almlog_m0201 using almlog tags("m0201"); +create table almlog_m0102 using almlog tags("m0102"); +create table almlog_m0101 using almlog tags("m0101"); +create table almlog_m1002 using almlog tags("m1002"); + +create table mplog_m0204_4 using mplog tags("m0204"); +create table mplog_m0204_5 using mplog tags("m0204"); +create table mplog_m0204_6 using mplog tags("m0204"); +create table mplog_m0204_12 using mplog tags("m0204"); +create table mplog_m0204 using mplog tags("m0204"); +create table mplog_m201 using mplog tags("m201"); +create table mplog_m0102 using mplog tags("m0102"); +create table mplog_m1101 using mplog tags("m1101"); + +create table mdlog_m0102 using mplog tags("m0102"); +create table mdlog_m0504 using mplog tags("m0504"); +create table mdlog_m0505 using mplog tags("m0505"); +create table mdlog_m0507 using mplog tags("m0507"); +create table mdlog_m1002 using mplog tags("m1002"); +create table mdlog_m3201 using mplog tags("m3201"); +create table mdlog_m0201 using mplog tags("m0201"); +create table mdlog_m1102 using mplog tags("m1102"); +create table mdlog_m201 using mplog tags("m201"); + +create table nrglog_m201_electricvalue1 using nrglog tags("m201","electricValue1"); +create table nrglog_m201_oilvalue1 using nrglog tags("m201","oilValue1"); +create table nrglog_m201_gasvalue1 using nrglog tags("m201","gasValue1"); +create table nrglog_m201_watervalue1 using nrglog tags("m201","waterValue1"); +create table nrglog_m0101_oilvalue1 using nrglog tags("m0101","oilValue1"); +create table nrglog_m0101_watervalue1 using nrglog tags("m0101","waterValue1"); +create table nrglog_m0102_gasvalue1 using nrglog tags("m0102","gasValue1"); +create table nrglog_m1903 using nrglog tags("m1903",NULL); +create table nrglog_m2802 using nrglog tags("m2802",NULL); +create table nrglog_m2101 using nrglog tags("m2101",NULL); +create table nrglog_m0102 using nrglog tags("m0102",NULL); +create table nrglog_m0101_electricvalue1 using nrglog tags("m0101","electricValue1"); +create table nrglog_m0101_gasvalue1 using nrglog tags("m0101","gasValue1"); +create table nrglog_m0102_electricvalue1 using nrglog tags("m0102","electricValue1"); +create table nrglog_m0102_oilvalue1 using nrglog tags("m0102","oilValue1"); +create table nrglog_m0102_watervalue1 using nrglog tags("m0102","waterValue1"); + + +insert into almlog_m0103 values(now,now+1s,10,0,'','dismissed'); +insert into almlog_m0103_20031 values(now,now+1s,10,20031,'','dismissed'); +insert into almlog_m0103_20032 values(now,now+1s,10,20032,'','dismissed'); +insert into almlog_m0103_3003 values(now,now+1s,10,3003,'','dismissed'); +insert into almlog_m0103_20033 values(now,now+1s,10,20033,'','dismissed'); +insert into almlog_m0103_30031 values(now,now+1s,10,30031,'','dismissed'); + +flush database d0; + +show table tags from almlog; + +select *,tbname from d0.almlog where mcid='m0103'; + +select table_name from information_schema.ins_tables where db_name='d0'; diff --git a/tests/system-test/0-others/compa4096.json b/tests/system-test/0-others/compa4096.json index 5cc5d2084dd52ccc198446698f9a745664aeb054..5e203ded45311e88654af35c498853246b687925 100644 --- a/tests/system-test/0-others/compa4096.json +++ b/tests/system-test/0-others/compa4096.json @@ -53,18 +53,8 @@ "sample_format": "csv", "sample_file": "./sample.csv", "tags_file": "", - "columns": [ - { - "type": "INT", - "count": 4094 - } - ], - "tags": [ - { - "type": "TINYINT", - "count": 1 - } - ] + "columns": [{ "type": "INT","count": 4093}], + "tags": [{"type": "TINYINT", "count": 1},{"type": "NCHAR","count": 1}] } ] } diff --git a/tests/system-test/0-others/compatibility.py b/tests/system-test/0-others/compatibility.py index 1922ebfb2f9d6c48c911214b3fe13d68a77e5d26..33d1dac4b5627dc67f98fdd72930841735ed7383 100644 --- a/tests/system-test/0-others/compatibility.py +++ b/tests/system-test/0-others/compatibility.py @@ -98,9 +98,12 @@ class TDTestCase: def buildTaosd(self,bPath): # os.system(f"mv {bPath}/build_bak {bPath}/build ") - os.system(f" cd {bPath} ") - + os.system(f" cd {bPath} ") + def is_list_same_as_ordered_list(self,unordered_list, ordered_list): + sorted_list = sorted(unordered_list) + return sorted_list == ordered_list + def run(self): scriptsPath = os.path.dirname(os.path.realpath(__file__)) distro_id = distro.id() @@ -146,6 +149,8 @@ class TDTestCase: tdLog.info(" LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y ") os.system("LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y") os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'flush database db4096 '") + os.system("LD_LIBRARY_PATH=/usr/lib taos -f 0-others/TS-3131.tsql") + cmd = f" LD_LIBRARY_PATH={bPath}/build/lib {bPath}/build/bin/taos -h localhost ;" if os.system(cmd) == 0: raise Exception("failed to execute system command. cmd: %s" % cmd) @@ -220,6 +225,17 @@ class TDTestCase: tdLog.exit("%s(%d) failed" % args) tdsql.query("show streams;") tdsql.checkRows(2) + tdsql.query("select *,tbname from d0.almlog where mcid='m0103';") + tdsql.checkRows(6) + expectList = [0,3003,20031,20032,20033,30031] + resultList = [] + for i in range(6): + resultList.append(tdsql.queryResult[i][3]) + print(resultList) + if self.is_list_same_as_ordered_list(resultList,expectList): + print("The unordered list is the same as the ordered list.") + else: + tdlog.error("The unordered list is not the same as the ordered list.") tdsql.execute("insert into test.d80 values (now+1s, 11, 103, 0.21);") tdsql.execute("insert into test.d9 values (now+5s, 4.3, 104, 0.4);") diff --git a/tests/system-test/0-others/udf_create.py b/tests/system-test/0-others/udf_create.py index f467e802ac5d5f8adb15ef6b705d273e744270a3..ee1a0ef5b343ce1ec29edc2e5a7062b40525ee88 100644 --- a/tests/system-test/0-others/udf_create.py +++ b/tests/system-test/0-others/udf_create.py @@ -47,17 +47,27 @@ class TDTestCase: if platform.system().lower() == 'windows': self.libudf1 = subprocess.Popen('(for /r %s %%i in ("udf1.d*") do @echo %%i)|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8") + self.libudf1_dup = subprocess.Popen('(for /r %s %%i in ("udf1_dup.d*") do @echo %%i)|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8") self.libudf2 = subprocess.Popen('(for /r %s %%i in ("udf2.d*") do @echo %%i)|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8") + self.libudf2_dup = subprocess.Popen('(for /r %s %%i in ("udf2_dup.d*") do @echo %%i)|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8") if (not tdDnodes.dnodes[0].remoteIP == ""): tdDnodes.dnodes[0].remote_conn.get(tdDnodes.dnodes[0].config["path"]+'/debug/build/lib/libudf1.so',projPath+"\\debug\\build\\lib\\") + tdDnodes.dnodes[0].remote_conn.get(tdDnodes.dnodes[0].config["path"]+'/debug/build/lib/libudf1_dup.so',projPath+"\\debug\\build\\lib\\") tdDnodes.dnodes[0].remote_conn.get(tdDnodes.dnodes[0].config["path"]+'/debug/build/lib/libudf2.so',projPath+"\\debug\\build\\lib\\") + tdDnodes.dnodes[0].remote_conn.get(tdDnodes.dnodes[0].config["path"]+'/debug/build/lib/libudf2_dup.so',projPath+"\\debug\\build\\lib\\") self.libudf1 = self.libudf1.replace('udf1.dll','libudf1.so') + self.libudf1_dup = self.libudf1_dup.replace('udf1_dup.dll','libudf1_dup.so') self.libudf2 = self.libudf2.replace('udf2.dll','libudf2.so') + self.libudf2_dup = self.libudf2_dup.replace('udf2_dup.dll','libudf2_dup.so') else: self.libudf1 = subprocess.Popen('find %s -name "libudf1.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8") + self.libudf1_dup = subprocess.Popen('find %s -name "libudf1_dup.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8") self.libudf2 = subprocess.Popen('find %s -name "libudf2.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8") + self.libudf2_dup = subprocess.Popen('find %s -name "libudf2_dup.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8") self.libudf1 = self.libudf1.replace('\r','').replace('\n','') + self.libudf1_dup = self.libudf1_dup.replace('\r','').replace('\n','') self.libudf2 = self.libudf2.replace('\r','').replace('\n','') + self.libudf2_dup = self.libudf2_dup.replace('\r','').replace('\n','') def prepare_data(self): @@ -174,10 +184,12 @@ class TDTestCase: # create scalar functions tdSql.execute("create function udf1 as '%s' outputtype int;"%self.libudf1) + tdSql.execute("create function udf1_dup as '%s' outputtype int;"%self.libudf1_dup) # create aggregate functions tdSql.execute("create aggregate function udf2 as '%s' outputtype double bufSize 8;"%self.libudf2) + tdSql.execute("create aggregate function udf2_dup as '%s' outputtype double bufSize 8;"%self.libudf2_dup) functions = tdSql.getResult("show functions") function_nums = len(functions) @@ -188,6 +200,13 @@ class TDTestCase: # scalar functions + # udf1_dup + tdSql.query("select udf1(num1) ,udf1_dup(num1) from tb") + tdSql.checkData(1,0,1) + tdSql.checkData(1,1,2) + tdSql.checkData(2,0,1) + tdSql.checkData(2,1,2) + tdSql.execute("use db ") tdSql.query("select num1 , udf1(num1) ,num2 ,udf1(num2),num3 ,udf1(num3),num4 ,udf1(num4) from tb") tdSql.checkData(0,0,None) @@ -238,6 +257,10 @@ class TDTestCase: # aggregate functions + tdSql.query("select udf2(num1) ,udf2_dup(num1) from tb") + val = tdSql.queryResult[0][0] + 100 + tdSql.checkData(0,1,val) + tdSql.query("select udf2(num1) ,udf2(num2), udf2(num3) from tb") tdSql.checkData(0,0,15.362291496) tdSql.checkData(0,1,10000949.553189287) diff --git a/tests/system-test/1-insert/delete_stable.py b/tests/system-test/1-insert/delete_stable.py index 313b6ce731fe98aaed6fdfe1f9fe71fa4313d3cd..8ebe7b6692e31bba12bdc0a3cbc885112eb96562 100644 --- a/tests/system-test/1-insert/delete_stable.py +++ b/tests/system-test/1-insert/delete_stable.py @@ -29,6 +29,9 @@ class TDTestCase: tdLog.debug("start to execute %s" % __file__) tdSql.init(conn.cursor()) self.dbname = 'db_test' + self.ns_dbname = 'ns_test' + self.us_dbname = 'us_test' + self.ms_dbname = 'ms_test' self.setsql = TDSetSql() self.stbname = 'stb' self.ntbname = 'ntb' @@ -220,11 +223,45 @@ class TDTestCase: tdSql.query(f'select {func}(*) from {self.stbname}') tdSql.execute(f'drop table {self.stbname}') tdSql.execute(f'drop database {self.dbname}') + + def precision_now_check(self): + for dbname in [self.ms_dbname, self.us_dbname, self.ns_dbname]: + self.ts = 1537146000000 + if dbname == self.us_dbname: + self.ts = int(self.ts*1000) + precision = "us" + elif dbname == self.ns_dbname: + precision = "ns" + self.ts = int(self.ts*1000000) + else: + precision = "ms" + self.ts = int(self.ts) + tdSql.execute(f'drop database if exists {dbname}') + tdSql.execute(f'create database if not exists {dbname} precision "{precision}"') + tdSql.execute(f'use {dbname}') + self.base_data = { + 'tinyint':self.tinyint_val + } + self.column_dict = { + 'col1': 'tinyint' + } + for col_name,col_type in self.column_dict.items(): + tdSql.execute(f'create table if not exists {self.stbname} (ts timestamp,{col_name} {col_type}) tags(t1 int)') + for i in range(self.tbnum): + tdSql.execute(f'create table if not exists {self.stbname}_{i} using {self.stbname} tags(1)') + self.insert_base_data(col_type,f'{self.stbname}_{i}',self.rowNum,self.base_data) + tdSql.query(f'select * from {self.stbname}') + tdSql.checkEqual(tdSql.queryRows, self.tbnum*self.rowNum) + tdSql.execute(f'delete from {self.stbname} where ts < now()') + tdSql.query(f'select * from {self.stbname}') + tdSql.checkEqual(tdSql.queryRows, 0) + def run(self): self.delete_data_stb() tdDnodes.stoptaosd(1) tdDnodes.starttaosd(1) self.delete_data_stb() + self.precision_now_check() def stop(self): tdSql.close() tdLog.success("%s successfully executed" % __file__) diff --git a/tests/system-test/7-tmq/subscribeDb3.py b/tests/system-test/7-tmq/subscribeDb3.py index bddb196f4ab9af2ccf0d85af8614f13ea638288d..1de9b62bcd824d24a6f542183a3399a9ff056bdf 100644 --- a/tests/system-test/7-tmq/subscribeDb3.py +++ b/tests/system-test/7-tmq/subscribeDb3.py @@ -82,7 +82,7 @@ class TDTestCase: tdSql.query("select * from %s.notifyinfo"%cdbName) #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) if tdSql.getRows() == 2 : - print(tdSql.getData(0, 1), tdSql.getData(1, 1)) + tdLog.info("row[0][1]: %d, row[1][1]: %d"%(tdSql.getData(0, 1), tdSql.getData(1, 1))) if tdSql.getData(1, 1) == 1: break time.sleep(0.1) @@ -122,6 +122,7 @@ class TDTestCase: os.system(shellCmd) def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl): + tdLog.info("start create tables......") tsql.execute("create database if not exists %s vgroups %d wal_retention_period 3600"%(dbName, vgroups)) tsql.execute("use %s" %dbName) tsql.execute("create table if not exists %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName) @@ -137,11 +138,11 @@ class TDTestCase: tsql.execute(sql) event.set() - tdLog.debug("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum)) + tdLog.info("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum)) return def insert_data(self,tsql,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs): - tdLog.debug("start to insert data ............") + tdLog.info("start to insert data ............") tsql.execute("use %s" %dbName) pre_insert = "insert into " sql = pre_insert @@ -163,7 +164,7 @@ class TDTestCase: if sql != pre_insert: #print("insert sql:%s"%sql) tsql.execute(sql) - tdLog.debug("insert data ............ [OK]") + tdLog.info("insert data ............ [OK]") return def prepareEnv(self, **parameterDict): @@ -286,7 +287,7 @@ class TDTestCase: prepareEnvThread.start() tdLog.info("create topics from db") - topicName1 = 'topic_db1' + topicName1 = 'topic_db11' tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName'])) consumerId = 0 diff --git a/tests/system-test/7-tmq/tmqConsumerGroup.py b/tests/system-test/7-tmq/tmqConsumerGroup.py index d146dca4497b9f60d204ca067f3bcd299c204dae..f05f600f27b1d1c3194c85a03edfbcac1aeee3b7 100644 --- a/tests/system-test/7-tmq/tmqConsumerGroup.py +++ b/tests/system-test/7-tmq/tmqConsumerGroup.py @@ -42,7 +42,7 @@ class TDTestCase: 'showRow': 1} topicNameList = ['topic1', 'topic2'] - expectRowsList = [] + queryRowsList = [] tmqCom.initConsumerTable() tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=4,replica=1) tdSql.execute("alter database %s wal_retention_period 3600" % (paraDict['dbName'])) @@ -60,7 +60,7 @@ class TDTestCase: tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) tdSql.query(queryString) - expectRowsList.append(tdSql.getRows()) + queryRowsList.append(tdSql.getRows()) # create one stb2 paraDict["stbName"] = 'stb2' @@ -77,7 +77,7 @@ class TDTestCase: tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) # tdSql.query(queryString) - # expectRowsList.append(tdSql.getRows()) + # queryRowsList.append(tdSql.getRows()) # init consume info, and start tmq_sim, then check consume result tdLog.info("insert consume info to consume processor") @@ -99,7 +99,8 @@ class TDTestCase: pThread = tmqCom.asyncInsertData(paraDict) tdLog.info("wait consumer commit notify") - tmqCom.getStartCommitNotifyFromTmqsim(rows=4) + # tmqCom.getStartCommitNotifyFromTmqsim(rows=4) + tmqCom.getStartConsumeNotifyFromTmqsim(rows=2) tdLog.info("pkill one consume processor") tmqCom.stopTmqSimProcess('tmq_sim_new') @@ -109,19 +110,21 @@ class TDTestCase: tdLog.info("wait the consume result") expectRows = 2 resultList = tmqCom.selectConsumeResult(expectRows) - actTotalRows = 0 + actConsumTotalRows = 0 for i in range(len(resultList)): - actTotalRows += resultList[i] + actConsumTotalRows += resultList[i] + + tdLog.info("act consumer1 rows: %d, consumer2 rows: %d"%(resultList[0], resultList[1])) tdSql.query(queryString) - expectRowsList.append(tdSql.getRows()) - expectTotalRows = 0 - for i in range(len(expectRowsList)): - expectTotalRows += expectRowsList[i] - - tdLog.info("act consume rows: %d, expect consume rows: %d"%(actTotalRows, expectTotalRows)) - if expectTotalRows <= resultList[0]: - tdLog.info("act consume rows: %d should >= expect consume rows: %d"%(actTotalRows, expectTotalRows)) + queryRowsList.append(tdSql.getRows()) + queryTotalRows = 0 + for i in range(len(queryRowsList)): + queryTotalRows += queryRowsList[i] + + tdLog.info("act consume rows: %d, query consume rows: %d"%(actConsumTotalRows, queryTotalRows)) + if actConsumTotalRows < queryTotalRows: + tdLog.info("act consume rows: %d should >= query consume rows: %d"%(actConsumTotalRows, queryTotalRows)) tdLog.exit("0 tmq consume rows error!") # time.sleep(10) @@ -130,9 +133,95 @@ class TDTestCase: tdLog.printNoPrefix("======== test case 1 end ...... ") + + def tmqCase2(self): + tdLog.printNoPrefix("======== test case 2: ") + paraDict = {'dbName': 'db1', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':2}, {'type': 'binary', 'len':20, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbNum': 10, + 'rowsPerTbl': 1000, + 'batchNum': 10, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 20, + 'showMsg': 1, + 'showRow': 1} + + topicNameList = ['topic3', 'topic4'] + queryRowsList = [] + tmqCom.initConsumerTable() + + tdLog.info("create topics from stb with filter") + # queryString = "select ts, log(c1), ceil(pow(c1,3)) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName']) + queryString = "select ts, log(c1), ceil(pow(c1,3)) from %s.%s" %(paraDict['dbName'], paraDict['stbName']) + sqlString = "create topic %s as %s" %(topicNameList[0], queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + tdSql.query(queryString) + queryRowsList.append(tdSql.getRows()) + + # create one stb2 + paraDict["stbName"] = 'stb2' + # queryString = "select ts, sin(c1), abs(pow(c1,3)) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName']) + queryString = "select ts, sin(c1), abs(pow(c1,3)) from %s.%s" %(paraDict['dbName'], paraDict['stbName']) + sqlString = "create topic %s as %s" %(topicNameList[1], queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + tdSql.query(queryString) + queryRowsList.append(tdSql.getRows()) + + # init consume info, and start tmq_sim, then check consume result + tdLog.info("insert consume info to consume processor") + consumerId = 0 + paraDict["rowsPerTbl"] = 5000 + expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2 + topicList = "%s,%s"%(topicNameList[0],topicNameList[1]) + ifcheckdata = 1 + ifManualCommit = 1 + keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:3000, auto.offset.reset:earliest' + tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + tdLog.info("start consume processor 1") + tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow']) + + tdLog.info("start consume processor 2") + tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'],'cdb',0,1) + + tdLog.info("wait the consume result") + expectRows = 2 + resultList = tmqCom.selectConsumeResult(expectRows) + actConsumTotalRows = 0 + for i in range(len(resultList)): + actConsumTotalRows += resultList[i] + + tdLog.info("act consumer1 rows: %d, consumer2 rows: %d"%(resultList[0], resultList[1])) + + queryTotalRows = 0 + for i in range(len(queryRowsList)): + queryTotalRows += queryRowsList[i] + + tdLog.info("act consume rows: %d, query consume rows: %d"%(actConsumTotalRows, queryTotalRows)) + if actConsumTotalRows < queryTotalRows: + tdLog.info("act consume rows: %d should >= query consume rows: %d"%(actConsumTotalRows, queryTotalRows)) + tdLog.exit("0 tmq consume rows error!") + + # time.sleep(10) + # for i in range(len(topicNameList)): + # tdSql.query("drop topic %s"%topicNameList[i]) + + tdLog.printNoPrefix("======== test case 2 end ...... ") + def run(self): tdSql.prepare() self.tmqCase1() + self.tmqCase2() def stop(self): tdSql.close() diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index 910b067d4e0bce5e289c136de29470591a446f2b..5ac32eaad9f1d4a1b8235feff6182f67ac9c9f2e 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -58,7 +58,6 @@ int32_t shellRunSingleCommand(char *command) { } if (shellRegexMatch(command, "^[ \t]*(quit|q|exit)[ \t;]*$", REG_EXTENDED | REG_ICASE)) { - shellWriteHistory(); return -1; } @@ -887,7 +886,6 @@ void shellWriteHistory() { } i = (i + 1) % SHELL_MAX_HISTORY_SIZE; } - taosFsyncFile(pFile); taosCloseFile(&pFile); } diff --git a/tools/shell/src/shellMain.c b/tools/shell/src/shellMain.c index bc5809ffe8b389ca57146a65f8c67d451f934b9e..795621dfddba857483df6c35240cdbbd8b944289 100644 --- a/tools/shell/src/shellMain.c +++ b/tools/shell/src/shellMain.c @@ -83,6 +83,9 @@ int main(int argc, char *argv[]) { #endif taos_init(); + // kill heart-beat thread when quit + taos_set_hb_quit(1); + if (shell.args.is_dump_config) { shellDumpConfig(); taos_cleanup();