Using the setup here, a parent DB that has a lot of data would have that data duplicated in the child DB, taking up a lot of unnecessary storage, right? Is there any way to avoid this? Or do I have the wrong idea of how that data would be stored?
@p-zach yes, each pipe has an underlying SQL table, so in your example the child would duplicate the parent because it only adds rows to the parent table.

One way to save space is to create a view that selects from the parent to act as the child. A view is basically an alias for a `SELECT` statement that behaves like a virtual table. For example, a view like this would mimic the child data from your example plugin:

```sql
CREATE VIEW plugin_test_derivative_a_deriv_1 AS
SELECT
    timestamp, random1, random2,
    (random1 * 2) AS deriv_random1,
    (random2 + 0.5) AS deriv_random2
FROM plugin_test_derivative_a
```

You could even drop the table for the child plugin and remove its `.sync()` and use the…

In fact, if you prefer to write your transformations in SQL, you can register a pipe with…

Another space-saving pattern I have seen used in production with Meerschaum is to create a temporary pipe to hold the raw data:

```python
from meerschaum.utils.typing import SuccessTuple
import meerschaum as mrsm
import datetime

def fetch(pipe: mrsm.Pipe, **kw) -> 'pd.DataFrame':
    ...

def process_raw_data(raw_df: 'pd.DataFrame') -> 'pd.DataFrame':
    ...

def sync(pipe: mrsm.Pipe, **kw) -> SuccessTuple:
    """
    Store the raw data in a temporary pipe.
    """
    ### You can use any string as the connector
    ### if the pipe will only be updated via `.sync(df)`.
    bucket = mrsm.Pipe('foo', 'bar', instance='sql:local')

    ### Grab the last datetime of the bucket.
    ### Will be `None` if no results are found.
    ### You can also use `params` if you wanted to filter by columns,
    ### and `newest=False` to get the oldest datetime value.
    last_sync_time = bucket.get_sync_time()

    raw_df = fetch(bucket, **kw)
    bucket.sync(raw_df)
    new_sync_time = bucket.get_sync_time()

    ### If you mutate your data, you might need to call `pipe.clear()`.
    ### Depending on your situation, you might be able to just use `raw_df`.
    processed_df = process_raw_data(
        bucket.get_data(begin=last_sync_time, end=new_sync_time)
    )

    success, msg = pipe.sync(processed_df)
    if not success:
        return success, msg

    ### You can use any bounds you like.
    ### Here we're only deleting rows older than 10 days,
    ### so the bound is 10 days *before* the newest datetime.
    return bucket.clear(end=new_sync_time - datetime.timedelta(days=10))
```

I hope this helps! Meerschaum is flexible enough to give you a lot of different options for balancing between space and speed (trust me, I wrote a whole thesis on it!).
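If you want to try the view idea from the reply without touching a real instance, here is a minimal sketch using Python's built-in `sqlite3` module. It is only an illustration under assumptions: the in-memory SQLite database stands in for whatever database backs your pipes, and the sample rows are made up. The table and view names are the ones from the reply.

```python
import sqlite3

# Assumption: an in-memory SQLite database stands in for the real instance.
conn = sqlite3.connect(":memory:")

# The parent table, with the columns used in the reply's view definition.
conn.execute(
    "CREATE TABLE plugin_test_derivative_a "
    "(timestamp TEXT, random1 REAL, random2 REAL)"
)

# Hypothetical sample rows, only for illustration.
conn.executemany(
    "INSERT INTO plugin_test_derivative_a VALUES (?, ?, ?)",
    [
        ("2023-01-01 00:00:00", 1.0, 2.0),
        ("2023-01-01 00:01:00", 3.0, 4.0),
    ],
)

# The child is a view: it stores only the query, not a copy of the rows.
conn.execute(
    """
    CREATE VIEW plugin_test_derivative_a_deriv_1 AS
    SELECT
        timestamp, random1, random2,
        (random1 * 2) AS deriv_random1,
        (random2 + 0.5) AS deriv_random2
    FROM plugin_test_derivative_a
    """
)

rows = conn.execute(
    "SELECT timestamp, deriv_random1, deriv_random2 "
    "FROM plugin_test_derivative_a_deriv_1 ORDER BY timestamp"
).fetchall()

# Rows added to the parent appear in the view without any extra sync step.
conn.execute(
    "INSERT INTO plugin_test_derivative_a VALUES (?, ?, ?)",
    ("2023-01-01 00:02:00", 5.0, 6.0),
)
view_count = conn.execute(
    "SELECT COUNT(*) FROM plugin_test_derivative_a_deriv_1"
).fetchone()[0]

# Confirm the child really is a view and not a second table.
child_type = conn.execute(
    "SELECT type FROM sqlite_master WHERE name = ?",
    ("plugin_test_derivative_a_deriv_1",),
).fetchone()[0]
```

The tradeoff is the usual one between space and speed: the derived columns are recomputed on every read instead of being stored.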