This demo closely follows the one defined for Materialize.
The Docker Compose file in this directory starts images for MySQL, Redpanda, Debezium, and Deephaven, plus an additional image that generates an initial MySQL schema and then generates updates to the tables over time for a simple e-commerce demo.
- docker-compose.yml: The Docker Compose file for the application. This is mostly the same as the Deephaven docker-compose file, with modifications to run Redpanda, MySQL, Debezium, and the scripts that generate the simulated website.
- .env: The environment variables used in this demo.
- scripts/demo.py: The Deephaven commands used in this demo.
- scripts/demo.sql: The Materialize demo script.
- loadgen/*: The load generation scripts.
Configure the update rate for both purchases (MySQL updates) and pageviews (Kafka pageview events) via the environment arguments set for the loadgen image in the docker-compose.yml file.
First, to run this demo, you will need to clone our GitHub examples repo:
gh repo clone deephaven-examples/deephaven-debezium-demo
To build, you need the same dependencies as any dockerized Deephaven setup, such as docker and docker-compose.
For more detailed instructions see our documentation.
Launch via Docker
cd deephaven-debezium-demo
docker-compose -f docker-compose.yml up -d
Then start a Deephaven web console (it will be in Python mode by default, per the command above) by navigating to:
http://localhost:10000/ide
Copy and paste the commands from /scripts/demo.py into the console.
As you cut & paste the script, you can see tables as they are created and populated and watch them update before you execute the next command. Details below.
The bullet numbers follow the Redpanda + Materialize demo.
- First, to run this demo, you will need to clone our GitHub examples repo.
gh repo clone deephaven-examples/deephaven-debezium-demo
- Bring up the Docker Compose containers in the background:
Note: For more detailed instructions, see our Quickstart guide.
cd deephaven-debezium-demo
docker-compose up -d
- (Optional) Confirm that everything is running as expected:
docker stats
- (Optional) Log in to MySQL to confirm that tables are created and seeded:
docker-compose -f docker-compose.yml run mysql mysql -uroot -pdebezium -h mysql shop
SHOW TABLES;
SELECT * FROM purchases LIMIT 1;
- (Optional) exec into the redpanda container to look around using Redpanda's amazing rpk CLI:
docker-compose -f docker-compose.yml exec redpanda /bin/bash
rpk debug info
rpk topic list
rpk topic create dd_flagged_profiles
rpk topic consume pageviews
You should see a live feed of JSON-formatted pageview Kafka messages:
{
  "key": "3290",
  "message": "{\"user_id\": 3290, \"url\": \"/products/257\", \"channel\": \"social\", \"received_at\": 1634651213}",
  "partition": 0,
  "offset": 21529,
  "size": 89,
  "timestamp": "2021-10-19T13:46:53.15Z"
}
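As a rough illustration of how this record is layered, the sketch below parses the sample above with Python's standard library only. It assumes that received_at is in seconds since the Unix epoch, which is consistent with the record's own timestamp field but is not stated anywhere in the demo.

```python
import json
from datetime import datetime, timezone

# One record as printed by `rpk topic consume pageviews` (copied from the sample above).
record_text = r'''
{
  "key": "3290",
  "message": "{\"user_id\": 3290, \"url\": \"/products/257\", \"channel\": \"social\", \"received_at\": 1634651213}",
  "partition": 0,
  "offset": 21529,
  "size": 89,
  "timestamp": "2021-10-19T13:46:53.15Z"
}
'''

record = json.loads(record_text)          # outer envelope printed by rpk
pageview = json.loads(record["message"])  # the payload is itself a JSON string

# Assumption: received_at is seconds since the Unix epoch.
received_at = datetime.fromtimestamp(pageview["received_at"], tz=timezone.utc)
print(pageview["user_id"], pageview["url"], pageview["channel"], received_at.isoformat())
```

The four fields inside the message are the same ones the Deephaven JSON spec for the pageviews topic declares later in this walkthrough.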
- Launch the Deephaven web console (IDE) by navigating to:
http://localhost:10000/ide
You have now launched a docker-compose file that starts images for MySQL, Debezium, Redpanda (Kafka implementation), and Deephaven, plus an additional image to generate an initial MySQL schema and then generate updates to the tables over time for a simple e-commerce demo.
Here, we will show you the analogous Deephaven commands and continue the bullet numbers from the Materialize demo. You can follow along in the Deephaven IDE by entering the commands into the console.
- Now that you're in the Deephaven IDE, define all of the tables in mysql.shop as Kafka sources:
import deephaven.ConsumeCdc as cc
import deephaven.ConsumeKafka as ck
import deephaven.ProduceKafka as pk
import deephaven.Types as dh
from deephaven import Aggregation as agg, as_list
import deephaven.TableManipulation.WindowCheck as wck
server_name = 'mysql'
db_name='shop'
kafka_base_properties = {
    'group.id' : 'dh-server',
    'bootstrap.servers' : 'redpanda:9092',
    'schema.registry.url' : 'http://redpanda:8081',
}

# Consume the Debezium change stream for one table in mysql.shop
def make_cdc_table(table_name:str):
    return cc.consumeToTable(
        kafka_base_properties,
        cc.cdc_short_spec(server_name,
                          db_name,
                          table_name)
    )
users = make_cdc_table('users')
items = make_cdc_table('items')
purchases = make_cdc_table('purchases')
consume_properties = {
    **kafka_base_properties,
    **{
        'deephaven.partition.column.name' : '',
        'deephaven.timestamp.column.name' : '',
        'deephaven.offset.column.name' : ''
    }
}
pageviews = ck.consumeToTable(
    consume_properties,
    topic = 'pageviews',
    offsets = ck.ALL_PARTITIONS_SEEK_TO_BEGINNING,
    key = ck.IGNORE,
    value = ck.json([ ('user_id', dh.long_),
                      ('url', dh.string),
                      ('channel', dh.string),
                      ('received_at', dh.datetime) ]),
    table_type = 'append'
)
Because the first three sources are pulling message schema data from the registry, Deephaven knows the column types to use for each attribute. The last source is a JSON-formatted source for the page views.
You should now see the four sources we created in the IDE. These are fully interactive: in the UI, you can sort, filter, or scroll through all the data without any other commands.
- Next, we'll create a table for staging the page views. We can use this to aggregate information later.
pageviews_stg = pageviews \
    .updateView(
        'url_path = url.split(`/`)',
        'pageview_type = url_path[1]',
        'target_id = Long.parseLong(url_path[2])'
    ).dropColumns('url_path')
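To see why the staging query reads indexes 1 and 2 of the split URL, here is a minimal plain-Python sketch of the same parsing, outside Deephaven. The function name stage_pageview is hypothetical, and the sketch assumes every URL has the shape /<type>/<id>, as in the sample message.

```python
def stage_pageview(url):
    """Split a pageview URL into (pageview_type, target_id)."""
    # The leading slash produces an empty first element:
    # "/products/257".split("/") -> ["", "products", "257"]
    url_path = url.split("/")
    return url_path[1], int(url_path[2])


print(stage_pageview("/products/257"))
print(stage_pageview("/profiles/10"))
```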
- Let's create a couple of analytical views to get a feel for how it works.
Start simple with a table that aggregates purchase stats by item:
purchases_by_item = purchases.aggBy(
    as_list([
        agg.AggSum('revenue = purchase_price'),
        agg.AggCount('orders'),
        agg.AggSum('items_sold = quantity')
    ]),
    'item_id'
)
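For readers new to grouped aggregations, the following plain-Python sketch computes the same three statistics per item on a few made-up rows. The sample data is hypothetical, and unlike the Deephaven table, this result does not update as new purchases arrive.

```python
from collections import defaultdict

# Hypothetical sample rows; the real purchases table comes from the MySQL change feed.
purchases = [
    {"item_id": 1, "purchase_price": 10.0, "quantity": 2},
    {"item_id": 1, "purchase_price": 5.0, "quantity": 1},
    {"item_id": 2, "purchase_price": 7.5, "quantity": 3},
]

stats = defaultdict(lambda: {"revenue": 0.0, "orders": 0, "items_sold": 0})
for row in purchases:
    item = stats[row["item_id"]]
    item["revenue"] += row["purchase_price"]  # sum of purchase_price
    item["orders"] += 1                       # row count per item
    item["items_sold"] += row["quantity"]     # sum of quantity

purchases_by_item = dict(stats)
print(purchases_by_item)
```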
The next query creates something similar that uses our pageviews_stg static view to quickly aggregate page views by item:
pageviews_by_item = pageviews_stg \
    .where('pageview_type = `products`') \
    .countBy('pageviews', 'item_id = target_id')
Now let's show how you can combine and stack views by creating a single table that brings everything together:
item_summary = items \
    .view('item_id = id', 'name', 'category') \
    .naturalJoin(purchases_by_item, 'item_id') \
    .naturalJoin(pageviews_by_item, 'item_id') \
    .dropColumns('item_id') \
    .moveColumnsDown('revenue', 'pageviews') \
    .updateView('conversion_rate = orders / (double) pageviews')
We can see that it's working by watching these tables update in the IDE. To see just the top elements, we can filter the data with two new tables:
top_viewed_items = item_summary \
    .sortDescending('pageviews') \
    .head(20)
top_converting_items = item_summary \
    .sortDescending('conversion_rate') \
    .head(20)
Another useful table is pageviews_summary, which counts the total number of pages seen:
pageviews_summary = pageviews_stg \
    .aggBy(
        as_list([
            agg.AggCount('total'),
            agg.AggMax('max_received_at = received_at')])) \
    .updateView('dt_ms = (DateTime.now() - max_received_at)/1_000_000.0')
- Redpanda is often used in building rich data-intensive applications. Let's try creating a view meant to power something like the "Who has viewed your profile" feature on LinkedIn:
User views of other user profiles:
minute_in_nanos = 60 * 1000 * 1000 * 1000
profile_views_per_minute_last_10 = \
    wck.addTimeWindow(
        pageviews_stg.where('pageview_type = `profiles`'),
        'received_at',
        10*minute_in_nanos,
        'in_last_10min'
    ).where(
        'in_last_10min = true'
    ).updateView(
        'received_at_minute = lowerBin(received_at, minute_in_nanos)'
    ).view(
        'user_id = target_id',
        'received_at_minute'
    ).countBy(
        'pageviews',
        'user_id',
        'received_at_minute'
    ).sort(
        'user_id',
        'received_at_minute'
    )
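The query above combines a trailing time window with per-minute binning. The plain-Python sketch below shows the arithmetic behind both steps on hypothetical events; the helper names lower_bin and in_window are illustrative, and the exact boundary handling of Deephaven's WindowCheck may differ from the simple comparison used here.

```python
from collections import Counter

MINUTE_IN_NANOS = 60 * 1000 * 1000 * 1000


def lower_bin(ts_nanos, interval_nanos):
    """Round a nanosecond timestamp down to the start of its interval."""
    return ts_nanos - ts_nanos % interval_nanos


def in_window(ts_nanos, now_nanos, window_nanos):
    """True if the timestamp lies within the trailing window ending at now."""
    return now_nanos - ts_nanos <= window_nanos


# Hypothetical (profile owner id, received_at in epoch nanoseconds) events.
base = 1634651213 * 10**9  # 2021-10-19T13:46:53Z
events = [
    (10, base),
    (10, base + 5 * 10**9),             # same minute as the first event
    (10, base + 60 * 10**9),            # the following minute
    (11, base - 11 * MINUTE_IN_NANOS),  # older than the 10-minute window
]
now = base + 2 * MINUTE_IN_NANOS

views_per_minute = Counter(
    (user_id, lower_bin(ts, MINUTE_IN_NANOS))
    for user_id, ts in events
    if in_window(ts, now, 10 * MINUTE_IN_NANOS)
)
print(views_per_minute)
```

Each key is a (user_id, start-of-minute) pair, which corresponds to the user_id and received_at_minute grouping columns in the Deephaven query.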
Confirm that this is the data we could use to populate a "profile views" graph for user 10:
profile_views = pageviews_stg \
    .view(
        'owner_id = target_id',
        'viewer_id = user_id',
        'received_at'
    ).sort(
        'received_at'
    ).tailBy(10, 'owner_id')
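The sort followed by tailBy keeps only the most recent rows for each profile owner. A minimal plain-Python sketch of that idea, using a bounded deque per group, might look like this. The tail_by helper and the sample rows are hypothetical, and a smaller n is used to keep the example short.

```python
from collections import defaultdict, deque


def tail_by(rows, n, key):
    """Keep the last n rows for each distinct value of row[key]."""
    groups = defaultdict(lambda: deque(maxlen=n))
    for row in rows:
        groups[row[key]].append(row)  # the oldest row drops out once the deque is full
    return {k: list(v) for k, v in groups.items()}


# Hypothetical rows, already sorted by received_at.
rows = [
    {"owner_id": 10, "viewer_id": 1, "received_at": 100},
    {"owner_id": 10, "viewer_id": 2, "received_at": 101},
    {"owner_id": 11, "viewer_id": 3, "received_at": 102},
    {"owner_id": 10, "viewer_id": 4, "received_at": 103},
]

latest = tail_by(rows, 2, "owner_id")
print([r["viewer_id"] for r in latest[10]])
```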
Next, let's use a naturalJoin to add the owner and viewer email addresses to these recent profile views (profile_views keeps the last 10 views of each profile):
profile_views_enriched = profile_views \
    .naturalJoin(users, 'owner_id = id', 'owner_email = email') \
    .naturalJoin(users, 'viewer_id = id', 'viewer_email = email') \
    .moveColumnsDown('received_at')
- Since Redpanda has such a nice HTTP interface, it makes it easier to extend without writing lots of glue code and services. Here's an example where we use pandaproxy to do a "demand-driven query".
Add a message to the dd_flagged_profiles topic:
dd_flagged_profiles = ck.consumeToTable(
    consume_properties,
    topic = 'dd_flagged_profiles',
    offsets = ck.ALL_PARTITIONS_SEEK_TO_BEGINNING,
    key = ck.IGNORE,
    value = ck.simple('user_id_str', dh.string),
    table_type = 'append'
).view('user_id = Long.parseLong(user_id_str.substring(1, user_id_str.length() - 1))') # strip quotes
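The final view strips the first and last characters before parsing the number. The sketch below shows the same step in plain Python, assuming the message value was produced as a JSON-encoded string, so that it arrives with its surrounding double quotes. That encoding is an assumption about how the message was posted, not something the script itself confirms.

```python
import json

# Assumption: the producer sent the user id as a JSON string, e.g. "25".
raw_value = json.dumps("25")
print(raw_value)

# Same idea as substring(1, length - 1) in the query: drop the surrounding quotes.
user_id = int(raw_value[1:len(raw_value) - 1])
print(user_id)
```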
Now let's join the flagged profile IDs to a much larger dataset:
dd_flagged_profile_view = dd_flagged_profiles \
    .join(pageviews_stg, 'user_id')
- Sink data back out to Redpanda.
Let's create a view that flags "high-value" users that have spent more than $10k in total:
high_value_users = purchases \
    .updateView(
        'purchase_total = purchase_price * quantity'
    ).aggBy(
        as_list([
            agg.AggSum('lifetime_value = purchase_total'),
            agg.AggCount('purchases'),
        ]),
        'user_id'
    ) \
    .where('lifetime_value > 10000') \
    .naturalJoin(users, 'user_id = id', 'email') \
    .view('id = user_id', 'email', 'lifetime_value', 'purchases') # column rename and reorder
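As a rough plain-Python equivalent of the aggregation and filter above, the sketch below totals price times quantity per user on made-up rows and keeps users whose total exceeds 10000. The sample data is hypothetical, and the join to the users table for the email column is left out.

```python
from collections import defaultdict

# Hypothetical sample rows; the real purchases table comes from the MySQL change feed.
purchases = [
    {"user_id": 1, "purchase_price": 6000.0, "quantity": 1},
    {"user_id": 1, "purchase_price": 2500.0, "quantity": 2},
    {"user_id": 2, "purchase_price": 100.0, "quantity": 3},
]

totals = defaultdict(lambda: {"lifetime_value": 0.0, "purchases": 0})
for row in purchases:
    user = totals[row["user_id"]]
    user["lifetime_value"] += row["purchase_price"] * row["quantity"]
    user["purchases"] += 1

# Strictly greater than 10000, matching the where clause in the query.
high_value_users = {uid: t for uid, t in totals.items() if t["lifetime_value"] > 10000}
print(high_value_users)
```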
Then, a sink to stream updates to this view back out to Redpanda:
schema_namespace = 'io.deephaven.examples'
cancel_callback = pk.produceFromTable(
    high_value_users,
    kafka_base_properties,
    topic = 'high_value_users_sink',
    key = pk.avro(
        'high_value_users_sink_key',
        publish_schema = True,
        schema_namespace = schema_namespace,
        include_only_columns = [ 'user_id' ]
    ),
    value = pk.avro(
        'high_value_users_sink_value',
        publish_schema = True,
        schema_namespace = schema_namespace,
        column_properties = {
            "lifetime_value.precision" : "12",
            "lifetime_value.scale" : "4"
        }
    ),
    last_by_key_columns = True
)
This is a bit more complex because it is an exactly-once sink. This means that across Deephaven restarts, it will never output the same update more than once.
We won't be able to preview the results with rpk because it's Avro-formatted. But we can actually stream it back into Deephaven to confirm the format!
hvu_test = ck.consumeToTable(
    consume_properties,
    topic = 'high_value_users_sink',
    offsets = ck.ALL_PARTITIONS_SEEK_TO_BEGINNING,
    key = ck.IGNORE,
    value = ck.avro('high_value_users_sink_value'),
    table_type = 'append'
)
You now have Deephaven doing real-time views on a changefeed from a database and page view events from Redpanda. You have complex multi-layer views doing joins and aggregations in order to distill the raw data into a form that's useful for downstream applications.
You have a lot of infrastructure running in Docker containers, so don't forget to run docker-compose down to shut everything down!
Next time, if you want to load everything in one command, you can run the whole script in the Deephaven console with exec(open('/scripts/demo.py').read()).
You've seen how fun and easy it is to use Deephaven. Within the IDE, you can see your query results and interact with all the data. You can take this even further with Deephaven's plotting capabilities, such as by visualizing the aggregate page views in real time.
Files in this directory are based on demo code by Debezium, Redpanda, and Materialize.