提交 13476682 编写于 作者: P plum-lihui

Merge branch '3.0' of github.com:taosdata/TDengine into 3.0

...@@ -268,6 +268,7 @@ struct SVnode { ...@@ -268,6 +268,7 @@ struct SVnode {
tsem_t canCommit; tsem_t canCommit;
int64_t sync; int64_t sync;
int32_t blockCount; int32_t blockCount;
bool restored;
tsem_t syncSem; tsem_t syncSem;
SQHandle* pQuery; SQHandle* pQuery;
}; };
......
...@@ -1395,10 +1395,26 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) { ...@@ -1395,10 +1395,26 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) {
break; break;
case TSDB_DATA_TYPE_BOOL: case TSDB_DATA_TYPE_BOOL:
break; break;
case TSDB_DATA_TYPE_TINYINT: case TSDB_DATA_TYPE_TINYINT:{
pColAgg->sum += colVal.value.i8;
if (pColAgg->min > colVal.value.i8) {
pColAgg->min = colVal.value.i8;
}
if (pColAgg->max < colVal.value.i8) {
pColAgg->max = colVal.value.i8;
}
break; break;
case TSDB_DATA_TYPE_SMALLINT: }
case TSDB_DATA_TYPE_SMALLINT:{
pColAgg->sum += colVal.value.i16;
if (pColAgg->min > colVal.value.i16) {
pColAgg->min = colVal.value.i16;
}
if (pColAgg->max < colVal.value.i16) {
pColAgg->max = colVal.value.i16;
}
break; break;
}
case TSDB_DATA_TYPE_INT: { case TSDB_DATA_TYPE_INT: {
pColAgg->sum += colVal.value.i32; pColAgg->sum += colVal.value.i32;
if (pColAgg->min > colVal.value.i32) { if (pColAgg->min > colVal.value.i32) {
...@@ -1419,24 +1435,79 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) { ...@@ -1419,24 +1435,79 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) {
} }
break; break;
} }
case TSDB_DATA_TYPE_FLOAT: case TSDB_DATA_TYPE_FLOAT:{
pColAgg->sum += colVal.value.f;
if (pColAgg->min > colVal.value.f) {
pColAgg->min = colVal.value.f;
}
if (pColAgg->max < colVal.value.f) {
pColAgg->max = colVal.value.f;
}
break; break;
case TSDB_DATA_TYPE_DOUBLE: }
case TSDB_DATA_TYPE_DOUBLE:{
pColAgg->sum += colVal.value.d;
if (pColAgg->min > colVal.value.d) {
pColAgg->min = colVal.value.d;
}
if (pColAgg->max < colVal.value.d) {
pColAgg->max = colVal.value.d;
}
break; break;
}
case TSDB_DATA_TYPE_VARCHAR: case TSDB_DATA_TYPE_VARCHAR:
break; break;
case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_TIMESTAMP:{
if (pColAgg->min > colVal.value.i64) {
pColAgg->min = colVal.value.i64;
}
if (pColAgg->max < colVal.value.i64) {
pColAgg->max = colVal.value.i64;
}
break; break;
}
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR:
break; break;
case TSDB_DATA_TYPE_UTINYINT: case TSDB_DATA_TYPE_UTINYINT:{
pColAgg->sum += colVal.value.u8;
if (pColAgg->min > colVal.value.u8) {
pColAgg->min = colVal.value.u8;
}
if (pColAgg->max < colVal.value.u8) {
pColAgg->max = colVal.value.u8;
}
break; break;
case TSDB_DATA_TYPE_USMALLINT: }
case TSDB_DATA_TYPE_USMALLINT:{
pColAgg->sum += colVal.value.u16;
if (pColAgg->min > colVal.value.u16) {
pColAgg->min = colVal.value.u16;
}
if (pColAgg->max < colVal.value.u16) {
pColAgg->max = colVal.value.u16;
}
break; break;
case TSDB_DATA_TYPE_UINT: }
case TSDB_DATA_TYPE_UINT:{
pColAgg->sum += colVal.value.u32;
if (pColAgg->min > colVal.value.u32) {
pColAgg->min = colVal.value.u32;
}
if (pColAgg->max < colVal.value.u32) {
pColAgg->max = colVal.value.u32;
}
break; break;
case TSDB_DATA_TYPE_UBIGINT: }
case TSDB_DATA_TYPE_UBIGINT:{
pColAgg->sum += colVal.value.u64;
if (pColAgg->min > colVal.value.u64) {
pColAgg->min = colVal.value.u64;
}
if (pColAgg->max < colVal.value.u64) {
pColAgg->max = colVal.value.u64;
}
break; break;
}
case TSDB_DATA_TYPE_JSON: case TSDB_DATA_TYPE_JSON:
break; break;
case TSDB_DATA_TYPE_VARBINARY: case TSDB_DATA_TYPE_VARBINARY:
......
...@@ -224,9 +224,19 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) ...@@ -224,9 +224,19 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs)
vGTrace("vgId:%d, msg:%p get from vnode-write queue, weak:%d block:%d msg:%d:%d pos:%d, handle:%p", vgId, pMsg, vGTrace("vgId:%d, msg:%p get from vnode-write queue, weak:%d block:%d msg:%d:%d pos:%d, handle:%p", vgId, pMsg,
isWeak, isBlock, msg, numOfMsgs, arrayPos, pMsg->info.handle); isWeak, isBlock, msg, numOfMsgs, arrayPos, pMsg->info.handle);
if (!pVnode->restored) {
vGError("vgId:%d, msg:%p failed to process since not leader", vgId, pMsg);
terrno = TSDB_CODE_APP_NOT_READY;
vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_APP_NOT_READY);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
continue;
}
if (pMsgArr == NULL || pIsWeakArr == NULL) { if (pMsgArr == NULL || pIsWeakArr == NULL) {
vGError("vgId:%d, msg:%p failed to process since out of memory", vgId, pMsg); vGError("vgId:%d, msg:%p failed to process since out of memory", vgId, pMsg);
vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_OUT_OF_MEMORY); terrno = TSDB_CODE_OUT_OF_MEMORY;
vnodeHandleProposeError(pVnode, pMsg, terrno);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
continue; continue;
...@@ -609,6 +619,12 @@ static void vnodeLeaderTransfer(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsm ...@@ -609,6 +619,12 @@ static void vnodeLeaderTransfer(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsm
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
} }
static void vnodeRestoreFinish(struct SSyncFSM *pFsm) {
SVnode *pVnode = pFsm->data;
pVnode->restored = true;
vDebug("vgId:%d, sync restore finished", pVnode->config.vgId);
}
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM)); SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
pFsm->data = pVnode; pFsm->data = pVnode;
...@@ -616,7 +632,7 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { ...@@ -616,7 +632,7 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg; pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
pFsm->FpRollBackCb = vnodeSyncRollBackMsg; pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshot; pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshot;
pFsm->FpRestoreFinishCb = NULL; pFsm->FpRestoreFinishCb = vnodeRestoreFinish;
pFsm->FpLeaderTransferCb = vnodeLeaderTransfer; pFsm->FpLeaderTransferCb = vnodeLeaderTransfer;
pFsm->FpReConfigCb = vnodeSyncReconfig; pFsm->FpReConfigCb = vnodeSyncReconfig;
pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead; pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead;
...@@ -670,11 +686,10 @@ bool vnodeIsLeader(SVnode *pVnode) { ...@@ -670,11 +686,10 @@ bool vnodeIsLeader(SVnode *pVnode) {
return false; return false;
} }
// todo if (!pVnode->restored) {
// if (!pVnode->restored) { terrno = TSDB_CODE_APP_NOT_READY;
// terrno = TSDB_CODE_APP_NOT_READY; return false;
// return false; }
// }
return true; return true;
} }
\ No newline at end of file
...@@ -416,6 +416,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) { ...@@ -416,6 +416,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
} }
if (isTaskKilled(pTaskInfo)) { if (isTaskKilled(pTaskInfo)) {
atomic_store_64(&pTaskInfo->owner, 0);
qDebug("%s already killed, abort", GET_TASKID(pTaskInfo)); qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -69,7 +69,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys ...@@ -69,7 +69,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
// todo remove it soon // todo remove it soon
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) { if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
pInfo->mergeDataBlocks = true; pInfo->mergeDataBlocks = false;
} }
int32_t numOfRows = 4096; int32_t numOfRows = 4096;
......
...@@ -1532,7 +1532,7 @@ static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { ...@@ -1532,7 +1532,7 @@ static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
} }
uint8_t resType; uint8_t resType;
if (IS_SIGNED_NUMERIC_TYPE(colType) || TSDB_DATA_TYPE_BOOL == colType) { if (IS_SIGNED_NUMERIC_TYPE(colType) || TSDB_DATA_TYPE_BOOL == colType || TSDB_DATA_TYPE_TIMESTAMP == colType) {
resType = TSDB_DATA_TYPE_BIGINT; resType = TSDB_DATA_TYPE_BIGINT;
} else { } else {
resType = TSDB_DATA_TYPE_DOUBLE; resType = TSDB_DATA_TYPE_DOUBLE;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册