提交 4cd512a2 编写于 作者: S Shengliang Guan

Merge branch 'fix/mnode' into feature/sync-mnode-integration

...@@ -96,6 +96,7 @@ extern char *qtypeStr[]; ...@@ -96,6 +96,7 @@ extern char *qtypeStr[];
#define TSDB_PORT_HTTP 11 #define TSDB_PORT_HTTP 11
#undef TD_DEBUG_PRINT_ROW #undef TD_DEBUG_PRINT_ROW
#undef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -639,6 +639,7 @@ int32_t* taosGetErrno(); ...@@ -639,6 +639,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_PAR_NOT_ALLOWED_FUNC TAOS_DEF_ERROR_CODE(0, 0x264F) #define TSDB_CODE_PAR_NOT_ALLOWED_FUNC TAOS_DEF_ERROR_CODE(0, 0x264F)
#define TSDB_CODE_PAR_NOT_ALLOWED_WIN_QUERY TAOS_DEF_ERROR_CODE(0, 0x2650) #define TSDB_CODE_PAR_NOT_ALLOWED_WIN_QUERY TAOS_DEF_ERROR_CODE(0, 0x2650)
#define TSDB_CODE_PAR_INVALID_DROP_COL TAOS_DEF_ERROR_CODE(0, 0x2651) #define TSDB_CODE_PAR_INVALID_DROP_COL TAOS_DEF_ERROR_CODE(0, 0x2651)
#define TSDB_CODE_PAR_INVALID_COL_JSON TAOS_DEF_ERROR_CODE(0, 0x2652)
//planner //planner
#define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700) #define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700)
......
...@@ -234,6 +234,7 @@ typedef enum ELogicConditionType { ...@@ -234,6 +234,7 @@ typedef enum ELogicConditionType {
#define TSDB_MAX_TAG_CONDITIONS 1024 #define TSDB_MAX_TAG_CONDITIONS 1024
#define TSDB_MAX_JSON_TAG_LEN 16384 #define TSDB_MAX_JSON_TAG_LEN 16384
#define TSDB_MAX_JSON_KEY_LEN 256
#define TSDB_AUTH_LEN 16 #define TSDB_AUTH_LEN 16
#define TSDB_PASSWORD_LEN 32 #define TSDB_PASSWORD_LEN 32
......
...@@ -866,8 +866,7 @@ static char* parseTagDatatoJson(void* p) { ...@@ -866,8 +866,7 @@ static char* parseTagDatatoJson(void* p) {
if (j == 0) { if (j == 0) {
if (*val == TSDB_DATA_TYPE_NULL) { if (*val == TSDB_DATA_TYPE_NULL) {
string = taosMemoryCalloc(1, 8); string = taosMemoryCalloc(1, 8);
sprintf(varDataVal(string), "%s", TSDB_DATA_NULL_STR_L); sprintf(string, "%s", TSDB_DATA_NULL_STR_L);
varDataSetLen(string, strlen(varDataVal(string)));
goto end; goto end;
} }
continue; continue;
...@@ -1003,7 +1002,7 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int ...@@ -1003,7 +1002,7 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
length = 0; length = 0;
} }
varDataSetLen(dst, length + CHAR_BYTES * 2); varDataSetLen(dst, length + CHAR_BYTES * 2);
*(char*)(varDataVal(dst), length + CHAR_BYTES) = '\"'; *(char*)POINTER_SHIFT(varDataVal(dst), length + CHAR_BYTES) = '\"';
} else if (jsonInnerType == TSDB_DATA_TYPE_DOUBLE) { } else if (jsonInnerType == TSDB_DATA_TYPE_DOUBLE) {
double jsonVd = *(double*)(jsonInnerData); double jsonVd = *(double*)(jsonInnerData);
sprintf(varDataVal(dst), "%.9lf", jsonVd); sprintf(varDataVal(dst), "%.9lf", jsonVd);
......
...@@ -37,6 +37,9 @@ typedef struct SMnodeMgmt { ...@@ -37,6 +37,9 @@ typedef struct SMnodeMgmt {
SReplica replicas[TSDB_MAX_REPLICA]; SReplica replicas[TSDB_MAX_REPLICA];
int8_t replica; int8_t replica;
int8_t selfIndex; int8_t selfIndex;
bool stopped;
int32_t refCount;
TdThreadRwlock lock;
} SMnodeMgmt; } SMnodeMgmt;
// mmFile.c // mmFile.c
...@@ -45,6 +48,8 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed); ...@@ -45,6 +48,8 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed);
// mmInt.c // mmInt.c
int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pMsg); int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pMsg);
int32_t mmAcquire(SMnodeMgmt *pMgmt);
void mmRelease(SMnodeMgmt *pMgmt);
// mmHandle.c // mmHandle.c
SArray *mmGetMsgHandles(); SArray *mmGetMsgHandles();
......
...@@ -110,6 +110,7 @@ static void mmClose(SMnodeMgmt *pMgmt) { ...@@ -110,6 +110,7 @@ static void mmClose(SMnodeMgmt *pMgmt) {
if (pMgmt->pMnode != NULL) { if (pMgmt->pMnode != NULL) {
mmStopWorker(pMgmt); mmStopWorker(pMgmt);
mndClose(pMgmt->pMnode); mndClose(pMgmt->pMnode);
taosThreadRwlockDestroy(&pMgmt->lock);
pMgmt->pMnode = NULL; pMgmt->pMnode = NULL;
} }
...@@ -142,6 +143,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { ...@@ -142,6 +143,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt->msgCb.queueFps[WRITE_QUEUE] = (PutToQueueFp)mmPutRpcMsgToWriteQueue; pMgmt->msgCb.queueFps[WRITE_QUEUE] = (PutToQueueFp)mmPutRpcMsgToWriteQueue;
pMgmt->msgCb.queueFps[SYNC_QUEUE] = (PutToQueueFp)mmPutRpcMsgToSyncQueue; pMgmt->msgCb.queueFps[SYNC_QUEUE] = (PutToQueueFp)mmPutRpcMsgToSyncQueue;
pMgmt->msgCb.mgmt = pMgmt; pMgmt->msgCb.mgmt = pMgmt;
taosThreadRwlockInit(&pMgmt->lock, NULL);
bool deployed = false; bool deployed = false;
if (mmReadFile(pMgmt, &deployed) != 0) { if (mmReadFile(pMgmt, &deployed) != 0) {
...@@ -211,3 +213,22 @@ SMgmtFunc mmGetMgmtFunc() { ...@@ -211,3 +213,22 @@ SMgmtFunc mmGetMgmtFunc() {
return mgmtFunc; return mgmtFunc;
} }
int32_t mmAcquire(SMnodeMgmt *pMgmt) {
int32_t code = 0;
taosThreadRwlockRdlock(&pMgmt->lock);
if (pMgmt->stopped) {
code = -1;
} else {
atomic_add_fetch_32(&pMgmt->refCount, 1);
}
taosThreadRwlockUnlock(&pMgmt->lock);
return code;
}
void mmRelease(SMnodeMgmt *pMgmt) {
taosThreadRwlockRdlock(&pMgmt->lock);
atomic_sub_fetch_32(&pMgmt->refCount, 1);
taosThreadRwlockUnlock(&pMgmt->lock);
}
\ No newline at end of file
...@@ -111,7 +111,10 @@ int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -111,7 +111,10 @@ int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
} }
int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return mmPutRpcMsgToWorker(&pMgmt->syncWorker, pMsg); if (mmAcquire(pMgmt) != 0) return -1;
int32_t code = mmPutRpcMsgToWorker(&pMgmt->syncWorker, pMsg);
mmRelease(pMgmt);
return code;
} }
int32_t mmStartWorker(SMnodeMgmt *pMgmt) { int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
...@@ -180,6 +183,11 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { ...@@ -180,6 +183,11 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
} }
void mmStopWorker(SMnodeMgmt *pMgmt) { void mmStopWorker(SMnodeMgmt *pMgmt) {
taosThreadRwlockWrlock(&pMgmt->lock);
pMgmt->stopped = 1;
taosThreadRwlockUnlock(&pMgmt->lock);
while (pMgmt->refCount > 0) taosMsleep(10);
tSingleWorkerCleanup(&pMgmt->monitorWorker); tSingleWorkerCleanup(&pMgmt->monitorWorker);
tSingleWorkerCleanup(&pMgmt->queryWorker); tSingleWorkerCleanup(&pMgmt->queryWorker);
tSingleWorkerCleanup(&pMgmt->readWorker); tSingleWorkerCleanup(&pMgmt->readWorker);
......
...@@ -25,7 +25,6 @@ extern "C" { ...@@ -25,7 +25,6 @@ extern "C" {
int32_t mndInitSync(SMnode *pMnode); int32_t mndInitSync(SMnode *pMnode);
void mndCleanupSync(SMnode *pMnode); void mndCleanupSync(SMnode *pMnode);
bool mndIsMaster(SMnode *pMnode); bool mndIsMaster(SMnode *pMnode);
bool mndIsRestored(SMnode *pMnode);
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw); int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw);
void mndSyncStart(SMnode *pMnode); void mndSyncStart(SMnode *pMnode);
void mndSyncStop(SMnode *pMnode); void mndSyncStop(SMnode *pMnode);
......
...@@ -152,9 +152,10 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) { ...@@ -152,9 +152,10 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) {
} }
void mndSyncStart(SMnode *pMnode) { void mndSyncStart(SMnode *pMnode) {
syncSetMsgCb(pMnode->syncMgmt.sync, &pMnode->msgCb); SSyncMgmt *pMgmt = &pMnode->syncMgmt;
syncStart(pMnode->syncMgmt.sync); syncSetMsgCb(pMgmt->sync, &pMnode->msgCb);
mDebug("sync:%" PRId64 " is started", pMnode->syncMgmt.sync); syncStart(pMgmt->sync);
mDebug("sync:%" PRId64 " is started", pMgmt->sync);
} }
void mndSyncStop(SMnode *pMnode) {} void mndSyncStop(SMnode *pMnode) {}
...@@ -162,7 +163,6 @@ void mndSyncStop(SMnode *pMnode) {} ...@@ -162,7 +163,6 @@ void mndSyncStop(SMnode *pMnode) {}
bool mndIsMaster(SMnode *pMnode) { bool mndIsMaster(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
pMgmt->state = syncGetMyRole(pMgmt->sync); pMgmt->state = syncGetMyRole(pMgmt->sync);
return pMgmt->state == TAOS_SYNC_STATE_LEADER;
}
bool mndIsRestored(SMnode *pMnode) { return pMnode->syncMgmt.restored; } return (pMgmt->state == TAOS_SYNC_STATE_LEADER) && (pMnode->syncMgmt.restored);
\ No newline at end of file }
...@@ -1079,6 +1079,8 @@ static bool mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans) { ...@@ -1079,6 +1079,8 @@ static bool mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans) {
} }
static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) { static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) {
if (!mndIsMaster(pMnode)) return false;
bool continueExec = true; bool continueExec = true;
int32_t code = mndTransExecuteRedoActions(pMnode, pTrans); int32_t code = mndTransExecuteRedoActions(pMnode, pTrans);
...@@ -1168,6 +1170,8 @@ static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans) { ...@@ -1168,6 +1170,8 @@ static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans) {
} }
static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) { static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) {
if (!mndIsMaster(pMnode)) return false;
bool continueExec = true; bool continueExec = true;
int32_t code = mndTransExecuteUndoActions(pMnode, pTrans); int32_t code = mndTransExecuteUndoActions(pMnode, pTrans);
......
...@@ -86,7 +86,6 @@ static void *mndThreadFp(void *param) { ...@@ -86,7 +86,6 @@ static void *mndThreadFp(void *param) {
lastTime++; lastTime++;
taosMsleep(100); taosMsleep(100);
if (pMnode->stopped) break; if (pMnode->stopped) break;
if (!mndIsMaster(pMnode) || !mndIsRestored(pMnode)) continue;
if (lastTime % (tsTransPullupInterval * 10) == 0) { if (lastTime % (tsTransPullupInterval * 10) == 0) {
mndPullupTrans(pMnode); mndPullupTrans(pMnode);
...@@ -436,8 +435,6 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { ...@@ -436,8 +435,6 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
} }
return ret; return ret;
return 0;
} }
int32_t mndProcessMsg(SRpcMsg *pMsg) { int32_t mndProcessMsg(SRpcMsg *pMsg) {
...@@ -446,7 +443,8 @@ int32_t mndProcessMsg(SRpcMsg *pMsg) { ...@@ -446,7 +443,8 @@ int32_t mndProcessMsg(SRpcMsg *pMsg) {
mTrace("msg:%p, will be processed, type:%s app:%p", pMsg, TMSG_INFO(pMsg->msgType), ahandle); mTrace("msg:%p, will be processed, type:%s app:%p", pMsg, TMSG_INFO(pMsg->msgType), ahandle);
if (IsReq(pMsg)) { if (IsReq(pMsg)) {
if (!mndIsMaster(pMnode) || !mndIsRestored(pMnode)) { if (!mndIsMaster(pMnode) && pMsg->msgType != TDMT_MND_TRANS_TIMER && pMsg->msgType != TDMT_MND_MQ_TIMER &&
pMsg->msgType != TDMT_MND_TELEM_TIMER) {
terrno = TSDB_CODE_APP_NOT_READY; terrno = TSDB_CODE_APP_NOT_READY;
mDebug("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle); mDebug("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
return -1; return -1;
...@@ -506,7 +504,7 @@ int64_t mndGenerateUid(char *name, int32_t len) { ...@@ -506,7 +504,7 @@ int64_t mndGenerateUid(char *name, int32_t len) {
int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo, int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
SMonGrantInfo *pGrantInfo) { SMonGrantInfo *pGrantInfo) {
if (!mndIsMaster(pMnode) || !mndIsRestored(pMnode)) return -1; if (!mndIsMaster(pMnode)) return -1;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
int64_t ms = taosGetTimestampMs(); int64_t ms = taosGetTimestampMs();
......
...@@ -35,4 +35,3 @@ target_include_directories( ...@@ -35,4 +35,3 @@ target_include_directories(
# NAME transTest2 # NAME transTest2
# COMMAND transTest2 # COMMAND transTest2
#) #)
...@@ -607,8 +607,15 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA ...@@ -607,8 +607,15 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
if (iCol == 0) { if (iCol == 0) {
// TODO : need to update tag index // TODO : need to update tag index
} }
ctbEntry.version = version; ctbEntry.version = version;
if(pTagSchema->nCols == 1 && pTagSchema->pSchema[0].type == TSDB_DATA_TYPE_JSON){
ctbEntry.ctbEntry.pTags = taosMemoryMalloc(pAlterTbReq->nTagVal);
if(ctbEntry.ctbEntry.pTags == NULL){
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
memcpy((void*)ctbEntry.ctbEntry.pTags, pAlterTbReq->pTagVal, pAlterTbReq->nTagVal);
}else{
SKVRowBuilder kvrb = {0}; SKVRowBuilder kvrb = {0};
const SKVRow pOldTag = (const SKVRow)ctbEntry.ctbEntry.pTags; const SKVRow pOldTag = (const SKVRow)ctbEntry.ctbEntry.pTags;
SKVRow pNewTag = NULL; SKVRow pNewTag = NULL;
...@@ -632,6 +639,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA ...@@ -632,6 +639,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
ctbEntry.ctbEntry.pTags = tdGetKVRowFromBuilder(&kvrb); ctbEntry.ctbEntry.pTags = tdGetKVRowFromBuilder(&kvrb);
tdDestroyKVRowBuilder(&kvrb); tdDestroyKVRowBuilder(&kvrb);
}
// save to table.db // save to table.db
metaSaveToTbDb(pMeta, &ctbEntry); metaSaveToTbDb(pMeta, &ctbEntry);
...@@ -641,6 +649,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA ...@@ -641,6 +649,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
tDecoderClear(&dc1); tDecoderClear(&dc1);
tDecoderClear(&dc2); tDecoderClear(&dc2);
if (ctbEntry.ctbEntry.pTags) taosMemoryFree((void*)ctbEntry.ctbEntry.pTags);
if (ctbEntry.pBuf) taosMemoryFree(ctbEntry.pBuf); if (ctbEntry.pBuf) taosMemoryFree(ctbEntry.pBuf);
if (stbEntry.pBuf) tdbFree(stbEntry.pBuf); if (stbEntry.pBuf) tdbFree(stbEntry.pBuf);
tdbTbcClose(pTbDbc); tdbTbcClose(pTbDbc);
......
...@@ -252,6 +252,45 @@ static FORCE_INLINE void tsdbSwapDataCols(SDataCols *pDest, SDataCols *pSrc) { ...@@ -252,6 +252,45 @@ static FORCE_INLINE void tsdbSwapDataCols(SDataCols *pDest, SDataCols *pSrc) {
pSrc->cols = pCols; pSrc->cols = pCols;
} }
static void printTsdbLoadBlkData(SReadH *readh, SDataCols *pDCols, SBlock *pBlock, const char *tag, int32_t ln) {
printf("%s:%d:%" PRIi64 " ================\n", tag, ln, taosGetSelfPthreadId());
if (pBlock) {
SDFile *pHeadf = TSDB_READ_HEAD_FILE(readh);
printf("%s:%d:%" PRIi64 ":%p:%d %s\n", tag, ln, taosGetSelfPthreadId(), pBlock, (int32_t)pBlock->len,
pHeadf->f.aname);
SDFile *pDFile = pBlock->last ? TSDB_READ_LAST_FILE(readh) : TSDB_READ_DATA_FILE(readh);
printf("%s:%d:%" PRIi64 ":%p:%d %s\n", tag, ln, taosGetSelfPthreadId(), pBlock, (int32_t)pBlock->len,
pDFile->f.aname);
}
SDataCol *pDCol = pDCols->cols + 0;
if (TSKEY_MIN == *(int64_t *)pDCol->pData) {
ASSERT(0);
}
int rows = pDCols->numOfRows;
for (int r = 0; r < rows; ++r) {
if (pBlock) {
printf("%s:%d:%" PRIi64 ":%p:%d rows[%d][%d] ", tag, ln, taosGetSelfPthreadId(), pBlock, (int32_t)pBlock->len,
rows, r);
} else {
printf("%s:%d:%" PRIi64 ":%s rows[%d][%d] ", tag, ln, taosGetSelfPthreadId(), "=== merge === ", rows, r);
}
int nDataCols = pDCols->numOfCols;
int j = 0;
SCellVal sVal = {0};
while (j < nDataCols) {
SDataCol *pDataCol = pDCols->cols + j;
tdGetColDataOfRow(&sVal, pDataCol, r, pDCols->bitmapMode);
tdSCellValPrint(&sVal, pDataCol->type);
++j;
}
printf("\n");
}
fflush(stdout);
}
int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) { int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) {
ASSERT(pBlock->numOfSubBlocks > 0); ASSERT(pBlock->numOfSubBlocks > 0);
STsdbCfg *pCfg = REPO_CFG(pReadh->pRepo); STsdbCfg *pCfg = REPO_CFG(pReadh->pRepo);
...@@ -266,15 +305,23 @@ int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) { ...@@ -266,15 +305,23 @@ int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) {
} }
} }
if (tsdbLoadBlockDataImpl(pReadh, iBlock, pReadh->pDCols[0], TSDB_BITMODE_ONE_BIT) < 0) return -1; if (tsdbLoadBlockDataImpl(pReadh, iBlock, pReadh->pDCols[0], TSDB_BITMODE_ONE_BIT) < 0) return -1;
#ifdef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS
printTsdbLoadBlkData(pReadh, pReadh->pDCols[0], iBlock, __func__, __LINE__);
#endif
for (int i = 1; i < pBlock->numOfSubBlocks; i++) { for (int i = 1; i < pBlock->numOfSubBlocks; i++) {
iBlock++; iBlock++;
if (tsdbLoadBlockDataImpl(pReadh, iBlock, pReadh->pDCols[1], TSDB_BITMODE_DEFAULT) < 0) return -1; if (tsdbLoadBlockDataImpl(pReadh, iBlock, pReadh->pDCols[1], TSDB_BITMODE_DEFAULT) < 0) return -1;
#ifdef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS
printTsdbLoadBlkData(pReadh, pReadh->pDCols[1], iBlock, __func__, __LINE__);
#endif
// TODO: use the real maxVersion to replace the UINT64_MAX to support Multi-Version // TODO: use the real maxVersion to replace the UINT64_MAX to support Multi-Version
if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows, NULL, if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows, NULL,
TD_SUPPORT_UPDATE(update), TD_VER_MAX) < 0) TD_SUPPORT_UPDATE(update), TD_VER_MAX) < 0)
return -1; return -1;
#ifdef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS
printTsdbLoadBlkData(pReadh, pReadh->pDCols[0], iBlock, " === MERGE === ", __LINE__);
#endif
} }
// if ((pBlock->numOfSubBlocks == 1) && (iBlock->hasDupKey)) { // TODO: use this line // if ((pBlock->numOfSubBlocks == 1) && (iBlock->hasDupKey)) { // TODO: use this line
if (pBlock->numOfSubBlocks == 1) { if (pBlock->numOfSubBlocks == 1) {
...@@ -286,6 +333,9 @@ int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) { ...@@ -286,6 +333,9 @@ int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) {
} }
tsdbSwapDataCols(pReadh->pDCols[0], pReadh->pDCols[1]); tsdbSwapDataCols(pReadh->pDCols[0], pReadh->pDCols[1]);
ASSERT(pReadh->pDCols[0]->bitmapMode != 0); ASSERT(pReadh->pDCols[0]->bitmapMode != 0);
#ifdef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS
printTsdbLoadBlkData(pReadh, pReadh->pDCols[0], iBlock, " === UPDATE FILTER === ", __LINE__);
#endif
} }
ASSERT(pReadh->pDCols[0]->numOfRows <= pBlock->numOfRows); ASSERT(pReadh->pDCols[0]->numOfRows <= pBlock->numOfRows);
...@@ -295,6 +345,53 @@ int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) { ...@@ -295,6 +345,53 @@ int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) {
return 0; return 0;
} }
static void printTsdbLoadBlkDataCols(SReadH *readh, SDataCols *pDCols, SBlock *pBlock, const int16_t *colIds,
int numOfColsIds, const char *tag, int32_t ln) {
printf("%s:%d:%" PRIi64 " ================\n", tag, ln, taosGetSelfPthreadId());
if (pBlock) {
SDFile *pHeadf = TSDB_READ_HEAD_FILE(readh);
printf("%s:%d:%" PRIi64 ":%p:%d %s\n", tag, ln, taosGetSelfPthreadId(), pBlock, (int32_t)pBlock->len,
pHeadf->f.aname);
SDFile *pDFile = pBlock->last ? TSDB_READ_LAST_FILE(readh) : TSDB_READ_DATA_FILE(readh);
printf("%s:%d:%" PRIi64 ":%p:%d %s\n", tag, ln, taosGetSelfPthreadId(), pBlock, (int32_t)pBlock->len,
pDFile->f.aname);
}
int rows = pDCols->numOfRows;
for (int r = 0; r < rows; ++r) {
if (pBlock) {
printf("%s:%d:%" PRIi64 ":%p:%d rows[%d][%d] ", tag, ln, taosGetSelfPthreadId(), pBlock, (int32_t)pBlock->len,
rows, r);
} else {
printf("%s:%d:%" PRIi64 ":%s rows[%d][%d] ", tag, ln, taosGetSelfPthreadId(), "=== merge === ", rows, r);
}
int nDataCols = pDCols->numOfCols;
int j = 0, k = 0;
SCellVal sVal = {0};
while (j < nDataCols) {
if (k >= numOfColsIds) break;
SDataCol *pDataCol = pDCols->cols + j;
int16_t colId1 = pDataCol->colId;
int16_t colId2 = *(colIds + k);
if (colId1 < colId2) {
++j;
} else if (colId1 > colId2) {
++k; // colId2 not exists in SDataCols
printf("NotExists ");
} else {
tdGetColDataOfRow(&sVal, pDataCol, r, pDCols->bitmapMode);
tdSCellValPrint(&sVal, pDataCol->type);
++j;
++k;
}
}
printf("\n");
}
fflush(stdout);
}
// TODO: filter by Multi-Version // TODO: filter by Multi-Version
int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, const int16_t *colIds, int numOfColsIds, int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, const int16_t *colIds, int numOfColsIds,
bool mergeBitmap) { bool mergeBitmap) {
...@@ -310,14 +407,25 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, ...@@ -310,14 +407,25 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo,
} }
} }
if (tsdbLoadBlockDataColsImpl(pReadh, iBlock, pReadh->pDCols[0], colIds, numOfColsIds, TSDB_BITMODE_ONE_BIT) < 0) return -1; if (tsdbLoadBlockDataColsImpl(pReadh, iBlock, pReadh->pDCols[0], colIds, numOfColsIds, TSDB_BITMODE_ONE_BIT) < 0)
return -1;
#ifdef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS
printTsdbLoadBlkDataCols(pReadh, pReadh->pDCols[0], iBlock, colIds, numOfColsIds, __func__, __LINE__);
#endif
for (int i = 1; i < pBlock->numOfSubBlocks; i++) { for (int i = 1; i < pBlock->numOfSubBlocks; i++) {
iBlock++; iBlock++;
if (tsdbLoadBlockDataColsImpl(pReadh, iBlock, pReadh->pDCols[1], colIds, numOfColsIds, TSDB_BITMODE_DEFAULT) < 0) return -1; if (tsdbLoadBlockDataColsImpl(pReadh, iBlock, pReadh->pDCols[1], colIds, numOfColsIds, TSDB_BITMODE_DEFAULT) < 0)
return -1;
#ifdef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS
printTsdbLoadBlkDataCols(pReadh, pReadh->pDCols[1], iBlock, colIds, numOfColsIds, __func__, __LINE__);
#endif
// TODO: use the real maxVersion to replace the UINT64_MAX to support Multi-Version // TODO: use the real maxVersion to replace the UINT64_MAX to support Multi-Version
if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows, NULL, if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows, NULL,
TD_SUPPORT_UPDATE(update), TD_VER_MAX) < 0) TD_SUPPORT_UPDATE(update), TD_VER_MAX) < 0)
return -1; return -1;
#ifdef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS
printTsdbLoadBlkDataCols(pReadh, pReadh->pDCols[0], NULL, colIds, numOfColsIds, __func__, __LINE__);
#endif
} }
// if ((pBlock->numOfSubBlocks == 1) && (iBlock->hasDupKey)) { // TODO: use this line // if ((pBlock->numOfSubBlocks == 1) && (iBlock->hasDupKey)) { // TODO: use this line
if (pBlock->numOfSubBlocks == 1) { if (pBlock->numOfSubBlocks == 1) {
...@@ -329,18 +437,23 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, ...@@ -329,18 +437,23 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo,
} }
tsdbSwapDataCols(pReadh->pDCols[0], pReadh->pDCols[1]); tsdbSwapDataCols(pReadh->pDCols[0], pReadh->pDCols[1]);
ASSERT(pReadh->pDCols[0]->bitmapMode != 0); ASSERT(pReadh->pDCols[0]->bitmapMode != 0);
#ifdef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS
printTsdbLoadBlkDataCols(pReadh, pReadh->pDCols[0], NULL, colIds, numOfColsIds,
" === update filter === ", __LINE__);
#endif
} }
if (mergeBitmap && !tdDataColsIsBitmapI(pReadh->pDCols[0])) { if (mergeBitmap && !tdDataColsIsBitmapI(pReadh->pDCols[0])) {
for (int i = 0; i < numOfColsIds; ++i) { for (int i = 0; i < numOfColsIds; ++i) {
SDataCol *pDataCol = pReadh->pDCols[0]->cols + i; SDataCol *pDataCol = pReadh->pDCols[0]->cols + i;
if (pDataCol->len > 0 && pDataCol->bitmap) { if (pDataCol->len > 0 && pDataCol->bitmap) {
ASSERT(pDataCol->colId != PRIMARYKEY_TIMESTAMP_COL_ID);
ASSERT(pDataCol->pBitmap);
tdMergeBitmap(pDataCol->pBitmap, pReadh->pDCols[0]->numOfRows, pDataCol->pBitmap); tdMergeBitmap(pDataCol->pBitmap, pReadh->pDCols[0]->numOfRows, pDataCol->pBitmap);
tdDataColsSetBitmapI(pReadh->pDCols[0]); tdDataColsSetBitmapI(pReadh->pDCols[0]);
} }
} }
#ifdef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS
printTsdbLoadBlkDataCols(pReadh, pReadh->pDCols[0], NULL, colIds, numOfColsIds, " === merge bitmap === ", __LINE__);
#endif
} }
ASSERT(pReadh->pDCols[0]->numOfRows <= pBlock->numOfRows); ASSERT(pReadh->pDCols[0]->numOfRows <= pBlock->numOfRows);
...@@ -551,9 +664,7 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat ...@@ -551,9 +664,7 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat
tdResetDataCols(pDataCols); tdResetDataCols(pDataCols);
if (tdIsBitmapModeI(bitmapMode)) { pDataCols->bitmapMode = bitmapMode;
tdDataColsSetBitmapI(pDataCols);
}
if (tsdbMakeRoom((void **)(&TSDB_READ_BUF(pReadh)), pBlock->len) < 0) return -1; if (tsdbMakeRoom((void **)(&TSDB_READ_BUF(pReadh)), pBlock->len) < 0) return -1;
...@@ -740,9 +851,7 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols * ...@@ -740,9 +851,7 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *
tdResetDataCols(pDataCols); tdResetDataCols(pDataCols);
if (tdIsBitmapModeI(bitmapMode)) { pDataCols->bitmapMode = bitmapMode;
tdDataColsSetBitmapI(pDataCols);
}
// If only load timestamp column, no need to load SBlockData part // If only load timestamp column, no need to load SBlockData part
if (numOfColIds > 1 && tsdbLoadBlockOffset(pReadh, pBlock) < 0) return -1; if (numOfColIds > 1 && tsdbLoadBlockOffset(pReadh, pBlock) < 0) return -1;
......
...@@ -21,6 +21,7 @@ static int32_t getSchemaBytes(const SSchema* pSchema) { ...@@ -21,6 +21,7 @@ static int32_t getSchemaBytes(const SSchema* pSchema) {
case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_BINARY:
return (pSchema->bytes - VARSTR_HEADER_SIZE); return (pSchema->bytes - VARSTR_HEADER_SIZE);
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_JSON:
return (pSchema->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE; return (pSchema->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
default: default:
return pSchema->bytes; return pSchema->bytes;
......
...@@ -34,7 +34,7 @@ if (${BUILD_WITH_INVERTEDINDEX}) ...@@ -34,7 +34,7 @@ if (${BUILD_WITH_INVERTEDINDEX})
endif(${BUILD_WITH_INVERTEDINDEX}) endif(${BUILD_WITH_INVERTEDINDEX})
#if (${BUILD_TEST}) if (${BUILD_TEST})
# add_subdirectory(test) add_subdirectory(test)
#endif(${BUILD_TEST}) endif(${BUILD_TEST})
...@@ -29,7 +29,7 @@ ...@@ -29,7 +29,7 @@
#include "lucene++/Lucene_c.h" #include "lucene++/Lucene_c.h"
#endif #endif
#define INDEX_NUM_OF_THREADS 4 #define INDEX_NUM_OF_THREADS 1
#define INDEX_QUEUE_SIZE 200 #define INDEX_QUEUE_SIZE 200
#define INDEX_DATA_BOOL_NULL 0x02 #define INDEX_DATA_BOOL_NULL 0x02
...@@ -117,7 +117,6 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { ...@@ -117,7 +117,6 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
sIdx->path = tstrdup(path); sIdx->path = tstrdup(path);
taosThreadMutexInit(&sIdx->mtx, NULL); taosThreadMutexInit(&sIdx->mtx, NULL);
tsem_init(&sIdx->sem, 0, 0); tsem_init(&sIdx->sem, 0, 0);
// taosThreadCondInit(&sIdx->finished, NULL);
sIdx->refId = indexAddRef(sIdx); sIdx->refId = indexAddRef(sIdx);
indexAcquireRef(sIdx->refId); indexAcquireRef(sIdx->refId);
...@@ -143,13 +142,13 @@ void indexDestroy(void* handle) { ...@@ -143,13 +142,13 @@ void indexDestroy(void* handle) {
return; return;
} }
void indexClose(SIndex* sIdx) { void indexClose(SIndex* sIdx) {
indexReleaseRef(sIdx->refId);
bool ref = 0; bool ref = 0;
if (sIdx->colObj != NULL) { if (sIdx->colObj != NULL) {
void* iter = taosHashIterate(sIdx->colObj, NULL); void* iter = taosHashIterate(sIdx->colObj, NULL);
while (iter) { while (iter) {
IndexCache** pCache = iter; IndexCache** pCache = iter;
indexCacheForceToMerge((void*)(*pCache)); indexCacheForceToMerge((void*)(*pCache));
indexInfo("%s wait to merge", (*pCache)->colName);
indexWait((void*)(sIdx)); indexWait((void*)(sIdx));
iter = taosHashIterate(sIdx->colObj, iter); iter = taosHashIterate(sIdx->colObj, iter);
indexCacheUnRef(*pCache); indexCacheUnRef(*pCache);
...@@ -157,7 +156,7 @@ void indexClose(SIndex* sIdx) { ...@@ -157,7 +156,7 @@ void indexClose(SIndex* sIdx) {
taosHashCleanup(sIdx->colObj); taosHashCleanup(sIdx->colObj);
sIdx->colObj = NULL; sIdx->colObj = NULL;
} }
// taosMsleep(1000 * 5); indexReleaseRef(sIdx->refId);
indexRemoveRef(sIdx->refId); indexRemoveRef(sIdx->refId);
} }
int64_t indexAddRef(void* p) { int64_t indexAddRef(void* p) {
...@@ -554,8 +553,29 @@ void iterateValueDestroy(IterateValue* value, bool destroy) { ...@@ -554,8 +553,29 @@ void iterateValueDestroy(IterateValue* value, bool destroy) {
taosMemoryFree(value->colVal); taosMemoryFree(value->colVal);
value->colVal = NULL; value->colVal = NULL;
} }
static int64_t indexGetAvaialbleVer(SIndex* sIdx, IndexCache* cache) {
ICacheKey key = {.suid = cache->suid, .colName = cache->colName, .nColName = strlen(cache->colName)};
int64_t ver = CACHE_VERSION(cache);
taosThreadMutexLock(&sIdx->mtx);
TFileReader* trd = tfileCacheGet(((IndexTFile*)sIdx->tindex)->cache, &key);
if (trd != NULL) {
if (ver < trd->header.version) {
ver = trd->header.version + 1;
} else {
ver += 1;
}
indexInfo("header: %d, ver: %" PRId64 "", trd->header.version, ver);
tfileReaderUnRef(trd);
} else {
indexInfo("not found reader base %p", trd);
}
taosThreadMutexUnlock(&sIdx->mtx);
return ver;
}
static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) { static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
int32_t version = CACHE_VERSION(cache); int64_t version = indexGetAvaialbleVer(sIdx, cache);
indexInfo("file name version: %" PRId64 "", version);
uint8_t colType = cache->type; uint8_t colType = cache->type;
TFileWriter* tw = tfileWriterOpen(sIdx->path, cache->suid, version, cache->colName, colType); TFileWriter* tw = tfileWriterOpen(sIdx->path, cache->suid, version, cache->colName, colType);
...@@ -575,6 +595,7 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) { ...@@ -575,6 +595,7 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
if (reader == NULL) { if (reader == NULL) {
return -1; return -1;
} }
indexInfo("success to create tfile, reopen it, %s", reader->ctx->file.buf);
TFileHeader* header = &reader->header; TFileHeader* header = &reader->header;
ICacheKey key = {.suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName)}; ICacheKey key = {.suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName)};
......
...@@ -335,6 +335,9 @@ IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, in ...@@ -335,6 +335,9 @@ IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, in
taosThreadCondInit(&cache->finished, NULL); taosThreadCondInit(&cache->finished, NULL);
indexCacheRef(cache); indexCacheRef(cache);
if (idx != NULL) {
indexAcquireRef(idx->refId);
}
return cache; return cache;
} }
void indexCacheDebug(IndexCache* cache) { void indexCacheDebug(IndexCache* cache) {
...@@ -426,13 +429,16 @@ void indexCacheDestroy(void* cache) { ...@@ -426,13 +429,16 @@ void indexCacheDestroy(void* cache) {
if (pCache == NULL) { if (pCache == NULL) {
return; return;
} }
indexMemUnRef(pCache->mem); indexMemUnRef(pCache->mem);
indexMemUnRef(pCache->imm); indexMemUnRef(pCache->imm);
taosMemoryFree(pCache->colName); taosMemoryFree(pCache->colName);
taosThreadMutexDestroy(&pCache->mtx); taosThreadMutexDestroy(&pCache->mtx);
taosThreadCondDestroy(&pCache->finished); taosThreadCondDestroy(&pCache->finished);
if (pCache->index != NULL) {
indexReleaseRef(((SIndex*)pCache->index)->refId);
}
taosMemoryFree(pCache); taosMemoryFree(pCache);
} }
......
...@@ -97,6 +97,7 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int ...@@ -97,6 +97,7 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int
int64_t file_size; int64_t file_size;
taosStatFile(path, &file_size, NULL); taosStatFile(path, &file_size, NULL);
ctx->file.size = (int)file_size; ctx->file.size = (int)file_size;
} else { } else {
// ctx->file.pFile = open(path, O_RDONLY, S_IRWXU | S_IRWXG | S_IRWXO); // ctx->file.pFile = open(path, O_RDONLY, S_IRWXU | S_IRWXG | S_IRWXO);
ctx->file.pFile = taosOpenFile(path, TD_FILE_READ); ctx->file.pFile = taosOpenFile(path, TD_FILE_READ);
......
/* /*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com> * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
p *
* This program is free software: you can use, redistribute, and/or modify * This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3 * it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation. * or later ("AGPL"), as published by the Free Software Foundation.
...@@ -152,10 +151,13 @@ TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key) { ...@@ -152,10 +151,13 @@ TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key) {
char buf[128] = {0}; char buf[128] = {0};
int32_t sz = indexSerialCacheKey(key, buf); int32_t sz = indexSerialCacheKey(key, buf);
assert(sz < sizeof(buf)); assert(sz < sizeof(buf));
indexInfo("Try to get key: %s", buf);
TFileReader** reader = taosHashGet(tcache->tableCache, buf, sz); TFileReader** reader = taosHashGet(tcache->tableCache, buf, sz);
if (reader == NULL) { if (reader == NULL || *reader == NULL) {
indexInfo("failed to get key: %s", buf);
return NULL; return NULL;
} }
indexInfo("Get key: %s file: %s", buf, (*reader)->ctx->file.buf);
tfileReaderRef(*reader); tfileReaderRef(*reader);
return *reader; return *reader;
...@@ -165,9 +167,10 @@ void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) { ...@@ -165,9 +167,10 @@ void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) {
int32_t sz = indexSerialCacheKey(key, buf); int32_t sz = indexSerialCacheKey(key, buf);
// remove last version index reader // remove last version index reader
TFileReader** p = taosHashGet(tcache->tableCache, buf, sz); TFileReader** p = taosHashGet(tcache->tableCache, buf, sz);
if (p != NULL) { if (p != NULL && *p != NULL) {
TFileReader* oldReader = *p; TFileReader* oldReader = *p;
taosHashRemove(tcache->tableCache, buf, sz); taosHashRemove(tcache->tableCache, buf, sz);
indexInfo("found %s, remove file %s", buf, oldReader->ctx->file.buf);
oldReader->remove = true; oldReader->remove = true;
tfileReaderUnRef(oldReader); tfileReaderUnRef(oldReader);
} }
...@@ -180,7 +183,6 @@ TFileReader* tfileReaderCreate(WriterCtx* ctx) { ...@@ -180,7 +183,6 @@ TFileReader* tfileReaderCreate(WriterCtx* ctx) {
if (reader == NULL) { if (reader == NULL) {
return NULL; return NULL;
} }
reader->ctx = ctx; reader->ctx = ctx;
if (0 != tfileReaderVerify(reader)) { if (0 != tfileReaderVerify(reader)) {
...@@ -202,6 +204,7 @@ TFileReader* tfileReaderCreate(WriterCtx* ctx) { ...@@ -202,6 +204,7 @@ TFileReader* tfileReaderCreate(WriterCtx* ctx) {
tfileReaderDestroy(reader); tfileReaderDestroy(reader);
return NULL; return NULL;
} }
reader->remove = false;
return reader; return reader;
} }
...@@ -536,7 +539,7 @@ TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const c ...@@ -536,7 +539,7 @@ TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const c
indexError("failed to open readonly file: %s, reason: %s", fullname, terrstr()); indexError("failed to open readonly file: %s, reason: %s", fullname, terrstr());
return NULL; return NULL;
} }
indexInfo("open read file name:%s, file size: %d", wc->file.buf, wc->file.size); indexTrace("open read file name:%s, file size: %d", wc->file.buf, wc->file.size);
TFileReader* reader = tfileReaderCreate(wc); TFileReader* reader = tfileReaderCreate(wc);
return reader; return reader;
......
...@@ -674,11 +674,14 @@ class IndexObj { ...@@ -674,11 +674,14 @@ class IndexObj {
// opt // opt
numOfWrite = 0; numOfWrite = 0;
numOfRead = 0; numOfRead = 0;
indexInit(); // indexInit();
} }
int Init(const std::string& dir) { int Init(const std::string& dir, bool remove = true) {
if (remove) {
taosRemoveDir(dir.c_str()); taosRemoveDir(dir.c_str());
taosMkDir(dir.c_str()); taosMkDir(dir.c_str());
}
taosMkDir(dir.c_str());
int ret = indexOpen(&opts, dir.c_str(), &idx); int ret = indexOpen(&opts, dir.c_str(), &idx);
if (ret != 0) { if (ret != 0) {
// opt // opt
...@@ -838,7 +841,10 @@ class IndexEnv2 : public ::testing::Test { ...@@ -838,7 +841,10 @@ class IndexEnv2 : public ::testing::Test {
initLog(); initLog();
index = new IndexObj(); index = new IndexObj();
} }
virtual void TearDown() { delete index; } virtual void TearDown() {
// taosMsleep(500);
delete index;
}
IndexObj* index; IndexObj* index;
}; };
TEST_F(IndexEnv2, testIndexOpen) { TEST_F(IndexEnv2, testIndexOpen) {
...@@ -951,6 +957,8 @@ static void single_write_and_search(IndexObj* idx) { ...@@ -951,6 +957,8 @@ static void single_write_and_search(IndexObj* idx) {
target = idx->SearchOne("tag2", "Test"); target = idx->SearchOne("tag2", "Test");
} }
static void multi_write_and_search(IndexObj* idx) { static void multi_write_and_search(IndexObj* idx) {
idx->PutOne("tag1", "Hello");
idx->PutOne("tag2", "Test");
int target = idx->SearchOne("tag1", "Hello"); int target = idx->SearchOne("tag1", "Hello");
target = idx->SearchOne("tag2", "Test"); target = idx->SearchOne("tag2", "Test");
idx->WriteMultiMillonData("tag1", "hello world test", 100 * 100); idx->WriteMultiMillonData("tag1", "hello world test", 100 * 100);
...@@ -992,16 +1000,16 @@ TEST_F(IndexEnv2, testIndex_MultiWrite_and_MultiRead) { ...@@ -992,16 +1000,16 @@ TEST_F(IndexEnv2, testIndex_MultiWrite_and_MultiRead) {
} }
} }
// TEST_F(IndexEnv2, testIndex_restart) { TEST_F(IndexEnv2, testIndex_restart) {
// std::string path = TD_TMP_DIR_PATH "cache_and_tfile"; std::string path = TD_TMP_DIR_PATH "cache_and_tfile";
// if (index->Init(path) != 0) { if (index->Init(path, false) != 0) {
// } }
// index->SearchOneTarget("tag1", "Hello", 10); index->SearchOneTarget("tag1", "Hello", 10);
// index->SearchOneTarget("tag2", "Test", 10); index->SearchOneTarget("tag2", "Test", 10);
//} }
// TEST_F(IndexEnv2, testIndex_restart1) { // TEST_F(IndexEnv2, testIndex_restart1) {
// std::string path = TD_TMP_DIR_PATH "cache_and_tfile"; // std::string path = TD_TMP_DIR_PATH "cache_and_tfile";
// if (index->Init(path) != 0) { // if (index->Init(path, false) != 0) {
// } // }
// index->ReadMultiMillonData("tag1", "coding"); // index->ReadMultiMillonData("tag1", "coding");
// index->SearchOneTarget("tag1", "Hello", 10); // index->SearchOneTarget("tag1", "Hello", 10);
...@@ -1018,16 +1026,16 @@ TEST_F(IndexEnv2, testIndex_MultiWrite_and_MultiRead) { ...@@ -1018,16 +1026,16 @@ TEST_F(IndexEnv2, testIndex_MultiWrite_and_MultiRead) {
// std::cout << "reader sz: " << index->SearchOne("tag1", "Hello") << std::endl; // std::cout << "reader sz: " << index->SearchOne("tag1", "Hello") << std::endl;
// assert(3 == index->SearchOne("tag1", "Hello")); // assert(3 == index->SearchOne("tag1", "Hello"));
//} //}
// TEST_F(IndexEnv2, testIndexMultiTag) { TEST_F(IndexEnv2, testIndexMultiTag) {
// std::string path = TD_TMP_DIR_PATH "multi_tag"; std::string path = TD_TMP_DIR_PATH "multi_tag";
// if (index->Init(path) != 0) { if (index->Init(path) != 0) {
// } }
// int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
// int32_t num = 1000 * 10000; int32_t num = 100 * 100;
// index->WriteMultiMillonData("tag1", "xxxxxxxxxxxxxxx", num); index->WriteMultiMillonData("tag1", "xxxxxxxxxxxxxxx", num);
// std::cout << "numOfRow: " << num << "\ttime cost:" << taosGetTimestampUs() - st << std::endl; std::cout << "numOfRow: " << num << "\ttime cost:" << taosGetTimestampUs() - st << std::endl;
// // index->WriteMultiMillonData("tag2", "xxxxxxxxxxxxxxxxxxxxxxxxx", 100 * 10000); // index->WriteMultiMillonData("tag2", "xxxxxxxxxxxxxxxxxxxxxxxxx", 100 * 10000);
//} }
TEST_F(IndexEnv2, testLongComVal1) { TEST_F(IndexEnv2, testLongComVal1) {
std::string path = TD_TMP_DIR_PATH "long_colVal"; std::string path = TD_TMP_DIR_PATH "long_colVal";
if (index->Init(path) != 0) { if (index->Init(path) != 0) {
......
...@@ -1119,6 +1119,7 @@ bool nodesIsComparisonOp(const SOperatorNode* pOp) { ...@@ -1119,6 +1119,7 @@ bool nodesIsComparisonOp(const SOperatorNode* pOp) {
bool nodesIsJsonOp(const SOperatorNode* pOp) { bool nodesIsJsonOp(const SOperatorNode* pOp) {
switch (pOp->opType) { switch (pOp->opType) {
case OP_TYPE_JSON_GET_VALUE: case OP_TYPE_JSON_GET_VALUE:
case OP_TYPE_JSON_CONTAINS:
return true; return true;
default: default:
break; break;
......
...@@ -677,7 +677,6 @@ static EDealRes translateValueImpl(STranslateContext* pCxt, SValueNode* pVal, SD ...@@ -677,7 +677,6 @@ static EDealRes translateValueImpl(STranslateContext* pCxt, SValueNode* pVal, SD
varDataSetLen(pVal->datum.p, len); varDataSetLen(pVal->datum.p, len);
break; break;
} }
case TSDB_DATA_TYPE_JSON:
case TSDB_DATA_TYPE_DECIMAL: case TSDB_DATA_TYPE_DECIMAL:
case TSDB_DATA_TYPE_BLOB: case TSDB_DATA_TYPE_BLOB:
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, pVal->literal); return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, pVal->literal);
...@@ -738,8 +737,7 @@ static EDealRes translateUnaryOperator(STranslateContext* pCxt, SOperatorNode* p ...@@ -738,8 +737,7 @@ static EDealRes translateUnaryOperator(STranslateContext* pCxt, SOperatorNode* p
static EDealRes translateArithmeticOperator(STranslateContext* pCxt, SOperatorNode* pOp) { static EDealRes translateArithmeticOperator(STranslateContext* pCxt, SOperatorNode* pOp) {
SDataType ldt = ((SExprNode*)(pOp->pLeft))->resType; SDataType ldt = ((SExprNode*)(pOp->pLeft))->resType;
SDataType rdt = ((SExprNode*)(pOp->pRight))->resType; SDataType rdt = ((SExprNode*)(pOp->pRight))->resType;
if (TSDB_DATA_TYPE_JSON == ldt.type || TSDB_DATA_TYPE_BLOB == ldt.type || TSDB_DATA_TYPE_JSON == rdt.type || if (TSDB_DATA_TYPE_BLOB == ldt.type || TSDB_DATA_TYPE_BLOB == rdt.type) {
TSDB_DATA_TYPE_BLOB == rdt.type) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pRight))->aliasName); return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pRight))->aliasName);
} }
if ((TSDB_DATA_TYPE_TIMESTAMP == ldt.type && TSDB_DATA_TYPE_TIMESTAMP == rdt.type) || if ((TSDB_DATA_TYPE_TIMESTAMP == ldt.type && TSDB_DATA_TYPE_TIMESTAMP == rdt.type) ||
...@@ -764,14 +762,14 @@ static EDealRes translateArithmeticOperator(STranslateContext* pCxt, SOperatorNo ...@@ -764,14 +762,14 @@ static EDealRes translateArithmeticOperator(STranslateContext* pCxt, SOperatorNo
static EDealRes translateComparisonOperator(STranslateContext* pCxt, SOperatorNode* pOp) { static EDealRes translateComparisonOperator(STranslateContext* pCxt, SOperatorNode* pOp) {
SDataType ldt = ((SExprNode*)(pOp->pLeft))->resType; SDataType ldt = ((SExprNode*)(pOp->pLeft))->resType;
SDataType rdt = ((SExprNode*)(pOp->pRight))->resType; SDataType rdt = ((SExprNode*)(pOp->pRight))->resType;
if (TSDB_DATA_TYPE_BLOB == ldt.type || TSDB_DATA_TYPE_JSON == rdt.type || TSDB_DATA_TYPE_BLOB == rdt.type) { if (TSDB_DATA_TYPE_BLOB == ldt.type || TSDB_DATA_TYPE_BLOB == rdt.type) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pRight))->aliasName); return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pRight))->aliasName);
} }
if (OP_TYPE_IN == pOp->opType || OP_TYPE_NOT_IN == pOp->opType) { if (OP_TYPE_IN == pOp->opType || OP_TYPE_NOT_IN == pOp->opType) {
((SExprNode*)pOp->pRight)->resType = ((SExprNode*)pOp->pLeft)->resType; ((SExprNode*)pOp->pRight)->resType = ((SExprNode*)pOp->pLeft)->resType;
} }
if (nodesIsRegularOp(pOp)) { if (nodesIsRegularOp(pOp)) {
if (!IS_STR_DATA_TYPE(((SExprNode*)(pOp->pLeft))->resType.type)) { if (!IS_VAR_DATA_TYPE(((SExprNode*)(pOp->pLeft))->resType.type)) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pLeft))->aliasName); return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pLeft))->aliasName);
} }
if (QUERY_NODE_VALUE != nodeType(pOp->pRight) || !IS_STR_DATA_TYPE(((SExprNode*)(pOp->pRight))->resType.type)) { if (QUERY_NODE_VALUE != nodeType(pOp->pRight) || !IS_STR_DATA_TYPE(((SExprNode*)(pOp->pRight))->resType.type)) {
...@@ -2485,6 +2483,9 @@ static int32_t checkTableColsSchema(STranslateContext* pCxt, SHashObj* pHash, SN ...@@ -2485,6 +2483,9 @@ static int32_t checkTableColsSchema(STranslateContext* pCxt, SHashObj* pHash, SN
code = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FIRST_COLUMN); code = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FIRST_COLUMN);
} }
} }
if (TSDB_CODE_SUCCESS == code && pCol->dataType.type == TSDB_DATA_TYPE_JSON) {
code = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COL_JSON);
}
int32_t len = strlen(pCol->colName); int32_t len = strlen(pCol->colName);
if (TSDB_CODE_SUCCESS == code && NULL != taosHashGet(pHash, pCol->colName, len)) { if (TSDB_CODE_SUCCESS == code && NULL != taosHashGet(pHash, pCol->colName, len)) {
code = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_DUPLICATED_COLUMN); code = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_DUPLICATED_COLUMN);
...@@ -2492,7 +2493,7 @@ static int32_t checkTableColsSchema(STranslateContext* pCxt, SHashObj* pHash, SN ...@@ -2492,7 +2493,7 @@ static int32_t checkTableColsSchema(STranslateContext* pCxt, SHashObj* pHash, SN
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
if ((TSDB_DATA_TYPE_VARCHAR == pCol->dataType.type && calcTypeBytes(pCol->dataType) > TSDB_MAX_BINARY_LEN) || if ((TSDB_DATA_TYPE_VARCHAR == pCol->dataType.type && calcTypeBytes(pCol->dataType) > TSDB_MAX_BINARY_LEN) ||
(TSDB_DATA_TYPE_NCHAR == pCol->dataType.type && calcTypeBytes(pCol->dataType) > TSDB_MAX_NCHAR_LEN)) { (TSDB_DATA_TYPE_NCHAR == pCol->dataType.type && calcTypeBytes(pCol->dataType) > TSDB_MAX_NCHAR_LEN)) {
code = code = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN); code = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN);
} }
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
...@@ -4451,11 +4452,38 @@ static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pS ...@@ -4451,11 +4452,38 @@ static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pS
} }
pReq->isNull = (TSDB_DATA_TYPE_NULL == pStmt->pVal->node.resType.type); pReq->isNull = (TSDB_DATA_TYPE_NULL == pStmt->pVal->node.resType.type);
if(pStmt->pVal->node.resType.type == TSDB_DATA_TYPE_JSON){
SKVRowBuilder kvRowBuilder = {0};
int32_t code = tdInitKVRowBuilder(&kvRowBuilder);
if (TSDB_CODE_SUCCESS != code) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if (pStmt->pVal->literal && strlen(pStmt->pVal->literal) > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
return buildSyntaxErrMsg(&pCxt->msgBuf, "json string too long than 4095", pStmt->pVal->literal);
}
code = parseJsontoTagData(pStmt->pVal->literal, &kvRowBuilder, &pCxt->msgBuf, pSchema->colId);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder);
if (NULL == row) {
tdDestroyKVRowBuilder(&kvRowBuilder);
return TSDB_CODE_OUT_OF_MEMORY;
}
pReq->nTagVal = kvRowLen(row);
pReq->pTagVal = row;
pStmt->pVal->datum.p = row; // for free
tdDestroyKVRowBuilder(&kvRowBuilder);
}else{
pReq->nTagVal = pStmt->pVal->node.resType.bytes; pReq->nTagVal = pStmt->pVal->node.resType.bytes;
if (TSDB_DATA_TYPE_NCHAR == pStmt->pVal->node.resType.type) { if (TSDB_DATA_TYPE_NCHAR == pStmt->pVal->node.resType.type) {
pReq->nTagVal = pReq->nTagVal * TSDB_NCHAR_SIZE; pReq->nTagVal = pReq->nTagVal * TSDB_NCHAR_SIZE;
} }
pReq->pTagVal = nodesGetValueFromNode(pStmt->pVal); pReq->pTagVal = nodesGetValueFromNode(pStmt->pVal);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -4652,7 +4680,26 @@ static int32_t rewriteAlterTable(STranslateContext* pCxt, SQuery* pQuery) { ...@@ -4652,7 +4680,26 @@ static int32_t rewriteAlterTable(STranslateContext* pCxt, SQuery* pQuery) {
return code; return code;
} }
if (pStmt->dataType.type == TSDB_DATA_TYPE_JSON && pStmt->alterType == TSDB_ALTER_TABLE_ADD_TAG) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_ONLY_ONE_JSON_TAG);
}
if (pStmt->dataType.type == TSDB_DATA_TYPE_JSON && pStmt->alterType == TSDB_ALTER_TABLE_ADD_COLUMN) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COL_JSON);
}
if (getNumOfTags(pTableMeta) == 1 && pStmt->alterType == TSDB_ALTER_TABLE_DROP_TAG) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ALTER_TABLE, "can not drop tag if there is only one tag");
}
if (TSDB_SUPER_TABLE == pTableMeta->tableType) { if (TSDB_SUPER_TABLE == pTableMeta->tableType) {
SSchema* pTagsSchema = getTableTagSchema(pTableMeta);
if (getNumOfTags(pTableMeta) == 1 && pTagsSchema->type == TSDB_DATA_TYPE_JSON &&
(pStmt->alterType == TSDB_ALTER_TABLE_ADD_TAG ||
pStmt->alterType == TSDB_ALTER_TABLE_DROP_TAG ||
pStmt->alterType == TSDB_ALTER_TABLE_UPDATE_TAG_BYTES)) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_ONLY_ONE_JSON_TAG);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (TSDB_CHILD_TABLE != pTableMeta->tableType && TSDB_NORMAL_TABLE != pTableMeta->tableType) { } else if (TSDB_CHILD_TABLE != pTableMeta->tableType && TSDB_NORMAL_TABLE != pTableMeta->tableType) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ALTER_TABLE); return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ALTER_TABLE);
......
...@@ -171,6 +171,8 @@ static char* getSyntaxErrFormat(int32_t errCode) { ...@@ -171,6 +171,8 @@ static char* getSyntaxErrFormat(int32_t errCode) {
return "Window query not supported, since the result of subquery not include valid timestamp column"; return "Window query not supported, since the result of subquery not include valid timestamp column";
case TSDB_CODE_PAR_INVALID_DROP_COL: case TSDB_CODE_PAR_INVALID_DROP_COL:
return "No columns can be dropped"; return "No columns can be dropped";
case TSDB_CODE_PAR_INVALID_COL_JSON:
return "Only tag can be json type";
case TSDB_CODE_OUT_OF_MEMORY: case TSDB_CODE_OUT_OF_MEMORY:
return "Out of memory"; return "Out of memory";
default: default:
...@@ -328,7 +330,7 @@ int parseJsontoTagData(const char* json, SKVRowBuilder* kvRowBuilder, SMsgBuf* p ...@@ -328,7 +330,7 @@ int parseJsontoTagData(const char* json, SKVRowBuilder* kvRowBuilder, SMsgBuf* p
// set json NULL data // set json NULL data
uint8_t jsonNULL = TSDB_DATA_TYPE_NULL; uint8_t jsonNULL = TSDB_DATA_TYPE_NULL;
int jsonIndex = startColId + 1; int jsonIndex = startColId + 1;
if (!json || strcasecmp(json, TSDB_DATA_NULL_STR_L) == 0) { if (!json || strtrim((char*)json) == 0 ||strcasecmp(json, TSDB_DATA_NULL_STR_L) == 0) {
tdAddColToKVRow(kvRowBuilder, jsonIndex, &jsonNULL, CHAR_BYTES); tdAddColToKVRow(kvRowBuilder, jsonIndex, &jsonNULL, CHAR_BYTES);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -360,12 +362,12 @@ int parseJsontoTagData(const char* json, SKVRowBuilder* kvRowBuilder, SMsgBuf* p ...@@ -360,12 +362,12 @@ int parseJsontoTagData(const char* json, SKVRowBuilder* kvRowBuilder, SMsgBuf* p
retCode = buildSyntaxErrMsg(pMsgBuf, "json key not validate", jsonKey); retCode = buildSyntaxErrMsg(pMsgBuf, "json key not validate", jsonKey);
goto end; goto end;
} }
// if(strlen(jsonKey) > TSDB_MAX_JSON_KEY_LEN){
// tscError("json key too long error");
// retCode = tscSQLSyntaxErrMsg(errMsg, "json key too long, more than 256", NULL);
// goto end;
// }
size_t keyLen = strlen(jsonKey); size_t keyLen = strlen(jsonKey);
if(keyLen > TSDB_MAX_JSON_KEY_LEN){
qError("json key too long error");
retCode = buildSyntaxErrMsg(pMsgBuf, "json key too long, more than 256", jsonKey);
goto end;
}
if (keyLen == 0 || taosHashGet(keyHash, jsonKey, keyLen) != NULL) { if (keyLen == 0 || taosHashGet(keyHash, jsonKey, keyLen) != NULL) {
continue; continue;
} }
......
...@@ -1089,7 +1089,7 @@ void makeCalculate(void *json, void *key, int32_t rightType, void *rightData, do ...@@ -1089,7 +1089,7 @@ void makeCalculate(void *json, void *key, int32_t rightType, void *rightData, do
}else if(opType == OP_TYPE_ADD || opType == OP_TYPE_SUB || opType == OP_TYPE_MULTI || opType == OP_TYPE_DIV || }else if(opType == OP_TYPE_ADD || opType == OP_TYPE_SUB || opType == OP_TYPE_MULTI || opType == OP_TYPE_DIV ||
opType == OP_TYPE_MOD || opType == OP_TYPE_MINUS){ opType == OP_TYPE_MOD || opType == OP_TYPE_MINUS){
printf("1result:%f,except:%f\n", *((double *)colDataGetData(column, 0)), exceptValue); printf("1result:%f,except:%f\n", *((double *)colDataGetData(column, 0)), exceptValue);
ASSERT_TRUE(abs(*((double *)colDataGetData(column, 0)) - exceptValue) < 1e-15); ASSERT_TRUE(fabs(*((double *)colDataGetData(column, 0)) - exceptValue) < 0.0001);
}else if(opType == OP_TYPE_BIT_AND || opType == OP_TYPE_BIT_OR){ }else if(opType == OP_TYPE_BIT_AND || opType == OP_TYPE_BIT_OR){
printf("2result:%ld,except:%f\n", *((int64_t *)colDataGetData(column, 0)), exceptValue); printf("2result:%ld,except:%f\n", *((int64_t *)colDataGetData(column, 0)), exceptValue);
ASSERT_EQ(*((int64_t *)colDataGetData(column, 0)), exceptValue); ASSERT_EQ(*((int64_t *)colDataGetData(column, 0)), exceptValue);
...@@ -1107,8 +1107,10 @@ void makeCalculate(void *json, void *key, int32_t rightType, void *rightData, do ...@@ -1107,8 +1107,10 @@ void makeCalculate(void *json, void *key, int32_t rightType, void *rightData, do
TEST(columnTest, json_column_arith_op) { TEST(columnTest, json_column_arith_op) {
scltInitLogFile(); scltInitLogFile();
char *rightv= "{\"k1\":4,\"k2\":\"hello\",\"k3\":null,\"k4\":true,\"k5\":5.44}"; char *rightvTmp= "{\"k1\":4,\"k2\":\"hello\",\"k3\":null,\"k4\":true,\"k5\":5.44}";
char rightv[256] = {0};
memcpy(rightv, rightvTmp, strlen(rightvTmp));
SKVRowBuilder kvRowBuilder; SKVRowBuilder kvRowBuilder;
tdInitKVRowBuilder(&kvRowBuilder); tdInitKVRowBuilder(&kvRowBuilder);
parseJsontoTagData(rightv, &kvRowBuilder, NULL, 0); parseJsontoTagData(rightv, &kvRowBuilder, NULL, 0);
...@@ -1189,8 +1191,10 @@ void *prepareNchar(char* rightData){ ...@@ -1189,8 +1191,10 @@ void *prepareNchar(char* rightData){
TEST(columnTest, json_column_logic_op) { TEST(columnTest, json_column_logic_op) {
scltInitLogFile(); scltInitLogFile();
char *rightv= "{\"k1\":4,\"k2\":\"hello\",\"k3\":null,\"k4\":true,\"k5\":5.44,\"k6\":\"6.6hello\"}"; char *rightvTmp= "{\"k1\":4,\"k2\":\"hello\",\"k3\":null,\"k4\":true,\"k5\":5.44,\"k6\":\"6.6hello\"}";
char rightv[256] = {0};
memcpy(rightv, rightvTmp, strlen(rightvTmp));
SKVRowBuilder kvRowBuilder; SKVRowBuilder kvRowBuilder;
tdInitKVRowBuilder(&kvRowBuilder); tdInitKVRowBuilder(&kvRowBuilder);
parseJsontoTagData(rightv, &kvRowBuilder, NULL, 0); parseJsontoTagData(rightv, &kvRowBuilder, NULL, 0);
......
...@@ -264,7 +264,7 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, i ...@@ -264,7 +264,7 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, i
SSchJob *schAcquireJob(int64_t refId); SSchJob *schAcquireJob(int64_t refId);
int32_t schReleaseJob(int64_t refId); int32_t schReleaseJob(int64_t refId);
void schFreeFlowCtrl(SSchJob *pJob); void schFreeFlowCtrl(SSchJob *pJob);
int32_t schCheckJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel); int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel);
int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask); int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask);
int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough); int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough);
int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask); int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask);
...@@ -275,6 +275,32 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId); ...@@ -275,6 +275,32 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId);
int32_t schCloneSMsgSendInfo(void *src, void **dst); int32_t schCloneSMsgSendInfo(void *src, void **dst);
int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob); int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob);
void schFreeJobImpl(void *job); void schFreeJobImpl(void *job);
int32_t schMakeHbCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam);
int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx);
int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask);
int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans);
int32_t schHandleHbCallback(void *param, const SDataBuf *pMsg, int32_t code);
void schFreeRpcCtx(SRpcCtx *pCtx);
int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp);
bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus);
int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask);
int32_t schSaveJobQueryRes(SSchJob *pJob, SResReadyRsp *rsp);
int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp);
void schProcessOnDataFetched(SSchJob *job);
int32_t schGetTaskFromTaskList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask);
int32_t schUpdateTaskExecNodeHandle(SSchTask *pTask, void *handle, int32_t rspCode);
void schFreeRpcCtxVal(const void *arg);
int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *brokenVal, bool isHb);
int32_t schRecordTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, void *handle);
int32_t schExecStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
bool syncSchedule);
int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
int64_t startTs, bool sync);
int32_t schChkUpdateJobStatus(SSchJob *pJob, int8_t newStatus);
int32_t schCancelJob(SSchJob *pJob);
int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode);
uint64_t schGenTaskId(void);
void schCloseJobRef(void);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -40,7 +40,7 @@ void schFreeFlowCtrl(SSchJob *pJob) { ...@@ -40,7 +40,7 @@ void schFreeFlowCtrl(SSchJob *pJob) {
pJob->flowCtrl = NULL; pJob->flowCtrl = NULL;
} }
int32_t schCheckJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) { int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) {
if (!SCH_IS_QUERY_JOB(pJob)) { if (!SCH_IS_QUERY_JOB(pJob)) {
SCH_JOB_DLOG("job no need flow ctrl, queryJob:%d", SCH_IS_QUERY_JOB(pJob)); SCH_JOB_DLOG("job no need flow ctrl, queryJob:%d", SCH_IS_QUERY_JOB(pJob));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
此差异已折叠。
此差异已折叠。
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "catalog.h"
#include "command.h"
#include "query.h"
#include "schedulerInt.h"
#include "tmsg.h"
#include "tref.h"
#include "trpc.h"
void schCloseJobRef(void) {
if (!atomic_load_8((int8_t *)&schMgmt.exit)) {
return;
}
SCH_LOCK(SCH_WRITE, &schMgmt.lock);
if (atomic_load_32(&schMgmt.jobNum) <= 0 && schMgmt.jobRef >= 0) {
taosCloseRef(schMgmt.jobRef);
schMgmt.jobRef = -1;
}
SCH_UNLOCK(SCH_WRITE, &schMgmt.lock);
}
uint64_t schGenTaskId(void) { return atomic_add_fetch_64(&schMgmt.taskId, 1); }
uint64_t schGenUUID(void) {
static uint64_t hashId = 0;
static int32_t requestSerialId = 0;
if (hashId == 0) {
char uid[64];
int32_t code = taosGetSystemUUID(uid, tListLen(uid));
if (code != TSDB_CODE_SUCCESS) {
qError("Failed to get the system uid, reason:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
} else {
hashId = MurmurHash3_32(uid, strlen(uid));
}
}
int64_t ts = taosGetTimestampMs();
uint64_t pid = taosGetPId();
int32_t val = atomic_add_fetch_32(&requestSerialId, 1);
uint64_t id = ((hashId & 0x0FFF) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF);
return id;
}
void schFreeRpcCtxVal(const void *arg) {
if (NULL == arg) {
return;
}
SMsgSendInfo *pMsgSendInfo = (SMsgSendInfo *)arg;
taosMemoryFreeClear(pMsgSendInfo->param);
taosMemoryFreeClear(pMsgSendInfo);
}
void schFreeRpcCtx(SRpcCtx *pCtx) {
if (NULL == pCtx) {
return;
}
void *pIter = taosHashIterate(pCtx->args, NULL);
while (pIter) {
SRpcCtxVal *ctxVal = (SRpcCtxVal *)pIter;
(*ctxVal->freeFunc)(ctxVal->val);
pIter = taosHashIterate(pCtx->args, pIter);
}
taosHashCleanup(pCtx->args);
if (pCtx->brokenVal.freeFunc) {
(*pCtx->brokenVal.freeFunc)(pCtx->brokenVal.val);
}
}
...@@ -52,7 +52,7 @@ size_t strtrim(char *z) { ...@@ -52,7 +52,7 @@ size_t strtrim(char *z) {
int32_t j = 0; int32_t j = 0;
int32_t delta = 0; int32_t delta = 0;
while (z[j] == ' ') { while (isspace(z[j])) {
++j; ++j;
} }
...@@ -65,9 +65,9 @@ size_t strtrim(char *z) { ...@@ -65,9 +65,9 @@ size_t strtrim(char *z) {
int32_t stop = 0; int32_t stop = 0;
while (z[j] != 0) { while (z[j] != 0) {
if (z[j] == ' ' && stop == 0) { if (isspace(z[j]) && stop == 0) {
stop = j; stop = j;
} else if (z[j] != ' ' && stop != 0) { } else if (!isspace(z[j]) && stop != 0) {
stop = 0; stop = 0;
} }
......
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册