提交 1c46958c 编写于 作者: C Cary Xu

enh: check max/sum/avg func for rsma

上级 9b348a4a
......@@ -232,7 +232,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData);
void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols, int8_t needCompress);
const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRows, const char* pData);
void blockDebugShowData(const SArray* dataBlocks);
void blockDebugShowData(const SArray* dataBlocks, const char* flag);
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId,
tb_uid_t suid);
......
......@@ -1488,7 +1488,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
return buf;
}
void blockDebugShowData(const SArray* dataBlocks) {
void blockDebugShowData(const SArray* dataBlocks, const char* flag) {
char pBuf[128] = {0};
int32_t sz = taosArrayGetSize(dataBlocks);
for (int32_t i = 0; i < sz; i++) {
......@@ -1496,7 +1496,7 @@ void blockDebugShowData(const SArray* dataBlocks) {
int32_t colNum = pDataBlock->info.numOfCols;
int32_t rows = pDataBlock->info.rows;
for (int32_t j = 0; j < rows; j++) {
printf("|");
printf("%s |", flag);
for (int32_t k = 0; k < colNum; k++) {
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
......@@ -1521,8 +1521,11 @@ void blockDebugShowData(const SArray* dataBlocks) {
case TSDB_DATA_TYPE_UBIGINT:
printf(" %15lu |", *(uint64_t*)var);
break;
case TSDB_DATA_TYPE_FLOAT:
printf(" %15f |", *(float*)var);
break;
case TSDB_DATA_TYPE_DOUBLE:
printf(" %15f |", *(double*)var);
printf(" %15lf |", *(double*)var);
break;
}
}
......@@ -1550,8 +1553,6 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
bufSize += sizeof(SSubmitBlk);
}
ASSERT(bufSize < 3 * 1024 * 1024);
*pReq = taosMemoryCalloc(1, bufSize);
if (!(*pReq)) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......@@ -1562,7 +1563,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
int32_t msgLen = sizeof(SSubmitReq);
int32_t numOfBlks = 0;
SRowBuilder rb = {0};
tdSRowInit(&rb, pTSchema->version); // TODO: use the latest version
tdSRowInit(&rb, pTSchema->version);
for (int32_t i = 0; i < sz; ++i) {
SSDataBlock* pDataBlock = taosArrayGet(pDataBlocks, i);
......@@ -1580,18 +1581,17 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
pSubmitBlk->uid = pDataBlock->info.groupId;
pSubmitBlk->numOfRows = rows;
++numOfBlks;
msgLen += sizeof(SSubmitBlk);
int32_t dataLen = 0;
for (int32_t j = 0; j < rows; ++j) { // iterate by row
tdSRowResetBuf(&rb, POINTER_SHIFT(pDataBuf, msgLen)); // set row buf
printf("|");
bool isStartKey = false;
int32_t offset = 0;
for (int32_t k = 0; k < colNum; ++k) { // iterate by column
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
STColumn* pCol = &pTSchema->columns[k];
ASSERT(pCol->type == pColInfoData->info.type);
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
switch (pColInfoData->info.type) {
case TSDB_DATA_TYPE_TIMESTAMP:
if (!isStartKey) {
......@@ -1600,29 +1600,29 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
offset, k);
} else {
tdAppendColValToRow(&rb, 2, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var, true, offset, k);
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var, true, offset, k);
}
break;
case TSDB_DATA_TYPE_NCHAR: {
tdAppendColValToRow(&rb, 2, TSDB_DATA_TYPE_NCHAR, TD_VTYPE_NORM, var, true, offset, k);
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_NCHAR, TD_VTYPE_NORM, var, true, offset, k);
break;
}
case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY
tdAppendColValToRow(&rb, 2, TSDB_DATA_TYPE_VARCHAR, TD_VTYPE_NORM, var, true, offset, k);
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_VARCHAR, TD_VTYPE_NORM, var, true, offset, k);
break;
}
case TSDB_DATA_TYPE_VARBINARY:
case TSDB_DATA_TYPE_DECIMAL:
case TSDB_DATA_TYPE_BLOB:
case TSDB_DATA_TYPE_MEDIUMBLOB:
printf("the column type %" PRIi16 " is defined but not implemented yet\n", pColInfoData->info.type);
uError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
TASSERT(0);
break;
default:
if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
tdAppendColValToRow(&rb, 2, pColInfoData->info.type, TD_VTYPE_NORM, var, true, offset, k);
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, pColInfoData->info.type, TD_VTYPE_NORM, var, true, offset, k);
} else {
printf("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
TASSERT(0);
}
break;
......@@ -1630,7 +1630,13 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
offset += TYPE_BYTES[pColInfoData->info.type];
}
dataLen += TD_ROW_LEN(rb.pBuf);
#ifdef TD_DEBUG_PRINT_ROW
tdSRowPrint(rb.pBuf, pTSchema, __func__);
#endif
}
++numOfBlks;
pSubmitBlk->dataLen = dataLen;
msgLen += pSubmitBlk->dataLen;
}
......
......@@ -400,7 +400,11 @@ static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int3
}
if (taosArrayGetSize(pResult) > 0) {
blockDebugShowData(pResult);
#if 1
char flag[10] = {0};
snprintf(flag, 10, "level %" PRIi8, level);
blockDebugShowData(pResult, flag);
#endif
STsdb *sinkTsdb = (level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb1 : pSma->pRSmaTsdb2);
SSubmitReq *pReq = NULL;
if (buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, SMA_VID(pSma), suid) != 0) {
......@@ -444,7 +448,7 @@ static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb
}
if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
// TODO: use the proper schema instead of 0, and cache STSchema in cache
// TODO: cache STSchema
STSchema *pTSchema = metaGetTbTSchema(SMA_META(pSma), suid, -1);
if (!pTSchema) {
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
......
......@@ -297,8 +297,8 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlo
tSkipListPutBatchByIter(pTbData->pData, &blkIter, (iter_next_fn_t)tGetSubmitBlkNext);
#ifdef TD_DEBUG_PRINT_ROW
printf("!!! %s:%d table %" PRIi64 " has %d rows in skiplist\n\n", __func__, __LINE__, pTbData->uid,
SL_SIZE(pTbData->pData));
printf("!!! %s:%d vgId:%d dir:%s table:%" PRIi64 " has %d rows in skiplist\n\n", __func__, __LINE__,
TD_VID(pTsdb->pVnode), pTsdb->dir, pTbData->uid, SL_SIZE(pTbData->pData));
#endif
// Set statistics
......
......@@ -1661,7 +1661,11 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa
}
#ifdef TD_DEBUG_PRINT_ROW
tdSRowPrint(row1, pSchema1, __func__);
char flags[70] = {0};
STsdb* pTsdb = pTsdbReadHandle->rhelper.pRepo;
snprintf(flags, 70, "%s:%d vgId:%d dir:%s row1%s=NULL,row2%s=NULL", __func__, __LINE__, TD_VID(pTsdb->pVnode),
pTsdb->dir, row1 ? "!" : "", row2 ? "!" : "");
tdSRowPrint(row1, pSchema1, flags);
#endif
if (isRow1DataRow) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册