提交 9ed5a42c 编写于 作者: C Cary Xu

tsc raw data combination restructure

上级 e94bb935
......@@ -40,7 +40,8 @@ extern "C" {
#define UTIL_TABLE_IS_TMP_TABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_TEMP_TABLE))
#define KvRowNColsThresh 1 // default 1200. TODO: only for test, restore to default value after test finished
#define KvRowNColsThresh 1 // default 1200
#define KVRowRatio 0.85 // for NonVarType, we get value from SDataRow directly, while needs readdressing for SKVRow
#pragma pack(push,1)
// this struct is transfered as binary, padding two bytes to avoid
......@@ -96,11 +97,21 @@ typedef struct SVgroupTableInfo {
SArray *itemList; // SArray<STableIdInfo>
} SVgroupTableInfo;
typedef struct SBlockKeyTuple {
TSKEY skey;
void* payloadAddr;
} SBlockKeyTuple;
typedef struct SBlockKeyInfo {
int32_t nBytesAlloc;
SBlockKeyTuple* pKeyTuple;
} SBlockKeyInfo;
int32_t converToStr(char *str, int type, void *buf, int32_t bufSize, int32_t *len);
int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, SName* name, STableMeta* pTableMeta, STableDataBlocks** dataBlocks);
void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta);
void tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf);
int tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo);
void tscDestroyBoundColumnInfo(SParsedDataColInfo* pColInfo);
void doRetrieveSubqueryData(SSchedMsg *pMsg);
......@@ -343,22 +354,6 @@ char* strdup_throw(const char* str);
bool vgroupInfoIdentical(SNewVgroupInfo *pExisted, SVgroupMsg* src);
SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg);
typedef struct {
// for SDataRow
SSchema* pSchema;
int16_t sversion;
int32_t flen;
// for SKVRow
uint16_t nCols;
uint16_t size;
void* buf;
void* pDataBlock;
SSubmitBlk* pSubmitBlk;
} SMemRowBuilder;
SMemRow tdGenMemRowFromBuilder(SMemRowBuilder* pBuilder);
#ifdef __cplusplus
}
#endif
......
......@@ -95,6 +95,43 @@ typedef struct SParsedDataColInfo {
SBoundColumn *cols;
} SParsedDataColInfo;
typedef struct {
// for SDataRow
SSchema *pSchema;
int16_t sversion;
int32_t flen;
// for SKVRow
uint16_t nCols;
uint16_t size;
void * buf;
void * pDataBlock;
SSubmitBlk *pSubmitBlk;
uint16_t allNullLen;
} SMemRowBuilder;
int FORCE_INLINE initSMemRowBuilder(SMemRowBuilder *pBuilder, SSchema *pSSchema, uint16_t nCols,
uint16_t allNullColsLen) {
ASSERT(nCols > 0);
pBuilder->pSchema = pSSchema;
pBuilder->allNullLen = allNullColsLen; // TODO: get allNullColsLen when creating or altering table meta
if (pBuilder->allNullLen == 0) {
for (uint16_t i = 0; i < nCols; ++i) {
uint8_t type = pSSchema[i].type;
int32_t typeLen = TYPE_BYTES[type];
ASSERT(typeLen > 0);
pBuilder->allNullLen += typeLen;
if (TSDB_DATA_TYPE_BINARY == type) {
pBuilder->allNullLen += (sizeof(VarDataLenT) + CHAR_BYTES);
} else if (TSDB_DATA_TYPE_NCHAR == type) {
int len = sizeof(VarDataLenT) + TSDB_NCHAR_SIZE;
pBuilder->allNullLen += len;
}
}
}
return 0;
}
typedef struct STableDataBlocks {
SName tableName;
int8_t tsSource; // where does the UNIX timestamp come from, server or client
......@@ -114,7 +151,8 @@ typedef struct STableDataBlocks {
// for parameter ('?') binding
uint32_t numOfAllocedParams;
uint32_t numOfParams;
SParamInfo *params;
SParamInfo * params;
SMemRowBuilder rowBuilder;
} STableDataBlocks;
typedef struct {
......
......@@ -38,10 +38,16 @@ enum {
TSDB_USE_CLI_TS = 1,
};
static uint8_t TRUE_VALUE = (uint8_t)TSDB_TRUE;
static uint8_t FALSE_VALUE = (uint8_t)TSDB_FALSE;
static int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t *numOfRows);
static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDataColInfo *pColInfo, SSchema *pSchema,
char *str, char **end);
static FORCE_INLINE int32_t getPaddingRowSize(STableComInfo *tinfo) {
return tinfo->rowSize + PAYLOAD_HEADER_LEN + PAYLOAD_ID_TYPE_LEN * tinfo->numOfColumns;
}
static int32_t tscToDouble(SStrToken *pToken, double *value, char **endPtr) {
errno = 0;
*value = strtold(pToken->z, endPtr);
......@@ -378,6 +384,349 @@ int32_t tsParseOneColumn(SSchema *pSchema, SStrToken *pToken, char *payload, cha
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE uint16_t tsSetColumnValue(char *payload, int16_t columnId, uint8_t columnType, void *value,
uint16_t valueLen) {
payloadColSetId(payload, columnId);
payloadColSetType(payload, columnType);
memcpy(payloadColValue(payload), value, valueLen);
return PAYLOAD_ID_TYPE_LEN + valueLen;
}
static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *payload, char *msg, char **str,
bool primaryKey, int16_t timePrec, uint16_t *sizeAppend, bool *isColNull,
TDRowLenT *dataRowColDeltaLen, TDRowLenT *kvRowColLen) {
int64_t iv;
int32_t ret;
char * endptr = NULL;
if (IS_NUMERIC_TYPE(pSchema->type) && pToken->n == 0) {
return tscInvalidOperationMsg(msg, "invalid numeric data", pToken->z);
}
switch (pSchema->type) {
case TSDB_DATA_TYPE_BOOL: { // bool
if (isNullStr(pToken)) {
// *((uint8_t *)payload) = TSDB_DATA_BOOL_NULL;
*isColNull = true;
} else {
if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) {
if (strncmp(pToken->z, "true", pToken->n) == 0) {
// *(uint8_t *)payload = TSDB_TRUE;
*sizeAppend =
tsSetColumnValue(payload, pSchema->colId, pSchema->type, &TRUE_VALUE, TYPE_BYTES[TSDB_DATA_TYPE_BOOL]);
*kvRowColLen += sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_BOOL];
} else if (strncmp(pToken->z, "false", pToken->n) == 0) {
// *(uint8_t *)payload = TSDB_FALSE;
*sizeAppend =
tsSetColumnValue(payload, pSchema->colId, pSchema->type, &FALSE_VALUE, TYPE_BYTES[TSDB_DATA_TYPE_BOOL]);
*kvRowColLen += sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_BOOL];
} else {
return tscSQLSyntaxErrMsg(msg, "invalid bool data", pToken->z);
}
} else if (pToken->type == TK_INTEGER) {
iv = strtoll(pToken->z, NULL, 10);
// *(uint8_t *)payload = (int8_t)((iv == 0) ? TSDB_FALSE : TSDB_TRUE);
*sizeAppend = tsSetColumnValue(payload, pSchema->colId, pSchema->type,
((iv == 0) ? &FALSE_VALUE : &TRUE_VALUE), TYPE_BYTES[TSDB_DATA_TYPE_BOOL]);
*kvRowColLen += sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_BOOL];
} else if (pToken->type == TK_FLOAT) {
double dv = strtod(pToken->z, NULL);
// *(uint8_t *)payload = (int8_t)((dv == 0) ? TSDB_FALSE : TSDB_TRUE);
*sizeAppend = tsSetColumnValue(payload, pSchema->colId, pSchema->type,
((dv == 0) ? &FALSE_VALUE : &TRUE_VALUE), TYPE_BYTES[TSDB_DATA_TYPE_BOOL]);
*kvRowColLen += sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_BOOL];
} else {
return tscInvalidOperationMsg(msg, "invalid bool data", pToken->z);
}
}
break;
}
case TSDB_DATA_TYPE_TINYINT:
if (isNullStr(pToken)) {
// *((uint8_t *)payload) = TSDB_DATA_TINYINT_NULL;
*isColNull = true;
} else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
if (ret != TSDB_CODE_SUCCESS) {
return tscInvalidOperationMsg(msg, "invalid tinyint data", pToken->z);
} else if (!IS_VALID_TINYINT(iv)) {
return tscInvalidOperationMsg(msg, "data overflow", pToken->z);
}
// *((uint8_t *)payload) = (uint8_t)iv;
uint8_t tmpVal = (uint8_t)iv;
*sizeAppend =
tsSetColumnValue(payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_TINYINT]);
*kvRowColLen += sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_TINYINT];
}
break;
case TSDB_DATA_TYPE_UTINYINT:
if (isNullStr(pToken)) {
// *((uint8_t *)payload) = TSDB_DATA_UTINYINT_NULL;
*isColNull = true;
} else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
if (ret != TSDB_CODE_SUCCESS) {
return tscInvalidOperationMsg(msg, "invalid unsigned tinyint data", pToken->z);
} else if (!IS_VALID_UTINYINT(iv)) {
return tscInvalidOperationMsg(msg, "unsigned tinyint data overflow", pToken->z);
}
// *((uint8_t *)payload) = (uint8_t)iv;
uint8_t tmpVal = (uint8_t)iv;
*sizeAppend =
tsSetColumnValue(payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_UTINYINT]);
*kvRowColLen += sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_UTINYINT];
}
break;
case TSDB_DATA_TYPE_SMALLINT:
if (isNullStr(pToken)) {
// *((int16_t *)payload) = TSDB_DATA_SMALLINT_NULL;
*isColNull = true;
} else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
if (ret != TSDB_CODE_SUCCESS) {
return tscInvalidOperationMsg(msg, "invalid smallint data", pToken->z);
} else if (!IS_VALID_SMALLINT(iv)) {
return tscInvalidOperationMsg(msg, "smallint data overflow", pToken->z);
}
int16_t tmpVal = (int16_t)iv;
*sizeAppend =
tsSetColumnValue(payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_SMALLINT]);
*kvRowColLen += sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_SMALLINT];
}
break;
case TSDB_DATA_TYPE_USMALLINT:
if (isNullStr(pToken)) {
// *((uint16_t *)payload) = TSDB_DATA_USMALLINT_NULL;
*isColNull = true;
} else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
if (ret != TSDB_CODE_SUCCESS) {
return tscInvalidOperationMsg(msg, "invalid unsigned smallint data", pToken->z);
} else if (!IS_VALID_USMALLINT(iv)) {
return tscInvalidOperationMsg(msg, "unsigned smallint data overflow", pToken->z);
}
// *((uint16_t *)payload) = (uint16_t)iv;
uint16_t tmpVal = (uint16_t)iv;
*sizeAppend =
tsSetColumnValue(payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_USMALLINT]);
*kvRowColLen += sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_USMALLINT];
}
break;
case TSDB_DATA_TYPE_INT:
if (isNullStr(pToken)) {
// *((int32_t *)payload) = TSDB_DATA_INT_NULL;
*isColNull = true;
} else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
if (ret != TSDB_CODE_SUCCESS) {
return tscInvalidOperationMsg(msg, "invalid int data", pToken->z);
} else if (!IS_VALID_INT(iv)) {
return tscInvalidOperationMsg(msg, "int data overflow", pToken->z);
}
// *((int32_t *)payload) = (int32_t)iv;
int32_t tmpVal = (int32_t)iv;
*sizeAppend = tsSetColumnValue(payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_INT]);
*kvRowColLen += sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_INT];
}
break;
case TSDB_DATA_TYPE_UINT:
if (isNullStr(pToken)) {
// *((uint32_t *)payload) = TSDB_DATA_UINT_NULL;
*isColNull = true;
} else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
if (ret != TSDB_CODE_SUCCESS) {
return tscInvalidOperationMsg(msg, "invalid unsigned int data", pToken->z);
} else if (!IS_VALID_UINT(iv)) {
return tscInvalidOperationMsg(msg, "unsigned int data overflow", pToken->z);
}
// *((uint32_t *)payload) = (uint32_t)iv;
uint32_t tmpVal = (uint32_t)iv;
*sizeAppend =
tsSetColumnValue(payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_UINT]);
*kvRowColLen += sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_UINT];
}
break;
case TSDB_DATA_TYPE_BIGINT:
if (isNullStr(pToken)) {
// *((int64_t *)payload) = TSDB_DATA_BIGINT_NULL;
*isColNull = true;
} else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
if (ret != TSDB_CODE_SUCCESS) {
return tscInvalidOperationMsg(msg, "invalid bigint data", pToken->z);
} else if (!IS_VALID_BIGINT(iv)) {
return tscInvalidOperationMsg(msg, "bigint data overflow", pToken->z);
}
// *((int64_t *)payload) = iv;
// int64_t tmpVal = (int64_t)iv;
*sizeAppend = tsSetColumnValue(payload, pSchema->colId, pSchema->type, &iv, TYPE_BYTES[TSDB_DATA_TYPE_BIGINT]);
*kvRowColLen += sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_BIGINT];
}
break;
case TSDB_DATA_TYPE_UBIGINT:
if (isNullStr(pToken)) {
// *((uint64_t *)payload) = TSDB_DATA_UBIGINT_NULL;
*isColNull = true;
} else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
if (ret != TSDB_CODE_SUCCESS) {
return tscInvalidOperationMsg(msg, "invalid unsigned bigint data", pToken->z);
} else if (!IS_VALID_UBIGINT((uint64_t)iv)) {
return tscInvalidOperationMsg(msg, "unsigned bigint data overflow", pToken->z);
}
// *((uint64_t *)payload) = iv;
*sizeAppend = tsSetColumnValue(payload, pSchema->colId, pSchema->type, &iv, TYPE_BYTES[TSDB_DATA_TYPE_UBIGINT]);
*kvRowColLen += sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_UBIGINT];
}
break;
case TSDB_DATA_TYPE_FLOAT:
if (isNullStr(pToken)) {
// *((int32_t *)payload) = TSDB_DATA_FLOAT_NULL;
*isColNull = true;
} else {
double dv;
if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) {
return tscInvalidOperationMsg(msg, "illegal float data", pToken->z);
}
if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) ||
isnan(dv)) {
return tscInvalidOperationMsg(msg, "illegal float data", pToken->z);
}
// *((float *)payload) = (float)dv;
// SET_FLOAT_VAL(payload, dv);
float tmpVal = (float)dv;
*sizeAppend =
tsSetColumnValue(payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_FLOAT]);
*kvRowColLen += sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_FLOAT];
}
break;
case TSDB_DATA_TYPE_DOUBLE:
if (isNullStr(pToken)) {
// *((int64_t *)payload) = TSDB_DATA_DOUBLE_NULL;
*isColNull = true;
} else {
double dv;
if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) {
return tscInvalidOperationMsg(msg, "illegal double data", pToken->z);
}
if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || isinf(dv) || isnan(dv)) {
return tscInvalidOperationMsg(msg, "illegal double data", pToken->z);
}
// *((double *)payload) = dv;
*sizeAppend = tsSetColumnValue(payload, pSchema->colId, pSchema->type, &dv, TYPE_BYTES[TSDB_DATA_TYPE_DOUBLE]);
*kvRowColLen += sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_DOUBLE];
}
break;
case TSDB_DATA_TYPE_BINARY:
// binary data cannot be null-terminated char string, otherwise the last char of the string is lost
if (pToken->type == TK_NULL) {
// setVardataNull(payload, TSDB_DATA_TYPE_BINARY);
*isColNull = true;
} else { // too long values will return invalid sql, not be truncated automatically
if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) { // todo refactor
return tscInvalidOperationMsg(msg, "string data overflow", pToken->z);
}
// STR_WITH_SIZE_TO_VARSTR(payload, pToken->z, pToken->n);
payloadColSetId(payload, pSchema->colId);
payloadColSetType(payload, pSchema->type);
varDataSetLen(payload + PAYLOAD_ID_TYPE_LEN, pToken->n);
memcpy(varDataVal(payload + PAYLOAD_ID_TYPE_LEN), pToken->z, pToken->n);
*sizeAppend = PAYLOAD_ID_TYPE_LEN + sizeof(VarDataLenT) + pToken->n;
*dataRowColDeltaLen += (pToken->n - sizeof(uint8_t));
*kvRowColLen += sizeof(SColIdx) + sizeof(VarDataLenT) + pToken->n;
}
break;
case TSDB_DATA_TYPE_NCHAR:
if (pToken->type == TK_NULL) {
// setVardataNull(payload, TSDB_DATA_TYPE_NCHAR);
*isColNull = true;
} else {
// if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
int32_t output = 0;
payloadColSetId(payload, pSchema->colId);
payloadColSetType(payload, pSchema->type);
if (!taosMbsToUcs4(pToken->z, pToken->n, varDataVal(payload + PAYLOAD_ID_TYPE_LEN),
pSchema->bytes - VARSTR_HEADER_SIZE, &output)) {
char buf[512] = {0};
snprintf(buf, tListLen(buf), "%s", strerror(errno));
return tscInvalidOperationMsg(msg, buf, pToken->z);
}
varDataSetLen(payload + PAYLOAD_ID_TYPE_LEN, output);
*sizeAppend = PAYLOAD_ID_TYPE_LEN + sizeof(VarDataLenT) + output;
*dataRowColDeltaLen += (output - sizeof(uint32_t));
*kvRowColLen += sizeof(SColIdx) + sizeof(VarDataLenT) + output;
}
break;
case TSDB_DATA_TYPE_TIMESTAMP: {
if (pToken->type == TK_NULL) {
if (primaryKey) {
// *((int64_t *)payload) = 0;
// When building SKVRow primaryKey, we should not skip even with NULL value.
int64_t tmpVal = 0;
*sizeAppend =
tsSetColumnValue(payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP]);
*kvRowColLen += sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP];
} else {
// *((int64_t *)payload) = TSDB_DATA_BIGINT_NULL;
*isColNull = true;
}
} else {
int64_t tmpVal;
if (tsParseTime(pToken, &tmpVal, str, msg, timePrec) != TSDB_CODE_SUCCESS) {
return tscInvalidOperationMsg(msg, "invalid timestamp", pToken->z);
}
// *((int64_t *)payload) = tmpVal;
*sizeAppend =
tsSetColumnValue(payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP]);
*kvRowColLen += sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP];
}
break;
}
}
return TSDB_CODE_SUCCESS;
}
/*
* The server time/client time should not be mixed up in one sql string
* Do not employ sort operation is not involved if server time is used.
......@@ -414,23 +763,32 @@ int32_t tsCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start) {
return TSDB_CODE_SUCCESS;
}
int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, int32_t *len,
char *tmpTokenBuf, SInsertStatementParam* pInsertParam) {
int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, int32_t *len, char *tmpTokenBuf,
SInsertStatementParam *pInsertParam) {
int32_t index = 0;
SStrToken sToken = {0};
char *payload = pDataBlocks->pData + pDataBlocks->size;
SMemRowBuilder *pBuilder = &pDataBlocks->rowBuilder;
char * payload = pDataBlocks->pData + pDataBlocks->size;
SParsedDataColInfo *spd = &pDataBlocks->boundColumnInfo;
SSchema *schema = tscGetTableSchema(pDataBlocks->pTableMeta);
SSchema * schema = tscGetTableSchema(pDataBlocks->pTableMeta);
// 1. set the parsed value from sql string
int32_t rowSize = 0;
uint16_t rowSizeAppended = 0;
uint16_t nColsNotNull = 0;
TDRowLenT dataRowLen = pBuilder->allNullLen;
TDRowLenT kvRowLen = TD_MEM_ROW_KV_VER_SIZE;
char *kvStart = payload;
for (int i = 0; i < spd->numOfBound; ++i) {
// the start position in data block buffer of current value in sql
int32_t colIndex = spd->boundedColumns[i];
int32_t colIndex = spd->boundedColumns[i]; // ordered
char *start = payload + spd->cols[colIndex].offset;
SSchema *pSchema = &schema[colIndex];
SSchema *pSchema = &schema[colIndex]; // get colId here
rowSize += pSchema->bytes;
index = 0;
......@@ -453,7 +811,8 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i
int16_t type = sToken.type;
if ((type != TK_NOW && type != TK_INTEGER && type != TK_STRING && type != TK_FLOAT && type != TK_BOOL &&
type != TK_NULL && type != TK_HEX && type != TK_OCT && type != TK_BIN) || (sToken.n == 0) || (type == TK_RP)) {
type != TK_NULL && type != TK_HEX && type != TK_OCT && type != TK_BIN) ||
(sToken.n == 0) || (type == TK_RP)) {
return tscSQLSyntaxErrMsg(pInsertParam->msg, "invalid data or symbol", sToken.z);
}
......@@ -488,41 +847,43 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i
}
bool isPrimaryKey = (colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX);
int32_t ret = tsParseOneColumn(pSchema, &sToken, start, pInsertParam->msg, str, isPrimaryKey, timePrec);
bool isColNull = false;
TDRowLenT dataRowDeltaColLen = 0; // When combine the data as SDataRow, the delta len between all NULL columns.
TDRowLenT kvRowColLen = 0;
uint16_t colSizeAppended = 0;
int32_t ret =
tsParseOneColumnKV(pSchema, &sToken, kvStart + PAYLOAD_HEADER_LEN, pInsertParam->msg, str, isPrimaryKey,
timePrec, &colSizeAppended, &isColNull, &dataRowDeltaColLen, &kvRowColLen);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
if (isPrimaryKey && tsCheckTimestamp(pDataBlocks, start) != TSDB_CODE_SUCCESS) {
if (isPrimaryKey &&
tsCheckTimestamp(pDataBlocks, payloadKeyAddr(kvStart)) != TSDB_CODE_SUCCESS) {
tscInvalidOperationMsg(pInsertParam->msg, "client time/server time can not be mixed up", sToken.z);
return TSDB_CODE_TSC_INVALID_TIME_STAMP;
}
if (isColNull == false) {
++nColsNotNull;
}
kvRowLen += kvRowColLen;
dataRowLen += dataRowDeltaColLen;
kvStart += colSizeAppended; // move to next column
rowSizeAppended += colSizeAppended; // calculate rowLen
}
// 2. set the null value for the columns that do not assign values
if (spd->numOfBound < spd->numOfCols) {
char *ptr = payload;
for (int32_t i = 0; i < spd->numOfCols; ++i) {
if (!spd->cols[i].hasVal) { // current column do not have any value to insert, set it to null
if (schema[i].type == TSDB_DATA_TYPE_BINARY) {
varDataSetLen(ptr, sizeof(int8_t));
*(uint8_t*) varDataVal(ptr) = TSDB_DATA_BINARY_NULL;
} else if (schema[i].type == TSDB_DATA_TYPE_NCHAR) {
varDataSetLen(ptr, sizeof(int32_t));
*(uint32_t*) varDataVal(ptr) = TSDB_DATA_NCHAR_NULL;
if (kvRowLen < dataRowLen) {
payloadSetType(payload, SMEM_ROW_KV);
} else {
setNull(ptr, schema[i].type, schema[i].bytes);
}
payloadSetType(payload, SMEM_ROW_DATA);
}
ptr += schema[i].bytes;
}
*(uint16_t *)(payload + sizeof(uint8_t)) = nColsNotNull;
*len = PAYLOAD_HEADER_LEN + rowSizeAppended;
rowSize = (int32_t)(ptr - payload);
}
payloadSetNCols(payload, nColsNotNull);
payloadSetTLen(payload, *len);
*len = rowSize;
return TSDB_CODE_SUCCESS;
}
......@@ -551,21 +912,26 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SIn
int32_t precision = tinfo.precision;
int32_t rowSizeWithColIdType = getPaddingRowSize(&tinfo);
initSMemRowBuilder(&pDataBlock->rowBuilder, tscGetTableSchema(pDataBlock->pTableMeta),
tscGetNumOfColumns(pDataBlock->pTableMeta), 0);
while (1) {
index = 0;
sToken = tStrGetToken(*str, &index, false);
if (sToken.n == 0 || sToken.type != TK_LP) break;
*str += index;
if ((*numOfRows) >= maxRows || pDataBlock->size + tinfo.rowSize >= pDataBlock->nAllocSize) {
if ((*numOfRows) >= maxRows || pDataBlock->size + rowSizeWithColIdType >= pDataBlock->nAllocSize) {
int32_t tSize;
code = tscAllocateMemIfNeed(pDataBlock, tinfo.rowSize, &tSize);
code = tscAllocateMemIfNeed(pDataBlock, rowSizeWithColIdType, &tSize);
if (code != TSDB_CODE_SUCCESS) { //TODO pass the correct error code to client
strcpy(pInsertParam->msg, "client out of memory");
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
ASSERT(tSize > maxRows);
ASSERT(tSize >= maxRows);
maxRows = tSize;
}
......@@ -623,7 +989,7 @@ int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int3
// expand the allocated size
if (remain < rowSize * factor) {
while (remain < rowSize * factor) {
pDataBlock->nAllocSize = (uint32_t)(pDataBlock->nAllocSize * 1.5);
pDataBlock->nAllocSize = (uint32_t)(pDataBlock->nAllocSize * 2);
remain = pDataBlock->nAllocSize - pDataBlock->size;
}
......@@ -657,7 +1023,7 @@ static int32_t tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta,
}
// data block is disordered, sort it in ascending order
void tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf) {
void tscSortRemoveDataBlockDupRowsXX(STableDataBlocks *dataBuf) {
SSubmitBlk *pBlocks = (SSubmitBlk *)dataBuf->pData;
// size is less than the total size, since duplicated rows may be removed yet.
......@@ -701,11 +1067,83 @@ void tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf) {
dataBuf->prevTS = INT64_MIN;
}
// data block is disordered, sort it in ascending order
int tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf, SBlockKeyInfo *pBlkKeyInfo) {
SSubmitBlk *pBlocks = (SSubmitBlk *)dataBuf->pData;
int16_t nRows = pBlocks->numOfRows;
// size is less than the total size, since duplicated rows may be removed yet.
// if use server time, this block must be ordered
if (dataBuf->tsSource == TSDB_USE_SERVER_TS) {
assert(dataBuf->ordered);
}
// allocate memory
size_t curBlkTupleSize = nRows * sizeof(SBlockKeyTuple);
if (pBlkKeyInfo->pKeyTuple == NULL || pBlkKeyInfo->nBytesAlloc < curBlkTupleSize) {
char *tmp = realloc(pBlkKeyInfo->pKeyTuple, curBlkTupleSize);
if (tmp == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
pBlkKeyInfo->pKeyTuple = (SBlockKeyTuple *)tmp;
pBlkKeyInfo->nBytesAlloc = curBlkTupleSize;
}
memset(pBlkKeyInfo->pKeyTuple, 0, curBlkTupleSize);
SBlockKeyTuple *pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
char * pBlockData = pBlocks->data;
int n = 0;
uint32_t totolPayloadLen = 0;
TDRowLenT payloadTLen = 0;
while (n < nRows) {
pBlkKeyTuple->skey = payloadKey(pBlockData);
pBlkKeyTuple->payloadAddr = pBlockData;
payloadTLen = payloadTLen(pBlockData);
totolPayloadLen += payloadTLen;
// next loop
pBlockData += payloadTLen;
++pBlkKeyTuple;
++n;
}
if (!dataBuf->ordered) {
qsort(pBlkKeyTuple, nRows, sizeof(SBlockKeyTuple), rowDataCompar);
pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
int32_t i = 0;
int32_t j = 1;
while (j < nRows) {
TSKEY ti = *(TSKEY *)(pBlkKeyTuple + sizeof(SBlockKeyTuple) * i);
TSKEY tj = *(TSKEY *)(pBlkKeyTuple + sizeof(SBlockKeyTuple) * j);
if (ti == tj) {
totolPayloadLen -= payloadTLen(pBlkKeyTuple + sizeof(SBlockKeyTuple) * j);
++j;
continue;
}
int32_t nextPos = (++i);
if (nextPos != j) {
memmove(pBlkKeyTuple + sizeof(SBlockKeyTuple) * nextPos, pBlkKeyTuple + sizeof(SBlockKeyTuple) * j, sizeof(SBlockKeyTuple));
}
++j;
}
dataBuf->ordered = true;
pBlocks->numOfRows = i + 1;
}
dataBuf->size = sizeof(SSubmitBlk) + totolPayloadLen;
dataBuf->prevTS = INT64_MIN;
return 0;
}
static int32_t doParseInsertStatement(SInsertStatementParam *pInsertParam, char **str, STableDataBlocks* dataBuf, int32_t *totalNum) {
STableComInfo tinfo = tscGetTableInfo(dataBuf->pTableMeta);
int32_t maxNumOfRows;
int32_t code = tscAllocateMemIfNeed(dataBuf, tinfo.rowSize, &maxNumOfRows);
int32_t code = tscAllocateMemIfNeed(dataBuf, getPaddingRowSize(&tinfo), &maxNumOfRows);
if (TSDB_CODE_SUCCESS != code) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
......@@ -1489,21 +1927,24 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow
}
STableDataBlocks *pTableDataBlock = NULL;
int32_t ret =
tscGetDataBlockFromList(pInsertParam->pTableBlockHashList, pTableMeta->id.uid, TSDB_PAYLOAD_SIZE, sizeof(SSubmitBlk),
tinfo.rowSize, &pTableMetaInfo->name, pTableMeta, &pTableDataBlock, NULL);
int32_t ret = tscGetDataBlockFromList(pInsertParam->pTableBlockHashList, pTableMeta->id.uid, TSDB_PAYLOAD_SIZE,
sizeof(SSubmitBlk), tinfo.rowSize, &pTableMetaInfo->name, pTableMeta,
&pTableDataBlock, NULL);
if (ret != TSDB_CODE_SUCCESS) {
pParentSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error;
}
tscAllocateMemIfNeed(pTableDataBlock, tinfo.rowSize, &maxRows);
tscAllocateMemIfNeed(pTableDataBlock, getPaddingRowSize(&tinfo), &maxRows);
tokenBuf = calloc(1, TSDB_MAX_BYTES_PER_ROW);
if (tokenBuf == NULL) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error;
}
initSMemRowBuilder(&pTableDataBlock->rowBuilder, tscGetTableSchema(pTableDataBlock->pTableMeta),
tscGetNumOfColumns(pTableDataBlock->pTableMeta), 0);
while ((readLen = tgetline(&line, &n, fp)) != -1) {
if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) {
line[--readLen] = 0;
......
......@@ -1639,61 +1639,29 @@ int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, i
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE uint8_t checkTdRowType(SSchema* pSchema, void* pData, int32_t nCols, int32_t flen,
uint16_t* nColsNotNull) {
ASSERT(pData != NULL);
if (nCols < KvRowNColsThresh) {
return SMEM_ROW_DATA;
}
int32_t dataRowLength = flen;
int32_t kvRowLength = TD_MEM_ROW_KV_VER_SIZE;
uint16_t nColsNull = 0;
char* p = (char*)pData;
for (int i = 0; i < nCols; ++i) {
if (IS_VAR_DATA_TYPE(pSchema[i].type)) {
dataRowLength += varDataTLen(p);
if (!isNull(p, pSchema[i].type)) {
kvRowLength += (sizeof(SColIdx) + varDataTLen(p));
} else {
++nColsNull;
}
} else {
if (!isNull(p, pSchema[i].type)) {
kvRowLength += (sizeof(SColIdx) + TYPE_BYTES[pSchema[i].type]);
} else {
++nColsNull;
}
}
// next column
p += pSchema[i].bytes;
}
tscDebug("nColsNull %d, nCols: %d, kvRowLen: %d, dataRowLen: %d", (int32_t)nColsNull, nCols, kvRowLength,
dataRowLength);
if (kvRowLength < dataRowLength) {
if (nColsNotNull) {
*nColsNotNull = nCols - nColsNull;
}
return SMEM_ROW_KV;
}
return SMEM_ROW_DATA;
}
SMemRow tdGenMemRowFromBuilder(SMemRowBuilder* pBuilder) {
static SMemRow tdGenMemRowFromBuilder(SMemRowBuilder* pBuilder) {
SSchema* pSchema = pBuilder->pSchema;
char* p = (char*)pBuilder->buf;
int toffset = 0;
if(pBuilder->nCols <= 0){
uint16_t nCols = pBuilder->nCols;
// RawRow payload structure:
// |<---------- header ------------->|<------- column data array ------->|
// |SMemRowType| dataLen | nCols | colId | colType | value |...|...|
// +-----------+----------+----------+---------------------------------->|
// | uint8_t | uint16_t | uint16_t | int16_t | uint8_t | ??? |...|...|
// +-----------+----------+----------+---------------------------------->|
uint8_t memRowType = payloadType(p);
uint16_t nColsNotNull = payloadNCols(p);
if (pBuilder->nCols <= 0 || nColsNotNull <= 0) {
return NULL;
}
ASSERT(nColsNotNull <= nCols);
uint16_t nColsNotNull = 0;
uint8_t memRowType = checkTdRowType(pSchema, p, pBuilder->nCols, pBuilder->flen, &nColsNotNull);
// nColsNotNull = pBuilder->nCols;
SMemRow* memRow = (SMemRow)pBuilder->pDataBlock;
memRowSetType(memRow, memRowType);
......@@ -1702,12 +1670,39 @@ SMemRow tdGenMemRowFromBuilder(SMemRowBuilder* pBuilder) {
dataRowSetLen(trow, (uint16_t)(TD_DATA_ROW_HEAD_SIZE + pBuilder->flen));
dataRowSetVersion(trow, pBuilder->sversion);
p = (char*)pBuilder->buf;
for (int32_t j = 0; j < pBuilder->nCols; ++j) {
tdAppendColVal(trow, p, pSchema[j].type, pSchema[j].bytes, toffset);
p = (char*)payloadBody(pBuilder->buf);
uint16_t i = 0, j = 0;
while (j < pBuilder->nCols) {
if (i >= nColsNotNull) {
break;
}
int16_t colId = *(int16_t*)p;
if (colId == pSchema[j].colId) {
tdAppendColVal(trow, payloadColValue(p), pSchema[j].type, toffset);
toffset += TYPE_BYTES[pSchema[j].type];
p += pSchema[j].bytes;
p = skipToNextEles(p);
++i;
++j;
} else if (colId < pSchema[j].colId) {
p = skipToNextEles(p);
++i;
} else {
tdAppendColVal(trow, tdGetNullVal(pSchema[j].type), pSchema[j].type, toffset);
toffset += TYPE_BYTES[pSchema[j].type];
++j;
}
}
while (j < pBuilder->nCols) {
tdAppendColVal(trow, tdGetNullVal(pSchema[j].type), pSchema[j].type, toffset);
toffset += TYPE_BYTES[pSchema[j].type];
++j;
}
while (i < nColsNotNull) {
p = skipToNextEles(p);
++i;
}
pBuilder->buf = p;
} else if (memRowType == SMEM_ROW_KV) {
ASSERT(nColsNotNull <= pBuilder->nCols);
......@@ -1717,14 +1712,17 @@ SMemRow tdGenMemRowFromBuilder(SMemRowBuilder* pBuilder) {
kvRowSetNCols(kvRow, nColsNotNull);
memRowKvSetVersion(memRow, pBuilder->sversion);
p = (char*)pBuilder->buf;
for (int32_t j = 0; j < pBuilder->nCols; ++j) {
if(!isNull(p, pSchema[j].type)) {
tdAppendKvColVal(kvRow, p, pSchema[j].colId, pSchema[j].type, toffset);
p = (char*)payloadBody(pBuilder->buf);
int i = 0;
while (i < nColsNotNull) {
int16_t colId = payloadColId(p);
uint8_t colType = payloadColType(p);
tdAppendKvColVal(kvRow, payloadColValue(p), colId, colType, toffset);
toffset += sizeof(SColIdx);
p = skipToNextEles(p);
++i;
}
p += pSchema[j].bytes;
}
pBuilder->buf = p;
} else {
......@@ -1738,11 +1736,12 @@ SMemRow tdGenMemRowFromBuilder(SMemRowBuilder* pBuilder) {
}
// Erase the empty space reserved for binary data
static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bool includeSchema) {
static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bool includeSchema, SBlockKeyTuple *blkKeyTuple) {
// TODO: optimize this function, handle the case while binary is not presented
STableMeta* pTableMeta = pTableDataBlock->pTableMeta;
STableComInfo tinfo = tscGetTableInfo(pTableMeta);
SSchema* pSchema = tscGetTableSchema(pTableMeta);
SMemRowBuilder* pBuilder = &pTableDataBlock->rowBuilder;
SSubmitBlk* pBlock = pDataBlock;
memcpy(pDataBlock, pTableDataBlock->pData, sizeof(SSubmitBlk));
......@@ -1778,18 +1777,18 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bo
pBlock->dataLen = 0;
int32_t numOfRows = htons(pBlock->numOfRows);
SMemRowBuilder mRowBuilder;
mRowBuilder.pSchema = pSchema;
mRowBuilder.sversion = pTableMeta->sversion;
mRowBuilder.flen = flen;
mRowBuilder.nCols = tinfo.numOfColumns;
mRowBuilder.pDataBlock = pDataBlock;
mRowBuilder.pSubmitBlk = pBlock;
mRowBuilder.buf = p;
mRowBuilder.size = 0;
pBuilder->pSchema = pSchema;
pBuilder->sversion = pTableMeta->sversion;
pBuilder->flen = flen;
pBuilder->nCols = tinfo.numOfColumns;
pBuilder->pDataBlock = pDataBlock;
pBuilder->pSubmitBlk = pBlock;
pBuilder->buf = p;
pBuilder->size = 0;
for (int32_t i = 0; i < numOfRows; ++i) {
tdGenMemRowFromBuilder(&mRowBuilder);
pBuilder->buf = (blkKeyTuple+i)->payloadAddr;
tdGenMemRowFromBuilder(pBuilder);
}
int32_t len = pBlock->dataLen + pBlock->schemaLen;
......@@ -1807,9 +1806,8 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) {
if (IS_VAR_DATA_TYPE((pSchema + i)->type)) {
result += TYPE_BYTES[TSDB_DATA_TYPE_BINARY];
}
result += sizeof(SColIdx);
}
result += TD_MEM_ROW_TYPE_SIZE; // add len of SMemRow flag
result += TD_MEM_ROW_KV_TYPE_VER_SIZE; // add prefix len of KV type SMemRow(we may use SDataRow or SKVRow)
return result;
}
......@@ -1838,13 +1836,16 @@ static void extractTableNameList(SInsertStatementParam *pInsertParam, bool freeB
int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBlockMap) {
const int INSERT_HEAD_SIZE = sizeof(SMsgDesc) + sizeof(SSubmitMsg);
int code = 0;
void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES);
STableDataBlocks** p = taosHashIterate(pInsertParam->pTableBlockHashList, NULL);
STableDataBlocks* pOneTableBlock = *p;
SBlockKeyInfo blkKeyInfo = {0}; // share by pOneTableBlock
while(pOneTableBlock) {
SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData;
if (pBlocks->numOfRows > 0) {
......@@ -1858,6 +1859,7 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl
tscError("0x%"PRIx64" failed to prepare the data block buffer for merging table data, code:%d", pInsertParam->objectId, ret);
taosHashCleanup(pVnodeDataBlockHashList);
tscDestroyBlockArrayList(pVnodeDataBlockList);
tfree(blkKeyInfo.pKeyTuple);
return ret;
}
......@@ -1878,16 +1880,26 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl
taosHashCleanup(pVnodeDataBlockHashList);
tscDestroyBlockArrayList(pVnodeDataBlockList);
tfree(dataBuf->pData);
tfree(blkKeyInfo.pKeyTuple);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
}
tscSortRemoveDataBlockDupRows(pOneTableBlock);
char* ekey = (char*)pBlocks->data + pOneTableBlock->rowSize*(pBlocks->numOfRows-1);
if((code = tscSortRemoveDataBlockDupRows(pOneTableBlock, &blkKeyInfo)) != 0){
taosHashCleanup(pVnodeDataBlockHashList);
tscDestroyBlockArrayList(pVnodeDataBlockList);
tfree(dataBuf->pData);
tfree(blkKeyInfo.pKeyTuple);
return code;
}
ASSERT(blkKeyInfo.pKeyTuple != NULL && pBlocks->numOfRows > 0);
tscDebug("0x%"PRIx64" name:%s, tid:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, pInsertParam->objectId, tNameGetTableName(&pOneTableBlock->tableName),
pBlocks->tid, pBlocks->numOfRows, pBlocks->sversion, GET_INT64_VAL(pBlocks->data), GET_INT64_VAL(ekey));
SBlockKeyTuple* pLastKeyTuple = blkKeyInfo.pKeyTuple + pBlocks->numOfRows - 1;
tscDebug("0x%" PRIx64 " name:%s, tid:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64,
pInsertParam->objectId, tNameGetTableName(&pOneTableBlock->tableName), pBlocks->tid, pBlocks->numOfRows,
pBlocks->sversion, blkKeyInfo.pKeyTuple->skey, pLastKeyTuple->skey);
int32_t len = pBlocks->numOfRows * (pOneTableBlock->rowSize + expandSize) + sizeof(STColumn) * tscGetNumOfColumns(pOneTableBlock->pTableMeta);
......@@ -1898,7 +1910,7 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl
pBlocks->schemaLen = 0;
// erase the empty space reserved for binary data
int32_t finalLen = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, pInsertParam->schemaAttached);
int32_t finalLen = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, pInsertParam->schemaAttached, blkKeyInfo.pKeyTuple);
assert(finalLen <= len);
dataBuf->size += (finalLen + sizeof(SSubmitBlk));
......@@ -1926,6 +1938,7 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl
// free the table data blocks;
pInsertParam->pDataBlocks = pVnodeDataBlockList;
taosHashCleanup(pVnodeDataBlockHashList);
tfree(blkKeyInfo.pKeyTuple);
return TSDB_CODE_SUCCESS;
}
......
......@@ -24,6 +24,33 @@
extern "C" {
#endif
#pragma pack(push, 1)
typedef struct {
VarDataLenT len;
uint8_t data;
} SBinaryNullT;
typedef struct {
VarDataLenT len;
uint32_t data;
} SNCharNullT;
#pragma pack(pop)
extern const uint8_t BoolNull;
extern const uint8_t TinyintNull;
extern const uint16_t SmallintNull;
extern const uint32_t IntNull;
extern const uint64_t BigintNull;
extern const uint64_t TimestampNull;
extern const uint8_t UTinyintNull;
extern const uint16_t USmallintNull;
extern const uint32_t UIntNull;
extern const uint64_t UBigintNull;
extern const uint32_t FloatNull;
extern const uint64_t DoubleNull;
extern const SBinaryNullT BinaryNull;
extern const SNCharNullT NcharNull;
#define STR_TO_VARSTR(x, str) \
do { \
VarDataLenT __len = (VarDataLenT)strlen(str); \
......@@ -207,7 +234,7 @@ SDataRow tdDataRowDup(SDataRow row);
SMemRow tdMemRowDup(SMemRow row);
// offset here not include dataRow header length
static FORCE_INLINE int tdAppendColVal(SDataRow row, void *value, int8_t type, int32_t bytes, int32_t offset) {
static FORCE_INLINE int tdAppendColVal(SDataRow row, const void *value, int8_t type, int32_t offset) {
ASSERT(value != NULL);
int32_t toffset = offset + TD_DATA_ROW_HEAD_SIZE;
char * ptr = (char *)POINTER_SHIFT(row, dataRowLen(row));
......@@ -260,6 +287,42 @@ void dataColSetOffset(SDataCol *pCol, int nEle);
bool isNEleNull(SDataCol *pCol, int nEle);
void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints);
static FORCE_INLINE const void *tdGetNullVal(int8_t type) {
switch (type) {
case TSDB_DATA_TYPE_BOOL:
return &BoolNull;
case TSDB_DATA_TYPE_TINYINT:
return &TinyintNull;
case TSDB_DATA_TYPE_SMALLINT:
return &SmallintNull;
case TSDB_DATA_TYPE_INT:
return &IntNull;
case TSDB_DATA_TYPE_BIGINT:
return &BigintNull;
case TSDB_DATA_TYPE_FLOAT:
return &FloatNull;
case TSDB_DATA_TYPE_DOUBLE:
return &DoubleNull;
case TSDB_DATA_TYPE_BINARY:
return &BinaryNull;
case TSDB_DATA_TYPE_TIMESTAMP:
return &TimestampNull;
case TSDB_DATA_TYPE_NCHAR:
return &NcharNull;
case TSDB_DATA_TYPE_UTINYINT:
return &UTinyintNull;
case TSDB_DATA_TYPE_USMALLINT:
return &USmallintNull;
case TSDB_DATA_TYPE_UINT:
return &UIntNull;
case TSDB_DATA_TYPE_UBIGINT:
return &UBigintNull;
default:
ASSERT(0);
return NULL;
}
}
// Get the data pointer from a column-wised data
static FORCE_INLINE void *tdGetColDataOfRow(SDataCol *pCol, int row) {
if (IS_VAR_DATA_TYPE(pCol->type)) {
......@@ -545,6 +608,47 @@ static FORCE_INLINE void *tdGetMemRowDataOfCol(void *row, int8_t type, int32_t o
return NULL;
}
// RawRow payload structure:
// |<---------- header ------------->|<---- body: column data tuple ---->|
// |SMemRowType| dataLen | nCols | colId | colType | value |...|...|
// +-----------+----------+----------+---------------------------------->|
// | uint8_t | uint16_t | uint16_t | int16_t | uint8_t | ??? |...|...|
// +-----------+----------+----------+---------------------------------->|
#define PAYLOAD_NCOLS_LEN sizeof(uint16_t)
#define PAYLOAD_NCOLS_OFFSET (sizeof(uint8_t) + sizeof(TDRowLenT))
#define PAYLOAD_HEADER_LEN (PAYLOAD_NCOLS_OFFSET + PAYLOAD_NCOLS_LEN)
#define PAYLOAD_ID_LEN sizeof(int16_t)
#define PAYLOAD_ID_TYPE_LEN (sizeof(int16_t) + sizeof(uint8_t))
#define payloadBody(r) POINTER_SHIFT(r, PAYLOAD_HEADER_LEN)
#define payloadType(r) (*(uint8_t *)(r))
#define payloadSetType(r, t) (payloadType(r) = (t))
#define payloadTLen(r) (*(TDRowLenT *)POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE)) // including total header
#define payloadSetTLen(r, l) (payloadTLen(r) = (l))
#define payloadNCols(r) (*(TDRowLenT *)POINTER_SHIFT(r, PAYLOAD_NCOLS_OFFSET))
#define payloadSetNCols(r, n) (payloadNCols(r) = (n))
#define payloadColId(r) (*(int16_t *)(r))
#define payloadColType(r) (*(uint8_t *)POINTER_SHIFT(r, PAYLOAD_ID_LEN))
#define payloadColValue(r) POINTER_SHIFT(r, PAYLOAD_ID_TYPE_LEN)
#define payloadColSetId(r, i) (payloadColId(r) = (i))
#define payloadColSetType(r, t) (payloadColType(r) = (t))
#define payloadKeyAddr(r) POINTER_SHIFT(r, PAYLOAD_HEADER_LEN + PAYLOAD_ID_TYPE_LEN)
#define payloadTKey(r) (*(TKEY *)(payloadKeyAddr(r)))
#define payloadKey(r) tdGetKey(payloadTKey(r))
static FORCE_INLINE char *skipToNextEles(char *p) {
uint8_t colType = payloadColType(p);
if (IS_VAR_DATA_TYPE(colType)) {
return POINTER_SHIFT(p, PAYLOAD_ID_TYPE_LEN + varDataTLen(payloadColValue(p)));
} else {
return POINTER_SHIFT(p, PAYLOAD_ID_TYPE_LEN + TYPE_BYTES[colType]);
}
}
#ifdef __cplusplus
}
#endif
......
......@@ -18,6 +18,21 @@
#include "tcoding.h"
#include "wchar.h"
const uint8_t BoolNull = TSDB_DATA_BOOL_NULL;
const uint8_t TinyintNull = TSDB_DATA_TINYINT_NULL;
const uint16_t SmallintNull = TSDB_DATA_SMALLINT_NULL;
const uint32_t IntNull = TSDB_DATA_INT_NULL;
const uint64_t BigintNull = TSDB_DATA_BIGINT_NULL;
const uint64_t TimestampNull = TSDB_DATA_BIGINT_NULL;
const uint8_t UTinyintNull = TSDB_DATA_UTINYINT_NULL;
const uint16_t USmallintNull = TSDB_DATA_USMALLINT_NULL;
const uint32_t UIntNull = TSDB_DATA_UINT_NULL;
const uint64_t UBigintNull = TSDB_DATA_UBIGINT_NULL;
const uint32_t FloatNull = TSDB_DATA_FLOAT_NULL;
const uint64_t DoubleNull = TSDB_DATA_DOUBLE_NULL;
const SBinaryNullT BinaryNull = {1, TSDB_DATA_BINARY_NULL};
const SNCharNullT NcharNull = {4, TSDB_DATA_NCHAR_NULL};
static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2,
int limit2, int tRows);
......
......@@ -502,7 +502,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
memcpy((char *)val + sizeof(VarDataLenT), buf, len);
varDataLen(val) = len;
}
tdAppendColVal(dataRow, val, c->type, c->bytes, c->offset);
tdAppendColVal(dataRow, val, c->type, c->offset);
}
pBlk->dataLen = htonl(memRowDataTLen(trow));
pBlk->schemaLen = 0;
......
......@@ -720,7 +720,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea
// OK,let's load row from backward to get not-null column
for (int32_t rowId = pBlock->numOfRows - 1; rowId >= 0; rowId--) {
SDataCol *pDataCol = pReadh->pDCols[0]->cols + i;
tdAppendColVal(memRowDataBody(row), tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset);
tdAppendColVal(memRowDataBody(row), tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->offset);
//SDataCol *pDataCol = readh.pDCols[0]->cols + j;
void *value = tdGetRowDataOfCol(memRowDataBody(row), (int8_t)pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset);
if (isNull(value, pCol->type)) {
......@@ -742,7 +742,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea
// save row ts(in column 0)
pDataCol = pReadh->pDCols[0]->cols + 0;
pCol = schemaColAt(pSchema, 0);
tdAppendColVal(memRowDataBody(row), tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset);
tdAppendColVal(memRowDataBody(row), tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->offset);
pLastCol->ts = memRowKey(row);
pTable->restoreColumnNum += 1;
......@@ -790,7 +790,7 @@ static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh,
STColumn *pCol = schemaColAt(pSchema, icol);
SDataCol *pDataCol = pReadh->pDCols[0]->cols + icol;
tdAppendColVal(memRowDataBody(pTable->lastRow), tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type,
pCol->bytes, pCol->offset);
pCol->offset);
}
return 0;
......
......@@ -55,10 +55,10 @@ static int insertData(SInsertInfo *pInfo) {
for (int j = 0; j < schemaNCols(pInfo->pSchema); j++) {
STColumn *pTCol = schemaColAt(pInfo->pSchema, j);
if (j == 0) { // Just for timestamp
tdAppendColVal(row, (void *)(&start_time), pTCol->type, pTCol->bytes, pTCol->offset);
tdAppendColVal(row, (void *)(&start_time), pTCol->type, pTCol->offset);
} else { // For int
int val = 10;
tdAppendColVal(row, (void *)(&val), pTCol->type, pTCol->bytes, pTCol->offset);
tdAppendColVal(row, (void *)(&val), pTCol->type, pTCol->offset);
}
}
pBlock->dataLen += dataRowLen(row);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册