未验证 提交 24c94957 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #20857 from taosdata/fix/TS-3081

fix:optimize log & change the length of tag if tag is null in schemaless
...@@ -699,7 +699,7 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns, ...@@ -699,7 +699,7 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns,
pReq.numOfTags = 1; pReq.numOfTags = 1;
SField field = {0}; SField field = {0};
field.type = TSDB_DATA_TYPE_NCHAR; field.type = TSDB_DATA_TYPE_NCHAR;
field.bytes = 1; field.bytes = TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE;
strcpy(field.name, tsSmlTagName); strcpy(field.name, tsSmlTagName);
taosArrayPush(pReq.pTags, &field); taosArrayPush(pReq.pTags, &field);
} }
......
...@@ -114,11 +114,11 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO ...@@ -114,11 +114,11 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo); int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo);
if (code != 0) { if (code != 0) {
if (terrno != 0) code = terrno; if (terrno != 0) code = terrno;
dGError("vgId:%d, msg:%p failed to fetch since %s", pVnode->vgId, pMsg, terrstr(code)); dGError("vnodeProcessFetchMsg vgId:%d, msg:%p failed to fetch since %s", pVnode->vgId, pMsg, terrstr());
vmSendRsp(pMsg, code); vmSendRsp(pMsg, code);
} }
dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code); dGTrace("vnodeProcessFetchMsg vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
} }
......
...@@ -296,10 +296,9 @@ void tqCloseReader(STqReader* pReader) { ...@@ -296,10 +296,9 @@ void tqCloseReader(STqReader* pReader) {
int32_t tqSeekVer(STqReader* pReader, int64_t ver, const char* id) { int32_t tqSeekVer(STqReader* pReader, int64_t ver, const char* id) {
if (walReadSeekVer(pReader->pWalReader, ver) < 0) { if (walReadSeekVer(pReader->pWalReader, ver) < 0) {
tqDebug("tmq poll: wal reader failed to seek to ver:%"PRId64" code:%s, %s", ver, tstrerror(terrno), id);
return -1; return -1;
} }
tqDebug("tmq poll: wal reader seek to ver:%"PRId64" %s", ver, id); tqDebug("tmq poll: wal reader seek to ver success ver:%"PRId64" %s", ver, id);
return 0; return 0;
} }
......
...@@ -1062,6 +1062,7 @@ int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit) { ...@@ -1062,6 +1062,7 @@ int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
if((pTaskInfo->execModel != OPTR_EXEC_MODEL_QUEUE) || (pTaskInfo->streamInfo.submit.msgStr != NULL)){ if((pTaskInfo->execModel != OPTR_EXEC_MODEL_QUEUE) || (pTaskInfo->streamInfo.submit.msgStr != NULL)){
qError("qStreamSetScanMemData err:%d,%p", pTaskInfo->execModel, pTaskInfo->streamInfo.submit.msgStr); qError("qStreamSetScanMemData err:%d,%p", pTaskInfo->execModel, pTaskInfo->streamInfo.submit.msgStr);
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
return -1; return -1;
} }
qDebug("set the submit block for future scan"); qDebug("set the submit block for future scan");
...@@ -1102,7 +1103,6 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -1102,7 +1103,6 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
// let's seek to the next version in wal file // let's seek to the next version in wal file
if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, id) < 0) { if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, id) < 0) {
qError("tqSeekVer failed ver:%"PRId64", %s", pOffset->version + 1, id);
return -1; return -1;
} }
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
...@@ -1125,6 +1125,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -1125,6 +1125,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
} else { } else {
taosRUnLockLatch(&pTaskInfo->lock); taosRUnLockLatch(&pTaskInfo->lock);
qError("no table in table list, %s", id); qError("no table in table list, %s", id);
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
return -1; return -1;
} }
} }
...@@ -1143,6 +1144,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -1143,6 +1144,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
} else { } else {
qError("vgId:%d uid:%" PRIu64 " not found in table list, total:%d, index:%d %s", pTaskInfo->id.vgId, uid, qError("vgId:%d uid:%" PRIu64 " not found in table list, total:%d, index:%d %s", pTaskInfo->id.vgId, uid,
numOfTables, pScanInfo->currentTable, id); numOfTables, pScanInfo->currentTable, id);
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
return -1; return -1;
} }
...@@ -1175,6 +1177,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -1175,6 +1177,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
pScanBaseInfo->cond.twindows.skey = oldSkey; pScanBaseInfo->cond.twindows.skey = oldSkey;
} else { } else {
qError("invalid pOffset->type:%d, %s", pOffset->type, id); qError("invalid pOffset->type:%d, %s", pOffset->type, id);
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
return -1; return -1;
} }
...@@ -1189,6 +1192,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -1189,6 +1192,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
if (setForSnapShot(sContext, pOffset->uid) != 0) { if (setForSnapShot(sContext, pOffset->uid) != 0) {
qError("setDataForSnapShot error. uid:%" PRId64" , %s", pOffset->uid, id); qError("setDataForSnapShot error. uid:%" PRId64" , %s", pOffset->uid, id);
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
return -1; return -1;
} }
...@@ -1224,6 +1228,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -1224,6 +1228,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
SSnapContext* sContext = pInfo->sContext; SSnapContext* sContext = pInfo->sContext;
if (setForSnapShot(sContext, pOffset->uid) != 0) { if (setForSnapShot(sContext, pOffset->uid) != 0) {
qError("setForSnapShot error. uid:%" PRIu64 " ,version:%" PRId64, pOffset->uid, pOffset->version); qError("setForSnapShot error. uid:%" PRIu64 " ,version:%" PRId64, pOffset->uid, pOffset->version);
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
return -1; return -1;
} }
qDebug("tmqsnap qStreamPrepareScan snapshot meta uid:%" PRId64 " ts %" PRId64 " %s", pOffset->uid, pOffset->ts, id); qDebug("tmqsnap qStreamPrepareScan snapshot meta uid:%" PRId64 " ts %" PRId64 " %s", pOffset->uid, pOffset->ts, id);
......
...@@ -999,6 +999,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan ...@@ -999,6 +999,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
SOperatorInfo* extractOperatorInTree(SOperatorInfo* pOperator, int32_t type, const char* id) { SOperatorInfo* extractOperatorInTree(SOperatorInfo* pOperator, int32_t type, const char* id) {
if (pOperator == NULL) { if (pOperator == NULL) {
qError("invalid operator, failed to find tableScanOperator %s", id); qError("invalid operator, failed to find tableScanOperator %s", id);
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
return NULL; return NULL;
} }
...@@ -1007,6 +1008,7 @@ SOperatorInfo* extractOperatorInTree(SOperatorInfo* pOperator, int32_t type, con ...@@ -1007,6 +1008,7 @@ SOperatorInfo* extractOperatorInTree(SOperatorInfo* pOperator, int32_t type, con
} else { } else {
if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) { if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) {
qError("invalid operator, failed to find tableScanOperator %s", id); qError("invalid operator, failed to find tableScanOperator %s", id);
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
return NULL; return NULL;
} }
......
...@@ -207,17 +207,12 @@ int32_t walReadSeekVer(SWalReader *pReader, int64_t ver) { ...@@ -207,17 +207,12 @@ int32_t walReadSeekVer(SWalReader *pReader, int64_t ver) {
return 0; return 0;
} }
// pReader->curInvalid = 1;
// pReader->curVersion = ver;
if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) { if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) {
wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pReader->pWal->cfg.vgId, wInfo("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pReader->pWal->cfg.vgId,
ver, pWal->vers.firstVer, pWal->vers.lastVer); ver, pWal->vers.firstVer, pWal->vers.lastVer);
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
return -1; return -1;
} }
// if (ver < pWal->vers.snapshotVer) {
// }
if (walReadSeekVerImpl(pReader, ver) < 0) { if (walReadSeekVerImpl(pReader, ver) < 0) {
return -1; return -1;
...@@ -236,8 +231,6 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) { ...@@ -236,8 +231,6 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
if (pRead->curVersion != fetchVer) { if (pRead->curVersion != fetchVer) {
if (walReadSeekVer(pRead, fetchVer) < 0) { if (walReadSeekVer(pRead, fetchVer) < 0) {
// pRead->curVersion = fetchVer;
// pRead->curInvalid = 1;
return -1; return -1;
} }
seeked = true; seeked = true;
...@@ -256,7 +249,6 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) { ...@@ -256,7 +249,6 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
} else { } else {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
} }
// pRead->curInvalid = 1;
return -1; return -1;
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册