未验证 提交 f7109787 编写于 作者: H Hongze Cheng 提交者: GitHub

Merge pull request #6928 from taosdata/enhance/TD-5395

Enhance/td 5395
...@@ -108,6 +108,7 @@ int32_t converToStr(char *str, int type, void *buf, int32_t bufSize, int32_t *le ...@@ -108,6 +108,7 @@ int32_t converToStr(char *str, int type, void *buf, int32_t bufSize, int32_t *le
int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, SName* name, STableMeta* pTableMeta, STableDataBlocks** dataBlocks); int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, SName* name, STableMeta* pTableMeta, STableDataBlocks** dataBlocks);
void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta); void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta);
void tscSortRemoveDataBlockDupRowsRaw(STableDataBlocks* dataBuf);
int tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo); int tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo);
void tscDestroyBoundColumnInfo(SParsedDataColInfo* pColInfo); void tscDestroyBoundColumnInfo(SParsedDataColInfo* pColInfo);
......
...@@ -159,6 +159,7 @@ typedef struct SInsertStatementParam { ...@@ -159,6 +159,7 @@ typedef struct SInsertStatementParam {
SHashObj *pTableBlockHashList; // data block for each table SHashObj *pTableBlockHashList; // data block for each table
SArray *pDataBlocks; // SArray<STableDataBlocks*>. Merged submit block for each vgroup SArray *pDataBlocks; // SArray<STableDataBlocks*>. Merged submit block for each vgroup
int8_t schemaAttached; // denote if submit block is built with table schema or not int8_t schemaAttached; // denote if submit block is built with table schema or not
uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
STagData tagData; // NOTE: pTagData->data is used as a variant length array STagData tagData; // NOTE: pTagData->data is used as a variant length array
int32_t batchSize; // for parameter ('?') binding and batch processing int32_t batchSize; // for parameter ('?') binding and batch processing
...@@ -170,6 +171,14 @@ typedef struct SInsertStatementParam { ...@@ -170,6 +171,14 @@ typedef struct SInsertStatementParam {
char *sql; // current sql statement position char *sql; // current sql statement position
} SInsertStatementParam; } SInsertStatementParam;
typedef enum {
PAYLOAD_TYPE_KV = 0,
PAYLOAD_TYPE_RAW = 1,
} EPayloadType;
#define IS_RAW_PAYLOAD(t) \
(((int)(t)) == PAYLOAD_TYPE_RAW) // 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
// TODO extract sql parser supporter // TODO extract sql parser supporter
typedef struct { typedef struct {
int command; int command;
......
...@@ -425,7 +425,7 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pay ...@@ -425,7 +425,7 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pay
case TSDB_DATA_TYPE_BOOL: { // bool case TSDB_DATA_TYPE_BOOL: { // bool
if (isNullStr(pToken)) { if (isNullStr(pToken)) {
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, *sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type,
tdGetNullVal(TSDB_DATA_TYPE_BOOL), TYPE_BYTES[TSDB_DATA_TYPE_BOOL], tOffset); getNullValue(TSDB_DATA_TYPE_BOOL), TYPE_BYTES[TSDB_DATA_TYPE_BOOL], tOffset);
} else { } else {
if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) { if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) {
if (strncmp(pToken->z, "true", pToken->n) == 0) { if (strncmp(pToken->z, "true", pToken->n) == 0) {
...@@ -459,7 +459,7 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pay ...@@ -459,7 +459,7 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pay
case TSDB_DATA_TYPE_TINYINT: case TSDB_DATA_TYPE_TINYINT:
if (isNullStr(pToken)) { if (isNullStr(pToken)) {
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, *sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type,
tdGetNullVal(TSDB_DATA_TYPE_TINYINT), TYPE_BYTES[TSDB_DATA_TYPE_TINYINT], tOffset); getNullValue(TSDB_DATA_TYPE_TINYINT), TYPE_BYTES[TSDB_DATA_TYPE_TINYINT], tOffset);
} else { } else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true); ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
...@@ -479,7 +479,7 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pay ...@@ -479,7 +479,7 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pay
case TSDB_DATA_TYPE_UTINYINT: case TSDB_DATA_TYPE_UTINYINT:
if (isNullStr(pToken)) { if (isNullStr(pToken)) {
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, *sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type,
tdGetNullVal(TSDB_DATA_TYPE_UTINYINT), TYPE_BYTES[TSDB_DATA_TYPE_UTINYINT], tOffset); getNullValue(TSDB_DATA_TYPE_UTINYINT), TYPE_BYTES[TSDB_DATA_TYPE_UTINYINT], tOffset);
} else { } else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false); ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
...@@ -499,7 +499,7 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pay ...@@ -499,7 +499,7 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pay
case TSDB_DATA_TYPE_SMALLINT: case TSDB_DATA_TYPE_SMALLINT:
if (isNullStr(pToken)) { if (isNullStr(pToken)) {
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, *sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type,
tdGetNullVal(TSDB_DATA_TYPE_SMALLINT), TYPE_BYTES[TSDB_DATA_TYPE_SMALLINT], tOffset); getNullValue(TSDB_DATA_TYPE_SMALLINT), TYPE_BYTES[TSDB_DATA_TYPE_SMALLINT], tOffset);
} else { } else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true); ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
...@@ -520,7 +520,7 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pay ...@@ -520,7 +520,7 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pay
if (isNullStr(pToken)) { if (isNullStr(pToken)) {
*sizeAppend = *sizeAppend =
tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type,
tdGetNullVal(TSDB_DATA_TYPE_USMALLINT), TYPE_BYTES[TSDB_DATA_TYPE_USMALLINT], tOffset); getNullValue(TSDB_DATA_TYPE_USMALLINT), TYPE_BYTES[TSDB_DATA_TYPE_USMALLINT], tOffset);
} else { } else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false); ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
...@@ -540,7 +540,7 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pay ...@@ -540,7 +540,7 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pay
case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_INT:
if (isNullStr(pToken)) { if (isNullStr(pToken)) {
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, *sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type,
tdGetNullVal(TSDB_DATA_TYPE_INT), TYPE_BYTES[TSDB_DATA_TYPE_INT], tOffset); getNullValue(TSDB_DATA_TYPE_INT), TYPE_BYTES[TSDB_DATA_TYPE_INT], tOffset);
} else { } else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true); ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
...@@ -560,7 +560,7 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pay ...@@ -560,7 +560,7 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pay
case TSDB_DATA_TYPE_UINT: case TSDB_DATA_TYPE_UINT:
if (isNullStr(pToken)) { if (isNullStr(pToken)) {
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, *sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type,
tdGetNullVal(TSDB_DATA_TYPE_UINT), TYPE_BYTES[TSDB_DATA_TYPE_UINT], tOffset); getNullValue(TSDB_DATA_TYPE_UINT), TYPE_BYTES[TSDB_DATA_TYPE_UINT], tOffset);
} else { } else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false); ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
...@@ -580,7 +580,7 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pay ...@@ -580,7 +580,7 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pay
case TSDB_DATA_TYPE_BIGINT: case TSDB_DATA_TYPE_BIGINT:
if (isNullStr(pToken)) { if (isNullStr(pToken)) {
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, *sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type,
tdGetNullVal(TSDB_DATA_TYPE_BIGINT), TYPE_BYTES[TSDB_DATA_TYPE_BIGINT], tOffset); getNullValue(TSDB_DATA_TYPE_BIGINT), TYPE_BYTES[TSDB_DATA_TYPE_BIGINT], tOffset);
} else { } else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true); ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
...@@ -598,7 +598,7 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pay ...@@ -598,7 +598,7 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pay
case TSDB_DATA_TYPE_UBIGINT: case TSDB_DATA_TYPE_UBIGINT:
if (isNullStr(pToken)) { if (isNullStr(pToken)) {
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, *sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type,
tdGetNullVal(TSDB_DATA_TYPE_UBIGINT), TYPE_BYTES[TSDB_DATA_TYPE_UBIGINT], tOffset); getNullValue(TSDB_DATA_TYPE_UBIGINT), TYPE_BYTES[TSDB_DATA_TYPE_UBIGINT], tOffset);
} else { } else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false); ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
...@@ -639,7 +639,7 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pay ...@@ -639,7 +639,7 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pay
case TSDB_DATA_TYPE_DOUBLE: case TSDB_DATA_TYPE_DOUBLE:
if (isNullStr(pToken)) { if (isNullStr(pToken)) {
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, *sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type,
tdGetNullVal(TSDB_DATA_TYPE_DOUBLE), TYPE_BYTES[TSDB_DATA_TYPE_DOUBLE], tOffset); getNullValue(TSDB_DATA_TYPE_DOUBLE), TYPE_BYTES[TSDB_DATA_TYPE_DOUBLE], tOffset);
} else { } else {
double dv; double dv;
if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) { if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) {
...@@ -661,7 +661,7 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pay ...@@ -661,7 +661,7 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pay
if (pToken->type == TK_NULL) { if (pToken->type == TK_NULL) {
payloadColSetId(payload, pSchema->colId); payloadColSetId(payload, pSchema->colId);
payloadColSetType(payload, pSchema->type); payloadColSetType(payload, pSchema->type);
memcpy(POINTER_SHIFT(payloadStart, tOffset), tdGetNullVal(TSDB_DATA_TYPE_BINARY), VARSTR_HEADER_SIZE + CHAR_BYTES); memcpy(POINTER_SHIFT(payloadStart, tOffset), getNullValue(TSDB_DATA_TYPE_BINARY), VARSTR_HEADER_SIZE + CHAR_BYTES);
*sizeAppend = (TDRowLenT)(VARSTR_HEADER_SIZE + CHAR_BYTES); *sizeAppend = (TDRowLenT)(VARSTR_HEADER_SIZE + CHAR_BYTES);
} else { // too long values will return invalid sql, not be truncated automatically } else { // too long values will return invalid sql, not be truncated automatically
if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) { // todo refactor if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) { // todo refactor
...@@ -684,7 +684,7 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pay ...@@ -684,7 +684,7 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pay
if (pToken->type == TK_NULL) { if (pToken->type == TK_NULL) {
payloadColSetId(payload, pSchema->colId); payloadColSetId(payload, pSchema->colId);
payloadColSetType(payload, pSchema->type); payloadColSetType(payload, pSchema->type);
memcpy(POINTER_SHIFT(payloadStart,tOffset), tdGetNullVal(TSDB_DATA_TYPE_NCHAR), VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE); memcpy(POINTER_SHIFT(payloadStart,tOffset), getNullValue(TSDB_DATA_TYPE_NCHAR), VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
*sizeAppend = (TDRowLenT)(VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE); *sizeAppend = (TDRowLenT)(VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
} else { } else {
// if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long' // if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
...@@ -716,7 +716,7 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pay ...@@ -716,7 +716,7 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pay
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP]); *kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP]);
} else { } else {
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, *sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type,
tdGetNullVal(TSDB_DATA_TYPE_TIMESTAMP), getNullValue(TSDB_DATA_TYPE_TIMESTAMP),
TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], tOffset); TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], tOffset);
} }
} else { } else {
...@@ -1069,9 +1069,8 @@ static int32_t tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, ...@@ -1069,9 +1069,8 @@ static int32_t tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta,
} }
} }
#if 0
// data block is disordered, sort it in ascending order // data block is disordered, sort it in ascending order
static void tscSortRemoveDataBlockDupRowsOld(STableDataBlocks *dataBuf) { void tscSortRemoveDataBlockDupRowsRaw(STableDataBlocks *dataBuf) {
SSubmitBlk *pBlocks = (SSubmitBlk *)dataBuf->pData; SSubmitBlk *pBlocks = (SSubmitBlk *)dataBuf->pData;
// size is less than the total size, since duplicated rows may be removed yet. // size is less than the total size, since duplicated rows may be removed yet.
...@@ -1114,7 +1113,6 @@ static void tscSortRemoveDataBlockDupRowsOld(STableDataBlocks *dataBuf) { ...@@ -1114,7 +1113,6 @@ static void tscSortRemoveDataBlockDupRowsOld(STableDataBlocks *dataBuf) {
dataBuf->prevTS = INT64_MIN; dataBuf->prevTS = INT64_MIN;
} }
#endif
// data block is disordered, sort it in ascending order // data block is disordered, sort it in ascending order
int tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf, SBlockKeyInfo *pBlkKeyInfo) { int tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf, SBlockKeyInfo *pBlkKeyInfo) {
......
...@@ -291,7 +291,6 @@ static char* normalStmtBuildSql(STscStmt* stmt) { ...@@ -291,7 +291,6 @@ static char* normalStmtBuildSql(STscStmt* stmt) {
return taosStringBuilderGetResult(&sb, NULL); return taosStringBuilderGetResult(&sb, NULL);
} }
#if 0
static int fillColumnsNull(STableDataBlocks* pBlock, int32_t rowNum) { static int fillColumnsNull(STableDataBlocks* pBlock, int32_t rowNum) {
SParsedDataColInfo* spd = &pBlock->boundColumnInfo; SParsedDataColInfo* spd = &pBlock->boundColumnInfo;
int32_t offset = 0; int32_t offset = 0;
...@@ -319,129 +318,8 @@ static int fillColumnsNull(STableDataBlocks* pBlock, int32_t rowNum) { ...@@ -319,129 +318,8 @@ static int fillColumnsNull(STableDataBlocks* pBlock, int32_t rowNum) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
#endif
/**
* input:
* - schema:
* - payload:
* - spd:
* output:
* - pBlock with data block replaced by K-V format
*/
static int refactorPayload(STableDataBlocks* pBlock, int32_t rowNum) {
SParsedDataColInfo* spd = &pBlock->boundColumnInfo;
SSchema* schema = (SSchema*)pBlock->pTableMeta->schema;
SMemRowHelper* pHelper = &pBlock->rowHelper;
STableMeta* pTableMeta = pBlock->pTableMeta;
STableComInfo tinfo = tscGetTableInfo(pTableMeta);
int code = TSDB_CODE_SUCCESS;
int32_t extendedRowSize = getExtendedRowSize(&tinfo);
TDRowTLenT destPayloadSize = sizeof(SSubmitBlk);
ASSERT(pHelper->allNullLen >= 8);
TDRowTLenT destAllocSize = sizeof(SSubmitBlk) + rowNum * extendedRowSize;
SSubmitBlk* pDestBlock = tcalloc(destAllocSize, 1);
if (pDestBlock == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
memcpy(pDestBlock, pBlock->pData, sizeof(SSubmitBlk));
char* destPayload = (char*)pDestBlock + sizeof(SSubmitBlk);
char* srcPayload = (char*)pBlock->pData + sizeof(SSubmitBlk);
for (int n = 0; n < rowNum; ++n) {
payloadSetNCols(destPayload, spd->numOfBound);
TDRowTLenT dataRowLen = pHelper->allNullLen;
TDRowTLenT kvRowLen = TD_MEM_ROW_KV_VER_SIZE + sizeof(SColIdx) * spd->numOfBound;
TDRowTLenT payloadValOffset = payloadValuesOffset(destPayload); // rely on payloadNCols
TDRowLenT colValOffset = 0;
char* kvPrimaryKeyStart = destPayload + PAYLOAD_HEADER_LEN; // primaryKey in 1st column tuple
char* kvStart = kvPrimaryKeyStart + PAYLOAD_COL_HEAD_LEN; // the column tuple behind the primaryKey
for (int32_t i = 0; i < spd->numOfBound; ++i) {
int32_t colIndex = spd->boundedColumns[i];
ASSERT(spd->cols[colIndex].hasVal);
char* start = srcPayload + spd->cols[colIndex].offset;
SSchema* pSchema = &schema[colIndex]; // get colId here
bool isPrimaryKey = (colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX);
// the primary key locates in 1st column
if (!IS_DATA_COL_ORDERED(spd->orderStatus)) {
ASSERT(spd->colIdxInfo != NULL);
if (!isPrimaryKey) {
kvStart = POINTER_SHIFT(kvPrimaryKeyStart, spd->colIdxInfo[i].finalIdx * PAYLOAD_COL_HEAD_LEN);
} else {
ASSERT(spd->colIdxInfo[i].finalIdx == 0);
}
}
if (isPrimaryKey) {
payloadColSetId(kvPrimaryKeyStart, pSchema->colId);
payloadColSetType(kvPrimaryKeyStart, pSchema->type);
payloadColSetOffset(kvPrimaryKeyStart, colValOffset);
memcpy(POINTER_SHIFT(destPayload, payloadValOffset + colValOffset), start, TYPE_BYTES[pSchema->type]);
colValOffset += TYPE_BYTES[pSchema->type];
kvRowLen += TYPE_BYTES[pSchema->type];
} else {
payloadColSetId(kvStart, pSchema->colId);
payloadColSetType(kvStart, pSchema->type);
payloadColSetOffset(kvStart, colValOffset);
if (IS_VAR_DATA_TYPE(pSchema->type)) {
varDataCopy(POINTER_SHIFT(destPayload, payloadValOffset + colValOffset), start);
colValOffset += varDataTLen(start);
kvRowLen += varDataTLen(start);
if (pSchema->type == TSDB_DATA_TYPE_BINARY) {
dataRowLen += (varDataLen(start) - CHAR_BYTES);
} else if (pSchema->type == TSDB_DATA_TYPE_NCHAR) {
dataRowLen += (varDataLen(start) - TSDB_NCHAR_SIZE);
} else {
ASSERT(0);
}
} else {
memcpy(POINTER_SHIFT(destPayload, payloadValOffset + colValOffset), start, TYPE_BYTES[pSchema->type]);
colValOffset += TYPE_BYTES[pSchema->type];
kvRowLen += TYPE_BYTES[pSchema->type];
}
if (IS_DATA_COL_ORDERED(spd->orderStatus)) {
kvStart += PAYLOAD_COL_HEAD_LEN; // move to next column
}
}
} // end of column
if (kvRowLen < dataRowLen) {
payloadSetType(destPayload, SMEM_ROW_KV);
} else {
payloadSetType(destPayload, SMEM_ROW_DATA);
}
ASSERT(colValOffset <= TSDB_MAX_BYTES_PER_ROW);
TDRowTLenT len = payloadValOffset + colValOffset;
payloadSetTLen(destPayload, len);
// next loop
srcPayload += pBlock->rowSize;
destPayload += len;
destPayloadSize += len;
} // end of row
ASSERT(destPayloadSize <= destAllocSize);
tfree(pBlock->pData);
pBlock->pData = (char*)pDestBlock;
pBlock->nAllocSize = destAllocSize;
pBlock->size = destPayloadSize;
return code;
}
#if 0
int32_t fillTablesColumnsNull(SSqlObj* pSql) { int32_t fillTablesColumnsNull(SSqlObj* pSql) {
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
...@@ -464,98 +342,17 @@ int32_t fillTablesColumnsNull(SSqlObj* pSql) { ...@@ -464,98 +342,17 @@ int32_t fillTablesColumnsNull(SSqlObj* pSql) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
#endif
/**
* check and sort
*/
static int initPayloadEnv(STableDataBlocks* pBlock, int32_t rowNum) {
SParsedDataColInfo* spd = &pBlock->boundColumnInfo;
if (spd->orderStatus != ORDER_STATUS_UNKNOWN) {
return TSDB_CODE_SUCCESS;
}
bool isOrdered = true;
int32_t lastColIdx = -1;
for (int32_t i = 0; i < spd->numOfBound; ++i) {
ASSERT(spd->cols[i].hasVal);
int32_t colIdx = spd->boundedColumns[i];
if (isOrdered) {
if (lastColIdx > colIdx) {
isOrdered = false;
break;
} else {
lastColIdx = colIdx;
}
}
}
spd->orderStatus = isOrdered ? ORDER_STATUS_ORDERED : ORDER_STATUS_DISORDERED;
if (isOrdered) {
spd->colIdxInfo = NULL;
} else {
spd->colIdxInfo = calloc(spd->numOfBound, sizeof(SBoundIdxInfo));
if (spd->colIdxInfo == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
SBoundIdxInfo* pColIdx = spd->colIdxInfo;
for (uint16_t i = 0; i < spd->numOfBound; ++i) {
pColIdx[i].schemaColIdx = (uint16_t)spd->boundedColumns[i];
pColIdx[i].boundIdx = i;
}
qsort(pColIdx, spd->numOfBound, sizeof(SBoundIdxInfo), schemaIdxCompar);
for (uint16_t i = 0; i < spd->numOfBound; ++i) {
pColIdx[i].finalIdx = i;
}
qsort(pColIdx, spd->numOfBound, sizeof(SBoundIdxInfo), boundIdxCompar);
}
return TSDB_CODE_SUCCESS;
}
/**
* Refactor the raw payload structure to K-V format as the in tsParseOneRow()
*/
int32_t fillTablesPayload(SSqlObj* pSql) {
SSqlCmd* pCmd = &pSql->cmd;
int code = TSDB_CODE_SUCCESS;
STableDataBlocks** p = taosHashIterate(pCmd->insertParam.pTableBlockHashList, NULL);
STableDataBlocks* pOneTableBlock = *p;
while (pOneTableBlock) {
SSubmitBlk* pBlocks = (SSubmitBlk*)pOneTableBlock->pData;
if (pBlocks->numOfRows > 0) { ////////////////////////////////////////////////////////////////////////////////
initSMemRowHelper(&pOneTableBlock->rowHelper, tscGetTableSchema(pOneTableBlock->pTableMeta), // functions for insertion statement preparation
tscGetNumOfColumns(pOneTableBlock->pTableMeta), 0); static int doBindParam(STableDataBlocks* pBlock, char* data, SParamInfo* param, TAOS_BIND* bind, int32_t colNum) {
if ((code = initPayloadEnv(pOneTableBlock, pBlocks->numOfRows)) != TSDB_CODE_SUCCESS) { if (bind->is_null != NULL && *(bind->is_null)) {
return code; setNull(data + param->offset, param->type, param->bytes);
} return TSDB_CODE_SUCCESS;
if ((code = refactorPayload(pOneTableBlock, pBlocks->numOfRows)) != TSDB_CODE_SUCCESS) {
return code;
};
}
p = taosHashIterate(pCmd->insertParam.pTableBlockHashList, p);
if (p == NULL) {
break;
}
pOneTableBlock = *p;
} }
return code;
}
////////////////////////////////////////////////////////////////////////////////
// functions for insertion statement preparation
static int doBindParam(STableDataBlocks* pBlock, char* data, SParamInfo* param, TAOS_BIND* bind, int32_t colNum) {
if (bind->is_null != NULL && *(bind->is_null)) {
setNull(data + param->offset, param->type, param->bytes);
return TSDB_CODE_SUCCESS;
}
#if 0 #if 0
if (0) { if (0) {
// allow user bind param data with different type // allow user bind param data with different type
...@@ -1309,12 +1106,9 @@ static int insertStmtExecute(STscStmt* stmt) { ...@@ -1309,12 +1106,9 @@ static int insertStmtExecute(STscStmt* stmt) {
pBlk->uid = pTableMeta->id.uid; pBlk->uid = pTableMeta->id.uid;
pBlk->tid = pTableMeta->id.tid; pBlk->tid = pTableMeta->id.tid;
int code = fillTablesPayload(stmt->pSql); fillTablesColumnsNull(stmt->pSql);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = tscMergeTableDataBlocks(&stmt->pSql->cmd.insertParam, false); int code = tscMergeTableDataBlocks(&stmt->pSql->cmd.insertParam, false);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -1391,7 +1185,7 @@ static int insertBatchStmtExecute(STscStmt* pStmt) { ...@@ -1391,7 +1185,7 @@ static int insertBatchStmtExecute(STscStmt* pStmt) {
return TSDB_CODE_TSC_APP_ERROR; return TSDB_CODE_TSC_APP_ERROR;
} }
fillTablesPayload(pStmt->pSql); fillTablesColumnsNull(pStmt->pSql);
if ((code = tscMergeTableDataBlocks(&pStmt->pSql->cmd.insertParam, false)) != TSDB_CODE_SUCCESS) { if ((code = tscMergeTableDataBlocks(&pStmt->pSql->cmd.insertParam, false)) != TSDB_CODE_SUCCESS) {
return code; return code;
...@@ -2012,6 +1806,7 @@ int taos_stmt_execute(TAOS_STMT* stmt) { ...@@ -2012,6 +1806,7 @@ int taos_stmt_execute(TAOS_STMT* stmt) {
pStmt->last = STMT_EXECUTE; pStmt->last = STMT_EXECUTE;
pStmt->pSql->cmd.insertParam.payloadType = PAYLOAD_TYPE_RAW;
if (pStmt->multiTbInsert) { if (pStmt->multiTbInsert) {
ret = insertBatchStmtExecute(pStmt); ret = insertBatchStmtExecute(pStmt);
} else { } else {
......
...@@ -1813,14 +1813,14 @@ static SMemRow tdGenMemRowFromBuilder(SMemRowBuilder* pBuilder) { ...@@ -1813,14 +1813,14 @@ static SMemRow tdGenMemRowFromBuilder(SMemRowBuilder* pBuilder) {
p = payloadNextCol(p); p = payloadNextCol(p);
++i; ++i;
} else { } else {
tdAppendColVal(trow, tdGetNullVal(pSchema[j].type), pSchema[j].type, toffset); tdAppendColVal(trow, getNullValue(pSchema[j].type), pSchema[j].type, toffset);
toffset += TYPE_BYTES[pSchema[j].type]; toffset += TYPE_BYTES[pSchema[j].type];
++j; ++j;
} }
} }
while (j < nCols) { while (j < nCols) {
tdAppendColVal(trow, tdGetNullVal(pSchema[j].type), pSchema[j].type, toffset); tdAppendColVal(trow, getNullValue(pSchema[j].type), pSchema[j].type, toffset);
toffset += TYPE_BYTES[pSchema[j].type]; toffset += TYPE_BYTES[pSchema[j].type];
++j; ++j;
} }
...@@ -1860,7 +1860,8 @@ static SMemRow tdGenMemRowFromBuilder(SMemRowBuilder* pBuilder) { ...@@ -1860,7 +1860,8 @@ static SMemRow tdGenMemRowFromBuilder(SMemRowBuilder* pBuilder) {
} }
// Erase the empty space reserved for binary data // Erase the empty space reserved for binary data
static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bool includeSchema, SBlockKeyTuple *blkKeyTuple) { static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SInsertStatementParam* insertParam,
SBlockKeyTuple* blkKeyTuple) {
// TODO: optimize this function, handle the case while binary is not presented // TODO: optimize this function, handle the case while binary is not presented
STableMeta* pTableMeta = pTableDataBlock->pTableMeta; STableMeta* pTableMeta = pTableDataBlock->pTableMeta;
STableComInfo tinfo = tscGetTableInfo(pTableMeta); STableComInfo tinfo = tscGetTableInfo(pTableMeta);
...@@ -1873,7 +1874,7 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bo ...@@ -1873,7 +1874,7 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bo
int32_t flen = 0; // original total length of row int32_t flen = 0; // original total length of row
// schema needs to be included into the submit data block // schema needs to be included into the submit data block
if (includeSchema) { if (insertParam->schemaAttached) {
int32_t numOfCols = tscGetNumOfColumns(pTableDataBlock->pTableMeta); int32_t numOfCols = tscGetNumOfColumns(pTableDataBlock->pTableMeta);
for(int32_t j = 0; j < numOfCols; ++j) { for(int32_t j = 0; j < numOfCols; ++j) {
STColumn* pCol = (STColumn*) pDataBlock; STColumn* pCol = (STColumn*) pDataBlock;
...@@ -1900,18 +1901,38 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bo ...@@ -1900,18 +1901,38 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bo
pBlock->dataLen = 0; pBlock->dataLen = 0;
int32_t numOfRows = htons(pBlock->numOfRows); int32_t numOfRows = htons(pBlock->numOfRows);
SMemRowBuilder rowBuilder; if (IS_RAW_PAYLOAD(insertParam->payloadType)) {
rowBuilder.pSchema = pSchema; for (int32_t i = 0; i < numOfRows; ++i) {
rowBuilder.sversion = pTableMeta->sversion; SMemRow memRow = (SMemRow)pDataBlock;
rowBuilder.flen = flen; memRowSetType(memRow, SMEM_ROW_DATA);
rowBuilder.nCols = tinfo.numOfColumns; SDataRow trow = memRowDataBody(memRow);
rowBuilder.pDataBlock = pDataBlock; dataRowSetLen(trow, (uint16_t)(TD_DATA_ROW_HEAD_SIZE + flen));
rowBuilder.pSubmitBlk = pBlock; dataRowSetVersion(trow, pTableMeta->sversion);
rowBuilder.buf = p;
int toffset = 0;
for (int32_t j = 0; j < tinfo.numOfColumns; j++) {
tdAppendColVal(trow, p, pSchema[j].type, toffset);
toffset += TYPE_BYTES[pSchema[j].type];
p += pSchema[j].bytes;
}
pDataBlock = (char*)pDataBlock + memRowTLen(memRow);
pBlock->dataLen += memRowTLen(memRow);
}
} else {
SMemRowBuilder rowBuilder;
rowBuilder.pSchema = pSchema;
rowBuilder.sversion = pTableMeta->sversion;
rowBuilder.flen = flen;
rowBuilder.nCols = tinfo.numOfColumns;
rowBuilder.pDataBlock = pDataBlock;
rowBuilder.pSubmitBlk = pBlock;
rowBuilder.buf = p;
for (int32_t i = 0; i < numOfRows; ++i) { for (int32_t i = 0; i < numOfRows; ++i) {
rowBuilder.buf = (blkKeyTuple + i)->payloadAddr; rowBuilder.buf = (blkKeyTuple + i)->payloadAddr;
tdGenMemRowFromBuilder(&rowBuilder); tdGenMemRowFromBuilder(&rowBuilder);
}
} }
int32_t len = pBlock->dataLen + pBlock->schemaLen; int32_t len = pBlock->dataLen + pBlock->schemaLen;
...@@ -1959,6 +1980,7 @@ static void extractTableNameList(SInsertStatementParam *pInsertParam, bool freeB ...@@ -1959,6 +1980,7 @@ static void extractTableNameList(SInsertStatementParam *pInsertParam, bool freeB
int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBlockMap) { int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBlockMap) {
const int INSERT_HEAD_SIZE = sizeof(SMsgDesc) + sizeof(SSubmitMsg); const int INSERT_HEAD_SIZE = sizeof(SMsgDesc) + sizeof(SSubmitMsg);
int code = 0; int code = 0;
bool isRawPayload = IS_RAW_PAYLOAD(pInsertParam->payloadType);
void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES); SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES);
...@@ -1967,7 +1989,7 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl ...@@ -1967,7 +1989,7 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl
STableDataBlocks* pOneTableBlock = *p; STableDataBlocks* pOneTableBlock = *p;
SBlockKeyInfo blkKeyInfo = {0}; // share by pOneTableBlock SBlockKeyInfo blkKeyInfo = {0}; // share by pOneTableBlock
while(pOneTableBlock) { while(pOneTableBlock) {
SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData; SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData;
if (pBlocks->numOfRows > 0) { if (pBlocks->numOfRows > 0) {
...@@ -2008,21 +2030,29 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl ...@@ -2008,21 +2030,29 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl
} }
} }
if ((code = tscSortRemoveDataBlockDupRows(pOneTableBlock, &blkKeyInfo)) != 0) { if (isRawPayload) {
taosHashCleanup(pVnodeDataBlockHashList); tscSortRemoveDataBlockDupRowsRaw(pOneTableBlock);
tscDestroyBlockArrayList(pVnodeDataBlockList); char* ekey = (char*)pBlocks->data + pOneTableBlock->rowSize * (pBlocks->numOfRows - 1);
tfree(dataBuf->pData);
tfree(blkKeyInfo.pKeyTuple);
return code;
}
ASSERT(blkKeyInfo.pKeyTuple != NULL && pBlocks->numOfRows > 0);
SBlockKeyTuple* pLastKeyTuple = blkKeyInfo.pKeyTuple + pBlocks->numOfRows - 1; tscDebug("0x%" PRIx64 " name:%s, tid:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64,
tscDebug("0x%" PRIx64 " name:%s, tid:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, pInsertParam->objectId, tNameGetTableName(&pOneTableBlock->tableName), pBlocks->tid,
pInsertParam->objectId, tNameGetTableName(&pOneTableBlock->tableName), pBlocks->tid, pBlocks->numOfRows, pBlocks->numOfRows, pBlocks->sversion, GET_INT64_VAL(pBlocks->data), GET_INT64_VAL(ekey));
pBlocks->sversion, blkKeyInfo.pKeyTuple->skey, pLastKeyTuple->skey); } else {
if ((code = tscSortRemoveDataBlockDupRows(pOneTableBlock, &blkKeyInfo)) != 0) {
taosHashCleanup(pVnodeDataBlockHashList);
tscDestroyBlockArrayList(pVnodeDataBlockList);
tfree(dataBuf->pData);
tfree(blkKeyInfo.pKeyTuple);
return code;
}
ASSERT(blkKeyInfo.pKeyTuple != NULL && pBlocks->numOfRows > 0);
SBlockKeyTuple* pLastKeyTuple = blkKeyInfo.pKeyTuple + pBlocks->numOfRows - 1;
tscDebug("0x%" PRIx64 " name:%s, tid:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64,
pInsertParam->objectId, tNameGetTableName(&pOneTableBlock->tableName), pBlocks->tid,
pBlocks->numOfRows, pBlocks->sversion, blkKeyInfo.pKeyTuple->skey, pLastKeyTuple->skey);
}
int32_t len = pBlocks->numOfRows * (pOneTableBlock->rowSize + expandSize) + sizeof(STColumn) * tscGetNumOfColumns(pOneTableBlock->pTableMeta); int32_t len = pBlocks->numOfRows * (pOneTableBlock->rowSize + expandSize) + sizeof(STColumn) * tscGetNumOfColumns(pOneTableBlock->pTableMeta);
pBlocks->tid = htonl(pBlocks->tid); pBlocks->tid = htonl(pBlocks->tid);
...@@ -2032,7 +2062,7 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl ...@@ -2032,7 +2062,7 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl
pBlocks->schemaLen = 0; pBlocks->schemaLen = 0;
// erase the empty space reserved for binary data // erase the empty space reserved for binary data
int32_t finalLen = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, pInsertParam->schemaAttached, blkKeyInfo.pKeyTuple); int32_t finalLen = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, pInsertParam, blkKeyInfo.pKeyTuple);
assert(finalLen <= len); assert(finalLen <= len);
dataBuf->size += (finalLen + sizeof(SSubmitBlk)); dataBuf->size += (finalLen + sizeof(SSubmitBlk));
......
...@@ -24,35 +24,6 @@ ...@@ -24,35 +24,6 @@
extern "C" { extern "C" {
#endif #endif
#pragma pack(push, 1)
typedef struct {
VarDataLenT len;
uint8_t data;
} SBinaryNullT;
typedef struct {
VarDataLenT len;
uint32_t data;
} SNCharNullT;
#pragma pack(pop)
extern const uint8_t BoolNull;
extern const uint8_t TinyintNull;
extern const uint16_t SmallintNull;
extern const uint32_t IntNull;
extern const uint64_t BigintNull;
extern const uint64_t TimestampNull;
extern const uint8_t UTinyintNull;
extern const uint16_t USmallintNull;
extern const uint32_t UIntNull;
extern const uint64_t UBigintNull;
extern const uint32_t FloatNull;
extern const uint64_t DoubleNull;
extern const SBinaryNullT BinaryNull;
extern const SNCharNullT NcharNull;
const void *tdGetNullVal(int8_t type);
#define STR_TO_VARSTR(x, str) \ #define STR_TO_VARSTR(x, str) \
do { \ do { \
VarDataLenT __len = (VarDataLenT)strlen(str); \ VarDataLenT __len = (VarDataLenT)strlen(str); \
...@@ -287,7 +258,7 @@ void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints); ...@@ -287,7 +258,7 @@ void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints);
// Get the data pointer from a column-wised data // Get the data pointer from a column-wised data
static FORCE_INLINE const void *tdGetColDataOfRow(SDataCol *pCol, int row) { static FORCE_INLINE const void *tdGetColDataOfRow(SDataCol *pCol, int row) {
if (isAllRowsNull(pCol)) { if (isAllRowsNull(pCol)) {
return tdGetNullVal(pCol->type); return getNullValue(pCol->type);
} }
if (IS_VAR_DATA_TYPE(pCol->type)) { if (IS_VAR_DATA_TYPE(pCol->type)) {
return POINTER_SHIFT(pCol->pData, pCol->dataOff[row]); return POINTER_SHIFT(pCol->pData, pCol->dataOff[row]);
......
...@@ -18,21 +18,6 @@ ...@@ -18,21 +18,6 @@
#include "tcoding.h" #include "tcoding.h"
#include "wchar.h" #include "wchar.h"
const uint8_t BoolNull = TSDB_DATA_BOOL_NULL;
const uint8_t TinyintNull = TSDB_DATA_TINYINT_NULL;
const uint16_t SmallintNull = TSDB_DATA_SMALLINT_NULL;
const uint32_t IntNull = TSDB_DATA_INT_NULL;
const uint64_t BigintNull = TSDB_DATA_BIGINT_NULL;
const uint64_t TimestampNull = TSDB_DATA_BIGINT_NULL;
const uint8_t UTinyintNull = TSDB_DATA_UTINYINT_NULL;
const uint16_t USmallintNull = TSDB_DATA_USMALLINT_NULL;
const uint32_t UIntNull = TSDB_DATA_UINT_NULL;
const uint64_t UBigintNull = TSDB_DATA_UBIGINT_NULL;
const uint32_t FloatNull = TSDB_DATA_FLOAT_NULL;
const uint64_t DoubleNull = TSDB_DATA_DOUBLE_NULL;
const SBinaryNullT BinaryNull = {1, TSDB_DATA_BINARY_NULL};
const SNCharNullT NcharNull = {4, TSDB_DATA_NCHAR_NULL};
static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2,
int limit2, int tRows); int limit2, int tRows);
...@@ -453,7 +438,7 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols ...@@ -453,7 +438,7 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols
SDataCol *pDataCol = &(pCols->cols[dcol]); SDataCol *pDataCol = &(pCols->cols[dcol]);
if (rcol >= schemaNCols(pSchema)) { if (rcol >= schemaNCols(pSchema)) {
// dataColSetNullAt(pDataCol, pCols->numOfRows); // dataColSetNullAt(pDataCol, pCols->numOfRows);
dataColAppendVal(pDataCol, tdGetNullVal(pDataCol->type), pCols->numOfRows, pCols->maxPoints); dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
dcol++; dcol++;
continue; continue;
} }
...@@ -468,7 +453,7 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols ...@@ -468,7 +453,7 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols
rcol++; rcol++;
} else { } else {
// dataColSetNullAt(pDataCol, pCols->numOfRows); // dataColSetNullAt(pDataCol, pCols->numOfRows);
dataColAppendVal(pDataCol, tdGetNullVal(pDataCol->type), pCols->numOfRows, pCols->maxPoints); dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
dcol++; dcol++;
} }
} }
...@@ -498,7 +483,7 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo ...@@ -498,7 +483,7 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo
SDataCol *pDataCol = &(pCols->cols[dcol]); SDataCol *pDataCol = &(pCols->cols[dcol]);
if (rcol >= nRowCols || rcol >= schemaNCols(pSchema)) { if (rcol >= nRowCols || rcol >= schemaNCols(pSchema)) {
// dataColSetNullAt(pDataCol, pCols->numOfRows); // dataColSetNullAt(pDataCol, pCols->numOfRows);
dataColAppendVal(pDataCol, tdGetNullVal(pDataCol->type), pCols->numOfRows, pCols->maxPoints); dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
++dcol; ++dcol;
continue; continue;
} }
...@@ -514,7 +499,7 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo ...@@ -514,7 +499,7 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo
++rcol; ++rcol;
} else { } else {
// dataColSetNullAt(pDataCol, pCols->numOfRows); // dataColSetNullAt(pDataCol, pCols->numOfRows);
dataColAppendVal(pDataCol, tdGetNullVal(pDataCol->type), pCols->numOfRows, pCols->maxPoints); dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
++dcol; ++dcol;
} }
} }
...@@ -799,40 +784,4 @@ SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder) { ...@@ -799,40 +784,4 @@ SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder) {
memcpy(kvRowValues(row), pBuilder->buf, pBuilder->size); memcpy(kvRowValues(row), pBuilder->buf, pBuilder->size);
return row; return row;
}
const void *tdGetNullVal(int8_t type) {
switch (type) {
case TSDB_DATA_TYPE_BOOL:
return &BoolNull;
case TSDB_DATA_TYPE_TINYINT:
return &TinyintNull;
case TSDB_DATA_TYPE_SMALLINT:
return &SmallintNull;
case TSDB_DATA_TYPE_INT:
return &IntNull;
case TSDB_DATA_TYPE_BIGINT:
return &BigintNull;
case TSDB_DATA_TYPE_FLOAT:
return &FloatNull;
case TSDB_DATA_TYPE_DOUBLE:
return &DoubleNull;
case TSDB_DATA_TYPE_BINARY:
return &BinaryNull;
case TSDB_DATA_TYPE_TIMESTAMP:
return &TimestampNull;
case TSDB_DATA_TYPE_NCHAR:
return &NcharNull;
case TSDB_DATA_TYPE_UTINYINT:
return &UTinyintNull;
case TSDB_DATA_TYPE_USMALLINT:
return &USmallintNull;
case TSDB_DATA_TYPE_UINT:
return &UIntNull;
case TSDB_DATA_TYPE_UBIGINT:
return &UBigintNull;
default:
ASSERT(0);
return NULL;
}
} }
\ No newline at end of file
...@@ -492,30 +492,32 @@ void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems) { ...@@ -492,30 +492,32 @@ void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems) {
} }
} }
static uint8_t nullBool = TSDB_DATA_BOOL_NULL; static uint8_t nullBool = TSDB_DATA_BOOL_NULL;
static uint8_t nullTinyInt = TSDB_DATA_TINYINT_NULL; static uint8_t nullTinyInt = TSDB_DATA_TINYINT_NULL;
static uint16_t nullSmallInt = TSDB_DATA_SMALLINT_NULL; static uint16_t nullSmallInt = TSDB_DATA_SMALLINT_NULL;
static uint32_t nullInt = TSDB_DATA_INT_NULL; static uint32_t nullInt = TSDB_DATA_INT_NULL;
static uint64_t nullBigInt = TSDB_DATA_BIGINT_NULL; static uint64_t nullBigInt = TSDB_DATA_BIGINT_NULL;
static uint32_t nullFloat = TSDB_DATA_FLOAT_NULL; static uint32_t nullFloat = TSDB_DATA_FLOAT_NULL;
static uint64_t nullDouble = TSDB_DATA_DOUBLE_NULL; static uint64_t nullDouble = TSDB_DATA_DOUBLE_NULL;
static uint8_t nullTinyIntu = TSDB_DATA_UTINYINT_NULL; static uint8_t nullTinyIntu = TSDB_DATA_UTINYINT_NULL;
static uint16_t nullSmallIntu = TSDB_DATA_USMALLINT_NULL; static uint16_t nullSmallIntu = TSDB_DATA_USMALLINT_NULL;
static uint32_t nullIntu = TSDB_DATA_UINT_NULL; static uint32_t nullIntu = TSDB_DATA_UINT_NULL;
static uint64_t nullBigIntu = TSDB_DATA_UBIGINT_NULL; static uint64_t nullBigIntu = TSDB_DATA_UBIGINT_NULL;
static SBinaryNullT nullBinary = {1, TSDB_DATA_BINARY_NULL};
static union { static SNCharNullT nullNchar = {4, TSDB_DATA_NCHAR_NULL};
tstr str;
char pad[sizeof(tstr) + 4]; // static union {
} nullBinary = {.str = {.len = 1}}, nullNchar = {.str = {.len = 4}}; // tstr str;
// char pad[sizeof(tstr) + 4];
static void *nullValues[] = { // } nullBinary = {.str = {.len = 1}}, nullNchar = {.str = {.len = 4}};
static const void *nullValues[] = {
&nullBool, &nullTinyInt, &nullSmallInt, &nullInt, &nullBigInt, &nullBool, &nullTinyInt, &nullSmallInt, &nullInt, &nullBigInt,
&nullFloat, &nullDouble, &nullBinary, &nullBigInt, &nullNchar, &nullFloat, &nullDouble, &nullBinary, &nullBigInt, &nullNchar,
&nullTinyIntu, &nullSmallIntu, &nullIntu, &nullBigIntu, &nullTinyIntu, &nullSmallIntu, &nullIntu, &nullBigIntu,
}; };
void *getNullValue(int32_t type) { const void *getNullValue(int32_t type) {
assert(type >= TSDB_DATA_TYPE_BOOL && type <= TSDB_DATA_TYPE_UBIGINT); assert(type >= TSDB_DATA_TYPE_BOOL && type <= TSDB_DATA_TYPE_UBIGINT);
return nullValues[type - 1]; return nullValues[type - 1];
} }
......
...@@ -490,9 +490,9 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { ...@@ -490,9 +490,9 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
for (int32_t i = 0; i < pSchema->numOfCols; i++) { for (int32_t i = 0; i < pSchema->numOfCols; i++) {
STColumn *c = pSchema->columns + i; STColumn *c = pSchema->columns + i;
void* val = row[i]; void *val = row[i];
if (val == NULL) { if (val == NULL) {
val = getNullValue(c->type); val = (void *)getNullValue(c->type);
} else if (c->type == TSDB_DATA_TYPE_BINARY) { } else if (c->type == TSDB_DATA_TYPE_BINARY) {
val = ((char*)val) - sizeof(VarDataLenT); val = ((char*)val) - sizeof(VarDataLenT);
} else if (c->type == TSDB_DATA_TYPE_NCHAR) { } else if (c->type == TSDB_DATA_TYPE_NCHAR) {
......
...@@ -20,6 +20,18 @@ typedef struct tstr { ...@@ -20,6 +20,18 @@ typedef struct tstr {
char data[]; char data[];
} tstr; } tstr;
#pragma pack(push, 1)
typedef struct {
VarDataLenT len;
uint8_t data;
} SBinaryNullT;
typedef struct {
VarDataLenT len;
uint32_t data;
} SNCharNullT;
#pragma pack(pop)
#define VARSTR_HEADER_SIZE sizeof(VarDataLenT) #define VARSTR_HEADER_SIZE sizeof(VarDataLenT)
#define varDataLen(v) ((VarDataLenT *)(v))[0] #define varDataLen(v) ((VarDataLenT *)(v))[0]
...@@ -182,7 +194,7 @@ bool isValidDataType(int32_t type); ...@@ -182,7 +194,7 @@ bool isValidDataType(int32_t type);
void setVardataNull(char* val, int32_t type); void setVardataNull(char* val, int32_t type);
void setNull(char *val, int32_t type, int32_t bytes); void setNull(char *val, int32_t type, int32_t bytes);
void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems); void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems);
void *getNullValue(int32_t type); const void *getNullValue(int32_t type);
void assignVal(char *val, const char *src, int32_t len, int32_t type); void assignVal(char *val, const char *src, int32_t len, int32_t type);
void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf); void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf);
......
...@@ -787,7 +787,7 @@ static char *getTagIndexKey(const void *pData) { ...@@ -787,7 +787,7 @@ static char *getTagIndexKey(const void *pData) {
void * res = tdGetKVRowValOfCol(pTable->tagVal, pCol->colId); void * res = tdGetKVRowValOfCol(pTable->tagVal, pCol->colId);
if (res == NULL) { if (res == NULL) {
// treat the column as NULL if we cannot find it // treat the column as NULL if we cannot find it
res = getNullValue(pCol->type); res = (char*)getNullValue(pCol->type);
} }
return res; return res;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册