提交 c1252c2d 编写于 作者: B Bomin Zhang

TD-803: use KV to transfer tags

上级 f65bb570
...@@ -891,11 +891,15 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) { ...@@ -891,11 +891,15 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
return tscInvalidSQLErrMsg(pCmd->payload, "keyword TAGS expected", sToken.z); return tscInvalidSQLErrMsg(pCmd->payload, "keyword TAGS expected", sToken.z);
} }
SKVRowBuilder kvRowBuilder = {0};
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
uint32_t ignoreTokenTypes = TK_LP; uint32_t ignoreTokenTypes = TK_LP;
uint32_t numOfIgnoreToken = 1; uint32_t numOfIgnoreToken = 1;
for (int i = 0; i < spd.numOfAssignedCols; ++i) { for (int i = 0; i < spd.numOfAssignedCols; ++i) {
char * tagVal = pTag->data + spd.elems[i].offset; SSchema* pSchema = pTagSchema + spd.elems[i].colIndex;
int16_t colIndex = spd.elems[i].colIndex;
index = 0; index = 0;
sToken = tStrGetToken(sql, &index, true, numOfIgnoreToken, &ignoreTokenTypes); sToken = tStrGetToken(sql, &index, true, numOfIgnoreToken, &ignoreTokenTypes);
...@@ -911,12 +915,21 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) { ...@@ -911,12 +915,21 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
sToken.n -= 2; sToken.n -= 2;
} }
code = tsParseOneColumnData(&pTagSchema[colIndex], &sToken, tagVal, pCmd->payload, &sql, false, tinfo.precision); char tagVal[TSDB_MAX_TAGS_LEN];
code = tsParseOneColumnData(pSchema, &sToken, tagVal, pCmd->payload, &sql, false, tinfo.precision);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
tdAddColToKVRow(&kvRowBuilder, pSchema->colId, pSchema->type, tagVal);
} }
SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder);
pTag->dataLen = kvRowLen(row);
memcpy(pTag->data, row, pTag->dataLen);
free(row);
tdDestroyKVRowBuilder(&kvRowBuilder);
index = 0; index = 0;
sToken = tStrGetToken(sql, &index, false, 0, NULL); sToken = tStrGetToken(sql, &index, false, 0, NULL);
sql += index; sql += index;
...@@ -924,29 +937,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) { ...@@ -924,29 +937,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
return tscInvalidSQLErrMsg(pCmd->payload, ") expected", sToken.z); return tscInvalidSQLErrMsg(pCmd->payload, ") expected", sToken.z);
} }
// 2. set the null value for the columns that do not assign values pCmd->payloadLen = sizeof(pTag->name) + sizeof(pTag->dataLen) + pTag->dataLen;
if (spd.numOfAssignedCols < spd.numOfCols) {
char *ptr = pTag->data;
for (int32_t i = 0; i < spd.numOfCols; ++i) {
if (!spd.hasVal[i]) { // current tag column do not have any value to insert, set it to null
if (pTagSchema[i].type == TSDB_DATA_TYPE_BINARY || pTagSchema[i].type == TSDB_DATA_TYPE_NCHAR) {
setVardataNull(ptr, pTagSchema[i].type);
} else {
setNull(ptr, pTagSchema[i].type, pTagSchema[i].bytes);
}
}
ptr += pTagSchema[i].bytes;
}
}
// 3. calculate the actual data size of STagData
pCmd->payloadLen = sizeof(pTag->name) + sizeof(pTag->dataLen);
for (int32_t t = 0; t < numOfTags; ++t) {
pTag->dataLen += pTagSchema[t].bytes;
pCmd->payloadLen += pTagSchema[t].bytes;
}
pTag->dataLen = htonl(pTag->dataLen); pTag->dataLen = htonl(pTag->dataLen);
if (tscValidateName(&tableToken) != TSDB_CODE_SUCCESS) { if (tscValidateName(&tableToken) != TSDB_CODE_SUCCESS) {
......
...@@ -5623,25 +5623,36 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) { ...@@ -5623,25 +5623,36 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
SSchema* pTagSchema = tscGetTableTagSchema(pStableMeterMetaInfo->pTableMeta); SSchema* pTagSchema = tscGetTableTagSchema(pStableMeterMetaInfo->pTableMeta);
STagData* pTag = &pCreateTable->usingInfo.tagdata; STagData* pTag = &pCreateTable->usingInfo.tagdata;
char* tagVal = pTag->data; SKVRowBuilder kvRowBuilder = {0};
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
int32_t ret = TSDB_CODE_SUCCESS; int32_t ret = TSDB_CODE_SUCCESS;
for (int32_t i = 0; i < pList->nExpr; ++i) { for (int32_t i = 0; i < pList->nExpr; ++i) {
if (pTagSchema[i].type == TSDB_DATA_TYPE_BINARY || pTagSchema[i].type == TSDB_DATA_TYPE_NCHAR) { SSchema* pSchema = pTagSchema + i;
if (pSchema->type == TSDB_DATA_TYPE_BINARY || pSchema->type == TSDB_DATA_TYPE_NCHAR) {
// validate the length of binary // validate the length of binary
if (pList->a[i].pVar.nLen + VARSTR_HEADER_SIZE > pTagSchema[i].bytes) { if (pList->a[i].pVar.nLen + VARSTR_HEADER_SIZE > pSchema->bytes) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
} }
} }
ret = tVariantDump(&(pList->a[i].pVar), tagVal, pTagSchema[i].type, true); char tagVal[TSDB_MAX_TAGS_LEN];
ret = tVariantDump(&(pList->a[i].pVar), tagVal, pSchema->type, true);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4);
} }
tagVal += pTagSchema[i].bytes; tdAddColToKVRow(&kvRowBuilder, pSchema->colId, pSchema->type, tagVal);
} }
SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder);
pTag->dataLen = kvRowLen(row);
memcpy(pTag->data, row, pTag->dataLen);
free(row);
tdDestroyKVRowBuilder(&kvRowBuilder);
// table name // table name
if (tscValidateName(&pInfo->pCreateTableInfo->name) != TSDB_CODE_SUCCESS) { if (tscValidateName(&pInfo->pCreateTableInfo->name) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
...@@ -5653,7 +5664,7 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) { ...@@ -5653,7 +5664,7 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
return ret; return ret;
} }
pTag->dataLen = tagVal - pTag->data; //pTag->dataLen = tagVal - pTag->data;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -233,26 +233,10 @@ STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg) { ...@@ -233,26 +233,10 @@ STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg) {
if (tsdbTableSetSName(pCfg, pMsg->superTableId, true) < 0) goto _err; if (tsdbTableSetSName(pCfg, pMsg->superTableId, true) < 0) goto _err;
if (tsdbTableSetSuperUid(pCfg, htobe64(pMsg->superTableUid)) < 0) goto _err; if (tsdbTableSetSuperUid(pCfg, htobe64(pMsg->superTableUid)) < 0) goto _err;
// Decode tag values int32_t tagDataLen = htonl(pMsg->tagDataLen);
if (pMsg->tagDataLen) { if (tagDataLen) {
int accBytes = 0;
char *pTagData = pMsg->data + (numOfCols + numOfTags) * sizeof(SSchema); char *pTagData = pMsg->data + (numOfCols + numOfTags) * sizeof(SSchema);
tsdbTableSetTagValue(pCfg, pTagData, true);
SKVRowBuilder kvRowBuilder = {0};
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
}
for (int i = numOfCols; i < numOfCols + numOfTags; i++) {
if (tdAddColToKVRow(&kvRowBuilder, htons(pSchema[i].colId), pSchema[i].type, pTagData + accBytes) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
}
accBytes += htons(pSchema[i].bytes);
}
tsdbTableSetTagValue(pCfg, tdGetKVRowFromBuilder(&kvRowBuilder), false);
tdDestroyKVRowBuilder(&kvRowBuilder);
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册