diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c index 89a80c23970284bc3b507ca3ce14ed78c8a8ca3a..06c54a456dcd6c94e4105acd4babc4a418c3e121 100644 --- a/src/backend/access/nbtree/nbtinsert.c +++ b/src/backend/access/nbtree/nbtinsert.c @@ -7,7 +7,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.10 1997/01/25 21:08:09 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.11 1997/03/24 08:48:09 vadim Exp $ * *------------------------------------------------------------------------- */ @@ -19,6 +19,7 @@ #include #include #include +#include #ifndef HAVE_MEMMOVE # include @@ -33,6 +34,7 @@ static void _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf); static OffsetNumber _bt_pgaddtup(Relation rel, Buffer buf, int keysz, ScanKey itup_scankey, Size itemsize, BTItem btitem, BTItem afteritem); static bool _bt_goesonpg(Relation rel, Buffer buf, Size keysz, ScanKey scankey, BTItem afteritem); static void _bt_updateitem(Relation rel, Size keysz, Buffer buf, Oid bti_oid, BTItem newItem); +static bool _bt_isequal (TupleDesc itupdesc, Page page, OffsetNumber offnum, int keysz, ScanKey scankey); /* * _bt_doinsert() -- Handle insertion of a single btitem in the tree. @@ -104,8 +106,16 @@ _bt_doinsert(Relation rel, BTItem btitem, bool index_is_unique, Relation heapRel itupdesc = RelationGetTupleDescriptor(rel); nbuf = InvalidBuffer; opaque = (BTPageOpaque) PageGetSpecialPointer(page); + /* + * _bt_compare returns 0 for (1,NULL) and (1,NULL) - + * this's how we handling NULLs - and so we must not use + * _bt_compare in real comparison, but only for + * ordering/finding items on pages. - vadim 03/24/97 + while ( !_bt_compare (rel, itupdesc, page, natts, itup_scankey, offset) ) + */ + while ( _bt_isequal (itupdesc, page, offset, natts, itup_scankey) ) { /* they're equal */ btitem = (BTItem) PageGetItem(page, PageGetItemId(page, offset)); itup = &(btitem->bti_itup); @@ -123,8 +133,8 @@ _bt_doinsert(Relation rel, BTItem btitem, bool index_is_unique, Relation heapRel { /* move right ? */ if ( P_RIGHTMOST (opaque) ) break; - if ( _bt_compare (rel, itupdesc, page, - natts, itup_scankey, P_HIKEY) ) + if ( !_bt_isequal (itupdesc, page, P_HIKEY, + natts, itup_scankey) ) break; /* * min key of the right page is the same, @@ -939,18 +949,70 @@ _bt_itemcmp(Relation rel, IndexTuple indexTuple1, indexTuple2; Datum attrDatum1, attrDatum2; int i; - bool isNull; + bool isFirstNull, isSecondNull; bool compare; + bool useEqual = false; + + if ( strat == BTLessEqualStrategyNumber ) + { + useEqual = true; + strat = BTLessStrategyNumber; + } + else if ( strat == BTGreaterEqualStrategyNumber ) + { + useEqual = true; + strat = BTGreaterStrategyNumber; + } tupDes = RelationGetTupleDescriptor(rel); indexTuple1 = &(item1->bti_itup); indexTuple2 = &(item2->bti_itup); for (i = 1; i <= keysz; i++) { - attrDatum1 = index_getattr(indexTuple1, i, tupDes, &isNull); - attrDatum2 = index_getattr(indexTuple2, i, tupDes, &isNull); - compare = _bt_invokestrat(rel, i, strat, attrDatum1, attrDatum2); - if (!compare) { + attrDatum1 = index_getattr(indexTuple1, i, tupDes, &isFirstNull); + attrDatum2 = index_getattr(indexTuple2, i, tupDes, &isSecondNull); + + /* see comments about NULLs handling in btbuild */ + if ( isFirstNull ) /* attr in item1 is NULL */ + { + if ( isSecondNull ) /* attr in item2 is NULL too */ + compare = ( strat == BTEqualStrategyNumber ) ? true : false; + else + compare = ( strat == BTGreaterStrategyNumber ) ? true : false; + } + else if ( isSecondNull ) /* attr in item1 is NOT_NULL and */ + { /* and attr in item2 is NULL */ + compare = ( strat == BTLessStrategyNumber ) ? true : false; + } + else + { + compare = _bt_invokestrat(rel, i, strat, attrDatum1, attrDatum2); + } + + if ( compare ) /* true for one of ">, <, =" */ + { + if ( strat != BTEqualStrategyNumber ) + return (true); + } + else /* false for one of ">, <, =" */ + { + if ( strat == BTEqualStrategyNumber ) + return (false); + /* + * if original strat was "<=, >=" OR + * "<, >" but some attribute(s) left + * - need to test for Equality + */ + if ( useEqual || i < keysz ) + { + if ( isFirstNull || isSecondNull ) + compare = ( isFirstNull && isSecondNull ) ? true : false; + else + compare = _bt_invokestrat(rel, i, BTEqualStrategyNumber, + attrDatum1, attrDatum2); + if ( compare ) /* item1' and item2' attributes are equal */ + continue; /* - try to compare next attributes */ + } return (false); } } @@ -1015,3 +1077,45 @@ _bt_updateitem(Relation rel, ItemPointerCopy(&itemPtrData, &(oldIndexTuple->t_tid)); } + +/* + * _bt_isequal - used in _bt_doinsert in check for duplicates. + * + * Rule is simple: NOT_NULL not equal NULL, NULL not_equal NULL too. + */ +static bool +_bt_isequal (TupleDesc itupdesc, Page page, OffsetNumber offnum, + int keysz, ScanKey scankey) +{ + Datum datum; + BTItem btitem; + IndexTuple itup; + ScanKey entry; + AttrNumber attno; + long result; + int i; + bool null; + + btitem = (BTItem) PageGetItem(page, PageGetItemId(page, offnum)); + itup = &(btitem->bti_itup); + + for (i = 1; i <= keysz; i++) + { + entry = &scankey[i - 1]; + attno = entry->sk_attno; + Assert (attno == i); + datum = index_getattr(itup, attno, itupdesc, &null); + + /* NULLs are not equal */ + if ( entry->sk_flags & SK_ISNULL || null ) + return (false); + + result = (long) FMGR_PTR2(entry->sk_func, entry->sk_procedure, + entry->sk_argument, datum); + if (result != 0) + return (false); + } + + /* by here, the keys are equal */ + return (true); +} diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c index 0fe6787c010e6e02a07bde294c9d9d47917c6f6b..e75814dd8e830a45367fb19532af9996c9694ca7 100644 --- a/src/backend/access/nbtree/nbtree.c +++ b/src/backend/access/nbtree/nbtree.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.16 1997/03/18 18:38:35 scrappy Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.17 1997/03/24 08:48:11 vadim Exp $ * * NOTES * This file contains only the public interface routines. @@ -219,11 +219,21 @@ btbuild(Relation heap, * artifact of the strategy map architecture chosen in 1986, not * of the way nulls are handled here. */ - + /* + * New comments: NULLs handling. + * While we can't do NULL comparison, we can follow simple + * rule for ordering items on btree pages - NULLs greater + * NOT_NULLs and NULL = NULL is TRUE. Sure, it's just rule + * for placing/finding items and no more - keytest'll return + * FALSE for a = 5 for items having 'a' isNULL. + * Look at _bt_skeycmp, _bt_compare and _bt_itemcmp for + * how it works. - vadim 03/23/97 + if (itup->t_info & INDEX_NULL_MASK) { pfree(itup); continue; } + */ itup->t_tid = htup->t_ctid; btitem = _bt_formitem(itup); @@ -328,8 +338,12 @@ btinsert(Relation rel, Datum *datum, char *nulls, ItemPointer ht_ctid, Relation itup = index_formtuple(RelationGetTupleDescriptor(rel), datum, nulls); itup->t_tid = *ht_ctid; + /* + * See comments in btbuild. + if (itup->t_info & INDEX_NULL_MASK) return ((InsertIndexResult) NULL); + */ btitem = _bt_formitem(itup); @@ -423,7 +437,7 @@ btrescan(IndexScanDesc scan, bool fromEnd, ScanKey scankey) /* reset the scan key */ so->numberOfKeys = scan->numberOfKeys; - so->numberOfFirstKeys = 0; + so->numberOfFirstKeys = 0; /* may be changed by _bt_orderkeys */ so->qual_ok = 1; /* may be changed by _bt_orderkeys */ if (scan->numberOfKeys > 0) { memmove(scan->keyData, @@ -433,10 +447,7 @@ btrescan(IndexScanDesc scan, bool fromEnd, ScanKey scankey) scankey, so->numberOfKeys * sizeof(ScanKeyData)); /* order the keys in the qualification */ - if (so->numberOfKeys > 1) - _bt_orderkeys(scan->relation, so); - else - so->numberOfFirstKeys = 1; + _bt_orderkeys(scan->relation, so); } /* finally, be sure that the scan exploits the tree order */ @@ -499,9 +510,10 @@ btendscan(IndexScanDesc scan) ItemPointerSetInvalid(iptr); } - pfree (scan->opaque); if ( so->keyData != (ScanKey) NULL ) pfree (so->keyData); + pfree (so); + _bt_dropscan(scan); } diff --git a/src/backend/access/nbtree/nbtsearch.c b/src/backend/access/nbtree/nbtsearch.c index 2e802ee852728f0d8b7a6bf92708321e3ae13066..99fb38f18cecbf8340dd85ce202d0ceae33df369 100644 --- a/src/backend/access/nbtree/nbtsearch.c +++ b/src/backend/access/nbtree/nbtsearch.c @@ -7,7 +7,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtsearch.c,v 1.15 1997/03/18 18:38:41 scrappy Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtsearch.c,v 1.16 1997/03/24 08:48:12 vadim Exp $ * *------------------------------------------------------------------------- */ @@ -19,6 +19,7 @@ #include #include #include +#include #ifndef HAVE_MEMMOVE # include @@ -238,7 +239,20 @@ _bt_skeycmp(Relation rel, Datum keyDatum; bool compare; bool isNull; + bool useEqual = false; + bool keyNull; + if ( strat == BTLessEqualStrategyNumber ) + { + useEqual = true; + strat = BTLessStrategyNumber; + } + else if ( strat == BTGreaterEqualStrategyNumber ) + { + useEqual = true; + strat = BTGreaterStrategyNumber; + } + item = (BTItem) PageGetItem(page, itemid); indexTuple = &(item->bti_itup); @@ -248,27 +262,60 @@ _bt_skeycmp(Relation rel, for (i=1; i <= keysz; i++) { entry = &scankey[i-1]; + Assert ( entry->sk_attno == i ); attrDatum = index_getattr(indexTuple, entry->sk_attno, tupDes, &isNull); keyDatum = entry->sk_argument; - - /* - * This may happen in a nested loop if an attribute used - * as scan key is null. DZ 29-10-1996 - */ - if ((entry->sk_flags & SK_ISNULL) || (isNull)) { - if ((entry->sk_flags & SK_ISNULL) && (isNull)) { - return (true); - } else { - return (false); - } + + /* see comments about NULLs handling in btbuild */ + if ( entry->sk_flags & SK_ISNULL ) /* key is NULL */ + { + Assert ( entry->sk_procedure == NullValueRegProcedure ); + keyNull = true; + if ( isNull ) + compare = ( strat == BTEqualStrategyNumber ) ? true : false; + else + compare = ( strat == BTGreaterStrategyNumber ) ? true : false; + } + else if ( isNull ) /* key is NOT_NULL and item is NULL */ + { + keyNull = false; + compare = ( strat == BTLessStrategyNumber ) ? true : false; + } + else + { + keyNull = false; + compare = _bt_invokestrat(rel, i, strat, keyDatum, attrDatum); } - compare = _bt_invokestrat(rel, i, strat, keyDatum, attrDatum); - if (!compare) + if ( compare ) /* true for one of ">, <, =" */ + { + if ( strat != BTEqualStrategyNumber ) + return (true); + } + else /* false for one of ">, <, =" */ + { + if ( strat == BTEqualStrategyNumber ) + return (false); + /* + * if original strat was "<=, >=" OR + * "<, >" but some attribute(s) left + * - need to test for Equality + */ + if ( useEqual || i < keysz ) + { + if ( keyNull || isNull ) + compare = ( keyNull && isNull ) ? true : false; + else + compare = _bt_invokestrat(rel, i, BTEqualStrategyNumber, + keyDatum, attrDatum); + if ( compare ) /* key' and item' attributes are equal */ + continue; /* - try to compare next attributes */ + } return (false); + } } return (true); @@ -520,20 +567,24 @@ _bt_compare(Relation rel, attno = entry->sk_attno; datum = index_getattr(itup, attno, itupdesc, &null); - /* - * This may happen in a nested loop if an attribute used - * as scan key is null. DZ 29-10-1996 - */ - if ((entry->sk_flags & SK_ISNULL) || (null)) { - if ((entry->sk_flags & SK_ISNULL) && (null)) { - return (0); - } else { - return (null ? +1 : -1); - } + /* see comments about NULLs handling in btbuild */ + if ( entry->sk_flags & SK_ISNULL ) /* key is NULL */ + { + Assert ( entry->sk_procedure == NullValueRegProcedure ); + if ( null ) + tmpres = (long) 0; /* NULL "=" NULL */ + else + tmpres = (long) 1; /* NULL ">" NOT_NULL */ + } + else if ( null ) /* key is NOT_NULL and item is NULL */ + { + tmpres = (long) -1; /* NOT_NULL "<" NULL */ + } + else + { + tmpres = (long) FMGR_PTR2(entry->sk_func, entry->sk_procedure, + entry->sk_argument, datum); } - - tmpres = (long) FMGR_PTR2(entry->sk_func, entry->sk_procedure, - entry->sk_argument, datum); result = tmpres; /* if the keys are unequal, return the difference */ @@ -566,6 +617,7 @@ _bt_next(IndexScanDesc scan, ScanDirection dir) BTItem btitem; IndexTuple itup; BTScanOpaque so; + Size keysok; rel = scan->relation; so = (BTScanOpaque) scan->opaque; @@ -596,8 +648,9 @@ _bt_next(IndexScanDesc scan, ScanDirection dir) btitem = (BTItem) PageGetItem(page, PageGetItemId(page, offnum)); itup = &btitem->bti_itup; - if (_bt_checkqual(scan, itup)) + if ( _bt_checkkeys (scan, itup, &keysok) ) { + Assert (keysok == so->numberOfKeys); res = FormRetrieveIndexResult(current, &(itup->t_tid)); /* remember which buffer we have pinned and locked */ @@ -605,7 +658,7 @@ _bt_next(IndexScanDesc scan, ScanDirection dir) return (res); } - } while ( _bt_checkforkeys (scan, itup, so->numberOfFirstKeys) ); + } while ( keysok >= so->numberOfFirstKeys ); ItemPointerSetInvalid(current); so->btso_curbuf = InvalidBuffer; @@ -644,6 +697,7 @@ _bt_first(IndexScanDesc scan, ScanDirection dir) int result; BTScanOpaque so; ScanKeyData skdata; + Size keysok; so = (BTScanOpaque) scan->opaque; if ( so->qual_ok == 0 ) /* may be set by _bt_orderkeys */ @@ -663,6 +717,12 @@ _bt_first(IndexScanDesc scan, ScanDirection dir) * ordered to take advantage of index ordering) to position ourselves * at the right place in the scan. */ + /* _bt_orderkeys disallows it, but it's place to add some code latter */ + if ( so->keyData[0].sk_flags & SK_ISNULL ) + { + elog (WARN, "_bt_first: btree doesn't support is(not)null, yet"); + return ((RetrieveIndexResult) NULL); + } proc = index_getprocid(rel, 1, BTORDER_PROC); ScanKeyEntryInitialize(&skdata, so->keyData[0].sk_flags, 1, proc, so->keyData[0].sk_argument); @@ -706,6 +766,9 @@ _bt_first(IndexScanDesc scan, ScanDirection dir) */ result = _bt_compare(rel, itupdesc, page, 1, &skdata, offnum); + + /* it's yet other place to add some code latter for is(not)null */ + strat = _bt_getstrat(rel, 1, so->keyData[0].sk_procedure); switch (strat) { @@ -798,14 +861,14 @@ _bt_first(IndexScanDesc scan, ScanDirection dir) btitem = (BTItem) PageGetItem(page, PageGetItemId(page, offnum)); itup = &btitem->bti_itup; - if ( _bt_checkqual(scan, itup) ) + if ( _bt_checkkeys (scan, itup, &keysok) ) { res = FormRetrieveIndexResult(current, &(itup->t_tid)); /* remember which buffer we have pinned */ so->btso_curbuf = buf; } - else if ( _bt_checkforkeys (scan, itup, so->numberOfFirstKeys) ) + else if ( keysok >= so->numberOfFirstKeys ) { so->btso_curbuf = buf; return (_bt_next (scan, dir)); @@ -1081,6 +1144,7 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir) IndexTuple itup; BTScanOpaque so; RetrieveIndexResult res; + Size keysok; rel = scan->relation; current = &(scan->currentItemData); @@ -1223,13 +1287,14 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir) itup = &(btitem->bti_itup); /* see if we picked a winner */ - if (_bt_checkqual(scan, itup)) { + if ( _bt_checkkeys (scan, itup, &keysok) ) + { res = FormRetrieveIndexResult(current, &(itup->t_tid)); /* remember which buffer we have pinned */ so->btso_curbuf = buf; } - else if ( _bt_checkforkeys (scan, itup, so->numberOfFirstKeys) ) + else if ( keysok >= so->numberOfFirstKeys ) { so->btso_curbuf = buf; return (_bt_next (scan, dir)); diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c index 7c3d5ba56394dd4e4e888f546ef27cebe9c3aede..7ec926f9e249ea2069727588dce6b2d0a51fbb91 100644 --- a/src/backend/access/nbtree/nbtsort.c +++ b/src/backend/access/nbtree/nbtsort.c @@ -5,7 +5,7 @@ * * * IDENTIFICATION - * $Id: nbtsort.c,v 1.12 1997/02/25 03:38:23 scrappy Exp $ + * $Id: nbtsort.c,v 1.13 1997/03/24 08:48:15 vadim Exp $ * * NOTES * @@ -137,11 +137,13 @@ typedef struct { * *------------------------------------------------------------------------- */ typedef struct { - Datum btsk_datum; + Datum *btsk_datum; + char *btsk_nulls; BTItem btsk_item; } BTSortKey; static Relation _bt_sortrel; +static int _bt_nattr; static BTSpool * _bt_inspool; static void @@ -149,26 +151,51 @@ _bt_isortcmpinit(Relation index, BTSpool *spool) { _bt_sortrel = index; _bt_inspool = spool; + _bt_nattr = index->rd_att->natts; } static int _bt_isortcmp(BTSortKey *k1, BTSortKey *k2) { - if (k1->btsk_item == (BTItem) NULL) { - if (k2->btsk_item == (BTItem) NULL) { + Datum *k1_datum = k1->btsk_datum; + Datum *k2_datum = k2->btsk_datum; + char *k1_nulls = k1->btsk_nulls; + char *k2_nulls = k2->btsk_nulls; + bool equal_isnull = false; + int i; + + if (k1->btsk_item == (BTItem) NULL) + { + if (k2->btsk_item == (BTItem) NULL) return(0); /* 1 = 2 */ - } return(1); /* 1 > 2 */ - } else if (k2->btsk_item == (BTItem) NULL) { - return(-1); /* 1 < 2 */ - } else if (_bt_invokestrat(_bt_sortrel, 1, BTGreaterStrategyNumber, - k1->btsk_datum, k2->btsk_datum)) { - return(1); /* 1 > 2 */ - } else if (_bt_invokestrat(_bt_sortrel, 1, BTGreaterStrategyNumber, - k2->btsk_datum, k1->btsk_datum)) { + } + else if (k2->btsk_item == (BTItem) NULL) return(-1); /* 1 < 2 */ + + for (i = 0; i < _bt_nattr; i++) + { + if ( k1_nulls[i] != ' ' ) /* k1 attr is NULL */ + { + if ( k2_nulls[i] != ' ' ) /* the same for k2 */ + { + equal_isnull = true; + continue; + } + return (1); /* NULL ">" NOT_NULL */ + } + else if ( k2_nulls[i] != ' ' ) /* k2 attr is NULL */ + return (-1); /* NOT_NULL "<" NULL */ + + if (_bt_invokestrat(_bt_sortrel, i+1, BTGreaterStrategyNumber, + k1_datum[i], k2_datum[i])) + return(1); /* 1 > 2 */ + else if (_bt_invokestrat(_bt_sortrel, i+1, BTGreaterStrategyNumber, + k2_datum[i], k1_datum[i])) + return(-1); /* 1 < 2 */ } - if ( _bt_inspool->isunique ) + + if ( _bt_inspool->isunique && !equal_isnull ) { _bt_spooldestroy ((void*)_bt_inspool); elog (WARN, "Cannot create unique index. Table contains non-unique values"); @@ -180,15 +207,29 @@ static void _bt_setsortkey(Relation index, BTItem bti, BTSortKey *sk) { sk->btsk_item = (BTItem) NULL; - sk->btsk_datum = (Datum) NULL; - if (bti != (BTItem) NULL) { + sk->btsk_datum = (Datum*) NULL; + sk->btsk_nulls = (char*) NULL; + + if (bti != (BTItem) NULL) + { + IndexTuple it = &(bti->bti_itup); + TupleDesc itdesc = index->rd_att; + Datum *dp = (Datum*) palloc (_bt_nattr * sizeof (Datum)); + char *np = (char*) palloc (_bt_nattr * sizeof (char)); bool isnull; - Datum d = index_getattr(&(bti->bti_itup), 1, index->rd_att, &isnull); - - if (!isnull) { - sk->btsk_item = bti; - sk->btsk_datum = d; + int i; + + for (i = 0; i < _bt_nattr; i++) + { + dp[i] = index_getattr(it, i+1, itdesc, &isnull); + if ( isnull ) + np[i] = 'n'; + else + np[i] = ' '; } + sk->btsk_item = bti; + sk->btsk_datum = dp; + sk->btsk_nulls = np; } } @@ -622,27 +663,25 @@ _bt_spool(Relation index, BTItem btitem, void *spool) BTItem bti; char *pos; int btisz; + int it_ntup = itape->bttb_ntup; int i; /* * build an array of pointers to the BTItemDatas on the input * block. */ - if (itape->bttb_ntup > 0) { + if (it_ntup > 0) { parray = - (BTSortKey *) palloc(itape->bttb_ntup * sizeof(BTSortKey)); - if (parray == (BTSortKey *) NULL) { - elog(WARN, "_bt_spool: out of memory"); - } + (BTSortKey *) palloc(it_ntup * sizeof(BTSortKey)); pos = itape->bttb_data; - for (i = 0; i < itape->bttb_ntup; ++i) { + for (i = 0; i < it_ntup; ++i) { _bt_setsortkey(index, _bt_tapenext(itape, &pos), &(parray[i])); } /* * qsort the pointer array. */ - qsort((void *) parray, itape->bttb_ntup, sizeof(BTSortKey), + qsort((void *) parray, it_ntup, sizeof(BTSortKey), (int (*)(const void *,const void *))_bt_isortcmp); } @@ -656,7 +695,7 @@ _bt_spool(Relation index, BTItem btitem, void *spool) * block..) */ otape = btspool->bts_otape[btspool->bts_tape]; - for (i = 0; i < itape->bttb_ntup; ++i) { + for (i = 0; i < it_ntup; ++i) { bti = parray[i].btsk_item; btisz = BTITEMSZ(bti); btisz = DOUBLEALIGN(btisz); @@ -694,7 +733,15 @@ _bt_spool(Relation index, BTItem btitem, void *spool) /* * destroy the pointer array. */ - if (parray != (BTSortKey *) NULL) { + if (parray != (BTSortKey *) NULL) + { + for (i = 0; i < it_ntup; i++) + { + if ( parray[i].btsk_datum != (Datum*) NULL ) + pfree ((void*)(parray[i].btsk_datum)); + if ( parray[i].btsk_nulls != (char*) NULL ) + pfree ((void*)(parray[i].btsk_nulls)); + } pfree((void *) parray); } } @@ -976,7 +1023,7 @@ _bt_buildadd(Relation index, void *pstate, BTItem bti, int flags) #endif if (last_bti == (BTItem) NULL) { first_off = P_FIRSTKEY; - } else if (!_bt_itemcmp(index, 1, bti, last_bti, BTEqualStrategyNumber)) { + } else if (!_bt_itemcmp(index, _bt_nattr, bti, last_bti, BTEqualStrategyNumber)) { first_off = off; } last_off = off; @@ -1044,6 +1091,7 @@ _bt_merge(Relation index, BTSpool *btspool) BTPageState *state; BTPriQueue q; BTPriQueueElem e; + BTSortKey btsk; BTItem bti; BTTapeBlock *itape; BTTapeBlock *otape; @@ -1136,7 +1184,8 @@ _bt_merge(Relation index, BTSpool *btspool) * if it hits either End-Of-Run or EOF. */ t = e.btpqe_tape; - bti = e.btpqe_item.btsk_item; + btsk = e.btpqe_item; + bti = btsk.btsk_item; if (bti != (BTItem) NULL) { btisz = BTITEMSZ(bti); btisz = DOUBLEALIGN(btisz); @@ -1177,6 +1226,12 @@ _bt_merge(Relation index, BTSpool *btspool) } #endif /* FASTBUILD_DEBUG && FASTBUILD_MERGE */ } + + if ( btsk.btsk_datum != (Datum*) NULL ) + pfree ((void*)(btsk.btsk_datum)); + if ( btsk.btsk_nulls != (char*) NULL ) + pfree ((void*)(btsk.btsk_nulls)); + } itape = btspool->bts_itape[t]; if (!tapedone[t]) { diff --git a/src/backend/access/nbtree/nbtutils.c b/src/backend/access/nbtree/nbtutils.c index 6d0a40ef132fbdbdfac92de0676cb6008d4ca501..fa2ff890fe9c5a19e3c8748585c708c10a0b5183 100644 --- a/src/backend/access/nbtree/nbtutils.c +++ b/src/backend/access/nbtree/nbtutils.c @@ -7,7 +7,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtutils.c,v 1.8 1997/03/18 18:38:46 scrappy Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtutils.c,v 1.9 1997/03/24 08:48:16 vadim Exp $ * *------------------------------------------------------------------------- */ @@ -20,6 +20,11 @@ #include #include #include +#include +#include + +extern int NIndexTupleProcessed; + #ifndef HAVE_MEMMOVE # include @@ -37,6 +42,7 @@ _bt_mkscankey(Relation rel, IndexTuple itup) Datum arg; RegProcedure proc; bool null; + bits16 flag; natts = rel->rd_rel->relnatts; itupdesc = RelationGetTupleDescriptor(rel); @@ -45,9 +51,18 @@ _bt_mkscankey(Relation rel, IndexTuple itup) for (i = 0; i < natts; i++) { arg = index_getattr(itup, i + 1, itupdesc, &null); - proc = index_getprocid(rel, i + 1, BTORDER_PROC); + if ( null ) + { + proc = NullValueRegProcedure; + flag = SK_ISNULL; + } + else + { + proc = index_getprocid(rel, i + 1, BTORDER_PROC); + flag = 0x0; + } ScanKeyEntryInitialize(&skey[i], - 0x0, (AttrNumber) (i + 1), proc, arg); + flag, (AttrNumber) (i + 1), proc, arg); } return (skey); @@ -90,22 +105,35 @@ _bt_orderkeys(Relation relation, BTScanOpaque so) int i, j; int init[BTMaxStrategyNumber+1]; ScanKey key; - uint16 numberOfKeys, new_numberOfKeys = 0; + uint16 numberOfKeys = so->numberOfKeys; + uint16 new_numberOfKeys = 0; AttrNumber attno = 1; - numberOfKeys = so->numberOfKeys; + if ( numberOfKeys < 1 ) + return; + key = so->keyData; - if ( numberOfKeys <= 1 ) + cur = &key[0]; + if ( cur->sk_attno != 1 ) + elog (WARN, "_bt_orderkeys: key(s) for attribute 1 missed"); + + if ( numberOfKeys == 1 ) + { + /* + * We don't use indices for 'A is null' and 'A is not null' + * currently and 'A < = > <> NULL' is non-sense' - so + * qual is not Ok. - vadim 03/21/97 + */ + if ( cur->sk_flags & SK_ISNULL ) + so->qual_ok = 0; + so->numberOfFirstKeys = 1; return; + } /* get space for the modified array of keys */ nbytes = BTMaxStrategyNumber * sizeof(ScanKeyData); xform = (ScanKey) palloc(nbytes); - - cur = &key[0]; - if ( cur->sk_attno != 1 ) - elog (WARN, "_bt_orderkeys: key(s) for attribute 1 missed"); memset(xform, 0, nbytes); map = IndexStrategyGetStrategyMap(RelationGetIndexStrategy(relation), @@ -119,6 +147,10 @@ _bt_orderkeys(Relation relation, BTScanOpaque so) { if ( i < numberOfKeys ) cur = &key[i]; + + if ( cur->sk_flags & SK_ISNULL ) /* see comments above */ + so->qual_ok = 0; + if ( i == numberOfKeys || cur->sk_attno != attno ) { if ( cur->sk_attno != attno + 1 && i < numberOfKeys ) @@ -243,6 +275,32 @@ _bt_orderkeys(Relation relation, BTScanOpaque so) pfree(xform); } +BTItem +_bt_formitem(IndexTuple itup) +{ + int nbytes_btitem; + BTItem btitem; + Size tuplen; + extern Oid newoid(); + + /* see comments in btbuild + + if (itup->t_info & INDEX_NULL_MASK) + elog(WARN, "btree indices cannot include null keys"); + */ + + /* make a copy of the index tuple with room for the sequence number */ + tuplen = IndexTupleSize(itup); + nbytes_btitem = tuplen + + (sizeof(BTItemData) - sizeof(IndexTupleData)); + + btitem = (BTItem) palloc(nbytes_btitem); + memmove((char *) &(btitem->bti_itup), (char *) itup, tuplen); + + btitem->bti_oid = newoid(); + return (btitem); +} + bool _bt_checkqual(IndexScanDesc scan, IndexTuple itup) { @@ -269,26 +327,57 @@ _bt_checkforkeys(IndexScanDesc scan, IndexTuple itup, Size keysz) return (true); } -BTItem -_bt_formitem(IndexTuple itup) +bool +_bt_checkkeys (IndexScanDesc scan, IndexTuple tuple, Size *keysok) { - int nbytes_btitem; - BTItem btitem; - Size tuplen; - extern Oid newoid(); + BTScanOpaque so = (BTScanOpaque) scan->opaque; + Size keysz = so->numberOfKeys; + TupleDesc tupdesc; + ScanKey key; + Datum datum; + bool isNull; + int test; - /* disallow nulls in btree keys */ - if (itup->t_info & INDEX_NULL_MASK) - elog(WARN, "btree indices cannot include null keys"); + *keysok = 0; + if ( keysz == 0 ) + return (true); - /* make a copy of the index tuple with room for the sequence number */ - tuplen = IndexTupleSize(itup); - nbytes_btitem = tuplen + - (sizeof(BTItemData) - sizeof(IndexTupleData)); + key = so->keyData; + tupdesc = RelationGetTupleDescriptor(scan->relation); - btitem = (BTItem) palloc(nbytes_btitem); - memmove((char *) &(btitem->bti_itup), (char *) itup, tuplen); + IncrIndexProcessed(); - btitem->bti_oid = newoid(); - return (btitem); + while (keysz > 0) + { + datum = index_getattr(tuple, + key[0].sk_attno, + tupdesc, + &isNull); + + /* btree doesn't support 'A is null' clauses, yet */ + if ( isNull || key[0].sk_flags & SK_ISNULL ) + { + return (false); + } + + if (key[0].sk_flags & SK_COMMUTE) { + test = (int) (*(key[0].sk_func)) + (DatumGetPointer(key[0].sk_argument), + datum); + } else { + test = (int) (*(key[0].sk_func)) + (datum, + DatumGetPointer(key[0].sk_argument)); + } + + if (!test == !(key[0].sk_flags & SK_NEGATE)) { + return (false); + } + + keysz -= 1; + key++; + (*keysok)++; + } + + return (true); }