提交 fd76fdb9 编写于 作者: wmmhello's avatar wmmhello

opti:taosx for write_raw_block

上级 259c2043
...@@ -115,7 +115,7 @@ int32_t smlBindData(SQuery* handle, bool dataFormat, SArray* tags, SArray* colsS ...@@ -115,7 +115,7 @@ int32_t smlBindData(SQuery* handle, bool dataFormat, SArray* tags, SArray* colsS
char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, char* msgBuf, int16_t msgBufLen); char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, char* msgBuf, int16_t msgBufLen);
int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash); int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash);
int rawBlockBindData(SQuery *query, STableMeta* pTableMeta, SRetrieveTableRsp* pRsp, SVCreateTbReq* pCreateTb); int rawBlockBindData(SQuery *query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, TAOS_FIELD *fields, int numFields);
int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray); int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray);
SArray* serializeVgroupsCreateTableBatch(SHashObj* pVgroupHashmap); SArray* serializeVgroupsCreateTableBatch(SHashObj* pVgroupHashmap);
......
...@@ -1073,7 +1073,7 @@ static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) { ...@@ -1073,7 +1073,7 @@ static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) {
char sql[256] = {0}; char sql[256] = {0};
snprintf(sql, sizeof(sql), "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64, req.tableFName, snprintf(sql, sizeof(sql), "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64, req.tableFName,
req.tsColName, req.skey, req.tsColName, req.ekey); req.tsColName, req.skey, req.tsColName, req.ekey);
printf("delete sql:%s\n", sql); uDebug("delete sql:%s\n", sql);
TAOS_RES* res = taos_query(taos, sql); TAOS_RES* res = taos_query(taos, sql);
SRequestObj* pRequest = (SRequestObj*)res; SRequestObj* pRequest = (SRequestObj*)res;
...@@ -1205,7 +1205,7 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch ...@@ -1205,7 +1205,7 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
STableMeta* pTableMeta = NULL; STableMeta* pTableMeta = NULL;
SQuery* pQuery = NULL; SQuery* pQuery = NULL;
SSubmitReq* subReq = NULL; SHashObj* pVgHash = NULL;
SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0); SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0);
if (!pRequest) { if (!pRequest) {
...@@ -1250,148 +1250,25 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch ...@@ -1250,148 +1250,25 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch
uError("WriteRaw:catalogGetTableMeta failed. table name: %s", tbname); uError("WriteRaw:catalogGetTableMeta failed. table name: %s", tbname);
goto end; goto end;
} }
uint64_t suid = (TSDB_NORMAL_TABLE == pTableMeta->tableType ? 0 : pTableMeta->suid); pQuery = smlInitHandle();
uint64_t uid = pTableMeta->uid; if(pQuery == NULL){
int32_t numOfCols = pTableMeta->tableInfo.numOfColumns;
uint16_t fLen = 0;
int32_t rowSize = 0;
int16_t nVar = 0;
for (int i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) {
SSchema* schema = pTableMeta->schema + i;
fLen += TYPE_BYTES[schema->type];
rowSize += schema->bytes;
if (IS_VAR_DATA_TYPE(schema->type)) {
nVar++;
}
}
fLen -= sizeof(TSKEY);
int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) +
(int32_t)TD_BITMAP_BYTES(numOfCols - 1);
int32_t schemaLen = 0;
int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize;
int32_t totalLen = sizeof(SSubmitReq) + submitLen;
subReq = taosMemoryCalloc(1, totalLen);
SSubmitBlk* blk = POINTER_SHIFT(subReq, sizeof(SSubmitReq));
void* blkSchema = POINTER_SHIFT(blk, sizeof(SSubmitBlk));
STSRow* rowData = POINTER_SHIFT(blkSchema, schemaLen);
SRowBuilder rb = {0};
tdSRowInit(&rb, pTableMeta->sversion);
tdSRowSetTpInfo(&rb, numOfCols, fLen);
int32_t dataLen = 0;
// | version | total length | total rows | total columns | flag seg| block group id | column schema | each column length |
char* pStart = pData + getVersion1BlockMetaSize(pData, numFields);
int32_t* colLength = (int32_t*)pStart;
pStart += sizeof(int32_t) * numFields;
SResultColumn* pCol = taosMemoryCalloc(numFields, sizeof(SResultColumn));
for (int32_t i = 0; i < numFields; ++i) {
if (IS_VAR_DATA_TYPE(fields[i].type)) {
pCol[i].offset = (int32_t*)pStart;
pStart += rows * sizeof(int32_t);
} else {
pCol[i].nullbitmap = pStart;
pStart += BitmapLen(rows);
}
pCol[i].pData = pStart;
pStart += colLength[i];
}
SHashObj* schemaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
for (int i = 0; i < numFields; i++) {
TAOS_FIELD* schema = &fields[i];
taosHashPut(schemaHash, schema->name, strlen(schema->name), &i, sizeof(int32_t));
}
for (int32_t j = 0; j < rows; j++) {
tdSRowResetBuf(&rb, rowData);
int32_t offset = 0;
for (int32_t k = 0; k < numOfCols; k++) {
const SSchema* pColumn = &pTableMeta->schema[k];
int32_t* index = taosHashGet(schemaHash, pColumn->name, strlen(pColumn->name));
if (!index) { // add none
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NONE, NULL, false, offset, k);
}else{
if (IS_VAR_DATA_TYPE(pColumn->type)) {
if (pCol[*index].offset[j] != -1) {
char* data = pCol[*index].pData + pCol[*index].offset[j];
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, offset, k);
} else {
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
}
} else {
if (!colDataIsNull_f(pCol[*index].nullbitmap, j)) {
char* data = pCol[*index].pData + pColumn->bytes * j;
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, offset, k);
} else {
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
}
}
}
if (pColumn->colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
offset += TYPE_BYTES[pColumn->type];
}
}
tdSRowEnd(&rb);
int32_t rowLen = TD_ROW_LEN(rowData);
rowData = POINTER_SHIFT(rowData, rowLen);
dataLen += rowLen;
}
taosHashCleanup(schemaHash);
taosMemoryFree(pCol);
blk->uid = htobe64(uid);
blk->suid = htobe64(suid);
blk->sversion = htonl(pTableMeta->sversion);
blk->schemaLen = htonl(schemaLen);
blk->numOfRows = htonl(rows);
blk->dataLen = htonl(dataLen);
subReq->length = sizeof(SSubmitReq) + sizeof(SSubmitBlk) + schemaLen + dataLen;
subReq->numOfBlocks = 1;
pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
if (NULL == pQuery) {
uError("create SQuery error");
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto end; goto end;
} }
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE; pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
pQuery->haveResultSet = false; taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData));
pQuery->msgType = TDMT_VND_SUBMIT;
pQuery->pRoot = (SNode*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT); code = rawBlockBindData(pQuery, pTableMeta, pData, NULL, fields, numFields);
if (NULL == pQuery->pRoot) { if (code != TSDB_CODE_SUCCESS) {
uError("create pQuery->pRoot error"); uError("WriteRaw:rawBlockBindData failed");
code = TSDB_CODE_OUT_OF_MEMORY;
goto end; goto end;
} }
SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot);
nodeStmt->pDataBlocks = taosArrayInit(1, POINTER_BYTES);
SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks)); code = smlBuildOutput(pQuery, pVgHash);
if (NULL == dst) { if (code != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_OUT_OF_MEMORY; uError("smlBuildOutput failed");
goto end; return code;
} }
dst->vg = vgData;
dst->numOfTables = subReq->numOfBlocks;
dst->size = subReq->length;
dst->pData = (char*)subReq;
subReq->header.vgId = htonl(dst->vg.vgId);
subReq->version = htonl(1);
subReq->header.contLen = htonl(subReq->length);
subReq->length = htonl(subReq->length);
subReq->numOfBlocks = htonl(subReq->numOfBlocks);
subReq = NULL; // no need free
taosArrayPush(nodeStmt->pDataBlocks, &dst);
launchQueryImpl(pRequest, pQuery, true, NULL); launchQueryImpl(pRequest, pQuery, true, NULL);
code = pRequest->code; code = pRequest->code;
...@@ -1399,7 +1276,8 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch ...@@ -1399,7 +1276,8 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch
end: end:
taosMemoryFreeClear(pTableMeta); taosMemoryFreeClear(pTableMeta);
qDestroyQuery(pQuery); qDestroyQuery(pQuery);
taosMemoryFree(subReq); destroyRequest(pRequest);
taosHashCleanup(pVgHash);
return code; return code;
} }
...@@ -1407,7 +1285,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) ...@@ -1407,7 +1285,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
STableMeta* pTableMeta = NULL; STableMeta* pTableMeta = NULL;
SQuery* pQuery = NULL; SQuery* pQuery = NULL;
SSubmitReq* subReq = NULL; SHashObj* pVgHash = NULL;
SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0); SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0);
if (!pRequest) { if (!pRequest) {
...@@ -1452,137 +1330,25 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) ...@@ -1452,137 +1330,25 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
uError("WriteRaw:catalogGetTableMeta failed. table name: %s", tbname); uError("WriteRaw:catalogGetTableMeta failed. table name: %s", tbname);
goto end; goto end;
} }
uint64_t suid = (TSDB_NORMAL_TABLE == pTableMeta->tableType ? 0 : pTableMeta->suid); pQuery = smlInitHandle();
uint64_t uid = pTableMeta->uid; if(pQuery == NULL){
int32_t numOfCols = pTableMeta->tableInfo.numOfColumns;
uint16_t fLen = 0;
int32_t rowSize = 0;
int16_t nVar = 0;
for (int i = 0; i < numOfCols; i++) {
SSchema* schema = pTableMeta->schema + i;
fLen += TYPE_BYTES[schema->type];
rowSize += schema->bytes;
if (IS_VAR_DATA_TYPE(schema->type)) {
nVar++;
}
}
fLen -= sizeof(TSKEY);
int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) +
(int32_t)TD_BITMAP_BYTES(numOfCols - 1);
int32_t schemaLen = 0;
int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize;
int32_t totalLen = sizeof(SSubmitReq) + submitLen;
subReq = taosMemoryCalloc(1, totalLen);
SSubmitBlk* blk = POINTER_SHIFT(subReq, sizeof(SSubmitReq));
void* blkSchema = POINTER_SHIFT(blk, sizeof(SSubmitBlk));
STSRow* rowData = POINTER_SHIFT(blkSchema, schemaLen);
SRowBuilder rb = {0};
tdSRowInit(&rb, pTableMeta->sversion);
tdSRowSetTpInfo(&rb, numOfCols, fLen);
int32_t dataLen = 0;
// | version | total length | total rows | total columns | flag seg| block group id | column schema | each column length |
char* pStart = pData + getVersion1BlockMetaSize(pData, numOfCols);
int32_t* colLength = (int32_t*)pStart;
pStart += sizeof(int32_t) * numOfCols;
SResultColumn* pCol = taosMemoryCalloc(numOfCols, sizeof(SResultColumn));
for (int32_t i = 0; i < numOfCols; ++i) {
if (IS_VAR_DATA_TYPE(pTableMeta->schema[i].type)) {
pCol[i].offset = (int32_t*)pStart;
pStart += rows * sizeof(int32_t);
} else {
pCol[i].nullbitmap = pStart;
pStart += BitmapLen(rows);
}
pCol[i].pData = pStart;
pStart += colLength[i];
}
for (int32_t j = 0; j < rows; j++) {
tdSRowResetBuf(&rb, rowData);
int32_t offset = 0;
for (int32_t k = 0; k < numOfCols; k++) {
const SSchema* pColumn = &pTableMeta->schema[k];
if (IS_VAR_DATA_TYPE(pColumn->type)) {
if (pCol[k].offset[j] != -1) {
char* data = pCol[k].pData + pCol[k].offset[j];
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, offset, k);
} else {
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
}
} else {
if (!colDataIsNull_f(pCol[k].nullbitmap, j)) {
char* data = pCol[k].pData + pColumn->bytes * j;
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, offset, k);
} else {
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
}
}
if (pColumn->colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
offset += TYPE_BYTES[pColumn->type];
}
}
tdSRowEnd(&rb);
int32_t rowLen = TD_ROW_LEN(rowData);
rowData = POINTER_SHIFT(rowData, rowLen);
dataLen += rowLen;
}
taosMemoryFree(pCol);
blk->uid = htobe64(uid);
blk->suid = htobe64(suid);
blk->sversion = htonl(pTableMeta->sversion);
blk->schemaLen = htonl(schemaLen);
blk->numOfRows = htonl(rows);
blk->dataLen = htonl(dataLen);
subReq->length = sizeof(SSubmitReq) + sizeof(SSubmitBlk) + schemaLen + dataLen;
subReq->numOfBlocks = 1;
pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
if (NULL == pQuery) {
uError("create SQuery error");
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(subReq);
goto end; goto end;
} }
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE; pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
pQuery->haveResultSet = false; taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData));
pQuery->msgType = TDMT_VND_SUBMIT;
pQuery->pRoot = (SNode*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT); code = rawBlockBindData(pQuery, pTableMeta, pData, NULL, NULL, 0);
if (NULL == pQuery->pRoot) { if (code != TSDB_CODE_SUCCESS) {
uError("create pQuery->pRoot error"); uError("WriteRaw:rawBlockBindData failed");
code = TSDB_CODE_OUT_OF_MEMORY;
goto end; goto end;
} }
SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot);
nodeStmt->pDataBlocks = taosArrayInit(1, POINTER_BYTES);
SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks)); code = smlBuildOutput(pQuery, pVgHash);
if (NULL == dst) { if (code != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_OUT_OF_MEMORY; uError("smlBuildOutput failed");
goto end; return code;
} }
dst->vg = vgData;
dst->numOfTables = subReq->numOfBlocks;
dst->size = subReq->length;
dst->pData = (char*)subReq;
subReq->header.vgId = htonl(dst->vg.vgId);
subReq->version = htonl(1);
subReq->header.contLen = htonl(subReq->length);
subReq->length = htonl(subReq->length);
subReq->numOfBlocks = htonl(subReq->numOfBlocks);
subReq = NULL; // no need free
taosArrayPush(nodeStmt->pDataBlocks, &dst);
launchQueryImpl(pRequest, pQuery, true, NULL); launchQueryImpl(pRequest, pQuery, true, NULL);
code = pRequest->code; code = pRequest->code;
...@@ -1590,7 +1356,8 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) ...@@ -1590,7 +1356,8 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
end: end:
taosMemoryFreeClear(pTableMeta); taosMemoryFreeClear(pTableMeta);
qDestroyQuery(pQuery); qDestroyQuery(pQuery);
taosMemoryFree(subReq); destroyRequest(pRequest);
taosHashCleanup(pVgHash);
return code; return code;
} }
...@@ -1647,7 +1414,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { ...@@ -1647,7 +1414,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
} }
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
printf("raw data block num:%d\n", rspObj.rsp.blockNum); uDebug("raw data block num:%d\n", rspObj.rsp.blockNum);
while (++rspObj.resIter < rspObj.rsp.blockNum) { while (++rspObj.resIter < rspObj.rsp.blockNum) {
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter); SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter);
if (!rspObj.rsp.withSchema) { if (!rspObj.rsp.withSchema) {
...@@ -1662,7 +1429,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { ...@@ -1662,7 +1429,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
goto end; goto end;
} }
printf("raw data tbname:%s\n", tbName); uDebug("raw data tbname:%s\n", tbName);
SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}}; SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
strcpy(pName.dbname, pRequest->pDb); strcpy(pName.dbname, pRequest->pDb);
strcpy(pName.tname, tbName); strcpy(pName.tname, tbName);
...@@ -1690,7 +1457,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { ...@@ -1690,7 +1457,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
taosHashPut(pVgHash, (const char *)&vg.vgId, sizeof(vg.vgId), (char *)&vg, sizeof(vg)); taosHashPut(pVgHash, (const char *)&vg.vgId, sizeof(vg.vgId), (char *)&vg, sizeof(vg));
} }
code = rawBlockBindData(pQuery, pTableMeta, pRetrieve, NULL); code = rawBlockBindData(pQuery, pTableMeta, pRetrieve->data, NULL, NULL, 0);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
uError("WriteRaw:rawBlockBindData failed"); uError("WriteRaw:rawBlockBindData failed");
goto end; goto end;
...@@ -1770,7 +1537,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) ...@@ -1770,7 +1537,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
} }
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
printf("raw data block num:%d\n", rspObj.rsp.blockNum); uDebug("raw data block num:%d\n", rspObj.rsp.blockNum);
while (++rspObj.resIter < rspObj.rsp.blockNum) { while (++rspObj.resIter < rspObj.rsp.blockNum) {
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter); SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter);
if (!rspObj.rsp.withSchema) { if (!rspObj.rsp.withSchema) {
...@@ -1785,7 +1552,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) ...@@ -1785,7 +1552,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
goto end; goto end;
} }
printf("raw data tbname:%s\n", tbName); uDebug("raw data tbname:%s\n", tbName);
SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}}; SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
strcpy(pName.dbname, pRequest->pDb); strcpy(pName.dbname, pRequest->pDb);
strcpy(pName.tname, tbName); strcpy(pName.tname, tbName);
...@@ -1837,7 +1604,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) ...@@ -1837,7 +1604,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
taosHashPut(pVgHash, (const char *)&vg.vgId, sizeof(vg.vgId), (char *)&vg, sizeof(vg)); taosHashPut(pVgHash, (const char *)&vg.vgId, sizeof(vg.vgId), (char *)&vg, sizeof(vg));
} }
code = rawBlockBindData(pQuery, pTableMeta, pRetrieve, &pCreateReq); code = rawBlockBindData(pQuery, pTableMeta, pRetrieve->data, &pCreateReq, NULL, 0);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
uError("WriteRaw:rawBlockBindData failed"); uError("WriteRaw:rawBlockBindData failed");
goto end; goto end;
......
...@@ -3484,7 +3484,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, ...@@ -3484,7 +3484,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
tsdbRowMerge(&merge, pNextRow); tsdbRowMerge(&merge, pNextRow);
} }
code = doMergeRowsInBuf(pIter, uid, current.pTSRow->ts, pDelList, &merge, pReader); code = doMergeRowsInBuf(pIter, uid, TSDBROW_TS(&current), pDelList, &merge, pReader);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -3494,7 +3494,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, ...@@ -3494,7 +3494,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
return code; return code;
} }
pResRow->type = current.type; pResRow->type = TSDBROW_ROW_FMT;
tsdbRowMergerClear(&merge); tsdbRowMergerClear(&merge);
*freeTSRow = true; *freeTSRow = true;
......
...@@ -1373,7 +1373,52 @@ int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList, ...@@ -1373,7 +1373,52 @@ int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList,
return code; return code;
} }
int rawBlockBindData(SQuery *query, STableMeta* pTableMeta, SRetrieveTableRsp* pRsp, SVCreateTbReq *pCreateTb){ static int bindFileds(SBoundColInfo* pBoundInfo, SSchema* pSchema, TAOS_FIELD *fields, int numFields){
bool* pUseCols = taosMemoryCalloc(pBoundInfo->numOfCols, sizeof(bool));
if (NULL == pUseCols) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pBoundInfo->numOfBound = 0;
int16_t lastColIdx = -1; // last column found
int32_t code = TSDB_CODE_SUCCESS;
for(int i = 0; i < numFields; i++) {
SToken token;
token.z = fields[i].name;
token.n = strlen(fields[i].name);
int16_t t = lastColIdx + 1;
int16_t index = insFindCol(&token, t, pBoundInfo->numOfCols, pSchema);
if (index < 0 && t > 0) {
index = insFindCol(&token, 0, t, pSchema);
}
if (index < 0) {
uError("can not find column name:%s", token.z);
code = TSDB_CODE_PAR_INVALID_COLUMN;
break;
} else if (pUseCols[index]) {
code = TSDB_CODE_PAR_INVALID_COLUMN;
uError("duplicated column name:%s", token.z);
break;
} else {
lastColIdx = index;
pUseCols[index] = true;
pBoundInfo->pColIndex[pBoundInfo->numOfBound] = index;
++pBoundInfo->numOfBound;
}
}
if (TSDB_CODE_SUCCESS == code && !pUseCols[0]) {
uError("primary timestamp column can not be null:");
code = TSDB_CODE_PAR_INVALID_COLUMN;
}
taosMemoryFree(pUseCols);
return code;
}
int rawBlockBindData(SQuery *query, STableMeta* pTableMeta, void* data, SVCreateTbReq *pCreateTb, TAOS_FIELD *tFields, int numFields){
STableDataCxt* pTableCxt = NULL; STableDataCxt* pTableCxt = NULL;
int ret = insGetTableDataCxt(((SVnodeModifOpStmt *)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid), int ret = insGetTableDataCxt(((SVnodeModifOpStmt *)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid),
pTableMeta, &pCreateTb, &pTableCxt, true); pTableMeta, &pCreateTb, &pTableCxt, true);
...@@ -1381,7 +1426,13 @@ int rawBlockBindData(SQuery *query, STableMeta* pTableMeta, SRetrieveTableRsp* p ...@@ -1381,7 +1426,13 @@ int rawBlockBindData(SQuery *query, STableMeta* pTableMeta, SRetrieveTableRsp* p
uError("insGetTableDataCxt error"); uError("insGetTableDataCxt error");
goto end; goto end;
} }
if (tFields != NULL){
ret = bindFileds(&pTableCxt->boundColsInfo, getTableColumnSchema(pTableMeta), tFields, numFields);
if (ret != TSDB_CODE_SUCCESS) {
uError("bindFileds error");
goto end;
}
}
// no need to bind, because select * get all fields // no need to bind, because select * get all fields
ret = initTableColSubmitData(pTableCxt); ret = initTableColSubmitData(pTableCxt);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
...@@ -1389,7 +1440,7 @@ int rawBlockBindData(SQuery *query, STableMeta* pTableMeta, SRetrieveTableRsp* p ...@@ -1389,7 +1440,7 @@ int rawBlockBindData(SQuery *query, STableMeta* pTableMeta, SRetrieveTableRsp* p
goto end; goto end;
} }
char* p = (char*)pRsp->data; char* p = (char*)data;
// | version | total length | total rows | total columns | flag seg| block group id | column schema | each column length | // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column length |
p += sizeof(int32_t); p += sizeof(int32_t);
p += sizeof(int32_t); p += sizeof(int32_t);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册