Skip to content

Commit

Permalink
Complete tests and fixes for no_gaps with numeric
Browse files Browse the repository at this point in the history
  • Loading branch information
jhf committed Nov 9, 2023
1 parent 7f9abdc commit ab536dc
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 67 deletions.
1 change: 1 addition & 0 deletions expected/33_no_gaps_numeric_test.out
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ WHERE job_id = 1;
f
(1 row)

SET client_min_messages TO NOTICE;
SELECT sql_saga.drop_unique_key('numeric_shifts', 'numeric_shifts_job_id_worker_id_valid');
drop_unique_key
-----------------
Expand Down
118 changes: 51 additions & 67 deletions no_gaps.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,8 @@


// Declarations/Prototypes
char *DatumGetString(TypeCacheEntry *typcache, RangeBound bound);
Datum DatumGet(TypeCacheEntry *typcache, RangeBound bound);
Datum DatumNegativeInfinity(TypeCacheEntry *typcache);
char *DatumGetString(Oid elem_oid, RangeBound bound);
Datum DatumNegativeInfinity(Oid elem_oid);

Datum no_gaps_transfn(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(no_gaps_transfn);
Expand Down Expand Up @@ -72,7 +71,8 @@ Datum no_gaps_transfn(PG_FUNCTION_ARGS)
RangeType *current_range,
*target_range;
RangeBound current_start, current_end;
TypeCacheEntry *typcache;
TypeCacheEntry *typcache, *elem_typcache;
Oid elem_oid;
bool current_empty;
bool first_time;

Expand Down Expand Up @@ -105,18 +105,25 @@ Datum no_gaps_transfn(PG_FUNCTION_ARGS)
memcpy(state->target, target_range, VARSIZE(target_range));
typcache = range_get_typcache(fcinfo, RangeTypeGetOid(state->target));
range_deserialize(typcache, state->target, &state->target_start, &state->target_end, &state->target_empty);

ereport(DEBUG1, (errmsg("target is [%ld, %ld)", DatumGet(typcache, state->target_start), DatumGet(typcache, state->target_end))));

// Initialize covered_to to negative infinity bound
oldContext = MemoryContextSwitchTo(aggContext);
state->covered_to.val = DatumNegativeInfinity(typcache);
MemoryContextSwitchTo(oldContext);
elem_oid = typcache->rngelemtype->type_id;
elem_typcache = lookup_type_cache(elem_oid, 0);

//ereport(DEBUG1, (errmsg("target is [%s, %s)", DatumGetString(elem_oid, state->target_start), DatumGetString(elem_oid, state->target_end))));

// Initialize covered_to to negative infinity bound, and make sure it is allocated for regular free.
//if (!elem_typcache->typbyval && elem_typcache->typlen == -1) {
// ereport(DEBUG1, (errmsg("Performing memory copy.")));
// oldContext = MemoryContextSwitchTo(aggContext);
// state->covered_to.val = datumCopy(DatumNegativeInfinity(elem_oid), /*pass by reference when false*/ false, /*dynamic length when -1*/ -1);
// MemoryContextSwitchTo(oldContext);
//} else {
// ereport(DEBUG1, (errmsg("Skipping memory copy.")));
state->covered_to.val = DatumNegativeInfinity(elem_oid);
//}
state->covered_to.infinite = true;
state->covered_to.inclusive = true;
state->covered_to.lower = true;

ereport(DEBUG1, (errmsg("initial covered_to is %ld", DatumGet(typcache, state->covered_to))));
//ereport(DEBUG1, (errmsg("initial covered_to is %s", DatumGetString(elem_oid, state->covered_to))));
} else {
// ereport(NOTICE, (errmsg("looking up state....")));
state = (no_gaps_state *)PG_GETARG_POINTER(0);
Expand All @@ -129,6 +136,9 @@ Datum no_gaps_transfn(PG_FUNCTION_ARGS)

// Make sure the second arg is always the same:
typcache = range_get_typcache(fcinfo, RangeTypeGetOid(state->target));
elem_oid = typcache->rngelemtype->type_id;
elem_typcache = lookup_type_cache(elem_oid, 0);

if (PG_ARGISNULL(2) || range_ne_internal(typcache, state->target, PG_GETARG_RANGE_P(2))) {
ereport(ERROR, (errmsg("no_gaps second argument must be constant across the group")));
}
Expand All @@ -146,8 +156,10 @@ Datum no_gaps_transfn(PG_FUNCTION_ARGS)

range_deserialize(typcache, current_range, &current_start, &current_end, &current_empty);

ereport(DEBUG1, (errmsg("current is [%ld, %ld)", DatumGet(typcache, current_start), DatumGet(typcache, current_end))));
ereport(DEBUG1, (errmsg("pre state->covered_to is %ld", DatumGet(typcache, state->covered_to))));
oldContext = MemoryContextSwitchTo(aggContext);
//ereport(DEBUG1, (errmsg("current is [%s, %s)", DatumGetString(elem_oid, current_start), DatumGetString(elem_oid, current_end))));
//ereport(DEBUG1, (errmsg("pre state->covered_to is %s", DatumGetString(elem_oid, state->covered_to))));
MemoryContextSwitchTo(oldContext);

if (first_time) {
// If the target range start is unbounded, but the current range start is not, then we cannot have full coverage
Expand All @@ -174,9 +186,9 @@ Datum no_gaps_transfn(PG_FUNCTION_ARGS)
// If the current range starts after the last covered range, it means the ranges are not sorted
if (range_cmp_bounds(typcache, &current_start, &state->covered_to) < 0) {
//ereport(ERROR, (errmsg(
// "no_gaps first argument should be sorted but got %ld after covering up to %ld",
// DatumGet(typcache, current_start),
// DatumGet(typcache, state->covered_to)
// "no_gaps first argument should be sorted but got %s after covering up to %s",
// DatumGetString(elem_oid, current_start),
// DatumGetString(elem_oid, state->covered_to)
//)));
ereport(ERROR, (errmsg(
"no_gaps first argument should be sorted but got a range ending before the last covered_to"
Expand All @@ -185,24 +197,19 @@ Datum no_gaps_transfn(PG_FUNCTION_ARGS)

// Update the covered range if the current range extends beyond it
if (range_cmp_bounds(typcache, &current_end, &state->covered_to) > 0) {
Oid datum_oid = typcache->rngelemtype->type_id;
TypeCacheEntry *datumtypcache;

state->covered_to = current_end;

datumtypcache = lookup_type_cache(datum_oid, 0);
if (!datumtypcache->typbyval && datumtypcache->typlen == -1) {
MemoryContext oldContext;
ereport(DEBUG1, (errmsg("Performing memory copy.")));
if (!elem_typcache->typbyval && elem_typcache->typlen == -1) {
//ereport(DEBUG1, (errmsg("Performing memory copy.")));
//ereport(DEBUG1, (errmsg("Before pfree(state->covered_to.val)")));
//pfree(state->covered_to.val);
oldContext = MemoryContextSwitchTo(aggContext);
ereport(DEBUG1, (errmsg("Before datumCopy.")));
// The first covered_to is not allocated on the stack, so it can not be freed...
//pfree(state->covered_to.val);
//ereport(DEBUG1, (errmsg("Before datumCopy.")));
state->covered_to.val = datumCopy(current_end.val, /*pass by reference when false*/ false, /*dynamic length when -1*/ -1);
ereport(DEBUG1, (errmsg("After datumCopy.")));
//ereport(DEBUG1, (errmsg("After datumCopy.")));
MemoryContextSwitchTo(oldContext);
} else {
ereport(DEBUG1, (errmsg("Skipping memory copy.")));
//ereport(DEBUG1, (errmsg("Skipping memory copy.")));
}

// Notice that the previous non-inclusive end is included in the next start.
Expand All @@ -214,7 +221,10 @@ Datum no_gaps_transfn(PG_FUNCTION_ARGS)
state->no_gaps = true;
state->finished = true;
}
ereport(DEBUG1, (errmsg("post state->covered_to is %ld", DatumGet(typcache, state->covered_to))));
//oldContext = MemoryContextSwitchTo(aggContext);
//ereport(DEBUG1, (errmsg("post state->covered_to is %s", DatumGetString(elem_oid, state->covered_to))));
//MemoryContextSwitchTo(oldContext);

PG_RETURN_POINTER(state);
}

Expand All @@ -233,34 +243,9 @@ Datum no_gaps_finalfn(PG_FUNCTION_ARGS)
}


Datum DatumGet(TypeCacheEntry *typcache, RangeBound bound)
{
Oid elem_type_id = typcache->rngelemtype->type_id;
switch (elem_type_id)
{
case INT4OID:
return DatumGetInt32(bound.val);
case INT8OID:
return DatumGetInt64(bound.val);
case DATEOID:
return DatumGetDateADT(bound.val);
case NUMERICOID:
return NumericGetDatum(bound.val);
case TIMESTAMPOID:
return DatumGetTimestamp(bound.val);
case TIMESTAMPTZOID:
return DatumGetTimestampTz(bound.val);
default:
elog(ERROR, "Unsupported element type id: %u", elem_type_id);
return (Datum) 0; // This line will not be reached due to the elog(ERROR) above
}
}


Datum DatumNegativeInfinity(TypeCacheEntry *typcache)
Datum DatumNegativeInfinity(Oid elem_oid)
{
Oid elem_type_id = typcache->rngelemtype->type_id;
switch (elem_type_id)
switch (elem_oid)
{
case INT4OID:
return Int32GetDatum(INT32_MIN);
Expand All @@ -274,17 +259,16 @@ Datum DatumNegativeInfinity(TypeCacheEntry *typcache)
case TIMESTAMPTZOID:
return DatumGetTimestampTz(DT_NOBEGIN);
default:
elog(ERROR, "Unsupported range type: %u", elem_type_id);
elog(ERROR, "Unsupported range type: %u", elem_oid);
return (Datum) 0; // This line will not be reached due to the elog(ERROR) above
}
}


char *DatumGetString(TypeCacheEntry *typcache, RangeBound bound) {
Oid elem_type_id = typcache->rngelemtype->type_id;
char *DatumGetString(Oid elem_oid, RangeBound bound) {
char *result;

switch (elem_type_id) {
switch (elem_oid) {
case INT4OID:
result = psprintf("%d", DatumGetInt32(bound.val));
break;
Expand All @@ -294,29 +278,29 @@ char *DatumGetString(TypeCacheEntry *typcache, RangeBound bound) {
case DATEOID: {
char *dateStr = DatumGetCString(DirectFunctionCall1(date_out, bound.val));
result = psprintf("%s", dateStr);
pfree(dateStr);
//pfree(dateStr);
break;
}
case NUMERICOID: {
char *numericStr = DatumGetCString(DirectFunctionCall1(numeric_out, bound.val));
result = psprintf("%s", numericStr);
pfree(numericStr);
//pfree(numericStr);
break;
}
case TIMESTAMPOID: {
char *timestampStr = DatumGetCString(DirectFunctionCall1(timestamp_out, bound.val));
result = psprintf("%s", timestampStr);
pfree(timestampStr);
//pfree(timestampStr);
break;
}
case TIMESTAMPTZOID: {
char *timestamptzStr = DatumGetCString(DirectFunctionCall1(timestamptz_out, bound.val));
result = psprintf("%s", timestamptzStr);
pfree(timestamptzStr);
//pfree(timestamptzStr);
break;
}
default:
elog(ERROR, "Unsupported element type id: %u", elem_type_id);
elog(ERROR, "Unsupported element type id: %u", elem_oid);
return NULL; // This line will not be reached due to the elog(ERROR) above
}
return result;
Expand Down
2 changes: 2 additions & 0 deletions sql/33_no_gaps_numeric_test.sql
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ SELECT sql_saga.no_gaps(numrange(valid_from, valid_to), numrange(1.5, 15.5))
FROM numeric_shifts
WHERE job_id = 1;

SET client_min_messages TO NOTICE;

SELECT sql_saga.drop_unique_key('numeric_shifts', 'numeric_shifts_job_id_worker_id_valid');
SELECT sql_saga.drop_era('numeric_shifts');

Expand Down

0 comments on commit ab536dc

Please sign in to comment.