
Welcome to the Degressly wiki!

Contents

Advanced Configurations Supported in Degressly

Intercepting SSL Communication in degressly-downstream

Since degressly works as an HTTP proxy, intercepting SSL traffic requires some good ol' man-in-the-middle interception.

We recommend using mitmproxy as the HTTPS_PROXY to intercept HTTPS requests and forward them to degressly-downstream as HTTP requests. A sample mitmproxy script follows:

from mitmproxy import http

def request(flow: http.HTTPFlow) -> None:
    original_host = flow.request.host_header
    original_scheme = flow.request.scheme

    flow.request.host = 'degressly-downstream'
    flow.request.port = 8080
    flow.request.scheme = "http"

    # Change the Host header to the original destination
    if original_host:
        flow.request.host_header = original_host

    # Optionally, log or debug the redirected flow
    print(f"Redirecting HTTPS request to HTTP: {flow.request.url}")

    flow.request.headers["X-Forwarded-Proto"] = original_scheme

You will have to provide mitmproxy with a root CA certificate, or let mitmproxy generate one of its own; that certificate must then be added to the trust store of your Docker containers.

Refer to the mitmproxy certificate authority documentation for more details regarding certificate setup.
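
If you let mitmproxy generate its own CA, the certificate files are written to ~/.mitmproxy on first startup (the compose examples below mount this directory as ./certs). Note that update-ca-certificates only picks up files with a .crt extension, so renaming the generated PEM file is sufficient; a minimal sketch:

cp ./certs/mitmproxy-ca-cert.pem ./certs/mitmproxy-ca-cert.crt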

For example, the following directives can be added to Ubuntu/Debian-based images to add trusted certificates:

...
COPY ./certs/mitmproxy-ca-cert.crt /usr/local/share/ca-certificates/mitmproxy-ca-cert.crt
RUN update-ca-certificates
...

As an addendum, some runtimes, such as Python and the JVM, do not use the OS trust store for their root CAs. In such cases, you may have to configure the certificates manually. For example, if you are using the Python requests library, you can add the following directive to your Dockerfile:

...
ENV REQUESTS_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt
...
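
For JVM-based images, a minimal sketch of the equivalent trust-store import, assuming a JDK 11+ base image where keytool supports the -cacerts shorthand and the trust store uses the default changeit password:

...
COPY ./certs/mitmproxy-ca-cert.crt /tmp/mitmproxy-ca-cert.crt
RUN keytool -importcert -noprompt -trustcacerts \
    -alias mitmproxy -file /tmp/mitmproxy-ca-cert.crt \
    -cacerts -storepass changeit
...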

A working example for easy reference is present in degressly/degressly-demo with the MITM interceptor.


Caller inference via Proxy in degressly-downstream

The degressly-downstream service requires an x-degressly-caller header in each HTTP request to determine which replica is calling a given endpoint, both for handling non-idempotent requests and for attributing observations to the calling replica. If you do not wish to modify your code to inject this header into all your downstream requests, you may set up three mitmproxy instances, each injecting the header for one replica, and point each replica at its respective mitmproxy instance.

To begin with, create a mitmproxy script that populates this header based on an environment variable:

from mitmproxy import http
import os

def request(flow: http.HTTPFlow):
    original_host = flow.request.host_header
    original_scheme = flow.request.scheme

    flow.request.host = 'degressly-downstream'
    flow.request.port = 8080
    flow.request.scheme = "http"

    # Change the Host header to the original destination
    if original_host:
        flow.request.host_header = original_host

    # Optionally, log or debug the redirected flow
    print(f"Redirecting HTTPS request to HTTP as: {flow.request.url}")

    flow.request.headers["X-Forwarded-Proto"] = original_scheme
    flow.request.headers["x-degressly-caller"] = os.getenv("PROXY_HEADER")

Now, create three separate instances of mitmproxy as follows:

degressly-mitm-primary:
    image: mitmproxy/mitmproxy
    container_name: "degressly-mitm-primary"
    environment:
     PROXY_HEADER: PRIMARY
    volumes:
      - ./certs:/home/mitmproxy/.mitmproxy
      - ./mitm.py:/mitm.py:ro
    command: ["mitmdump", "-s", "mitm.py", "--no-http2"]
    networks:
      - degressly_demo_network
    depends_on:
      - degressly-downstream

degressly-mitm-secondary:
    ..
    container_name: "degressly-mitm-secondary"
    environment:
        PROXY_HEADER: SECONDARY
    ..

degressly-mitm-candidate:
    ..
    container_name: "degressly-mitm-candidate"
    environment:
        PROXY_HEADER: CANDIDATE
    ..

Point each of your replicas to the appropriate proxy server:

degressly-demo-primary:
    ..
    environment:
        HTTP_PROXY: http://degressly-mitm-primary:8080
        HTTPS_PROXY: http://degressly-mitm-primary:8080
    ..
  
degressly-demo-secondary:
    ..
        HTTP_PROXY: http://degressly-mitm-secondary:8080
        HTTPS_PROXY: http://degressly-mitm-secondary:8080
    ..

degressly-demo-candidate:
    ..
        HTTP_PROXY: http://degressly-mitm-candidate:8080
        HTTPS_PROXY: http://degressly-mitm-candidate:8080
    ..

Now, each instance will perform calls to upstream services via its own proxy, which in turn injects the x-degressly-caller header for the respective replica.
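
To verify the setup, you can send a request from a replica through its proxy and inspect the headers the upstream service receives; a hypothetical check, assuming the container has curl, can reach httpbin.org, and trusts the mitmproxy CA:

docker exec degressly-demo-primary curl -s https://httpbin.org/headers
# The echoed headers should include "X-Degressly-Caller": "PRIMARY"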

A working example for easy reference is present in degressly/degressly-demo with the MITM interceptor.


Custom Config for populating Trace ID / Idempotency Handling in degressly-downstream

degressly-downstream supports Groovy based configuration files for use cases such as:

  • When your application calls a wide array of upstream services, some idempotent and some not.
  • When changes cannot be made to the code for obtaining the idempotency keys or trace IDs of requests.

The Groovy configuration script has to implement the methods of com.degressly.proxy.downstream.handler.DownstreamHandler.

A sample Groovy Config is as follows:

package config

import com.degressly.proxy.downstream.dto.RequestContext
import groovy.json.JsonSlurper
import org.springframework.util.MultiValueMap

class DownstreamHandler implements com.degressly.proxy.downstream.handler.DownstreamHandler {

    Set<String> idempotentURIs = Set.of("/sample-idempotent");

    @Override
    Optional<Boolean> isIdempotent(RequestContext requestContext) {
        if (idempotentURIs.contains(requestContext.getRequest().getRequestURI())) {
            return Optional.of(Boolean.TRUE)
        } else {
            return Optional.of(Boolean.FALSE)
        }

    }

    @Override
    Optional<String> getTraceId(RequestContext requestContext) {

        JsonSlurper jsonSlurper = new JsonSlurper()
        def bodyJson

        try {
            bodyJson = jsonSlurper.parseText(requestContext.getBody())
        } catch(Exception ignored) {
            // Do nothing
            bodyJson = null
        }

        def fields = ["trace-id", "seqNo", "seq-no", "txn-id", "txnId"]

        for (field in fields) {
            def optional = getField(requestContext.getHeaders(), requestContext.getParams(), bodyJson, field)
            if (optional.isPresent()) {
                return optional
            }
        }

        return Optional.empty()
    }

    @Override
    Optional<String> getIdempotencyKey(RequestContext requestContext) {
        if (requestContext.getTraceId()) {
            return Optional.of(requestContext.getRequest().getRequestURL().append("_").append(requestContext.getTraceId()).toString());
        }

        return Optional.empty();
    }

    private static Optional<String> getField(MultiValueMap<String, String> headers, MultiValueMap<String, String> params, Object bodyJson, String field) {
        if (headers.containsKey(field))
            return Optional.of(headers.getFirst(field))
        if (params.containsKey(field))
            return Optional.of(params.getFirst(field))
        if (bodyJson!=null && bodyJson[field] != null && bodyJson[field] instanceof String)
            return Optional.of((String) bodyJson[field])

        return Optional.empty()
    }
}

This config does the following:

  • declares the /sample-idempotent API as idempotent, i.e., each replica's calls will be forwarded to the upstream system, while all other APIs are treated as non-idempotent.
  • extracts the trace ID from whichever field is found first in the request, following the order of precedence in the getTraceId method.
  • returns <request URL>_<trace ID> as the idempotency key.
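
For example, under this config the following request (illustrative values) would be assigned the trace ID abc-123, since headers take precedence over query parameters and the body in getField, and trace-id is checked before seqNo; the resulting idempotency key would be <request URL>_abc-123:

curl -X POST http://degressly-downstream:8080/sample-idempotent \
     -H 'trace-id: abc-123' \
     -d '{"seqNo": "42"}'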

Database segregation without maintaining multiple copies of unchanged data

One common issue that may come up is maintaining a separate database for each replica of your code. Although maintaining multiple copies of a small database doesn't cost much, it becomes prohibitively expensive with large datasets.

The idea is to cherry-pick data during the execution of the application, whenever a particular row is needed.

The following data access pattern can be implemented to achieve this:

  • The primary replica uses the main, full dataset.
  • The secondary and candidate replicas both use datasets that are empty on startup.
  • If a SELECT query on the secondary/candidate datastore finds no matching rows in its own DB, the data is imported from the primary dataset via a read-only user into the current database, and the results are returned to the application.

Requirements for implementing this approach are as follows:

  1. Initialize the secondary and candidate databases with the FEDERATED engine enabled in my.cnf.
  2. Initialize the same schema (say, my_db) across all three datastores.
  3. Create a federated schema on the secondary and candidate datastores that is a federation of the primary schema.
  4. Create a stored procedure that looks up the datastore's own DB first, then falls back to the federated DB.
  5. Use proxysql to rewrite all SELECT queries into calls to the above stored procedure.

The following process needs to be followed on the secondary and candidate datastores:

For the purpose of this demonstration, we will assume the name of the database is my_db, and that this schema is already set up on all three datastores.

Add support for the FEDERATED engine by adding the following under the [mysqld] section of my.cnf (if running in Docker, use volume mounts for the config):

[mysqld]
federated

Create the federated database:

CREATE DATABASE my_db_federated;

Use this bash script to import the schema from the primary database into the federated database:

#!/bin/bash

# Connection details for the remote (primary) and local databases
REMOTE_HOST="mysql-primary"
REMOTE_USER="read_only_user"
REMOTE_PASSWORD="read_only_user_password"
REMOTE_DB="my_db"
LOCAL_USER="write_user"
LOCAL_PASSWORD="write_user_password"
LOCAL_DB="my_db_federated"


# Fetch the list of tables from the primary database
TABLES=$(mysql -h $REMOTE_HOST -u $REMOTE_USER -p$REMOTE_PASSWORD -D $REMOTE_DB -Bse 'SHOW TABLES')

for TABLE in $TABLES; do
    set -e
    # Fetch the CREATE TABLE statement for the table from the primary database
    CREATE_STMT=$(mysql -h $REMOTE_HOST -u $REMOTE_USER -p$REMOTE_PASSWORD -D $REMOTE_DB -Bse "SHOW CREATE TABLE $TABLE" | cut -c $((${#TABLE}+2))-)

    # Modify the CREATE statement to use FEDERATED engine and connection
    echo "===========================" 1>&2
    echo "Create stmt" 1>&2
    echo $CREATE_STMT 1>&2
    FEDERATED_STMT=$(echo "$CREATE_STMT" | sed 's/ENGINE=.*/ENGINE=FEDERATED CONNECTION="mysql:\/\/'$REMOTE_USER':'$REMOTE_PASSWORD'@'$REMOTE_HOST':3306\/'$REMOTE_DB'\/'$TABLE'"/')

    # Create the federated table on the local database
    echo "===========================" 1>&2
    echo "Federated stmt" 1>&2
    echo $FEDERATED_STMT 1>&2
    set +e
    mysql -h localhost -u $LOCAL_USER -p$LOCAL_PASSWORD -D $LOCAL_DB -Bse "$FEDERATED_STMT"
done

Create a stored procedure that runs the query against my_db and imports rows from my_db_federated when they are not found in my_db:

USE my_db;

DELIMITER $$

CREATE PROCEDURE execute_query(IN query TEXT)
BEGIN

    -- Prepare the dynamic SQL statement for counting rows
    SET @row_count_query = CONCAT('SELECT COUNT(*) FROM (', query, ') AS temp INTO @row_count');
    SET @original_query = query;

    PREPARE stmt1 FROM @row_count_query;
    EXECUTE stmt1;
    DEALLOCATE PREPARE stmt1;


    -- If rows are not found in current DB, fetch rows from federated DB
    IF @row_count = 0 THEN
        -- Rewrite the query to read from the federated schema instead:
        -- qualified names (my_db.*) are rewritten directly; otherwise the
        -- federated schema is prefixed to the table following FROM
        SET @adapted_query = REPLACE(@original_query, 'my_db.', 'my_db_federated.');
        IF LOCATE('my_db_federated.', @adapted_query) = 0 THEN
            SET @adapted_query = REPLACE(@adapted_query, 'FROM ', 'FROM my_db_federated.');
        END IF;

        -- Extract the table name following FROM (assumes a single space after FROM)
        SET @from_end = LOCATE('FROM', UPPER(query)) + 5;
        SET @table_name_end = LOCATE(' ', UPPER(query), @from_end);
        IF @table_name_end = 0 THEN
            SET @table_name_end = CHAR_LENGTH(query) + 1;
        END IF;
        SET @table_name = SUBSTRING(query, @from_end, @table_name_end - @from_end);

        -- If a database name qualifies the table, strip it so that the
        -- REPLACE INTO below targets the local table
        IF LOCATE('.', @table_name) != 0 THEN
            SET @table_name = SUBSTRING(@table_name, LOCATE('.', @table_name) + 1);
        END IF;

        SET @federated_row_count_query = CONCAT('SELECT COUNT(*) FROM (', @adapted_query, ') AS temp INTO @federated_row_count');
        PREPARE stmt5 FROM @federated_row_count_query;
        EXECUTE stmt5;
        DEALLOCATE PREPARE stmt5;


        -- If rows are returned from federated DB, add them into current DB
        IF @federated_row_count != 0 THEN

            SET @insert_statement = CONCAT('REPLACE INTO ', @table_name, ' ', @adapted_query);

            PREPARE stmt2 FROM @insert_statement;
            EXECUTE stmt2;
            DEALLOCATE PREPARE stmt2;
        END IF;

    END IF;

    -- Return rows from current DB
    PREPARE stmt3 FROM @original_query;
    EXECUTE stmt3;
    DEALLOCATE PREPARE stmt3;

END $$

DELIMITER ;
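
Before wiring up proxysql, the procedure can be sanity-checked directly; an illustrative call, assuming a users table exists in my_db:

CALL execute_query('SELECT * FROM users WHERE id = 42');

On the first call for a missing row, the procedure pulls it in from my_db_federated; subsequent calls are served from the local my_db.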

Finally, we need to configure proxysql to rewrite SELECT queries into CALL execute_query() calls:

/var/lib/proxysql/proxysql.cnf:

datadir="/var/lib/proxysql"


# 6034: mysql-secondary
# 6035: mysql-candidate

admin_variables=
{
	admin_credentials="admin:admin;radmin:radmin"
	mysql_ifaces="0.0.0.0:6032"
}

mysql_variables=
{
	threads=4
	max_connections=2048
	default_query_delay=0
	default_query_timeout=36000000
	have_compress=true
	poll_timeout=2000
	interfaces="0.0.0.0:6034;0.0.0.0:6035"
	default_schema="information_schema"
	stacksize=1048576
	server_version="8.0.32"
	connect_timeout_server=3000
	ping_interval_server_msec=120000
	ping_timeout_server=500
	commands_stats=true
	sessions_sort=true
	connect_retries_on_failure=10
	eventslog_filename="queries.log"
	eventslog_format=2
	set_query_lock_on_hostgroup=0
	eventslog_default_log=1
}


mysql_servers=
(
    {
        address = "mysql-secondary"
        port = "3306"
        hostgroup = 1
    },
    {
        address = "mysql-candidate"
        port = "3306"
        hostgroup = 2
    }
)

mysql_users= (
    {
        username = "root"
        password = "asd"
        default_hostgroup = 2
        active=1
    },
    {
        username = "sample_user"
        password = "sample_password"
        default_hostgroup = 2
        active = 1
    }
)



mysql_query_rules = (
    {
        rule_id = 1
        rule_active = 1
        proxy_port = 6034
        destination_hostgroup = 1
        log = 1
    },
    {
        rule_id = 2
        rule_active = 1
        proxy_port = 6035
        destination_hostgroup = 2
        log = 1
    },
    {
        rule_id = 3
        rule_active = 1
        match_pattern = "^SELECT .*FROM(.|\\n)*$"
        replace_pattern = "CALL execute_query(\"\"\0\"\")"
        log = 1
        schemaname = "my_db"
    },
    {
        rule_id = 4
        rule_active = 1
        match_digest = "set session transaction read only"
        OK_msg = 1
        log = 1
    }

)

Explanation:

  • Rule 1: Routes all connections on port 6034 to the secondary datastore.
  • Rule 2: Routes all connections on port 6035 to the candidate datastore.
  • Rule 3: Rewrites all SELECT queries on my_db into CALL execute_query() calls.
  • Rule 4: Ignores set session transaction read only queries, because writes must occur to update my_db with data brought in from my_db_federated.
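
You can exercise these rules by connecting through proxysql on each port; an illustrative session, assuming proxysql is reachable as proxysql-host and using the sample_user credentials from the config above (the table name is illustrative):

mysql -h proxysql-host -P 6034 -u sample_user -psample_password my_db \
      -e "SELECT * FROM users WHERE id = 42"
# Port 6034 routes to mysql-secondary (hostgroup 1) and rule 3 rewrites the
# SELECT into CALL execute_query(...)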

Caveats:

  • Aggregation queries will not import data from federated tables. (Running aggregation queries against federated tables also has massive performance implications for the secondary and candidate databases.)
  • There may be some data inconsistencies in the secondary and candidate datastores if primary key conflicts occur, since the prepared statement uses REPLACE INTO.
  • The secondary and candidate databases cannot operate in read-only mode, because they import data on a fetch-as-you-go basis.
  • Each business use case is different; you may need to modify the stored procedure to fit your particular needs.