提交 99a84b08 编写于 作者: C Cary Xu

Merge branch '3.0' into feature/3.0_interval_hash_optimize

...@@ -34,7 +34,7 @@ typedef struct { ...@@ -34,7 +34,7 @@ typedef struct {
TXN txn; TXN txn;
} SStreamState; } SStreamState;
SStreamState* streamStateOpen(char* path, SStreamTask* pTask); SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath);
void streamStateClose(SStreamState* pState); void streamStateClose(SStreamState* pState);
int32_t streamStateBegin(SStreamState* pState); int32_t streamStateBegin(SStreamState* pState);
int32_t streamStateCommit(SStreamState* pState); int32_t streamStateCommit(SStreamState* pState);
......
...@@ -212,7 +212,7 @@ JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAsync(JN ...@@ -212,7 +212,7 @@ JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAsync(JN
tmq_commit_async(tmq, res, commit_cb, consumer); tmq_commit_async(tmq, res, commit_cb, consumer);
} }
JNIEXPORT int JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqUnsubscribeImp(JNIEnv *env, jobject jobj, jlong jtmq) { JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqUnsubscribeImp(JNIEnv *env, jobject jobj, jlong jtmq) {
tmq_t *tmq = (tmq_t *)jtmq; tmq_t *tmq = (tmq_t *)jtmq;
if (tmq == NULL) { if (tmq == NULL) {
jniError("jobj:%p, tmq is closed", jobj); jniError("jobj:%p, tmq is closed", jobj);
...@@ -222,7 +222,7 @@ JNIEXPORT int JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqUnsubscribeImp( ...@@ -222,7 +222,7 @@ JNIEXPORT int JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqUnsubscribeImp(
return tmq_unsubscribe((tmq_t *)tmq); return tmq_unsubscribe((tmq_t *)tmq);
} }
JNIEXPORT int JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqConsumerCloseImp(JNIEnv *env, jobject jobj, JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqConsumerCloseImp(JNIEnv *env, jobject jobj,
jlong jtmq) { jlong jtmq) {
tmq_t *tmq = (tmq_t *)jtmq; tmq_t *tmq = (tmq_t *)jtmq;
if (tmq == NULL) { if (tmq == NULL) {
......
...@@ -878,12 +878,18 @@ int hbMgrInit() { ...@@ -878,12 +878,18 @@ int hbMgrInit() {
clientHbMgr.appHbMgrs = taosArrayInit(0, sizeof(void *)); clientHbMgr.appHbMgrs = taosArrayInit(0, sizeof(void *));
TdThreadMutexAttr attr = {0}; TdThreadMutexAttr attr = {0};
taosThreadMutexAttrSetType(&attr, PTHREAD_MUTEX_RECURSIVE);
int ret = taosThreadMutexAttrInit(&attr); int ret = taosThreadMutexAttrInit(&attr);
assert(ret == 0); assert(ret == 0);
taosThreadMutexInit(&clientHbMgr.lock, &attr); ret = taosThreadMutexAttrSetType(&attr, PTHREAD_MUTEX_RECURSIVE);
taosThreadMutexAttrDestroy(&attr); assert(ret == 0);
ret = taosThreadMutexInit(&clientHbMgr.lock, &attr);
assert(ret == 0);
ret = taosThreadMutexAttrDestroy(&attr);
assert(ret == 0);
// init handle funcs // init handle funcs
hbMgrInitHandle(); hbMgrInitHandle();
......
...@@ -1446,6 +1446,7 @@ size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) { ...@@ -1446,6 +1446,7 @@ size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) {
int32_t payloadSize = pageSize - blockDataGetSerialMetaSize(numOfCols); int32_t payloadSize = pageSize - blockDataGetSerialMetaSize(numOfCols);
int32_t rowSize = pBlock->info.rowSize; int32_t rowSize = pBlock->info.rowSize;
int32_t nRows = payloadSize / rowSize; int32_t nRows = payloadSize / rowSize;
ASSERT(nRows >= 1);
// the true value must be less than the value of nRows // the true value must be less than the value of nRows
int32_t additional = 0; int32_t additional = 0;
......
...@@ -760,7 +760,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) { ...@@ -760,7 +760,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
// expand executor // expand executor
if (pTask->taskLevel == TASK_LEVEL__SOURCE) { if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask); pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false);
if (pTask->pState == NULL) { if (pTask->pState == NULL) {
return -1; return -1;
} }
...@@ -774,7 +774,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) { ...@@ -774,7 +774,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
ASSERT(pTask->exec.executor); ASSERT(pTask->exec.executor);
} else if (pTask->taskLevel == TASK_LEVEL__AGG) { } else if (pTask->taskLevel == TASK_LEVEL__AGG) {
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask); pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false);
if (pTask->pState == NULL) { if (pTask->pState == NULL) {
return -1; return -1;
} }
......
...@@ -320,6 +320,7 @@ void tLDataIterNextBlock(SLDataIter *pIter) { ...@@ -320,6 +320,7 @@ void tLDataIterNextBlock(SLDataIter *pIter) {
pIter->pSttBlk = NULL; pIter->pSttBlk = NULL;
if (index != -1) { if (index != -1) {
pIter->iSttBlk = index;
pIter->pSttBlk = (SSttBlk *)taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, pIter->iSttBlk); pIter->pSttBlk = (SSttBlk *)taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, pIter->iSttBlk);
} }
} }
......
...@@ -989,7 +989,8 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod ...@@ -989,7 +989,8 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
if (pNode->output) { if (pNode->output) {
(*numOfOutputCols) += 1; (*numOfOutputCols) += 1;
} else { } else if (info != NULL) {
// select distinct tbname from stb where tbname='abc';
info->output = false; info->output = false;
} }
} }
......
...@@ -5297,12 +5297,12 @@ bool modeFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { ...@@ -5297,12 +5297,12 @@ bool modeFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
} }
static void doModeAdd(SModeInfo* pInfo, char* data) { static void doModeAdd(SModeInfo* pInfo, char* data) {
int32_t hashKeyBytes = IS_VAR_DATA_TYPE(pInfo->colType) ? varDataTLen(data) : pInfo->colBytes; int32_t hashKeyBytes = IS_STR_DATA_TYPE(pInfo->colType) ? varDataTLen(data) : pInfo->colBytes;
SModeItem** pHashItem = taosHashGet(pInfo->pHash, data, hashKeyBytes); SModeItem** pHashItem = taosHashGet(pInfo->pHash, data, hashKeyBytes);
if (pHashItem == NULL) { if (pHashItem == NULL) {
int32_t size = sizeof(SModeItem) + pInfo->colBytes; int32_t size = sizeof(SModeItem) + pInfo->colBytes;
SModeItem* pItem = (SModeItem*)(pInfo->pItems + pInfo->numOfPoints * size); SModeItem* pItem = (SModeItem*)(pInfo->pItems + pInfo->numOfPoints * size);
memcpy(pItem->data, data, pInfo->colBytes); memcpy(pItem->data, data, hashKeyBytes);
pItem->count += 1; pItem->count += 1;
taosHashPut(pInfo->pHash, data, hashKeyBytes, &pItem, sizeof(SModeItem*)); taosHashPut(pInfo->pHash, data, hashKeyBytes, &pItem, sizeof(SModeItem*));
......
...@@ -17,6 +17,18 @@ ...@@ -17,6 +17,18 @@
#include "plannodes.h" #include "plannodes.h"
#include "tdatablock.h" #include "tdatablock.h"
#ifndef htonll
#define htonll(x) \
(((int64_t)x & 0x00000000000000ff) << 7 * 8) | (((int64_t)x & 0x000000000000ff00) << 5 * 8) | \
(((int64_t)x & 0x0000000000ff0000) << 3 * 8) | (((int64_t)x & 0x00000000ff000000) << 1 * 8) | \
(((int64_t)x & 0x000000ff00000000) >> 1 * 8) | (((int64_t)x & 0x0000ff0000000000) >> 3 * 8) | \
(((int64_t)x & 0x00ff000000000000) >> 5 * 8) | (((int64_t)x & 0xff00000000000000) >> 7 * 8)
#define ntohll(x) htonll(x)
#endif
#define NODES_MSG_DEFAULT_LEN 1024 #define NODES_MSG_DEFAULT_LEN 1024
#define TLV_TYPE_ARRAY_ELEM 0 #define TLV_TYPE_ARRAY_ELEM 0
...@@ -86,8 +98,8 @@ static int32_t tlvEncodeImpl(STlvEncoder* pEncoder, int16_t type, const void* pV ...@@ -86,8 +98,8 @@ static int32_t tlvEncodeImpl(STlvEncoder* pEncoder, int16_t type, const void* pV
pEncoder->allocSize = pEncoder->allocSize * 2; pEncoder->allocSize = pEncoder->allocSize * 2;
} }
STlv* pTlv = (STlv*)(pEncoder->pBuf + pEncoder->offset); STlv* pTlv = (STlv*)(pEncoder->pBuf + pEncoder->offset);
pTlv->type = type; pTlv->type = htons(type);
pTlv->len = len; pTlv->len = htonl(len);
memcpy(pTlv->value, pValue, len); memcpy(pTlv->value, pValue, len);
pEncoder->offset += tlvLen; pEncoder->offset += tlvLen;
++(pEncoder->tlvCount); ++(pEncoder->tlvCount);
...@@ -117,26 +129,32 @@ static int32_t tlvEncodeValueI8(STlvEncoder* pEncoder, int8_t value) { ...@@ -117,26 +129,32 @@ static int32_t tlvEncodeValueI8(STlvEncoder* pEncoder, int8_t value) {
} }
static int32_t tlvEncodeI16(STlvEncoder* pEncoder, int16_t type, int16_t value) { static int32_t tlvEncodeI16(STlvEncoder* pEncoder, int16_t type, int16_t value) {
value = htons(value);
return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); return tlvEncodeImpl(pEncoder, type, &value, sizeof(value));
} }
static int32_t tlvEncodeValueI16(STlvEncoder* pEncoder, int16_t value) { static int32_t tlvEncodeValueI16(STlvEncoder* pEncoder, int16_t value) {
value = htons(value);
return tlvEncodeValueImpl(pEncoder, &value, sizeof(value)); return tlvEncodeValueImpl(pEncoder, &value, sizeof(value));
} }
static int32_t tlvEncodeI32(STlvEncoder* pEncoder, int16_t type, int32_t value) { static int32_t tlvEncodeI32(STlvEncoder* pEncoder, int16_t type, int32_t value) {
value = htonl(value);
return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); return tlvEncodeImpl(pEncoder, type, &value, sizeof(value));
} }
static int32_t tlvEncodeValueI32(STlvEncoder* pEncoder, int32_t value) { static int32_t tlvEncodeValueI32(STlvEncoder* pEncoder, int32_t value) {
value = htonl(value);
return tlvEncodeValueImpl(pEncoder, &value, sizeof(value)); return tlvEncodeValueImpl(pEncoder, &value, sizeof(value));
} }
static int32_t tlvEncodeI64(STlvEncoder* pEncoder, int16_t type, int64_t value) { static int32_t tlvEncodeI64(STlvEncoder* pEncoder, int16_t type, int64_t value) {
value = htonll(value);
return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); return tlvEncodeImpl(pEncoder, type, &value, sizeof(value));
} }
static int32_t tlvEncodeValueI64(STlvEncoder* pEncoder, int64_t value) { static int32_t tlvEncodeValueI64(STlvEncoder* pEncoder, int64_t value) {
value = htonll(value);
return tlvEncodeValueImpl(pEncoder, &value, sizeof(value)); return tlvEncodeValueImpl(pEncoder, &value, sizeof(value));
} }
...@@ -149,34 +167,44 @@ static int32_t tlvEncodeValueU8(STlvEncoder* pEncoder, uint8_t value) { ...@@ -149,34 +167,44 @@ static int32_t tlvEncodeValueU8(STlvEncoder* pEncoder, uint8_t value) {
} }
static int32_t tlvEncodeU16(STlvEncoder* pEncoder, int16_t type, uint16_t value) { static int32_t tlvEncodeU16(STlvEncoder* pEncoder, int16_t type, uint16_t value) {
value = htons(value);
return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); return tlvEncodeImpl(pEncoder, type, &value, sizeof(value));
} }
static int32_t tlvEncodeValueU16(STlvEncoder* pEncoder, uint16_t value) { static int32_t tlvEncodeValueU16(STlvEncoder* pEncoder, uint16_t value) {
value = htons(value);
return tlvEncodeValueImpl(pEncoder, &value, sizeof(value)); return tlvEncodeValueImpl(pEncoder, &value, sizeof(value));
} }
static int32_t tlvEncodeU64(STlvEncoder* pEncoder, int16_t type, uint64_t value) { static int32_t tlvEncodeU64(STlvEncoder* pEncoder, int16_t type, uint64_t value) {
value = htonll(value);
return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); return tlvEncodeImpl(pEncoder, type, &value, sizeof(value));
} }
static int32_t tlvEncodeValueU64(STlvEncoder* pEncoder, uint64_t value) { static int32_t tlvEncodeValueU64(STlvEncoder* pEncoder, uint64_t value) {
value = htonll(value);
return tlvEncodeValueImpl(pEncoder, &value, sizeof(value)); return tlvEncodeValueImpl(pEncoder, &value, sizeof(value));
} }
static int32_t tlvEncodeDouble(STlvEncoder* pEncoder, int16_t type, double value) { static int32_t tlvEncodeDouble(STlvEncoder* pEncoder, int16_t type, double value) {
return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); int64_t temp = *(int64_t*)&value;
temp = htonll(temp);
return tlvEncodeImpl(pEncoder, type, &temp, sizeof(temp));
} }
static int32_t tlvEncodeValueDouble(STlvEncoder* pEncoder, double value) { static int32_t tlvEncodeValueDouble(STlvEncoder* pEncoder, double value) {
return tlvEncodeValueImpl(pEncoder, &value, sizeof(value)); int64_t temp = *(int64_t*)&value;
temp = htonll(temp);
return tlvEncodeValueImpl(pEncoder, &temp, sizeof(temp));
} }
static int32_t tlvEncodeEnum(STlvEncoder* pEncoder, int16_t type, int32_t value) { static int32_t tlvEncodeEnum(STlvEncoder* pEncoder, int16_t type, int32_t value) {
value = htonl(value);
return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); return tlvEncodeImpl(pEncoder, type, &value, sizeof(value));
} }
static int32_t tlvEncodeValueEnum(STlvEncoder* pEncoder, int32_t value) { static int32_t tlvEncodeValueEnum(STlvEncoder* pEncoder, int32_t value) {
value = htonl(value);
return tlvEncodeValueImpl(pEncoder, &value, sizeof(value)); return tlvEncodeValueImpl(pEncoder, &value, sizeof(value));
} }
...@@ -197,7 +225,7 @@ static int32_t tlvEncodeCStr(STlvEncoder* pEncoder, int16_t type, const char* pV ...@@ -197,7 +225,7 @@ static int32_t tlvEncodeCStr(STlvEncoder* pEncoder, int16_t type, const char* pV
static int32_t tlvEncodeValueCStr(STlvEncoder* pEncoder, const char* pValue) { static int32_t tlvEncodeValueCStr(STlvEncoder* pEncoder, const char* pValue) {
int16_t len = strlen(pValue); int16_t len = strlen(pValue);
int32_t code = tlvEncodeValueImpl(pEncoder, &len, sizeof(len)); int32_t code = tlvEncodeValueI16(pEncoder, len);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueImpl(pEncoder, pValue, len); code = tlvEncodeValueImpl(pEncoder, pValue, len);
} }
...@@ -218,8 +246,8 @@ static int32_t tlvEncodeObj(STlvEncoder* pEncoder, int16_t type, FToMsg func, co ...@@ -218,8 +246,8 @@ static int32_t tlvEncodeObj(STlvEncoder* pEncoder, int16_t type, FToMsg func, co
int32_t code = func(pObj, pEncoder); int32_t code = func(pObj, pEncoder);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
STlv* pTlv = (STlv*)(pEncoder->pBuf + start); STlv* pTlv = (STlv*)(pEncoder->pBuf + start);
pTlv->type = type; pTlv->type = htons(type);
pTlv->len = pEncoder->offset - start - sizeof(STlv); pTlv->len = htonl(pEncoder->offset - start - sizeof(STlv));
} }
++(pEncoder->tlvCount); ++(pEncoder->tlvCount);
return code; return code;
...@@ -236,8 +264,8 @@ static int32_t tlvEncodeObjArray(STlvEncoder* pEncoder, int16_t type, FToMsg fun ...@@ -236,8 +264,8 @@ static int32_t tlvEncodeObjArray(STlvEncoder* pEncoder, int16_t type, FToMsg fun
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
STlv* pTlv = (STlv*)(pEncoder->pBuf + start); STlv* pTlv = (STlv*)(pEncoder->pBuf + start);
pTlv->type = type; pTlv->type = htons(type);
pTlv->len = pEncoder->offset - start - sizeof(STlv); pTlv->len = htonl(pEncoder->offset - start - sizeof(STlv));
} }
} }
return code; return code;
...@@ -259,6 +287,8 @@ static int32_t tlvGetNextTlv(STlvDecoder* pDecoder, STlv** pTlv) { ...@@ -259,6 +287,8 @@ static int32_t tlvGetNextTlv(STlvDecoder* pDecoder, STlv** pTlv) {
} }
*pTlv = (STlv*)(pDecoder->pBuf + pDecoder->offset); *pTlv = (STlv*)(pDecoder->pBuf + pDecoder->offset);
(*pTlv)->type = ntohs((*pTlv)->type);
(*pTlv)->len = ntohl((*pTlv)->len);
if ((*pTlv)->len + pDecoder->offset > pDecoder->bufSize) { if ((*pTlv)->len + pDecoder->offset > pDecoder->bufSize) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
...@@ -291,22 +321,52 @@ static int32_t tlvDecodeValueI8(STlvDecoder* pDecoder, int8_t* pValue) { ...@@ -291,22 +321,52 @@ static int32_t tlvDecodeValueI8(STlvDecoder* pDecoder, int8_t* pValue) {
return tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue)); return tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue));
} }
static int32_t tlvDecodeI16(STlv* pTlv, int16_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } static int32_t tlvDecodeI16(STlv* pTlv, int16_t* pValue) {
int32_t code = tlvDecodeImpl(pTlv, pValue, sizeof(*pValue));
if (TSDB_CODE_SUCCESS == code) {
*pValue = ntohs(*pValue);
}
return code;
}
static int32_t tlvDecodeValueI16(STlvDecoder* pDecoder, int16_t* pValue) { static int32_t tlvDecodeValueI16(STlvDecoder* pDecoder, int16_t* pValue) {
return tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue)); int32_t code = tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue));
if (TSDB_CODE_SUCCESS == code) {
*pValue = ntohs(*pValue);
}
return code;
} }
static int32_t tlvDecodeI32(STlv* pTlv, int32_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } static int32_t tlvDecodeI32(STlv* pTlv, int32_t* pValue) {
int32_t code = tlvDecodeImpl(pTlv, pValue, sizeof(*pValue));
if (TSDB_CODE_SUCCESS == code) {
*pValue = ntohl(*pValue);
}
return code;
}
static int32_t tlvDecodeValueI32(STlvDecoder* pDecoder, int32_t* pValue) { static int32_t tlvDecodeValueI32(STlvDecoder* pDecoder, int32_t* pValue) {
return tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue)); int32_t code = tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue));
if (TSDB_CODE_SUCCESS == code) {
*pValue = ntohl(*pValue);
}
return code;
} }
static int32_t tlvDecodeI64(STlv* pTlv, int64_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } static int32_t tlvDecodeI64(STlv* pTlv, int64_t* pValue) {
int32_t code = tlvDecodeImpl(pTlv, pValue, sizeof(*pValue));
if (TSDB_CODE_SUCCESS == code) {
*pValue = ntohll(*pValue);
}
return code;
}
static int32_t tlvDecodeValueI64(STlvDecoder* pDecoder, int64_t* pValue) { static int32_t tlvDecodeValueI64(STlvDecoder* pDecoder, int64_t* pValue) {
return tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue)); int32_t code = tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue));
if (TSDB_CODE_SUCCESS == code) {
*pValue = ntohll(*pValue);
}
return code;
} }
static int32_t tlvDecodeU8(STlv* pTlv, uint8_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } static int32_t tlvDecodeU8(STlv* pTlv, uint8_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); }
...@@ -315,22 +375,54 @@ static int32_t tlvDecodeValueU8(STlvDecoder* pDecoder, uint8_t* pValue) { ...@@ -315,22 +375,54 @@ static int32_t tlvDecodeValueU8(STlvDecoder* pDecoder, uint8_t* pValue) {
return tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue)); return tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue));
} }
static int32_t tlvDecodeU16(STlv* pTlv, uint16_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } static int32_t tlvDecodeU16(STlv* pTlv, uint16_t* pValue) {
int32_t code = tlvDecodeImpl(pTlv, pValue, sizeof(*pValue));
if (TSDB_CODE_SUCCESS == code) {
*pValue = ntohs(*pValue);
}
return code;
}
static int32_t tlvDecodeValueU16(STlvDecoder* pDecoder, uint16_t* pValue) { static int32_t tlvDecodeValueU16(STlvDecoder* pDecoder, uint16_t* pValue) {
return tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue)); int32_t code = tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue));
if (TSDB_CODE_SUCCESS == code) {
*pValue = ntohs(*pValue);
}
return code;
} }
static int32_t tlvDecodeU64(STlv* pTlv, uint64_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } static int32_t tlvDecodeU64(STlv* pTlv, uint64_t* pValue) {
int32_t code = tlvDecodeImpl(pTlv, pValue, sizeof(*pValue));
if (TSDB_CODE_SUCCESS == code) {
*pValue = ntohll(*pValue);
}
return code;
}
static int32_t tlvDecodeValueU64(STlvDecoder* pDecoder, uint64_t* pValue) { static int32_t tlvDecodeValueU64(STlvDecoder* pDecoder, uint64_t* pValue) {
return tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue)); int32_t code = tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue));
if (TSDB_CODE_SUCCESS == code) {
*pValue = ntohll(*pValue);
}
return code;
} }
static int32_t tlvDecodeDouble(STlv* pTlv, double* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } static int32_t tlvDecodeDouble(STlv* pTlv, double* pValue) {
int64_t temp = 0;
int32_t code = tlvDecodeI64(pTlv, &temp);
if (TSDB_CODE_SUCCESS == code) {
*pValue = *(double*)&temp;
}
return code;
}
static int32_t tlvDecodeValueDouble(STlvDecoder* pDecoder, double* pValue) { static int32_t tlvDecodeValueDouble(STlvDecoder* pDecoder, double* pValue) {
return tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue)); int64_t temp = 0;
int32_t code = tlvDecodeValueI64(pDecoder, &temp);
if (TSDB_CODE_SUCCESS == code) {
*pValue = *(double*)&temp;
}
return code;
} }
static int32_t convertIntegerType(int32_t value, void* pValue, int16_t len) { static int32_t convertIntegerType(int32_t value, void* pValue, int16_t len) {
...@@ -2462,33 +2554,54 @@ static int32_t msgToPhysiWindowNode(STlvDecoder* pDecoder, void* pObj) { ...@@ -2462,33 +2554,54 @@ static int32_t msgToPhysiWindowNode(STlvDecoder* pDecoder, void* pObj) {
return code; return code;
} }
enum { enum { PHY_INTERVAL_CODE_WINDOW = 1, PHY_INTERVAL_CODE_INLINE_ATTRS };
PHY_INTERVAL_CODE_WINDOW = 1,
PHY_INTERVAL_CODE_INTERVAL, static int32_t physiIntervalNodeInlineToMsg(const void* pObj, STlvEncoder* pEncoder) {
PHY_INTERVAL_CODE_OFFSET, const SIntervalPhysiNode* pNode = (const SIntervalPhysiNode*)pObj;
PHY_INTERVAL_CODE_SLIDING,
PHY_INTERVAL_CODE_INTERVAL_UNIT, int32_t code = tlvEncodeValueI64(pEncoder, pNode->interval);
PHY_INTERVAL_CODE_SLIDING_UNIT if (TSDB_CODE_SUCCESS == code) {
}; code = tlvEncodeValueI64(pEncoder, pNode->offset);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueI64(pEncoder, pNode->sliding);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueI8(pEncoder, pNode->intervalUnit);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueI8(pEncoder, pNode->slidingUnit);
}
return code;
}
static int32_t physiIntervalNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { static int32_t physiIntervalNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SIntervalPhysiNode* pNode = (const SIntervalPhysiNode*)pObj; const SIntervalPhysiNode* pNode = (const SIntervalPhysiNode*)pObj;
int32_t code = tlvEncodeObj(pEncoder, PHY_INTERVAL_CODE_WINDOW, physiWindowNodeToMsg, &pNode->window); int32_t code = tlvEncodeObj(pEncoder, PHY_INTERVAL_CODE_WINDOW, physiWindowNodeToMsg, &pNode->window);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI64(pEncoder, PHY_INTERVAL_CODE_INTERVAL, pNode->interval); code = tlvEncodeObj(pEncoder, PHY_INTERVAL_CODE_INLINE_ATTRS, physiIntervalNodeInlineToMsg, pNode);
} }
return code;
}
static int32_t msgToPhysiIntervalNodeInline(STlvDecoder* pDecoder, void* pObj) {
SIntervalPhysiNode* pNode = (SIntervalPhysiNode*)pObj;
int32_t code = tlvDecodeValueI64(pDecoder, &pNode->interval);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI64(pEncoder, PHY_INTERVAL_CODE_OFFSET, pNode->offset); code = tlvDecodeValueI64(pDecoder, &pNode->offset);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI64(pEncoder, PHY_INTERVAL_CODE_SLIDING, pNode->sliding); code = tlvDecodeValueI64(pDecoder, &pNode->sliding);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI8(pEncoder, PHY_INTERVAL_CODE_INTERVAL_UNIT, pNode->intervalUnit); code = tlvDecodeValueI8(pDecoder, &pNode->intervalUnit);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI8(pEncoder, PHY_INTERVAL_CODE_SLIDING_UNIT, pNode->slidingUnit); code = tlvDecodeValueI8(pDecoder, &pNode->slidingUnit);
} }
return code; return code;
...@@ -2504,20 +2617,8 @@ static int32_t msgToPhysiIntervalNode(STlvDecoder* pDecoder, void* pObj) { ...@@ -2504,20 +2617,8 @@ static int32_t msgToPhysiIntervalNode(STlvDecoder* pDecoder, void* pObj) {
case PHY_INTERVAL_CODE_WINDOW: case PHY_INTERVAL_CODE_WINDOW:
code = tlvDecodeObjFromTlv(pTlv, msgToPhysiWindowNode, &pNode->window); code = tlvDecodeObjFromTlv(pTlv, msgToPhysiWindowNode, &pNode->window);
break; break;
case PHY_INTERVAL_CODE_INTERVAL: case PHY_INTERVAL_CODE_INLINE_ATTRS:
code = tlvDecodeI64(pTlv, &pNode->interval); code = tlvDecodeObjFromTlv(pTlv, msgToPhysiIntervalNodeInline, pNode);
break;
case PHY_INTERVAL_CODE_OFFSET:
code = tlvDecodeI64(pTlv, &pNode->offset);
break;
case PHY_INTERVAL_CODE_SLIDING:
code = tlvDecodeI64(pTlv, &pNode->sliding);
break;
case PHY_INTERVAL_CODE_INTERVAL_UNIT:
code = tlvDecodeI8(pTlv, &pNode->intervalUnit);
break;
case PHY_INTERVAL_CODE_SLIDING_UNIT:
code = tlvDecodeI8(pTlv, &pNode->slidingUnit);
break; break;
default: default:
break; break;
......
...@@ -473,10 +473,11 @@ class PlannerTestBaseImpl { ...@@ -473,10 +473,11 @@ class PlannerTestBaseImpl {
cout << "nodesNodeToMsg: " cout << "nodesNodeToMsg: "
<< chrono::duration_cast<chrono::microseconds>(chrono::steady_clock::now() - start).count() << "us" << endl; << chrono::duration_cast<chrono::microseconds>(chrono::steady_clock::now() - start).count() << "us" << endl;
string copyStr(pStr, len);
SNode* pNode = NULL; SNode* pNode = NULL;
char* pNewStr = NULL; char* pNewStr = NULL;
int32_t newlen = 0; int32_t newlen = 0;
DO_WITH_THROW(nodesMsgToNode, pStr, len, &pNode) DO_WITH_THROW(nodesMsgToNode, copyStr.c_str(), len, &pNode)
DO_WITH_THROW(nodesNodeToMsg, pNode, &pNewStr, &newlen) DO_WITH_THROW(nodesNodeToMsg, pNode, &pNewStr, &newlen)
if (newlen != len || 0 != memcmp(pStr, pNewStr, len)) { if (newlen != len || 0 != memcmp(pStr, pNewStr, len)) {
cout << "nodesNodeToMsg error!!!!!!!!!!!!!! len = " << len << ", newlen = " << newlen << endl; cout << "nodesNodeToMsg error!!!!!!!!!!!!!! len = " << len << ", newlen = " << newlen << endl;
......
...@@ -18,14 +18,19 @@ ...@@ -18,14 +18,19 @@
#include "tcommon.h" #include "tcommon.h"
#include "ttimer.h" #include "ttimer.h"
SStreamState* streamStateOpen(char* path, SStreamTask* pTask) { SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath) {
SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState)); SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
if (pState == NULL) { if (pState == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
char statePath[300]; char statePath[300];
sprintf(statePath, "%s/%d", path, pTask->taskId); if (!specPath) {
sprintf(statePath, "%s/%d", path, pTask->taskId);
} else {
memcpy(statePath, path, 300);
}
if (tdbOpen(statePath, 4096, 256, &pState->db) < 0) { if (tdbOpen(statePath, 4096, 256, &pState->db) < 0) {
goto _err; goto _err;
} }
......
...@@ -841,6 +841,10 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx ...@@ -841,6 +841,10 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
// copy content to the parent page // copy content to the parent page
tdbBtreeInitPage(pParent, &(SBtreeInitPageArg){.flags = flags, .pBt = pBt}, 0); tdbBtreeInitPage(pParent, &(SBtreeInitPageArg){.flags = flags, .pBt = pBt}, 0);
tdbPageCopy(pNews[0], pParent, 1); tdbPageCopy(pNews[0], pParent, 1);
if (!TDB_BTREE_PAGE_IS_LEAF(pNews[0])) {
((SIntHdr *)(pParent->pData))->pgno = ((SIntHdr *)(pNews[0]->pData))->pgno;
}
} }
for (int i = 0; i < 3; i++) { for (int i = 0; i < 3; i++) {
......
...@@ -260,7 +260,7 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) { ...@@ -260,7 +260,7 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
pPage->isDirty = 0; pPage->isDirty = 0;
// tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage); tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage);
tdbPCacheRelease(pPager->pCache, pPage, pTxn); tdbPCacheRelease(pPager->pCache, pPage, pTxn);
} }
...@@ -353,7 +353,7 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) { ...@@ -353,7 +353,7 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) {
pPage->isDirty = 0; pPage->isDirty = 0;
// tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage); tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage);
tdbPCacheRelease(pPager->pCache, pPage, pTxn); tdbPCacheRelease(pPager->pCache, pPage, pTxn);
} }
......
...@@ -268,7 +268,7 @@ int walRollFileInfo(SWal* pWal) { ...@@ -268,7 +268,7 @@ int walRollFileInfo(SWal* pWal) {
char* walMetaSerialize(SWal* pWal) { char* walMetaSerialize(SWal* pWal) {
char buf[30]; char buf[30];
ASSERT(pWal->fileInfoSet); ASSERT(pWal->fileInfoSet);
int sz = pWal->fileInfoSet->size; int sz = taosArrayGetSize(pWal->fileInfoSet);
cJSON* pRoot = cJSON_CreateObject(); cJSON* pRoot = cJSON_CreateObject();
cJSON* pMeta = cJSON_CreateObject(); cJSON* pMeta = cJSON_CreateObject();
cJSON* pFiles = cJSON_CreateArray(); cJSON* pFiles = cJSON_CreateArray();
...@@ -384,8 +384,10 @@ static int walFindCurMetaVer(SWal* pWal) { ...@@ -384,8 +384,10 @@ static int walFindCurMetaVer(SWal* pWal) {
int code = regexec(&walMetaRegexPattern, name, 0, NULL, 0); int code = regexec(&walMetaRegexPattern, name, 0, NULL, 0);
if (code == 0) { if (code == 0) {
sscanf(name, "meta-ver%d", &metaVer); sscanf(name, "meta-ver%d", &metaVer);
wDebug("vgId:%d, wal find current meta: %s is the meta file, ver %d", pWal->cfg.vgId, name, metaVer);
break; break;
} }
wDebug("vgId:%d, wal find current meta: %s is not meta file", pWal->cfg.vgId, name);
} }
taosCloseDir(&pDir); taosCloseDir(&pDir);
regfree(&walMetaRegexPattern); regfree(&walMetaRegexPattern);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册