提交 ed11b141 编写于 作者: L Liu Jicong

Merge remote-tracking branch 'refs/remotes/origin/refact/submit_req' into refact/submit_req

...@@ -44,18 +44,24 @@ typedef struct SColData SColData; ...@@ -44,18 +44,24 @@ typedef struct SColData SColData;
#define HAS_VALUE ((uint8_t)0x4) #define HAS_VALUE ((uint8_t)0x4)
// bitmap ================================ // bitmap ================================
const static uint8_t BIT2_MAP[4][4] = {{0b00000000, 0b00000001, 0b00000010, 0}, const static uint8_t BIT1_MAP[8] = {0b11111110, 0b11111101, 0b11111011, 0b11110111,
{0b00000000, 0b00000100, 0b00001000, 2}, 0b11101111, 0b11011111, 0b10111111, 0b01111111};
{0b00000000, 0b00010000, 0b00100000, 4},
{0b00000000, 0b01000000, 0b10000000, 6}}; const static uint8_t BIT2_MAP[4] = {0b11111100, 0b11110011, 0b11001111, 0b00111111};
#define N1(n) ((((uint8_t)1) << (n)) - 1) #define ONE ((uint8_t)1)
#define BIT1_SIZE(n) ((((n)-1) >> 3) + 1) #define THREE ((uint8_t)3)
#define BIT2_SIZE(n) ((((n)-1) >> 2) + 1) #define DIV_8(i) ((i) >> 3)
#define SET_BIT1(p, i, v) ((p)[(i) >> 3] = (p)[(i) >> 3] & N1((i)&7) | (((uint8_t)(v)) << ((i)&7))) #define MOD_8(i) ((i)&7)
#define GET_BIT1(p, i) (((p)[(i) >> 3] >> ((i)&7)) & ((uint8_t)1)) #define DIV_4(i) ((i) >> 2)
#define SET_BIT2(p, i, v) ((p)[(i) >> 2] = (p)[(i) >> 2] & N1(BIT2_MAP[(i)&3][3]) | BIT2_MAP[(i)&3][(v)]) #define MOD_4(i) ((i)&3)
#define GET_BIT2(p, i) (((p)[(i) >> 2] >> BIT2_MAP[(i)&3][3]) & ((uint8_t)3)) #define MOD_4_TIME_2(i) (MOD_4(i) << 1)
#define BIT1_SIZE(n) (DIV_8((n)-1) + 1)
#define BIT2_SIZE(n) (DIV_4((n)-1) + 1)
#define SET_BIT1(p, i, v) ((p)[DIV_8(i)] = (p)[DIV_8(i)] & BIT1_MAP[MOD_8(i)] | ((v) << MOD_8(i)))
#define GET_BIT1(p, i) (((p)[DIV_8(i)] >> MOD_8(i)) & ONE)
#define SET_BIT2(p, i, v) ((p)[DIV_4(i)] = (p)[DIV_4(i)] & BIT2_MAP[MOD_4(i)] | ((v) << MOD_4_TIME_2(i)))
#define GET_BIT2(p, i) (((p)[DIV_4(i)] >> MOD_4_TIME_2(i)) & THREE)
// SBuffer ================================ // SBuffer ================================
struct SBuffer { struct SBuffer {
......
...@@ -87,7 +87,7 @@ void qCleanupKeywordsTable(); ...@@ -87,7 +87,7 @@ void qCleanupKeywordsTable();
int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash); int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash);
int32_t qResetStmtDataBlock(STableDataCxt* block, bool keepBuf); int32_t qResetStmtDataBlock(STableDataCxt* block, bool keepBuf);
int32_t qCloneStmtDataBlock(STableDataCxt** pDst, STableDataCxt* pSrc, bool reset); int32_t qCloneStmtDataBlock(STableDataCxt** pDst, STableDataCxt* pSrc, bool reset);
int32_t qRebuildStmtDataBlock(STableDataCxt** pDst, STableDataCxt* pSrc, uint64_t uid, int32_t vgId, bool rebuildCreateTb); int32_t qRebuildStmtDataBlock(STableDataCxt** pDst, STableDataCxt* pSrc, uint64_t uid, uint64_t suid, int32_t vgId, bool rebuildCreateTb);
void qDestroyStmtDataBlock(STableDataCxt* pBlock); void qDestroyStmtDataBlock(STableDataCxt* pBlock);
STableMeta* qGetTableMetaInDataBlock(STableDataCxt* pDataBlock); STableMeta* qGetTableMetaInDataBlock(STableDataCxt* pDataBlock);
int32_t qCloneCurrentTbData(STableDataCxt* pDataBlock, SSubmitTbData **pData); int32_t qCloneCurrentTbData(STableDataCxt* pDataBlock, SSubmitTbData **pData);
......
...@@ -240,6 +240,8 @@ int32_t stmtCacheBlock(STscStmt* pStmt) { ...@@ -240,6 +240,8 @@ int32_t stmtCacheBlock(STscStmt* pStmt) {
} }
int32_t stmtParseSql(STscStmt* pStmt) { int32_t stmtParseSql(STscStmt* pStmt) {
pStmt->exec.pCurrBlock = NULL;
SStmtCallback stmtCb = { SStmtCallback stmtCb = {
.pStmt = pStmt, .pStmt = pStmt,
.getTbNameFn = stmtGetTbName, .getTbNameFn = stmtGetTbName,
...@@ -352,7 +354,7 @@ int32_t stmtCleanSQLInfo(STscStmt* pStmt) { ...@@ -352,7 +354,7 @@ int32_t stmtCleanSQLInfo(STscStmt* pStmt) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t stmtRebuildDataBlock(STscStmt* pStmt, STableDataCxt* pDataBlock, STableDataCxt** newBlock, uint64_t uid) { int32_t stmtRebuildDataBlock(STscStmt* pStmt, STableDataCxt* pDataBlock, STableDataCxt** newBlock, uint64_t uid, uint64_t suid) {
SEpSet ep = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp); SEpSet ep = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
SVgroupInfo vgInfo = {0}; SVgroupInfo vgInfo = {0};
SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter, SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
...@@ -364,7 +366,9 @@ int32_t stmtRebuildDataBlock(STscStmt* pStmt, STableDataCxt* pDataBlock, STableD ...@@ -364,7 +366,9 @@ int32_t stmtRebuildDataBlock(STscStmt* pStmt, STableDataCxt* pDataBlock, STableD
STMT_ERR_RET( STMT_ERR_RET(
taosHashPut(pStmt->sql.pVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo, sizeof(vgInfo))); taosHashPut(pStmt->sql.pVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo, sizeof(vgInfo)));
STMT_ERR_RET(qRebuildStmtDataBlock(newBlock, pDataBlock, uid, vgInfo.vgId, pStmt->sql.autoCreateTbl)); STMT_ERR_RET(qRebuildStmtDataBlock(newBlock, pDataBlock, uid, suid, vgInfo.vgId, pStmt->sql.autoCreateTbl));
STMT_DLOG("tableDataCxt rebuilt, uid:%" PRId64 ", vgId:%d", uid, vgInfo.vgId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -410,7 +414,7 @@ int32_t stmtGetFromCache(STscStmt* pStmt) { ...@@ -410,7 +414,7 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
pStmt->bInfo.tbUid = 0; pStmt->bInfo.tbUid = 0;
STableDataCxt* pNewBlock = NULL; STableDataCxt* pNewBlock = NULL;
STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, 0)); STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, 0, pStmt->bInfo.tbSuid));
if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock, if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock,
POINTER_BYTES)) { POINTER_BYTES)) {
...@@ -490,7 +494,7 @@ int32_t stmtGetFromCache(STscStmt* pStmt) { ...@@ -490,7 +494,7 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
pStmt->bInfo.tagsCached = true; pStmt->bInfo.tagsCached = true;
STableDataCxt* pNewBlock = NULL; STableDataCxt* pNewBlock = NULL;
STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, uid)); STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, uid, suid));
if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock, if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock,
POINTER_BYTES)) { POINTER_BYTES)) {
...@@ -596,8 +600,6 @@ int stmtSetTbName(TAOS_STMT* stmt, const char* tbName) { ...@@ -596,8 +600,6 @@ int stmtSetTbName(TAOS_STMT* stmt, const char* tbName) {
STMT_ERR_RET(stmtGetFromCache(pStmt)); STMT_ERR_RET(stmtGetFromCache(pStmt));
if (pStmt->bInfo.needParse) { if (pStmt->bInfo.needParse) {
pStmt->exec.pCurrBlock = NULL;
strncpy(pStmt->bInfo.tbName, tbName, sizeof(pStmt->bInfo.tbName) - 1); strncpy(pStmt->bInfo.tbName, tbName, sizeof(pStmt->bInfo.tbName) - 1);
pStmt->bInfo.tbName[sizeof(pStmt->bInfo.tbName) - 1] = 0; pStmt->bInfo.tbName[sizeof(pStmt->bInfo.tbName) - 1] = 0;
......
...@@ -63,9 +63,9 @@ static int32_t tGetTagVal(uint8_t *p, STagVal *pTagVal, int8_t isJson); ...@@ -63,9 +63,9 @@ static int32_t tGetTagVal(uint8_t *p, STagVal *pTagVal, int8_t isJson);
#define KV_FLG_MID ((uint8_t)0x20) #define KV_FLG_MID ((uint8_t)0x20)
#define KV_FLG_BIG ((uint8_t)0x30) #define KV_FLG_BIG ((uint8_t)0x30)
#define ROW_BIT_NONE ((uint8_t)0x0) #define BIT_FLG_NONE ((uint8_t)0x0)
#define ROW_BIT_NULL ((uint8_t)0x1) #define BIT_FLG_NULL ((uint8_t)0x1)
#define ROW_BIT_VALUE ((uint8_t)0x2) #define BIT_FLG_VALUE ((uint8_t)0x2)
#pragma pack(push, 1) #pragma pack(push, 1)
typedef struct { typedef struct {
...@@ -314,7 +314,7 @@ int32_t tRowBuild(SArray *aColVal, STSchema *pTSchema, SRow **ppRow) { ...@@ -314,7 +314,7 @@ int32_t tRowBuild(SArray *aColVal, STSchema *pTSchema, SRow **ppRow) {
if (pColVal) { if (pColVal) {
if (pColVal->cid == pTColumn->colId) { if (pColVal->cid == pTColumn->colId) {
if (COL_VAL_IS_VALUE(pColVal)) { // VALUE if (COL_VAL_IS_VALUE(pColVal)) { // VALUE
ROW_SET_BITMAP(pb, flag, iTColumn - 1, ROW_BIT_VALUE); ROW_SET_BITMAP(pb, flag, iTColumn - 1, BIT_FLG_VALUE);
if (IS_VAR_DATA_TYPE(pTColumn->type)) { if (IS_VAR_DATA_TYPE(pTColumn->type)) {
*(int32_t *)(pf + pTColumn->offset) = nv; *(int32_t *)(pf + pTColumn->offset) = nv;
...@@ -327,24 +327,24 @@ int32_t tRowBuild(SArray *aColVal, STSchema *pTSchema, SRow **ppRow) { ...@@ -327,24 +327,24 @@ int32_t tRowBuild(SArray *aColVal, STSchema *pTSchema, SRow **ppRow) {
memcpy(pf + pTColumn->offset, &pColVal->value.val, TYPE_BYTES[pTColumn->type]); memcpy(pf + pTColumn->offset, &pColVal->value.val, TYPE_BYTES[pTColumn->type]);
} }
} else if (COL_VAL_IS_NONE(pColVal)) { // NONE } else if (COL_VAL_IS_NONE(pColVal)) { // NONE
ROW_SET_BITMAP(pb, flag, iTColumn - 1, ROW_BIT_NONE); ROW_SET_BITMAP(pb, flag, iTColumn - 1, BIT_FLG_NONE);
if (pf) memset(pf + pTColumn->offset, 0, TYPE_BYTES[pTColumn->type]); if (pf) memset(pf + pTColumn->offset, 0, TYPE_BYTES[pTColumn->type]);
} else { // NULL } else { // NULL
ROW_SET_BITMAP(pb, flag, iTColumn - 1, ROW_BIT_NULL); ROW_SET_BITMAP(pb, flag, iTColumn - 1, BIT_FLG_NULL);
if (pf) memset(pf + pTColumn->offset, 0, TYPE_BYTES[pTColumn->type]); if (pf) memset(pf + pTColumn->offset, 0, TYPE_BYTES[pTColumn->type]);
} }
pTColumn = (++iTColumn < pTSchema->numOfCols) ? pTSchema->columns + iTColumn : NULL; pTColumn = (++iTColumn < pTSchema->numOfCols) ? pTSchema->columns + iTColumn : NULL;
pColVal = (++iColVal < nColVal) ? &colVals[iColVal] : NULL; pColVal = (++iColVal < nColVal) ? &colVals[iColVal] : NULL;
} else if (pColVal->cid > pTColumn->colId) { // NONE } else if (pColVal->cid > pTColumn->colId) { // NONE
ROW_SET_BITMAP(pb, flag, iTColumn - 1, ROW_BIT_NONE); ROW_SET_BITMAP(pb, flag, iTColumn - 1, BIT_FLG_NONE);
if (pf) memset(pf + pTColumn->offset, 0, TYPE_BYTES[pTColumn->type]); if (pf) memset(pf + pTColumn->offset, 0, TYPE_BYTES[pTColumn->type]);
pTColumn = (++iTColumn < pTSchema->numOfCols) ? pTSchema->columns + iTColumn : NULL; pTColumn = (++iTColumn < pTSchema->numOfCols) ? pTSchema->columns + iTColumn : NULL;
} else { } else {
pColVal = (++iColVal < nColVal) ? &colVals[iColVal] : NULL; pColVal = (++iColVal < nColVal) ? &colVals[iColVal] : NULL;
} }
} else { // NONE } else { // NONE
ROW_SET_BITMAP(pb, flag, iTColumn - 1, ROW_BIT_NONE); ROW_SET_BITMAP(pb, flag, iTColumn - 1, BIT_FLG_NONE);
if (pf) memset(pf + pTColumn->offset, 0, TYPE_BYTES[pTColumn->type]); if (pf) memset(pf + pTColumn->offset, 0, TYPE_BYTES[pTColumn->type]);
pTColumn = (++iTColumn < pTSchema->numOfCols) ? pTSchema->columns + iTColumn : NULL; pTColumn = (++iTColumn < pTSchema->numOfCols) ? pTSchema->columns + iTColumn : NULL;
} }
...@@ -459,7 +459,7 @@ void tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal) { ...@@ -459,7 +459,7 @@ void tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal) {
} else { } else {
uint8_t *pf; uint8_t *pf;
uint8_t *pv; uint8_t *pv;
uint8_t bv = ROW_BIT_VALUE; uint8_t bv = BIT_FLG_VALUE;
switch (pRow->flag) { switch (pRow->flag) {
case (HAS_NULL | HAS_NONE): case (HAS_NULL | HAS_NONE):
...@@ -487,10 +487,10 @@ void tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal) { ...@@ -487,10 +487,10 @@ void tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal) {
break; break;
} }
if (bv == ROW_BIT_NONE) { if (bv == BIT_FLG_NONE) {
*pColVal = COL_VAL_NONE(pTColumn->colId, pTColumn->type); *pColVal = COL_VAL_NONE(pTColumn->colId, pTColumn->type);
return; return;
} else if (bv == ROW_BIT_NULL) { } else if (bv == BIT_FLG_NULL) {
*pColVal = COL_VAL_NULL(pTColumn->colId, pTColumn->type); *pColVal = COL_VAL_NULL(pTColumn->colId, pTColumn->type);
return; return;
} }
...@@ -615,6 +615,7 @@ int32_t tRowMerge(SArray *aRowP, STSchema *pTSchema, int8_t flag) { ...@@ -615,6 +615,7 @@ int32_t tRowMerge(SArray *aRowP, STSchema *pTSchema, int8_t flag) {
if (iEnd - iStart > 1) { if (iEnd - iStart > 1) {
code = tRowMergeImpl(aRowP, pTSchema, iStart, iEnd, flag); code = tRowMergeImpl(aRowP, pTSchema, iStart, iEnd, flag);
if (code) return code;
} }
// the array is also changing, so the iStart just ++ instead of iEnd // the array is also changing, so the iStart just ++ instead of iEnd
...@@ -789,7 +790,7 @@ SColVal *tRowIterNext(SRowIter *pIter) { ...@@ -789,7 +790,7 @@ SColVal *tRowIterNext(SRowIter *pIter) {
goto _exit; goto _exit;
} }
} else { // Tuple } else { // Tuple
uint8_t bv = ROW_BIT_VALUE; uint8_t bv = BIT_FLG_VALUE;
if (pIter->pb) { if (pIter->pb) {
switch (pIter->pRow->flag) { switch (pIter->pRow->flag) {
case (HAS_NULL | HAS_NONE): case (HAS_NULL | HAS_NONE):
...@@ -810,10 +811,10 @@ SColVal *tRowIterNext(SRowIter *pIter) { ...@@ -810,10 +811,10 @@ SColVal *tRowIterNext(SRowIter *pIter) {
break; break;
} }
if (bv == ROW_BIT_NONE) { if (bv == BIT_FLG_NONE) {
pIter->cv = COL_VAL_NONE(pTColumn->colId, pTColumn->type); pIter->cv = COL_VAL_NONE(pTColumn->colId, pTColumn->type);
goto _exit; goto _exit;
} else if (bv == ROW_BIT_NULL) { } else if (bv == BIT_FLG_NULL) {
pIter->cv = COL_VAL_NULL(pTColumn->colId, pTColumn->type); pIter->cv = COL_VAL_NULL(pTColumn->colId, pTColumn->type);
goto _exit; goto _exit;
} }
...@@ -947,11 +948,11 @@ static int32_t tRowAppendTupleToColData(SRow *pRow, STSchema *pTSchema, SColData ...@@ -947,11 +948,11 @@ static int32_t tRowAppendTupleToColData(SRow *pRow, STSchema *pTSchema, SColData
break; break;
} }
if (bv == ROW_BIT_NONE) { if (bv == BIT_FLG_NONE) {
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NONE](pColData, NULL, 0); code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NONE](pColData, NULL, 0);
if (code) goto _exit; if (code) goto _exit;
goto _continue; goto _continue;
} else if (bv == ROW_BIT_NULL) { } else if (bv == BIT_FLG_NULL) {
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NULL](pColData, NULL, 0); code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NULL](pColData, NULL, 0);
if (code) goto _exit; if (code) goto _exit;
goto _continue; goto _continue;
...@@ -2133,6 +2134,304 @@ _exit: ...@@ -2133,6 +2134,304 @@ _exit:
return code; return code;
} }
static int32_t tColDataSwapValue(SColData *pColData, int32_t i, int32_t j) {
int32_t code = 0;
if (IS_VAR_DATA_TYPE(pColData->type)) {
int32_t nData1 = pColData->aOffset[i + 1] - pColData->aOffset[i];
int32_t nData2 = (j < pColData->nVal - 1) ? pColData->aOffset[j + 1] - pColData->aOffset[j]
: pColData->nData - pColData->aOffset[j];
uint8_t *pData = taosMemoryMalloc(TMAX(nData1, nData2));
if (pData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
if (nData1 > nData2) {
memcpy(pData, pColData->pData + pColData->aOffset[i], nData1);
memcpy(pColData->pData + pColData->aOffset[i], pColData->pData + pColData->aOffset[j], nData2);
memmove(pColData->pData + pColData->aOffset[i] + nData2, pColData->pData + pColData->aOffset[i] + nData1,
pColData->aOffset[j] - pColData->aOffset[i + 1]);
memcpy(pColData->pData + pColData->aOffset[j] + nData2 - nData1, pData, nData1);
} else {
memcpy(pData, pColData->pData + pColData->aOffset[j], nData2);
memcpy(pColData->pData + pColData->aOffset[j] + nData2 - nData1, pColData->pData + pColData->aOffset[i], nData1);
memmove(pColData->pData + pColData->aOffset[j] + nData2 - nData1, pColData->pData + pColData->aOffset[i] + nData1,
pColData->aOffset[j] - pColData->aOffset[i + 1]);
memcpy(pColData->pData + pColData->aOffset[i], pData, nData2);
}
for (int32_t k = i + 1; k <= j; ++k) {
pColData->aOffset[k] = pColData->aOffset[k] + nData2 - nData1;
}
taosMemoryFree(pData);
} else {
uint64_t val;
memcpy(&val, &pColData->pData[TYPE_BYTES[pColData->type] * i], TYPE_BYTES[pColData->type]);
memcpy(&pColData->pData[TYPE_BYTES[pColData->type] * i], &pColData->pData[TYPE_BYTES[pColData->type] * j],
TYPE_BYTES[pColData->type]);
memcpy(&pColData->pData[TYPE_BYTES[pColData->type] * j], &val, TYPE_BYTES[pColData->type]);
}
_exit:
return code;
}
static void tColDataSwap(SColData *pColData, int32_t i, int32_t j) {
ASSERT(i < j);
ASSERT(j < pColData->nVal);
switch (pColData->flag) {
case HAS_NONE:
case HAS_NULL:
break;
case (HAS_NULL | HAS_NONE): {
uint8_t bv = GET_BIT1(pColData->pBitMap, i);
SET_BIT1(pColData->pBitMap, i, GET_BIT1(pColData->pBitMap, j));
SET_BIT1(pColData->pBitMap, j, bv);
} break;
case HAS_VALUE: {
tColDataSwapValue(pColData, i, j);
} break;
case (HAS_VALUE | HAS_NONE):
case (HAS_VALUE | HAS_NULL): {
uint8_t bv = GET_BIT1(pColData->pBitMap, i);
SET_BIT1(pColData->pBitMap, i, GET_BIT1(pColData->pBitMap, j));
SET_BIT1(pColData->pBitMap, j, bv);
tColDataSwapValue(pColData, i, j);
} break;
case (HAS_VALUE | HAS_NULL | HAS_NONE): {
uint8_t bv = GET_BIT2(pColData->pBitMap, i);
SET_BIT2(pColData->pBitMap, i, GET_BIT2(pColData->pBitMap, j));
SET_BIT2(pColData->pBitMap, j, bv);
tColDataSwapValue(pColData, i, j);
} break;
default:
ASSERT(0);
break;
}
}
static void tColDataSort(SColData *aColData, int32_t nColData) {
if (aColData[0].nVal == 0) return;
// TODO
}
static void tColDataMergeImpl(SColData *pColData, int32_t iStart, int32_t iEnd /* not included */) {
switch (pColData->flag) {
case HAS_NONE:
case HAS_NULL: {
pColData->nVal -= (iEnd - iStart - 1);
} break;
case (HAS_NULL | HAS_NONE): {
if (GET_BIT1(pColData->pBitMap, iStart) == 0) {
for (int32_t i = iStart + 1; i < iEnd; ++i) {
if (GET_BIT1(pColData->pBitMap, i) == 1) {
SET_BIT1(pColData->pBitMap, iStart, 1);
break;
}
}
}
for (int32_t i = iEnd, j = iStart + 1; i < pColData->nVal; ++i, ++j) {
SET_BIT1(pColData->pBitMap, j, GET_BIT1(pColData->pBitMap, i));
}
pColData->nVal -= (iEnd - iStart - 1);
uint8_t flag = 0;
for (int32_t i = 0; i < pColData->nVal; ++i) {
uint8_t bv = GET_BIT1(pColData->pBitMap, i);
if (bv == BIT_FLG_NONE) {
flag |= HAS_NONE;
} else if (bv == BIT_FLG_NULL) {
flag |= HAS_NULL;
} else {
ASSERT(0);
}
if (flag == pColData->flag) break;
}
pColData->flag = flag;
} break;
case HAS_VALUE: {
if (IS_VAR_DATA_TYPE(pColData->type)) {
int32_t nDiff = pColData->aOffset[iEnd - 1] - pColData->aOffset[iStart];
memmove(pColData->pData + pColData->aOffset[iStart], pColData->pData + pColData->aOffset[iEnd - 1],
pColData->nData - pColData->aOffset[iEnd - 1]);
pColData->nData -= nDiff;
for (int32_t i = iEnd, j = iStart + 1; i < pColData->nVal; ++i, ++j) {
pColData->aOffset[j] = pColData->aOffset[i] - nDiff;
}
} else {
memmove(pColData->pData + TYPE_BYTES[pColData->type] * iStart,
pColData->pData + TYPE_BYTES[pColData->type] * (iEnd - 1),
TYPE_BYTES[pColData->type] * (pColData->nVal - iEnd + 1));
pColData->nData -= (TYPE_BYTES[pColData->type] * (iEnd - iStart - 1));
}
pColData->nVal -= (iEnd - iStart - 1);
} break;
case (HAS_VALUE | HAS_NONE): {
uint8_t bv;
int32_t iv;
for (int32_t i = iEnd - 1; i >= iStart; --i) {
bv = GET_BIT1(pColData->pBitMap, i);
if (bv) {
iv = i;
break;
}
}
if (bv) { // has a value
if (IS_VAR_DATA_TYPE(pColData->type)) {
if (iv != iStart) {
memmove(&pColData->pData[pColData->aOffset[iStart]], &pColData->pData[pColData->aOffset[iv]],
iv < (pColData->nVal - 1) ? pColData->aOffset[iv + 1] - pColData->aOffset[iv]
: pColData->nData - pColData->aOffset[iv]);
}
// TODO
ASSERT(0);
} else {
if (iv != iStart) {
memcpy(&pColData->pData[TYPE_BYTES[pColData->type] * iStart],
&pColData->pData[TYPE_BYTES[pColData->type] * iv], TYPE_BYTES[pColData->type]);
}
memmove(&pColData->pData[TYPE_BYTES[pColData->type] * (iStart + 1)],
&pColData->pData[TYPE_BYTES[pColData->type] * iEnd],
TYPE_BYTES[pColData->type] * (iEnd - iStart - 1));
pColData->nData -= (TYPE_BYTES[pColData->type] * (iEnd - iStart - 1));
}
SET_BIT1(pColData->pBitMap, iStart, 1);
for (int32_t i = iEnd, j = iStart + 1; i < pColData->nVal; ++i, ++j) {
SET_BIT1(pColData->pBitMap, j, GET_BIT1(pColData->pBitMap, i));
}
uint8_t flag = HAS_VALUE;
for (int32_t i = 0; i < pColData->nVal - (iEnd - iStart - 1); ++i) {
if (GET_BIT1(pColData->pBitMap, i) == 0) {
flag |= HAS_NONE;
}
if (flag == pColData->flag) break;
}
pColData->flag = flag;
} else { // all NONE
if (IS_VAR_DATA_TYPE(pColData->type)) {
int32_t nDiff = pColData->aOffset[iEnd - 1] - pColData->aOffset[iStart];
memmove(&pColData->pData[pColData->aOffset[iStart]], &pColData->pData[pColData->aOffset[iEnd - 1]],
pColData->nData - pColData->aOffset[iEnd - 1]);
pColData->nData -= nDiff;
for (int32_t i = iEnd, j = iStart + 1; i < pColData->nVal; ++i, ++j) {
pColData->aOffset[j] = pColData->aOffset[i] - nDiff;
}
} else {
memmove(pColData->pData + TYPE_BYTES[pColData->type] * (iStart + 1),
pColData->pData + TYPE_BYTES[pColData->type] * iEnd,
TYPE_BYTES[pColData->type] * (pColData->nVal - iEnd + 1));
pColData->nData -= (TYPE_BYTES[pColData->type] * (iEnd - iStart - 1));
}
for (int32_t i = iEnd, j = iStart + 1; i < pColData->nVal; ++i, ++j) {
SET_BIT1(pColData->pBitMap, j, GET_BIT1(pColData->pBitMap, i));
}
}
pColData->nVal -= (iEnd - iStart - 1);
} break;
case (HAS_VALUE | HAS_NULL): {
if (IS_VAR_DATA_TYPE(pColData->type)) {
int32_t nDiff = pColData->aOffset[iEnd - 1] - pColData->aOffset[iStart];
memmove(pColData->pData + pColData->aOffset[iStart], pColData->pData + pColData->aOffset[iEnd - 1],
pColData->nData - pColData->aOffset[iEnd - 1]);
pColData->nData -= nDiff;
for (int32_t i = iEnd, j = iStart + 1; i < pColData->nVal; ++i, ++j) {
pColData->aOffset[j] = pColData->aOffset[i] - nDiff;
}
} else {
memmove(pColData->pData + TYPE_BYTES[pColData->type] * iStart,
pColData->pData + TYPE_BYTES[pColData->type] * (iEnd - 1),
TYPE_BYTES[pColData->type] * (pColData->nVal - iEnd + 1));
pColData->nData -= (TYPE_BYTES[pColData->type] * (iEnd - iStart - 1));
}
for (int32_t i = iEnd - 1, j = iStart; i < pColData->nVal; ++i, ++j) {
SET_BIT1(pColData->pBitMap, j, GET_BIT1(pColData->pBitMap, i));
}
pColData->nVal -= (iEnd - iStart - 1);
uint8_t flag = 0;
for (int32_t i = 0; i < pColData->nVal; ++i) {
if (GET_BIT1(pColData->pBitMap, i)) {
flag |= HAS_VALUE;
} else {
flag |= HAS_NULL;
}
if (flag == pColData->flag) break;
}
pColData->flag = flag;
} break;
case (HAS_VALUE | HAS_NULL | HAS_NONE): {
uint8_t bv;
int32_t iv;
for (int32_t i = iEnd - 1; i >= iStart; --i) {
bv = GET_BIT2(pColData->pBitMap, i);
if (bv) {
iv = i;
break;
}
}
if (bv) {
// TODO
ASSERT(0);
} else { // ALL NONE
if (IS_VAR_DATA_TYPE(pColData->type)) {
// TODO
ASSERT(0);
} else {
memmove(pColData->pData + TYPE_BYTES[pColData->type] * (iStart + 1),
pColData->pData + TYPE_BYTES[pColData->type] * iEnd,
TYPE_BYTES[pColData->type] * (pColData->nVal - iEnd));
pColData->nData -= (TYPE_BYTES[pColData->type] * (iEnd - iStart - 1));
}
for (int32_t i = iEnd, j = iStart + 1; i < pColData->nVal; ++i, ++j) {
SET_BIT2(pColData->pBitMap, j, GET_BIT2(pColData->pBitMap, i));
}
}
pColData->nVal -= (iEnd - iStart - 1);
} break;
default:
ASSERT(0);
break;
}
}
static void tColDataMerge(SColData *aColData, int32_t nColData) {
int32_t iStart = 0;
for (;;) {
if (iStart >= aColData[0].nVal - 1) break;
int32_t iEnd = iStart + 1;
while (iEnd < aColData[0].nVal) {
if (((TSKEY *)aColData[0].pData)[iEnd] != ((TSKEY *)aColData[0].pData)[iStart]) break;
iEnd++;
}
if (iEnd - iStart > 1) {
for (int32_t i = 0; i < nColData; i++) {
tColDataMergeImpl(&aColData[i], iStart, iEnd);
}
}
iStart++;
}
}
void tColDataSortMerge(SArray *colDataArr) { void tColDataSortMerge(SArray *colDataArr) {
int32_t nColData = TARRAY_SIZE(colDataArr); int32_t nColData = TARRAY_SIZE(colDataArr);
SColData *aColData = (SColData *)TARRAY_DATA(colDataArr); SColData *aColData = (SColData *)TARRAY_DATA(colDataArr);
...@@ -2160,14 +2459,12 @@ void tColDataSortMerge(SArray *colDataArr) { ...@@ -2160,14 +2459,12 @@ void tColDataSortMerge(SArray *colDataArr) {
// sort ------- // sort -------
if (doSort) { if (doSort) {
ASSERT(0); tColDataSort(aColData, nColData);
// todo
} }
// merge ------- // merge -------
if (doMerge) { if (doMerge) {
ASSERT(0); tColDataMerge(aColData, nColData);
// todo
} }
_exit: _exit:
......
...@@ -857,6 +857,7 @@ static int32_t vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, ...@@ -857,6 +857,7 @@ static int32_t vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock,
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
#if 1 #if 1
int32_t code = 0; int32_t code = 0;
terrno = 0;
SSubmitReq2 *pSubmitReq = &(SSubmitReq2){0}; SSubmitReq2 *pSubmitReq = &(SSubmitReq2){0};
SSubmitRsp2 *pSubmitRsp = &(SSubmitRsp2){0}; SSubmitRsp2 *pSubmitRsp = &(SSubmitRsp2){0};
...@@ -892,6 +893,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq ...@@ -892,6 +893,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
code = metaGetInfo(pVnode->pMeta, pSubmitTbData->uid, &info, NULL); code = metaGetInfo(pVnode->pMeta, pSubmitTbData->uid, &info, NULL);
if (code) { if (code) {
code = TSDB_CODE_TDB_TABLE_NOT_EXIST; code = TSDB_CODE_TDB_TABLE_NOT_EXIST;
vWarn("vgId:%d, table uid:%" PRId64 " not exists", TD_VID(pVnode), pSubmitTbData->uid);
goto _exit; goto _exit;
} }
...@@ -1018,6 +1020,8 @@ _exit: ...@@ -1018,6 +1020,8 @@ _exit:
tDestroySSubmitReq2(pSubmitReq, TSDB_MSG_FLG_DECODE); tDestroySSubmitReq2(pSubmitReq, TSDB_MSG_FLG_DECODE);
tDestroySSubmitRsp2(pSubmitRsp, TSDB_MSG_FLG_ENCODE); tDestroySSubmitRsp2(pSubmitRsp, TSDB_MSG_FLG_ENCODE);
if (code) terrno = code;
return code; return code;
#else #else
......
...@@ -467,7 +467,7 @@ int32_t qCloneStmtDataBlock(STableDataCxt** pDst, STableDataCxt* pSrc, bool rese ...@@ -467,7 +467,7 @@ int32_t qCloneStmtDataBlock(STableDataCxt** pDst, STableDataCxt* pSrc, bool rese
return code; return code;
} }
int32_t qRebuildStmtDataBlock(STableDataCxt** pDst, STableDataCxt* pSrc, uint64_t uid, int32_t vgId, bool rebuildCreateTb) { int32_t qRebuildStmtDataBlock(STableDataCxt** pDst, STableDataCxt* pSrc, uint64_t uid, uint64_t suid, int32_t vgId, bool rebuildCreateTb) {
int32_t code = qCloneStmtDataBlock(pDst, pSrc, false); int32_t code = qCloneStmtDataBlock(pDst, pSrc, false);
if (code) { if (code) {
return code; return code;
...@@ -477,8 +477,12 @@ int32_t qRebuildStmtDataBlock(STableDataCxt** pDst, STableDataCxt* pSrc, uint64_ ...@@ -477,8 +477,12 @@ int32_t qRebuildStmtDataBlock(STableDataCxt** pDst, STableDataCxt* pSrc, uint64_
if (pBlock->pMeta) { if (pBlock->pMeta) {
pBlock->pMeta->uid = uid; pBlock->pMeta->uid = uid;
pBlock->pMeta->vgId = vgId; pBlock->pMeta->vgId = vgId;
pBlock->pMeta->suid = suid;
} }
pBlock->pData->suid = suid;
pBlock->pData->uid = uid;
if (rebuildCreateTb && NULL == pBlock->pData->pCreateTbReq) { if (rebuildCreateTb && NULL == pBlock->pData->pCreateTbReq) {
pBlock->pData->pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq)); pBlock->pData->pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
if (NULL == pBlock->pData->pCreateTbReq) { if (NULL == pBlock->pData->pCreateTbReq) {
......
...@@ -1074,6 +1074,7 @@ static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreat ...@@ -1074,6 +1074,7 @@ static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreat
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
*pOutput = pTableCxt; *pOutput = pTableCxt;
qDebug("tableDataCxt created, uid:%" PRId64 ", vgId:%d", pTableMeta->uid, pTableMeta->vgId);
} else { } else {
taosMemoryFree(pTableCxt); taosMemoryFree(pTableCxt);
} }
...@@ -1182,6 +1183,8 @@ static int32_t fillVgroupDataCxt(STableDataCxt* pTableCxt, SVgroupDataCxt* pVgCx ...@@ -1182,6 +1183,8 @@ static int32_t fillVgroupDataCxt(STableDataCxt* pTableCxt, SVgroupDataCxt* pVgCx
taosArrayPush(pVgCxt->pData->aSubmitTbData, pTableCxt->pData); taosArrayPush(pVgCxt->pData->aSubmitTbData, pTableCxt->pData);
taosMemoryFreeClear(pTableCxt->pData); taosMemoryFreeClear(pTableCxt->pData);
qDebug("add tableDataCxt uid:%" PRId64 " to vgId:%d", pTableCxt->pMeta->uid, pVgCxt->vgId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册