提交 e61f372a 编写于 作者: L Liu Jicong

Merge branch '3.0' into feature/tq

......@@ -202,6 +202,17 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
return code;
}
code = vnodeStart(pImpl);
if (code != 0) {
tFreeSCreateVnodeReq(&createReq);
dError("vgId:%d, failed to start sync since %s", createReq.vgId, terrstr());
vnodeClose(pImpl);
vnodeDestroy(path, pMgmt->pTfs);
terrno = code;
return code;
}
code = vmWriteVnodesToFile(pMgmt);
if (code != 0) {
tFreeSCreateVnodeReq(&createReq);
......
......@@ -74,12 +74,6 @@ int32_t vmOpenVnode(SVnodesMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
return -1;
}
// sync integration
vnodeSyncSetQ(pImpl, NULL);
vnodeSyncSetRpc(pImpl, NULL);
int32_t ret = vnodeSyncStart(pImpl);
assert(ret == 0);
taosWLockLatch(&pMgmt->latch);
int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
taosWUnLockLatch(&pMgmt->latch);
......@@ -153,6 +147,7 @@ static void *vmOpenVnodeFunc(void *param) {
pThread->failed++;
} else {
vmOpenVnode(pMgmt, pCfg, pImpl);
//vnodeStart(pImpl);
dDebug("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
pThread->opened++;
}
......@@ -364,10 +359,52 @@ static int32_t vmRequire(SMgmtWrapper *pWrapper, bool *required) {
return 0;
}
static int32_t vmStart(SMgmtWrapper *pWrapper) {
dDebug("vnode-mgmt start to run");
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
taosRLockLatch(&pMgmt->latch);
void *pIter = taosHashIterate(pMgmt->hash, NULL);
while (pIter) {
SVnodeObj **ppVnode = pIter;
if (ppVnode == NULL || *ppVnode == NULL) continue;
SVnodeObj *pVnode = *ppVnode;
vnodeStart(pVnode->pImpl);
pIter = taosHashIterate(pMgmt->hash, pIter);
}
taosRUnLockLatch(&pMgmt->latch);
return 0;
}
static void vmStop(SMgmtWrapper *pWrapper) {
#if 0
dDebug("vnode-mgmt start to stop");
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
taosRLockLatch(&pMgmt->latch);
void *pIter = taosHashIterate(pMgmt->hash, NULL);
while (pIter) {
SVnodeObj **ppVnode = pIter;
if (ppVnode == NULL || *ppVnode == NULL) continue;
SVnodeObj *pVnode = *ppVnode;
vnodeStop(pVnode->pImpl);
pIter = taosHashIterate(pMgmt->hash, pIter);
}
taosRUnLockLatch(&pMgmt->latch);
#endif
}
void vmSetMgmtFp(SMgmtWrapper *pWrapper) {
SMgmtFp mgmtFp = {0};
mgmtFp.openFp = vmInit;
mgmtFp.closeFp = vmCleanup;
mgmtFp.startFp = vmStart;
mgmtFp.stopFp = vmStop;
mgmtFp.requiredFp = vmRequire;
vmInitMsgHandle(pWrapper);
......@@ -396,4 +433,4 @@ void vmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo) {
}
taosRUnLockLatch(&pMgmt->latch);
}
\ No newline at end of file
}
......@@ -61,6 +61,9 @@ int32_t vnodeSync(SVnode *pVnode);
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName);
int32_t vnodeStart(SVnode *pVnode);
void vnodeStop(SVnode *pVnode);
int64_t vnodeGetSyncHandle(SVnode *pVnode);
void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot);
......@@ -171,11 +174,6 @@ typedef struct {
uint64_t uid;
} STableKeyInfo;
// sync integration
void vnodeSyncSetQ(SVnode *pVnode, void *qHandle);
void vnodeSyncSetRpc(SVnode *pVnode, void *rpcHandle);
int32_t vnodeSyncStart(SVnode *pVnode);
#ifdef __cplusplus
}
#endif
......
......@@ -14,6 +14,7 @@
*/
#include "vnodeInt.h"
#include "vnodeSync.h"
int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
SVnodeInfo info = {0};
......@@ -171,6 +172,16 @@ void vnodeClose(SVnode *pVnode) {
}
}
// start the sync timer after the queue is ready
int32_t vnodeStart(SVnode *pVnode) {
vnodeSyncSetQ(pVnode, NULL);
vnodeSyncSetRpc(pVnode, NULL);
vnodeSyncStart(pVnode);
return 0;
}
void vnodeStop(SVnode *pVnode) {}
int64_t vnodeGetSyncHandle(SVnode *pVnode) { return pVnode->sync; }
void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot) { pSnapshot->lastApplyIndex = pVnode->state.committed; }
\ No newline at end of file
......@@ -268,7 +268,6 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) {
int32_t numOfCols = LIST_LENGTH(pNode->pSlots);
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
pBlock->info.numOfCols = numOfCols;
pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
pBlock->info.blockId = pNode->dataBlockId;
......@@ -294,6 +293,7 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) {
taosArrayPush(pBlock->pDataBlock, &idata);
}
pBlock->info.numOfCols = taosArrayGetSize(pBlock->pDataBlock);
return pBlock;
}
......@@ -1032,6 +1032,8 @@ static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunc
pColInfo->info.bytes = tDataTypes[type].bytes;
pInput->pData[paramIndex] = pColInfo;
} else {
pColInfo = pInput->pData[paramIndex];
}
ASSERT(!IS_VAR_DATA_TYPE(type));
......
......@@ -355,7 +355,7 @@ static int32_t translateToIso8601(SFunctionNode* pFunc, char* pErrBuf, int32_t l
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
pFunc->node.resType = (SDataType) { .bytes = 24, .type = TSDB_DATA_TYPE_BINARY};
pFunc->node.resType = (SDataType) { .bytes = 64, .type = TSDB_DATA_TYPE_BINARY};
return TSDB_CODE_SUCCESS;
}
......
......@@ -489,7 +489,7 @@ EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) {
if (fmIsUserDefinedFunc(node->funcId)) {
return DEAL_RES_CONTINUE;
}
FOREACH(tnode, node->pParameterList) {
if (!SCL_IS_CONST_NODE(tnode)) {
return DEAL_RES_CONTINUE;
......@@ -517,8 +517,8 @@ EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) {
res->node.resType = node->node.resType;
int32_t type = output.columnData->info.type;
if (IS_VAR_DATA_TYPE(type)) {
res->datum.p = output.columnData->pData;
output.columnData->pData = NULL;
res->datum.p = taosMemoryCalloc(res->node.resType.bytes + VARSTR_HEADER_SIZE + 1, 1);
memcpy(res->datum.p, output.columnData->pData, varDataTLen(output.columnData->pData));
} else {
memcpy(nodesGetValueFromNode(res), output.columnData->pData, tDataTypes[type].bytes);
}
......
......@@ -403,6 +403,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_CHECKSUM, "Invalid msg checksum"
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGLEN, "Invalid msg length")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGTYPE, "Invalid msg type")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_LEADER, "Sync not leader")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INTERNAL_ERROR, "Sync internal error")
// wal
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, "Unexpected generic error in wal")
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_FILE_CORRUPTED, "WAL file is corrupted")
......
......@@ -395,7 +395,7 @@ void shellClearScreen(int32_t ecmd_pos, int32_t cursor_pos) {
void shellShowOnScreen(SShellCmd *cmd) {
struct winsize w;
if (ioctl(0, TIOCGWINSZ, &w) < 0 || w.ws_col == 0 || w.ws_row == 0) {
fprintf(stderr, "No stream device\n");
// fprintf(stderr, "No stream device\n");
w.ws_col = 120;
w.ws_row = 30;
}
......
......@@ -750,7 +750,7 @@ void shellReadHistory() {
void shellWriteHistory() {
SShellHistory *pHistory = &shell.history;
TdFilePtr pFile = taosOpenFile(pHistory->file, TD_FILE_WRITE | TD_FILE_STREAM);
TdFilePtr pFile = taosOpenFile(pHistory->file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_STREAM | TD_FILE_APPEND);
if (pFile == NULL) return;
for (int32_t i = pHistory->hstart; i != pHistory->hend;) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册