diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile index 69d507b3d9622f68ff37470f76410da7c08c35fa..3c941a323dc6f3d8d67fe8d9b9b77e9bd04091ac 100644 --- a/src/backend/access/transam/Makefile +++ b/src/backend/access/transam/Makefile @@ -4,7 +4,7 @@ # Makefile for access/transam # # IDENTIFICATION -# $Header: /cvsroot/pgsql/src/backend/access/transam/Makefile,v 1.6 1998/04/06 00:21:52 momjian Exp $ +# $Header: /cvsroot/pgsql/src/backend/access/transam/Makefile,v 1.7 1999/09/27 15:47:37 vadim Exp $ # #------------------------------------------------------------------------- @@ -13,7 +13,7 @@ include ../../../Makefile.global CFLAGS += -I../.. -OBJS = transam.o transsup.o varsup.o xact.o xid.o +OBJS = transam.o transsup.o varsup.o xact.o xid.o xlog.o rmgr.o all: SUBSYS.o diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c new file mode 100644 index 0000000000000000000000000000000000000000..36d50f08c0c32937d908c6a6ee2957a7d1da03a9 --- /dev/null +++ b/src/backend/access/transam/rmgr.c @@ -0,0 +1,4 @@ +#include "postgres.h" +#include "access/rmgr.h" + +RmgrData *RmgrTable = NULL; diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c new file mode 100644 index 0000000000000000000000000000000000000000..f1b80d402704e712a0ff7d34423aba750dbcf5c2 --- /dev/null +++ b/src/backend/access/transam/xlog.c @@ -0,0 +1,1434 @@ +#include +#include +#include +#include + +#include "postgres.h" +#include "access/xlog.h" +#include "access/xact.h" +#include "storage/sinval.h" +#include "storage/proc.h" +#include "storage/spin.h" +#include "storage/s_lock.h" + +void UpdateControlFile(void); +int XLOGShmemSize(void); +void BootStrapXLOG(void); +void StartupXLOG(void); +void CreateCheckPoint(bool shutdown); + +char *XLogDir = NULL; +char *ControlFilePath = NULL; +uint32 XLOGbuffers = 0; +XLogRecPtr MyLastRecPtr = {0, 0}; +bool StopIfError = false; + +SPINLOCK ControlFileLockId; +SPINLOCK XidGenLockId; + +extern bool ReleaseDataFile(void); + +extern VariableCache ShmemVariableCache; + +#define MinXLOGbuffers 4 + +typedef struct XLgwrRqst +{ + XLogRecPtr Write; /* byte (1-based) to write out */ + XLogRecPtr Flush; /* byte (1-based) to flush */ +} XLgwrRqst; + +typedef struct XLgwrResult +{ + XLogRecPtr Write; /* bytes written out */ + XLogRecPtr Flush; /* bytes flushed */ +} XLgwrResult; + +typedef struct XLogCtlInsert +{ + XLgwrResult LgwrResult; + XLogRecPtr PrevRecord; + uint16 curridx; /* current block index in cache */ + XLogPageHeader currpage; + char *currpos; +} XLogCtlInsert; + +typedef struct XLogCtlWrite +{ + XLgwrResult LgwrResult; + uint16 curridx; /* index of next block to write */ +} XLogCtlWrite; + +typedef struct XLogCtlData +{ + XLogCtlInsert Insert; + XLgwrRqst LgwrRqst; + XLgwrResult LgwrResult; + XLogCtlWrite Write; + char *pages; + XLogRecPtr *xlblocks; /* 1st byte ptr-s + BLCKSZ */ + uint32 XLogCacheByte; + uint32 XLogCacheBlck; +#ifdef HAS_TEST_AND_SET + slock_t insert_lck; + slock_t info_lck; + slock_t lgwr_lck; +#endif +} XLogCtlData; + +static XLogCtlData *XLogCtl = NULL; + +typedef enum DBState +{ + DB_SHUTDOWNED = 1, + DB_SHUTDOWNING, + DB_IN_RECOVERY, + DB_IN_PRODUCTION +} DBState; + +typedef struct ControlFileData +{ + uint32 logId; /* current log file id */ + uint32 logSeg; /* current log file segment (1-based) */ + XLogRecPtr checkPoint; /* last check point record ptr */ + time_t time; /* time stamp of last modification */ + DBState state; /* */ + /* MORE DATA FOLLOWS AT THE END OF THIS STRUCTURE + * - locations of data dirs + */ +} ControlFileData; + +static ControlFileData *ControlFile = NULL; + +typedef struct CheckPoint +{ + XLogRecPtr redo; /* next RecPtr available when we */ + /* began to create CheckPoint */ + /* (i.e. REDO start point) */ + XLogRecPtr undo; /* first record of oldest in-progress */ + /* transaction when we started */ + /* (i.e. UNDO end point) */ + TransactionId nextXid; + Oid nextOid; +} CheckPoint; + +/* + * We break each log file in 64Mb segments + */ +#define XLogSegSize (64*1024*1024) +#define XLogLastSeg (0xffffffff / XLogSegSize) +#define XLogFileSize (XLogLastSeg * XLogSegSize) + +#define XLogFileName(path, log, seg) \ + sprintf(path, "%.*s%c%08X%08X", \ + MAXPGPATH, XLogDir, SEP_CHAR, log, seg) + +#define PrevBufIdx(curridx) \ + ((curridx == 0) ? XLogCtl->XLogCacheBlck : (curridx - 1)) + +#define NextBufIdx(curridx) \ + ((curridx == XLogCtl->XLogCacheBlck) ? 0 : (curridx + 1)) + +#define XLByteLT(left, right) \ + (right.xlogid > left.xlogid || \ + (right.xlogid == left.xlogid && right.xrecoff > left.xrecoff)) + +#define XLByteLE(left, right) \ + (right.xlogid > left.xlogid || \ + (right.xlogid == left.xlogid && right.xrecoff >= left.xrecoff)) + +#define XLByteEQ(left, right) \ + (right.xlogid == left.xlogid && right.xrecoff == left.xrecoff) + +#define InitXLBuffer(curridx) (\ + XLogCtl->xlblocks[curridx].xrecoff = \ + (XLogCtl->xlblocks[Insert->curridx].xrecoff == XLogFileSize) ? \ + BLCKSZ : (XLogCtl->xlblocks[Insert->curridx].xrecoff + BLCKSZ), \ + XLogCtl->xlblocks[curridx].xlogid = \ + (XLogCtl->xlblocks[Insert->curridx].xrecoff == XLogFileSize) ? \ + (XLogCtl->xlblocks[Insert->curridx].xlogid + 1) : \ + XLogCtl->xlblocks[Insert->curridx].xlogid, \ + Insert->curridx = curridx, \ + Insert->currpage = (XLogPageHeader) (XLogCtl->pages + curridx * BLCKSZ), \ + Insert->currpos = \ + ((char*) Insert->currpage) + SizeOfXLogPHD, \ + Insert->currpage->xlp_magic = XLOG_PAGE_MAGIC, \ + Insert->currpage->xlp_info = 0 \ + ) + +#define XRecOffIsValid(xrecoff) \ + (xrecoff % BLCKSZ >= SizeOfXLogPHD && \ + (BLCKSZ - xrecoff % BLCKSZ) >= SizeOfXLogRecord) + +static void GetFreeXLBuffer(void); +static void XLogWrite(char *buffer); +static int XLogFileInit(uint32 log, uint32 seg); +static int XLogFileOpen(uint32 log, uint32 seg, bool econt); +static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, char *buffer); + +static XLgwrResult LgwrResult = {{0, 0}, {0, 0}}; +static XLgwrRqst LgwrRqst = {{0, 0}, {0, 0}}; + +static int logFile = -1; +static uint32 logId = 0; +static uint32 logSeg = 0; +static off_t logOff = 0; + +static XLogRecPtr ReadRecPtr; +static XLogRecPtr EndRecPtr; +static int readFile = -1; +static uint32 readId = 0; +static uint32 readSeg = 0; +static off_t readOff = (off_t) -1; +static char readBuf[BLCKSZ]; +static XLogRecord *nextRecord = NULL; + +XLogRecPtr +XLogInsert(RmgrId rmid, char *hdr, uint32 hdrlen, char *buf, uint32 buflen) +{ + XLogCtlInsert *Insert = &XLogCtl->Insert; + XLogRecord *record; + XLogSubRecord *subrecord; + XLogRecPtr RecPtr; + uint32 len = hdrlen + buflen, + freespace, + wlen; + uint16 curridx; + bool updrqst = false; + + if (len == 0 || len > MAXLOGRECSZ) + elog(STOP, "XLogInsert: invalid record len %u", len); + + /* obtain xlog insert lock */ + if (TAS(&(XLogCtl->insert_lck))) /* busy */ + { + bool do_lgwr = true; + unsigned i = 0; + + for ( ; ; ) + { + /* try to read LgwrResult while waiting for insert lock */ + if (!TAS(&(XLogCtl->info_lck))) + { + LgwrRqst = XLogCtl->LgwrRqst; + LgwrResult = XLogCtl->LgwrResult; + S_UNLOCK(&(XLogCtl->info_lck)); + /* + * If cache is half filled then try to acquire lgwr lock + * and do LGWR work, but only once. + */ + if (do_lgwr && + (LgwrRqst.Write.xlogid != LgwrResult.Write.xlogid || + (LgwrRqst.Write.xrecoff - LgwrResult.Write.xrecoff >= + XLogCtl->XLogCacheByte / 2))) + { + if (!TAS(&(XLogCtl->lgwr_lck))) + { + LgwrResult = XLogCtl->Write.LgwrResult; + if (!TAS(&(XLogCtl->info_lck))) + { + LgwrRqst = XLogCtl->LgwrRqst; + S_UNLOCK(&(XLogCtl->info_lck)); + } + if (XLByteLT(LgwrResult.Write, LgwrRqst.Write)) + { + XLogWrite(NULL); + do_lgwr = false; + } + S_UNLOCK(&(XLogCtl->lgwr_lck)); + } + } + } + s_lock_sleep(i++); + if (!TAS(&(XLogCtl->insert_lck))) + break; + } + } + + freespace = ((char*) Insert->currpage) + BLCKSZ - Insert->currpos; + if (freespace < SizeOfXLogRecord) + { + curridx = NextBufIdx(Insert->curridx); + if (XLByteLE(XLogCtl->xlblocks[curridx], LgwrResult.Write)) + InitXLBuffer(curridx); + else + GetFreeXLBuffer(); + freespace = BLCKSZ - SizeOfXLogPHD; + } + else + curridx = Insert->curridx; + + freespace -= SizeOfXLogRecord; + record = (XLogRecord*) Insert->currpos; + record->xl_prev = Insert->PrevRecord; + record->xl_xact_prev = MyLastRecPtr; + record->xl_xid = GetCurrentTransactionId(); + record->xl_len = (len > freespace) ? freespace : len; + record->xl_info = (len > freespace) ? XLR_TO_BE_CONTINUED : 0; + record->xl_rmid = rmid; + RecPtr.xlogid = XLogCtl->xlblocks[curridx].xlogid; + RecPtr.xrecoff = + XLogCtl->xlblocks[curridx].xrecoff - BLCKSZ + + Insert->currpos - ((char*) Insert->currpage); + if (MyLastRecPtr.xrecoff == 0) + { + SpinAcquire(SInvalLock); + MyProc->logRec = RecPtr; + SpinRelease(SInvalLock); + } + MyLastRecPtr = RecPtr; + RecPtr.xrecoff += record->xl_len; + Insert->currpos += SizeOfXLogRecord; + if (freespace > 0) + { + wlen = (hdrlen > freespace) ? freespace : hdrlen; + memcpy(Insert->currpos, hdr, wlen); + freespace -= wlen; + hdrlen -= wlen; + hdr += wlen; + Insert->currpos += wlen; + if (buflen > 0 && freespace > 0) + { + wlen = (buflen > freespace) ? freespace : buflen; + memcpy(Insert->currpos, buf, wlen); + freespace -= wlen; + buflen -= wlen; + buf += wlen; + Insert->currpos += wlen; + } + Insert->currpos = ((char*)Insert->currpage) + + DOUBLEALIGN(Insert->currpos - ((char*)Insert->currpage)); + len = hdrlen + buflen; + } + + if (len != 0) + { +nbuf: + curridx = NextBufIdx(curridx); + if (XLByteLE(XLogCtl->xlblocks[curridx], LgwrResult.Write)) + { + InitXLBuffer(curridx); + updrqst = true; + } + else + { + GetFreeXLBuffer(); + updrqst = false; + } + freespace = BLCKSZ - SizeOfXLogPHD - SizeOfXLogSubRecord; + Insert->currpage->xlp_info |= XLP_FIRST_IS_SUBRECORD; + subrecord = (XLogSubRecord*) Insert->currpos; + Insert->currpos += SizeOfXLogSubRecord; + if (hdrlen > freespace) + { + subrecord->xl_len = freespace; + subrecord->xl_info = XLR_TO_BE_CONTINUED; + memcpy(Insert->currpos, hdr, freespace); + hdrlen -= freespace; + hdr += freespace; + goto nbuf; + } + else if (hdrlen > 0) + { + subrecord->xl_len = hdrlen; + memcpy(Insert->currpos, hdr, hdrlen); + Insert->currpos += hdrlen; + freespace -= hdrlen; + hdrlen = 0; + } + else + subrecord->xl_len = 0; + if (buflen > freespace) + { + subrecord->xl_len += freespace; + subrecord->xl_info = XLR_TO_BE_CONTINUED; + memcpy(Insert->currpos, buf, freespace); + buflen -= freespace; + buf += freespace; + goto nbuf; + } + else if (buflen > 0) + { + subrecord->xl_len += buflen; + memcpy(Insert->currpos, buf, buflen); + Insert->currpos += buflen; + } + subrecord->xl_info = 0; + RecPtr.xlogid = XLogCtl->xlblocks[curridx].xlogid; + RecPtr.xrecoff = XLogCtl->xlblocks[curridx].xrecoff - + BLCKSZ + SizeOfXLogPHD + subrecord->xl_len; + Insert->currpos = ((char*)Insert->currpage) + + DOUBLEALIGN(Insert->currpos - ((char*)Insert->currpage)); + } + freespace = ((char*) Insert->currpage) + BLCKSZ - Insert->currpos; + /* + * All done! Update global LgwrRqst if some block was filled up. + */ + if (freespace < SizeOfXLogRecord) + updrqst = true; /* curridx is filled and available for writing out */ + else + curridx = PrevBufIdx(curridx); + LgwrRqst.Write = XLogCtl->xlblocks[curridx]; + + S_UNLOCK(&(XLogCtl->insert_lck)); + + if (updrqst) + { + unsigned i = 0; + + for ( ; ; ) + { + if (!TAS(&(XLogCtl->info_lck))) + { + if (XLByteLT(XLogCtl->LgwrRqst.Write, LgwrRqst.Write)) + XLogCtl->LgwrRqst.Write = LgwrRqst.Write; + S_UNLOCK(&(XLogCtl->info_lck)); + break; + } + s_lock_sleep(i++); + } + } + + return (RecPtr); +} + +void +XLogFlush(XLogRecPtr record) +{ + XLogRecPtr WriteRqst; + char buffer[BLCKSZ]; + char *usebuf = NULL; + unsigned i = 0; + bool force_lgwr = false; + + if (XLByteLE(record, LgwrResult.Flush)) + return; + WriteRqst = LgwrRqst.Write; + for ( ; ; ) + { + /* try to read LgwrResult */ + if (!TAS(&(XLogCtl->info_lck))) + { + LgwrResult = XLogCtl->LgwrResult; + if (XLByteLE(record, LgwrResult.Flush)) + { + S_UNLOCK(&(XLogCtl->info_lck)); + return; + } + if (XLByteLT(XLogCtl->LgwrRqst.Flush, record)) + XLogCtl->LgwrRqst.Flush = record; + if (XLByteLT(WriteRqst, XLogCtl->LgwrRqst.Write)) + { + WriteRqst = XLogCtl->LgwrRqst.Write; + usebuf = NULL; + } + S_UNLOCK(&(XLogCtl->info_lck)); + } + /* if something was added to log cache then try to flush this too */ + if (!TAS(&(XLogCtl->insert_lck))) + { + XLogCtlInsert *Insert = &XLogCtl->Insert; + uint32 freespace = + ((char*) Insert->currpage) + BLCKSZ - Insert->currpos; + + if (freespace < SizeOfXLogRecord) /* buffer is full */ + { + usebuf = NULL; + LgwrRqst.Write = WriteRqst = XLogCtl->xlblocks[Insert->curridx]; + } + else + { + usebuf = buffer; + memcpy(usebuf, Insert->currpage, BLCKSZ - freespace); + memset(usebuf + BLCKSZ - freespace, 0, freespace); + WriteRqst = XLogCtl->xlblocks[Insert->curridx]; + WriteRqst.xrecoff = WriteRqst.xrecoff - BLCKSZ + + Insert->currpos - ((char*) Insert->currpage); + } + S_UNLOCK(&(XLogCtl->insert_lck)); + force_lgwr = true; + } + if (force_lgwr || WriteRqst.xlogid > record.xlogid || + (WriteRqst.xlogid == record.xlogid && + WriteRqst.xrecoff >= record.xrecoff + BLCKSZ)) + { + if (!TAS(&(XLogCtl->lgwr_lck))) + { + LgwrResult = XLogCtl->Write.LgwrResult; + if (XLByteLE(record, LgwrResult.Flush)) + { + S_UNLOCK(&(XLogCtl->lgwr_lck)); + return; + } + if (XLByteLT(LgwrResult.Write, WriteRqst)) + { + LgwrRqst.Flush = LgwrRqst.Write = WriteRqst; + XLogWrite(usebuf); + S_UNLOCK(&(XLogCtl->lgwr_lck)); + if (XLByteLT(LgwrResult.Flush, record)) + elog(STOP, "XLogFlush: request is not satisfyed"); + return; + } + break; + } + } + s_lock_sleep(i++); + } + + if (logFile >= 0 && (LgwrResult.Write.xlogid != logId || + (LgwrResult.Write.xrecoff - 1) / XLogSegSize != logSeg)) + { + if (close(logFile) != 0) + elog(STOP, "Close(logfile %u seg %u) failed: %d", + logId, logSeg, errno); + logFile = -1; + } + + if (logFile < 0) + { + logId = LgwrResult.Write.xlogid; + logSeg = (LgwrResult.Write.xrecoff - 1) / XLogSegSize; + logOff = (off_t) 0; + logFile = XLogFileOpen(logId, logSeg, false); + } + + if (fsync(logFile) != 0) + elog(STOP, "Fsync(logfile %u seg %u) failed: %d", + logId, logSeg, errno); + LgwrResult.Flush = LgwrResult.Write; + + for (i = 0; ; ) + { + if (!TAS(&(XLogCtl->info_lck))) + { + XLogCtl->LgwrResult = LgwrResult; + if (XLByteLT(XLogCtl->LgwrRqst.Write, LgwrResult.Write)) + XLogCtl->LgwrRqst.Write = LgwrResult.Write; + S_UNLOCK(&(XLogCtl->info_lck)); + break; + } + s_lock_sleep(i++); + } + XLogCtl->Write.LgwrResult = LgwrResult; + + S_UNLOCK(&(XLogCtl->lgwr_lck)); + return; + +} + +static void +GetFreeXLBuffer() +{ + XLogCtlInsert *Insert = &XLogCtl->Insert; + XLogCtlWrite *Write = &XLogCtl->Write; + uint16 curridx = NextBufIdx(Insert->curridx); + + LgwrRqst.Write = XLogCtl->xlblocks[Insert->curridx]; + for ( ; ; ) + { + if (!TAS(&(XLogCtl->info_lck))) + { + LgwrResult = XLogCtl->LgwrResult; + XLogCtl->LgwrRqst.Write = LgwrRqst.Write; + S_UNLOCK(&(XLogCtl->info_lck)); + if (XLByteLE(XLogCtl->xlblocks[curridx], LgwrResult.Write)) + { + Insert->LgwrResult = LgwrResult; + InitXLBuffer(curridx); + return; + } + } + /* + * LgwrResult lock is busy or un-updated. Try to acquire lgwr lock + * and write full blocks. + */ + if (!TAS(&(XLogCtl->lgwr_lck))) + { + LgwrResult = Write->LgwrResult; + if (XLByteLE(XLogCtl->xlblocks[curridx], LgwrResult.Write)) + { + S_UNLOCK(&(XLogCtl->lgwr_lck)); + Insert->LgwrResult = LgwrResult; + InitXLBuffer(curridx); + return; + } + /* + * Have to write buffers while holding insert lock - + * not good... + */ + XLogWrite(NULL); + S_UNLOCK(&(XLogCtl->lgwr_lck)); + Insert->LgwrResult = LgwrResult; + InitXLBuffer(curridx); + return; + } + } + + return; +} + +static void +XLogWrite(char *buffer) +{ + XLogCtlWrite *Write = &XLogCtl->Write; + char *from; + uint32 wcnt = 0; + int i = 0; + + for ( ; XLByteLT(LgwrResult.Write, LgwrRqst.Write); ) + { + LgwrResult.Write = XLogCtl->xlblocks[Write->curridx]; + if (LgwrResult.Write.xlogid != logId || + (LgwrResult.Write.xrecoff - 1) / XLogSegSize != logSeg) + { + if (wcnt > 0) + { + if (fsync(logFile) != 0) + elog(STOP, "Fsync(logfile %u seg %u) failed: %d", + logId, logSeg, errno); + if (LgwrResult.Write.xlogid != logId) + LgwrResult.Flush.xrecoff = XLogFileSize; + else + LgwrResult.Flush.xrecoff = LgwrResult.Write.xrecoff - BLCKSZ; + LgwrResult.Flush.xlogid = logId; + if (!TAS(&(XLogCtl->info_lck))) + { + XLogCtl->LgwrResult.Flush = LgwrResult.Flush; + XLogCtl->LgwrResult.Write = LgwrResult.Flush; + if (XLByteLT(XLogCtl->LgwrRqst.Write, LgwrResult.Flush)) + XLogCtl->LgwrRqst.Write = LgwrResult.Flush; + if (XLByteLT(XLogCtl->LgwrRqst.Flush, LgwrResult.Flush)) + XLogCtl->LgwrRqst.Flush = LgwrResult.Flush; + S_UNLOCK(&(XLogCtl->info_lck)); + } + } + if (logFile >= 0) + { + if (close(logFile) != 0) + elog(STOP, "Close(logfile %u seg %u) failed: %d", + logId, logSeg, errno); + logFile = -1; + } + logId = LgwrResult.Write.xlogid; + logSeg = (LgwrResult.Write.xrecoff - 1) / XLogSegSize; + logOff = (off_t) 0; + logFile = XLogFileInit(logId, logSeg); + SpinAcquire(ControlFileLockId); + ControlFile->logId = logId; + ControlFile->logSeg = logSeg + 1; + ControlFile->time = time(NULL); + UpdateControlFile(); + SpinRelease(ControlFileLockId); + } + + if (logFile < 0) + { + logId = LgwrResult.Write.xlogid; + logSeg = (LgwrResult.Write.xrecoff - 1) / XLogSegSize; + logOff = (off_t) 0; + logFile = XLogFileOpen(logId, logSeg, false); + } + + if (logOff != (LgwrResult.Write.xrecoff - BLCKSZ) % XLogSegSize) + { + logOff = (LgwrResult.Write.xrecoff - BLCKSZ) % XLogSegSize; + if (lseek(logFile, logOff, SEEK_SET) < 0) + elog(STOP, "Lseek(logfile %u seg %u off %u) failed: %d", + logId, logSeg, logOff, errno); + } + + if (buffer != NULL && XLByteLT(LgwrRqst.Write, LgwrResult.Write)) + from = buffer; + else + from = XLogCtl->pages + Write->curridx * BLCKSZ; + + if (write(logFile, from, BLCKSZ) != BLCKSZ) + elog(STOP, "Write(logfile %u seg %u off %u) failed: %d", + logId, logSeg, logOff, errno); + + wcnt++; + logOff += BLCKSZ; + + if (from != buffer) + Write->curridx = NextBufIdx(Write->curridx); + else + LgwrResult.Write = LgwrRqst.Write; + } + if (wcnt == 0) + elog(STOP, "XLogWrite: nothing written"); + + if (XLByteLT(LgwrResult.Flush, LgwrRqst.Flush) && + XLByteLE(LgwrRqst.Flush, LgwrResult.Write)) + { + if (fsync(logFile) != 0) + elog(STOP, "Fsync(logfile %u seg %u) failed: %d", + logId, logSeg, errno); + LgwrResult.Flush = LgwrResult.Write; + } + + for ( ; ; ) + { + if (!TAS(&(XLogCtl->info_lck))) + { + XLogCtl->LgwrResult = LgwrResult; + if (XLByteLT(XLogCtl->LgwrRqst.Write, LgwrResult.Write)) + XLogCtl->LgwrRqst.Write = LgwrResult.Write; + S_UNLOCK(&(XLogCtl->info_lck)); + break; + } + s_lock_sleep(i++); + } + Write->LgwrResult = LgwrResult; +} + +static int +XLogFileInit(uint32 log, uint32 seg) +{ + char path[MAXPGPATH+1]; + int fd; + + XLogFileName(path, log, seg); + unlink(path); + +tryAgain: + fd = open(path, O_RDWR|O_CREAT|O_EXCL, S_IRUSR|S_IWUSR); + if (fd < 0 && (errno == EMFILE || errno == ENFILE)) + { + fd = errno; + if (!ReleaseDataFile()) + elog(STOP, "Create(logfile %u seg %u) failed: %d (and no one data file can be closed)", + logId, logSeg, fd); + goto tryAgain; + } + if (fd < 0) + elog(STOP, "Init(logfile %u seg %u) failed: %d", + logId, logSeg, errno); + + if (lseek(fd, XLogSegSize - 1, SEEK_SET) != (off_t) (XLogSegSize - 1)) + elog(STOP, "Lseek(logfile %u seg %u) failed: %d", + logId, logSeg, errno); + + if (write(fd, "", 1) != 1) + elog(STOP, "Init(logfile %u seg %u) failed: %d", + logId, logSeg, errno); + + if (fsync(fd) != 0) + elog(STOP, "Fsync(logfile %u seg %u) failed: %d", + logId, logSeg, errno); + + return(fd); +} + +static int +XLogFileOpen(uint32 log, uint32 seg, bool econt) +{ + char path[MAXPGPATH+1]; + int fd; + + XLogFileName(path, log, seg); + +tryAgain: + fd = open(path, O_RDWR); + if (fd < 0 && (errno == EMFILE || errno == ENFILE)) + { + fd = errno; + if (!ReleaseDataFile()) + elog(STOP, "Open(logfile %u seg %u) failed: %d (and no one data file can be closed)", + logId, logSeg, fd); + goto tryAgain; + } + if (fd < 0) + { + if (econt && errno == ENOENT) + { + elog(LOG, "Open(logfile %u seg %u) failed: file doesn't exist", + logId, logSeg); + return (fd); + } + elog(STOP, "Open(logfile %u seg %u) failed: %d", + logId, logSeg, errno); + } + + return(fd); +} + +void +UpdateControlFile() +{ + int fd; + +tryAgain: + fd = open(ControlFilePath, O_RDWR); + if (fd < 0 && (errno == EMFILE || errno == ENFILE)) + { + fd = errno; + if (!ReleaseDataFile()) + elog(STOP, "Open(cntlfile) failed: %d (and no one data file can be closed)", + fd); + goto tryAgain; + } + if (fd < 0) + elog(STOP, "Open(cntlfile) failed: %d", errno); + + if (write(fd, ControlFile, BLCKSZ) != BLCKSZ) + elog(STOP, "Write(cntlfile) failed: %d", errno); + + if (fsync(fd) != 0) + elog(STOP, "Fsync(cntlfile) failed: %d", errno); + + close(fd); + + return; +} + +int +XLOGShmemSize() +{ + if (XLOGbuffers < MinXLOGbuffers) + XLOGbuffers = MinXLOGbuffers; + + return(sizeof(XLogCtlData) + BLCKSZ * XLOGbuffers + + sizeof(XLogRecPtr) * XLOGbuffers + BLCKSZ); +} + +/* + * This func must be called ONCE on system install + */ +void +BootStrapXLOG() +{ + int fd; + char buffer[BLCKSZ]; + XLogPageHeader page = (XLogPageHeader)buffer; + CheckPoint checkPoint; + XLogRecord *record; + + fd = open(ControlFilePath, O_RDWR|O_CREAT|O_EXCL, S_IRUSR|S_IWUSR); + if (fd < 0) + elog(STOP, "BootStrapXLOG failed to create control file: %d", errno); + + logFile = XLogFileInit(0, 0); + + checkPoint.redo.xlogid = 0; + checkPoint.redo.xrecoff = SizeOfXLogPHD; + checkPoint.undo = checkPoint.redo; + checkPoint.nextXid = FirstTransactionId; + checkPoint.nextOid = BootstrapObjectIdData; + + memset(buffer, 0, BLCKSZ); + page->xlp_magic = XLOG_PAGE_MAGIC; + page->xlp_info = 0; + record = (XLogRecord*) ((char*)page + SizeOfXLogPHD); + record->xl_prev.xlogid = 0; record->xl_prev.xrecoff = 0; + record->xl_xact_prev = record->xl_prev; + record->xl_xid = InvalidTransactionId; + record->xl_len = sizeof(checkPoint); + record->xl_info = 0; + record->xl_rmid = RM_XLOG_ID; + memcpy((char*)record + SizeOfXLogRecord, &checkPoint, sizeof(checkPoint)); + + if (write(logFile, buffer, BLCKSZ) != BLCKSZ) + elog(STOP, "BootStrapXLOG failed to write logfile: %d", errno); + + if (fsync(logFile) != 0) + elog(STOP, "BootStrapXLOG failed to fsync logfile: %d", errno); + + close(logFile); + logFile = -1; + + memset(buffer, 0, BLCKSZ); + ControlFile = (ControlFileData*) buffer; + ControlFile->logId = 0; + ControlFile->logSeg = 1; + ControlFile->checkPoint = checkPoint.redo; + ControlFile->time = time(NULL); + ControlFile->state = DB_SHUTDOWNED; + + if (write(fd, buffer, BLCKSZ) != BLCKSZ) + elog(STOP, "BootStrapXLOG failed to write control file: %d", errno); + + if (fsync(fd) != 0) + elog(STOP, "BootStrapXLOG failed to fsync control file: %d", errno); + + close(fd); + + return; + +} + +/* + * This func must be called ONCE on system startup + */ +void +StartupXLOG() +{ + XLogCtlInsert *Insert = &XLogCtl->Insert; + CheckPoint checkPoint; + XLogRecPtr RecPtr, + LastRec; + XLogRecord *record; + char buffer[MAXLOGRECSZ+SizeOfXLogRecord]; + int fd; + bool found; + bool recovery = false; + bool sie_saved = false; + + elog(LOG, "Starting up XLOG manager..."); + + if (XLOGbuffers < MinXLOGbuffers) + XLOGbuffers = MinXLOGbuffers; + + ControlFile = (ControlFileData*) + ShmemInitStruct("Control File", BLCKSZ, &found); + Assert(!found); + XLogCtl = (XLogCtlData*) + ShmemInitStruct("XLOG Ctl", sizeof(XLogCtlData) + BLCKSZ * XLOGbuffers + + sizeof(XLogRecPtr) * XLOGbuffers, &found); + Assert(!found); + + XLogCtl->xlblocks = (XLogRecPtr*) (((char *)XLogCtl) + sizeof(XLogCtlData)); + XLogCtl->pages = ((char *)XLogCtl->xlblocks + sizeof(XLogRecPtr) * XLOGbuffers); + XLogCtl->XLogCacheByte = BLCKSZ * XLOGbuffers; + XLogCtl->XLogCacheBlck = XLOGbuffers - 1; + memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers); + XLogCtl->LgwrRqst = LgwrRqst; + XLogCtl->LgwrResult = LgwrResult; + XLogCtl->Insert.LgwrResult = LgwrResult; + XLogCtl->Insert.curridx = 0; + XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages); + XLogCtl->Write.LgwrResult = LgwrResult; + XLogCtl->Write.curridx = 0; + + /* + * Open/read Control file + */ +tryAgain: + fd = open(ControlFilePath, O_RDWR); + if (fd < 0 && (errno == EMFILE || errno == ENFILE)) + { + fd = errno; + if (!ReleaseDataFile()) + elog(STOP, "Open(cntlfile) failed: %d (and no one data file can be closed)", + fd); + goto tryAgain; + } + if (fd < 0) + elog(STOP, "Open(cntlfile) failed: %d", errno); + + if (read(fd, ControlFile, BLCKSZ) != BLCKSZ) + elog(STOP, "Read(cntlfile) failed: %d", errno); + + close(fd); + + if (ControlFile->logSeg == 0 || + ControlFile->time <= 0 || + ControlFile->state < DB_SHUTDOWNED || + ControlFile->state > DB_IN_PRODUCTION || + ControlFile->checkPoint.xlogid == 0 || + ControlFile->checkPoint.xrecoff == 0) + elog(STOP, "Control file context is broken"); + + if (ControlFile->state == DB_SHUTDOWNED) + elog(LOG, "Data Base System was properly shutdowned at %s", + ctime(&(ControlFile->time))); + else if (ControlFile->state == DB_SHUTDOWNING) + elog(LOG, "Data Base System was interrupted while shutting down at %s", + ctime(&(ControlFile->time))); + else if (ControlFile->state == DB_IN_RECOVERY) + { + elog(LOG, "Data Base System was interrupted being in recovery at %s\n" + "This propably means that some data blocks are corrupted\n" + "And you will have to use last backup for recovery", + ctime(&(ControlFile->time))); + } + else if (ControlFile->state == DB_IN_PRODUCTION) + elog(LOG, "Data Base System was interrupted being in production at %s", + ctime(&(ControlFile->time))); + + LastRec = RecPtr = ControlFile->checkPoint; + if (!XRecOffIsValid(RecPtr.xrecoff)) + elog(STOP, "Invalid checkPoint in control file"); + elog(LOG, "CheckPoint record at (%u, %u)", RecPtr.xlogid, RecPtr.xrecoff); + + record = ReadRecord(&RecPtr, buffer); + if (record->xl_rmid != RM_XLOG_ID) + elog(STOP, "Invalid RMID in checkPoint record"); + if (record->xl_len != sizeof(checkPoint)) + elog(STOP, "Invalid length of checkPoint record"); + checkPoint = *((CheckPoint*)((char*)record + SizeOfXLogRecord)); + + elog(LOG, "Redo record at (%u, %u); Undo record at (%u, %u)", + checkPoint.redo.xlogid, checkPoint.redo.xrecoff, + checkPoint.undo.xlogid, checkPoint.undo.xrecoff); + elog(LOG, "NextTransactionId: %u; NextOid: %u)", + checkPoint.nextXid, checkPoint.nextOid); + if (checkPoint.nextXid < FirstTransactionId || + checkPoint.nextOid < BootstrapObjectIdData) + elog(LOG, "Invalid NextTransactionId/NextOid"); + + ShmemVariableCache->nextXid = checkPoint.nextXid; + ShmemVariableCache->nextOid = checkPoint.nextOid; + + if (XLByteLT(RecPtr, checkPoint.redo)) + elog(STOP, "Invalid redo in checkPoint record"); + if (checkPoint.undo.xrecoff == 0) + checkPoint.undo = RecPtr; + if (XLByteLT(RecPtr, checkPoint.undo)) + elog(STOP, "Invalid undo in checkPoint record"); + + if (XLByteLT(checkPoint.undo, RecPtr) || XLByteLT(checkPoint.redo, RecPtr)) + { + if (ControlFile->state == DB_SHUTDOWNED) + elog(STOP, "Invalid Redo/Undo record in Shutdowned state"); + recovery = true; + } + else if (ControlFile->state != DB_SHUTDOWNED) + recovery = true; + + if (recovery) + { + elog(LOG, "The DataBase system was not properly shutdowned\n" + "Automatic recovery is in progress..."); + ControlFile->state = DB_IN_RECOVERY; + ControlFile->time = time(NULL); + UpdateControlFile(); + + sie_saved = StopIfError; + StopIfError = true; + + /* Is REDO required ? */ + if (XLByteLT(checkPoint.redo, RecPtr)) + record = ReadRecord(&(checkPoint.redo), buffer); + else /* read past CheckPoint record */ + record = ReadRecord(NULL, buffer); + + /* REDO */ + if (record->xl_len != 0) + { + elog(LOG, "Redo starts at (%u, %u)", + ReadRecPtr.xlogid, ReadRecPtr.xrecoff); + do + { + if (record->xl_xid >= ShmemVariableCache->nextXid) + ShmemVariableCache->nextXid = record->xl_xid + 1; + RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record); + record = ReadRecord(NULL, buffer); + } while (record->xl_len != 0); + elog(LOG, "Redo done at (%u, %u)", + ReadRecPtr.xlogid, ReadRecPtr.xrecoff); + LastRec = ReadRecPtr; + } + else + elog(LOG, "Redo is not required"); + /* UNDO */ + RecPtr = ReadRecPtr; + if (XLByteLT(checkPoint.undo, RecPtr)) + { + elog(LOG, "Undo starts at (%u, %u)", + RecPtr.xlogid, RecPtr.xrecoff); + do + { + record = ReadRecord(&RecPtr, buffer); + if (TransactionIdIsValid(record->xl_xid) && + !TransactionIdDidCommit(record->xl_xid)) + RmgrTable[record->xl_rmid].rm_undo(record); + RecPtr = record->xl_prev; + } while (XLByteLE(checkPoint.undo, RecPtr)); + elog(LOG, "Undo done at (%u, %u)", + ReadRecPtr.xlogid, ReadRecPtr.xrecoff); + } + else + elog(LOG, "Undo is not required"); + } + + /* Init xlog buffer cache */ + record = ReadRecord(&LastRec, buffer); + logId = EndRecPtr.xlogid; + logSeg = (EndRecPtr.xrecoff - 1) / XLogSegSize; + logOff = 0; + logFile = XLogFileOpen(logId, logSeg, false); + XLogCtl->xlblocks[0].xlogid = logId; + XLogCtl->xlblocks[0].xrecoff = + ((EndRecPtr.xrecoff - 1) / BLCKSZ + 1) * BLCKSZ; + Insert->currpos = ((char*) Insert->currpage) + + (EndRecPtr.xrecoff + BLCKSZ - XLogCtl->xlblocks[0].xrecoff); + + if (recovery) + { + int i; + + /* + * Let resource managers know that recovery is done + */ + for (i = 0; i <= RM_MAX_ID; i++) + RmgrTable[record->xl_rmid].rm_redo(ReadRecPtr, NULL); + CreateCheckPoint(true); + StopIfError = sie_saved; + } + + ControlFile->state = DB_IN_PRODUCTION; + ControlFile->time = time(NULL); + UpdateControlFile(); + + return; +} + +static XLogRecord* +ReadRecord(XLogRecPtr *RecPtr, char *buffer) +{ + XLogRecord *record; + XLogRecPtr tmpRecPtr = EndRecPtr; + bool nextmode = (RecPtr == NULL); + int emode = (nextmode) ? LOG : STOP; + + if (nextmode) + { + RecPtr = &tmpRecPtr; + if (nextRecord != NULL) + { + record = nextRecord; + goto got_record; + } + if (tmpRecPtr.xrecoff % BLCKSZ != 0) + tmpRecPtr.xrecoff += (BLCKSZ - tmpRecPtr.xrecoff % BLCKSZ); + if (tmpRecPtr.xrecoff >= XLogFileSize) + { + (tmpRecPtr.xlogid)++; + tmpRecPtr.xrecoff = 0; + } + tmpRecPtr.xrecoff += SizeOfXLogPHD; + } + else if (!XRecOffIsValid(RecPtr->xrecoff)) + elog(STOP, "ReadRecord: invalid record offset in (%u, %u)", + RecPtr->xlogid, RecPtr->xrecoff); + + if (readFile >= 0 && (RecPtr->xlogid != readId || + RecPtr->xrecoff / XLogSegSize != readSeg)) + { + close(readFile); + readFile = -1; + } + readId = RecPtr->xlogid; + readSeg = RecPtr->xrecoff / XLogSegSize; + if (readFile < 0) + { + readOff = (off_t) -1; + readFile = XLogFileOpen(readId, readSeg, nextmode); + if (readFile < 0) + goto next_record_is_invalid; + } + + if (readOff < 0 || readOff != (RecPtr->xrecoff % XLogSegSize) / BLCKSZ) + { + readOff = (RecPtr->xrecoff % XLogSegSize) / BLCKSZ; + if (lseek(readFile, readOff * BLCKSZ, SEEK_SET) < 0) + elog(STOP, "ReadRecord: lseek(logfile %u seg %u off %u) failed: %d", + readId, readSeg, readOff, errno); + if (read(readFile, readBuf, BLCKSZ) != BLCKSZ) + elog(STOP, "ReadRecord: read(logfile %u seg %u off %u) failed: %d", + readId, readSeg, readOff, errno); + if (((XLogPageHeader)readBuf)->xlp_magic != XLOG_PAGE_MAGIC) + { + elog(emode, "ReadRecord: invalid magic number %u in logfile %u seg %u off %u", + ((XLogPageHeader)readBuf)->xlp_magic, + readId, readSeg, readOff); + goto next_record_is_invalid; + } + } + if ((((XLogPageHeader)readBuf)->xlp_info & XLP_FIRST_IS_SUBRECORD) && + RecPtr->xrecoff % BLCKSZ == SizeOfXLogPHD) + { + elog(emode, "ReadRecord: subrecord is requested by (%u, %u)", + RecPtr->xlogid, RecPtr->xrecoff); + goto next_record_is_invalid; + } + record = (XLogRecord*)((char*) readBuf + RecPtr->xrecoff % BLCKSZ); + +got_record:; + if (record->xl_len == 0 || record->xl_len > + (BLCKSZ - RecPtr->xrecoff % BLCKSZ - SizeOfXLogRecord)) + { + elog(emode, "ReadRecord: invalid record len %u in (%u, %u)", + record->xl_len, RecPtr->xlogid, RecPtr->xrecoff); + goto next_record_is_invalid; + } + if (record->xl_rmid > RM_MAX_ID) + { + elog(emode, "ReadRecord: invalid resource managed id %u in (%u, %u)", + record->xl_rmid, RecPtr->xlogid, RecPtr->xrecoff); + goto next_record_is_invalid; + } + nextRecord = NULL; + if (record->xl_info & XLR_TO_BE_CONTINUED) + { + XLogSubRecord *subrecord; + uint32 len = record->xl_len; + + if (record->xl_len + RecPtr->xrecoff % BLCKSZ + SizeOfXLogRecord != BLCKSZ) + { + elog(emode, "ReadRecord: invalid fragmented record len %u in (%u, %u)", + record->xl_len, RecPtr->xlogid, RecPtr->xrecoff); + goto next_record_is_invalid; + } + memcpy(buffer, record, record->xl_len + SizeOfXLogRecord); + record = (XLogRecord*) buffer; + buffer += record->xl_len + SizeOfXLogRecord; + for ( ; ; ) + { + readOff++; + if (readOff == XLogSegSize / BLCKSZ) + { + readSeg++; + if (readSeg == XLogLastSeg) + { + readSeg = 0; + readId++; + } + close(readFile); + readOff = (off_t) 0; + readFile = XLogFileOpen(readId, readSeg, nextmode); + if (readFile < 0) + goto next_record_is_invalid; + } + if (read(readFile, readBuf, BLCKSZ) != BLCKSZ) + elog(STOP, "ReadRecord: read(logfile %u seg %u off %u) failed: %d", + readId, readSeg, readOff, errno); + if (((XLogPageHeader)readBuf)->xlp_magic != XLOG_PAGE_MAGIC) + { + elog(emode, "ReadRecord: invalid magic number %u in logfile %u seg %u off %u", + ((XLogPageHeader)readBuf)->xlp_magic, + readId, readSeg, readOff); + goto next_record_is_invalid; + } + if (!(((XLogPageHeader)readBuf)->xlp_info & XLP_FIRST_IS_SUBRECORD)) + { + elog(emode, "ReadRecord: there is no subrecord flag in logfile %u seg %u off %u", + readId, readSeg, readOff); + goto next_record_is_invalid; + } + subrecord = (XLogSubRecord*)((char*) readBuf + SizeOfXLogPHD); + if (subrecord->xl_len == 0 || subrecord->xl_len > + (BLCKSZ - SizeOfXLogPHD - SizeOfXLogSubRecord)) + { + elog(emode, "ReadRecord: invalid subrecord len %u in logfile %u seg %u off %u", + subrecord->xl_len, readId, readSeg, readOff); + goto next_record_is_invalid; + } + len += subrecord->xl_len; + if (len > MAXLOGRECSZ) + { + elog(emode, "ReadRecord: too long record len %u in (%u, %u)", + len, RecPtr->xlogid, RecPtr->xrecoff); + goto next_record_is_invalid; + } + memcpy(buffer, (char*)subrecord + SizeOfXLogSubRecord, subrecord->xl_len); + buffer += subrecord->xl_len; + if (subrecord->xl_info & XLR_TO_BE_CONTINUED) + { + if (subrecord->xl_len + + SizeOfXLogPHD + SizeOfXLogSubRecord != BLCKSZ) + { + elog(emode, "ReadRecord: invalid fragmented subrecord len %u in logfile %u seg %u off %u", + subrecord->xl_len, readId, readSeg, readOff); + goto next_record_is_invalid; + } + continue; + } + break; + } + if (BLCKSZ - SizeOfXLogRecord >= + subrecord->xl_len + SizeOfXLogPHD + SizeOfXLogSubRecord) + { + nextRecord = (XLogRecord*) + ((char*)subrecord + subrecord->xl_len + SizeOfXLogSubRecord); + } + EndRecPtr.xlogid = readId; + EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff * BLCKSZ + + SizeOfXLogPHD + SizeOfXLogSubRecord + subrecord->xl_len; + ReadRecPtr = *RecPtr; + return(record); + } + if (BLCKSZ - SizeOfXLogRecord >= + record->xl_len + RecPtr->xrecoff % BLCKSZ + SizeOfXLogRecord) + { + nextRecord = (XLogRecord*)((char*)record + record->xl_len + SizeOfXLogRecord); + } + EndRecPtr.xlogid = RecPtr->xlogid; + EndRecPtr.xrecoff = RecPtr->xrecoff + record->xl_len + SizeOfXLogRecord; + ReadRecPtr = *RecPtr; + + return(record); + +next_record_is_invalid:; + close(readFile); + readFile = -1; + nextRecord = NULL; + memset(buffer, 0, SizeOfXLogRecord); + record = (XLogRecord*) buffer; + /* + * If we assumed that next record began on the same page where + * previous one ended - zero end of page. + */ + if (XLByteEQ(tmpRecPtr, EndRecPtr)) + { + Assert (EndRecPtr.xrecoff % BLCKSZ > (SizeOfXLogPHD + SizeOfXLogSubRecord) && + BLCKSZ - EndRecPtr.xrecoff % BLCKSZ >= SizeOfXLogRecord); + readId = EndRecPtr.xlogid; + readSeg = EndRecPtr.xrecoff / XLogSegSize; + readOff = (EndRecPtr.xrecoff % XLogSegSize) / BLCKSZ; + elog(LOG, "Formating logfile %u seg %u block %u at offset %u", + readId, readSeg, readOff, EndRecPtr.xrecoff % BLCKSZ); + readFile = XLogFileOpen(readId, readSeg, false); + if (lseek(readFile, readOff * BLCKSZ, SEEK_SET) < 0) + elog(STOP, "ReadRecord: lseek(logfile %u seg %u off %u) failed: %d", + readId, readSeg, readOff, errno); + if (read(readFile, readBuf, BLCKSZ) != BLCKSZ) + elog(STOP, "ReadRecord: read(logfile %u seg %u off %u) failed: %d", + readId, readSeg, readOff, errno); + memset(readBuf + EndRecPtr.xrecoff % BLCKSZ, 0, + BLCKSZ - EndRecPtr.xrecoff % BLCKSZ); + if (lseek(readFile, readOff * BLCKSZ, SEEK_SET) < 0) + elog(STOP, "ReadRecord: lseek(logfile %u seg %u off %u) failed: %d", + readId, readSeg, readOff, errno); + if (write(readFile, readBuf, BLCKSZ) != BLCKSZ) + elog(STOP, "ReadRecord: write(logfile %u seg %u off %u) failed: %d", + readId, readSeg, readOff, errno); + readOff++; + } + else + { + Assert (EndRecPtr.xrecoff % BLCKSZ == 0 || + BLCKSZ - EndRecPtr.xrecoff % BLCKSZ < SizeOfXLogRecord); + readId = tmpRecPtr.xlogid; + readSeg = tmpRecPtr.xrecoff / XLogSegSize; + readOff = (tmpRecPtr.xrecoff % XLogSegSize) / BLCKSZ; + } + if (readOff > 0) + { + elog(LOG, "Formating logfile %u seg %u block %u at offset 0", + readId, readSeg, readOff); + readOff *= BLCKSZ; + memset(readBuf, 0, BLCKSZ); + readFile = XLogFileOpen(readId, readSeg, false); + if (lseek(readFile, readOff, SEEK_SET) < 0) + elog(STOP, "ReadRecord: lseek(logfile %u seg %u off %u) failed: %d", + readId, readSeg, readOff, errno); + while (readOff < XLogSegSize) + { + if (write(readFile, readBuf, BLCKSZ) != BLCKSZ) + elog(STOP, "ReadRecord: write(logfile %u seg %u off %u) failed: %d", + readId, readSeg, readOff, errno); + readOff += BLCKSZ; + } + } + if (readFile >= 0) + { + if (fsync(readFile) < 0) + elog(STOP, "ReadRecord: fsync(logfile %u seg %u) failed: %d", + readId, readSeg, errno); + close(readFile); + readFile = -1; + } + + readId = EndRecPtr.xlogid; + readSeg = (EndRecPtr.xrecoff - 1) / XLogSegSize + 1; + elog(LOG, "The last logId/logSeg is (%u, %u)", readId, readSeg - 1); + if (ControlFile->logId != readId || ControlFile->logSeg != readSeg) + { + elog(LOG, "Set logId/logSeg in control file"); + ControlFile->logId = readId; + ControlFile->logSeg = readSeg; + ControlFile->time = time(NULL); + UpdateControlFile(); + } + if (readSeg == XLogLastSeg) + { + readSeg = 0; + readId++; + } + { + char path[MAXPGPATH+1]; + + XLogFileName(path, readId, readSeg); + unlink(path); + } + + return(record); +} + +void +CreateCheckPoint(bool shutdown) +{ + CheckPoint checkPoint; + XLogRecPtr recptr; + XLogCtlInsert *Insert = &XLogCtl->Insert; + uint32 freespace; + uint16 curridx; + + memset(&checkPoint, 0, sizeof(checkPoint)); + if (shutdown) + { + ControlFile->state = DB_SHUTDOWNING; + ControlFile->time = time(NULL); + UpdateControlFile(); + } + + /* Get REDO record ptr */ + while (!TAS(&(XLogCtl->insert_lck))) + { + struct timeval delay; + + if (shutdown) + elog(STOP, "XLog insert lock is busy while data base is shutting down"); + delay.tv_sec = 0; + delay.tv_usec = 0; + (void) select(0, NULL, NULL, NULL, &delay); + } + freespace = ((char*) Insert->currpage) + BLCKSZ - Insert->currpos; + if (freespace < SizeOfXLogRecord) + { + curridx = NextBufIdx(Insert->curridx); + if (XLByteLE(XLogCtl->xlblocks[curridx], LgwrResult.Write)) + InitXLBuffer(curridx); + else + GetFreeXLBuffer(); + freespace = BLCKSZ - SizeOfXLogPHD; + } + else + curridx = Insert->curridx; + checkPoint.redo.xlogid = XLogCtl->xlblocks[curridx].xlogid; + checkPoint.redo.xrecoff = XLogCtl->xlblocks[curridx].xrecoff - BLCKSZ + + Insert->currpos - ((char*) Insert->currpage); + S_UNLOCK(&(XLogCtl->insert_lck)); + + SpinAcquire(XidGenLockId); + checkPoint.nextXid = ShmemVariableCache->nextXid; + SpinRelease(XidGenLockId); + SpinAcquire(OidGenLockId); + checkPoint.nextOid = ShmemVariableCache->nextOid; + SpinRelease(OidGenLockId); + + FlushBufferPool(false); + + /* Get UNDO record ptr */ + + if (shutdown && checkPoint.undo.xrecoff != 0) + elog(STOP, "Active transaction while data base is shutting down"); + + recptr = XLogInsert(RM_XLOG_ID, (char*)&checkPoint, sizeof(checkPoint), NULL, 0); + + if (shutdown && !XLByteEQ(checkPoint.redo, MyLastRecPtr)) + elog(STOP, "XLog concurrent activity while data base is shutting down"); + + XLogFlush(recptr); + + SpinAcquire(ControlFileLockId); + if (shutdown) + ControlFile->state = DB_SHUTDOWNED; + ControlFile->checkPoint = MyLastRecPtr; + ControlFile->time = time(NULL); + UpdateControlFile(); + SpinRelease(ControlFileLockId); + + return; +} diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index a44fe726bfd1b6898ded85e65210677895f73991..32f5aa253ccf26c7e70bdc0fe6429a5a8d60a605 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -10,7 +10,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/postmaster/postmaster.c,v 1.117 1999/09/27 03:13:05 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/postmaster/postmaster.c,v 1.118 1999/09/27 15:47:43 vadim Exp $ * * NOTES * @@ -204,9 +204,10 @@ static int SendStop = false; static bool NetServer = false; /* if not zero, postmaster listen for * non-local connections */ +#ifdef USE_SSL static bool SecureNetServer = false; /* if not zero, postmaster listens for only SSL * non-local connections */ - +#endif /* * GH: For !HAVE_SIGPROCMASK (NEXTSTEP), TRH implemented an @@ -990,12 +991,17 @@ readStartupPacket(void *arg, PacketLen len, void *pkt) /* Could add additional special packet types here */ - /* Any SSL negotiation must have taken place here, so drop the connection - * ASAP if we require SSL */ - if (SecureNetServer && !port->ssl) { - PacketSendError(&port->pktInfo, "Backend requires secure connection."); - return STATUS_OK; +#ifdef USE_SSL + /* + * Any SSL negotiation must have taken place here, so drop the connection + * ASAP if we require SSL + */ + if (SecureNetServer && !port->ssl) + { + PacketSendError(&port->pktInfo, "Backend requires secure connection."); + return STATUS_OK; } +#endif /* Check we can handle the protocol the frontend is using. */ @@ -1832,6 +1838,7 @@ CountChildren(void) } +#ifdef USE_SSL /* * Initialize SSL library and structures */ @@ -1860,3 +1867,4 @@ static void InitSSL(void) { exit(1); } } +#endif diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c index c279528974cd797dd9031988dbb4ee50ed26b1b0..4cdb638819e81b2be731b4d61b1da56b0dc241d8 100644 --- a/src/backend/storage/file/fd.c +++ b/src/backend/storage/file/fd.c @@ -6,7 +6,7 @@ * Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $Id: fd.c,v 1.47 1999/07/17 20:17:42 momjian Exp $ + * $Id: fd.c,v 1.48 1999/09/27 15:47:49 vadim Exp $ * * NOTES: * @@ -49,6 +49,7 @@ #include "miscadmin.h" #include "storage/fd.h" +bool ReleaseDataFile(void); /* * Problem: Postgres does a system(ld...) to do dynamic loading. * This will open several extra files in addition to those used by @@ -410,6 +411,19 @@ ReleaseLruFile() LruDelete(VfdCache[0].lruMoreRecently); } +bool +ReleaseDataFile() +{ + DO_DB(elog(DEBUG, "ReleaseDataFile. Opened %d", nfile)); + + if (nfile <= 0) + return(false); + Assert(VfdCache[0].lruMoreRecently != 0); + LruDelete(VfdCache[0].lruMoreRecently); + + return(true); +} + static File AllocateVfd() { diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 26735a5d80672dc6ff39608399b6dfaa56b64922..9a5b7935db8874a5bd95b1c3457525c758a5db5f 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -9,7 +9,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/tcop/utility.c,v 1.66 1999/09/23 17:02:52 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/tcop/utility.c,v 1.67 1999/09/27 15:47:54 vadim Exp $ * *------------------------------------------------------------------------- */ @@ -220,13 +220,13 @@ ProcessUtility(Node *parsetree, relname); } - rel = heap_openr(relname); + rel = heap_openr(relname, AccessExclusiveLock); if (RelationIsValid(rel)) { if (rel->rd_rel->relkind == RELKIND_SEQUENCE) { elog(ERROR, "TRUNCATE cannot be used on sequences. '%s' is a sequence", relname); } - heap_close(rel); + heap_close(rel, NoLock); } #ifndef NO_SECURITY if (!pg_ownercheck(userName, relname, RELNAME)) { diff --git a/src/include/access/rmgr.h b/src/include/access/rmgr.h new file mode 100644 index 0000000000000000000000000000000000000000..d8bde166c2cb379d5e4a15097a1c8ff764133b4f --- /dev/null +++ b/src/include/access/rmgr.h @@ -0,0 +1,34 @@ +/* + * + * rmgr.h + * + * Resource managers description table + * + */ +#ifndef RMGR_H +#define RMGR_H + +typedef uint8 RmgrId; + +typedef struct RmgrData +{ + char *rm_name; + char *(*rm_redo) (); /* REDO(XLogRecPtr rptr) */ + char *(*rm_undo) (); /* UNDO(XLogRecPtr rptr) */ +} RmgrData; + +extern RmgrData *RmgrTable; + +/* + * Built-in resource managers + */ +#define RM_XLOG_ID 0 +#define RM_XACT_ID 1 +#define RM_HEAP_ID 2 +#define RM_BTREE_ID 3 +#define RM_HASH_ID 4 +#define RM_RTREE_ID 5 +#define RM_GIST_ID 6 +#define RM_MAX_ID RM_GIST_ID + +#endif /* RMGR_H */ diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h new file mode 100644 index 0000000000000000000000000000000000000000..5c8c0754246b044bdd1e072b8ecd0df5515a7a40 --- /dev/null +++ b/src/include/access/xlog.h @@ -0,0 +1,70 @@ +/* + * + * xlog.h + * + * Postgres transaction log manager + * + */ +#ifndef XLOG_H +#define XLOG_H + +#include "access/rmgr.h" +#include "access/transam.h" + +typedef struct XLogRecPtr +{ + uint32 xlogid; /* log file #, 0 based */ + uint32 xrecoff; /* offset of record in log file */ +} XLogRecPtr; + +typedef struct XLogRecord +{ + XLogRecPtr xl_prev; /* ptr to previous record in log */ + XLogRecPtr xl_xact_prev; /* ptr to previous record of this xact */ + TransactionId xl_xid; /* xact id */ + uint16 xl_len; /* len of record on this page */ + uint8 xl_info; + RmgrId xl_rmid; /* resource manager inserted this record */ + + /* ACTUAL LOG DATA FOLLOWS AT END OF STRUCT */ + +} XLogRecord; + +#define SizeOfXLogRecord DOUBLEALIGN(sizeof(XLogRecord)) +#define MAXLOGRECSZ (2 * BLCKSZ) +/* + * When there is no space on current page we continue on the next + * page with subrecord. + */ +typedef struct XLogSubRecord +{ + uint16 xl_len; + uint8 xl_info; + + /* ACTUAL LOG DATA FOLLOWS AT END OF STRUCT */ + +} XLogSubRecord; + +#define SizeOfXLogSubRecord DOUBLEALIGN(sizeof(XLogSubRecord)) + +#define XLR_TO_BE_CONTINUED 0x01 + +#define XLOG_PAGE_MAGIC 0x17345168 + +typedef struct XLogPageHeaderData +{ + uint32 xlp_magic; + uint16 xlp_info; +} XLogPageHeaderData; + +#define SizeOfXLogPHD DOUBLEALIGN(sizeof(XLogPageHeaderData)) + +typedef XLogPageHeaderData *XLogPageHeader; + +#define XLP_FIRST_IS_SUBRECORD 0x0001 + +extern XLogRecPtr XLogInsert(RmgrId rmid, char *hdr, uint32 hdrlen, + char *buf, uint32 buflen); +extern void XLogFlush(XLogRecPtr RecPtr); + +#endif /* XLOG_H */ diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index d28e936b33fa7984f85f9ee43c2599cd7893a1f2..44a5fbd313b28ceac6cbbc07f711cf5c83505436 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -6,13 +6,14 @@ * * Copyright (c) 1994, Regents of the University of California * - * $Id: proc.h,v 1.26 1999/09/24 00:25:27 tgl Exp $ + * $Id: proc.h,v 1.27 1999/09/27 15:48:06 vadim Exp $ * *------------------------------------------------------------------------- */ #ifndef _PROC_H_ #define _PROC_H_ +#include "access/xlog.h" #include "storage/lock.h" typedef struct @@ -47,7 +48,7 @@ typedef struct proc TransactionId xmin; /* minimal running XID as it was when we * were starting our xact: vacuum must not * remove tuples deleted by xid >= xmin ! */ - + XLogRecPtr logRec; LOCK *waitLock; /* Lock we're sleeping on ... */ int token; /* type of lock we sleeping for */ int holdLock; /* while holding these locks */ diff --git a/src/include/utils/elog.h b/src/include/utils/elog.h index 5f64a6b1c7723e30c69d89dc62f3fd98b587ba9c..e69ef11d7a3cac1023d6f032b8583e472109691d 100644 --- a/src/include/utils/elog.h +++ b/src/include/utils/elog.h @@ -6,7 +6,7 @@ * * Copyright (c) 1994, Regents of the University of California * - * $Id: elog.h,v 1.12 1999/09/11 19:06:25 tgl Exp $ + * $Id: elog.h,v 1.13 1999/09/27 15:48:12 vadim Exp $ * *------------------------------------------------------------------------- */ @@ -17,7 +17,9 @@ #define ERROR (-1) /* user error - return to known state */ #define FATAL 1 /* fatal error - abort process */ #define REALLYFATAL 2 /* take down the other backends with me */ +#define STOP REALLYFATAL #define DEBUG (-2) /* debug message */ +#define LOG DEBUG #define NOIND (-3) /* debug message, don't indent as far */ extern void elog(int lev, const char *fmt, ...);