提交 d0b0ac95 编写于 作者: S slguan

[TD-15] fix bug in sdbInsertRow

上级 eb6764be
......@@ -222,7 +222,6 @@ static int32_t sdbInitTableByFile(SSdbTable *pTable) {
int32_t real_size = 0;
int32_t maxAutoIndex = 0;
oldId = pTable->id;
if (sdbOpenSdbFile(pTable) < 0) return -1;
total_size = sizeof(SRowHead) + pTable->maxRowSize + sizeof(TSCKSUM);
......@@ -234,7 +233,6 @@ static int32_t sdbInitTableByFile(SSdbTable *pTable) {
sdbTrace("open sdb file:%s for read", pTable->fn);
// Loop to read sdb file row by row
while (1) {
memset(rowHead, 0, total_size);
......@@ -259,9 +257,6 @@ static int32_t sdbInitTableByFile(SSdbTable *pTable) {
continue;
}
// sdbTrace("%s id:%ld rowSize:%d", pTable->name, rowHead->id,
// rowHead->rowSize);
bytes = read(pTable->fd, rowHead->data, rowHead->rowSize + sizeof(TSCKSUM));
if (bytes < rowHead->rowSize + sizeof(TSCKSUM)) {
// TODO: Here may cause pTable->size not end of the file
......@@ -276,40 +271,22 @@ static int32_t sdbInitTableByFile(SSdbTable *pTable) {
continue;
}
// Check if the the object exists already
if (pTable->keyType == SDB_KEYTYPE_AUTO) {
maxAutoIndex = MAX(maxAutoIndex, *(int32_t *) rowHead->data);
}
pMetaRow = sdbGetRow(pTable, rowHead->data);
if (pMetaRow == NULL) { // New object
if (pMetaRow == NULL) {
if (rowHead->id < 0) {
/* assert(0); */
sdbError("error sdb negative id:%d, sdb:%s, skip", rowHead->id, pTable->name);
} else {
rowMeta.id = rowHead->id;
// TODO: Get rid of the rowMeta.offset and rowSize
rowMeta.offset = pTable->size;
rowMeta.rowSize = rowHead->rowSize;
rowMeta.row = (*pTable->decodeFp)(rowHead->data);
(*sdbAddIndexFp[pTable->keyType])(pTable->iHandle, rowMeta.row, &rowMeta);
if (pTable->keyType == SDB_KEYTYPE_AUTO) {
pTable->autoIndex++;
maxAutoIndex = MAX(maxAutoIndex, *(int32_t*)rowHead->data);
}
pTable->numOfRows++;
}
} else { // already exists
if (pTable->keyType == SDB_KEYTYPE_AUTO) {
pTable->autoIndex++;
maxAutoIndex = MAX(maxAutoIndex, *(int32_t *) rowHead->data);
sdbInsertRow(pTable, rowHead->data, SDB_OPER_DISK);
}
if (rowHead->id < 0) { // Delete the object
(*sdbDeleteIndexFp[pTable->keyType])(pTable->iHandle, rowHead->data);
(*pTable->destroyFp)(pMetaRow);
pTable->numOfRows--;
numOfDels++;
} else { // Reset the object TODO: is it possible to merge reset and
// update ??
//(*(pTable->appTool))(SDB_TYPE_RESET, pMetaRow, rowHead->data, rowHead->rowSize, NULL);
} else {
if (rowHead->id < 0) {
sdbDeleteRow(pTable, rowHead->data, SDB_OPER_DISK);
} else {
sdbUpdateRow(pTable, rowHead->data, rowHead->rowSize, SDB_OPER_DISK);
}
numOfDels++;
}
......@@ -317,7 +294,6 @@ static int32_t sdbInitTableByFile(SSdbTable *pTable) {
pTable->size += real_size;
if (pTable->id < abs(rowHead->id)) pTable->id = abs(rowHead->id);
//TODO: check this valid
pTable->size += 4;
lseek(pTable->fd, 4, SEEK_CUR);
}
......@@ -326,8 +302,10 @@ static int32_t sdbInitTableByFile(SSdbTable *pTable) {
pTable->autoIndex = maxAutoIndex;
}
sdbVersion += (pTable->id - oldId);
if (numOfDels > pTable->hashSessions / 4) sdbSaveSnapShot(pTable);
sdbVersion += pTable->id;
if (numOfDels > pTable->hashSessions / 4) {
sdbSaveSnapShot(pTable);
}
tfree(rowHead);
return 0;
......@@ -390,12 +368,13 @@ void *sdbGetRow(void *handle, void *key) {
pMeta = (*sdbGetIndexFp[pTable->keyType])(pTable->iHandle, key);
pthread_mutex_unlock(&pTable->mutex);
if (pMeta == NULL) return NULL;
if (pMeta == NULL) {
return NULL;
}
return pMeta->row;
}
// row here must be encoded string (rowSize > 0) or the object it self (rowSize = 0)
int32_t sdbInsertRow(void *handle, void *row, ESdbOperType oper) {
SSdbTable *pTable = (SSdbTable *)handle;
SRowMeta rowMeta;
......@@ -448,7 +427,7 @@ int32_t sdbInsertRow(void *handle, void *row, ESdbOperType oper) {
}
}
if (oper == SDB_OPER_GLOBAL || oper == SDB_OPER_LOCAL) {
if (oper != SDB_OPER_DISK) {
rowHead->rowSize = (*pTable->encodeFp)(pObj, rowHead->data, pTable->maxRowSize);
assert(rowHead->rowSize > 0 && rowHead->rowSize <= pTable->maxRowSize);
......@@ -462,43 +441,46 @@ int32_t sdbInsertRow(void *handle, void *row, ESdbOperType oper) {
return -1;
}
// update in SDB layer
rowMeta.id = pTable->id;
rowMeta.offset = pTable->size;
rowMeta.rowSize = rowHead->rowSize;
rowMeta.row = pObj;
(*sdbAddIndexFp[pTable->keyType])(pTable->iHandle, pObj, &rowMeta);
twrite(pTable->fd, rowHead, real_size);
pTable->size += real_size;
sdbFinishCommit(pTable);
}
switch (pTable->keyType) {
case SDB_KEYTYPE_STRING:
sdbTrace("table:%s, a record is inserted:%s, sdbVersion:%" PRId64 " id:%" PRId64 " rowSize:%d numOfRows:%d fileSize:%" PRId64,
pTable->name, (char *)row, sdbVersion, rowHead->id, rowHead->rowSize, pTable->numOfRows, pTable->size);
break;
case SDB_KEYTYPE_AUTO:
sdbTrace("table:%s, a record is inserted:%d, sdbVersion:%" PRId64 " id:%" PRId64 " rowSize:%d numOfRows:%d fileSize:%" PRId64,
pTable->name, *(int32_t *)row, sdbVersion, rowHead->id, rowHead->rowSize, pTable->numOfRows, pTable->size);
break;
default:
sdbTrace("table:%s, a record is inserted, sdbVersion:%" PRId64 " id:%" PRId64 " rowSize:%d numOfRows:%d fileSize:%" PRId64,
pTable->name, sdbVersion, rowHead->id, rowHead->rowSize, pTable->numOfRows, pTable->size);
break;
}
}
// update in SDB layer
rowMeta.id = pTable->id;
rowMeta.offset = pTable->size;
rowMeta.rowSize = rowHead->rowSize;
rowMeta.row = pObj;
(*sdbAddIndexFp[pTable->keyType])(pTable->iHandle, pObj, &rowMeta);
if (pTable->keyType == SDB_KEYTYPE_AUTO) {
*((uint32_t *)pObj) = ++pTable->autoIndex;
}
pTable->numOfRows++;
pTable->id++;
sdbVersion++;
if (oper != SDB_OPER_DISK) {
pTable->id++;
sdbVersion++;
}
pthread_mutex_unlock(&pTable->mutex);
switch (pTable->keyType) {
case SDB_KEYTYPE_STRING:
sdbTrace("table:%s, a record is inserted:%s, sdbVersion:%" PRId64 " id:%" PRId64 " rowSize:%d numOfRows:%d fileSize:%" PRId64,
pTable->name, (char *)row, sdbVersion, rowHead->id, rowHead->rowSize, pTable->numOfRows, pTable->size);
break;
case SDB_KEYTYPE_AUTO:
sdbTrace("table:%s, a record is inserted:%d, sdbVersion:%" PRId64 " id:%" PRId64 " rowSize:%d numOfRows:%d fileSize:%" PRId64,
pTable->name, *(int32_t *)row, sdbVersion, rowHead->id, rowHead->rowSize, pTable->numOfRows, pTable->size);
break;
default:
sdbTrace("table:%s, a record is inserted, sdbVersion:%" PRId64 " id:%" PRId64 " rowSize:%d numOfRows:%d fileSize:%" PRId64,
pTable->name, sdbVersion, rowHead->id, rowHead->rowSize, pTable->numOfRows, pTable->size);
break;
}
(*pTable->insertFp)(pObj);
tfree(rowHead);
......@@ -556,20 +538,22 @@ int32_t sdbDeleteRow(void *handle, void *row, ESdbOperType oper) {
}
}
rowHead->delimiter = SDB_DELIMITER;
rowHead->rowSize = rowSize;
rowHead->id = -(pTable->id);
memcpy(rowHead->data, row, rowSize);
if (taosCalcChecksumAppend(0, (uint8_t *)rowHead, total_size) < 0) {
sdbError("failed to get checksum while inserting, sdb:%s", pTable->name);
pthread_mutex_unlock(&pTable->mutex);
tfree(rowHead);
return -1;
}
if (oper != SDB_OPER_DISK) {
rowHead->delimiter = SDB_DELIMITER;
rowHead->rowSize = rowSize;
rowHead->id = -(pTable->id);
memcpy(rowHead->data, row, rowSize);
if (taosCalcChecksumAppend(0, (uint8_t *)rowHead, total_size) < 0) {
sdbError("failed to get checksum while inserting, sdb:%s", pTable->name);
pthread_mutex_unlock(&pTable->mutex);
tfree(rowHead);
return -1;
}
twrite(pTable->fd, rowHead, total_size);
pTable->size += total_size;
sdbFinishCommit(pTable);
twrite(pTable->fd, rowHead, total_size);
pTable->size += total_size;
sdbFinishCommit(pTable);
}
switch (pTable->keyType) {
case SDB_KEYTYPE_STRING:
......@@ -590,14 +574,18 @@ int32_t sdbDeleteRow(void *handle, void *row, ESdbOperType oper) {
(*sdbDeleteIndexFp[pTable->keyType])(pTable->iHandle, row);
pTable->numOfRows--;
pTable->id++;
sdbVersion++;
if (oper != SDB_OPER_DISK) {
pTable->id++;
sdbVersion++;
}
pthread_mutex_unlock(&pTable->mutex);
tfree(rowHead);
(*pTable->deleteFp)(pMetaRow);
(*pTable->destroyFp)(pMetaRow);
return 0;
}
......@@ -661,23 +649,25 @@ int32_t sdbUpdateRow(void *handle, void *row, int32_t updateSize, ESdbOperType o
real_size = sizeof(SRowHead) + rowHead->rowSize + sizeof(TSCKSUM);
// write to the new position
rowHead->delimiter = SDB_DELIMITER;
rowHead->id = pTable->id;
if (taosCalcChecksumAppend(0, (uint8_t *)rowHead, real_size) < 0) {
sdbError("failed to get checksum, sdb:%s id:%d", pTable->name, rowHead->id);
pthread_mutex_unlock(&pTable->mutex);
tfree(rowHead);
return -1;
}
twrite(pTable->fd, rowHead, real_size);
if (oper != SDB_OPER_DISK) {
rowHead->delimiter = SDB_DELIMITER;
rowHead->id = pTable->id;
if (taosCalcChecksumAppend(0, (uint8_t *)rowHead, real_size) < 0) {
sdbError("failed to get checksum, sdb:%s id:%d", pTable->name, rowHead->id);
pthread_mutex_unlock(&pTable->mutex);
tfree(rowHead);
return -1;
}
twrite(pTable->fd, rowHead, real_size);
pMeta->id = pTable->id;
pMeta->offset = pTable->size;
pMeta->rowSize = rowHead->rowSize;
pTable->size += real_size;
pMeta->id = pTable->id;
pMeta->offset = pTable->size;
pMeta->rowSize = rowHead->rowSize;
pTable->size += real_size;
sdbFinishCommit(pTable);
sdbFinishCommit(pTable);
}
switch (pTable->keyType) {
case SDB_KEYTYPE_STRING:
......@@ -694,8 +684,10 @@ int32_t sdbUpdateRow(void *handle, void *row, int32_t updateSize, ESdbOperType o
break;
}
pTable->id++;
sdbVersion++;
if (oper != SDB_OPER_DISK) {
pTable->id++;
sdbVersion++;
}
pthread_mutex_unlock(&pTable->mutex);
......
......@@ -176,8 +176,7 @@ static int32_t mgmtCreateUser(SAcctObj *pAcct, char *name, char *pass) {
return code;
}
pUser = malloc(sizeof(SUserObj));
memset(pUser, 0, sizeof(SUserObj));
pUser = calloc(1, sizeof(SUserObj));
strcpy(pUser->user, name);
taosEncryptPass((uint8_t*) pass, strlen(pass), pUser->pass);
strcpy(pUser->acct, pAcct->user);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册