提交 bf3ff27d 编写于 作者: H Haojun Liao

[TD-225] merge develop

......@@ -90,7 +90,7 @@ TDengine缺省的时间戳是毫秒精度,但通过修改配置参数enableMic
```mysql
ALTER DATABASE db_name REPLICA 2;
```
REPLICA参数是指修改数据库副本数,取值范围[1, 3]。在集群中使用,副本数必须小于dnode的数目。
REPLICA参数是指修改数据库副本数,取值范围[1, 3]。在集群中使用,副本数必须小于或等于dnode的数目。
```mysql
ALTER DATABASE db_name KEEP 365;
......
......@@ -4,16 +4,99 @@
### 物联网典型场景
在典型的物联网、车联网、运维监测场景中,往往有多种不同类型的数据采集设备,采集一个到多个不同的物理量。而同一种采集设备类型,往往又有多个具体的采集设备分布在不同的地点。大数据处理系统就是要将各种采集的数据汇总,然后进行计算和分析。对于同一类设备,其采集的数据都是很规则的。以智能电表为例,假设每个智能电表采集电流、电压、相位三个量,其采集的数据类似如下的表格:
| Device ID | Time Stamp | current | voltage | phase | location | groupId |
| :-------: | :-----------: | :-----: | :-----: | :---: | :--------------: | :-----: |
| d1001 | 1538548685000 | 10.3 | 219 | 0.31 | Beijing.Chaoyang | 2 |
| d1002 | 1538548684000 | 10.2 | 220 | 0.23 | Beijing.Chaoyang | 3 |
| d1003 | 1538548686500 | 11.5 | 221 | 0.35 | Beijing.Haidian | 3 |
| d1004 | 1538548685500 | 13.4 | 223 | 0.29 | Beijing.Haidian | 2 |
| d1001 | 1538548695000 | 12.6 | 218 | 0.33 | Beijing.Chaoyang | 2 |
| d1004 | 1538548696600 | 11.8 | 221 | 0.28 | Beijing.Haidian | 2 |
| d1002 | 1538548696650 | 10.3 | 218 | 0.25 | Beijing.Chaoyang | 3 |
| d1001 | 1538548696800 | 12.3 | 221 | 0.31 | Beijing.Chaoyang | 2 |
<figure><table>
<thead><tr>
<th style="text-align:center;">设备ID</th>
<th style="text-align:center;">时间戳</th>
<th style="text-align:center;" colspan="3">采集量</th>
<th style="text-align:center;" colspan="2">标签</th>
</tr>
<tr>
<th style="text-align:center;">Device ID</th>
<th style="text-align:center;">Time Stamp</th>
<th style="text-align:center;">current</th>
<th style="text-align:center;">voltage</th>
<th style="text-align:center;">phase</th>
<th style="text-align:center;">location</th>
<th style="text-align:center;">groupId</th>
</tr>
</thead>
<tbody>
<tr>
<td style="text-align:center;">d1001</td>
<td style="text-align:center;">1538548685000</td>
<td style="text-align:center;">10.3</td>
<td style="text-align:center;">219</td>
<td style="text-align:center;">0.31</td>
<td style="text-align:center;">Beijing.Chaoyang</td>
<td style="text-align:center;">2</td>
</tr>
<tr>
<td style="text-align:center;">d1002</td>
<td style="text-align:center;">1538548684000</td>
<td style="text-align:center;">10.2</td>
<td style="text-align:center;">220</td>
<td style="text-align:center;">0.23</td>
<td style="text-align:center;">Beijing.Chaoyang</td>
<td style="text-align:center;">3</td>
</tr>
<tr>
<td style="text-align:center;">d1003</td>
<td style="text-align:center;">1538548686500</td>
<td style="text-align:center;">11.5</td>
<td style="text-align:center;">221</td>
<td style="text-align:center;">0.35</td>
<td style="text-align:center;">Beijing.Haidian</td>
<td style="text-align:center;">3</td>
</tr>
<tr>
<td style="text-align:center;">d1004</td>
<td style="text-align:center;">1538548685500</td>
<td style="text-align:center;">13.4</td>
<td style="text-align:center;">223</td>
<td style="text-align:center;">0.29</td>
<td style="text-align:center;">Beijing.Haidian</td>
<td style="text-align:center;">2</td>
</tr>
<tr>
<td style="text-align:center;">d1001</td>
<td style="text-align:center;">1538548695000</td>
<td style="text-align:center;">12.6</td>
<td style="text-align:center;">218</td>
<td style="text-align:center;">0.33</td>
<td style="text-align:center;">Beijing.Chaoyang</td>
<td style="text-align:center;">2</td>
</tr>
<tr>
<td style="text-align:center;">d1004</td>
<td style="text-align:center;">1538548696600</td>
<td style="text-align:center;">11.8</td>
<td style="text-align:center;">221</td>
<td style="text-align:center;">0.28</td>
<td style="text-align:center;">Beijing.Haidian</td>
<td style="text-align:center;">2</td>
</tr>
<tr>
<td style="text-align:center;">d1002</td>
<td style="text-align:center;">1538548696650</td>
<td style="text-align:center;">10.3</td>
<td style="text-align:center;">218</td>
<td style="text-align:center;">0.25</td>
<td style="text-align:center;">Beijing.Chaoyang</td>
<td style="text-align:center;">3</td>
</tr>
<tr>
<td style="text-align:center;">d1001</td>
<td style="text-align:center;">1538548696800</td>
<td style="text-align:center;">12.3</td>
<td style="text-align:center;">221</td>
<td style="text-align:center;">0.31</td>
<td style="text-align:center;">Beijing.Chaoyang</td>
<td style="text-align:center;">2</td>
</tr>
</tbody>
</table></figure>
<center> 表1:智能电表数据示例</center>
......
......@@ -571,8 +571,8 @@ static void balanceCheckDnodeAccess() {
if (pDnode->status != TAOS_DN_STATUS_DROPPING && pDnode->status != TAOS_DN_STATUS_OFFLINE) {
pDnode->status = TAOS_DN_STATUS_OFFLINE;
pDnode->offlineReason = TAOS_DN_OFF_STATUS_MSG_TIMEOUT;
mInfo("dnode:%d, set to offline state, access seq:%d, last seq:%d", pDnode->dnodeId, tsAccessSquence,
pDnode->lastAccess);
mInfo("dnode:%d, set to offline state, access seq:%d last seq:%d laststat:%d", pDnode->dnodeId, tsAccessSquence,
pDnode->lastAccess, pDnode->status);
balanceSetVgroupOffline(pDnode);
}
}
......
......@@ -1235,8 +1235,7 @@ void tscColumnListDestroy(SArray* pColumnList) {
*
*/
static int32_t validateQuoteToken(SStrToken* pToken) {
strdequote(pToken->z);
pToken->n = (uint32_t)strtrim(pToken->z);
tscDequoteAndTrimToken(pToken);
int32_t k = tSQLGetToken(pToken->z, &pToken->type);
......@@ -1251,8 +1250,6 @@ static int32_t validateQuoteToken(SStrToken* pToken) {
}
void tscDequoteAndTrimToken(SStrToken* pToken) {
assert(pToken->type == TK_STRING);
uint32_t first = 0, last = pToken->n;
// trim leading spaces
......@@ -1364,7 +1361,8 @@ int32_t tscValidateName(SStrToken* pToken) {
} else {
pStr[firstPartLen] = TS_PATH_DELIMITER[0];
memmove(&pStr[firstPartLen + 1], pToken->z, pToken->n);
pStr[firstPartLen + sizeof(TS_PATH_DELIMITER[0]) + pToken->n] = 0;
uint32_t offset = (uint32_t)(pToken->z - (pStr + firstPartLen + 1));
memset(pToken->z + pToken->n - offset, ' ', offset);
}
pToken->n += (firstPartLen + sizeof(TS_PATH_DELIMITER[0]));
pToken->z = pStr;
......
......@@ -44,6 +44,7 @@ extern int32_t tsMaxShellConns;
extern int32_t tsShellActivityTimer;
extern uint32_t tsMaxTmrCtrl;
extern float tsNumOfThreadsPerCore;
extern int32_t tsNumOfCommitThreads;
extern float tsRatioOfQueryThreads; // todo remove it
extern int8_t tsDaylight;
extern char tsTimezone[];
......
......@@ -51,6 +51,7 @@ int32_t tsMaxShellConns = 5000;
int32_t tsMaxConnections = 5000;
int32_t tsShellActivityTimer = 3; // second
float tsNumOfThreadsPerCore = 1.0f;
int32_t tsNumOfCommitThreads = 1;
float tsRatioOfQueryThreads = 0.5f;
int8_t tsDaylight = 0;
char tsTimezone[TSDB_TIMEZONE_LEN] = {0};
......@@ -426,6 +427,16 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "numOfCommitThreads";
cfg.ptr = &tsNumOfCommitThreads;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG;
cfg.minValue = 1;
cfg.maxValue = 100;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "ratioOfQueryThreads";
cfg.ptr = &tsRatioOfQueryThreads;
cfg.valType = TAOS_CFG_VTYPE_FLOAT;
......
Subproject commit ec77d9049a719dabfd1a7c1122a209e201861944
Subproject commit d598db167eb256fe67409b7bb3d0eb7fffc3ff8c
......@@ -77,15 +77,15 @@ void dnodeUpdateMInfos(SMnodeInfos *minfos) {
void dnodeUpdateEpSetForPeer(SRpcEpSet *ep) {
if (ep->numOfEps <= 0) {
dError("mnode EP list for peer is changed, but content is invalid, discard it");
dError("minfos is changed, but content is invalid, discard it");
return;
}
pthread_mutex_lock(&tsMInfosMutex);
dInfo("mnode EP list for peer is changed, numOfEps:%d inUse:%d", ep->numOfEps, ep->inUse);
dInfo("minfos is changed, numOfEps:%d inUse:%d", ep->numOfEps, ep->inUse);
for (int i = 0; i < ep->numOfEps; ++i) {
ep->port[i] -= TSDB_PORT_DNODEDNODE;
dInfo("mnode index:%d %s:%u", i, ep->fqdn[i], ep->port[i]);
dInfo("minfo:%d %s:%u", i, ep->fqdn[i], ep->port[i]);
}
tsMEpSet = *ep;
pthread_mutex_unlock(&tsMInfosMutex);
......
......@@ -321,6 +321,12 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle);
*/
void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int64_t *compStorage);
int tsdbInitCommitQueue(int nthreads);
void tsdbDestroyCommitQueue();
int tsdbSyncCommit(TSDB_REPO_T *repo);
void tsdbIncCommitRef(int vgId);
void tsdbDecCommitRef(int vgId);
#ifdef __cplusplus
}
#endif
......
......@@ -54,16 +54,17 @@ typedef int32_t FWalWrite(void *ahandle, void *pHead, int32_t qtype, void *pMsg)
int32_t walInit();
void walCleanUp();
twalh walOpen(char *path, SWalCfg *pCfg);
int32_t walAlter(twalh pWal, SWalCfg *pCfg);
void walStop(twalh);
void walClose(twalh);
int32_t walRenew(twalh);
void walRemoveOldFiles(twalh);
int32_t walWrite(twalh, SWalHead *);
void walFsync(twalh, bool forceFsync);
int32_t walRestore(twalh, void *pVnode, FWalWrite writeFp);
int32_t walGetWalFile(twalh, char *fileName, int64_t *fileId);
twalh walOpen(char *path, SWalCfg *pCfg);
int32_t walAlter(twalh pWal, SWalCfg *pCfg);
void walStop(twalh);
void walClose(twalh);
int32_t walRenew(twalh);
void walRemoveOneOldFile(twalh);
void walRemoveAllOldFiles(twalh);
int32_t walWrite(twalh, SWalHead *);
void walFsync(twalh, bool forceFsync);
int32_t walRestore(twalh, void *pVnode, FWalWrite writeFp);
int32_t walGetWalFile(twalh, char *fileName, int64_t *fileId);
uint64_t walGetVersion(twalh);
#ifdef __cplusplus
......
......@@ -475,6 +475,7 @@ typedef struct {
tsem_t mutex_sem;
int notFinished;
tsem_t lock_sem;
int counter;
} info;
typedef struct {
......@@ -766,6 +767,7 @@ int main(int argc, char *argv[]) {
t_info->data_of_rate = rate;
t_info->end_table_id = i < b ? last + a : last + a - 1;
last = t_info->end_table_id + 1;
t_info->counter = 0;
tsem_init(&(t_info->mutex_sem), 0, 1);
t_info->notFinished = t_info->end_table_id - t_info->start_table_id + 1;
......@@ -788,14 +790,14 @@ int main(int argc, char *argv[]) {
printf("ASYNC Insert with %d connections:\n", threads);
}
fprintf(fp, "|%10.d | %10.2f | %10.2f | %10.4f |\n\n",
ntables * nrecords_per_table, ntables * nrecords_per_table / t,
(ntables * nrecords_per_table) / (t * nrecords_per_request),
fprintf(fp, "|%"PRIu64" | %10.2f | %10.2f | %10.4f |\n\n",
(int64_t)ntables * nrecords_per_table, ntables * nrecords_per_table / t,
((int64_t)ntables * nrecords_per_table) / (t * nrecords_per_request),
t * 1000);
printf("Spent %.4f seconds to insert %lld records with %d record(s) per request: %.2f records/second\n",
t, (long long int)ntables * nrecords_per_table, nrecords_per_request,
((long long int)ntables * nrecords_per_table) / t);
printf("Spent %.4f seconds to insert %"PRIu64" records with %d record(s) per request: %.2f records/second\n",
t, (int64_t)ntables * nrecords_per_table, nrecords_per_request,
(int64_t)ntables * nrecords_per_table / t);
for (int i = 0; i < threads; i++) {
info *t_info = infos + i;
......@@ -879,6 +881,7 @@ int main(int argc, char *argv[]) {
taos_close(rInfo->taos);
}
taos_cleanup();
return 0;
}
......@@ -1283,68 +1286,39 @@ void *syncWrite(void *sarg) {
void *asyncWrite(void *sarg) {
info *winfo = (info *)sarg;
sTable *tb_infos = (sTable *)malloc(sizeof(sTable) * (winfo->end_table_id - winfo->start_table_id + 1));
for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) {
sTable *tb_info = tb_infos + tID - winfo->start_table_id;
tb_info->data_type = winfo->datatype;
tb_info->ncols_per_record = winfo->ncols_per_record;
tb_info->taos = winfo->taos;
sprintf(tb_info->tb_name, "%s.%s%d", winfo->db_name, winfo->tb_prefix, tID);
tb_info->timestamp = winfo->start_time;
tb_info->counter = 0;
tb_info->target = winfo->nrecords_per_table;
tb_info->len_of_binary = winfo->len_of_binary;
tb_info->nrecords_per_request = winfo->nrecords_per_request;
tb_info->mutex_sem = &(winfo->mutex_sem);
tb_info->notFinished = &(winfo->notFinished);
tb_info->lock_sem = &(winfo->lock_sem);
tb_info->data_of_order = winfo->data_of_order;
tb_info->data_of_rate = winfo->data_of_rate;
/* char buff[BUFFER_SIZE] = "\0"; */
/* sprintf(buff, "insert into %s values (0, 0)", tb_info->tb_name); */
/* queryDB(tb_info->taos,buff); */
taos_query_a(winfo->taos, "show databases", callBack, tb_info);
}
taos_query_a(winfo->taos, "show databases", callBack, winfo);
tsem_wait(&(winfo->lock_sem));
free(tb_infos);
return NULL;
}
void callBack(void *param, TAOS_RES *res, int code) {
sTable *tb_info = (sTable *)param;
char **datatype = tb_info->data_type;
int ncols_per_record = tb_info->ncols_per_record;
int len_of_binary = tb_info->len_of_binary;
int64_t tmp_time = tb_info->timestamp;
if (code < 0) {
fprintf(stderr, "failed to insert data %d:reason; %s\n", code, taos_errstr(res));
exit(EXIT_FAILURE);
}
info* winfo = (info*)param;
char **datatype = winfo->datatype;
int ncols_per_record = winfo->ncols_per_record;
int len_of_binary = winfo->len_of_binary;
// If finished;
if (tb_info->counter >= tb_info->target) {
tsem_wait(tb_info->mutex_sem);
(*(tb_info->notFinished))--;
if (*(tb_info->notFinished) == 0) tsem_post(tb_info->lock_sem);
tsem_post(tb_info->mutex_sem);
int64_t tmp_time = winfo->start_time;
char *buffer = calloc(1, BUFFER_SIZE);
char *data = calloc(1, MAX_DATA_SIZE);
char *pstr = buffer;
pstr += sprintf(pstr, "insert into %s.%s%d values", winfo->db_name, winfo->tb_prefix, winfo->start_table_id);
if (winfo->counter >= winfo->nrecords_per_table) {
winfo->start_table_id++;
winfo->counter = 0;
}
if (winfo->start_table_id > winfo->end_table_id) {
tsem_post(&winfo->lock_sem);
free(buffer);
free(data);
taos_free_result(res);
return;
}
char buffer[BUFFER_SIZE] = "\0";
char data[MAX_DATA_SIZE];
char *pstr = buffer;
pstr += sprintf(pstr, "insert into %s values", tb_info->tb_name);
for (int i = 0; i < tb_info->nrecords_per_request; i++) {
for (int i = 0; i < winfo->nrecords_per_request; i++) {
int rand_num = rand() % 100;
if (tb_info->data_of_order ==1 && rand_num < tb_info->data_of_rate)
if (winfo->data_of_order ==1 && rand_num < winfo->data_of_rate)
{
int64_t d = tmp_time - rand() % 1000000 + rand_num;
generateData(data, datatype, ncols_per_record, d, len_of_binary);
......@@ -1353,15 +1327,15 @@ void callBack(void *param, TAOS_RES *res, int code) {
generateData(data, datatype, ncols_per_record, tmp_time += 1000, len_of_binary);
}
pstr += sprintf(pstr, "%s", data);
tb_info->counter++;
winfo->counter++;
if (tb_info->counter >= tb_info->target) {
if (winfo->counter >= winfo->nrecords_per_table) {
break;
}
}
tb_info->timestamp = tmp_time;
taos_query_a(tb_info->taos, buffer, callBack, tb_info);
taos_query_a(winfo->taos, buffer, callBack, winfo);
free(buffer);
free(data);
taos_free_result(res);
}
......
......@@ -584,7 +584,7 @@ static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) {
return TSDB_CODE_MND_CLUSTER_CFG_INCONSISTENT;
}
mDebug("dnode:%d, from offline to online", pDnode->dnodeId);
mInfo("dnode:%d, from offline to online", pDnode->dnodeId);
pDnode->status = TAOS_DN_STATUS_READY;
pDnode->offlineReason = TAOS_DN_OFF_ONLINE;
balanceSyncNotify();
......
......@@ -63,9 +63,9 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) {
for (int32_t i = 0; i < epSet->numOfEps; ++i) {
if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort + TSDB_PORT_DNODEDNODE) {
epSet->inUse = (i + 1) % epSet->numOfEps;
mDebug("mnode index:%d ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse);
mDebug("mpeer:%d ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse);
} else {
mDebug("mnode index:%d ep:%s:%u", i, epSet->fqdn[i], htons(epSet->port[i]));
mDebug("mpeer:%d ep:%s:%u", i, epSet->fqdn[i], htons(epSet->port[i]));
}
}
......
......@@ -132,7 +132,7 @@ int64_t taosSendFile(int32_t dfd, int32_t sfd, int64_t *offset, int64_t size) {
// if (leftbytes > 1000000000) leftbytes = 1000000000;
sentbytes = sendfile(dfd, sfd, offset, leftbytes);
if (sentbytes == -1) {
if (errno == EINTR) {
if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
continue;
} else {
return -1;
......
......@@ -7277,7 +7277,7 @@ void qCleanupQueryMgmt(void* pQMgmt) {
pthread_mutex_destroy(&pQueryMgmt->lock);
tfree(pQueryMgmt);
qDebug("vgId:%d queryMgmt cleanup completed", vgId);
qDebug("vgId:%d, queryMgmt cleanup completed", vgId);
}
void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) {
......
......@@ -486,7 +486,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
pPeer->ip = ip;
pPeer->port = pInfo->nodePort;
pPeer->fqdn[sizeof(pPeer->fqdn) - 1] = 0;
snprintf(pPeer->id, sizeof(pPeer->id), "vgId:%d peer:%s:%u", pNode->vgId, pPeer->fqdn, pPeer->port);
snprintf(pPeer->id, sizeof(pPeer->id), "vgId:%d, peer:%s:%u", pNode->vgId, pPeer->fqdn, pPeer->port);
pPeer->peerFd = -1;
pPeer->syncFd = -1;
......@@ -1222,7 +1222,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
// always update version
nodeVersion = pWalHead->version;
sDebug("vgId:%d, forward to peer, replica:%d role:%s qtype:%s hver:%" PRIu64, pNode->vgId, pNode->replica,
sTrace("vgId:%d, forward to peer, replica:%d role:%s qtype:%s hver:%" PRIu64, pNode->vgId, pNode->replica,
syncRole[nodeRole], qtypeStr[qtype], pWalHead->version);
if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0;
......
......@@ -128,7 +128,7 @@ static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
}
firstPkt.fqdn[sizeof(firstPkt.fqdn) - 1] = 0;
snprintf(pNode->id, sizeof(pNode->id), "vgId:%d peer:%s:%d", firstPkt.sourceId, firstPkt.fqdn, firstPkt.port);
snprintf(pNode->id, sizeof(pNode->id), "vgId:%d, peer:%s:%d", firstPkt.sourceId, firstPkt.fqdn, firstPkt.port);
if (firstPkt.syncHead.vgId) {
sDebug("%s, vgId in head is not zero, close the connection", pNode->id);
tfree(pNode);
......
......@@ -220,8 +220,7 @@ typedef struct {
SMemTable* mem;
SMemTable* imem;
STsdbFileH* tsdbFileH;
int commit;
pthread_t commitThread;
sem_t readyToCommit;
pthread_mutex_t mutex;
bool repoLocked;
} STsdbRepo;
......@@ -440,6 +439,7 @@ void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes);
int tsdbAsyncCommit(STsdbRepo* pRepo);
int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols,
TKEY* filterKeys, int nFilterKeys, bool keepDup, SMergeInfo* pMergeInfo);
void* tsdbCommitData(STsdbRepo* pRepo);
static FORCE_INLINE SDataRow tsdbNextIterRow(SSkipListIterator* pIter) {
if (pIter == NULL) return NULL;
......@@ -588,6 +588,9 @@ int tsdbScanSCompBlock(STsdbScanHandle* pScanHandle, int idx);
int tsdbCloseScanFile(STsdbScanHandle* pScanHandle);
void tsdbFreeScanHandle(STsdbScanHandle* pScanHandle);
// ------------------ tsdbCommitQueue.c
int tsdbScheduleCommit(STsdbRepo *pRepo);
#ifdef __cplusplus
}
#endif
......
......@@ -110,7 +110,7 @@ void tsdbCloseBufPool(STsdbRepo *pRepo) {
}
}
tsdbDebug("vgId:%d buffer pool is closed", REPO_ID(pRepo));
tsdbDebug("vgId:%d, buffer pool is closed", REPO_ID(pRepo));
}
SListNode *tsdbAllocBufBlockFromPool(STsdbRepo *pRepo) {
......@@ -134,7 +134,7 @@ SListNode *tsdbAllocBufBlockFromPool(STsdbRepo *pRepo) {
pBufBlock->offset = 0;
pBufBlock->remain = pBufPool->bufBlockSize;
tsdbDebug("vgId:%d buffer block is allocated, blockId:%" PRId64, REPO_ID(pRepo), pBufBlock->blockId);
tsdbDebug("vgId:%d, buffer block is allocated, blockId:%" PRId64, REPO_ID(pRepo), pBufBlock->blockId);
return pNode;
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "tlist.h"
#include "tref.h"
#include "tsdbMain.h"
typedef struct {
bool stop;
pthread_mutex_t lock;
pthread_cond_t queueNotEmpty;
int nthreads;
int refCount;
SList * queue;
pthread_t * threads;
} SCommitQueue;
typedef struct {
STsdbRepo *pRepo;
} SCommitReq;
static void *tsdbLoopCommit(void *arg);
SCommitQueue tsCommitQueue = {0};
int tsdbInitCommitQueue(int nthreads) {
SCommitQueue *pQueue = &tsCommitQueue;
if (nthreads < 1) nthreads = 1;
pQueue->stop = false;
pQueue->nthreads = nthreads;
pQueue->queue = tdListNew(0);
if (pQueue->queue == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
pQueue->threads = (pthread_t *)calloc(nthreads, sizeof(pthread_t));
if (pQueue->threads == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tdListFree(pQueue->queue);
return -1;
}
pthread_mutex_init(&(pQueue->lock), NULL);
pthread_cond_init(&(pQueue->queueNotEmpty), NULL);
for (int i = 0; i < nthreads; i++) {
pthread_create(pQueue->threads + i, NULL, tsdbLoopCommit, NULL);
}
return 0;
}
void tsdbDestroyCommitQueue() {
SCommitQueue *pQueue = &tsCommitQueue;
pthread_mutex_lock(&(pQueue->lock));
if (pQueue->stop) {
pthread_mutex_unlock(&(pQueue->lock));
return;
}
pQueue->stop = true;
pthread_cond_broadcast(&(pQueue->queueNotEmpty));
pthread_mutex_unlock(&(pQueue->lock));
for (size_t i = 0; i < pQueue->nthreads; i++) {
pthread_join(pQueue->threads[i], NULL);
}
free(pQueue->threads);
tdListFree(pQueue->queue);
pthread_cond_destroy(&(pQueue->queueNotEmpty));
pthread_mutex_destroy(&(pQueue->lock));
}
int tsdbScheduleCommit(STsdbRepo *pRepo) {
SCommitQueue *pQueue = &tsCommitQueue;
SListNode *pNode = (SListNode *)calloc(1, sizeof(SListNode) + sizeof(SCommitReq));
if (pNode == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
((SCommitReq *)pNode->data)->pRepo = pRepo;
pthread_mutex_lock(&(pQueue->lock));
// ASSERT(pQueue->stop);
tdListAppendNode(pQueue->queue, pNode);
pthread_cond_signal(&(pQueue->queueNotEmpty));
pthread_mutex_unlock(&(pQueue->lock));
return 0;
}
static void *tsdbLoopCommit(void *arg) {
SCommitQueue *pQueue = &tsCommitQueue;
SListNode * pNode = NULL;
STsdbRepo * pRepo = NULL;
while (true) {
pthread_mutex_lock(&(pQueue->lock));
while (true) {
pNode = tdListPopHead(pQueue->queue);
if (pNode == NULL) {
if (pQueue->stop && pQueue->refCount <= 0) {
pthread_mutex_unlock(&(pQueue->lock));
goto _exit;
} else {
pthread_cond_wait(&(pQueue->queueNotEmpty), &(pQueue->lock));
}
} else {
break;
}
}
pthread_mutex_unlock(&(pQueue->lock));
pRepo = ((SCommitReq *)pNode->data)->pRepo;
tsdbCommitData(pRepo);
listNodeFree(pNode);
}
_exit:
return NULL;
}
void tsdbIncCommitRef(int vgId) {
int refCount = atomic_add_fetch_32(&tsCommitQueue.refCount, 1);
tsdbDebug("vgId:%d, inc commit queue ref to %d", vgId, refCount);
}
void tsdbDecCommitRef(int vgId) {
int refCount = atomic_sub_fetch_32(&tsCommitQueue.refCount, 1);
pthread_cond_broadcast(&(tsCommitQueue.queueNotEmpty));
tsdbDebug("vgId:%d, dec commit queue ref to %d", vgId, refCount);
}
\ No newline at end of file
......@@ -163,7 +163,7 @@ void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) {
if (toCommit) {
tsdbAsyncCommit(pRepo);
if (pRepo->commit) pthread_join(pRepo->commitThread, NULL);
sem_wait(&(pRepo->readyToCommit));
}
tsdbUnRefMemTable(pRepo, pRepo->mem);
tsdbUnRefMemTable(pRepo, pRepo->imem);
......@@ -675,6 +675,12 @@ static STsdbRepo *tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg) {
goto _err;
}
code = sem_init(&(pRepo->readyToCommit), 0, 1);
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
goto _err;
}
pRepo->repoLocked = false;
pRepo->rootDir = strdup(rootDir);
......@@ -719,6 +725,7 @@ static void tsdbFreeRepo(STsdbRepo *pRepo) {
// tsdbFreeMemTable(pRepo->mem);
// tsdbFreeMemTable(pRepo->imem);
tfree(pRepo->rootDir);
sem_destroy(&(pRepo->readyToCommit));
pthread_mutex_destroy(&pRepo->mutex);
free(pRepo);
}
......
......@@ -24,7 +24,6 @@ static void tsdbFreeMemTable(SMemTable *pMemTable);
static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable);
static void tsdbFreeTableData(STableData *pTableData);
static char * tsdbGetTsTupleKey(const void *data);
static void * tsdbCommitData(void *arg);
static int tsdbCommitMeta(STsdbRepo *pRepo);
static void tsdbEndCommit(STsdbRepo *pRepo);
static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey);
......@@ -262,40 +261,28 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
int tsdbAsyncCommit(STsdbRepo *pRepo) {
SMemTable *pIMem = pRepo->imem;
int code = 0;
if (pIMem != NULL) {
ASSERT(pRepo->commit);
tsdbDebug("vgId:%d waiting for the commit thread", REPO_ID(pRepo));
code = pthread_join(pRepo->commitThread, NULL);
tsdbDebug("vgId:%d commit thread is finished", REPO_ID(pRepo));
if (code != 0) {
tsdbError("vgId:%d failed to thread join since %s", REPO_ID(pRepo), strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
pRepo->commit = 0;
}
ASSERT(pRepo->commit == 0);
if (pRepo->mem != NULL) {
sem_wait(&(pRepo->readyToCommit));
if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_START);
if (tsdbLockRepo(pRepo) < 0) return -1;
pRepo->imem = pRepo->mem;
pRepo->mem = NULL;
pRepo->commit = 1;
code = pthread_create(&pRepo->commitThread, NULL, tsdbCommitData, (void *)pRepo);
if (code != 0) {
tsdbError("vgId:%d failed to create commit thread since %s", REPO_ID(pRepo), strerror(errno));
terrno = TAOS_SYSTEM_ERROR(code);
tsdbUnlockRepo(pRepo);
return -1;
}
tsdbScheduleCommit(pRepo);
if (tsdbUnlockRepo(pRepo) < 0) return -1;
}
if (pIMem && tsdbUnRefMemTable(pRepo, pIMem) < 0) return -1;
if (tsdbUnRefMemTable(pRepo, pIMem) < 0) return -1;
return 0;
}
int tsdbSyncCommit(TSDB_REPO_T *repo) {
STsdbRepo *pRepo = (STsdbRepo *)repo;
tsdbAsyncCommit(pRepo);
sem_wait(&(pRepo->readyToCommit));
sem_post(&(pRepo->readyToCommit));
return 0;
}
......@@ -419,6 +406,68 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
return 0;
}
void *tsdbCommitData(STsdbRepo *pRepo) {
SMemTable * pMem = pRepo->imem;
STsdbCfg * pCfg = &pRepo->config;
SDataCols * pDataCols = NULL;
STsdbMeta * pMeta = pRepo->tsdbMeta;
SCommitIter *iters = NULL;
SRWHelper whelper = {0};
ASSERT(pMem != NULL);
tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64, REPO_ID(pRepo),
pMem->keyFirst, pMem->keyLast, pMem->numOfRows);
// Create the iterator to read from cache
if (pMem->numOfRows > 0) {
iters = tsdbCreateCommitIters(pRepo);
if (iters == NULL) {
tsdbError("vgId:%d failed to create commit iterator since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _exit;
}
if (tsdbInitWriteHelper(&whelper, pRepo) < 0) {
tsdbError("vgId:%d failed to init write helper since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _exit;
}
if ((pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbError("vgId:%d failed to init data cols with maxRowBytes %d maxCols %d maxRowsPerFileBlock %d since %s",
REPO_ID(pRepo), pMeta->maxCols, pMeta->maxRowBytes, pCfg->maxRowsPerFileBlock, tstrerror(terrno));
goto _exit;
}
int sfid = (int)(TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision));
int efid = (int)(TSDB_KEY_FILEID(pMem->keyLast, pCfg->daysPerFile, pCfg->precision));
// Loop to commit to each file
for (int fid = sfid; fid <= efid; fid++) {
if (tsdbCommitToFile(pRepo, fid, iters, &whelper, pDataCols) < 0) {
tsdbError("vgId:%d failed to commit to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
goto _exit;
}
}
}
// Commit to update meta file
if (tsdbCommitMeta(pRepo) < 0) {
tsdbError("vgId:%d failed to commit data while committing meta data since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _exit;
}
tsdbFitRetention(pRepo);
_exit:
tdFreeDataCols(pDataCols);
tsdbDestroyCommitIters(iters, pMem->maxTables);
tsdbDestroyHelper(&whelper);
tsdbEndCommit(pRepo);
tsdbInfo("vgId:%d commit over", pRepo->config.tsdbId);
return NULL;
}
// ---------------- LOCAL FUNCTIONS ----------------
static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes) {
ASSERT(pRepo->mem != NULL);
......@@ -529,69 +578,6 @@ static void tsdbFreeTableData(STableData *pTableData) {
static char *tsdbGetTsTupleKey(const void *data) { return dataRowTuple((SDataRow)data); }
static void *tsdbCommitData(void *arg) {
STsdbRepo * pRepo = (STsdbRepo *)arg;
SMemTable * pMem = pRepo->imem;
STsdbCfg * pCfg = &pRepo->config;
SDataCols * pDataCols = NULL;
STsdbMeta * pMeta = pRepo->tsdbMeta;
SCommitIter *iters = NULL;
SRWHelper whelper = {0};
ASSERT(pRepo->commit == 1);
ASSERT(pMem != NULL);
tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64, REPO_ID(pRepo),
pMem->keyFirst, pMem->keyLast, pMem->numOfRows);
// Create the iterator to read from cache
if (pMem->numOfRows > 0) {
iters = tsdbCreateCommitIters(pRepo);
if (iters == NULL) {
tsdbError("vgId:%d failed to create commit iterator since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _exit;
}
if (tsdbInitWriteHelper(&whelper, pRepo) < 0) {
tsdbError("vgId:%d failed to init write helper since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _exit;
}
if ((pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbError("vgId:%d failed to init data cols with maxRowBytes %d maxCols %d maxRowsPerFileBlock %d since %s",
REPO_ID(pRepo), pMeta->maxCols, pMeta->maxRowBytes, pCfg->maxRowsPerFileBlock, tstrerror(terrno));
goto _exit;
}
int sfid = (int)(TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision));
int efid = (int)(TSDB_KEY_FILEID(pMem->keyLast, pCfg->daysPerFile, pCfg->precision));
// Loop to commit to each file
for (int fid = sfid; fid <= efid; fid++) {
if (tsdbCommitToFile(pRepo, fid, iters, &whelper, pDataCols) < 0) {
tsdbError("vgId:%d failed to commit to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
goto _exit;
}
}
}
// Commit to update meta file
if (tsdbCommitMeta(pRepo) < 0) {
tsdbError("vgId:%d failed to commit data while committing meta data since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _exit;
}
tsdbFitRetention(pRepo);
_exit:
tdFreeDataCols(pDataCols);
tsdbDestroyCommitIters(iters, pMem->maxTables);
tsdbDestroyHelper(&whelper);
tsdbEndCommit(pRepo);
tsdbInfo("vgId:%d commit over", pRepo->config.tsdbId);
return NULL;
}
static int tsdbCommitMeta(STsdbRepo *pRepo) {
SMemTable *pMem = pRepo->imem;
......@@ -642,8 +628,8 @@ _err:
}
static void tsdbEndCommit(STsdbRepo *pRepo) {
ASSERT(pRepo->commit == 1);
if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER);
sem_post(&(pRepo->readyToCommit));
}
static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) {
......
......@@ -2366,8 +2366,8 @@ void filterPrepare(void* expr, void* param) {
if (size < (uint32_t)pSchema->bytes) {
size = pSchema->bytes;
}
pInfo->q = calloc(1, size + TSDB_NCHAR_SIZE); // to make sure tonchar does not cause invalid write, since the '\0' needs at least sizeof(wchar_t) space.
// to make sure tonchar does not cause invalid write, since the '\0' needs at least sizeof(wchar_t) space.
pInfo->q = calloc(1, size + TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE);
tVariantDump(pCond, pInfo->q, pSchema->type, true);
}
}
......
......@@ -377,7 +377,8 @@ int taosCheckVersion(char *input_client_version, char *input_server_version, int
for(int32_t i = 0; i < comparedSegments; ++i) {
if (clientVersionNumber[i] != serverVersionNumber[i]) {
uError("the %d-th number of server version:%s not matched with client version:%s", i, server_version, version);
uError("the %d-th number of server version:%s not matched with client version:%s", i, server_version,
client_version);
return TSDB_CODE_TSC_INVALID_VERSION;
}
}
......
......@@ -67,10 +67,17 @@ int32_t vnodeInitResources() {
return TSDB_CODE_VND_OUT_OF_MEMORY;
}
if (tsdbInitCommitQueue(tsNumOfCommitThreads) < 0) {
vError("failed to init vnode commit queue");
return terrno;
}
return TSDB_CODE_SUCCESS;
}
void vnodeCleanupResources() {
tsdbDestroyCommitQueue();
if (tsVnodesHash != NULL) {
vDebug("vnode list is cleanup");
taosHashCleanup(tsVnodesHash);
......@@ -308,6 +315,8 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
pVnode->version = walGetVersion(pVnode->wal);
}
tsdbSyncCommit(pVnode->tsdb);
walRemoveAllOldFiles(pVnode->wal);
walRenew(pVnode->wal);
SSyncInfo syncInfo;
......@@ -346,6 +355,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
pVnode->status = TAOS_VN_STATUS_READY;
vDebug("vgId:%d, vnode is opened in %s, pVnode:%p", pVnode->vgId, rootDir, pVnode);
tsdbIncCommitRef(pVnode->vgId);
taosHashPut(tsVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t), (char *)(&pVnode), sizeof(SVnodeObj *));
return TSDB_CODE_SUCCESS;
......@@ -437,6 +447,7 @@ void vnodeRelease(void *pVnodeRaw) {
tsem_destroy(&pVnode->sem);
free(pVnode);
tsdbDecCommitRef(vgId);
int32_t count = taosHashGetSize(tsVnodesHash);
vDebug("vgId:%d, vnode is destroyed, vnodes:%d", vgId, count);
......@@ -583,7 +594,7 @@ static int vnodeProcessTsdbStatus(void *arg, int status) {
if (status == TSDB_STATUS_COMMIT_OVER) {
vDebug("vgId:%d, commit over, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, pVnode->version);
walRemoveOldFiles(pVnode->wal);
walRemoveOneOldFile(pVnode->wal);
return vnodeSaveVersion(pVnode);
}
......
......@@ -229,7 +229,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
if (handle == NULL) { // failed to register qhandle
pRsp->code = terrno;
terrno = 0;
vError("vgId:%d QInfo:%p register qhandle failed, return to app, code:%s", pVnode->vgId, (void *)pQInfo,
vError("vgId:%d, QInfo:%p register qhandle failed, return to app, code:%s", pVnode->vgId, (void *)pQInfo,
tstrerror(pRsp->code));
qDestroyQueryInfo(pQInfo); // destroy it directly
return pRsp->code;
......
......@@ -128,16 +128,7 @@ void walClose(void *handle) {
taosClose(pWal->fd);
if (pWal->keep != TAOS_WAL_KEEP) {
int64_t fileId = -1;
while (walGetNextFile(pWal, &fileId) >= 0) {
snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId);
if (remove(pWal->name) < 0) {
wError("vgId:%d, wal:%p file:%s, failed to remove", pWal->vgId, pWal, pWal->name);
} else {
wInfo("vgId:%d, wal:%p file:%s, it is removed", pWal->vgId, pWal, pWal->name);
}
}
walRemoveAllOldFiles(pWal);
} else {
wDebug("vgId:%d, wal:%p file:%s, it is closed and kept", pWal->vgId, pWal, pWal->name);
}
......
......@@ -63,7 +63,7 @@ int32_t walRenew(void *handle) {
return code;
}
void walRemoveOldFiles(void *handle) {
void walRemoveOneOldFile(void *handle) {
SWal *pWal = handle;
if (pWal == NULL) return;
if (pWal->keep == TAOS_WAL_KEEP) return;
......@@ -86,6 +86,22 @@ void walRemoveOldFiles(void *handle) {
pthread_mutex_unlock(&pWal->mutex);
}
void walRemoveAllOldFiles(void *handle) {
if (handle == NULL) return;
SWal * pWal = handle;
int64_t fileId = -1;
while (walGetNextFile(pWal, &fileId) >= 0) {
snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId);
if (remove(pWal->name) < 0) {
wError("vgId:%d, wal:%p file:%s, failed to remove", pWal->vgId, pWal, pWal->name);
} else {
wInfo("vgId:%d, wal:%p file:%s, it is removed", pWal->vgId, pWal, pWal->name);
}
}
}
int32_t walWrite(void *handle, SWalHead *pHead) {
if (handle == NULL) return -1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册