Skip to content

Commit

Permalink
Merge pull request #47 from robe2/master
Browse files Browse the repository at this point in the history
#10 PostgreSQL 9.5 IMPORT FOREIGN SCHEMA support
  • Loading branch information
pramsey committed Jan 5, 2016
2 parents 9c32a63 + d64e14a commit 9ee6883
Show file tree
Hide file tree
Showing 4 changed files with 395 additions and 2 deletions.
64 changes: 62 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -189,5 +189,65 @@ Wraparound action! Handy for testing. Connect your database back to your databas
OPTIONS ( layer 'typetest' );

SELECT * FROM typetest_fdw;

Enjoy!

### Using IMPORT FOREIGN SCHEMA (for PostgreSQL 9.5+ only)

## Importing links to all tables
If you want to import all tables use the special schema called *ogr_all*

CREATE SCHEMA fgdball;
IMPORT FOREIGN SCHEMA ogr_all
FROM server fgdbtest INTO fgdball;

## Importing subset of tables using prefixes
Not all ogr data sources have a concept of schema, so we use the remote_schema as a prefix.
Note this is case sensitive, so make sure casing matches your layer names.

For example the following will only import tables that start with *CitiesIn*. As long as you quote, you can handle
true schemaed databases such as SQL server or PostgreSQL by using something like *"dbo."*

CREATE SCHEMA fgdbcityinf;
IMPORT FOREIGN SCHEMA "CitiesIn"
FROM server fgdbtest INTO fgdbcityinf;
## Preserving case and special characters in column names and table names
By default, when IMPORT FOREIGN SCHEMA is run on an ogr foreign data server, the table names and column names are laundered
(meaning all upper case is converted to lowercase and special characters such as spaces are replaced with _).

This is not desirable in all cases. You can override this behavior with 2 IMPORT FOREIGN SCHEMA options specific to ogr fdw servers.

These are `launder_column_names` and `launder_tables_names`.

To preserve casing and other funky characters in both column names and table names you can do the following:

CREATE SCHEMA fgdbcitypreserve;
IMPORT FOREIGN SCHEMA ogr_all
FROM server fgdbtest INTO fgdbpreserve
OPTIONS(launder_table_names 'false', launder_column_names 'false') ;
## Importing subset of layers using LIMIT and EXCEPT
Note: LIMIT TO /EXCEPT should contain resulting table names (NOT the layer names)
In the default case, the table names are laundered should not have mixed case or weird characters.

CREATE SCHEMA fgdbcitysub;
-- import only layer called Cities
IMPORT FOREIGN SCHEMA ogr_all
LIMIT TO(cities)
FROM server fgdbtest INTO fgdbcitysub ;
-- import only layers not called Cities or Countries
IMPORT FOREIGN SCHEMA ogr_all
EXCEPT (cities, countries)
FROM server fgdbtest INTO fgdbcitysub;
-- With table laundering turned off, need to use exact layer names
DROP SCHEMA IF EXISTS fgdbcitysub CASCADE;

IMPORT FOREIGN SCHEMA ogr_all
LIMIT TO("Cities")
FROM server fgdbtest INTO fgdbcitysub OPTIONS(launder_table_names 'false') ;

Enjoy!
303 changes: 303 additions & 0 deletions ogr_fdw.c
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,16 @@ static TupleTableSlot *ogrIterateForeignScan(ForeignScanState *node);
static void ogrReScanForeignScan(ForeignScanState *node);
static void ogrEndForeignScan(ForeignScanState *node);

static void strTableColumnLaunder (char *str);

#if PG_VERSION_NUM >= 90500
/*
* Require PostgreSQL >= 9.5
*/
static List *ogrImportForeignSchema(ImportForeignSchemaStmt *stmt,
Oid serverOid);
#endif

/*
* Helper functions
*/
Expand All @@ -115,6 +125,8 @@ static void ogr_fdw_exit(int code, Datum arg);
/* Global to hold GEOMETRYOID */
Oid GEOMETRYOID = InvalidOid;

#define STR_MAX_LEN 256


void
_PG_init(void)
Expand Down Expand Up @@ -180,6 +192,12 @@ ogr_fdw_handler(PG_FUNCTION_ARGS)
fdwroutine->IterateForeignScan = ogrIterateForeignScan;
fdwroutine->ReScanForeignScan = ogrReScanForeignScan;
fdwroutine->EndForeignScan = ogrEndForeignScan;

#if PG_VERSION_NUM >= 90500
/* PostgreSQL 9.5+
Support functions for IMPORT FOREIGN SCHEMA */
fdwroutine->ImportForeignSchema = ogrImportForeignSchema;
#endif

PG_RETURN_POINTER(fdwroutine);
}
Expand Down Expand Up @@ -1265,3 +1283,288 @@ ogrEndForeignScan(ForeignScanState *node)


#endif /* PostgreSQL 9.3 version check */

#if PG_VERSION_NUM >= 90500
/*
* PostgreSQL 9.5 or above. Import a foreign schema
*/
static List *
ogrImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
{
List *commands = NIL;
ForeignServer *server;

List *options;
ListCell *cell;
char *sGeomType;
bool check_schema = false;
bool launder_column_names = true;
bool launder_table_names = true;
StringInfoData buf;
OgrConnection ogr;
int i;
int j;
int k;
char layer_name[STR_MAX_LEN];
char table_name[STR_MAX_LEN];
ListCell *lc;
bool include_item = false;
OGRDataSourceH ogr_ds = NULL;
OGRSFDriverH ogr_dr = NULL;
OGRFeatureDefnH ogr_fd = NULL;
OGRLayerH ogr_lyr = NULL;

/** check table prefix if remote_schema asked for is not ogr_all **/
check_schema = !( strcmp(stmt->remote_schema, "ogr_all") == 0 );

elog(NOTICE, "Check schema %d %s", check_schema, stmt->remote_schema);
if ( GEOMETRYOID == BYTEAOID){ /* postgis is not in search path */
sGeomType = "bytea";
}
else {
sGeomType = "geometry";
}

#if GDAL_VERSION_MAJOR >= 2 || GDAL_VERSION_MINOR >= 11
int geom_field_count;
#endif
/* Null all values */
memset(&ogr, 0, sizeof(OgrConnection));

/*
* Get connection to the foreign server. Connection manager will
* establish new connection if necessary.
*/
server = GetForeignServer(serverOid);

/* Read server druver and data source connection string
*/
options = NIL;
options = list_concat(options, server->options);
foreach(cell, options)
{
DefElem *def = (DefElem *) lfirst(cell);
if (strcmp(def->defname, OPT_SOURCE) == 0)
ogr.ds_str = defGetString(def);
if (strcmp(def->defname, OPT_DRIVER) == 0)
ogr.dr_str = defGetString(def);
}

/* Parse statement laundering options */
foreach(lc, stmt->options)
{
DefElem *def = (DefElem *) lfirst(lc);

if (strcmp(def->defname, "launder_column_names") == 0)
launder_column_names = defGetBoolean(def);
else if (strcmp(def->defname, "launder_table_names") == 0)
launder_table_names = defGetBoolean(def);
else
ereport(ERROR,
(errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
errmsg("invalid option \"%s\"", def->defname)));
}

OGRRegisterAll();
ogr_ds = OGROpen(ogr.ds_str, FALSE, &ogr_dr);


/* Create workspace for strings */
initStringInfo(&buf);

for ( i = 0; i < OGR_DS_GetLayerCount(ogr_ds); i++ )
{
include_item = false;
ogr_lyr = OGR_DS_GetLayer(ogr_ds, i);
/* we have a table */
if ( ogr_lyr )
{
/* layer name is never laundered, since it's link back to foreign data */
strncpy(layer_name, OGR_L_GetName(ogr_lyr), STR_MAX_LEN);

/* We need to compare against created table names
* because postgres does an extra check on create foriegn table
* and removes statements not in limit
*/
/* having this as separate variable since we may choose to launder it */
strncpy(table_name, OGR_L_GetName(ogr_lyr), STR_MAX_LEN);
if (launder_table_names){
strTableColumnLaunder(table_name);
}

/* only include if layer prefix starts with remote schema
or remote schema is ogr_all */
include_item = (!check_schema ||
( strncmp(layer_name, stmt->remote_schema, strlen(stmt->remote_schema) ) == 0 ) );
/* Apply restrictions for LIMIT TO and EXCEPT */
if (include_item && ( stmt->list_type == FDW_IMPORT_SCHEMA_LIMIT_TO ||
stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT ) )
{
/* Check if current table is in list of except/include tables */
/* default state is only true if type is EXCEPT */
include_item = ( stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT );
foreach(lc, stmt->table_list)
{
RangeVar *rv = (RangeVar *) lfirst(lc);
if ( strcmp(rv->relname, table_name) == 0 ){
//elog(NOTICE, "MATCH layer %s, table %s", layer_name, rv->relname );
/* bit is true on match only if limit to */
include_item = ( stmt->list_type == FDW_IMPORT_SCHEMA_LIMIT_TO );
break;
}

}
}
}

if (include_item){
resetStringInfo(&buf);

if (launder_table_names){
strTableColumnLaunder(table_name);
}
ogr_fd = OGR_L_GetLayerDefn(ogr_lyr);
if ( !ogr_fd )
{
/** TODO raise error **/
elog(NOTICE, "Error in layer def load %s", layer_name);
}

appendStringInfo(&buf, "CREATE FOREIGN TABLE %s (\n",
quote_identifier(table_name));
appendStringInfo(&buf, " fid integer");

#if GDAL_VERSION_MAJOR >= 2 || GDAL_VERSION_MINOR >= 11
geom_field_count = OGR_FD_GetGeomFieldCount(ogr_fd);
if( geom_field_count == 1 )
{
appendStringInfo(&buf, " ,geom %s", sGeomType);
}
else
{
for ( j = 0; j < geom_field_count; j++ )
{
appendStringInfo(&buf, " ,geom%d %s", j + 1, sGeomType);
}
}
#else
if( OGR_L_GetGeomType(ogr_lyr) != wkbNone )
appendStringInfo(&buf, " ,geom %s", sGeomType);
#endif

for ( k = 0; k < OGR_FD_GetFieldCount(ogr_fd); k++ )
{
char field_name[STR_MAX_LEN];
OGRFieldDefnH ogr_fld = OGR_FD_GetFieldDefn(ogr_fd, k);
strncpy(field_name, OGR_Fld_GetNameRef(ogr_fld), STR_MAX_LEN);
if (launder_column_names){
strTableColumnLaunder(field_name);
}
appendStringInfo(&buf, " , %s ", quote_identifier(field_name));
switch( OGR_Fld_GetType(ogr_fld) )
{
case OFTInteger:
#if GDAL_VERSION_MAJOR >= 2
if( OGR_Fld_GetSubType(ogr_fld) == OFSTBoolean )
appendStringInfoString(&buf,"boolean");
else
#endif
appendStringInfoString(&buf,"integer");
break;
case OFTReal:
appendStringInfoString(&buf,"real");
break;
case OFTString:
appendStringInfoString(&buf,"varchar");
break;
case OFTBinary:
appendStringInfoString(&buf,"bytea");
break;
case OFTDate:
appendStringInfoString(&buf,"date");
break;
case OFTTime:
appendStringInfoString(&buf,"time");
break;
case OFTDateTime:
appendStringInfoString(&buf,"timestamp");
break;
case OFTIntegerList:
appendStringInfoString(&buf,"integer[]");
break;
case OFTRealList:
appendStringInfoString(&buf,"real[]");
break;
case OFTStringList:
appendStringInfoString(&buf,"varchar[]");
break;

#if GDAL_VERSION_MAJOR >= 2
case OFTInteger64:
appendStringInfoString(&buf,"bigint");
break;
#endif
default:
elog(NOTICE, "Unsupported GDAL type '%s'", OGR_GetFieldTypeName(OGR_Fld_GetType(ogr_fld)) );
//CPLError(CE_Failure, CPLE_AppDefined, "Unsupported GDAL type '%s'", OGR_GetFieldTypeName(OGR_Fld_GetType(ogr_fld)));
//return OGRERR_FAILURE;
}
}

/*
* Add server name and layer-level options. We specify remote
* layer name as option
*/
appendStringInfo(&buf, "\n) SERVER %s\nOPTIONS (",
quote_identifier(server->servername));

appendStringInfoString(&buf, "layer ");
ogrDeparseStringLiteral(&buf, layer_name);

appendStringInfoString(&buf, ");");

commands = lappend(commands, pstrdup(buf.data));
}
}
OGR_DS_Destroy(ogr_ds);
elog(NOTICE, "Number of tables to be created %d", list_length(commands) );
//elog(NOTICE, "The nth item %s", list_nth(commands,0) );

/* Clean up */
pfree(buf.data);
/** returns list of create foreign table statements to run **/
return commands;
}
#endif /*end import foreign schema **/

static void strTableColumnLaunder (char *str)
{
int i, j = 0;
for(i = 0; str[i]; i++)
{
char c = tolower(str[i]);

/* First character is a numeral, prefix with 'n' */
if ( i == 0 && (c >= 48 && c <= 57) )
{
str[j++] = 'n';
}

/* Replace non-safe characters w/ _ */
if ( (c >= 48 && c <= 57) || /* 0-9 */
(c >= 65 && c <= 90) || /* A-Z */
(c >= 97 && c <= 122 ) /* a-z */ )
{
/* Good character, do nothing */
}
else
{
c = '_';
}
str[j++] = c;

/* Avoid mucking with data beyond the end of our stack-allocated strings */
if ( j >= STR_MAX_LEN )
j = STR_MAX_LEN - 1;
}
}
Loading

0 comments on commit 9ee6883

Please sign in to comment.