未验证 提交 6d788b6b 编写于 作者: wmmhello's avatar wmmhello 提交者: GitHub

Merge pull request #18718 from taosdata/refact/submit_req

fix:merge submit req
......@@ -266,7 +266,7 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag);
// for debug
char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf);
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataBlocks, STSchema* pTSchema, int32_t vgId,
int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pDataBlocks, STSchema* pTSchema, int32_t vgId,
tb_uid_t suid);
char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId);
......
......@@ -44,6 +44,12 @@ typedef struct SColData SColData;
#define HAS_VALUE ((uint8_t)0x4)
// bitmap ================================
const static uint8_t BIT1_OR_MAP[8] = {
0b00000001, 0b00000010, 0b00000100, 0b00001000, 0b00010000, 0b00100000, 0b01000000, 0b10000000,
};
const static uint8_t BIT1_AND_MAP[8] = {
0b11111110, 0b11111101, 0b11111011, 0b11110111, 0b11101111, 0b11011111, 0b10111111, 0b01111111,
};
const static uint8_t BIT2_MAP[4][4] = {{0b00000000, 0b00000001, 0b00000010, 0},
{0b00000000, 0b00000100, 0b00001000, 2},
{0b00000000, 0b00010000, 0b00100000, 4},
......@@ -52,7 +58,14 @@ const static uint8_t BIT2_MAP[4][4] = {{0b00000000, 0b00000001, 0b00000010, 0},
#define N1(n) ((((uint8_t)1) << (n)) - 1)
#define BIT1_SIZE(n) ((((n)-1) >> 3) + 1)
#define BIT2_SIZE(n) ((((n)-1) >> 2) + 1)
#define SET_BIT1(p, i, v) ((p)[(i) >> 3] = (p)[(i) >> 3] & N1((i)&7) | (((uint8_t)(v)) << ((i)&7)))
#define SET_BIT1(p, i, v) \
do { \
if (v) { \
(p)[(i) >> 3] |= BIT1_OR_MAP[(i)&7]; \
} else { \
(p)[(i) >> 3] &= BIT1_AND_MAP[(i)&7]; \
} \
} while (0)
#define GET_BIT1(p, i) (((p)[(i) >> 3] >> ((i)&7)) & ((uint8_t)1))
#define SET_BIT2(p, i, v) ((p)[(i) >> 2] = (p)[(i) >> 2] & N1(BIT2_MAP[(i)&3][3]) | BIT2_MAP[(i)&3][(v)])
#define GET_BIT2(p, i) (((p)[(i) >> 2] >> BIT2_MAP[(i)&3][3]) & ((uint8_t)3))
......
......@@ -85,11 +85,12 @@ int32_t qSetSTableIdForRsma(SNode* pStmt, int64_t uid);
void qCleanupKeywordsTable();
int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash);
int32_t qResetStmtDataBlock(void* block, bool keepBuf);
int32_t qCloneStmtDataBlock(void** pDst, void* pSrc, bool reset);
int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc, uint64_t uid, int32_t vgId);
void qDestroyStmtDataBlock(void* pBlock);
STableMeta* qGetTableMetaInDataBlock(void* pDataBlock);
int32_t qResetStmtDataBlock(STableDataCxt* block, bool keepBuf);
int32_t qCloneStmtDataBlock(STableDataCxt** pDst, STableDataCxt* pSrc, bool reset);
int32_t qRebuildStmtDataBlock(STableDataCxt** pDst, STableDataCxt* pSrc, uint64_t uid, uint64_t suid, int32_t vgId, bool rebuildCreateTb);
void qDestroyStmtDataBlock(STableDataCxt* pBlock);
STableMeta* qGetTableMetaInDataBlock(STableDataCxt* pDataBlock);
int32_t qCloneCurrentTbData(STableDataCxt* pDataBlock, SSubmitTbData **pData);
int32_t qStmtBindParams(SQuery* pQuery, TAOS_MULTI_BIND* pParams, int32_t colIdx);
int32_t qStmtParseQuerySql(SParseContext* pCxt, SQuery* pQuery);
......
......@@ -163,6 +163,25 @@ typedef struct STargetInfo {
int32_t vgId;
} STargetInfo;
typedef struct SBoundColInfo {
int16_t *pColIndex; // bound index => schema index
int32_t numOfCols;
int32_t numOfBound;
} SBoundColInfo;
typedef struct STableDataCxt {
STableMeta *pMeta;
STSchema *pSchema;
SBoundColInfo boundColsInfo;
SArray *pValues;
SSubmitTbData *pData;
TSKEY lastTs;
bool ordered;
bool duplicateTs;
} STableDataCxt;
typedef int32_t (*__async_send_cb_fn_t)(void* param, SDataBuf* pMsg, int32_t code);
typedef int32_t (*__async_exec_fn_t)(void* param);
......@@ -238,6 +257,7 @@ int32_t dataConverToStr(char* str, int type, void* buf, int32_t bufSize, int32_t
char* parseTagDatatoJson(void* p);
int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst);
int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst);
int32_t cloneSVreateTbReq(SVCreateTbReq* pSrc, SVCreateTbReq** pDst);
extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char** msg, int32_t msgSize, int32_t* msgLen,
void* (*mallocFp)(int64_t));
......
......@@ -21,8 +21,6 @@ extern "C" {
#endif
#include "catalog.h"
typedef void STableDataCxt;
typedef enum {
STMT_TYPE_INSERT = 1,
STMT_TYPE_MULTI_INSERT,
......@@ -74,7 +72,8 @@ typedef struct SStmtExecInfo {
int32_t affectedRows;
SRequestObj *pRequest;
SHashObj *pBlockHash;
bool autoCreateTbl;
STableDataCxt *pCurrBlock;
SSubmitTbData *pCurrTbData;
} SStmtExecInfo;
typedef struct SStmtSQLInfo {
......
......@@ -171,12 +171,11 @@ int32_t stmtUpdateBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags,
return TSDB_CODE_SUCCESS;
}
int32_t stmtUpdateExecInfo(TAOS_STMT* stmt, SHashObj* pVgHash, SHashObj* pBlockHash, bool autoCreateTbl) {
int32_t stmtUpdateExecInfo(TAOS_STMT* stmt, SHashObj* pVgHash, SHashObj* pBlockHash) {
STscStmt* pStmt = (STscStmt*)stmt;
pStmt->sql.pVgHash = pVgHash;
pStmt->exec.pBlockHash = pBlockHash;
pStmt->exec.autoCreateTbl = autoCreateTbl;
return TSDB_CODE_SUCCESS;
}
......@@ -186,7 +185,7 @@ int32_t stmtUpdateInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags, SNam
STscStmt* pStmt = (STscStmt*)stmt;
STMT_ERR_RET(stmtUpdateBindInfo(stmt, pTableMeta, tags, tbName, sTableName, autoCreateTbl));
STMT_ERR_RET(stmtUpdateExecInfo(stmt, pVgHash, pBlockHash, autoCreateTbl));
STMT_ERR_RET(stmtUpdateExecInfo(stmt, pVgHash, pBlockHash));
pStmt->sql.autoCreateTbl = autoCreateTbl;
......@@ -241,6 +240,8 @@ int32_t stmtCacheBlock(STscStmt* pStmt) {
}
int32_t stmtParseSql(STscStmt* pStmt) {
pStmt->exec.pCurrBlock = NULL;
SStmtCallback stmtCb = {
.pStmt = pStmt,
.getTbNameFn = stmtGetTbName,
......@@ -293,14 +294,14 @@ int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool deepClean) {
char* key = taosHashGetKey(pIter, &keyLen);
STableMeta* pMeta = qGetTableMetaInDataBlock(pBlocks);
/*
if (keepTable && (strlen(pStmt->bInfo.tbFName) == keyLen) && strncmp(pStmt->bInfo.tbFName, key, keyLen) == 0) {
if (keepTable && pBlocks == pStmt->exec.pCurrBlock) {
ASSERT(NULL == pBlocks->pData);
TSWAP(pBlocks->pData, pStmt->exec.pCurrTbData);
STMT_ERR_RET(qResetStmtDataBlock(pBlocks, false));
pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
continue;
}
*/
qDestroyStmtDataBlock(pBlocks);
taosHashRemove(pStmt->exec.pBlockHash, key, keyLen);
......@@ -308,12 +309,15 @@ int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool deepClean) {
pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
}
pStmt->exec.autoCreateTbl = false;
if (keepTable) {
return TSDB_CODE_SUCCESS;
}
if (!keepTable) {
taosHashCleanup(pStmt->exec.pBlockHash);
pStmt->exec.pBlockHash = NULL;
}
tDestroySSubmitTbData(pStmt->exec.pCurrTbData, TSDB_MSG_FLG_ENCODE);
taosMemoryFreeClear(pStmt->exec.pCurrTbData);
STMT_ERR_RET(stmtCleanBindInfo(pStmt));
......@@ -350,7 +354,7 @@ int32_t stmtCleanSQLInfo(STscStmt* pStmt) {
return TSDB_CODE_SUCCESS;
}
int32_t stmtRebuildDataBlock(STscStmt* pStmt, STableDataCxt* pDataBlock, STableDataCxt** newBlock, uint64_t uid) {
int32_t stmtRebuildDataBlock(STscStmt* pStmt, STableDataCxt* pDataBlock, STableDataCxt** newBlock, uint64_t uid, uint64_t suid) {
SEpSet ep = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
SVgroupInfo vgInfo = {0};
SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
......@@ -362,7 +366,9 @@ int32_t stmtRebuildDataBlock(STscStmt* pStmt, STableDataCxt* pDataBlock, STableD
STMT_ERR_RET(
taosHashPut(pStmt->sql.pVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo, sizeof(vgInfo)));
STMT_ERR_RET(qRebuildStmtDataBlock(newBlock, pDataBlock, uid, vgInfo.vgId));
STMT_ERR_RET(qRebuildStmtDataBlock(newBlock, pDataBlock, uid, suid, vgInfo.vgId, pStmt->sql.autoCreateTbl));
STMT_DLOG("tableDataCxt rebuilt, uid:%" PRId64 ", vgId:%d", uid, vgInfo.vgId);
return TSDB_CODE_SUCCESS;
}
......@@ -371,12 +377,14 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
pStmt->bInfo.needParse = true;
pStmt->bInfo.inExecCache = false;
STableDataCxt* pCxtInExec =
STableDataCxt** pCxtInExec =
taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
if (pCxtInExec) {
pStmt->bInfo.needParse = false;
pStmt->bInfo.inExecCache = true;
pStmt->exec.pCurrBlock = *pCxtInExec;
if (pStmt->sql.autoCreateTbl) {
tscDebug("reuse stmt block for tb %s in execBlock", pStmt->bInfo.tbFName);
return TSDB_CODE_SUCCESS;
......@@ -403,18 +411,18 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &pStmt->bInfo.tbSuid, sizeof(pStmt->bInfo.tbSuid));
if (pCache) {
pStmt->bInfo.needParse = false;
pStmt->exec.autoCreateTbl = true;
pStmt->bInfo.tbUid = 0;
STableDataCxt* pNewBlock = NULL;
STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, 0));
STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, 0, pStmt->bInfo.tbSuid));
if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock,
POINTER_BYTES)) {
STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
pStmt->exec.pCurrBlock = pNewBlock;
tscDebug("reuse stmt block for tb %s in sqlBlock, suid:0x%" PRIx64, pStmt->bInfo.tbFName, pStmt->bInfo.tbSuid);
return TSDB_CODE_SUCCESS;
......@@ -486,13 +494,15 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
pStmt->bInfo.tagsCached = true;
STableDataCxt* pNewBlock = NULL;
STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, uid));
STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, uid, suid));
if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock,
POINTER_BYTES)) {
STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
pStmt->exec.pCurrBlock = pNewBlock;
tscDebug("tb %s in sqlBlock list, set to current", pStmt->bInfo.tbFName);
return TSDB_CODE_SUCCESS;
......@@ -622,8 +632,6 @@ int stmtSetTbTags(TAOS_STMT* stmt, TAOS_MULTI_BIND* tags) {
pStmt->bInfo.sname.tname, tags, pStmt->exec.pRequest->msgBuf,
pStmt->exec.pRequest->msgBufLen));
pStmt->exec.autoCreateTbl = true;
return TSDB_CODE_SUCCESS;
}
......@@ -725,12 +733,18 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) {
return TSDB_CODE_SUCCESS;
}
STableDataCxt** pDataBlock =
(STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
STableDataCxt** pDataBlock = NULL;
if (pStmt->exec.pCurrBlock) {
pDataBlock = &pStmt->exec.pCurrBlock;
} else {
pDataBlock= (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
if (NULL == pDataBlock) {
tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
pStmt->exec.pCurrBlock = *pDataBlock;
}
if (colIdx < 0) {
int32_t code = qBindStmtColsValue(*pDataBlock, bind, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen);
......@@ -857,7 +871,6 @@ int stmtExec(TAOS_STMT* stmt) {
STscStmt* pStmt = (STscStmt*)stmt;
int32_t code = 0;
SSubmitRsp* pRsp = NULL;
bool autoCreateTbl = pStmt->exec.autoCreateTbl;
STMT_DLOG_E("start to exec");
......@@ -866,8 +879,13 @@ int stmtExec(TAOS_STMT* stmt) {
if (STMT_TYPE_QUERY == pStmt->sql.type) {
launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);
} else {
tDestroySSubmitTbData(pStmt->exec.pCurrTbData, TSDB_MSG_FLG_ENCODE);
taosMemoryFreeClear(pStmt->exec.pCurrTbData);
STMT_ERR_RET(qCloneCurrentTbData(pStmt->exec.pCurrBlock, &pStmt->exec.pCurrTbData));
STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->exec.pBlockHash));
launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, (autoCreateTbl ? (void**)&pRsp : NULL));
launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);
}
if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) {
......@@ -890,15 +908,6 @@ _return:
stmtCleanExecInfo(pStmt, (code ? false : true), false);
if (TSDB_CODE_SUCCESS == code && autoCreateTbl) {
if (NULL == pRsp) {
tscError("no submit resp got for auto create table");
code = TSDB_CODE_TSC_APP_ERROR;
} else {
code = stmtUpdateTableUid(pStmt, pRsp);
}
}
tFreeSSubmitRsp(pRsp);
++pStmt->sql.runTimes;
......
......@@ -38,7 +38,8 @@ int32_t colDataGetFullLength(const SColumnInfoData* pColumnInfoData, int32_t num
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
return pColumnInfoData->varmeta.length + sizeof(int32_t) * numOfRows;
} else {
return ((pColumnInfoData->info.type == TSDB_DATA_TYPE_NULL) ? 0 : pColumnInfoData->info.bytes * numOfRows) + BitmapLen(numOfRows);
return ((pColumnInfoData->info.type == TSDB_DATA_TYPE_NULL) ? 0 : pColumnInfoData->info.bytes * numOfRows) +
BitmapLen(numOfRows);
}
}
......@@ -1157,7 +1158,8 @@ void blockDataCleanup(SSDataBlock* pDataBlock) {
}
// todo temporarily disable it
static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockInfo, uint32_t numOfRows, bool clearPayload) {
static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockInfo, uint32_t numOfRows,
bool clearPayload) {
ASSERT(numOfRows > 0 /*&& pBlockInfo->capacity >= pBlockInfo->rows*/);
if (numOfRows <= pBlockInfo->capacity) {
return TSDB_CODE_SUCCESS;
......@@ -2009,6 +2011,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
* @param suid
*
*/
#if 0
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataBlock, STSchema* pTSchema, int32_t vgId,
tb_uid_t suid) {
int32_t bufSize = sizeof(SSubmitReq);
......@@ -2174,6 +2177,167 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataB
return TSDB_CODE_SUCCESS;
}
#endif
int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDataBlock, STSchema* pTSchema,
int32_t vgId, tb_uid_t suid) {
SSubmitReq2* pReq = NULL;
SArray* pVals = NULL;
int32_t numOfBlks = 0;
int32_t sz = 1;
terrno = TSDB_CODE_SUCCESS;
if (!(pReq = taosMemoryMalloc(sizeof(SSubmitReq2)))) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
goto _end;
}
for (int32_t i = 0; i < sz; ++i) {
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
int32_t rows = pDataBlock->info.rows;
if (colNum <= 1) { // invalid if only with TS col
continue;
}
SSubmitTbData* pTbData = (SSubmitTbData*)taosMemoryCalloc(1, sizeof(SSubmitTbData));
if (!pTbData) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
if(!(pTbData->aRowP = taosArrayInit(rows, sizeof(SRow*)))){
taosMemoryFree(pTbData);
goto _end;
}
pTbData->suid = suid;
pTbData->uid = pDataBlock->info.id.groupId;
pTbData->sver = pTSchema->version;
if (!pVals && !(pVals = taosArrayInit(colNum, sizeof(SColVal)))) {
taosArrayDestroy(pTbData->aRowP);
taosMemoryFree(pTbData);
goto _end;
}
for (int32_t j = 0; j < rows; ++j) { // iterate by row
taosArrayClear(pVals);
bool isStartKey = false;
int32_t offset = 0;
for (int32_t k = 0; k < colNum; ++k) { // iterate by column
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
STColumn* pCol = &pTSchema->columns[k];
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
switch (pColInfoData->info.type) {
case TSDB_DATA_TYPE_TIMESTAMP:
ASSERT(pColInfoData->info.type == pCol->type);
if (!isStartKey) {
isStartKey = true;
SColVal cv = COL_VAL_VALUE(PRIMARYKEY_TIMESTAMP_COL_ID, pCol->type, (SValue){.val = *(TSKEY*)var});
taosArrayPush(pVals, &cv);
} else if (colDataIsNull_s(pColInfoData, j)) {
SColVal cv = COL_VAL_NULL(PRIMARYKEY_TIMESTAMP_COL_ID + k, pCol->type);
taosArrayPush(pVals, &cv);
} else {
SColVal cv = COL_VAL_VALUE(PRIMARYKEY_TIMESTAMP_COL_ID + k, pCol->type, (SValue){.val = *(int64_t*)var});
taosArrayPush(pVals, &cv);
}
break;
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY
ASSERT(pColInfoData->info.type == pCol->type);
if (colDataIsNull_s(pColInfoData, j)) {
SColVal cv = COL_VAL_NULL(PRIMARYKEY_TIMESTAMP_COL_ID + k, pCol->type);
taosArrayPush(pVals, &cv);
} else {
void* data = colDataGetVarData(pColInfoData, j);
SValue sv = (SValue){.nData = varDataLen(data), .pData = varDataVal(data)}; // address copy, no value
SColVal cv = COL_VAL_VALUE(PRIMARYKEY_TIMESTAMP_COL_ID, pCol->type, sv);
taosArrayPush(pVals, &cv);
}
break;
}
case TSDB_DATA_TYPE_VARBINARY:
case TSDB_DATA_TYPE_DECIMAL:
case TSDB_DATA_TYPE_BLOB:
case TSDB_DATA_TYPE_JSON:
case TSDB_DATA_TYPE_MEDIUMBLOB:
uError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
ASSERT(0);
break;
default:
if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
if (colDataIsNull_s(pColInfoData, j)) {
SColVal cv = COL_VAL_NULL(PRIMARYKEY_TIMESTAMP_COL_ID + k, pCol->type); // should use pCol->type
taosArrayPush(pVals, &cv);
} else {
SValue sv;
if (pCol->type == pColInfoData->info.type) {
memcpy(&sv.val, var, tDataTypes[pCol->type].bytes);
} else {
/**
* 1. sum/avg would convert to int64_t/uint64_t/double during aggregation
* 2. below conversion may lead to overflow or loss, the app should select the right data type.
*/
char tv[8] = {0};
if (pColInfoData->info.type == TSDB_DATA_TYPE_FLOAT) {
float v = 0;
GET_TYPED_DATA(v, float, pColInfoData->info.type, var);
SET_TYPED_DATA(&tv, pCol->type, v);
} else if (pColInfoData->info.type == TSDB_DATA_TYPE_DOUBLE) {
double v = 0;
GET_TYPED_DATA(v, double, pColInfoData->info.type, var);
SET_TYPED_DATA(&tv, pCol->type, v);
} else if (IS_SIGNED_NUMERIC_TYPE(pColInfoData->info.type)) {
int64_t v = 0;
GET_TYPED_DATA(v, int64_t, pColInfoData->info.type, var);
SET_TYPED_DATA(&tv, pCol->type, v);
} else {
uint64_t v = 0;
GET_TYPED_DATA(v, uint64_t, pColInfoData->info.type, var);
SET_TYPED_DATA(&tv, pCol->type, v);
}
memcpy(&sv.val, tv, tDataTypes[pCol->type].bytes);
}
SColVal cv = COL_VAL_VALUE(PRIMARYKEY_TIMESTAMP_COL_ID + k, pColInfoData->info.type, sv);
taosArrayPush(pVals, &cv);
}
} else {
uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
ASSERT(0);
}
break;
}
}
SRow* pRow = NULL;
if ((terrno = tRowBuild(pVals, pTSchema, &pRow)) < 0) {
tDestroySSubmitTbData(pTbData, TSDB_MSG_FLG_ENCODE);
goto _end;
}
ASSERT(pRow);
taosArrayPush(pTbData->aRowP, &pRow);
}
taosArrayPush(pReq->aSubmitTbData, pTbData);
}
_end:
taosArrayDestroy(pVals);
if (terrno != 0) {
*ppReq = NULL;
if (pReq) tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE);
return TSDB_CODE_FAILED;
}
*ppReq = pReq;
return TSDB_CODE_SUCCESS;
}
char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) {
ASSERT(stbFullName[0] != 0);
......
......@@ -63,9 +63,9 @@ static int32_t tGetTagVal(uint8_t *p, STagVal *pTagVal, int8_t isJson);
#define KV_FLG_MID ((uint8_t)0x20)
#define KV_FLG_BIG ((uint8_t)0x30)
#define ROW_BIT_NONE ((uint8_t)0x0)
#define ROW_BIT_NULL ((uint8_t)0x1)
#define ROW_BIT_VALUE ((uint8_t)0x2)
#define BIT_FLG_NONE ((uint8_t)0x0)
#define BIT_FLG_NULL ((uint8_t)0x1)
#define BIT_FLG_VALUE ((uint8_t)0x2)
#pragma pack(push, 1)
typedef struct {
......@@ -162,7 +162,7 @@ int32_t tRowBuild(SArray *aColVal, STSchema *pTSchema, SRow **ppRow) {
ntp = sizeof(SRow);
break;
case HAS_VALUE:
ntp = sizeof(SRow) + pTSchema->flen;
ntp = sizeof(SRow) + pTSchema->flen + ntp;
break;
case (HAS_NULL | HAS_NONE):
ntp = sizeof(SRow) + BIT1_SIZE(pTSchema->numOfCols - 1);
......@@ -314,7 +314,7 @@ int32_t tRowBuild(SArray *aColVal, STSchema *pTSchema, SRow **ppRow) {
if (pColVal) {
if (pColVal->cid == pTColumn->colId) {
if (COL_VAL_IS_VALUE(pColVal)) { // VALUE
ROW_SET_BITMAP(pb, flag, iTColumn - 1, ROW_BIT_VALUE);
ROW_SET_BITMAP(pb, flag, iTColumn - 1, BIT_FLG_VALUE);
if (IS_VAR_DATA_TYPE(pTColumn->type)) {
*(int32_t *)(pf + pTColumn->offset) = nv;
......@@ -327,24 +327,24 @@ int32_t tRowBuild(SArray *aColVal, STSchema *pTSchema, SRow **ppRow) {
memcpy(pf + pTColumn->offset, &pColVal->value.val, TYPE_BYTES[pTColumn->type]);
}
} else if (COL_VAL_IS_NONE(pColVal)) { // NONE
ROW_SET_BITMAP(pb, flag, iTColumn - 1, ROW_BIT_NONE);
ROW_SET_BITMAP(pb, flag, iTColumn - 1, BIT_FLG_NONE);
if (pf) memset(pf + pTColumn->offset, 0, TYPE_BYTES[pTColumn->type]);
} else { // NULL
ROW_SET_BITMAP(pb, flag, iTColumn - 1, ROW_BIT_NULL);
ROW_SET_BITMAP(pb, flag, iTColumn - 1, BIT_FLG_NULL);
if (pf) memset(pf + pTColumn->offset, 0, TYPE_BYTES[pTColumn->type]);
}
pTColumn = (++iTColumn < pTSchema->numOfCols) ? pTSchema->columns + iTColumn : NULL;
pColVal = (++iColVal < nColVal) ? &colVals[iColVal] : NULL;
} else if (pColVal->cid > pTColumn->colId) { // NONE
ROW_SET_BITMAP(pb, flag, iTColumn - 1, ROW_BIT_NONE);
ROW_SET_BITMAP(pb, flag, iTColumn - 1, BIT_FLG_NONE);
if (pf) memset(pf + pTColumn->offset, 0, TYPE_BYTES[pTColumn->type]);
pTColumn = (++iTColumn < pTSchema->numOfCols) ? pTSchema->columns + iTColumn : NULL;
} else {
pColVal = (++iColVal < nColVal) ? &colVals[iColVal] : NULL;
}
} else { // NONE
ROW_SET_BITMAP(pb, flag, iTColumn - 1, ROW_BIT_NONE);
ROW_SET_BITMAP(pb, flag, iTColumn - 1, BIT_FLG_NONE);
if (pf) memset(pf + pTColumn->offset, 0, TYPE_BYTES[pTColumn->type]);
pTColumn = (++iTColumn < pTSchema->numOfCols) ? pTSchema->columns + iTColumn : NULL;
}
......@@ -459,7 +459,7 @@ void tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal) {
} else {
uint8_t *pf;
uint8_t *pv;
uint8_t bv = ROW_BIT_VALUE;
uint8_t bv = BIT_FLG_VALUE;
switch (pRow->flag) {
case (HAS_NULL | HAS_NONE):
......@@ -487,10 +487,10 @@ void tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal) {
break;
}
if (bv == ROW_BIT_NONE) {
if (bv == BIT_FLG_NONE) {
*pColVal = COL_VAL_NONE(pTColumn->colId, pTColumn->type);
return;
} else if (bv == ROW_BIT_NULL) {
} else if (bv == BIT_FLG_NULL) {
*pColVal = COL_VAL_NULL(pTColumn->colId, pTColumn->type);
return;
}
......@@ -615,6 +615,7 @@ int32_t tRowMerge(SArray *aRowP, STSchema *pTSchema, int8_t flag) {
if (iEnd - iStart > 1) {
code = tRowMergeImpl(aRowP, pTSchema, iStart, iEnd, flag);
if (code) return code;
}
// the array is also changing, so the iStart just ++ instead of iEnd
......@@ -789,7 +790,7 @@ SColVal *tRowIterNext(SRowIter *pIter) {
goto _exit;
}
} else { // Tuple
uint8_t bv = ROW_BIT_VALUE;
uint8_t bv = BIT_FLG_VALUE;
if (pIter->pb) {
switch (pIter->pRow->flag) {
case (HAS_NULL | HAS_NONE):
......@@ -810,10 +811,10 @@ SColVal *tRowIterNext(SRowIter *pIter) {
break;
}
if (bv == ROW_BIT_NONE) {
if (bv == BIT_FLG_NONE) {
pIter->cv = COL_VAL_NONE(pTColumn->colId, pTColumn->type);
goto _exit;
} else if (bv == ROW_BIT_NULL) {
} else if (bv == BIT_FLG_NULL) {
pIter->cv = COL_VAL_NULL(pTColumn->colId, pTColumn->type);
goto _exit;
}
......@@ -947,11 +948,11 @@ static int32_t tRowAppendTupleToColData(SRow *pRow, STSchema *pTSchema, SColData
break;
}
if (bv == ROW_BIT_NONE) {
if (bv == BIT_FLG_NONE) {
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NONE](pColData, NULL, 0);
if (code) goto _exit;
goto _continue;
} else if (bv == ROW_BIT_NULL) {
} else if (bv == BIT_FLG_NULL) {
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NULL](pColData, NULL, 0);
if (code) goto _exit;
goto _continue;
......@@ -2133,6 +2134,279 @@ _exit:
return code;
}
static int32_t tColDataSwapValue(SColData *pColData, int32_t i, int32_t j) {
int32_t code = 0;
if (IS_VAR_DATA_TYPE(pColData->type)) {
int32_t nData1 = pColData->aOffset[i + 1] - pColData->aOffset[i];
int32_t nData2 = (j < pColData->nVal - 1) ? pColData->aOffset[j + 1] - pColData->aOffset[j]
: pColData->nData - pColData->aOffset[j];
uint8_t *pData = taosMemoryMalloc(TMAX(nData1, nData2));
if (pData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
if (nData1 > nData2) {
memcpy(pData, pColData->pData + pColData->aOffset[i], nData1);
memcpy(pColData->pData + pColData->aOffset[i], pColData->pData + pColData->aOffset[j], nData2);
memmove(pColData->pData + pColData->aOffset[i] + nData2, pColData->pData + pColData->aOffset[i] + nData1,
pColData->aOffset[j] - pColData->aOffset[i + 1]);
memcpy(pColData->pData + pColData->aOffset[j] + nData2 - nData1, pData, nData1);
} else {
memcpy(pData, pColData->pData + pColData->aOffset[j], nData2);
memcpy(pColData->pData + pColData->aOffset[j] + nData2 - nData1, pColData->pData + pColData->aOffset[i], nData1);
memmove(pColData->pData + pColData->aOffset[j] + nData2 - nData1, pColData->pData + pColData->aOffset[i] + nData1,
pColData->aOffset[j] - pColData->aOffset[i + 1]);
memcpy(pColData->pData + pColData->aOffset[i], pData, nData2);
}
for (int32_t k = i + 1; k <= j; ++k) {
pColData->aOffset[k] = pColData->aOffset[k] + nData2 - nData1;
}
taosMemoryFree(pData);
} else {
uint64_t val;
memcpy(&val, &pColData->pData[TYPE_BYTES[pColData->type] * i], TYPE_BYTES[pColData->type]);
memcpy(&pColData->pData[TYPE_BYTES[pColData->type] * i], &pColData->pData[TYPE_BYTES[pColData->type] * j],
TYPE_BYTES[pColData->type]);
memcpy(&pColData->pData[TYPE_BYTES[pColData->type] * j], &val, TYPE_BYTES[pColData->type]);
}
_exit:
return code;
}
static void tColDataSwap(SColData *pColData, int32_t i, int32_t j) {
ASSERT(i < j);
ASSERT(j < pColData->nVal);
switch (pColData->flag) {
case HAS_NONE:
case HAS_NULL:
break;
case (HAS_NULL | HAS_NONE): {
uint8_t bv = GET_BIT1(pColData->pBitMap, i);
SET_BIT1(pColData->pBitMap, i, GET_BIT1(pColData->pBitMap, j));
SET_BIT1(pColData->pBitMap, j, bv);
} break;
case HAS_VALUE: {
tColDataSwapValue(pColData, i, j);
} break;
case (HAS_VALUE | HAS_NONE):
case (HAS_VALUE | HAS_NULL): {
uint8_t bv = GET_BIT1(pColData->pBitMap, i);
SET_BIT1(pColData->pBitMap, i, GET_BIT1(pColData->pBitMap, j));
SET_BIT1(pColData->pBitMap, j, bv);
tColDataSwapValue(pColData, i, j);
} break;
case (HAS_VALUE | HAS_NULL | HAS_NONE): {
uint8_t bv = GET_BIT2(pColData->pBitMap, i);
SET_BIT2(pColData->pBitMap, i, GET_BIT2(pColData->pBitMap, j));
SET_BIT2(pColData->pBitMap, j, bv);
tColDataSwapValue(pColData, i, j);
} break;
default:
ASSERT(0);
break;
}
}
static void tColDataSort(SColData *aColData, int32_t nColData) {
if (aColData[0].nVal == 0) return;
// TODO
}
static void tColDataMergeImpl(SColData *pColData, int32_t iStart, int32_t iEnd /* not included */) {
switch (pColData->flag) {
case HAS_NONE:
case HAS_NULL: {
pColData->nVal = pColData->nVal - (iEnd - iStart - 1);
} break;
case (HAS_NULL | HAS_NONE): {
if (GET_BIT1(pColData->pBitMap, iStart) == BIT_FLG_NONE) {
for (int32_t i = iStart + 1; i < iEnd; ++i) {
if (GET_BIT1(pColData->pBitMap, i) == BIT_FLG_NULL) {
SET_BIT1(pColData->pBitMap, iStart, BIT_FLG_NULL);
break;
}
}
}
for (int32_t i = iEnd, j = iStart + 1; i < pColData->nVal; ++i, ++j) {
SET_BIT1(pColData->pBitMap, j, GET_BIT1(pColData->pBitMap, i));
}
pColData->nVal = pColData->nVal - (iEnd - iStart - 1);
uint8_t flag = 0;
for (int32_t i = 0; i < pColData->nVal; ++i) {
uint8_t bv = GET_BIT1(pColData->pBitMap, i);
if (bv == BIT_FLG_NONE) {
flag |= HAS_NONE;
} else if (bv == BIT_FLG_NULL) {
flag |= HAS_NULL;
} else {
ASSERT(0);
}
if (flag == pColData->flag) break;
}
pColData->flag = flag;
} break;
case HAS_VALUE: {
if (IS_VAR_DATA_TYPE(pColData->type)) {
int32_t nDiff = pColData->aOffset[iEnd - 1] - pColData->aOffset[iStart];
memmove(&pColData->pData[pColData->aOffset[iStart]], &pColData->pData[pColData->aOffset[iEnd - 1]],
pColData->nData - pColData->aOffset[iEnd - 1]);
pColData->nData -= nDiff;
for (int32_t i = iEnd, j = iStart + 1; i < pColData->nVal; ++i, ++j) {
pColData->aOffset[j] = pColData->aOffset[i] - nDiff;
}
} else {
memmove(&pColData->pData[TYPE_BYTES[pColData->type] * iStart],
&pColData->pData[TYPE_BYTES[pColData->type] * (iEnd - 1)],
TYPE_BYTES[pColData->type] * (pColData->nVal - iEnd + 1));
pColData->nData -= (TYPE_BYTES[pColData->type] * (iEnd - iStart - 1));
}
pColData->nVal = pColData->nVal - (iEnd - iStart - 1);
} break;
case (HAS_VALUE | HAS_NONE): {
uint8_t bv;
int32_t iv;
for (int32_t i = iEnd - 1; i >= iStart; --i) {
bv = GET_BIT1(pColData->pBitMap, i);
if (bv) {
iv = i;
break;
}
}
if (bv) { // has a value
if (IS_VAR_DATA_TYPE(pColData->type)) {
if (iv != iStart) {
memmove(&pColData->pData[pColData->aOffset[iStart]], &pColData->pData[pColData->aOffset[iv]],
iv < (pColData->nVal - 1) ? pColData->aOffset[iv + 1] - pColData->aOffset[iv]
: pColData->nData - pColData->aOffset[iv]);
}
// TODO
ASSERT(0);
} else {
if (iv != iStart) {
memcpy(&pColData->pData[TYPE_BYTES[pColData->type] * iStart],
&pColData->pData[TYPE_BYTES[pColData->type] * iv], TYPE_BYTES[pColData->type]);
}
memmove(&pColData->pData[TYPE_BYTES[pColData->type] * (iStart + 1)],
&pColData->pData[TYPE_BYTES[pColData->type] * iEnd],
TYPE_BYTES[pColData->type] * (iEnd - iStart - 1));
pColData->nData -= (TYPE_BYTES[pColData->type] * (iEnd - iStart - 1));
}
SET_BIT1(pColData->pBitMap, iStart, 1);
for (int32_t i = iEnd, j = iStart + 1; i < pColData->nVal; ++i, ++j) {
SET_BIT1(pColData->pBitMap, j, GET_BIT1(pColData->pBitMap, i));
}
uint8_t flag = HAS_VALUE;
for (int32_t i = 0; i < pColData->nVal - (iEnd - iStart - 1); ++i) {
if (GET_BIT1(pColData->pBitMap, i) == 0) {
flag |= HAS_NONE;
}
if (flag == pColData->flag) break;
}
pColData->flag = flag;
} else { // all NONE
if (IS_VAR_DATA_TYPE(pColData->type)) {
int32_t nDiff = pColData->aOffset[iEnd - 1] - pColData->aOffset[iStart];
memmove(&pColData->pData[pColData->aOffset[iStart]], &pColData->pData[pColData->aOffset[iEnd - 1]],
pColData->nData - pColData->aOffset[iEnd - 1]);
pColData->nData -= nDiff;
for (int32_t i = iEnd, j = iStart + 1; i < pColData->nVal; ++i, ++j) {
pColData->aOffset[j] = pColData->aOffset[i] - nDiff;
}
} else {
memmove(&pColData->pData[TYPE_BYTES[pColData->type] * iStart],
&pColData->pData[TYPE_BYTES[pColData->type] * (iEnd - 1)],
TYPE_BYTES[pColData->type] * (pColData->nVal - iEnd + 1));
pColData->nData -= (TYPE_BYTES[pColData->type] * (iEnd - iStart - 1));
}
for (int32_t i = iEnd, j = iStart + 1; i < pColData->nVal; ++i, ++j) {
SET_BIT1(pColData->pBitMap, j, GET_BIT1(pColData->pBitMap, i));
}
}
pColData->nVal = pColData->nVal - (iEnd - iStart - 1);
} break;
case (HAS_VALUE | HAS_NULL): {
if (IS_VAR_DATA_TYPE(pColData->type)) {
int32_t nDiff = pColData->aOffset[iEnd - 1] - pColData->aOffset[iStart];
memmove(&pColData->pData[pColData->aOffset[iStart]], &pColData->pData[pColData->aOffset[iEnd - 1]],
pColData->nData - pColData->aOffset[iEnd - 1]);
pColData->nData -= nDiff;
for (int32_t i = iEnd, j = iStart + 1; i < pColData->nVal; ++i, ++j) {
pColData->aOffset[j] = pColData->aOffset[i] - nDiff;
}
} else {
memmove(&pColData->pData[TYPE_BYTES[pColData->type] * iStart],
&pColData->pData[TYPE_BYTES[pColData->type] * (iEnd - 1)],
TYPE_BYTES[pColData->type] * (pColData->nVal - iEnd + 1));
pColData->nData -= (TYPE_BYTES[pColData->type] * (iEnd - iStart - 1));
}
for (int32_t i = iEnd - 1, j = iStart; i < pColData->nVal; ++i, ++j) {
SET_BIT1(pColData->pBitMap, j, GET_BIT1(pColData->pBitMap, i));
}
pColData->nVal = pColData->nVal - (iEnd - iStart - 1);
uint8_t flag = 0;
for (int32_t i = 0; i < pColData->nVal; ++i) {
uint8_t bv = GET_BIT1(pColData->pBitMap, i);
if (bv) {
flag |= HAS_VALUE;
} else {
flag |= HAS_NULL;
}
if (flag == pColData->flag) break;
}
pColData->flag = flag;
} break;
case (HAS_VALUE | HAS_NULL | HAS_NONE): {
// TODO
ASSERT(0);
pColData->nVal = pColData->nVal - (iEnd - iStart - 1);
} break;
default:
ASSERT(0);
break;
}
}
static void tColDataMerge(SColData *aColData, int32_t nColData) {
int32_t iStart = 0;
for (;;) {
if (iStart >= aColData[0].nVal) break;
int32_t iEnd = iStart + 1;
while (iEnd < aColData[0].nVal) {
if (((TSKEY *)aColData[0].pData)[iEnd] != ((TSKEY *)aColData[0].pData)[iStart]) break;
iEnd++;
}
if (iEnd - iStart > 1) {
for (int32_t i = 0; i < nColData; i++) {
tColDataMergeImpl(&aColData[i], iStart, iEnd);
}
}
iStart++;
}
}
void tColDataSortMerge(SArray *colDataArr) {
int32_t nColData = TARRAY_SIZE(colDataArr);
SColData *aColData = (SColData *)TARRAY_DATA(colDataArr);
......@@ -2160,14 +2434,12 @@ void tColDataSortMerge(SArray *colDataArr) {
// sort -------
if (doSort) {
ASSERT(0);
// todo
tColDataSort(aColData, nColData);
}
// merge -------
if (doMerge) {
ASSERT(0);
// todo
tColDataMerge(aColData, nColData);
}
_exit:
......
......@@ -6867,11 +6867,16 @@ _exit:
}
void tDestroySSubmitTbData(SSubmitTbData *pTbData, int32_t flag) {
if (NULL == pTbData) {
return;
}
if (flag == TSDB_MSG_FLG_ENCODE) {
if (pTbData->pCreateTbReq) {
tdDestroySVCreateTbReq(pTbData->pCreateTbReq);
taosMemoryFree(pTbData->pCreateTbReq);
}
if (flag == TSDB_MSG_FLG_ENCODE) {
if (pTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
int32_t nColData = TARRAY_SIZE(pTbData->aCol);
SColData *aColData = (SColData *)TARRAY_DATA(pTbData->aCol);
......@@ -6890,6 +6895,10 @@ void tDestroySSubmitTbData(SSubmitTbData *pTbData, int32_t flag) {
taosArrayDestroy(pTbData->aRowP);
}
} else if (flag == TSDB_MSG_FLG_DECODE) {
if (pTbData->pCreateTbReq) {
taosMemoryFree(pTbData->pCreateTbReq);
}
if (pTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
taosArrayDestroy(pTbData->aCol);
} else {
......
......@@ -158,8 +158,8 @@ int32_t tsdbCommit(STsdb* pTsdb);
int32_t tsdbFinishCommit(STsdb* pTsdb);
int32_t tsdbRollbackCommit(STsdb* pTsdb);
int32_t tsdbDoRetention(STsdb* pTsdb, int64_t now);
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg);
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp);
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq2* pMsg);
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq2* pMsg, SSubmitRsp2* pRsp);
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitTbData* pSubmitTbData, int32_t* affectedRows);
int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg);
......@@ -197,9 +197,9 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t version, char* msg, int32_t m
int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg);
SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pSchema,
int32_t tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pSchema,
SSchemaWrapper* pTagSchemaWrapper, bool createTb, int64_t suid, const char* stbFullName,
SBatchDeleteReq* pDeleteReq);
SBatchDeleteReq* pDeleteReq, void** ppData, int32_t* pLen);
// sma
int32_t smaInit();
......@@ -220,7 +220,7 @@ int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg);
int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg);
int32_t tdProcessRSmaCreate(SSma* pSma, SVCreateStbReq* pReq);
int32_t tdProcessRSmaSubmit(SSma* pSma, void* pMsg, int32_t inputType);
int32_t tdProcessRSmaSubmit(SSma* pSma, void* pReq, void* pMsg, int32_t len, int32_t inputType);
int32_t tdProcessRSmaDrop(SSma* pSma, SVDropStbReq* pReq);
int32_t tdFetchTbUidList(SSma* pSma, STbUidStore** ppStore, tb_uid_t suid, tb_uid_t uid);
int32_t tdUpdateTbUidList(SSma* pSma, STbUidStore* pUidStore, bool isAdd);
......
......@@ -601,8 +601,8 @@ static int32_t tdProcessSubmitReq(STsdb *pTsdb, int64_t version, void *pReq) {
return TSDB_CODE_FAILED;
}
SSubmitReq *pSubmitReq = (SSubmitReq *)pReq;
// TODO: spin lock for race conditiond
SSubmitReq2 *pSubmitReq = (SSubmitReq2 *)pReq;
// spin lock for race condition during insert data
if (tsdbInsertData(pTsdb, version, pSubmitReq, NULL) < 0) {
return TSDB_CODE_FAILED;
}
......@@ -610,29 +610,19 @@ static int32_t tdProcessSubmitReq(STsdb *pTsdb, int64_t version, void *pReq) {
return TSDB_CODE_SUCCESS;
}
static int32_t tdFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) {
SSubmitMsgIter msgIter = {0};
SSubmitBlk *pBlock = NULL;
SSubmitBlkIter blkIter = {0};
STSRow *row = NULL;
static int32_t tdFetchSubmitReqSuids(SSubmitReq2 *pMsg, STbUidStore *pStore) {
SArray *pSubmitTbData = pMsg ? pMsg->aSubmitTbData : NULL;
int32_t size = taosArrayGetSize(pSubmitTbData);
terrno = TSDB_CODE_SUCCESS;
if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) {
for (int32_t i = 0; i < size; ++i) {
SSubmitTbData *pData = TARRAY_GET_ELEM(pSubmitTbData, i);
if (terrno = tdUidStorePut(pStore, pData->suid, NULL) < 0) {
return -1;
}
while (true) {
if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) {
return -1;
}
if (!pBlock) break;
tdUidStorePut(pStore, msgIter.suid, NULL);
}
if (terrno != TSDB_CODE_SUCCESS) {
return -1;
}
return 0;
}
......@@ -712,7 +702,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
output->info.rows);
STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]);
SSubmitReq *pReq = NULL;
SSubmitReq2 *pReq = NULL;
// TODO: the schema update should be handled later(TD-17965)
if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, SMA_VID(pSma), suid) < 0) {
......@@ -723,19 +713,17 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
}
if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) {
taosMemoryFreeClear(pReq);
tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE);
smaError("vgId:%d, process submit req for rsma suid:%" PRIu64 ", uid:%" PRIu64 " level %" PRIi8
" failed since %s",
SMA_VID(pSma), suid, output->info.id.groupId, pItem->level, terrstr());
goto _err;
}
smaDebug("vgId:%d, process submit req for rsma suid:%" PRIu64 ",uid:%" PRIu64 ", level %" PRIi8 " ver %" PRIi64
" len %" PRIu32,
SMA_VID(pSma), suid, output->info.id.groupId, pItem->level, output->info.version,
htonl(pReq->header.contLen));
smaDebug("vgId:%d, process submit req for rsma suid:%" PRIu64 ",uid:%" PRIu64 ", level %" PRIi8 " ver %" PRIi64,
SMA_VID(pSma), suid, output->info.id.groupId, pItem->level, output->info.version);
taosMemoryFreeClear(pReq);
if(pReq) tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE);
}
}
......@@ -754,21 +742,20 @@ _err:
*
* @param pSma
* @param pMsg
* @param len
* @param inputType
* @param pInfo
* @param suid
* @return int32_t
*/
static int32_t tdExecuteRSmaImplAsync(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfo *pInfo,
static int32_t tdExecuteRSmaImplAsync(SSma *pSma, const void *pMsg, int32_t len, int32_t inputType, SRSmaInfo *pInfo,
tb_uid_t suid) {
const SSubmitReq *pReq = (const SSubmitReq *)pMsg;
void *qItem = taosAllocateQitem(pReq->header.contLen, DEF_QITEM);
void *qItem = taosAllocateQitem(len, DEF_QITEM);
if (!qItem) {
return TSDB_CODE_FAILED;
}
memcpy(qItem, pMsg, pReq->header.contLen);
memcpy(qItem, pMsg, len);
taosWriteQitem(pInfo->queue, qItem);
......@@ -1015,7 +1002,7 @@ static FORCE_INLINE void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) {
* @param suid
* @return int32_t
*/
static int32_t tdExecuteRSmaAsync(SSma *pSma, const void *pMsg, int32_t inputType, tb_uid_t suid) {
static int32_t tdExecuteRSmaAsync(SSma *pSma, const void *pMsg, int32_t len, int32_t inputType, tb_uid_t suid) {
SRSmaInfo *pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, suid);
if (!pRSmaInfo) {
smaDebug("vgId:%d, execute rsma, no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid);
......@@ -1023,7 +1010,7 @@ static int32_t tdExecuteRSmaAsync(SSma *pSma, const void *pMsg, int32_t inputTyp
}
if (inputType == STREAM_INPUT__DATA_SUBMIT) {
if (tdExecuteRSmaImplAsync(pSma, pMsg, inputType, pRSmaInfo, suid) < 0) {
if (tdExecuteRSmaImplAsync(pSma, pMsg, len, inputType, pRSmaInfo, suid) < 0) {
tdReleaseRSmaInfo(pSma, pRSmaInfo);
return TSDB_CODE_FAILED;
}
......@@ -1045,7 +1032,7 @@ static int32_t tdExecuteRSmaAsync(SSma *pSma, const void *pMsg, int32_t inputTyp
return TSDB_CODE_SUCCESS;
}
int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) {
int32_t tdProcessRSmaSubmit(SSma *pSma, void *pReq, void* pMsg, int32_t len, int32_t inputType) {
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
if (!pEnv) {
// only applicable when rsma env exists
......@@ -1059,19 +1046,22 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) {
}
if (inputType == STREAM_INPUT__DATA_SUBMIT) {
if (tdFetchSubmitReqSuids(pMsg, &uidStore) < 0) {
if (tdFetchSubmitReqSuids(pReq, &uidStore) < 0) {
smaError("vgId:%d, failed to process rsma submit fetch suid since: %s", SMA_VID(pSma), terrstr());
goto _err;
}
if (uidStore.suid != 0) {
if (tdExecuteRSmaAsync(pSma, pMsg, inputType, uidStore.suid) < 0) {
if (tdExecuteRSmaAsync(pSma, pMsg, len, inputType, uidStore.suid) < 0) {
smaError("vgId:%d, failed to process rsma submit exec 1 since: %s", SMA_VID(pSma), terrstr());
goto _err;
}
void *pIter = NULL;
while ((pIter = taosHashIterate(uidStore.uidHash, pIter))) {
tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
if (tdExecuteRSmaAsync(pSma, pMsg, inputType, *pTbSuid) < 0) {
if (tdExecuteRSmaAsync(pSma, pMsg, len, inputType, *pTbSuid) < 0) {
smaError("vgId:%d, failed to process rsma submit exec 2 since: %s", SMA_VID(pSma), terrstr());
goto _err;
}
}
......@@ -1081,7 +1071,6 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) {
return TSDB_CODE_SUCCESS;
_err:
tdUidStoreDestory(&uidStore);
smaError("vgId:%d, failed to process rsma submit since: %s", SMA_VID(pSma), terrstr());
return TSDB_CODE_FAILED;
}
......
......@@ -204,18 +204,18 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char
}
SBatchDeleteReq deleteReq = {0};
SSubmitReq *pSubmitReq =
tqBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, &pTsmaStat->pTSma->schemaTag, true,
pTsmaStat->pTSma->dstTbUid, pTsmaStat->pTSma->dstTbName, &deleteReq);
// TODO deleteReq
taosArrayDestroy(deleteReq.deleteReqs);
void *pSubmitReq = NULL;
int32_t contLen = 0;
if (!pSubmitReq) {
smaError("vgId:%d, failed to gen submit blk while tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma),
if (tqBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, &pTsmaStat->pTSma->schemaTag, true,
pTsmaStat->pTSma->dstTbUid, pTsmaStat->pTSma->dstTbName, &deleteReq, &pSubmitReq, &contLen) < 0) {
smaError("vgId:%d, failed to gen submit msg while tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma),
indexUid, tstrerror(terrno));
goto _err;
}
// TODO deleteReq
taosArrayDestroy(deleteReq.deleteReqs);
#if 0
ASSERT(!strncasecmp("td.tsma.rst.tb", pTsmaStat->pTSma->dstTbName, 14));
......@@ -224,7 +224,7 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char
SRpcMsg submitReqMsg = {
.msgType = TDMT_VND_SUBMIT,
.pCont = pSubmitReq,
.contLen = ntohl(pSubmitReq->length),
.contLen = ntohl(contLen),
};
if (tmsgPutToQueue(&pSma->pVnode->msgCb, WRITE_QUEUE, &submitReqMsg) < 0) {
......
......@@ -72,6 +72,7 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl
return 0;
}
#if 0
SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pTSchema,
SSchemaWrapper* pTagSchemaWrapper, bool createTb, int64_t suid, const char* stbFullName,
SBatchDeleteReq* pDeleteReq) {
......@@ -299,6 +300,203 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
return ret;
}
#endif
int32_t tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pTSchema,
SSchemaWrapper* pTagSchemaWrapper, bool createTb, int64_t suid, const char* stbFullName,
SBatchDeleteReq* pDeleteReq, void** ppData, int32_t* pLen) {
void* pBuf = NULL;
int32_t len = 0;
SSubmitReq2* pReq = NULL;
SArray* tagArray = NULL;
SArray* createTbArray = NULL;
SArray* pVals = NULL;
int32_t sz = taosArrayGetSize(pBlocks);
if (!(tagArray = taosArrayInit(1, sizeof(STagVal)))) {
goto _end;
}
if (!(createTbArray = taosArrayInit(sz, POINTER_BYTES))) {
goto _end;
}
if (!(pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2)))) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
goto _end;
}
// create table req
if (createTb) {
for (int32_t i = 0; i < sz; ++i) {
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
SVCreateTbReq* pCreateTbReq = NULL;
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
taosArrayPush(createTbArray, &pCreateTbReq);
continue;
}
if (!(pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq)))) {
goto _end;
};
// don't move to the end of loop as to destroy in the end of func when error occur
taosArrayPush(createTbArray, &pCreateTbReq);
// set const
pCreateTbReq->flags = 0;
pCreateTbReq->type = TSDB_CHILD_TABLE;
pCreateTbReq->ctb.suid = suid;
// set super table name
SName name = {0};
tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
pCreateTbReq->ctb.stbName = strdup((char*)tNameGetTableName(&name)); // strdup(stbFullName);
// set tag content
taosArrayClear(tagArray);
STagVal tagVal = {
.cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1,
.type = TSDB_DATA_TYPE_UBIGINT,
.i64 = (int64_t)pDataBlock->info.id.groupId,
};
taosArrayPush(tagArray, &tagVal);
pCreateTbReq->ctb.tagNum = taosArrayGetSize(tagArray);
STag* pTag = NULL;
tTagNew(tagArray, 1, false, &pTag);
if (pTag == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
pCreateTbReq->ctb.pTag = (uint8_t*)pTag;
// set tag name
SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
char tagNameStr[TSDB_COL_NAME_LEN] = {0};
strcpy(tagNameStr, "group_id");
taosArrayPush(tagName, tagNameStr);
pCreateTbReq->ctb.tagName = tagName;
// set table name
if (pDataBlock->info.parTbName[0]) {
pCreateTbReq->name = strdup(pDataBlock->info.parTbName);
} else {
pCreateTbReq->name = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId);
}
}
}
// SSubmitTbData req
for (int32_t i = 0; i < sz; ++i) {
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
pDeleteReq->suid = suid;
pDeleteReq->deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
tqBuildDeleteReq(pVnode, stbFullName, pDataBlock, pDeleteReq);
continue;
}
int32_t rows = pDataBlock->info.rows;
SSubmitTbData* pTbData = (SSubmitTbData*)taosMemoryCalloc(1, sizeof(SSubmitTbData));
if (!pTbData) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
if (!(pTbData->aRowP = taosArrayInit(rows, sizeof(SRow*)))) {
taosMemoryFree(pTbData);
goto _end;
}
pTbData->suid = suid;
pTbData->uid = 0; // uid is assigned by vnode
pTbData->sver = pTSchema->version;
tqDebug("tq sink, convert block1 %d, rows: %d", i, rows);
if (createTb) {
pTbData->pCreateTbReq = taosArrayGetP(createTbArray, i);
if (pTbData->pCreateTbReq) pTbData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE;
}
if (!pVals && !(pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)))) {
taosArrayDestroy(pTbData->aRowP);
taosMemoryFree(pTbData);
goto _end;
}
for (int32_t j = 0; j < rows; j++) {
taosArrayClear(pVals);
for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
const STColumn* pCol = &pTSchema->columns[k];
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, k);
if (colDataIsNull_s(pColData, j)) {
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
taosArrayPush(pVals, &cv);
} else {
void* data = colDataGetData(pColData, j);
if (IS_STR_DATA_TYPE(pCol->type)) {
SValue sv = (SValue){.nData = varDataLen(data), .pData = varDataVal(data)}; // address copy, no value
SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv);
taosArrayPush(pVals, &cv);
} else {
SValue sv;
memcpy(&sv.val, data, tDataTypes[pCol->type].bytes);
SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv);
taosArrayPush(pVals, &cv);
}
}
}
SRow* pRow = NULL;
if ((terrno = tRowBuild(pVals, (STSchema*)pTSchema, &pRow)) < 0) {
tDestroySSubmitTbData(pTbData, TSDB_MSG_FLG_ENCODE);
goto _end;
}
ASSERT(pRow);
taosArrayPush(pTbData->aRowP, &pRow);
}
taosArrayPush(pReq->aSubmitTbData, pTbData);
}
// encode
tEncodeSize(tEncodeSSubmitReq2, pReq, len, terrno);
if (TSDB_CODE_SUCCESS == terrno) {
SEncoder encoder;
len += sizeof(SMsgHead);
pBuf = rpcMallocCont(len);
if (NULL == pBuf) {
goto _end;
}
((SMsgHead*)pBuf)->vgId = htonl(TD_VID(pVnode));
((SMsgHead*)pBuf)->contLen = htonl(len);
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len - sizeof(SMsgHead));
if (tEncodeSSubmitReq2(&encoder, pReq) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("failed to encode submit req since %s", terrstr());
}
tEncoderClear(&encoder);
}
_end:
taosArrayDestroy(tagArray);
taosArrayDestroy(pVals);
tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE);
if (terrno != 0) {
rpcFreeCont(pBuf);
taosArrayDestroy(pDeleteReq->deleteReqs);
return TSDB_CODE_FAILED;
}
if (ppData) *ppData = pBuf;
if (pLen) *pLen = len;
return TSDB_CODE_SUCCESS;
}
void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
const SArray* pBlocks = (const SArray*)data;
......
......@@ -26,14 +26,17 @@ static int64_t tsMaxKeyByPrecision[] = {31556995200000L, 31556995200000000L, 921
// static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg);
int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp *pRsp) {
SSubmitMsgIter msgIter = {0};
SSubmitBlk *pBlock = NULL;
int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq2 *pMsg, SSubmitRsp2 *pRsp) {
int32_t arrSize = 0;
int32_t affectedrows = 0;
int32_t numOfRows = 0;
ASSERT(pTsdb->mem != NULL);
if (pMsg) {
arrSize = taosArrayGetSize(pMsg->aSubmitTbData);
}
// scan and convert
if (tsdbScanAndConvertSubmitMsg(pTsdb, pMsg) < 0) {
if (terrno != TSDB_CODE_TDB_TABLE_RECONFIGURE) {
......@@ -43,22 +46,10 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp *
}
// loop to insert
if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) {
for (int32_t i = 0; i < arrSize; ++i) {
if ((terrno = tsdbInsertTableData(pTsdb, version, taosArrayGet(pMsg->aSubmitTbData, i), &affectedrows)) < 0) {
return -1;
}
while (true) {
SSubmitBlkRsp r = {0};
tGetSubmitMsgNext(&msgIter, &pBlock);
if (pBlock == NULL) break;
#if 0
if ((terrno = tsdbInsertTableData(pTsdb, version, &msgIter, pBlock, &r)) < 0) {
return -1;
}
#else
ASSERT(0);
#endif
numOfRows += msgIter.numOfRows;
}
if (pRsp != NULL) {
......@@ -86,9 +77,8 @@ static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, STable *pTable, STSRow *
}
#endif
static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, tb_uid_t uid, STSRow *row, TSKEY minKey, TSKEY maxKey,
static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, tb_uid_t uid, TSKEY rowKey, TSKEY minKey, TSKEY maxKey,
TSKEY now) {
TSKEY rowKey = TD_ROW_KEY(row);
if (rowKey < minKey || rowKey > maxKey) {
tsdbError("vgId:%d, table uid %" PRIu64 " timestamp is out of range! now %" PRId64 " minKey %" PRId64
" maxKey %" PRId64 " row key %" PRId64,
......@@ -100,6 +90,7 @@ static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, tb_uid_t uid, STSRow *ro
return 0;
}
#if 0
int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) {
ASSERT(pMsg != NULL);
// STsdbMeta * pMeta = pTsdb->tsdbMeta;
......@@ -170,3 +161,43 @@ int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) {
if (terrno != TSDB_CODE_SUCCESS) return -1;
return 0;
}
#endif
int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq2 *pMsg) {
ASSERT(pMsg != NULL);
STsdbKeepCfg *pCfg = &pTsdb->keepCfg;
TSKEY now = taosGetTimestamp(pCfg->precision);
TSKEY minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep2;
TSKEY maxKey = tsMaxKeyByPrecision[pCfg->precision];
int32_t size = taosArrayGetSize(pMsg->aSubmitTbData);
terrno = TSDB_CODE_SUCCESS;
for (int32_t i = 0; i < size; ++i) {
SSubmitTbData *pData = TARRAY_GET_ELEM(pMsg->aSubmitTbData, i);
if (pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
uint64_t nColData = TARRAY_SIZE(pData->aCol);
SColData *aColData = (SColData *)TARRAY_DATA(pData->aCol);
if (nColData > 0) {
int32_t nRows = aColData[0].nVal;
TSKEY *aKey = (TSKEY *)aColData[0].pData;
for (int32_t r = 0; r < nRows; ++r) {
if (tsdbCheckRowRange(pTsdb, pData->uid, aKey[r], minKey, maxKey, now) < 0) {
return -1;
}
}
}
} else {
int32_t nRows = taosArrayGetSize(pData->aRowP);
for (int32_t r = 0; r < nRows; ++r) {
SRow *pRow = (SRow *)taosArrayGetP(pData->aRowP, r);
if (tsdbCheckRowRange(pTsdb, pData->uid, pRow->ts, minKey, maxKey, now) < 0) {
return -1;
}
}
}
}
if (terrno != TSDB_CODE_SUCCESS) return -1;
return 0;
}
\ No newline at end of file
......@@ -236,7 +236,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
break;
/* TSDB */
case TDMT_VND_SUBMIT:
if (vnodeProcessSubmitReq(pVnode, version, pMsg->pCont, pMsg->contLen, pRsp) < 0) goto _err;
if (vnodeProcessSubmitReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
break;
case TDMT_VND_DELETE:
if (vnodeProcessDeleteReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
......@@ -857,6 +857,7 @@ static int32_t vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock,
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
#if 1
int32_t code = 0;
terrno = 0;
SSubmitReq2 *pSubmitReq = &(SSubmitReq2){0};
SSubmitRsp2 *pSubmitRsp = &(SSubmitRsp2){0};
......@@ -868,7 +869,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
// decode
SDecoder dc = {0};
tDecoderInit(&dc, (char *)pReq + sizeof(SMsgHead), len - sizeof(SMsgHead));
tDecoderInit(&dc, pReq, len);
if (tDecodeSSubmitReq2(&dc, pSubmitReq) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
......@@ -876,6 +877,11 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
tDecoderClear(&dc);
// check
code = tsdbScanAndConvertSubmitMsg(pVnode->pTsdb, pSubmitReq);
if (code) {
goto _exit;
}
for (int32_t i = 0; i < TARRAY_SIZE(pSubmitReq->aSubmitTbData); ++i) {
SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i);
......@@ -887,6 +893,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
code = metaGetInfo(pVnode->pMeta, pSubmitTbData->uid, &info, NULL);
if (code) {
code = TSDB_CODE_TDB_TABLE_NOT_EXIST;
vWarn("vgId:%d, table uid:%" PRId64 " not exists", TD_VID(pVnode), pSubmitTbData->uid);
goto _exit;
}
......@@ -904,6 +911,29 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
goto _exit;
}
}
if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
int32_t nColData = TARRAY_SIZE(pSubmitTbData->aCol);
SColData *aColData = (SColData *)TARRAY_DATA(pSubmitTbData->aCol);
if (nColData <= 0) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
}
if (aColData[0].cid != PRIMARYKEY_TIMESTAMP_COL_ID || aColData[0].type != TSDB_DATA_TYPE_TIMESTAMP ||
aColData[0].nVal <= 0) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
}
for (int32_t i = 1; i < nColData; i++) {
if (aColData[i].nVal != aColData[0].nVal) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
}
}
}
}
// loop to handle
......@@ -982,6 +1012,7 @@ _exit:
atomic_add_fetch_64(&pVnode->statis.nBatchInsert, 1);
if (code == 0) {
atomic_add_fetch_64(&pVnode->statis.nBatchInsertSuccess, 1);
tdProcessRSmaSubmit(pVnode->pSma, pSubmitReq, pReq, len, STREAM_INPUT__DATA_SUBMIT);
}
// clear
......@@ -989,6 +1020,8 @@ _exit:
tDestroySSubmitReq2(pSubmitReq, TSDB_MSG_FLG_DECODE);
tDestroySSubmitRsp2(pSubmitRsp, TSDB_MSG_FLG_ENCODE);
if (code) terrno = code;
return code;
#else
......
......@@ -141,23 +141,6 @@ int32_t insCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start);
int32_t insBuildOutput(SHashObj *pVgroupsHashObj, SArray *pVgDataBlocks, SArray **pDataBlocks);
void insDestroyDataBlock(STableDataBlocks *pDataBlock);
typedef struct SBoundColInfo {
int16_t *pColIndex; // bound index => schema index
int32_t numOfCols;
int32_t numOfBound;
} SBoundColInfo;
typedef struct STableDataCxt {
STableMeta *pMeta;
STSchema *pSchema;
SBoundColInfo boundColsInfo;
SArray *pValues;
SSubmitTbData *pData;
TSKEY lastTs;
bool ordered;
bool duplicateTs;
} STableDataCxt;
typedef struct SVgroupDataCxt {
int32_t vgId;
SSubmitReq2 *pData;
......
......@@ -887,8 +887,8 @@ static int32_t preParseBoundColumnsClause(SInsertParseContext* pCxt, SVnodeModif
static int32_t getTableDataCxt(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, STableDataCxt** pTableCxt) {
if (pCxt->pComCxt->async) {
return insGetTableDataCxt(pStmt->pTableBlockHashObj, &pStmt->pTableMeta->uid, sizeof(pStmt->pTableMeta->uid), pStmt->pTableMeta,
&pStmt->pCreateTblReq, pTableCxt, false);
return insGetTableDataCxt(pStmt->pTableBlockHashObj, &pStmt->pTableMeta->uid, sizeof(pStmt->pTableMeta->uid),
pStmt->pTableMeta, &pStmt->pCreateTblReq, pTableCxt, false);
}
char tbFName[TSDB_TABLE_FNAME_LEN];
......@@ -1065,7 +1065,8 @@ static int32_t parseValueTokenImpl(SInsertParseContext* pCxt, const char** pSql,
isnan(dv)) {
return buildSyntaxErrMsg(&pCxt->msg, "illegal float data", pToken->z);
}
pVal->value.val = *(int64_t*)&dv;
float f = dv;
memcpy(&pVal->value.val, &f, sizeof(f));
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
......@@ -1440,8 +1441,9 @@ static int32_t setStmtInfo(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt)
memcpy(tags, &pCxt->tags, sizeof(pCxt->tags));
SStmtCallback* pStmtCb = pCxt->pComCxt->pStmtCb;
int32_t code = (*pStmtCb->setInfoFn)(pStmtCb->pStmt, pStmt->pTableMeta, tags, &pStmt->targetTableName, pStmt->usingTableProcessing,
pStmt->pVgroupsHashObj, pStmt->pTableBlockHashObj, pStmt->usingTableName.tname);
int32_t code = (*pStmtCb->setInfoFn)(pStmtCb->pStmt, pStmt->pTableMeta, tags, &pStmt->targetTableName,
pStmt->usingTableProcessing, pStmt->pVgroupsHashObj, pStmt->pTableBlockHashObj,
pStmt->usingTableName.tname);
memset(&pCxt->tags, 0, sizeof(pCxt->tags));
pStmt->pVgroupsHashObj = NULL;
......
......@@ -22,6 +22,36 @@
#include "ttime.h"
#include "ttypes.h"
typedef struct SKvParam {
int16_t pos;
SArray* pTagVals;
SSchema* schema;
char buf[TSDB_MAX_TAGS_LEN];
} SKvParam;
int32_t qCloneCurrentTbData(STableDataCxt* pDataBlock, SSubmitTbData **pData) {
*pData = taosMemoryCalloc(1, sizeof(SSubmitTbData));
if (NULL == *pData) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SSubmitTbData *pNew = *pData;
*pNew = *pDataBlock->pData;
cloneSVreateTbReq(pDataBlock->pData->pCreateTbReq, &pNew->pCreateTbReq);
pNew->aCol = taosArrayDup(pDataBlock->pData->aCol, NULL);
int32_t colNum = taosArrayGetSize(pNew->aCol);
for (int32_t i = 0; i < colNum; ++i) {
SColData *pCol = (SColData*)taosArrayGet(pNew->aCol, i);
tColDataDeepClear(pCol);
}
return TSDB_CODE_SUCCESS;
}
int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash) {
int32_t code = TSDB_CODE_SUCCESS;
SArray* pVgDataBlocks = NULL;
......@@ -134,6 +164,14 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const ch
goto end;
}
if (NULL == pDataBlock->pData->pCreateTbReq) {
pDataBlock->pData->pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
if (NULL == pDataBlock->pData->pCreateTbReq) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
}
insBuildCreateTbReq(pDataBlock->pData->pCreateTbReq, tName, pTag, suid, sTableName, tagName, pDataBlock->pMeta->tableInfo.numOfTags, TSDB_DEFAULT_TABLE_TTL);
end:
......@@ -285,7 +323,7 @@ _return:
int32_t buildBoundFields(int32_t numOfBound, int16_t* boundColumns, SSchema* pSchema, int32_t* fieldNum, TAOS_FIELD_E** fields,
uint8_t timePrec) {
if (fields) {
*fields = taosMemoryCalloc(numOfBound, sizeof(TAOS_FIELD));
*fields = taosMemoryCalloc(numOfBound, sizeof(TAOS_FIELD_E));
if (NULL == *fields) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -350,7 +388,7 @@ int32_t qBuildStmtColFields(void* pBlock, int32_t* fieldNum, TAOS_FIELD_E** fiel
return TSDB_CODE_SUCCESS;
}
int32_t qResetStmtDataBlock(void* block, bool deepClear) {
int32_t qResetStmtDataBlock(STableDataCxt* block, bool deepClear) {
STableDataCxt* pBlock = (STableDataCxt*)block;
int32_t colNum = taosArrayGetSize(pBlock->pData->aCol);
......@@ -366,7 +404,7 @@ int32_t qResetStmtDataBlock(void* block, bool deepClear) {
return TSDB_CODE_SUCCESS;
}
int32_t qCloneStmtDataBlock(void** pDst, void* pSrc, bool reset) {
int32_t qCloneStmtDataBlock(STableDataCxt** pDst, STableDataCxt* pSrc, bool reset) {
int32_t code = 0;
*pDst = taosMemoryCalloc(1, sizeof(STableDataCxt));
......@@ -429,7 +467,7 @@ int32_t qCloneStmtDataBlock(void** pDst, void* pSrc, bool reset) {
return code;
}
int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc, uint64_t uid, int32_t vgId) {
int32_t qRebuildStmtDataBlock(STableDataCxt** pDst, STableDataCxt* pSrc, uint64_t uid, uint64_t suid, int32_t vgId, bool rebuildCreateTb) {
int32_t code = qCloneStmtDataBlock(pDst, pSrc, false);
if (code) {
return code;
......@@ -439,14 +477,25 @@ int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc, uint64_t uid, int32_t vgI
if (pBlock->pMeta) {
pBlock->pMeta->uid = uid;
pBlock->pMeta->vgId = vgId;
pBlock->pMeta->suid = suid;
}
pBlock->pData->suid = suid;
pBlock->pData->uid = uid;
if (rebuildCreateTb && NULL == pBlock->pData->pCreateTbReq) {
pBlock->pData->pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
if (NULL == pBlock->pData->pCreateTbReq) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
return TSDB_CODE_SUCCESS;
}
STableMeta* qGetTableMetaInDataBlock(void* pDataBlock) { return ((STableDataCxt*)pDataBlock)->pMeta; }
STableMeta* qGetTableMetaInDataBlock(STableDataCxt* pDataBlock) { return ((STableDataCxt*)pDataBlock)->pMeta; }
void qDestroyStmtDataBlock(void* pBlock) {
void qDestroyStmtDataBlock(STableDataCxt* pBlock) {
if (pBlock == NULL) {
return;
}
......
......@@ -1074,6 +1074,7 @@ static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreat
if (TSDB_CODE_SUCCESS == code) {
*pOutput = pTableCxt;
qDebug("tableDataCxt created, uid:%" PRId64 ", vgId:%d", pTableMeta->uid, pTableMeta->vgId);
} else {
taosMemoryFree(pTableCxt);
}
......@@ -1124,6 +1125,7 @@ void insDestroyVgroupDataCxt(SVgroupDataCxt* pVgCxt) {
}
tDestroySSubmitReq2(pVgCxt->pData, TSDB_MSG_FLG_ENCODE);
taosMemoryFree(pVgCxt->pData);
taosMemoryFree(pVgCxt);
}
......@@ -1181,6 +1183,8 @@ static int32_t fillVgroupDataCxt(STableDataCxt* pTableCxt, SVgroupDataCxt* pVgCx
taosArrayPush(pVgCxt->pData->aSubmitTbData, pTableCxt->pData);
taosMemoryFreeClear(pTableCxt->pData);
qDebug("add tableDataCxt uid:%" PRId64 " to vgId:%d", pTableCxt->pMeta->uid, pVgCxt->vgId);
return TSDB_CODE_SUCCESS;
}
......@@ -1240,6 +1244,16 @@ int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks) {
while (TSDB_CODE_SUCCESS == code && NULL != p) {
STableDataCxt* pTableCxt = *(STableDataCxt**)p;
if (colFormat) {
SColData *pCol = taosArrayGet(pTableCxt->pData->aCol, 0);
if (pCol->nVal <= 0) {
p = taosHashIterate(pTableHash, p);
continue;
}
if (pTableCxt->pData->pCreateTbReq) {
pTableCxt->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
}
taosArraySort(pTableCxt->pData->aCol, insColDataComp);
tColDataSortMerge(pTableCxt->pData->aCol);
......@@ -1274,7 +1288,7 @@ int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks) {
if (TSDB_CODE_SUCCESS == code) {
*pVgDataBlocks = pVgroupList;
} else {
taosArrayDestroy(pVgroupList);
insDestroyVgroupDataCxtList(pVgroupList);
}
return code;
......
......@@ -244,6 +244,7 @@ void destroyQueryExecRes(SExecResult* pRes) {
}
case TDMT_VND_SUBMIT: {
tDestroySSubmitRsp2((SSubmitRsp2*)pRes->res, TSDB_MSG_FLG_DECODE);
taosMemoryFreeClear(pRes->res);
break;
}
case TDMT_SCH_QUERY:
......@@ -486,3 +487,55 @@ int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst) {
return TSDB_CODE_SUCCESS;
}
int32_t cloneSVreateTbReq(SVCreateTbReq* pSrc, SVCreateTbReq** pDst) {
if (NULL == pSrc) {
*pDst = NULL;
return TSDB_CODE_SUCCESS;
}
*pDst = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
if (NULL == *pDst) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
(*pDst)->flags = pSrc->flags;
if (pSrc->name) {
(*pDst)->name = strdup(pSrc->name);
}
(*pDst)->uid = pSrc->uid;
(*pDst)->ctime = pSrc->ctime;
(*pDst)->ttl = pSrc->ttl;
(*pDst)->commentLen = pSrc->commentLen;
if (pSrc->comment) {
(*pDst)->comment = strdup(pSrc->comment);
}
(*pDst)->type = pSrc->type;
if (pSrc->type == TSDB_CHILD_TABLE) {
if (pSrc->ctb.stbName) {
(*pDst)->ctb.stbName = strdup(pSrc->ctb.stbName);
}
(*pDst)->ctb.tagNum = pSrc->ctb.tagNum;
(*pDst)->ctb.suid = pSrc->ctb.suid;
if (pSrc->ctb.tagName) {
(*pDst)->ctb.tagName = taosArrayDup(pSrc->ctb.tagName, NULL);
}
STag* pTag = (STag *)pSrc->ctb.pTag;
if (pTag) {
(*pDst)->ctb.pTag = taosMemoryMalloc(pTag->len);
memcpy((*pDst)->ctb.pTag, pTag, pTag->len);
}
} else {
(*pDst)->ntb.schemaRow.nCols = pSrc->ntb.schemaRow.nCols;
(*pDst)->ntb.schemaRow.version = pSrc->ntb.schemaRow.nCols;
if (pSrc->ntb.schemaRow.nCols > 0 && pSrc->ntb.schemaRow.pSchema) {
(*pDst)->ntb.schemaRow.pSchema = taosMemoryMalloc(pSrc->ntb.schemaRow.nCols * sizeof(SSchema));
memcpy((*pDst)->ntb.schemaRow.pSchema, pSrc->ntb.schemaRow.pSchema, pSrc->ntb.schemaRow.nCols * sizeof(SSchema));
}
}
return TSDB_CODE_SUCCESS;
}
......@@ -262,9 +262,11 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
SSubmitRsp2 *rsp = taosMemoryMalloc(sizeof(*rsp));
tDecoderInit(&coder, msg, msgSize);
code = tDecodeSSubmitRsp2(&coder, rsp);
tDecoderClear(&coder);
if (code) {
SCH_TASK_ELOG("tDecodeSSubmitRsp2 failed, code:%d", code);
tDestroySSubmitRsp2(rsp, TSDB_MSG_FLG_DECODE);
taosMemoryFree(rsp);
SCH_ERR_JRET(code);
}
......@@ -281,11 +283,10 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
if (sum->aCreateTbRsp) {
taosArrayAddAll(sum->aCreateTbRsp, rsp->aCreateTbRsp);
taosArrayDestroy(rsp->aCreateTbRsp);
taosMemoryFree(rsp);
} else {
TSWAP(sum->aCreateTbRsp, rsp->aCreateTbRsp);
taosMemoryFree(rsp);
}
taosMemoryFree(rsp);
} else {
pJob->execRes.res = rsp;
pJob->execRes.msgType = TDMT_VND_SUBMIT;
......@@ -301,6 +302,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
}
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
tDestroySSubmitRsp2(rsp, TSDB_MSG_FLG_DECODE);
taosMemoryFree(rsp);
}
}
......
......@@ -1120,7 +1120,7 @@ int32_t WCSPatternMatch(const TdUcs4 *patterStr, const TdUcs4 *str, size_t size,
return TSDB_PATTERN_NOMATCH;
}
return (str[j] == 0 || j >= size) ? TSDB_PATTERN_MATCH : TSDB_PATTERN_NOMATCH;
return (j >= size || str[j] == 0) ? TSDB_PATTERN_MATCH : TSDB_PATTERN_NOMATCH;
}
int32_t compareStrRegexCompMatch(const void *pLeft, const void *pRight) { return compareStrRegexComp(pLeft, pRight); }
......
......@@ -219,7 +219,7 @@ typedef struct {
int32_t caseRunNum; // total run case num
} CaseCtrl;
#if 1
#if 0
CaseCtrl gCaseCtrl = {
.precision = TIME_PRECISION_MICRO,
.bindNullNum = 0,
......@@ -244,7 +244,7 @@ CaseCtrl gCaseCtrl = {
.funcIdxList = NULL,
.checkParamNum = false,
.runTimes = 0,
.caseIdx = 0,
.caseIdx = 22,
.caseNum = 1,
.caseRunIdx = -1,
.caseRunNum = -1,
......@@ -252,7 +252,7 @@ CaseCtrl gCaseCtrl = {
#endif
#if 0
#if 1
CaseCtrl gCaseCtrl = { // default
.precision = TIME_PRECISION_MILLI,
.bindNullNum = 0,
......@@ -2749,7 +2749,6 @@ void runAll(TAOS *taos) {
printf("%s Begin\n", gCaseCtrl.caseCatalog);
runCaseList(taos);
#if 0
strcpy(gCaseCtrl.caseCatalog, "Micro DB precision Test");
printf("%s Begin\n", gCaseCtrl.caseCatalog);
gCaseCtrl.precision = TIME_PRECISION_MICRO;
......@@ -2805,7 +2804,6 @@ void runAll(TAOS *taos) {
gCaseCtrl.bindColNum = 6;
runCaseList(taos);
gCaseCtrl.bindColNum = 0;
#endif
/*
strcpy(gCaseCtrl.caseCatalog, "Bind Col Type Test");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册