提交 948502be 编写于 作者: X Xiaoyu Wang

enh: replace row format

上级 b96d9485
......@@ -2080,6 +2080,10 @@ int tEncodeSVCreateTbReq(SEncoder* pCoder, const SVCreateTbReq* pReq);
int tDecodeSVCreateTbReq(SDecoder* pCoder, SVCreateTbReq* pReq);
static FORCE_INLINE void tdDestroySVCreateTbReq(SVCreateTbReq* req) {
if (NULL == req) {
return;
}
taosMemoryFreeClear(req->name);
taosMemoryFreeClear(req->comment);
if (req->type == TSDB_CHILD_TABLE) {
......
......@@ -354,33 +354,33 @@ typedef struct SVgDataBlocks {
void* pData; // SSubmitReq + SSubmitBlk + ...
} SVgDataBlocks;
typedef void (*FFreeDataBlockHash)(SHashObj*);
typedef void (*FFreeDataBlockArray)(SArray*);
typedef void (*FFreeTableBlockHash)(SHashObj*);
typedef void (*FFreeVgourpBlockArray)(SArray*);
typedef struct SVnodeModifOpStmt {
ENodeType nodeType;
ENodeType sqlNodeType;
SArray* pDataBlocks; // data block for each vgroup, SArray<SVgDataBlocks*>.
uint32_t insertType; // insert data from [file|sql statement| bound statement]
const char* pSql; // current sql statement position
int32_t totalRowsNum;
int32_t totalTbNum;
SName targetTableName;
SName usingTableName;
const char* pBoundCols;
struct STableMeta* pTableMeta;
SHashObj* pVgroupsHashObj;
SHashObj* pTableBlockHashObj;
SHashObj* pSubTableHashObj;
SHashObj* pTableNameHashObj;
SHashObj* pDbFNameHashObj;
SArray* pVgDataBlocks;
SVCreateTbReq createTblReq;
TdFilePtr fp;
FFreeDataBlockHash freeHashFunc;
FFreeDataBlockArray freeArrayFunc;
bool usingTableProcessing;
bool fileProcessing;
ENodeType nodeType;
ENodeType sqlNodeType;
SArray* pDataBlocks; // data block for each vgroup, SArray<SVgDataBlocks*>.
uint32_t insertType; // insert data from [file|sql statement| bound statement]
const char* pSql; // current sql statement position
int32_t totalRowsNum;
int32_t totalTbNum;
SName targetTableName;
SName usingTableName;
const char* pBoundCols;
struct STableMeta* pTableMeta;
SHashObj* pVgroupsHashObj;
SHashObj* pTableBlockHashObj; // SHashObj<tuid, STableDataCxt*>
SHashObj* pSubTableHashObj;
SHashObj* pTableNameHashObj;
SHashObj* pDbFNameHashObj;
SArray* pVgDataBlocks; // SArray<SVgroupDataCxt*>
SVCreateTbReq* pCreateTblReq;
TdFilePtr fp;
FFreeTableBlockHash freeHashFunc;
FFreeVgourpBlockArray freeArrayFunc;
bool usingTableProcessing;
bool fileProcessing;
} SVnodeModifOpStmt;
typedef struct SExplainOptions {
......
......@@ -807,7 +807,8 @@ void nodesDestroyNode(SNode* pNode) {
if (pStmt->freeArrayFunc) {
pStmt->freeArrayFunc(pStmt->pVgDataBlocks);
}
tdDestroySVCreateTbReq(&pStmt->createTblReq);
tdDestroySVCreateTbReq(pStmt->pCreateTblReq);
taosMemoryFreeClear(pStmt->pCreateTblReq);
taosCloseFile(&pStmt->fp);
break;
}
......
......@@ -20,8 +20,6 @@
struct SToken;
#define IS_DATA_COL_ORDERED(spd) ((spd->orderStatus) == (int8_t)ORDER_STATUS_ORDERED)
#define NEXT_TOKEN(pSql, sToken) \
do { \
int32_t index = 0; \
......@@ -37,6 +35,8 @@ struct SToken;
} \
} while (0)
#define IS_DATA_COL_ORDERED(spd) ((spd->orderStatus) == (int8_t)ORDER_STATUS_ORDERED)
typedef enum EOrderStatus {
ORDER_STATUS_UNKNOWN = 0,
ORDER_STATUS_ORDERED = 1,
......@@ -141,4 +141,34 @@ int32_t insCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start);
int32_t insBuildOutput(SHashObj *pVgroupsHashObj, SArray *pVgDataBlocks, SArray **pDataBlocks);
void insDestroyDataBlock(STableDataBlocks *pDataBlock);
typedef struct SBoundColInfo {
int32_t *pColIndex; // bound index => schema index
int32_t numOfCols;
int32_t numOfBound;
} SBoundColInfo;
typedef struct STableDataCxt {
STableMeta *pMeta;
STSchema *pSchema;
SBoundColInfo boundColsInfo;
SArray *pValues;
SVCreateTbReq *pCreateTblReq;
SSubmitTbData data;
} STableDataCxt;
typedef struct SVgroupDataCxt {
int32_t vgId;
SSubmitReq2 data;
} SVgroupDataCxt;
int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo *pInfo);
int32_t insGetTableDataCxt(SHashObj *pHash, void *id, int32_t idLen, STableMeta *pTableMeta,
SVCreateTbReq **pCreateTbReq, STableDataCxt **pTableCxt);
int32_t insMergeTableDataCxt(SHashObj *pTableHash, SArray **pVgDataBlocks);
int32_t insBuildVgDataBlocks(SHashObj *pVgroupsHashObj, SArray *pVgDataBlocks, SArray **pDataBlocks);
void insDestroyTableDataCxtHashMap(SHashObj *pTableCxtHash);
void insDestroyVgroupDataCxt(SVgroupDataCxt *pVgCxt);
void insDestroyVgroupDataCxtList(SArray *pVgCxtList);
void insDestroyVgroupDataCxtHashMap(SHashObj *pVgCxtHash);
#endif // TDENGINE_PAR_INSERT_UTIL_H
......@@ -955,3 +955,277 @@ int32_t insBuildOutput(SHashObj* pVgroupsHashObj, SArray* pVgDataBlocks, SArray*
}
return TSDB_CODE_SUCCESS;
}
static void initBoundCols(int32_t ncols, int32_t* pBoundCols) {
for (int32_t i = 0; i < ncols; ++i) {
pBoundCols[i] = i;
}
}
static void initColValues(STableMeta* pTableMeta, SArray* pValues) {
SSchema* pSchemas = getTableColumnSchema(pTableMeta);
for (int32_t i = 0; i < pTableMeta->tableInfo.numOfColumns; ++i) {
SColVal val = COL_VAL_NONE(pSchemas[i].colId, pSchemas[i].type);
taosArrayPush(pValues, &val);
}
}
int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo* pInfo) {
pInfo->numOfCols = numOfBound;
pInfo->numOfBound = numOfBound;
pInfo->pColIndex = taosMemoryCalloc(numOfBound, sizeof(int32_t));
if (NULL == pInfo->pColIndex) {
return TSDB_CODE_OUT_OF_MEMORY;
}
initBoundCols(numOfBound, pInfo->pColIndex);
return TSDB_CODE_SUCCESS;
}
static void destroyBoundColInfo(SBoundColInfo* pInfo) { taosMemoryFreeClear(pInfo->pColIndex); }
static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreateTbReq, STableDataCxt** pOutput) {
STableDataCxt* pTableCxt = taosMemoryCalloc(1, sizeof(STableDataCxt));
if (NULL == pTableCxt) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = TSDB_CODE_SUCCESS;
pTableCxt->pMeta = tableMetaDup(pTableMeta);
if (NULL == pTableCxt->pMeta) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
if (TSDB_CODE_SUCCESS == code) {
pTableCxt->pSchema =
tBuildTSchema(getTableColumnSchema(pTableMeta), pTableMeta->tableInfo.numOfColumns, pTableMeta->sversion);
if (NULL == pTableCxt->pSchema) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
if (TSDB_CODE_SUCCESS == code) {
code = insInitBoundColsInfo(pTableMeta->tableInfo.numOfColumns, &pTableCxt->boundColsInfo);
}
if (TSDB_CODE_SUCCESS == code) {
pTableCxt->pValues = taosArrayInit(pTableMeta->tableInfo.numOfColumns, sizeof(SColVal));
if (NULL == pTableCxt->pValues) {
code = TSDB_CODE_OUT_OF_MEMORY;
} else {
initColValues(pTableMeta, pTableCxt->pValues);
}
}
if (TSDB_CODE_SUCCESS == code) {
pTableCxt->data.aRowP = taosArrayInit(128, POINTER_BYTES);
if (NULL == pTableCxt->data.aRowP) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
if (TSDB_CODE_SUCCESS == code) {
pTableCxt->pCreateTblReq = *pCreateTbReq;
*pCreateTbReq = NULL;
*pOutput = pTableCxt;
} else {
taosMemoryFree(pTableCxt);
}
return code;
}
int32_t insGetTableDataCxt(SHashObj* pHash, void* id, int32_t idLen, STableMeta* pTableMeta,
SVCreateTbReq** pCreateTbReq, STableDataCxt** pTableCxt) {
*pTableCxt = taosHashGet(pHash, id, idLen);
if (NULL != *pTableCxt) {
return TSDB_CODE_SUCCESS;
}
int32_t code = createTableDataCxt(pTableMeta, pCreateTbReq, pTableCxt);
if (TSDB_CODE_SUCCESS == code) {
code = taosHashPut(pHash, id, idLen, pTableCxt, POINTER_BYTES);
}
return code;
}
void insDestroyTableDataCxt(STableDataCxt* pTableCxt) {
if (NULL == pTableCxt) {
return;
}
taosMemoryFreeClear(pTableCxt->pMeta);
tDestroyTSchema(pTableCxt->pSchema);
destroyBoundColInfo(&pTableCxt->boundColsInfo);
taosArrayDestroyEx(pTableCxt->pValues, NULL /*todo*/);
tdDestroySVCreateTbReq(pTableCxt->pCreateTblReq);
taosMemoryFreeClear(pTableCxt->pCreateTblReq);
// todo free SSubmitTbData
}
void insDestroyVgroupDataCxt(SVgroupDataCxt* pVgCxt) {
if (NULL == pVgCxt) {
return;
}
tDestroySSubmitReq2(&pVgCxt->data);
}
void insDestroyVgroupDataCxtList(SArray* pVgCxtList) {
if (NULL == pVgCxtList) {
return;
}
size_t size = taosArrayGetSize(pVgCxtList);
for (int32_t i = 0; i < size; i++) {
void* p = taosArrayGetP(pVgCxtList, i);
insDestroyVgroupDataCxt(p);
}
taosArrayDestroy(pVgCxtList);
}
void insDestroyVgroupDataCxtHashMap(SHashObj* pVgCxtHash) {
if (NULL == pVgCxtHash) {
return;
}
void** p = taosHashIterate(pVgCxtHash, NULL);
while (p) {
insDestroyVgroupDataCxt(*(SVgroupDataCxt**)p);
p = taosHashIterate(pVgCxtHash, p);
}
taosHashCleanup(pVgCxtHash);
}
void insDestroyTableDataCxtHashMap(SHashObj* pTableCxtHash) {
if (NULL == pTableCxtHash) {
return;
}
void** p = taosHashIterate(pTableCxtHash, NULL);
while (p) {
insDestroyTableDataCxt(*(STableDataCxt**)p);
p = taosHashIterate(pTableCxtHash, p);
}
taosHashCleanup(pTableCxtHash);
}
static int32_t fillVgroupDataCxt(STableDataCxt* pTableCxt, SVgroupDataCxt* pVgCxt) {
if (NULL != pTableCxt->pCreateTblReq) {
if (NULL == pVgCxt->data.aCreateTbReq) {
pVgCxt->data.aCreateTbReq = taosArrayInit(128, sizeof(SVCreateTbReq));
if (NULL == pVgCxt->data.aCreateTbReq) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
taosArrayPush(pVgCxt->data.aCreateTbReq, pTableCxt->pCreateTblReq);
}
if (NULL == pVgCxt->data.aSubmitTbData) {
pVgCxt->data.aSubmitTbData = taosArrayInit(128, sizeof(SSubmitTbData));
if (NULL == pVgCxt->data.aSubmitTbData) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
taosArrayPush(pVgCxt->data.aSubmitTbData, &pTableCxt->data);
return TSDB_CODE_SUCCESS;
}
int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks) {
SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
SArray* pVgroupList = taosArrayInit(8, POINTER_BYTES);
if (NULL == pVgroupHash || NULL == pVgroupList) {
taosHashCleanup(pVgroupHash);
taosArrayDestroy(pVgroupList);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
int32_t code = TSDB_CODE_SUCCESS;
void* p = taosHashIterate(pTableHash, NULL);
while (TSDB_CODE_SUCCESS == code && NULL != p) {
STableDataCxt* pTableCxt = *(STableDataCxt**)p;
code = tRowMergeSort(pTableCxt->data.aRowP, pTableCxt->pSchema);
if (TSDB_CODE_SUCCESS == code) {
int32_t vgId = pTableCxt->pMeta->vgId;
SVgroupDataCxt* pVgCxt = taosHashGet(pVgroupHash, &vgId, sizeof(vgId));
if (NULL == pVgCxt) {
SVgroupDataCxt vgCxt = {.vgId = vgId};
code = fillVgroupDataCxt(pTableCxt, &vgCxt);
if (TSDB_CODE_SUCCESS == code) {
code = taosHashPut(pVgroupHash, &vgId, sizeof(vgId), &vgCxt, sizeof(vgCxt));
}
} else {
code = fillVgroupDataCxt(pTableCxt, pVgCxt);
}
}
if (TSDB_CODE_SUCCESS == code) {
p = taosHashIterate(pTableHash, p);
}
}
taosHashCleanup(pVgroupHash);
if (TSDB_CODE_SUCCESS == code) {
*pVgDataBlocks = pVgroupList;
} else {
taosArrayDestroy(pVgroupList);
}
return code;
}
static int32_t buildSubmitReq(SSubmitReq2* pReq, void** pData, uint32_t* pLen) {
int32_t code = TSDB_CODE_SUCCESS;
tEncodeSize(tEncodeSSubmitReq2, pReq, *pLen, code);
if (TSDB_CODE_SUCCESS == code) {
SEncoder encoder;
*pData = taosMemoryMalloc(*pLen);
if (NULL == *pData) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
tEncoderInit(&encoder, *pData, *pLen);
code = tEncodeSSubmitReq2(&encoder, pReq);
tEncoderClear(&encoder);
}
if (TSDB_CODE_SUCCESS != code) {
taosMemoryFreeClear(*pData);
*pLen = 0;
}
return code;
}
int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList, SArray** pVgDataBlocks) {
size_t numOfVg = taosArrayGetSize(pVgDataCxtList);
SArray* pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES);
if (NULL == pDataBlocks) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
int32_t code = TSDB_CODE_SUCCESS;
for (size_t i = 0; TSDB_CODE_SUCCESS == code && i < numOfVg; ++i) {
SVgroupDataCxt* src = taosArrayGetP(pVgDataCxtList, i);
SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
if (NULL == dst) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
}
if (TSDB_CODE_SUCCESS == code) {
dst->numOfTables = taosArrayGetSize(src->data.aSubmitTbData);
code = taosHashGetDup(pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildSubmitReq(&src->data, &dst->pData, &dst->size);
}
if (TSDB_CODE_SUCCESS == code) {
code = (NULL == taosArrayPush(pDataBlocks, &dst) ? TSDB_CODE_TSC_OUT_OF_MEMORY : TSDB_CODE_SUCCESS);
}
}
if (TSDB_CODE_SUCCESS == code) {
*pVgDataBlocks = pDataBlocks;
} else {
// todo
}
return code;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册