diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 4cc1b8527c57f297d93383019d5c847648ec5945..7fc263c93cbf3fd4f4649c5ce7502d957629ed95 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -202,6 +202,17 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return code; } + code = vnodeStart(pImpl); + if (code != 0) { + tFreeSCreateVnodeReq(&createReq); + dError("vgId:%d, failed to start sync since %s", createReq.vgId, terrstr()); + vnodeClose(pImpl); + vnodeDestroy(path, pMgmt->pTfs); + terrno = code; + return code; + } + + code = vmWriteVnodesToFile(pMgmt); if (code != 0) { tFreeSCreateVnodeReq(&createReq); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 687a799c575baeb13603f2d37edcb8e4e6895bd2..3088c5dea40f92b5ef9a8b2b582ef8bfedc66d58 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -74,12 +74,6 @@ int32_t vmOpenVnode(SVnodesMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) { return -1; } - // sync integration - vnodeSyncSetQ(pImpl, NULL); - vnodeSyncSetRpc(pImpl, NULL); - int32_t ret = vnodeSyncStart(pImpl); - assert(ret == 0); - taosWLockLatch(&pMgmt->latch); int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *)); taosWUnLockLatch(&pMgmt->latch); @@ -153,6 +147,7 @@ static void *vmOpenVnodeFunc(void *param) { pThread->failed++; } else { vmOpenVnode(pMgmt, pCfg, pImpl); + //vnodeStart(pImpl); dDebug("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex); pThread->opened++; } @@ -364,10 +359,52 @@ static int32_t vmRequire(SMgmtWrapper *pWrapper, bool *required) { return 0; } +static int32_t vmStart(SMgmtWrapper *pWrapper) { + dDebug("vnode-mgmt start to run"); + SVnodesMgmt *pMgmt = pWrapper->pMgmt; + + taosRLockLatch(&pMgmt->latch); + + void *pIter = taosHashIterate(pMgmt->hash, NULL); + while (pIter) { + SVnodeObj **ppVnode = pIter; + if (ppVnode == NULL || *ppVnode == NULL) continue; + + SVnodeObj *pVnode = *ppVnode; + vnodeStart(pVnode->pImpl); + pIter = taosHashIterate(pMgmt->hash, pIter); + } + + taosRUnLockLatch(&pMgmt->latch); + return 0; +} + +static void vmStop(SMgmtWrapper *pWrapper) { +#if 0 + dDebug("vnode-mgmt start to stop"); + SVnodesMgmt *pMgmt = pWrapper->pMgmt; + taosRLockLatch(&pMgmt->latch); + + void *pIter = taosHashIterate(pMgmt->hash, NULL); + while (pIter) { + SVnodeObj **ppVnode = pIter; + if (ppVnode == NULL || *ppVnode == NULL) continue; + + SVnodeObj *pVnode = *ppVnode; + vnodeStop(pVnode->pImpl); + pIter = taosHashIterate(pMgmt->hash, pIter); + } + + taosRUnLockLatch(&pMgmt->latch); +#endif +} + void vmSetMgmtFp(SMgmtWrapper *pWrapper) { SMgmtFp mgmtFp = {0}; mgmtFp.openFp = vmInit; mgmtFp.closeFp = vmCleanup; + mgmtFp.startFp = vmStart; + mgmtFp.stopFp = vmStop; mgmtFp.requiredFp = vmRequire; vmInitMsgHandle(pWrapper); @@ -396,4 +433,4 @@ void vmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo) { } taosRUnLockLatch(&pMgmt->latch); -} \ No newline at end of file +} diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index f4975c183e26da5f7a198451e1df134fea6e657b..9eab4d33760f91a588d151a9b1c2196c4cb6b43f 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -61,6 +61,9 @@ int32_t vnodeSync(SVnode *pVnode); int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName); +int32_t vnodeStart(SVnode *pVnode); +void vnodeStop(SVnode *pVnode); + int64_t vnodeGetSyncHandle(SVnode *pVnode); void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot); @@ -171,11 +174,6 @@ typedef struct { uint64_t uid; } STableKeyInfo; -// sync integration -void vnodeSyncSetQ(SVnode *pVnode, void *qHandle); -void vnodeSyncSetRpc(SVnode *pVnode, void *rpcHandle); -int32_t vnodeSyncStart(SVnode *pVnode); - #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 2b10afa5ed9548b2f6397d6d3bd3487072b82c3a..11136bc7bad5f8982bdf3ec493171b49287a2d2c 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -14,6 +14,7 @@ */ #include "vnodeInt.h" +#include "vnodeSync.h" int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) { SVnodeInfo info = {0}; @@ -171,6 +172,16 @@ void vnodeClose(SVnode *pVnode) { } } +// start the sync timer after the queue is ready +int32_t vnodeStart(SVnode *pVnode) { + vnodeSyncSetQ(pVnode, NULL); + vnodeSyncSetRpc(pVnode, NULL); + vnodeSyncStart(pVnode); + return 0; +} + +void vnodeStop(SVnode *pVnode) {} + int64_t vnodeGetSyncHandle(SVnode *pVnode) { return pVnode->sync; } void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot) { pSnapshot->lastApplyIndex = pVnode->state.committed; } \ No newline at end of file diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 46f11663e3ddb241131843f74f2d93f80c906f19..434591c4c32b0f85214aac200eabb04f629a6b9e 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -268,7 +268,6 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) { int32_t numOfCols = LIST_LENGTH(pNode->pSlots); SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); - pBlock->info.numOfCols = numOfCols; pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); pBlock->info.blockId = pNode->dataBlockId; @@ -294,6 +293,7 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) { taosArrayPush(pBlock->pDataBlock, &idata); } + pBlock->info.numOfCols = taosArrayGetSize(pBlock->pDataBlock); return pBlock; } @@ -1032,6 +1032,8 @@ static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunc pColInfo->info.bytes = tDataTypes[type].bytes; pInput->pData[paramIndex] = pColInfo; + } else { + pColInfo = pInput->pData[paramIndex]; } ASSERT(!IS_VAR_DATA_TYPE(type)); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index a52e39bd4f58343c0166145a9c444e7c366c0101..70087ee46bf3b2f49a8876372c1df75c8cdb582a 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -355,7 +355,7 @@ static int32_t translateToIso8601(SFunctionNode* pFunc, char* pErrBuf, int32_t l return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); } - pFunc->node.resType = (SDataType) { .bytes = 24, .type = TSDB_DATA_TYPE_BINARY}; + pFunc->node.resType = (SDataType) { .bytes = 64, .type = TSDB_DATA_TYPE_BINARY}; return TSDB_CODE_SUCCESS; } diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index 820a4894b5ed756bcacfe030211b24b35e86b11e..6773f8192b54b94fd1d395b7c362a99897d4156a 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -489,7 +489,7 @@ EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) { if (fmIsUserDefinedFunc(node->funcId)) { return DEAL_RES_CONTINUE; } - + FOREACH(tnode, node->pParameterList) { if (!SCL_IS_CONST_NODE(tnode)) { return DEAL_RES_CONTINUE; @@ -517,8 +517,8 @@ EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) { res->node.resType = node->node.resType; int32_t type = output.columnData->info.type; if (IS_VAR_DATA_TYPE(type)) { - res->datum.p = output.columnData->pData; - output.columnData->pData = NULL; + res->datum.p = taosMemoryCalloc(res->node.resType.bytes + VARSTR_HEADER_SIZE + 1, 1); + memcpy(res->datum.p, output.columnData->pData, varDataTLen(output.columnData->pData)); } else { memcpy(nodesGetValueFromNode(res), output.columnData->pData, tDataTypes[type].bytes); } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 50c924bbb972e43d97fdf3fcfb5d51824d2a2b04..399a2255ac4403ee1a966a741545b6652211d1ea 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -403,6 +403,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_CHECKSUM, "Invalid msg checksum" TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGLEN, "Invalid msg length") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGTYPE, "Invalid msg type") +TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_LEADER, "Sync not leader") +TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INTERNAL_ERROR, "Sync internal error") + // wal TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, "Unexpected generic error in wal") TAOS_DEFINE_ERROR(TSDB_CODE_WAL_FILE_CORRUPTED, "WAL file is corrupted") diff --git a/tools/shell/src/shellCommand.c b/tools/shell/src/shellCommand.c index c34ee7a22c6da4caf03ceae4b9a24744074eb4a0..fcdbf2d020568a206967f2db711365586aa0fa74 100644 --- a/tools/shell/src/shellCommand.c +++ b/tools/shell/src/shellCommand.c @@ -395,7 +395,7 @@ void shellClearScreen(int32_t ecmd_pos, int32_t cursor_pos) { void shellShowOnScreen(SShellCmd *cmd) { struct winsize w; if (ioctl(0, TIOCGWINSZ, &w) < 0 || w.ws_col == 0 || w.ws_row == 0) { - fprintf(stderr, "No stream device\n"); + // fprintf(stderr, "No stream device\n"); w.ws_col = 120; w.ws_row = 30; } diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index ac2be3c3cf89d45b032a3b26f3bafe7c218fce29..1f036ab25be42ede5ad7cdee03c4a70605804785 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -750,7 +750,7 @@ void shellReadHistory() { void shellWriteHistory() { SShellHistory *pHistory = &shell.history; - TdFilePtr pFile = taosOpenFile(pHistory->file, TD_FILE_WRITE | TD_FILE_STREAM); + TdFilePtr pFile = taosOpenFile(pHistory->file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_STREAM | TD_FILE_APPEND); if (pFile == NULL) return; for (int32_t i = pHistory->hstart; i != pHistory->hend;) {