提交 461ef73f 编写于 作者: T Tatsuo Ishii

Add API for 64-bit large object access. Now users can access up to

4TB large objects (standard 8KB BLCKSZ case).  For this purpose new
libpq API lo_lseek64, lo_tell64 and lo_truncate64 are added.  Also
corresponding new backend functions lo_lseek64, lo_tell64 and
lo_truncate64 are added. inv_api.c is changed to handle 64-bit
offsets.

Patch contributed by Nozomi Anzai (backend side) and Yugo Nagata
(frontend side, docs, regression tests and example program). Reviewed
by Kohei Kaigai. Committed by Tatsuo Ishii with minor editings.
上级 ae835c7d
......@@ -41,7 +41,7 @@
larger than a single database page into a secondary storage area per table.
This makes the large object facility partially obsolete. One
remaining advantage of the large object facility is that it allows values
up to 2 GB in size, whereas <acronym>TOAST</acronym>ed fields can be at
up to 4 TB in size, whereas <acronym>TOAST</acronym>ed fields can be at
most 1 GB. Also, large objects can be randomly modified using a read/write
API that is more efficient than performing such operations using
<acronym>TOAST</acronym>.
......@@ -237,7 +237,9 @@ int lo_open(PGconn *conn, Oid lobjId, int mode);
<function>lo_open</function> returns a (non-negative) large object
descriptor for later use in <function>lo_read</function>,
<function>lo_write</function>, <function>lo_lseek</function>,
<function>lo_tell</function>, and <function>lo_close</function>.
<function>lo_lseek64</function>, <function>lo_tell</function>,
<function>lo_tell64</function>, <function>lo_truncate</function>,
<function>lo_truncate64</function>, and <function>lo_close</function>.
The descriptor is only valid for
the duration of the current transaction.
On failure, -1 is returned.
......@@ -312,6 +314,7 @@ int lo_read(PGconn *conn, int fd, char *buf, size_t len);
large object descriptor, call
<synopsis>
int lo_lseek(PGconn *conn, int fd, int offset, int whence);
pg_int64 lo_lseek64(PGconn *conn, int fd, pg_int64 offset, int whence);
</synopsis>
<indexterm><primary>lo_lseek</></> This function moves the
current location pointer for the large object descriptor identified by
......@@ -321,7 +324,16 @@ int lo_lseek(PGconn *conn, int fd, int offset, int whence);
<symbol>SEEK_CUR</> (seek from current position), and
<symbol>SEEK_END</> (seek from object end). The return value is
the new location pointer, or -1 on error.
<indexterm><primary>lo_lseek64</></> <function>lo_lseek64</function>
is a function for large objects larger than 2GB. <symbol>pg_int64</>
is defined as 8-byte integer type.
</para>
<para>
<function>lo_lseek64</> is new as of <productname>PostgreSQL</productname>
9.3; if this function is run against an older server version, it will
fail and return a negative value.
</para>
</sect2>
<sect2 id="lo-tell">
......@@ -332,9 +344,17 @@ int lo_lseek(PGconn *conn, int fd, int offset, int whence);
call
<synopsis>
int lo_tell(PGconn *conn, int fd);
pg_int64 lo_tell64(PGconn *conn, int fd);
</synopsis>
<indexterm><primary>lo_tell</></> If there is an error, the
return value is negative.
<indexterm><primary>lo_tell64</></> <function>lo_tell64</function> is
a function for large objects larger than 2GB.
</para>
<para>
<function>lo_tell64</> is new as of <productname>PostgreSQL</productname>
9.3; if this function is run against an older server version, it will
fail and return a negative value.
</para>
</sect2>
......@@ -345,6 +365,7 @@ int lo_tell(PGconn *conn, int fd);
To truncate a large object to a given length, call
<synopsis>
int lo_truncate(PGcon *conn, int fd, size_t len);
int lo_truncate64(PGcon *conn, int fd, pg_int64 len);
</synopsis>
<indexterm><primary>lo_truncate</></> truncates the large object
descriptor <parameter>fd</> to length <parameter>len</>. The
......@@ -352,6 +373,8 @@ int lo_truncate(PGcon *conn, int fd, size_t len);
previous <function>lo_open</function>. If <parameter>len</> is
greater than the current large object length, the large object
is extended with null bytes ('\0').
<indexterm><primary>lo_truncate64</></> <function>lo_truncate64</function>
is a function for large objects larger than 2GB.
</para>
<para>
......@@ -359,7 +382,7 @@ int lo_truncate(PGcon *conn, int fd, size_t len);
</para>
<para>
On success <function>lo_truncate</function> returns
On success <function>lo_truncate</function> and <function>lo_truncate64</function> returns
zero. On error, the return value is negative.
</para>
......@@ -368,6 +391,11 @@ int lo_truncate(PGcon *conn, int fd, size_t len);
8.3; if this function is run against an older server version, it will
fail and return a negative value.
</para>
<para>
<function>lo_truncate64</> is new as of <productname>PostgreSQL</productname>
9.3; if this function is run against an older server version, it will
fail and return a negative value.
</para>
</sect2>
<sect2 id="lo-close">
......
......@@ -39,6 +39,7 @@
#include "postgres.h"
#include <fcntl.h>
#include <limits.h>
#include <sys/stat.h>
#include <unistd.h>
......@@ -216,7 +217,7 @@ lo_lseek(PG_FUNCTION_ARGS)
int32 fd = PG_GETARG_INT32(0);
int32 offset = PG_GETARG_INT32(1);
int32 whence = PG_GETARG_INT32(2);
int status;
int64 status;
if (fd < 0 || fd >= cookies_size || cookies[fd] == NULL)
ereport(ERROR,
......@@ -225,9 +226,45 @@ lo_lseek(PG_FUNCTION_ARGS)
status = inv_seek(cookies[fd], offset, whence);
if (INT_MAX < status)
{
ereport(ERROR,
(errcode(ERRCODE_BLOB_OFFSET_OVERFLOW),
errmsg("offset overflow: %d", fd)));
PG_RETURN_INT32(-1);
}
PG_RETURN_INT32(status);
}
Datum
lo_lseek64(PG_FUNCTION_ARGS)
{
int32 fd = PG_GETARG_INT32(0);
int64 offset = PG_GETARG_INT64(1);
int32 whence = PG_GETARG_INT32(2);
MemoryContext currentContext;
int64 status;
if (fd < 0 || fd >= cookies_size || cookies[fd] == NULL)
{
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("invalid large-object descriptor: %d", fd)));
PG_RETURN_INT64(-1);
}
Assert(fscxt != NULL);
currentContext = MemoryContextSwitchTo(fscxt);
status = inv_seek(cookies[fd], offset, whence);
MemoryContextSwitchTo(currentContext);
PG_RETURN_INT64(status);
}
Datum
lo_creat(PG_FUNCTION_ARGS)
{
......@@ -262,15 +299,48 @@ lo_create(PG_FUNCTION_ARGS)
Datum
lo_tell(PG_FUNCTION_ARGS)
{
int32 fd = PG_GETARG_INT32(0);
int64 offset = 0;
if (fd < 0 || fd >= cookies_size || cookies[fd] == NULL)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("invalid large-object descriptor: %d", fd)));
offset = inv_tell(cookies[fd]);
if (INT_MAX < offset)
{
ereport(ERROR,
(errcode(ERRCODE_BLOB_OFFSET_OVERFLOW),
errmsg("offset overflow: %d", fd)));
PG_RETURN_INT32(-1);
}
PG_RETURN_INT32(offset);
}
Datum
lo_tell64(PG_FUNCTION_ARGS)
{
int32 fd = PG_GETARG_INT32(0);
if (fd < 0 || fd >= cookies_size || cookies[fd] == NULL)
{
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("invalid large-object descriptor: %d", fd)));
PG_RETURN_INT64(-1);
}
PG_RETURN_INT32(inv_tell(cookies[fd]));
/*
* We assume we do not need to switch contexts for inv_tell. That is
* true for now, but is probably more than this module ought to
* assume...
*/
PG_RETURN_INT64(inv_tell(cookies[fd]));
}
Datum
......@@ -533,6 +603,33 @@ lo_truncate(PG_FUNCTION_ARGS)
PG_RETURN_INT32(0);
}
Datum
lo_truncate64(PG_FUNCTION_ARGS)
{
int32 fd = PG_GETARG_INT32(0);
int64 len = PG_GETARG_INT64(1);
if (fd < 0 || fd >= cookies_size || cookies[fd] == NULL)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("invalid large-object descriptor: %d", fd)));
/* Permission checks */
if (!lo_compat_privileges &&
pg_largeobject_aclcheck_snapshot(cookies[fd]->id,
GetUserId(),
ACL_UPDATE,
cookies[fd]->snapshot) != ACLCHECK_OK)
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("permission denied for large object %u",
cookies[fd]->id)));
inv_truncate(cookies[fd], len);
PG_RETURN_INT32(0);
}
/*
* AtEOXact_LargeObject -
* prepares large objects for transaction commit
......
......@@ -324,10 +324,10 @@ inv_drop(Oid lobjId)
* NOTE: LOs can contain gaps, just like Unix files. We actually return
* the offset of the last byte + 1.
*/
static uint32
static uint64
inv_getsize(LargeObjectDesc *obj_desc)
{
uint32 lastbyte = 0;
uint64 lastbyte = 0;
ScanKeyData skey[1];
SysScanDesc sd;
HeapTuple tuple;
......@@ -368,7 +368,7 @@ inv_getsize(LargeObjectDesc *obj_desc)
heap_tuple_untoast_attr((struct varlena *) datafield);
pfreeit = true;
}
lastbyte = data->pageno * LOBLKSIZE + getbytealen(datafield);
lastbyte = (uint64) data->pageno * LOBLKSIZE + getbytealen(datafield);
if (pfreeit)
pfree(datafield);
}
......@@ -378,30 +378,31 @@ inv_getsize(LargeObjectDesc *obj_desc)
return lastbyte;
}
int
inv_seek(LargeObjectDesc *obj_desc, int offset, int whence)
int64
inv_seek(LargeObjectDesc *obj_desc, int64 offset, int whence)
{
Assert(PointerIsValid(obj_desc));
switch (whence)
{
case SEEK_SET:
if (offset < 0)
elog(ERROR, "invalid seek offset: %d", offset);
if (offset < 0 || offset >= MAX_LARGE_OBJECT_SIZE)
elog(ERROR, "invalid seek offset: " INT64_FORMAT, offset);
obj_desc->offset = offset;
break;
case SEEK_CUR:
if (offset < 0 && obj_desc->offset < ((uint32) (-offset)))
elog(ERROR, "invalid seek offset: %d", offset);
if ((offset + obj_desc->offset) < 0 ||
(offset + obj_desc->offset) >= MAX_LARGE_OBJECT_SIZE)
elog(ERROR, "invalid seek offset: " INT64_FORMAT, offset);
obj_desc->offset += offset;
break;
case SEEK_END:
{
uint32 size = inv_getsize(obj_desc);
int64 pos = inv_getsize(obj_desc) + offset;
if (offset < 0 && size < ((uint32) (-offset)))
elog(ERROR, "invalid seek offset: %d", offset);
obj_desc->offset = size + offset;
if (pos < 0 || pos >= MAX_LARGE_OBJECT_SIZE)
elog(ERROR, "invalid seek offset: " INT64_FORMAT, offset);
obj_desc->offset = pos;
}
break;
default:
......@@ -410,7 +411,7 @@ inv_seek(LargeObjectDesc *obj_desc, int offset, int whence)
return obj_desc->offset;
}
int
int64
inv_tell(LargeObjectDesc *obj_desc)
{
Assert(PointerIsValid(obj_desc));
......@@ -422,11 +423,11 @@ int
inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
{
int nread = 0;
int n;
int off;
int64 n;
int64 off;
int len;
int32 pageno = (int32) (obj_desc->offset / LOBLKSIZE);
uint32 pageoff;
uint64 pageoff;
ScanKeyData skey[2];
SysScanDesc sd;
HeapTuple tuple;
......@@ -437,6 +438,9 @@ inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
if (nbytes <= 0)
return 0;
if ((nbytes + obj_desc->offset) > MAX_LARGE_OBJECT_SIZE)
elog(ERROR, "invalid read request size: %d", nbytes);
open_lo_relation();
ScanKeyInit(&skey[0],
......@@ -467,7 +471,7 @@ inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
* there may be missing pages if the LO contains unwritten "holes". We
* want missing sections to read out as zeroes.
*/
pageoff = ((uint32) data->pageno) * LOBLKSIZE;
pageoff = ((uint64) data->pageno) * LOBLKSIZE;
if (pageoff > obj_desc->offset)
{
n = pageoff - obj_desc->offset;
......@@ -560,6 +564,9 @@ inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes)
if (nbytes <= 0)
return 0;
if ((nbytes + obj_desc->offset) > MAX_LARGE_OBJECT_SIZE)
elog(ERROR, "invalid write request size: %d", nbytes);
open_lo_relation();
indstate = CatalogOpenIndexes(lo_heap_r);
......@@ -718,10 +725,10 @@ inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes)
}
void
inv_truncate(LargeObjectDesc *obj_desc, int len)
inv_truncate(LargeObjectDesc *obj_desc, int64 len)
{
int32 pageno = (int32) (len / LOBLKSIZE);
int off;
int32 off;
ScanKeyData skey[2];
SysScanDesc sd;
HeapTuple oldtuple;
......
......@@ -199,6 +199,7 @@ Section: Class 22 - Data Exception
2200N E ERRCODE_INVALID_XML_CONTENT invalid_xml_content
2200S E ERRCODE_INVALID_XML_COMMENT invalid_xml_comment
2200T E ERRCODE_INVALID_XML_PROCESSING_INSTRUCTION invalid_xml_processing_instruction
22P07 E ERRCODE_BLOB_OFFSET_OVERFLOW blob_offset_overflow
Section: Class 23 - Integrity Constraint Violation
......
......@@ -1040,14 +1040,20 @@ DATA(insert OID = 955 ( lowrite PGNSP PGUID 12 1 0 0 0 f f f f t f v 2 0 23
DESCR("large object write");
DATA(insert OID = 956 ( lo_lseek PGNSP PGUID 12 1 0 0 0 f f f f t f v 3 0 23 "23 23 23" _null_ _null_ _null_ _null_ lo_lseek _null_ _null_ _null_ ));
DESCR("large object seek");
DATA(insert OID = 3170 ( lo_lseek64 PGNSP PGUID 12 1 0 0 0 f f f f t f v 3 0 20 "23 20 23" _null_ _null_ _null_ _null_ lo_lseek64 _null_ _null_ _null_ ));
DESCR("large object seek (64 bit)");
DATA(insert OID = 957 ( lo_creat PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 26 "23" _null_ _null_ _null_ _null_ lo_creat _null_ _null_ _null_ ));
DESCR("large object create");
DATA(insert OID = 715 ( lo_create PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 26 "26" _null_ _null_ _null_ _null_ lo_create _null_ _null_ _null_ ));
DESCR("large object create");
DATA(insert OID = 958 ( lo_tell PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 23 "23" _null_ _null_ _null_ _null_ lo_tell _null_ _null_ _null_ ));
DESCR("large object position");
DATA(insert OID = 3171 ( lo_tell64 PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 20 "23" _null_ _null_ _null_ _null_ lo_tell64 _null_ _null_ _null_ ));
DESCR("large object position (64 bit)");
DATA(insert OID = 1004 ( lo_truncate PGNSP PGUID 12 1 0 0 0 f f f f t f v 2 0 23 "23 23" _null_ _null_ _null_ _null_ lo_truncate _null_ _null_ _null_ ));
DESCR("truncate large object");
DATA(insert OID = 3172 ( lo_truncate64 PGNSP PGUID 12 1 0 0 0 f f f f t f v 2 0 23 "23 20" _null_ _null_ _null_ _null_ lo_truncate64 _null_ _null_ _null_ ));
DESCR("truncate large object (64 bit)");
DATA(insert OID = 959 ( on_pl PGNSP PGUID 12 1 0 0 0 f f f f t f i 2 0 16 "600 628" _null_ _null_ _null_ _null_ on_pl _null_ _null_ _null_ ));
DATA(insert OID = 960 ( on_sl PGNSP PGUID 12 1 0 0 0 f f f f t f i 2 0 16 "601 628" _null_ _null_ _null_ _null_ on_sl _null_ _null_ _null_ ));
......
......@@ -34,8 +34,11 @@ extern Datum lowrite(PG_FUNCTION_ARGS);
extern Datum lo_lseek(PG_FUNCTION_ARGS);
extern Datum lo_tell(PG_FUNCTION_ARGS);
extern Datum lo_lseek64(PG_FUNCTION_ARGS);
extern Datum lo_tell64(PG_FUNCTION_ARGS);
extern Datum lo_unlink(PG_FUNCTION_ARGS);
extern Datum lo_truncate(PG_FUNCTION_ARGS);
extern Datum lo_truncate64(PG_FUNCTION_ARGS);
/*
* compatibility option for access control
......
......@@ -56,4 +56,9 @@ typedef unsigned int Oid;
#define PG_DIAG_SOURCE_LINE 'L'
#define PG_DIAG_SOURCE_FUNCTION 'R'
#ifndef NO_PG_INT64
#define HAVE_PG_INT64 1
typedef long long int pg_int64;
#endif
#endif
......@@ -37,7 +37,7 @@ typedef struct LargeObjectDesc
Oid id; /* LO's identifier */
Snapshot snapshot; /* snapshot to use */
SubTransactionId subid; /* owning subtransaction ID */
uint32 offset; /* current seek pointer */
uint64 offset; /* current seek pointer */
int flags; /* locking info, etc */
/* flag bits: */
......@@ -62,7 +62,10 @@ typedef struct LargeObjectDesc
* This avoids unnecessary tuple updates caused by partial-page writes.
*/
#define LOBLKSIZE (BLCKSZ / 4)
/*
* Maximum byte length for each large object
*/
#define MAX_LARGE_OBJECT_SIZE INT64CONST(INT_MAX * LOBLKSIZE)
/*
* Function definitions...
......@@ -74,10 +77,10 @@ extern Oid inv_create(Oid lobjId);
extern LargeObjectDesc *inv_open(Oid lobjId, int flags, MemoryContext mcxt);
extern void inv_close(LargeObjectDesc *obj_desc);
extern int inv_drop(Oid lobjId);
extern int inv_seek(LargeObjectDesc *obj_desc, int offset, int whence);
extern int inv_tell(LargeObjectDesc *obj_desc);
extern int64 inv_seek(LargeObjectDesc *obj_desc, int64 offset, int whence);
extern int64 inv_tell(LargeObjectDesc *obj_desc);
extern int inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes);
extern int inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes);
extern void inv_truncate(LargeObjectDesc *obj_desc, int len);
extern void inv_truncate(LargeObjectDesc *obj_desc, int64 len);
#endif /* LARGE_OBJECT_H */
......@@ -161,3 +161,6 @@ PQping 158
PQpingParams 159
PQlibVersion 160
PQsetSingleRowMode 161
lo_lseek64 162
lo_tell64 163
lo_truncate64 164
......@@ -37,10 +37,16 @@
#include "libpq-int.h"
#include "libpq/libpq-fs.h" /* must come after sys/stat.h */
/* for ntohl/htonl */
#include <netinet/in.h>
#include <arpa/inet.h>
#define LO_BUFSIZE 8192
static int lo_initialize(PGconn *conn);
static Oid lo_import_internal(PGconn *conn, const char *filename, Oid oid);
static pg_int64 lo_hton64(pg_int64 host64);
static pg_int64 lo_ntoh64(pg_int64 net64);
/*
* lo_open
......@@ -174,6 +180,59 @@ lo_truncate(PGconn *conn, int fd, size_t len)
}
}
/*
* lo_truncate64
* truncates an existing large object to the given size
*
* returns 0 upon success
* returns -1 upon failure
*/
#ifdef HAVE_PG_INT64
int
lo_truncate64(PGconn *conn, int fd, pg_int64 len)
{
PQArgBlock argv[2];
PGresult *res;
int retval;
int result_len;
if (conn == NULL || conn->lobjfuncs == NULL)
{
if (lo_initialize(conn) < 0)
return -1;
}
if (conn->lobjfuncs->fn_lo_truncate64 == 0)
{
printfPQExpBuffer(&conn->errorMessage,
libpq_gettext("cannot determine OID of function lo_truncate64\n"));
return -1;
}
argv[0].isint = 1;
argv[0].len = 4;
argv[0].u.integer = fd;
len = lo_hton64(len);
argv[1].isint = 0;
argv[1].len = 8;
argv[1].u.ptr = (int *) &len;
res = PQfn(conn, conn->lobjfuncs->fn_lo_truncate64,
&retval, &result_len, 1, argv, 2);
if (PQresultStatus(res) == PGRES_COMMAND_OK)
{
PQclear(res);
return retval;
}
else
{
PQclear(res);
return -1;
}
}
#endif
/*
* lo_read
......@@ -310,6 +369,63 @@ lo_lseek(PGconn *conn, int fd, int offset, int whence)
}
}
/*
* lo_lseek64
* change the current read or write location on a large object
* currently, only L_SET is a legal value for whence
*
*/
#ifdef HAVE_PG_INT64
pg_int64
lo_lseek64(PGconn *conn, int fd, pg_int64 offset, int whence)
{
PQArgBlock argv[3];
PGresult *res;
pg_int64 retval;
int result_len;
if (conn == NULL || conn->lobjfuncs == NULL)
{
if (lo_initialize(conn) < 0)
return -1;
}
if (conn->lobjfuncs->fn_lo_lseek64 == 0)
{
printfPQExpBuffer(&conn->errorMessage,
libpq_gettext("cannot determine OID of function lo_lseek64\n"));
return -1;
}
argv[0].isint = 1;
argv[0].len = 4;
argv[0].u.integer = fd;
offset = lo_hton64(offset);
argv[1].isint = 0;
argv[1].len = 8;
argv[1].u.ptr = (int *) &offset;
argv[2].isint = 1;
argv[2].len = 4;
argv[2].u.integer = whence;
res = PQfn(conn, conn->lobjfuncs->fn_lo_lseek64,
(int *)&retval, &result_len, 0, argv, 3);
if (PQresultStatus(res) == PGRES_COMMAND_OK)
{
PQclear(res);
return lo_ntoh64((pg_int64)retval);
}
else
{
PQclear(res);
return -1;
}
}
#endif
/*
* lo_creat
* create a new large object
......@@ -435,6 +551,52 @@ lo_tell(PGconn *conn, int fd)
}
}
/*
* lo_tell64
* returns the current seek location of the large object
*
*/
#ifdef HAVE_PG_INT64
pg_int64
lo_tell64(PGconn *conn, int fd)
{
pg_int64 retval;
PQArgBlock argv[1];
PGresult *res;
int result_len;
if (conn == NULL || conn->lobjfuncs == NULL)
{
if (lo_initialize(conn) < 0)
return -1;
}
if (conn->lobjfuncs->fn_lo_tell64 == 0)
{
printfPQExpBuffer(&conn->errorMessage,
libpq_gettext("cannot determine OID of function lo_tell64\n"));
return -1;
}
argv[0].isint = 1;
argv[0].len = 4;
argv[0].u.integer = fd;
res = PQfn(conn, conn->lobjfuncs->fn_lo_tell64,
(int *) &retval, &result_len, 0, argv, 1);
if (PQresultStatus(res) == PGRES_COMMAND_OK)
{
PQclear(res);
return lo_ntoh64((pg_int64) retval);
}
else
{
PQclear(res);
return -1;
}
}
#endif
/*
* lo_unlink
* delete a file
......@@ -713,8 +875,11 @@ lo_initialize(PGconn *conn)
"'lo_create', "
"'lo_unlink', "
"'lo_lseek', "
"'lo_lseek64', "
"'lo_tell', "
"'lo_tell64', "
"'lo_truncate', "
"'lo_truncate64', "
"'loread', "
"'lowrite') "
"and pronamespace = (select oid from pg_catalog.pg_namespace "
......@@ -765,10 +930,16 @@ lo_initialize(PGconn *conn)
lobjfuncs->fn_lo_unlink = foid;
else if (strcmp(fname, "lo_lseek") == 0)
lobjfuncs->fn_lo_lseek = foid;
else if (strcmp(fname, "lo_lseek64") == 0)
lobjfuncs->fn_lo_lseek64 = foid;
else if (strcmp(fname, "lo_tell") == 0)
lobjfuncs->fn_lo_tell = foid;
else if (strcmp(fname, "lo_tell64") == 0)
lobjfuncs->fn_lo_tell64 = foid;
else if (strcmp(fname, "lo_truncate") == 0)
lobjfuncs->fn_lo_truncate = foid;
else if (strcmp(fname, "lo_truncate64") == 0)
lobjfuncs->fn_lo_truncate64 = foid;
else if (strcmp(fname, "loread") == 0)
lobjfuncs->fn_lo_read = foid;
else if (strcmp(fname, "lowrite") == 0)
......@@ -836,10 +1007,76 @@ lo_initialize(PGconn *conn)
free(lobjfuncs);
return -1;
}
if (conn->sversion >= 90300)
{
if (lobjfuncs->fn_lo_lseek64 == 0)
{
printfPQExpBuffer(&conn->errorMessage,
libpq_gettext("cannot determine OID of function lo_lseek64\n"));
free(lobjfuncs);
return -1;
}
if (lobjfuncs->fn_lo_tell64 == 0)
{
printfPQExpBuffer(&conn->errorMessage,
libpq_gettext("cannot determine OID of function lo_tell64\n"));
free(lobjfuncs);
return -1;
}
if (lobjfuncs->fn_lo_truncate64 == 0)
{
printfPQExpBuffer(&conn->errorMessage,
libpq_gettext("cannot determine OID of function lo_truncate64\n"));
free(lobjfuncs);
return -1;
}
}
/*
* Put the structure into the connection control
*/
conn->lobjfuncs = lobjfuncs;
return 0;
}
/*
* lo_hton64
* converts an 64-bit integer from host byte order to network byte order
*/
static pg_int64
lo_hton64(pg_int64 host64)
{
pg_int64 result;
uint32_t h32, l32;
/* High order half first, since we're doing MSB-first */
h32 = (uint32_t) (host64 >> 32);
/* Now the low order half */
l32 = (uint32_t) (host64 & 0xffffffff);
result = htonl(l32);
result <<= 32;
result |= htonl(h32);
return result;
}
/*
* lo_ntoh64
* converts an 64-bit integer from network byte order to host byte order
*/
static pg_int64
lo_ntoh64(pg_int64 net64)
{
pg_int64 result;
uint32_t h32, l32;
l32 = (uint32_t) (net64 >> 32);
h32 = (uint32_t) (net64 & 0xffffffff);
result = ntohl(h32);
result <<= 32;
result |= ntohl(l32);
return result;
}
......@@ -548,6 +548,12 @@ extern Oid lo_import(PGconn *conn, const char *filename);
extern Oid lo_import_with_oid(PGconn *conn, const char *filename, Oid lobjId);
extern int lo_export(PGconn *conn, Oid lobjId, const char *filename);
#ifdef HAVE_PG_INT64
extern pg_int64 lo_lseek64(PGconn *conn, int fd, pg_int64 offset, int whence);
extern pg_int64 lo_tell64(PGconn *conn, int fd);
extern int lo_truncate64(PGconn *conn, int fd, pg_int64 len);
#endif
/* === in fe-misc.c === */
/* Get the version of the libpq library in use */
......
......@@ -271,8 +271,11 @@ typedef struct pgLobjfuncs
Oid fn_lo_create; /* OID of backend function lo_create */
Oid fn_lo_unlink; /* OID of backend function lo_unlink */
Oid fn_lo_lseek; /* OID of backend function lo_lseek */
Oid fn_lo_lseek64; /* OID of backend function lo_lseek64 */
Oid fn_lo_tell; /* OID of backend function lo_tell */
Oid fn_lo_tell64; /* OID of backend function lo_tell64 */
Oid fn_lo_truncate; /* OID of backend function lo_truncate */
Oid fn_lo_truncate64; /* OID of backend function lo_truncate64 */
Oid fn_lo_read; /* OID of backend function LOread */
Oid fn_lo_write; /* OID of backend function LOwrite */
} PGlobjfuncs;
......
......@@ -14,7 +14,7 @@ override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
override LDLIBS := $(libpq_pgport) $(LDLIBS)
PROGS = testlibpq testlibpq2 testlibpq3 testlibpq4 testlo
PROGS = testlibpq testlibpq2 testlibpq3 testlibpq4 testlo testlo64
all: $(PROGS)
......
/*-------------------------------------------------------------------------
*
* testlo64.c
* test using large objects with libpq using 64-bit APIs
*
* Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* src/test/examples/testlo64.c
*
*-------------------------------------------------------------------------
*/
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include "libpq-fe.h"
#include "libpq/libpq-fs.h"
#define BUFSIZE 1024
/*
* importFile -
* import file "in_filename" into database as large object "lobjOid"
*
*/
static Oid
importFile(PGconn *conn, char *filename)
{
Oid lobjId;
int lobj_fd;
char buf[BUFSIZE];
int nbytes,
tmp;
int fd;
/*
* open the file to be read in
*/
fd = open(filename, O_RDONLY, 0666);
if (fd < 0)
{ /* error */
fprintf(stderr, "can't open unix file\"%s\"\n", filename);
}
/*
* create the large object
*/
lobjId = lo_creat(conn, INV_READ | INV_WRITE);
if (lobjId == 0)
fprintf(stderr, "can't create large object");
lobj_fd = lo_open(conn, lobjId, INV_WRITE);
/*
* read in from the Unix file and write to the inversion file
*/
while ((nbytes = read(fd, buf, BUFSIZE)) > 0)
{
tmp = lo_write(conn, lobj_fd, buf, nbytes);
if (tmp < nbytes)
fprintf(stderr, "error while reading \"%s\"", filename);
}
close(fd);
lo_close(conn, lobj_fd);
return lobjId;
}
static void
pickout(PGconn *conn, Oid lobjId, pg_int64 start, int len)
{
int lobj_fd;
char *buf;
int nbytes;
int nread;
pg_int64 pos;
lobj_fd = lo_open(conn, lobjId, INV_READ);
if (lobj_fd < 0)
fprintf(stderr, "can't open large object %u", lobjId);
if (lo_tell64(conn, lobj_fd) < 0)
{
fprintf(stderr, "error lo_tell64: %s\n", PQerrorMessage(conn));
}
if ((pos=lo_lseek64(conn, lobj_fd, start, SEEK_SET)) < 0)
{
fprintf(stderr, "error lo_lseek64: %s\n", PQerrorMessage(conn));
return;
}
fprintf(stderr, "before read: retval of lo_lseek64 : %lld\n", (long long int) pos);
buf = malloc(len + 1);
nread = 0;
while (len - nread > 0)
{
nbytes = lo_read(conn, lobj_fd, buf, len - nread);
buf[nbytes] = '\0';
fprintf(stderr, ">>> %s", buf);
nread += nbytes;
if (nbytes <= 0)
break; /* no more data? */
}
free(buf);
fprintf(stderr, "\n");
pos = lo_tell64(conn, lobj_fd);
fprintf(stderr, "after read: retval of lo_tell64 : %lld\n\n", (long long int) pos);
lo_close(conn, lobj_fd);
}
static void
overwrite(PGconn *conn, Oid lobjId, pg_int64 start, int len)
{
int lobj_fd;
char *buf;
int nbytes;
int nwritten;
int i;
pg_int64 pos;
lobj_fd = lo_open(conn, lobjId, INV_READ | INV_WRITE);
if (lobj_fd < 0)
fprintf(stderr, "can't open large object %u", lobjId);
if ((pos=lo_lseek64(conn, lobj_fd, start, SEEK_SET)) < 0)
{
fprintf(stderr, "error lo_lseek64: %s\n", PQerrorMessage(conn));
return;
}
fprintf(stderr, "before write: retval of lo_lseek64 : %lld\n", (long long int) pos);
buf = malloc(len + 1);
for (i = 0; i < len; i++)
buf[i] = 'X';
buf[i] = '\0';
nwritten = 0;
while (len - nwritten > 0)
{
nbytes = lo_write(conn, lobj_fd, buf + nwritten, len - nwritten);
nwritten += nbytes;
if (nbytes <= 0)
{
fprintf(stderr, "\nWRITE FAILED!\n");
break;
}
}
free(buf);
pos = lo_tell64(conn, lobj_fd);
fprintf(stderr, "after write: retval of lo_tell64 : %lld\n\n", (long long int) pos);
lo_close(conn, lobj_fd);
}
static void
my_truncate(PGconn *conn, Oid lobjId, size_t len)
{
int lobj_fd;
lobj_fd = lo_open(conn, lobjId, INV_READ | INV_WRITE);
if (lobj_fd < 0)
fprintf(stderr, "can't open large object %u", lobjId);
if (lo_truncate64(conn, lobj_fd, len) < 0)
{
fprintf(stderr, "error lo_truncate64: %s\n", PQerrorMessage(conn));
return;
}
fprintf(stderr, "\n");
lo_close(conn, lobj_fd);
}
/*
* exportFile -
* export large object "lobjOid" to file "out_filename"
*
*/
static void
exportFile(PGconn *conn, Oid lobjId, char *filename)
{
int lobj_fd;
char buf[BUFSIZE];
int nbytes,
tmp;
int fd;
/*
* create an inversion "object"
*/
lobj_fd = lo_open(conn, lobjId, INV_READ);
if (lobj_fd < 0)
fprintf(stderr, "can't open large object %u", lobjId);
/*
* open the file to be written to
*/
fd = open(filename, O_CREAT | O_WRONLY | O_TRUNC, 0666);
if (fd < 0)
{ /* error */
fprintf(stderr, "can't open unix file\"%s\"",
filename);
}
/*
* read in from the Unix file and write to the inversion file
*/
while ((nbytes = lo_read(conn, lobj_fd, buf, BUFSIZE)) > 0)
{
tmp = write(fd, buf, nbytes);
if (tmp < nbytes)
{
fprintf(stderr, "error while writing \"%s\"",
filename);
}
}
lo_close(conn, lobj_fd);
close(fd);
return;
}
static void
exit_nicely(PGconn *conn)
{
PQfinish(conn);
exit(1);
}
int
main(int argc, char **argv)
{
char *in_filename,
*out_filename,
*out_filename2;
char *database;
Oid lobjOid;
PGconn *conn;
PGresult *res;
if (argc != 5)
{
fprintf(stderr, "Usage: %s database_name in_filename out_filename out_filename2\n",
argv[0]);
exit(1);
}
database = argv[1];
in_filename = argv[2];
out_filename = argv[3];
out_filename2 = argv[4];
/*
* set up the connection
*/
conn = PQsetdb(NULL, NULL, NULL, NULL, database);
/* check to see that the backend connection was successfully made */
if (PQstatus(conn) != CONNECTION_OK)
{
fprintf(stderr, "Connection to database failed: %s",
PQerrorMessage(conn));
exit_nicely(conn);
}
res = PQexec(conn, "begin");
PQclear(res);
printf("importing file \"%s\" ...\n", in_filename);
/* lobjOid = importFile(conn, in_filename); */
lobjOid = lo_import(conn, in_filename);
if (lobjOid == 0)
fprintf(stderr, "%s\n", PQerrorMessage(conn));
else
{
printf("\tas large object %u.\n", lobjOid);
printf("picking out bytes 4294967000-4294968000 of the large object\n");
pickout(conn, lobjOid, 4294967000ULL, 1000);
printf("overwriting bytes 4294967000-4294968000 of the large object with X's\n");
overwrite(conn, lobjOid, 4294967000ULL, 1000);
printf("exporting large object to file \"%s\" ...\n", out_filename);
/* exportFile(conn, lobjOid, out_filename); */
if (!lo_export(conn, lobjOid, out_filename))
fprintf(stderr, "%s\n", PQerrorMessage(conn));
printf("truncating to 3294968000 byte\n");
my_truncate(conn, lobjOid, 3294968000ULL);
printf("exporting truncated large object to file \"%s\" ...\n", out_filename2);
if (!lo_export(conn, lobjOid, out_filename2))
fprintf(stderr, "%s\n", PQerrorMessage(conn));
}
res = PQexec(conn, "end");
PQclear(res);
PQfinish(conn);
return 0;
}
......@@ -125,6 +125,29 @@ SELECT lo_tell(fd) FROM lotest_stash_values;
SELECT lo_close(fd) FROM lotest_stash_values;
END;
-- Test 64-bit largelbject functions.
BEGIN;
UPDATE lotest_stash_values SET fd = lo_open(loid, CAST(x'20000' | x'40000' AS integer));
SELECT lo_lseek64(fd, 4294967296, 0) FROM lotest_stash_values;
SELECT lowrite(fd, 'offset:4GB') FROM lotest_stash_values;
SELECT lo_tell64(fd) FROM lotest_stash_values;
SELECT lo_lseek64(fd, -10, 1) FROM lotest_stash_values;
SELECT lo_tell64(fd) FROM lotest_stash_values;
SELECT loread(fd, 10) FROM lotest_stash_values;
SELECT lo_truncate64(fd, 5000000000) FROM lotest_stash_values;
SELECT lo_lseek64(fd, 0, 2) FROM lotest_stash_values;
SELECT lo_tell64(fd) FROM lotest_stash_values;
SELECT lo_truncate64(fd, 3000000000) FROM lotest_stash_values;
SELECT lo_lseek64(fd, 0, 2) FROM lotest_stash_values;
SELECT lo_tell64(fd) FROM lotest_stash_values;
SELECT lo_close(fd) FROM lotest_stash_values;
END;
-- lo_unlink(lobjId oid) returns integer
-- return value appears to always be 1
SELECT lo_unlink(loid) from lotest_stash_values;
......
......@@ -209,6 +209,88 @@ SELECT lo_close(fd) FROM lotest_stash_values;
0
(1 row)
END;
-- Test 64-bit largelbject functions.
BEGIN;
UPDATE lotest_stash_values SET fd = lo_open(loid, CAST(x'20000' | x'40000' AS integer));
SELECT lo_lseek64(fd, 4294967296, 0) FROM lotest_stash_values;
lo_lseek64
------------
4294967296
(1 row)
SELECT lowrite(fd, 'offset:4GB') FROM lotest_stash_values;
lowrite
---------
10
(1 row)
SELECT lo_tell64(fd) FROM lotest_stash_values;
lo_tell64
------------
4294967306
(1 row)
SELECT lo_lseek64(fd, -10, 1) FROM lotest_stash_values;
lo_lseek64
------------
4294967296
(1 row)
SELECT lo_tell64(fd) FROM lotest_stash_values;
lo_tell64
------------
4294967296
(1 row)
SELECT loread(fd, 10) FROM lotest_stash_values;
loread
------------
offset:4GB
(1 row)
SELECT lo_truncate64(fd, 5000000000) FROM lotest_stash_values;
lo_truncate64
---------------
0
(1 row)
SELECT lo_lseek64(fd, 0, 2) FROM lotest_stash_values;
lo_lseek64
------------
5000000000
(1 row)
SELECT lo_tell64(fd) FROM lotest_stash_values;
lo_tell64
------------
5000000000
(1 row)
SELECT lo_truncate64(fd, 3000000000) FROM lotest_stash_values;
lo_truncate64
---------------
0
(1 row)
SELECT lo_lseek64(fd, 0, 2) FROM lotest_stash_values;
lo_lseek64
------------
3000000000
(1 row)
SELECT lo_tell64(fd) FROM lotest_stash_values;
lo_tell64
------------
3000000000
(1 row)
SELECT lo_close(fd) FROM lotest_stash_values;
lo_close
----------
0
(1 row)
END;
-- lo_unlink(lobjId oid) returns integer
-- return value appears to always be 1
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册