提交 5d06474f 编写于 作者: wmmhello's avatar wmmhello

fix:send data batch if consume wal where subscribe db

上级 dcaab41a
...@@ -2439,6 +2439,10 @@ _exit: ...@@ -2439,6 +2439,10 @@ _exit:
int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t bytes, int32_t nRows, char *lengthOrbitmap, int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t bytes, int32_t nRows, char *lengthOrbitmap,
char *data) { char *data) {
int32_t code = 0; int32_t code = 0;
if(data == NULL){
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NONE](pColData, NULL, 0);
goto _exit;
}
if (IS_VAR_DATA_TYPE(type)) { // var-length data type if (IS_VAR_DATA_TYPE(type)) { // var-length data type
for (int32_t i = 0; i < nRows; ++i) { for (int32_t i = 0; i < nRows; ++i) {
......
...@@ -629,6 +629,14 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* ...@@ -629,6 +629,14 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
// process meta // process meta
if (pHead->msgType != TDMT_VND_SUBMIT) { if (pHead->msgType != TDMT_VND_SUBMIT) {
if(totalRows > 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer - 1);
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
tDeleteSTaosxRsp(&taosxRsp);
taosMemoryFreeClear(pCkHead);
return code;
}
tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType)); tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType));
tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer); tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer);
metaRsp.resMsgType = pHead->msgType; metaRsp.resMsgType = pHead->msgType;
...@@ -656,6 +664,8 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* ...@@ -656,6 +664,8 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows) < 0) { if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows) < 0) {
tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", consumerId, vgId, tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", consumerId, vgId,
pRequest->subKey); pRequest->subKey);
taosMemoryFreeClear(pCkHead);
tDeleteSTaosxRsp(&taosxRsp);
return -1; return -1;
} }
......
...@@ -346,7 +346,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR ...@@ -346,7 +346,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
SSDataBlock* pBlock = taosArrayGet(pBlocks, i); SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock), tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock),
pTq->pVnode->config.tsdbCfg.precision); pTq->pVnode->config.tsdbCfg.precision);
totalRows += pBlock->info.rows; *totalRows += pBlock->info.rows;
blockDataFreeRes(pBlock); blockDataFreeRes(pBlock);
SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i); SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i);
taosArrayPush(pRsp->blockSchema, &pSW); taosArrayPush(pRsp->blockSchema, &pSW);
......
...@@ -566,75 +566,14 @@ int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList, ...@@ -566,75 +566,14 @@ int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList,
return code; return code;
} }
static int bindFileds(SBoundColInfo* pBoundInfo, SSchema* pSchema, TAOS_FIELD* fields, int numFields) { static bool findFileds(SSchema* pSchema, TAOS_FIELD* fields, int numFields) {
bool* pUseCols = taosMemoryCalloc(pBoundInfo->numOfCols, sizeof(bool));
if (NULL == pUseCols) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pBoundInfo->numOfBound = 0;
int16_t lastColIdx = -1; // last column found
int32_t code = TSDB_CODE_SUCCESS;
for (int i = 0; i < numFields; i++) {
SToken token;
token.z = fields[i].name;
token.n = strlen(fields[i].name);
int16_t t = lastColIdx + 1;
int16_t index = insFindCol(&token, t, pBoundInfo->numOfCols, pSchema);
if (index < 0 && t > 0) {
index = insFindCol(&token, 0, t, pSchema);
}
if (index < 0) {
uError("can not find column name:%s", token.z);
code = TSDB_CODE_PAR_INVALID_COLUMN;
break;
} else if (pUseCols[index]) {
code = TSDB_CODE_PAR_INVALID_COLUMN;
uError("duplicated column name:%s", token.z);
break;
} else {
lastColIdx = index;
pUseCols[index] = true;
pBoundInfo->pColIndex[pBoundInfo->numOfBound] = index;
++pBoundInfo->numOfBound;
}
}
if (TSDB_CODE_SUCCESS == code && !pUseCols[0]) {
uError("primary timestamp column can not be null:");
code = TSDB_CODE_PAR_INVALID_COLUMN;
}
taosMemoryFree(pUseCols);
return code;
}
static bool isSameBindFileds(SBoundColInfo* pBoundInfo, SSchema* pSchema, TAOS_FIELD* fields, int numFields) {
int16_t lastColIdx = -1; // last column found
for (int i = 0; i < numFields; i++) { for (int i = 0; i < numFields; i++) {
SToken token; if(strcmp(pSchema->name, fields[i].name) == 0){
token.z = fields[i].name; return true;
token.n = strlen(fields[i].name);
int16_t t = lastColIdx + 1;
int16_t index = insFindCol(&token, t, pBoundInfo->numOfCols, pSchema);
if (index < 0 && t > 0) {
index = insFindCol(&token, 0, t, pSchema);
}
if (index < 0) {
uError("can not find column name:%s", token.z);
return false;
} else {
lastColIdx = index;
if(pBoundInfo->pColIndex[i] != index){
return false;
}
} }
} }
return true; return false;
} }
int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, TAOS_FIELD* tFields, int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, TAOS_FIELD* tFields,
...@@ -648,39 +587,13 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate ...@@ -648,39 +587,13 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
goto end; goto end;
} }
do { if(tmp == NULL){
if (tmp != NULL){
if(!isSameBindFileds(&pTableCxt->boundColsInfo, getTableColumnSchema(pTableMeta), tFields, numFields)){
char* fieldNames = (char*)taosMemoryCalloc(numFields, sizeof(tFields[0].name));
for(int i = 0; i < numFields; i++){
memcpy(fieldNames + i * sizeof(tFields[0].name), tFields[i].name, sizeof(tFields[0].name));
}
ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, fieldNames,
numFields * sizeof(tFields[0].name), pTableMeta, &pCreateTb, &pTableCxt, true);
taosMemoryFree(fieldNames);
if (ret != TSDB_CODE_SUCCESS) {
uError("insGetTableDataCxt inner error");
goto end;
}
}else{
break;
}
}
if (tFields != NULL) {
ret = bindFileds(&pTableCxt->boundColsInfo, getTableColumnSchema(pTableMeta), tFields, numFields);
if (ret != TSDB_CODE_SUCCESS) {
uError("bindFileds error");
goto end;
}
}
// no need to bind, because select * get all fields
ret = initTableColSubmitData(pTableCxt); ret = initTableColSubmitData(pTableCxt);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
uError("initTableColSubmitData error"); uError("initTableColSubmitData error");
goto end; goto end;
} }
}while(0); }
char* p = (char*)data; char* p = (char*)data;
// | version | total length | total rows | total columns | flag seg| block group id | column schema | each column // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column
...@@ -708,35 +621,38 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate ...@@ -708,35 +621,38 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
SSchema* pSchema = getTableColumnSchema(pTableCxt->pMeta); SSchema* pSchema = getTableColumnSchema(pTableCxt->pMeta);
SBoundColInfo* boundInfo = &pTableCxt->boundColsInfo; SBoundColInfo* boundInfo = &pTableCxt->boundColsInfo;
if (boundInfo->numOfBound != numOfCols) { if (numFields != numOfCols) {
uError("boundInfo->numOfBound:%d != numOfCols:%d", boundInfo->numOfBound, numOfCols); uError("numFields:%d != numOfCols:%d", numFields, numOfCols);
ret = TSDB_CODE_INVALID_PARA; ret = TSDB_CODE_INVALID_PARA;
goto end; goto end;
} }
for (int c = 0; c < boundInfo->numOfBound; ++c) { for (int c = 0; c < boundInfo->numOfBound; ++c) {
SSchema* pColSchema = &pSchema[boundInfo->pColIndex[c]]; SSchema* pColSchema = &pSchema[c];
SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, c); SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, c);
if(findFileds(pColSchema, tFields, numFields)){
if (*fields != pColSchema->type && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) {
uError("type or bytes not equal");
ret = TSDB_CODE_INVALID_PARA;
goto end;
}
if (*fields != pColSchema->type && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) { int8_t* offset = pStart;
uError("type or bytes not equal"); if (IS_VAR_DATA_TYPE(pColSchema->type)) {
ret = TSDB_CODE_INVALID_PARA; pStart += numOfRows * sizeof(int32_t);
goto end; } else {
} pStart += BitmapLen(numOfRows);
}
int8_t* offset = pStart; char* pData = pStart;
if (IS_VAR_DATA_TYPE(pColSchema->type)) {
pStart += numOfRows * sizeof(int32_t);
} else {
pStart += BitmapLen(numOfRows);
}
char* pData = pStart;
tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData); tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData);
fields += sizeof(int8_t) + sizeof(int32_t); fields += sizeof(int8_t) + sizeof(int32_t);
if (needChangeLength) { if (needChangeLength) {
pStart += htonl(colLength[c]); pStart += htonl(colLength[c]);
} else { } else {
pStart += colLength[c]; pStart += colLength[c];
}
}else{
tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, NULL, NULL);
} }
} }
......
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册