提交 2f340b7c 编写于 作者: H hjxilinx

Merge branch 'develop' into feature/query

...@@ -40,7 +40,7 @@ typedef struct { ...@@ -40,7 +40,7 @@ typedef struct {
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
} SWriteMsg; } SWriteMsg;
typedef struct _thread_obj { typedef struct _wworker_pool {
int32_t max; // max number of workers int32_t max; // max number of workers
int32_t nextId; // from 0 to max-1, cyclic int32_t nextId; // from 0 to max-1, cyclic
SWriteWorker *writeWorker; SWriteWorker *writeWorker;
......
...@@ -92,7 +92,7 @@ void *taosAllocateQitem(int size) { ...@@ -92,7 +92,7 @@ void *taosAllocateQitem(int size) {
void taosFreeQitem(void *param) { void taosFreeQitem(void *param) {
if (param == NULL) return; if (param == NULL) return;
//pTrace("item:%p is freed", param); pTrace("item:%p is freed", param);
char *temp = (char *)param; char *temp = (char *)param;
temp -= sizeof(STaosQnode); temp -= sizeof(STaosQnode);
...@@ -117,7 +117,7 @@ int taosWriteQitem(taos_queue param, int type, void *item) { ...@@ -117,7 +117,7 @@ int taosWriteQitem(taos_queue param, int type, void *item) {
queue->numOfItems++; queue->numOfItems++;
if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1); if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1);
//pTrace("item:%p is put into queue, type:%d items:%d", item, type, queue->numOfItems); pTrace("item:%p is put into queue, type:%d items:%d", item, type, queue->numOfItems);
pthread_mutex_unlock(&queue->mutex); pthread_mutex_unlock(&queue->mutex);
...@@ -197,7 +197,7 @@ int taosGetQitem(taos_qall param, int *type, void **pitem) { ...@@ -197,7 +197,7 @@ int taosGetQitem(taos_qall param, int *type, void **pitem) {
*pitem = pNode->item; *pitem = pNode->item;
*type = pNode->type; *type = pNode->type;
num = 1; num = 1;
// pTrace("item:%p is fetched, type:%d", *pitem, *type); pTrace("item:%p is fetched, type:%d", *pitem, *type);
} }
return num; return num;
......
...@@ -165,7 +165,10 @@ void vnodeRelease(void *pVnodeRaw) { ...@@ -165,7 +165,10 @@ void vnodeRelease(void *pVnodeRaw) {
int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1); int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1);
if (refCount > 0) return; if (refCount > 0) {
dTrace("pVnode:%p vgId:%d, release vnode, refCount:%d", pVnode, pVnode->vgId, refCount);
return;
}
// remove read queue // remove read queue
dnodeFreeRqueue(pVnode->rqueue); dnodeFreeRqueue(pVnode->rqueue);
......
...@@ -255,7 +255,8 @@ int vnodeWriteToQueue(void *param, SWalHead *pHead, int type) { ...@@ -255,7 +255,8 @@ int vnodeWriteToQueue(void *param, SWalHead *pHead, int type) {
SWalHead *pWal = (SWalHead *)taosAllocateQitem(size); SWalHead *pWal = (SWalHead *)taosAllocateQitem(size);
memcpy(pWal, pHead, size); memcpy(pWal, pHead, size);
taosWriteQitem(pVnode->wqueue, type, pHead); atomic_add_fetch_32(&pVnode->refCount, 1);
taosWriteQitem(pVnode->wqueue, type, pWal);
return 0; return 0;
} }
......
...@@ -50,7 +50,7 @@ enum { TSDB_REPO_STATE_ACTIVE, TSDB_REPO_STATE_CLOSED, TSDB_REPO_STATE_CONFIGURI ...@@ -50,7 +50,7 @@ enum { TSDB_REPO_STATE_ACTIVE, TSDB_REPO_STATE_CLOSED, TSDB_REPO_STATE_CONFIGURI
static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg); static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg);
static int32_t tsdbSetRepoEnv(STsdbRepo *pRepo); static int32_t tsdbSetRepoEnv(STsdbRepo *pRepo);
static int32_t tsdbDestroyRepoEnv(STsdbRepo *pRepo); static int32_t tsdbDestroyRepoEnv(STsdbRepo *pRepo);
static int tsdbOpenMetaFile(char *tsdbDir); // static int tsdbOpenMetaFile(char *tsdbDir);
static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock); static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock);
static int32_t tsdbRestoreCfg(STsdbRepo *pRepo, STsdbCfg *pCfg); static int32_t tsdbRestoreCfg(STsdbRepo *pRepo, STsdbCfg *pCfg);
static int32_t tsdbGetDataDirName(STsdbRepo *pRepo, char *fname); static int32_t tsdbGetDataDirName(STsdbRepo *pRepo, char *fname);
...@@ -222,10 +222,10 @@ tsdb_repo_t *tsdbOpenRepo(char *tsdbDir) { ...@@ -222,10 +222,10 @@ tsdb_repo_t *tsdbOpenRepo(char *tsdbDir) {
return (tsdb_repo_t *)pRepo; return (tsdb_repo_t *)pRepo;
} }
static int32_t tsdbFlushCache(STsdbRepo *pRepo) { // static int32_t tsdbFlushCache(STsdbRepo *pRepo) {
// TODO // // TODO
return 0; // return 0;
} // }
/** /**
* Close a TSDB repository. Only free memory resources, and keep the files. * Close a TSDB repository. Only free memory resources, and keep the files.
...@@ -679,10 +679,10 @@ static int32_t tsdbDestroyRepoEnv(STsdbRepo *pRepo) { ...@@ -679,10 +679,10 @@ static int32_t tsdbDestroyRepoEnv(STsdbRepo *pRepo) {
return 0; return 0;
} }
static int tsdbOpenMetaFile(char *tsdbDir) { // static int tsdbOpenMetaFile(char *tsdbDir) {
// TODO // // TODO
return 0; // return 0;
} // }
static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable) { static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
// TODO // TODO
...@@ -937,7 +937,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters ...@@ -937,7 +937,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
TSDB_COMPBLOCK_GET_START_AND_SIZE(pCompInfo, pTBlock, nBlocks); TSDB_COMPBLOCK_GET_START_AND_SIZE(pCompInfo, pTBlock, nBlocks);
SCompBlock tBlock; SCompData tBlock;
int64_t toffset; int64_t toffset;
int32_t tlen; int32_t tlen;
tsdbLoadDataBlock(&pGroup->files[TSDB_FILE_TYPE_LAST], pTBlock, nBlocks, pCols, &tBlock); tsdbLoadDataBlock(&pGroup->files[TSDB_FILE_TYPE_LAST], pTBlock, nBlocks, pCols, &tBlock);
...@@ -968,7 +968,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters ...@@ -968,7 +968,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
pCompInfo->uid = pTable->tableId.uid; pCompInfo->uid = pTable->tableId.uid;
// Load SCompBlock part if neccessary // Load SCompBlock part if neccessary
int isCompBlockLoaded = 0; // int isCompBlockLoaded = 0;
if (0) { if (0) {
// if (pIdx->offset > 0) { // if (pIdx->offset > 0) {
if (pIdx->hasLast || tsdbHasDataInRange(pIter, minKey, pIdx->maxKey)) { if (pIdx->hasLast || tsdbHasDataInRange(pIter, minKey, pIdx->maxKey)) {
...@@ -976,7 +976,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters ...@@ -976,7 +976,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len + sizeof(SCompBlock) * 100); pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len + sizeof(SCompBlock) * 100);
if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) { /* TODO */ if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) { /* TODO */
} }
if (pCompInfo->uid == pTable->tableId.uid) isCompBlockLoaded = 1; // if (pCompInfo->uid == pTable->tableId.uid) isCompBlockLoaded = 1;
} else { } else {
// TODO: No need to load the SCompBlock part, just sendfile the SCompBlock part // TODO: No need to load the SCompBlock part, just sendfile the SCompBlock part
// and write those new blocks to it // and write those new blocks to it
...@@ -1023,7 +1023,6 @@ _table_over: ...@@ -1023,7 +1023,6 @@ _table_over:
int bytes = tsendfile(hFile.fd, pGroup->files[TSDB_FILE_TYPE_HEAD].fd, NULL, pIdx->len); int bytes = tsendfile(hFile.fd, pGroup->files[TSDB_FILE_TYPE_HEAD].fd, NULL, pIdx->len);
if (bytes < pIdx->len) { if (bytes < pIdx->len) {
printf("Failed to send file, reason: %s\n", strerror(errno)); printf("Failed to send file, reason: %s\n", strerror(errno));
int d = 1;
} }
if (nNewBlocks > 0) { if (nNewBlocks > 0) {
write(hFile.fd, (void *)(pCompInfo->blocks), sizeof(SCompBlock) * nNewBlocks); write(hFile.fd, (void *)(pCompInfo->blocks), sizeof(SCompBlock) * nNewBlocks);
......
...@@ -374,10 +374,10 @@ static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) { ...@@ -374,10 +374,10 @@ static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) {
return tsdbAddTableIntoMap(pMeta, pTable); return tsdbAddTableIntoMap(pMeta, pTable);
} }
static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable) { // static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable) {
// TODO // // TODO
return 0; // return 0;
} // }
static int tsdbAddTableIntoMap(STsdbMeta *pMeta, STable *pTable) { static int tsdbAddTableIntoMap(STsdbMeta *pMeta, STable *pTable) {
// TODO: add the table to the map // TODO: add the table to the map
......
...@@ -29,7 +29,7 @@ typedef struct { ...@@ -29,7 +29,7 @@ typedef struct {
} SRecordInfo; } SRecordInfo;
static int32_t tsdbGetMetaFileName(char *rootDir, char *fname); static int32_t tsdbGetMetaFileName(char *rootDir, char *fname);
static int32_t tsdbCheckMetaHeader(int fd); // static int32_t tsdbCheckMetaHeader(int fd);
static int32_t tsdbWriteMetaHeader(int fd); static int32_t tsdbWriteMetaHeader(int fd);
static int tsdbCreateMetaFile(char *fname); static int tsdbCreateMetaFile(char *fname);
static int tsdbRestoreFromMetaFile(char *fname, SMetaFile *mfh); static int tsdbRestoreFromMetaFile(char *fname, SMetaFile *mfh);
...@@ -185,10 +185,10 @@ static int32_t tsdbGetMetaFileName(char *rootDir, char *fname) { ...@@ -185,10 +185,10 @@ static int32_t tsdbGetMetaFileName(char *rootDir, char *fname) {
return 0; return 0;
} }
static int32_t tsdbCheckMetaHeader(int fd) { // static int32_t tsdbCheckMetaHeader(int fd) {
// TODO: write the meta file header check function // // TODO: write the meta file header check function
return 0; // return 0;
} // }
static int32_t tsdbWriteMetaHeader(int fd) { static int32_t tsdbWriteMetaHeader(int fd) {
// TODO: write the meta file header to file // TODO: write the meta file header to file
...@@ -199,10 +199,10 @@ static int32_t tsdbWriteMetaHeader(int fd) { ...@@ -199,10 +199,10 @@ static int32_t tsdbWriteMetaHeader(int fd) {
return 0; return 0;
} }
static int32_t tsdbReadMetaHeader(int fd) { // static int32_t tsdbReadMetaHeader(int fd) {
lseek(fd, TSDB_META_FILE_HEADER_SIZE, SEEK_SET); // lseek(fd, TSDB_META_FILE_HEADER_SIZE, SEEK_SET);
return 0; // return 0;
} // }
static int tsdbCreateMetaFile(char *fname) { static int tsdbCreateMetaFile(char *fname) {
int fd = open(fname, O_RDWR | O_CREAT, 0755); int fd = open(fname, O_RDWR | O_CREAT, 0755);
......
...@@ -287,6 +287,8 @@ static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, SW ...@@ -287,6 +287,8 @@ static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, SW
(*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL); (*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL);
} }
free(buffer);
return code; return code;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册