提交 50f35e7b 编写于 作者: A Ashwin Agrawal

pgindent cdb directory (part-1).

上级 ec5bff25
......@@ -19,12 +19,13 @@
#ifdef USE_SEGWALREP
#include "cdb/cdbappendonlyam.h"
#endif /* USE_SEGWALREP */
#endif /* USE_SEGWALREP */
int32 AppendOnlyStorage_GetUsableBlockSize(int32 configBlockSize)
int32
AppendOnlyStorage_GetUsableBlockSize(int32 configBlockSize)
{
int32 result;
int32 result;
if (configBlockSize > AOSmallContentHeader_MaxLength)
result = AOSmallContentHeader_MaxLength;
......@@ -35,7 +36,7 @@ int32 AppendOnlyStorage_GetUsableBlockSize(int32 configBlockSize)
* Round down to 32-bit boundary.
*/
result = (result / sizeof(uint32)) * sizeof(uint32);
return result;
}
......@@ -44,9 +45,9 @@ void
appendonly_redo(XLogRecPtr beginLoc, XLogRecPtr lsn, XLogRecord *record)
{
/*
* Perform redo of AO XLOG records only for standby mode. We do
* not need to replay AO XLOG records in normal mode because fsync
* is performed on file close.
* Perform redo of AO XLOG records only for standby mode. We do not need
* to replay AO XLOG records in normal mode because fsync is performed on
* file close.
*/
if (IsStandbyMode())
ao_xlog_insert(record);
......@@ -55,9 +56,9 @@ appendonly_redo(XLogRecPtr beginLoc, XLogRecPtr lsn, XLogRecord *record)
void
appendonly_desc(StringInfo buf, XLogRecPtr beginLoc, XLogRecord *record)
{
xl_ao_insert *xlrec = (xl_ao_insert *)XLogRecGetData(record);
uint8 xl_info = record->xl_info;
uint8 info = xl_info & ~XLR_INFO_MASK;
xl_ao_insert *xlrec = (xl_ao_insert *) XLogRecGetData(record);
uint8 xl_info = record->xl_info;
uint8 info = xl_info & ~XLR_INFO_MASK;
if (info == XLOG_APPENDONLY_INSERT)
{
......@@ -69,4 +70,4 @@ appendonly_desc(StringInfo buf, XLogRecPtr beginLoc, XLogRecord *record)
else
appendStringInfo(buf, "UNKNOWN");
}
#endif /* USE_SEGWALREP */
#endif /* USE_SEGWALREP */
......@@ -100,7 +100,7 @@ AppendOnlyStorageRead_Init(AppendOnlyStorageRead *storageRead,
storageRead->minimumHeaderLen =
AppendOnlyStorageFormat_RegularHeaderLenNeeded(
storageRead->storageAttributes.checksum);
storageRead->storageAttributes.checksum);
/*
* Initialize BufferedRead.
......@@ -229,6 +229,7 @@ AppendOnlyStorageRead_DoOpenFile(AppendOnlyStorageRead *storageRead,
char *filePathName)
{
int fileFlags = O_RDONLY | PG_BINARY;
/* File mode is S_IRUSR 00400 user has read permission */
int fileMode = 0400;
File file;
......@@ -554,8 +555,8 @@ AppendOnlyStorageRead_DoSkipPadding(AppendOnlyStorageRead *storageRead,
}
/*
* UNDONE: For verification purposes, we should verify the
* remainder is all zeroes.
* UNDONE: For verification purposes, we should verify the remainder
* is all zeroes.
*/
elogif(Debug_appendonly_print_scan, LOG,
......@@ -638,8 +639,7 @@ AppendOnlyStorageRead_PositionToNextBlock(AppendOnlyStorageRead *storageRead,
* partially full page.
*/
AppendOnlyStorageRead_DoSkipPadding(storageRead,
-1 /* means till end of
* page */ );
-1 /* means till end of page */ );
/*
* Now try to get the peek data from the new page.
......@@ -659,13 +659,13 @@ AppendOnlyStorageRead_PositionToNextBlock(AppendOnlyStorageRead *storageRead,
if (availableLen != storageRead->minimumHeaderLen)
ereport(ERROR,
(errcode(ERRCODE_GP_INTERNAL_ERROR),
errmsg("Expected %d bytes and found %d bytes in table %s (segment file '%s', header offset in file = " INT64_FORMAT ", bufferCount " INT64_FORMAT ")",
storageRead->minimumHeaderLen,
availableLen,
storageRead->relationName,
storageRead->segmentFileName,
*headerOffsetInFile,
storageRead->bufferCount)));
errmsg("Expected %d bytes and found %d bytes in table %s (segment file '%s', header offset in file = " INT64_FORMAT ", bufferCount " INT64_FORMAT ")",
storageRead->minimumHeaderLen,
availableLen,
storageRead->relationName,
storageRead->segmentFileName,
*headerOffsetInFile,
storageRead->bufferCount)));
break;
}
}
......@@ -731,7 +731,7 @@ AppendOnlyStorageRead_StorageContentHeaderStr(AppendOnlyStorageRead *storageRead
header = BufferedReadGetCurrentBuffer(&storageRead->bufferedRead);
return AppendOnlyStorageFormat_BlockHeaderStr(header,
storageRead->storageAttributes.checksum,
storageRead->storageAttributes.checksum,
storageRead->formatVersion);
}
......@@ -765,8 +765,8 @@ AppendOnlyStorageRead_LogBlockHeader(AppendOnlyStorageRead *storageRead,
blockHeaderStr =
AppendOnlyStorageFormat_SmallContentHeaderStr(header,
storageRead->storageAttributes.checksum,
storageRead->formatVersion);
storageRead->storageAttributes.checksum,
storageRead->formatVersion);
ereport(LOG,
(errmsg("%s. %s",
contextStr,
......@@ -817,7 +817,7 @@ AppendOnlyStorageRead_ReadNextBlock(AppendOnlyStorageRead *storageRead)
storageRead->current.headerOffsetInFile, storageRead->current.overallBlockLen);
if (!AppendOnlyStorageRead_PositionToNextBlock(storageRead,
&storageRead->current.headerOffsetInFile,
&storageRead->current.headerOffsetInFile,
&header,
&blockLimitLen))
{
......@@ -848,7 +848,7 @@ AppendOnlyStorageRead_ReadNextBlock(AppendOnlyStorageRead *storageRead)
errmsg("Header checksum does not match. Expected 0x%X and found 0x%X ",
storedChecksum,
computedChecksum),
errdetail_appendonly_read_storage_content_header(storageRead),
errdetail_appendonly_read_storage_content_header(storageRead),
errcontext_appendonly_read_storage_block(storageRead)));
}
......@@ -856,16 +856,16 @@ AppendOnlyStorageRead_ReadNextBlock(AppendOnlyStorageRead *storageRead)
* Check the (basic) header information.
*/
checkError = AppendOnlyStorageFormat_GetHeaderInfo(header,
storageRead->storageAttributes.checksum,
&storageRead->current.headerKind,
&storageRead->current.actualHeaderLen);
storageRead->storageAttributes.checksum,
&storageRead->current.headerKind,
&storageRead->current.actualHeaderLen);
if (checkError != AOHeaderCheckOk)
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("Bad append-only storage header. Header check error %d, detail '%s'",
(int) checkError,
AppendOnlyStorageFormat_GetHeaderCheckErrorStr()),
errdetail_appendonly_read_storage_content_header(storageRead),
errdetail_appendonly_read_storage_content_header(storageRead),
errcontext_appendonly_read_storage_block(storageRead)));
/*
......@@ -884,14 +884,14 @@ AppendOnlyStorageRead_ReadNextBlock(AppendOnlyStorageRead *storageRead)
availableLen != storageRead->current.actualHeaderLen)
ereport(ERROR,
(errcode(ERRCODE_GP_INTERNAL_ERROR),
errmsg("Expected %d bytes and found %d bytes in table %s "
"(segment file '%s', header offset in file = " INT64_FORMAT ", bufferCount " INT64_FORMAT ")",
storageRead->current.actualHeaderLen,
availableLen,
storageRead->relationName,
storageRead->segmentFileName,
storageRead->current.headerOffsetInFile,
storageRead->bufferCount)));
errmsg("Expected %d bytes and found %d bytes in table %s "
"(segment file '%s', header offset in file = " INT64_FORMAT ", bufferCount " INT64_FORMAT ")",
storageRead->current.actualHeaderLen,
availableLen,
storageRead->relationName,
storageRead->segmentFileName,
storageRead->current.headerOffsetInFile,
storageRead->bufferCount)));
}
/*
......@@ -925,9 +925,9 @@ AppendOnlyStorageRead_ReadNextBlock(AppendOnlyStorageRead *storageRead)
(errcode(ERRCODE_GP_INTERNAL_ERROR),
errmsg("Bad append-only storage header of type small content. Header check error %d, detail '%s'",
(int) checkError,
AppendOnlyStorageFormat_GetHeaderCheckErrorStr()),
errdetail_appendonly_read_storage_content_header(storageRead),
errcontext_appendonly_read_storage_block(storageRead)));
AppendOnlyStorageFormat_GetHeaderCheckErrorStr()),
errdetail_appendonly_read_storage_content_header(storageRead),
errcontext_appendonly_read_storage_block(storageRead)));
break;
case AoHeaderKind_LargeContent:
......@@ -949,9 +949,9 @@ AppendOnlyStorageRead_ReadNextBlock(AppendOnlyStorageRead *storageRead)
(errcode(ERRCODE_GP_INTERNAL_ERROR),
errmsg("Bad append-only storage header of type large content. Header check error %d, detail '%s'",
(int) checkError,
AppendOnlyStorageFormat_GetHeaderCheckErrorStr()),
errdetail_appendonly_read_storage_content_header(storageRead),
errcontext_appendonly_read_storage_block(storageRead)));
AppendOnlyStorageFormat_GetHeaderCheckErrorStr()),
errdetail_appendonly_read_storage_content_header(storageRead),
errcontext_appendonly_read_storage_block(storageRead)));
storageRead->current.isLarge = true;
break;
......@@ -980,9 +980,9 @@ AppendOnlyStorageRead_ReadNextBlock(AppendOnlyStorageRead *storageRead)
(errcode(ERRCODE_GP_INTERNAL_ERROR),
errmsg("Bad append-only storage header of type non-bulk dense content. Header check error %d, detail '%s'",
(int) checkError,
AppendOnlyStorageFormat_GetHeaderCheckErrorStr()),
errdetail_appendonly_read_storage_content_header(storageRead),
errcontext_appendonly_read_storage_block(storageRead)));
AppendOnlyStorageFormat_GetHeaderCheckErrorStr()),
errdetail_appendonly_read_storage_content_header(storageRead),
errcontext_appendonly_read_storage_block(storageRead)));
break;
case AoHeaderKind_BulkDenseContent:
......@@ -1012,9 +1012,9 @@ AppendOnlyStorageRead_ReadNextBlock(AppendOnlyStorageRead *storageRead)
(errcode(ERRCODE_GP_INTERNAL_ERROR),
errmsg("Bad append-only storage header of type bulk dense content. Header check error %d, detail '%s'",
(int) checkError,
AppendOnlyStorageFormat_GetHeaderCheckErrorStr()),
errdetail_appendonly_read_storage_content_header(storageRead),
errcontext_appendonly_read_storage_block(storageRead)));
AppendOnlyStorageFormat_GetHeaderCheckErrorStr()),
errdetail_appendonly_read_storage_content_header(storageRead),
errcontext_appendonly_read_storage_block(storageRead)));
break;
default:
......@@ -1151,7 +1151,7 @@ AppendOnlyStorageRead_InternalGetBuffer(AppendOnlyStorageRead *storageRead,
* Verify next block is type Block.
*/
Assert(storageRead->current.headerKind == AoHeaderKind_SmallContent ||
storageRead->current.headerKind == AoHeaderKind_NonBulkDenseContent ||
storageRead->current.headerKind == AoHeaderKind_NonBulkDenseContent ||
storageRead->current.headerKind == AoHeaderKind_BulkDenseContent);
/*
......@@ -1173,7 +1173,7 @@ AppendOnlyStorageRead_InternalGetBuffer(AppendOnlyStorageRead *storageRead,
errmsg("Wrong buffer length. Expected %d byte length buffer and got %d ",
storageRead->current.overallBlockLen,
availableLen),
errdetail_appendonly_read_storage_content_header(storageRead),
errdetail_appendonly_read_storage_content_header(storageRead),
errcontext_appendonly_read_storage_block(storageRead)));
if (storageRead->storageAttributes.checksum &&
......@@ -1184,7 +1184,7 @@ AppendOnlyStorageRead_InternalGetBuffer(AppendOnlyStorageRead *storageRead,
* the header with the checksum of the data portion.
*/
if (!AppendOnlyStorageFormat_VerifyBlockChecksum(*header,
storageRead->current.overallBlockLen,
storageRead->current.overallBlockLen,
&storedChecksum,
&computedChecksum))
ereport(ERROR,
......@@ -1192,7 +1192,7 @@ AppendOnlyStorageRead_InternalGetBuffer(AppendOnlyStorageRead *storageRead,
errmsg("Block checksum does not match. Expected 0x%X and found 0x%X",
storedChecksum,
computedChecksum),
errdetail_appendonly_read_storage_content_header(storageRead),
errdetail_appendonly_read_storage_content_header(storageRead),
errcontext_appendonly_read_storage_block(storageRead)));
}
......@@ -1218,7 +1218,7 @@ AppendOnlyStorageRead_GetBuffer(AppendOnlyStorageRead *storageRead)
* Verify next block is a "small" non-compressed block.
*/
Assert(storageRead->current.headerKind == AoHeaderKind_SmallContent ||
storageRead->current.headerKind == AoHeaderKind_NonBulkDenseContent ||
storageRead->current.headerKind == AoHeaderKind_NonBulkDenseContent ||
storageRead->current.headerKind == AoHeaderKind_BulkDenseContent);
Assert(!storageRead->current.isLarge);
Assert(!storageRead->current.isCompressed);
......@@ -1255,17 +1255,16 @@ AppendOnlyStorageRead_Content(AppendOnlyStorageRead *storageRead,
if (storageRead->current.isLarge)
{
int64 largeContentPosition; /* Position of the large
* content metadata block. */
int64 largeContentPosition; /* Position of the large content
* metadata block. */
int32 largeContentLen; /* Total length of the large content. */
int32 remainingLargeContentLen; /* The remaining number of
* bytes to read for the large
* content. */
uint8 *contentNext;/* Pointer inside the contentOut buffer to put
* the next byte. */
int32 regularBlockReadCount; /* Number of regular blocks
* read after the metadata
* block. */
uint8 *contentNext; /* Pointer inside the contentOut buffer to
* put the next byte. */
int32 regularBlockReadCount; /* Number of regular blocks read
* after the metadata block. */
int32 regularContentLen; /* Length of the current regular
* block's content. */
......@@ -1342,7 +1341,7 @@ AppendOnlyStorageRead_Content(AppendOnlyStorageRead *storageRead,
(errcode(ERRCODE_GP_INTERNAL_ERROR),
errmsg("Too much data found after reading %d blocks for large content in segment file '%s' of table '%s'. "
"Large content metadata block is at position " INT64_FORMAT " "
"Large content length %d; extra data length %d",
"Large content length %d; extra data length %d",
regularBlockReadCount,
storageRead->segmentFileName,
storageRead->relationName,
......@@ -1396,7 +1395,7 @@ AppendOnlyStorageRead_Content(AppendOnlyStorageRead *storageRead,
if (Debug_appendonly_print_scan)
elog(LOG,
"Append-only Storage Read non-compressed block for table '%s' "
"(length = %d, segment file '%s', header offset in file = "
"(length = %d, segment file '%s', header offset in file = "
INT64_FORMAT ", block count " INT64_FORMAT ")",
storageRead->relationName,
storageRead->current.uncompressedLen,
......@@ -1429,7 +1428,7 @@ AppendOnlyStorageRead_Content(AppendOnlyStorageRead *storageRead,
if (Debug_appendonly_print_scan)
elog(LOG,
"Append-only Storage Read decompressed block for table '%s' "
"Append-only Storage Read decompressed block for table '%s' "
"(compressed length %d, uncompressed length = %d, segment file '%s', "
"header offset in file = " INT64_FORMAT ", block count " INT64_FORMAT ")",
storageRead->relationName,
......@@ -1459,15 +1458,14 @@ AppendOnlyStorageRead_SkipCurrentBlock(AppendOnlyStorageRead *storageRead)
if (storageRead->current.isLarge)
{
int64 largeContentPosition; /* Position of the large
* content metadata block. */
int64 largeContentPosition; /* Position of the large content
* metadata block. */
int32 largeContentLen; /* Total length of the large content. */
int32 remainingLargeContentLen; /* Remaining number of bytes
* to read for the large
* content. */
int32 regularBlockReadCount; /* Number of regular blocks
* read after the metadata
* block. */
int32 regularBlockReadCount; /* Number of regular blocks read
* after the metadata block. */
int32 regularContentLen; /* Length of the current regular
* block's content. */
int32 availableLen;
......@@ -1540,12 +1538,12 @@ AppendOnlyStorageRead_SkipCurrentBlock(AppendOnlyStorageRead *storageRead)
if (storageRead->current.overallBlockLen != availableLen)
ereport(ERROR,
(errcode(ERRCODE_GP_INTERNAL_ERROR),
errmsg("Wrong buffer length. Expected %d byte length"
" buffer and got %d ",
storageRead->current.overallBlockLen,
availableLen),
errdetail_appendonly_read_storage_content_header(storageRead),
errcontext_appendonly_read_storage_block(storageRead)));
errmsg("Wrong buffer length. Expected %d byte length"
" buffer and got %d ",
storageRead->current.overallBlockLen,
availableLen),
errdetail_appendonly_read_storage_content_header(storageRead),
errcontext_appendonly_read_storage_block(storageRead)));
regularContentLen = storageRead->current.uncompressedLen;
remainingLargeContentLen -= regularContentLen;
......@@ -1558,7 +1556,7 @@ AppendOnlyStorageRead_SkipCurrentBlock(AppendOnlyStorageRead *storageRead)
(errcode(ERRCODE_GP_INTERNAL_ERROR),
errmsg("Too much data found after reading %d blocks for large content in segment file '%s' of table '%s'. "
"Large content metadata block is at position " INT64_FORMAT " "
"Large content length %d; extra data length %d",
"Large content length %d; extra data length %d",
regularBlockReadCount,
storageRead->segmentFileName,
storageRead->relationName,
......
......@@ -42,8 +42,8 @@ void
DtxContextInfo_CreateOnMaster(DtxContextInfo *dtxContextInfo,
int txnOptions, Snapshot snapshot)
{
int i;
CommandId curcid = 0;
int i;
CommandId curcid = 0;
if (snapshot)
curcid = snapshot->curcid;
......@@ -95,73 +95,73 @@ DtxContextInfo_CreateOnMaster(DtxContextInfo *dtxContextInfo,
if (DEBUG5 >= log_min_messages || Debug_print_full_dtm)
{
char gid[TMGIDSIZE];
char gid[TMGIDSIZE];
DistributedSnapshot *ds = &dtxContextInfo->distributedSnapshot;
if (!getDistributedTransactionIdentifier(gid))
memcpy(gid, "<empty>", 8);
elog((Debug_print_full_dtm ? LOG : DEBUG5),
"DtxContextInfo_CreateOnMaster Gp_role is DISPATCH and have currentGxact = %s, gxid = %u --> have distributed snapshot",
gid,
gid,
getDistributedTransactionId());
elog((Debug_print_full_dtm ? LOG : DEBUG5),
"DtxContextInfo_CreateOnMaster distributedXid = %u, "
"distributedSnapshotHeader (xminAllDistributedSnapshots %u, xmin = %u, xmax = %u, count = %d, maxCount %d)",
"distributedSnapshotHeader (xminAllDistributedSnapshots %u, xmin = %u, xmax = %u, count = %d, maxCount %d)",
dtxContextInfo->distributedXid,
ds->xminAllDistributedSnapshots,
ds->xmin,
ds->xmax,
ds->count,
ds->maxCount);
for (i = 0; i < ds->count; i++)
{
elog((Debug_print_full_dtm ? LOG : DEBUG5),
".... distributedSnapshotData->xip[%d] = %u",
i, ds->inProgressXidArray[i]);
".... distributedSnapshotData->xip[%d] = %u",
i, ds->inProgressXidArray[i]);
}
elog((Debug_print_full_dtm ? LOG : DEBUG5),
"DtxContextInfo_CreateOnMaster curcid = %u",
dtxContextInfo->curcid);
elog((Debug_print_full_dtm ? LOG : DEBUG5),
"DtxContextInfo_CreateOnMaster txnOptions = 0x%x, needTwoPhase = %s, explicitBegin = %s, isoLevel = %s, readOnly = %s.",
txnOptions,
"DtxContextInfo_CreateOnMaster txnOptions = 0x%x, needTwoPhase = %s, explicitBegin = %s, isoLevel = %s, readOnly = %s.",
txnOptions,
(isMppTxOptions_NeedTwoPhase(txnOptions) ? "true" : "false"),
(isMppTxOptions_ExplicitBegin(txnOptions) ? "true" : "false"),
IsoLevelAsUpperString(mppTxOptions_IsoLevel(txnOptions)),
(isMppTxOptions_ReadOnly(txnOptions) ? "true" : "false"));
(isMppTxOptions_ReadOnly(txnOptions) ? "true" : "false"));
}
}
int
int
DtxContextInfo_SerializeSize(DtxContextInfo *dtxContextInfo)
{
int size = 0;
int size = 0;
size += sizeof(DistributedTransactionId); /* distributedXid */
if (dtxContextInfo->distributedXid != InvalidDistributedTransactionId)
{
size += sizeof(DistributedTransactionTimeStamp);
size += TMGIDSIZE; /* distributedId */
size += sizeof(CommandId); /* curcid */
size += TMGIDSIZE; /* distributedId */
size += sizeof(CommandId); /* curcid */
}
size += sizeof(uint32); /* segmateSync */
size += sizeof(uint32); /* nestingLevel */
size += sizeof(bool); /* haveDistributedSnapshot */
size += sizeof(bool); /* cursorContext */
size += sizeof(uint32); /* segmateSync */
size += sizeof(uint32); /* nestingLevel */
size += sizeof(bool); /* haveDistributedSnapshot */
size += sizeof(bool); /* cursorContext */
if (dtxContextInfo->haveDistributedSnapshot)
{
size += DistributedSnapshot_SerializeSize(
&dtxContextInfo->distributedSnapshot);
&dtxContextInfo->distributedSnapshot);
}
size += sizeof(int); /* distributedTxnOptions */
elog((Debug_print_full_dtm ? LOG : DEBUG5),
"DtxContextInfo_SerializeSize is returning size = %d", size);
......@@ -171,22 +171,22 @@ DtxContextInfo_SerializeSize(DtxContextInfo *dtxContextInfo)
void
DtxContextInfo_Serialize(char *buffer, DtxContextInfo *dtxContextInfo)
{
char *p = buffer;
int i;
int used;
char *p = buffer;
int i;
int used;
DistributedSnapshot *ds = &dtxContextInfo->distributedSnapshot;
memcpy(p, &dtxContextInfo->distributedXid, sizeof(DistributedTransactionId));
p += sizeof(DistributedTransactionId);
if (dtxContextInfo->distributedXid != InvalidDistributedTransactionId)
{
memcpy(p, &dtxContextInfo->distributedTimeStamp, sizeof(DistributedTransactionTimeStamp));
memcpy(p, &dtxContextInfo->distributedTimeStamp, sizeof(DistributedTransactionTimeStamp));
p += sizeof(DistributedTransactionTimeStamp);
if (strlen(dtxContextInfo->distributedId) >= TMGIDSIZE)
elog(PANIC, "Distribute transaction identifier too long (%d)",
(int)strlen(dtxContextInfo->distributedId));
(int) strlen(dtxContextInfo->distributedId));
memcpy(p, dtxContextInfo->distributedId, TMGIDSIZE);
p += TMGIDSIZE;
p += TMGIDSIZE;
memcpy(p, &dtxContextInfo->curcid, sizeof(CommandId));
p += sizeof(CommandId);
}
......@@ -198,7 +198,7 @@ DtxContextInfo_Serialize(char *buffer, DtxContextInfo *dtxContextInfo)
elog((Debug_print_full_dtm ? LOG : DEBUG3),
"DtxContextInfo_Serialize distributedTimeStamp %u, distributedXid = %u, curcid %d nestingLevel %d segmateSync %u",
dtxContextInfo->distributedTimeStamp, dtxContextInfo->distributedXid,
dtxContextInfo->distributedTimeStamp, dtxContextInfo->distributedXid,
dtxContextInfo->curcid, dtxContextInfo->nestingLevel, dtxContextInfo->segmateSync);
memcpy(p, &dtxContextInfo->segmateSync, sizeof(uint32));
......@@ -222,14 +222,14 @@ DtxContextInfo_Serialize(char *buffer, DtxContextInfo *dtxContextInfo)
p += sizeof(int);
used = (p - buffer);
if (DEBUG5 >= log_min_messages || Debug_print_full_dtm || Debug_print_snapshot_dtm)
{
elog((Debug_print_full_dtm ? LOG : DEBUG5),
"DtxContextInfo_Serialize distributedTimeStamp %u, distributedXid = %u, "
"curcid %d",
dtxContextInfo->distributedTimeStamp,
dtxContextInfo->distributedXid,
dtxContextInfo->distributedXid,
dtxContextInfo->curcid);
if (dtxContextInfo->haveDistributedSnapshot)
......@@ -244,18 +244,18 @@ DtxContextInfo_Serialize(char *buffer, DtxContextInfo *dtxContextInfo)
for (i = 0; i < ds->count; i++)
{
elog((Debug_print_full_dtm ? LOG : DEBUG5),
".... inProgressXidArray[%d] = %u",
i, ds->inProgressXidArray[i]);
".... inProgressXidArray[%d] = %u",
i, ds->inProgressXidArray[i]);
}
elog((Debug_print_snapshot_dtm ? LOG : DEBUG5),
"[Distributed Snapshot #%u] *Serialize* currcid = %d (gxid = %u, '%s')",
"[Distributed Snapshot #%u] *Serialize* currcid = %d (gxid = %u, '%s')",
ds->distribSnapshotId,
dtxContextInfo->curcid,
getDistributedTransactionId(),
dtxContextInfo->curcid,
getDistributedTransactionId(),
DtxContextToString(DistributedTransactionContext));
}
elog((Debug_print_full_dtm ? LOG : DEBUG5),"DtxContextInfo_Serialize txnOptions = 0x%x", dtxContextInfo->distributedTxnOptions);
elog((Debug_print_full_dtm ? LOG : DEBUG5),"DtxContextInfo_Serialize copied %d bytes", used);
elog((Debug_print_full_dtm ? LOG : DEBUG5), "DtxContextInfo_Serialize txnOptions = 0x%x", dtxContextInfo->distributedTxnOptions);
elog((Debug_print_full_dtm ? LOG : DEBUG5), "DtxContextInfo_Serialize copied %d bytes", used);
}
}
......@@ -272,7 +272,7 @@ DtxContextInfo_Reset(DtxContextInfo *dtxContextInfo)
dtxContextInfo->nestingLevel = 0;
dtxContextInfo->haveDistributedSnapshot = false;
DistributedSnapshot_Reset(&dtxContextInfo->distributedSnapshot);
dtxContextInfo->distributedTxnOptions = 0;
......@@ -280,8 +280,8 @@ DtxContextInfo_Reset(DtxContextInfo *dtxContextInfo)
void
DtxContextInfo_Copy(
DtxContextInfo *target,
DtxContextInfo *source)
DtxContextInfo *target,
DtxContextInfo *source)
{
DtxContextInfo_Reset(target);
......@@ -289,9 +289,9 @@ DtxContextInfo_Copy(
target->distributedXid = source->distributedXid;
Assert(strlen(source->distributedId) < TMGIDSIZE);
memcpy(
target->distributedId,
source->distributedId,
TMGIDSIZE);
target->distributedId,
source->distributedId,
TMGIDSIZE);
target->segmateSync = source->segmateSync;
target->nestingLevel = source->nestingLevel;
......@@ -303,8 +303,8 @@ DtxContextInfo_Copy(
if (source->haveDistributedSnapshot)
DistributedSnapshot_Copy(
&target->distributedSnapshot,
&source->distributedSnapshot);
&target->distributedSnapshot,
&source->distributedSnapshot);
target->distributedTxnOptions = source->distributedTxnOptions;
......@@ -326,7 +326,7 @@ DtxContextInfo_Copy(
target->distributedSnapshot.xmin,
target->distributedSnapshot.count,
target->distributedSnapshot.xmax);
}
void
......@@ -334,7 +334,7 @@ DtxContextInfo_Deserialize(const char *serializedDtxContextInfo,
int serializedDtxContextInfolen,
DtxContextInfo *dtxContextInfo)
{
int i;
int i;
DistributedSnapshot *ds = &dtxContextInfo->distributedSnapshot;
DtxContextInfo_Reset(dtxContextInfo);
......@@ -345,7 +345,7 @@ DtxContextInfo_Deserialize(const char *serializedDtxContextInfo,
elog((Debug_print_full_dtm ? LOG : DEBUG5),
"DtxContextInfo_Deserialize serializedDtxContextInfolen = %d.",
serializedDtxContextInfolen);
serializedDtxContextInfolen);
memcpy(&dtxContextInfo->distributedXid, p, sizeof(DistributedTransactionId));
p += sizeof(DistributedTransactionId);
......@@ -357,7 +357,7 @@ DtxContextInfo_Deserialize(const char *serializedDtxContextInfo,
memcpy(dtxContextInfo->distributedId, p, TMGIDSIZE);
if (strlen(dtxContextInfo->distributedId) >= TMGIDSIZE)
elog(PANIC, "Distribute transaction identifier too long (%d)",
(int)strlen(dtxContextInfo->distributedId));
(int) strlen(dtxContextInfo->distributedId));
p += TMGIDSIZE;
memcpy(&dtxContextInfo->curcid, p, sizeof(CommandId));
p += sizeof(CommandId);
......@@ -380,8 +380,8 @@ DtxContextInfo_Deserialize(const char *serializedDtxContextInfo,
elog((Debug_print_full_dtm ? LOG : DEBUG3),
"DtxContextInfo_Deserialize distributedTimeStamp %u, distributedXid = %u, curcid %d nestingLevel %d segmateSync %u as %s",
dtxContextInfo->distributedTimeStamp, dtxContextInfo->distributedXid,
dtxContextInfo->curcid, dtxContextInfo->nestingLevel,
dtxContextInfo->distributedTimeStamp, dtxContextInfo->distributedXid,
dtxContextInfo->curcid, dtxContextInfo->nestingLevel,
dtxContextInfo->segmateSync, (Gp_is_writer ? "WRITER" : "READER"));
if (dtxContextInfo->haveDistributedSnapshot)
......@@ -402,15 +402,15 @@ DtxContextInfo_Deserialize(const char *serializedDtxContextInfo,
elog((Debug_print_full_dtm ? LOG : DEBUG5),
"DtxContextInfo_Deserialize distributedTimeStamp %u, distributedXid = %u, "
"distributedId = %s",
dtxContextInfo->distributedTimeStamp,
dtxContextInfo->distributedTimeStamp,
dtxContextInfo->distributedXid,
dtxContextInfo->distributedId);
if (dtxContextInfo->haveDistributedSnapshot)
{
elog((Debug_print_full_dtm ? LOG : DEBUG5),
"distributedSnapshotHeader (xminAllDistributedSnapshots %u, xmin = %u, xmax = %u, count = %d, maxCount = %d)",
ds->xminAllDistributedSnapshots,
"distributedSnapshotHeader (xminAllDistributedSnapshots %u, xmin = %u, xmax = %u, count = %d, maxCount = %d)",
ds->xminAllDistributedSnapshots,
ds->xmin,
ds->xmax,
ds->count,
......@@ -420,7 +420,7 @@ DtxContextInfo_Deserialize(const char *serializedDtxContextInfo,
{
elog((Debug_print_full_dtm ? LOG : DEBUG5),
".... inProgressXidArray[%d] = %u",
i, ds->inProgressXidArray[i]);
i, ds->inProgressXidArray[i]);
}
elog((Debug_print_snapshot_dtm ? LOG : DEBUG5),
......
此差异已折叠。
此差异已折叠。
/*-------------------------------------------------------------------------
*
* cdbfilerepconnclient.c
*
*
* Portions Copyright (c) 2009-2010 Greenplum Inc
* Portions Copyright (c) 2012-Present Pivotal Software, Inc.
*
......@@ -15,7 +15,7 @@
/*
*
* Responsibilities of this module.
* *)
* *)
*
*/
#include "postgres.h"
......@@ -29,16 +29,16 @@
#include "gp-libpq-int.h"
#include "cdb/cdbfilerepservice.h"
static PGconn *filerep_conn = NULL;
static PGconn *filerep_conn = NULL;
/*
*
*/
int
int
FileRepConnClient_EstablishConnection(
char *hostAddress,
int port,
bool reportError)
char *hostAddress,
int port,
bool reportError)
{
int status = STATUS_OK;
char portbuf[11];
......@@ -67,7 +67,7 @@ FileRepConnClient_EstablishConnection(
if (PQstatus(filerep_conn) != CONNECTION_OK)
{
if (reportError || Debug_filerep_print)
ereport(WARNING,
ereport(WARNING,
(errcode_for_socket_access(),
errmsg("could not establish connection with server, host:'%s' port:'%d' err:'%s' : %m",
hostAddress,
......@@ -87,7 +87,7 @@ FileRepConnClient_EstablishConnection(
/* NOTE Handle error message see ftsprobe.c */
return status;
return status;
}
/*
......@@ -118,19 +118,19 @@ FileRepConnClient_CloseConnection(void)
*/
bool
FileRepConnClient_SendMessage(
FileRepConsumerProcIndex_e messageType,
bool messageSynchronous,
char* message,
uint32 messageLength)
FileRepConsumerProcIndex_e messageType,
bool messageSynchronous,
char *message,
uint32 messageLength)
{
char msgType = 0;
int status = STATUS_OK;
char msgType = 0;
int status = STATUS_OK;
#ifdef USE_ASSERT_CHECKING
int prevOutCount = filerep_conn->outCount;
#endif // USE_ASSERT_CHECKING
int prevOutCount = filerep_conn->outCount;
#endif /* // USE_ASSERT_CHECKING */
switch(messageType)
switch (messageType)
{
case FileRepMessageTypeXLog:
msgType = '1';
......@@ -152,31 +152,35 @@ FileRepConnClient_SendMessage(
* Note that pqPutMsgStart and pqPutnchar both may grow the connection's internal buffer, and do not
* flush data
*/
if (pqPutMsgStart(msgType, true, filerep_conn) < 0 )
if (pqPutMsgStart(msgType, true, filerep_conn) < 0)
{
return false;
}
if ( pqPutnchar(message, messageLength, filerep_conn) < 0 )
if (pqPutnchar(message, messageLength, filerep_conn) < 0)
{
return false;
}
/* Server side needs complete messages for mode-transitions so disable auto-flush since it flushes
* partial messages
/*
* Server side needs complete messages for mode-transitions so disable
* auto-flush since it flushes partial messages
*/
pqPutMsgEndNoAutoFlush(filerep_conn);
/* assert that a flush did not occur */
Assert( prevOutCount + messageLength + 5 == filerep_conn->outCount ); /* the +5 is the amount added by pgPutMsgStart */
Assert(prevOutCount + messageLength + 5 == filerep_conn->outCount); /* the +5 is the amount
* added by
* pgPutMsgStart */
/*
* note also that we could do a flush beforehand to avoid
* having pqPutMsgStart and pqPutnchar growing the buffer
* note also that we could do a flush beforehand to avoid having
* pqPutMsgStart and pqPutnchar growing the buffer
*/
if (messageSynchronous || filerep_conn->outCount >= file_rep_min_data_before_flush )
if (messageSynchronous || filerep_conn->outCount >= file_rep_min_data_before_flush)
{
int result = 0;
int result = 0;
/* wait and timeout will be handled by pqWaitTimeout */
while ((status = pqFlushNonBlocking(filerep_conn)) > 0)
{
......@@ -190,12 +194,12 @@ FileRepConnClient_SendMessage(
break;
}
}
if (result < 0)
{
ereport(WARNING,
(errcode_for_socket_access(),
errmsg("could not write data to socket, failure detected : %m")));
errmsg("could not write data to socket, failure detected : %m")));
status = -1;
break;
}
......@@ -210,7 +214,7 @@ FileRepConnClient_SendMessage(
{
return false;
}
Assert( status == 0 );
Assert(status == 0);
return true;
}
......
/*-------------------------------------------------------------------------
*
* cdbfilerepconnserver.c
*
*
* Portions Copyright (c) 2009-2010 Greenplum Inc
* Portions Copyright (c) 2012-Present Pivotal Software, Inc.
*
......@@ -15,7 +15,7 @@
/*
*
* Responsibilities of this module.
* *)
* *)
*
*/
#include "postgres.h"
......@@ -31,38 +31,40 @@
#include "libpq/auth.h"
#include "libpq/pqformat.h"
static Port *port;
static Port *port;
static void ConnFree(void);
static int listenSocket[FILEREP_MAX_LISTEN];
static int listenSocket[FILEREP_MAX_LISTEN];
/*
*
*/
int
int
FileRepConnServer_StartListener(
char *hostAddress,
int portLocal)
char *hostAddress,
int portLocal)
{
int status = STATUS_OK;
int i;
for (i=0; i < FILEREP_MAX_LISTEN; i++) {
int status = STATUS_OK;
int i;
for (i = 0; i < FILEREP_MAX_LISTEN; i++)
{
listenSocket[i] = -1;
}
/* NOTE check if family AF_UNIX has to be considered as well */
status = StreamServerPort(
AF_UNSPEC,
hostAddress,
(unsigned short) portLocal,
NULL,
listenSocket,
FILEREP_MAX_LISTEN);
if (status != STATUS_OK) {
ereport(WARNING,
AF_UNSPEC,
hostAddress,
(unsigned short) portLocal,
NULL,
listenSocket,
FILEREP_MAX_LISTEN);
if (status != STATUS_OK)
{
ereport(WARNING,
(errcode_for_socket_access(),
errmsg("could not start listener, host:'%s' port:'%d': %m",
hostAddress,
......@@ -70,7 +72,7 @@ FileRepConnServer_StartListener(
errSendAlert(true),
FileRep_errcontext()));
}
return status;
}
......@@ -80,42 +82,49 @@ FileRepConnServer_StartListener(
int
FileRepConnServer_CreateConnection()
{
int status = STATUS_OK;
port = (Port*) calloc(1, sizeof(Port));
if (port == NULL) {
ereport(ERROR,
int status = STATUS_OK;
port = (Port *) calloc(1, sizeof(Port));
if (port == NULL)
{
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
(errmsg("not enough memory to create connection"))));
(errmsg("not enough memory to create connection"))));
return status;
}
status = StreamConnection(listenSocket[0], port);
if (status != STATUS_OK) {
ereport(WARNING,
if (status != STATUS_OK)
{
ereport(WARNING,
(errcode_for_socket_access(),
errmsg("could not accept connection: %m"),
FileRep_errcontext()));
if (port->sock >= 0) {
if (port->sock >= 0)
{
StreamClose(port->sock);
}
}
ConnFree();
} else {
/* MPP-14225:
* On NIC failure, filerep receiver process's recv() system call
* will take hours to timeout, depending on the TCP timeout.
* Add SO_RCVTIMEO timeout to filerep receiver process's socket
* to avoid this.
}
else
{
/*
* MPP-14225: On NIC failure, filerep receiver process's recv() system
* call will take hours to timeout, depending on the TCP timeout. Add
* SO_RCVTIMEO timeout to filerep receiver process's socket to avoid
* this.
*/
struct timeval tv;
tv.tv_sec = file_rep_socket_timeout;
tv.tv_usec = 0; /* Not initializing this can cause strange errors */
tv.tv_usec = 0; /* Not initializing this can cause strange
* errors */
if (setsockopt(port->sock, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, sizeof(struct timeval)) == -1)
if (setsockopt(port->sock, SOL_SOCKET, SO_RCVTIMEO, (char *) &tv, sizeof(struct timeval)) == -1)
ereport(WARNING,
(errcode_for_socket_access(),
errmsg("could not set receive timeout on socket")));
errmsg("could not set receive timeout on socket")));
/* set TCP keep-alive parameters for FileRep connection */
(void) pq_setkeepalivesidle(gp_filerep_tcp_keepalives_idle, port);
......@@ -134,52 +143,56 @@ FileRepConnServer_CreateConnection()
int
FileRepConnServer_Select(void)
{
struct timeval timeout;
fd_set rfds;
int retval;
struct timeval timeout;
fd_set rfds;
int retval;
timeout.tv_sec = 0;
timeout.tv_usec = 100 * 1000L;
FD_ZERO(&rfds);
FD_SET(listenSocket[0], &rfds);
retval = select(listenSocket[0]+1, &rfds, NULL, NULL, &timeout);
/*
* check and process any signals received
* The routine returns TRUE if the received signal requests
* process shutdown.
retval = select(listenSocket[0] + 1, &rfds, NULL, NULL, &timeout);
/*
* check and process any signals received The routine returns TRUE if the
* received signal requests process shutdown.
*/
if (retval) {
if (! FD_ISSET(listenSocket[0], &rfds)) {
if (retval)
{
if (!FD_ISSET(listenSocket[0], &rfds))
{
retval = -1;
}
}
if (retval == -1) {
ereport(WARNING,
if (retval == -1)
{
ereport(WARNING,
(errcode_for_socket_access(),
errmsg("receive failure on connection: %m"),
FileRep_errcontext()));
}
return retval;
}
void
FileRepConnServer_CloseConnection(void)
{
if (port != NULL) {
if (port != NULL)
{
secure_close(port);
if (port->sock >= 0) {
if (port->sock >= 0)
{
/* to close socket (file descriptor) */
StreamClose(port->sock);
port->sock = -1;
}
}
ConnFree();
}
}
......@@ -188,9 +201,10 @@ FileRepConnServer_CloseConnection(void)
*
*/
void
ConnFree(void)
{
if (port) {
ConnFree(void)
{
if (port)
{
/* We copied the pointer in FileRepConnServer_CreateConnection() */
if (port == MyProcPort)
MyProcPort = NULL;
......@@ -206,80 +220,88 @@ ConnFree(void)
int
FileRepConnServer_ReceiveStartupPacket(void)
{
uint32 length;
int status = STATUS_OK;
char *buf = NULL;
pq_init();
uint32 length;
int status = STATUS_OK;
char *buf = NULL;
pq_init();
status = FileRepConnServer_ReceiveMessageLength(&length);
if (status != STATUS_OK) {
if (status != STATUS_OK)
{
goto exit;
}
if (length < (uint32) sizeof(ProtocolVersion) ||
length > MAX_STARTUP_PACKET_LENGTH) {
length > MAX_STARTUP_PACKET_LENGTH)
{
status = STATUS_ERROR;
ereport(WARNING,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("invalid length of startup packet"),
FileRep_errcontext()));
FileRep_errcontext()));
goto exit;
}
buf = (char *)malloc(length +1);
if (buf == NULL) {
buf = (char *) malloc(length + 1);
if (buf == NULL)
{
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("not enough memory to allocate buffer for startup packet"),
FileRep_errcontext()));
FileRep_errcontext()));
}
memset(buf, 0, length + 1);
if (pq_getbytes(buf, length) == EOF) {
if (pq_getbytes(buf, length) == EOF)
{
status = STATUS_ERROR;
ereport(WARNING,
(errcode_for_socket_access(),
errmsg("receive EOF on connection: %m"),
FileRep_errcontext()));
FileRep_errcontext()));
goto exit;
}
port->proto = ntohl(*((ProtocolVersion *) buf));
if (PG_PROTOCOL_MAJOR(port->proto) >= 3) {
/* uint32 offset = sizeof(ProtocolVersion);*/
/*
* tell the client that it is authorized (no pg_hba.conf and
* password are required).
*/
StringInfoData buf;
if (PG_PROTOCOL_MAJOR(port->proto) >= 3)
{
/* uint32 offset = sizeof(ProtocolVersion); */
/*
* tell the client that it is authorized (no pg_hba.conf and password
* are required).
*/
StringInfoData buf;
/* sends AUTH_REQ_OK back to client */
FakeClientAuthentication(port);
/* send to client that we are ready to receive data */
/* similar to ReadyForQuery(DestRemoteExecute); */
pq_beginmessage(&buf, 'Z');
pq_sendbyte(&buf, 'I');
pq_endmessage(&buf);
pq_flush();
} else {
}
else
{
ereport(WARNING,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("not supported version"),
FileRep_errcontext()));
FileRep_errcontext()));
}
exit:
if (buf) {
if (buf)
{
free(buf);
buf = NULL;
}
return status;
}
......@@ -299,56 +321,58 @@ FileRepConnServer_AwaitMessageBegin()
*/
int
FileRepConnServer_ReceiveMessageType(
FileRepConsumerProcIndex_e *fileRepMessageType)
FileRepConsumerProcIndex_e *fileRepMessageType)
{
char messageType;
int status = STATUS_OK;
messageType = pq_getbyte();
switch (messageType) {
char messageType;
int status = STATUS_OK;
case '1':
*fileRepMessageType = FileRepMessageTypeXLog;
break;
messageType = pq_getbyte();
case '2':
*fileRepMessageType = FileRepMessageTypeAO01;
break;
switch (messageType)
{
case '1':
*fileRepMessageType = FileRepMessageTypeXLog;
break;
case '2':
*fileRepMessageType = FileRepMessageTypeAO01;
break;
case '3':
*fileRepMessageType = FileRepMessageTypeWriter;
break;
case 'S':
*fileRepMessageType = FileRepMessageTypeShutdown;
break;
case EOF:
ereport(WARNING,
(errcode_for_socket_access(),
errmsg("receive EOF on connection: %m")));
status = STATUS_ERROR;
*fileRepMessageType = FileRepMessageTypeWriter;
break;
case 'S':
*fileRepMessageType = FileRepMessageTypeShutdown;
break;
case EOF:
ereport(WARNING,
(errcode_for_socket_access(),
errmsg("receive EOF on connection: %m")));
status = STATUS_ERROR;
break;
case 'X':
/* Close Message(sent by PQfinish()) */
/*
* Client closed connection. Client does not wait for response.
*/
ereport(WARNING,
(errcode_for_socket_access(),
errmsg("receive close on connection: %m")));
status = STATUS_ERROR;
break;
case 'X': // Close Message (sent by PQfinish())
/*
* Client closed connection.
* Client does not wait for response.
*/
ereport(WARNING,
(errcode_for_socket_access(),
errmsg("receive close on connection: %m")));
status = STATUS_ERROR;
break;
default:
default:
ereport(WARNING,
(errcode_for_socket_access(),
(errcode_for_socket_access(),
errmsg("receive unexpected message type on connection: %m")));
status = STATUS_ERROR;
break;
status = STATUS_ERROR;
break;
}
return status;
......@@ -361,26 +385,28 @@ int
FileRepConnServer_ReceiveMessageLength(uint32 *len)
{
int32 length;
if (pq_getbytes((char*) &length, 4) == EOF) {
int32 length;
if (pq_getbytes((char *) &length, 4) == EOF)
{
ereport(WARNING,
(errcode_for_socket_access(),
errmsg("receive EOF on connection: %m")));
return STATUS_ERROR;
}
length = ntohl(length);
if (length < 4) {
if (length < 4)
{
ereport(WARNING,
(errmsg("receive unexpected message length on connection")));
(errmsg("receive unexpected message length on connection")));
return STATUS_ERROR;
}
length -= 4;
*len = length;
return STATUS_OK;
......@@ -390,23 +416,20 @@ FileRepConnServer_ReceiveMessageLength(uint32 *len)
/*
*
*/
int
FileRepConnServer_ReceiveMessageData(
char *data,
uint32 length)
{
if (pq_getbytes(data, length) == EOF)
{
ereport(WARNING,
(errcode_for_socket_access(),
errmsg("receive EOF on connection: %m")));
return STATUS_ERROR;
}
return STATUS_OK;
}
int
FileRepConnServer_ReceiveMessageData(
char *data,
uint32 length)
{
if (pq_getbytes(data, length) == EOF)
{
ereport(WARNING,
(errcode_for_socket_access(),
errmsg("receive EOF on connection: %m")));
return STATUS_ERROR;
}
return STATUS_OK;
}
此差异已折叠。
此差异已折叠。
......@@ -17,180 +17,179 @@
#include "postgres.h"
#include "cdb/cdbheap.h" /* me */
#include "cdb/cdbheap.h" /* me */
static void
SiftDown(CdbHeap *hp, int iHole, void *newElement);
SiftDown(CdbHeap *hp, int iHole, void *newElement);
static inline void
CdbHeap_CopySlot(CdbHeap *hp, void *tgtSlot, const void *srcSlot)
{
int *tgt = (int *)tgtSlot;
const int *src = (const int *)srcSlot;
int j = hp->wordsPerSlot;
tgt[0] = src[0];
if (--j > 0)
{
tgt[1] = src[1];
if (--j > 0)
{
tgt[2] = src[2];
while (--j > 0)
tgt[j+2] = src[j+2];
}
}
} /* CdbHeap_CopySlot */
int *tgt = (int *) tgtSlot;
const int *src = (const int *) srcSlot;
int j = hp->wordsPerSlot;
tgt[0] = src[0];
if (--j > 0)
{
tgt[1] = src[1];
if (--j > 0)
{
tgt[2] = src[2];
while (--j > 0)
tgt[j + 2] = src[j + 2];
}
}
} /* CdbHeap_CopySlot */
/* Allocate and initialize a CdbHeap structure. */
CdbHeap *
CdbHeap_Create(CdbHeapCmpFn comparator,
void *comparatorContext,
int nSlotsMax,
int bytesPerSlot,
void *slotArray)
CdbHeap_Create(CdbHeapCmpFn comparator,
void *comparatorContext,
int nSlotsMax,
int bytesPerSlot,
void *slotArray)
{
CdbHeap *hp = (CdbHeap *)palloc0(sizeof(*hp) + bytesPerSlot);
CdbHeap *hp = (CdbHeap *) palloc0(sizeof(*hp) + bytesPerSlot);
Assert(comparator && nSlotsMax > 0);
Assert(comparator && nSlotsMax > 0);
/* Initialize the CdbHeap structure. */
hp->nSlotsUsed = 0;
hp->nSlotsMax = nSlotsMax;
hp->bytesPerSlot = bytesPerSlot;
hp->wordsPerSlot = bytesPerSlot / sizeof(int);
hp->comparator = comparator;
hp->comparatorContext = comparatorContext;
/* Initialize the CdbHeap structure. */
hp->nSlotsUsed = 0;
hp->nSlotsMax = nSlotsMax;
hp->bytesPerSlot = bytesPerSlot;
hp->wordsPerSlot = bytesPerSlot / sizeof(int);
hp->comparator = comparator;
hp->comparatorContext = comparatorContext;
/* A 1-element temporary buffer immediately follows the CdbHeap struct. */
hp->tempSlot = (char *)(hp + 1);
/* A 1-element temporary buffer immediately follows the CdbHeap struct. */
hp->tempSlot = (char *) (hp + 1);
/* Allocate space for array of priority queue entries. */
hp->slotArray = slotArray;
hp->ownSlotArray = false;
if (!slotArray)
{
hp->slotArray = palloc0(nSlotsMax * bytesPerSlot);
hp->ownSlotArray = true;
}
/* Allocate space for array of priority queue entries. */
hp->slotArray = slotArray;
hp->ownSlotArray = false;
if (!slotArray)
{
hp->slotArray = palloc0(nSlotsMax * bytesPerSlot);
hp->ownSlotArray = true;
}
Assert(hp->wordsPerSlot > 0 &&
(int)(hp->wordsPerSlot * sizeof(int)) == bytesPerSlot);
Assert(hp->wordsPerSlot > 0 &&
(int) (hp->wordsPerSlot * sizeof(int)) == bytesPerSlot);
return hp;
} /* CdbHeap_Create */
return hp;
} /* CdbHeap_Create */
/* Free a CdbHeap structure. */
void
CdbHeap_Destroy(CdbHeap *hp)
{
if (hp)
{
/* Free slotArray if we allocated it. Don't free if caller owns it. */
if (hp->ownSlotArray &&
hp->slotArray)
pfree(hp->slotArray);
if (hp)
{
/* Free slotArray if we allocated it. Don't free if caller owns it. */
if (hp->ownSlotArray &&
hp->slotArray)
pfree(hp->slotArray);
pfree(hp);
}
} /* CdbHeap_Destroy */
pfree(hp);
}
} /* CdbHeap_Destroy */
/* Arrange elements of slotArray such that the heap property is satisfied. */
void
CdbHeap_Heapify(CdbHeap *hp, int nSlotsUsed)
{
Assert(hp &&
nSlotsUsed >= 0 &&
nSlotsUsed <= hp->nSlotsMax);
hp->nSlotsUsed = nSlotsUsed;
if (nSlotsUsed > 1)
{
int i;
for (i = nSlotsUsed/2-1; i >= 0; i--)
{
/* Make a hole at slot i by moving its contents to temp area. */
CdbHeap_CopySlot(hp, hp->tempSlot, CdbHeap_Slot(void, hp, i));
/* Refill the hole, moving smallest descendant into slot i. */
SiftDown(hp, i, hp->tempSlot);
}
}
} /* CdbHeap_Heapify */
Assert(hp &&
nSlotsUsed >= 0 &&
nSlotsUsed <= hp->nSlotsMax);
hp->nSlotsUsed = nSlotsUsed;
if (nSlotsUsed > 1)
{
int i;
for (i = nSlotsUsed / 2 - 1; i >= 0; i--)
{
/* Make a hole at slot i by moving its contents to temp area. */
CdbHeap_CopySlot(hp, hp->tempSlot, CdbHeap_Slot(void, hp, i));
/* Refill the hole, moving smallest descendant into slot i. */
SiftDown(hp, i, hp->tempSlot);
}
}
} /* CdbHeap_Heapify */
/* Delete the smallest element. */
void
CdbHeap_DeleteMin(CdbHeap *hp)
{
Assert(hp && hp->nSlotsUsed > 0);
Assert(hp && hp->nSlotsUsed > 0);
/* Heap shrinks by one element. */
hp->nSlotsUsed--;
/* Heap shrinks by one element. */
hp->nSlotsUsed--;
/* Sift down the rightmost element, refilling hole at root with new min. */
if (hp->nSlotsUsed > 0)
SiftDown(hp, 0, CdbHeap_Slot(void, hp, hp->nSlotsUsed));
} /* CdbHeap_DeleteMin */
/* Sift down the rightmost element, refilling hole at root with new min. */
if (hp->nSlotsUsed > 0)
SiftDown(hp, 0, CdbHeap_Slot(void, hp, hp->nSlotsUsed));
} /* CdbHeap_DeleteMin */
/* Delete the smallest element and insert a copy of the given element. */
void
CdbHeap_DeleteMinAndInsert(CdbHeap *hp, void* newElement)
CdbHeap_DeleteMinAndInsert(CdbHeap *hp, void *newElement)
{
Assert(hp &&
hp->nSlotsUsed > 0 &&
newElement);
Assert(hp &&
hp->nSlotsUsed > 0 &&
newElement);
/* Sift down the new element, refilling hole at root with new min. */
SiftDown(hp, 0, newElement);
} /* CdbHeap_DeleteMinAndInsert */
/* Sift down the new element, refilling hole at root with new min. */
SiftDown(hp, 0, newElement);
} /* CdbHeap_DeleteMinAndInsert */
void
SiftDown(CdbHeap *hp, int iHole, void *newElement)
{
CdbHeapCmpFn comparator = hp->comparator;
void *comparatorContext = hp->comparatorContext;
int bytesPerSlot = hp->bytesPerSlot;
char *slot0 = CdbHeap_Slot(char, hp, 0);
char *slotN = CdbHeap_Slot(char, hp, hp->nSlotsUsed);
char *firstLeafSlot = CdbHeap_Slot(char, hp, hp->nSlotsUsed >> 1);
char *curSlot;
char *kidSlot;
Assert(iHole >= 0 &&
iHole <= hp->nSlotsUsed &&
iHole < hp->nSlotsMax &&
newElement);
/* Bubble up the new min value into the hole; the hole sinks down. */
for (curSlot = iHole * bytesPerSlot + slot0;
curSlot < firstLeafSlot;
curSlot = kidSlot)
{
/* Point to left child (could be a leaf). */
kidSlot = curSlot - slot0 + bytesPerSlot + curSlot;
/* If right child exists and has lesser value, choose it instead. */
if (kidSlot+bytesPerSlot < slotN &&
(*comparator)(kidSlot+bytesPerSlot, kidSlot, comparatorContext) < 0)
kidSlot += bytesPerSlot;
/* Hole comes to rest where new value <= all descendants. */
if ((*comparator)(newElement, kidSlot, comparatorContext) <= 0)
break;
/* Hole trades places with lesser child. */
CdbHeap_CopySlot(hp, curSlot, kidSlot);
}
/* Fill the hole with the given element. */
CdbHeap_CopySlot(hp, curSlot, newElement);
} /* SiftDown */
CdbHeapCmpFn comparator = hp->comparator;
void *comparatorContext = hp->comparatorContext;
int bytesPerSlot = hp->bytesPerSlot;
char *slot0 = CdbHeap_Slot(char, hp, 0);
char *slotN = CdbHeap_Slot(char, hp, hp->nSlotsUsed);
char *firstLeafSlot = CdbHeap_Slot(char, hp, hp->nSlotsUsed >> 1);
char *curSlot;
char *kidSlot;
Assert(iHole >= 0 &&
iHole <= hp->nSlotsUsed &&
iHole < hp->nSlotsMax &&
newElement);
/* Bubble up the new min value into the hole; the hole sinks down. */
for (curSlot = iHole * bytesPerSlot + slot0;
curSlot < firstLeafSlot;
curSlot = kidSlot)
{
/* Point to left child (could be a leaf). */
kidSlot = curSlot - slot0 + bytesPerSlot + curSlot;
/* If right child exists and has lesser value, choose it instead. */
if (kidSlot + bytesPerSlot < slotN &&
(*comparator) (kidSlot + bytesPerSlot, kidSlot, comparatorContext) < 0)
kidSlot += bytesPerSlot;
/* Hole comes to rest where new value <= all descendants. */
if ((*comparator) (newElement, kidSlot, comparatorContext) <= 0)
break;
/* Hole trades places with lesser child. */
CdbHeap_CopySlot(hp, curSlot, kidSlot);
}
/* Fill the hole with the given element. */
CdbHeap_CopySlot(hp, curSlot, newElement);
} /* SiftDown */
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册