Merge pull request #81 from MatteoGioioso/plugins
Queries abstraction, plugins
MatteoGioioso authored Jun 21, 2023
2 parents 3bbd4e8 + 249f3ed commit ceea1f0
Showing 4 changed files with 213 additions and 98 deletions.
110 changes: 77 additions & 33 deletions README.md
@@ -32,20 +32,6 @@ using trusted backoff algorithms.
Feel free to request additional features and contribute =)

## Changelog

- **Default connections filtering (>= v2)**: this feature leverages the Postgres `application_name` parameter to
differentiate clients created by this library from others, which avoids terminating connections belonging to
long-running processes, batch jobs, etc.
By default, we set the same `application_name` for all serverless clients; if you wish, you can change it by
specifying it in the client config:
```javascript
const client = new ServerlessClient({
...
application_name: 'my_client',
});
```

## Install

```bash
@@ -103,27 +89,85 @@ const handler = async (event, context) => {

```

## Connections filtering (>= v2)

This feature leverages the Postgres `application_name` parameter to differentiate clients created by this library
from others, which avoids terminating connections belonging to long-running processes, batch jobs, etc.
By default, we set the same `application_name` for all serverless clients; if you wish, you can change it by
specifying it in the client config:

```javascript
const client = new ServerlessClient({
application_name: 'my_client',
});
```
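
As a quick sanity check (this is not part of the library's API, just a plain `pg_stat_activity` lookup), you can see which connections carry a given `application_name`:

```javascript
// Diagnostic sketch: list server processes opened with a given application_name.
// Assumes a connected ServerlessClient named `client`, inside an async function.
const { rows } = await client.query(
  `SELECT pid, state, application_name
   FROM pg_stat_activity
   WHERE application_name = $1`,
  ["my_client"]
);
console.log(rows); // one row per connection: { pid, state, application_name }
```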

## Plugins (>= v2)

Serverless-postgres is extensible and can be used with any wire-compatible Postgres engine, such as Redshift, Google
Cloud Spanner, CockroachDB, YugabyteDB, etc.
If needed, you can write your own plugin implementing the following interface:

```typescript
interface Plugin {
getIdleProcessesListByMinimumTimeout(self: ServerlessClient): Promise<NodePgClientResponse<ProcessList>>;

getIdleProcessesListOrderByDate(self: ServerlessClient): Promise<NodePgClientResponse<ProcessList>>;

processCount(self: ServerlessClient): Promise<NodePgClientResponse<Count>>;

killProcesses(self: ServerlessClient, pids: string[]): Promise<NodePgClientResponse<any>>;

showMaxConnections(self: ServerlessClient): Promise<NodePgClientResponse<MaxConnections>>;
}

```

Every function receives the serverless client itself as an argument, so you can access any configuration parameter
such as `database`, `user`, `applicationName`, etc.;
if your changes are minimal, you can inherit from the main Postgres plugin class:

```javascript
class MyPlugin extends Postgres {
constructor() {
super();
}
// ...
}
```
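
For instance, a sketch of a plugin that overrides just one query while reusing everything else from the default `Postgres` plugin might look like this (the class body and SQL below are illustrative assumptions, not part of the library):

```javascript
// Illustrative sketch: override a single plugin method. `self` is the
// ServerlessClient instance, and `self._client` is the underlying node-pg client.
class MyPlugin extends Postgres {
  async showMaxConnections(self) {
    // Must resolve to a node-pg style response: { rows: [{ max_connections }] }.
    // Adjust the SQL to whatever your engine exposes.
    return self._client.query("SHOW max_connections;");
  }
}
```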

You can then use your plugin like this:

```javascript
const client = new ServerlessClient({
  plugin: new MyPlugin()
});
```

## Configuration Options

| Property | Type | Description | Default |
| -------- | ---- | ----------- | ------- |
| config | `Object` | A `node-pg` configuration object as defined [here](https://node-postgres.com/api/client) | `{}` |
| maxConnsFreqMs | `Integer` | The number of milliseconds to cache lookups of `max_connections`. | `60000` |
| manualMaxConnections | `Boolean` | If this parameter is set to `true`, it will query the database to get the max connections value; to maximize performance you should set `maxConnections` yourself. | `false` |
| maxConnections | `Integer` | Max connections of your PostgreSQL server. I highly suggest setting this yourself. | `100` |
| strategy | `String` | Name of your chosen strategy for cleaning up "zombie" connections; allowed values are `minimum_idle_time` or `ranked`. | `minimum_idle_time` |
| minConnectionIdleTimeSec | `Integer` | The minimum number of seconds that a connection must be idle before the module will recycle it. | `0.5` |
| maxIdleConnectionsToKill | `Integer` or `null` | The maximum number of idle connections that will be killed. The default (`null`) kills all of them. | `null` |
| connUtilization | `Number` | The percentage of total connections to use when connecting to your PostgreSQL server. A value of `0.80` would use 80% of your total available connections. | `0.8` |
| debug | `Boolean` | Enable/disable debugging logs. | `false` |
| capMs | `Integer` | Maximum number of milliseconds between connection retries. | `1000` |
| baseMs | `Integer` | Number of milliseconds added to random backoff values. | `2` |
| delayMs | `Integer` | Additional delay to add to the exponential backoff. | `1000` |
| maxRetries | `Integer` | Maximum number of times to retry a connection before throwing an error. | `3` |
| processCountCacheEnabled | `Boolean` | Enable caching of the process count lookup. | `false` |
| processCountFreqMs | `Integer` | The number of milliseconds to cache lookups of the process count. | `6000` |
| allowCredentialsDiffing | `Boolean` | If you are using dynamic credentials, such as IAM, you can set this parameter to `true` and the client will be refreshed. | `false` |
| library | `Function` | Custom Postgres library. | `require('pg')` |
| Property | Type | Description | Default | Version |
|----------|------|-------------|---------|---------|
| config | `Object` | A `node-pg` configuration object as defined [here](https://node-postgres.com/api/client) | `{}` | |
| maxConnsFreqMs | `Integer` | The number of milliseconds to cache lookups of `max_connections`. | `60000` | |
| manualMaxConnections | `Boolean` | If this parameter is set to `true`, it will query the database to get the max connections value; to maximize performance you should set `maxConnections` yourself. | `false` | |
| maxConnections | `Integer` | Max connections of your PostgreSQL server; it should be set equal to `max_connections` in your cluster. I highly suggest setting this yourself. | `100` | |
| strategy | `String` | Name of your chosen strategy for cleaning up "zombie" connections; allowed values are `minimum_idle_time` or `ranked`. | `minimum_idle_time` | |
| minConnectionIdleTimeSec | `Integer` | The minimum number of seconds that a connection must be idle before the module will recycle it. | `0.5` | |
| maxIdleConnectionsToKill | `Integer` or `null` | The maximum number of idle connections that will be killed. The default (`null`) kills all of them. | `null` | |
| connUtilization | `Number` | The percentage of total connections to use when connecting to your PostgreSQL server. A value of `0.80` would use 80% of your total available connections. | `0.8` | |
| debug | `Boolean` | Enable/disable debugging logs. | `false` | |
| capMs | `Integer` | Maximum number of milliseconds between connection retries. | `1000` | |
| baseMs | `Integer` | Number of milliseconds added to random backoff values. | `2` | |
| delayMs | `Integer` | Additional delay to add to the exponential backoff. | `1000` | |
| maxRetries | `Integer` | Maximum number of times to retry a connection before throwing an error. | `3` | |
| processCountCacheEnabled | `Boolean` | Enable caching of the process count lookup. | `false` | |
| processCountFreqMs | `Integer` | The number of milliseconds to cache lookups of the process count. | `6000` | |
| allowCredentialsDiffing | `Boolean` | If you are using dynamic credentials, such as IAM, you can set this parameter to `true` and the client will be refreshed. | `false` | |
| library | `Function` | Custom Postgres library. | `require('pg')` | |
| application_name | `String` | This is a Postgres-specific configuration; serverless-postgres uses it to avoid closing other applications' connections. | `serverless_client` | `>= v2` |
| plugin | `Object` | An instance of your plugin class; see the Plugins section above. | `Postgres` | `>= v2` |

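For reference, a configuration that combines several of these options might look like the sketch below (the connection parameters and values are illustrative, not recommendations):

```javascript
// Illustrative configuration sketch; node-pg connection options and the
// library's own options are passed together, as in the examples above.
const client = new ServerlessClient({
  user: "postgres",
  host: "localhost",
  database: "mydb",
  password: "secret",
  port: 5432,
  application_name: "my_client",  // connections filtering (>= v2)
  maxConnections: 100,            // ideally equal to max_connections in your cluster
  strategy: "minimum_idle_time",
  minConnectionIdleTimeSec: 0.5,
  connUtilization: 0.8,
  debug: false,
});
```
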
## Note

39 changes: 37 additions & 2 deletions index.d.ts
@@ -1,4 +1,4 @@
import stream = require('stream');
import stream = require("stream");

declare interface TlsOptions {
rejectUnauthorized?: boolean;
@@ -40,19 +40,54 @@ declare interface Config {
delayMs?: number;
maxRetries?: number;
library?: typeof import("pg");
plugin?: Plugin;
}

declare interface Plugin {
getIdleProcessesListByMinimumTimeout(self: ServerlessClient): Promise<NodePgClientResponse<ProcessList>>;

getIdleProcessesListOrderByDate(self: ServerlessClient): Promise<NodePgClientResponse<ProcessList>>;

processCount(self: ServerlessClient): Promise<NodePgClientResponse<Count>>;

killProcesses(self: ServerlessClient, pids: string[]): Promise<NodePgClientResponse<any>>;

showMaxConnections(self: ServerlessClient): Promise<NodePgClientResponse<MaxConnections>>;
}

declare interface ProcessList {
pid: string;
}

declare interface Count {
count: number;
}

declare interface MaxConnections {
max_connections: number;
}

declare interface NodePgClientResponse<T> {
rows: T[];
}

declare namespace ServerlessClient {
export { TlsOptions, Config }
export { TlsOptions, Config };
}

declare class ServerlessClient {
constructor(config: Config)

clean(): Promise<number | undefined>

setConfig(config: Config): void

connect(): Promise<void>

query(...args): Promise<any>

end(): Promise<any>

on(...args): void
}

79 changes: 16 additions & 63 deletions src/index.js
@@ -7,9 +7,16 @@
*/

const {isValidStrategy, type, validateNum, isWithinRange} = require("./utils");
const Postgres = require("./postgres");

function ServerlessClient(config) {
this._client = null;
if (config.plugin) {
this._plugin = config.plugin
} else {
this._plugin = new Postgres()
}

this.setConfig(config)
}

@@ -24,7 +31,7 @@ ServerlessClient.prototype._sleep = delay =>
ServerlessClient.prototype._setMaxConnections = async (__self) => {
// If cache is expired
if (Date.now() - __self._maxConns.cache.updated > __self._maxConns.freqMs) {
const results = await __self._client.query(`SHOW max_connections`)
const results = await __self._plugin.showMaxConnections(__self)
const maxConnections = results.rows[0].max_connections

__self._logger("Getting max connections from database...", maxConnections)
@@ -39,25 +46,8 @@ ServerlessClient.prototype._setMaxConnections = async (__self) => {
// This strategy arbitrarily (up to maxIdleConnectionsToKill) terminates connections, starting from the oldest idle one.
// It is very aggressive and can cause disruption if a connection was idle for only a short period of time
ServerlessClient.prototype._getIdleProcessesListOrderByDate = async function () {
const query = `
SELECT pid, backend_start, state
FROM pg_stat_activity
WHERE datname = $1
AND state = 'idle'
AND usename = $2
AND application_name = $4
ORDER BY state_change
LIMIT $3;`

const values = [
this._client.database,
this._client.user,
this._strategy.maxIdleConnectionsToKill,
this._application_name
]

try {
const result = await this._client.query(query, values);
const result = await this._plugin.getIdleProcessesListOrderByDate(this);

return result.rows
} catch (e) {
@@ -71,31 +61,8 @@ ServerlessClient.prototype._getIdleProcessesListOrderByDate = async function ()
// than a minimum amount of seconds. It is very accurate, as it only takes the processes that have been idle
// for more than a threshold time (minConnectionIdleTimeSec)
ServerlessClient.prototype._getIdleProcessesListByMinimumTimeout = async function () {
const query = `
WITH processes AS (
SELECT EXTRACT(EPOCH FROM (Now() - state_change)) AS idle_time,
pid
FROM pg_stat_activity
WHERE usename = $1
AND datname = $2
AND state = 'idle'
AND application_name = $5
)
SELECT pid
FROM processes
WHERE idle_time > $3
LIMIT $4;`

const values = [
this._client.user,
this._client.database,
this._strategy.minConnIdleTimeSec,
this._strategy.maxIdleConnectionsToKill,
this._application_name
]

try {
const result = await this._client.query(query, values)
const result = await this._plugin.getIdleProcessesListByMinimumTimeout(this)

return result.rows
} catch (e) {
@@ -116,17 +83,9 @@ ServerlessClient.prototype._getProcessesCount = async function () {
}

if (isCacheExpiredOrDisabled(this)) {
const query = `
SELECT COUNT(pid)
FROM pg_stat_activity
WHERE datname = $1
AND usename = $2
AND application_name = $3;`

const values = [this._client.database, this._client.user, this._application_name]

try {
const result = await this._client.query(query, values);
const result = await this._plugin.processCount(this);

this._processCount.cache = {
count: result.rows[0].count || 0,
updated: Date.now()
@@ -136,6 +95,7 @@ ServerlessClient.prototype._getProcessesCount = async function () {
} catch (e) {
this._logger("Swallowed internal error", e.message)
// Swallow the error; if this produces an error there is no need to fail the function
// TODO: maybe returning the cached process count would be better
return 0
}
}
@@ -147,17 +107,8 @@ ServerlessClient.prototype._getProcessesCount = async function () {
ServerlessClient.prototype._killProcesses = async function (processesList) {
const pids = processesList.map(proc => proc.pid);

const query = `
SELECT pg_terminate_backend(pid)
FROM pg_stat_activity
WHERE pid = ANY ($1)
AND state = 'idle'
AND application_name = $2;`

const values = [pids, this._application_name]

try {
return await this._client.query(query, values)
return await this._plugin.killProcesses(this, pids)
} catch (e) {
this._logger("Swallowed internal error: ", e.message)
// Swallow the error; if this produces an error there is no need to fail the function
@@ -195,6 +146,8 @@ ServerlessClient.prototype.clean = async function () {
const processesList = await strategy();
if (processesList.length) {
const killedProcesses = await this._killProcesses(processesList);
// This is to minimize the chances of re-triggering killProcesses if the lambda is called again after a few seconds
this._processCount.cache.count = this._processCount.cache.count - killedProcesses.rows.length
this._logger("+++++ Killed processes: ", killedProcesses.rows.length, " +++++")
return killedProcesses.rows
}