From 47bd13a7b162e2797da6aa815dbf80b51f01e46f Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 27 Mar 2023 13:25:06 +0000 Subject: [PATCH] add backend --- source/libs/executor/src/executorimpl.c | 14 ------ source/libs/stream/src/streamStateRocksdb.c | 55 +++++++++++---------- 2 files changed, 28 insertions(+), 41 deletions(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 67e68160d7..5c0ca4d31a 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2769,32 +2769,24 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat SResultRow* pRow = (SResultRow*)pVal; doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset); // no results, continue to check the next one - qWarn("indx 1"); if (pRow->numOfRows == 0) { pGroupResInfo->index += 1; - qWarn("indx 2"); releaseOutputBuf(pState, pKey, pRow); continue; } - qWarn("indx 3"); if (pBlock->info.id.groupId == 0) { pBlock->info.id.groupId = pKey->groupId; void* tbname = NULL; - qWarn("indx 4"); if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) { - qWarn("indx 5"); pBlock->info.parTbName[0] = 0; } else { - qWarn("indx 6"); memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); } - qWarn("indx 7"); streamFreeVal(tbname); } else { // current value belongs to different group, it can't be packed into one datablock if (pBlock->info.id.groupId != pKey->groupId) { releaseOutputBuf(pState, pKey, pRow); - qWarn("indx 8"); break; } } @@ -2804,36 +2796,30 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat releaseOutputBuf(pState, pKey, pRow); break; } - qWarn("indx 10"); pGroupResInfo->index += 1; for (int32_t j = 0; j < numOfExprs; ++j) { int32_t slotId = pExprInfo[j].base.resSchema.slotId; - qWarn("indx 10"); pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset); SResultRowEntryInfo* pEnryInfo = pCtx[j].resultInfo; qWarn("initd:%d, complete:%d, null:%d, res:%d", pEnryInfo->initialized, pEnryInfo->complete, pEnryInfo->isNullRes, pEnryInfo->numOfRes); if (pCtx[j].fpSet.finalize) { - qWarn("indx 14"); int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock); if (TAOS_FAILED(code1)) { qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code1)); T_LONG_JMP(pTaskInfo->env, code1); } } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) { - qWarn("indx 11"); // do nothing, todo refactor } else { // expand the result into multiple rows. E.g., _wstart, top(k, 20) // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows. SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId); char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo); - qWarn("indx 12"); for (int32_t k = 0; k < pRow->numOfRows; ++k) { colDataSetVal(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes); - qWarn("indx 13"); } } } diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c index 2e4157ad29..8e6557254d 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -301,6 +301,7 @@ int streamInitBackend(SStreamState* pState, char* path) { // create the DB if it's not already present rocksdb_options_set_create_if_missing(opts, 1); rocksdb_options_set_create_missing_column_families(opts, 1); + rocksdb_options_set_write_buffer_size(opts, 64 << 20); char* err = NULL; int cfLen = sizeof(cfName) / sizeof(cfName[0]); @@ -423,10 +424,10 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa rocksdb_put_cf(db, opts, pHandle, (const char*)buf, sizeof(*key), (const char*)value, (size_t)vLen, &err); \ if (err != NULL) { \ taosMemoryFree(err); \ - qWarn("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \ + qDebug("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \ code = -1; \ } else { \ - qDebug("streamState str:%s succ to write to %s", toString, funcname); \ + qDebug("streamState str:%s succ to write to %s, valLen:%d", toString, funcname, vLen); \ } \ } while (0); @@ -457,37 +458,37 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa } \ if (err != NULL) { \ taosMemoryFree(err); \ - qWarn("streamState str: %s failed to read from %s, err: %s", toString, funcname, err); \ + qDebug("streamState str: %s failed to read from %s, err: %s", toString, funcname, err); \ code = -1; \ } else { \ if (code == 0) qDebug("streamState str: %s succ to read from %s", toString, funcname); \ } \ } while (0); -#define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \ - do { \ - code = 0; \ - char buf[128] = {0}; \ - char* err = NULL; \ - int i = streamGetInit(funcname); \ - if (i < 0) { \ - qWarn("streamState failed to get cf name: %s", funcname); \ - return -1; \ - } \ - char toString[128] = {0}; \ - if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ - ginitDict[i].enFunc((void*)key, buf); \ - rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \ - rocksdb_t* db = pState->pTdbState->rocksdb; \ - rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \ - rocksdb_delete_cf(db, opts, pHandle, (const char*)buf, sizeof(*key), &err); \ - if (err != NULL) { \ - qWarn("streamState str: %s failed to del from %s, err: %s", toString, funcname, err); \ - taosMemoryFree(err); \ - code = -1; \ - } else { \ - qDebug("streamState str: %s succ to del from %s", toString, funcname); \ - } \ +#define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \ + do { \ + code = 0; \ + char buf[128] = {0}; \ + char* err = NULL; \ + int i = streamGetInit(funcname); \ + if (i < 0) { \ + qWarn("streamState failed to get cf name: %s", funcname); \ + return -1; \ + } \ + char toString[128] = {0}; \ + if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ + ginitDict[i].enFunc((void*)key, buf); \ + rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \ + rocksdb_t* db = pState->pTdbState->rocksdb; \ + rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \ + rocksdb_delete_cf(db, opts, pHandle, (const char*)buf, sizeof(*key), &err); \ + if (err != NULL) { \ + qDebug("streamState str: %s failed to del from %s, err: %s", toString, funcname, err); \ + taosMemoryFree(err); \ + code = -1; \ + } else { \ + qDebug("streamState str: %s succ to del from %s", toString, funcname); \ + } \ } while (0); int32_t streamStateFuncPut_rocksdb(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) { -- GitLab