提交 47bd13a7 编写于 作者: dengyihao's avatar dengyihao

add backend

上级 5a8c46b7
...@@ -2769,32 +2769,24 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat ...@@ -2769,32 +2769,24 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat
SResultRow* pRow = (SResultRow*)pVal; SResultRow* pRow = (SResultRow*)pVal;
doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset); doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
// no results, continue to check the next one // no results, continue to check the next one
qWarn("indx 1");
if (pRow->numOfRows == 0) { if (pRow->numOfRows == 0) {
pGroupResInfo->index += 1; pGroupResInfo->index += 1;
qWarn("indx 2");
releaseOutputBuf(pState, pKey, pRow); releaseOutputBuf(pState, pKey, pRow);
continue; continue;
} }
qWarn("indx 3");
if (pBlock->info.id.groupId == 0) { if (pBlock->info.id.groupId == 0) {
pBlock->info.id.groupId = pKey->groupId; pBlock->info.id.groupId = pKey->groupId;
void* tbname = NULL; void* tbname = NULL;
qWarn("indx 4");
if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) { if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) {
qWarn("indx 5");
pBlock->info.parTbName[0] = 0; pBlock->info.parTbName[0] = 0;
} else { } else {
qWarn("indx 6");
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
} }
qWarn("indx 7");
streamFreeVal(tbname); streamFreeVal(tbname);
} else { } else {
// current value belongs to different group, it can't be packed into one datablock // current value belongs to different group, it can't be packed into one datablock
if (pBlock->info.id.groupId != pKey->groupId) { if (pBlock->info.id.groupId != pKey->groupId) {
releaseOutputBuf(pState, pKey, pRow); releaseOutputBuf(pState, pKey, pRow);
qWarn("indx 8");
break; break;
} }
} }
...@@ -2804,36 +2796,30 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat ...@@ -2804,36 +2796,30 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat
releaseOutputBuf(pState, pKey, pRow); releaseOutputBuf(pState, pKey, pRow);
break; break;
} }
qWarn("indx 10");
pGroupResInfo->index += 1; pGroupResInfo->index += 1;
for (int32_t j = 0; j < numOfExprs; ++j) { for (int32_t j = 0; j < numOfExprs; ++j) {
int32_t slotId = pExprInfo[j].base.resSchema.slotId; int32_t slotId = pExprInfo[j].base.resSchema.slotId;
qWarn("indx 10");
pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset); pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
SResultRowEntryInfo* pEnryInfo = pCtx[j].resultInfo; SResultRowEntryInfo* pEnryInfo = pCtx[j].resultInfo;
qWarn("initd:%d, complete:%d, null:%d, res:%d", pEnryInfo->initialized, pEnryInfo->complete, pEnryInfo->isNullRes, qWarn("initd:%d, complete:%d, null:%d, res:%d", pEnryInfo->initialized, pEnryInfo->complete, pEnryInfo->isNullRes,
pEnryInfo->numOfRes); pEnryInfo->numOfRes);
if (pCtx[j].fpSet.finalize) { if (pCtx[j].fpSet.finalize) {
qWarn("indx 14");
int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock); int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
if (TAOS_FAILED(code1)) { if (TAOS_FAILED(code1)) {
qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code1)); qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code1));
T_LONG_JMP(pTaskInfo->env, code1); T_LONG_JMP(pTaskInfo->env, code1);
} }
} else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) { } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
qWarn("indx 11");
// do nothing, todo refactor // do nothing, todo refactor
} else { } else {
// expand the result into multiple rows. E.g., _wstart, top(k, 20) // expand the result into multiple rows. E.g., _wstart, top(k, 20)
// the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows. // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo); char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
qWarn("indx 12");
for (int32_t k = 0; k < pRow->numOfRows; ++k) { for (int32_t k = 0; k < pRow->numOfRows; ++k) {
colDataSetVal(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes); colDataSetVal(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
qWarn("indx 13");
} }
} }
} }
......
...@@ -301,6 +301,7 @@ int streamInitBackend(SStreamState* pState, char* path) { ...@@ -301,6 +301,7 @@ int streamInitBackend(SStreamState* pState, char* path) {
// create the DB if it's not already present // create the DB if it's not already present
rocksdb_options_set_create_if_missing(opts, 1); rocksdb_options_set_create_if_missing(opts, 1);
rocksdb_options_set_create_missing_column_families(opts, 1); rocksdb_options_set_create_missing_column_families(opts, 1);
rocksdb_options_set_write_buffer_size(opts, 64 << 20);
char* err = NULL; char* err = NULL;
int cfLen = sizeof(cfName) / sizeof(cfName[0]); int cfLen = sizeof(cfName) / sizeof(cfName[0]);
...@@ -423,10 +424,10 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa ...@@ -423,10 +424,10 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
rocksdb_put_cf(db, opts, pHandle, (const char*)buf, sizeof(*key), (const char*)value, (size_t)vLen, &err); \ rocksdb_put_cf(db, opts, pHandle, (const char*)buf, sizeof(*key), (const char*)value, (size_t)vLen, &err); \
if (err != NULL) { \ if (err != NULL) { \
taosMemoryFree(err); \ taosMemoryFree(err); \
qWarn("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \ qDebug("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \
code = -1; \ code = -1; \
} else { \ } else { \
qDebug("streamState str:%s succ to write to %s", toString, funcname); \ qDebug("streamState str:%s succ to write to %s, valLen:%d", toString, funcname, vLen); \
} \ } \
} while (0); } while (0);
...@@ -457,37 +458,37 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa ...@@ -457,37 +458,37 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
} \ } \
if (err != NULL) { \ if (err != NULL) { \
taosMemoryFree(err); \ taosMemoryFree(err); \
qWarn("streamState str: %s failed to read from %s, err: %s", toString, funcname, err); \ qDebug("streamState str: %s failed to read from %s, err: %s", toString, funcname, err); \
code = -1; \ code = -1; \
} else { \ } else { \
if (code == 0) qDebug("streamState str: %s succ to read from %s", toString, funcname); \ if (code == 0) qDebug("streamState str: %s succ to read from %s", toString, funcname); \
} \ } \
} while (0); } while (0);
#define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \ #define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \
do { \ do { \
code = 0; \ code = 0; \
char buf[128] = {0}; \ char buf[128] = {0}; \
char* err = NULL; \ char* err = NULL; \
int i = streamGetInit(funcname); \ int i = streamGetInit(funcname); \
if (i < 0) { \ if (i < 0) { \
qWarn("streamState failed to get cf name: %s", funcname); \ qWarn("streamState failed to get cf name: %s", funcname); \
return -1; \ return -1; \
} \ } \
char toString[128] = {0}; \ char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
ginitDict[i].enFunc((void*)key, buf); \ ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \ rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \
rocksdb_t* db = pState->pTdbState->rocksdb; \ rocksdb_t* db = pState->pTdbState->rocksdb; \
rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \ rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \
rocksdb_delete_cf(db, opts, pHandle, (const char*)buf, sizeof(*key), &err); \ rocksdb_delete_cf(db, opts, pHandle, (const char*)buf, sizeof(*key), &err); \
if (err != NULL) { \ if (err != NULL) { \
qWarn("streamState str: %s failed to del from %s, err: %s", toString, funcname, err); \ qDebug("streamState str: %s failed to del from %s, err: %s", toString, funcname, err); \
taosMemoryFree(err); \ taosMemoryFree(err); \
code = -1; \ code = -1; \
} else { \ } else { \
qDebug("streamState str: %s succ to del from %s", toString, funcname); \ qDebug("streamState str: %s succ to del from %s", toString, funcname); \
} \ } \
} while (0); } while (0);
int32_t streamStateFuncPut_rocksdb(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) { int32_t streamStateFuncPut_rocksdb(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册