Skip to content

Commit

Permalink
wired up the saving of workflow files to S3
Browse files Browse the repository at this point in the history
  • Loading branch information
DinisCruz committed Oct 16, 2024
1 parent fe47af9 commit ea91703
Show file tree
Hide file tree
Showing 13 changed files with 131 additions and 72 deletions.
22 changes: 22 additions & 0 deletions osbot_serverless_flows/Serverless_Flows__Server_Config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from osbot_utils.utils.Env import get_env
from osbot_utils.base_classes.Type_Safe import Type_Safe


DEFAULT__SERVERLESS_FLOWS__AWS_ACCOUNT_ID = '0000111100001111'
ENV_NAME__SERVERLESS_FLOWS__USE_LOCAL_STACK = 'SERVERLESS_FLOWS__USE_LOCAL_STACK'

class Serverless_Flows__Server_Config(Type_Safe):
aws_account_id : str = DEFAULT__SERVERLESS_FLOWS__AWS_ACCOUNT_ID
use_local_stack : bool = False

def setup(self):
self.use_local_stack = get_env(ENV_NAME__SERVERLESS_FLOWS__USE_LOCAL_STACK) == 'True' or self.use_local_stack
self.aws_account_id = get_env('AWS_ACCOUNT_ID') or self.aws_account_id
return self

def reset(self):
self.aws_account_id = DEFAULT__SERVERLESS_FLOWS__AWS_ACCOUNT_ID
self.use_local_stack = False
return self

serverless_flows__server_config = Serverless_Flows__Server_Config()
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
from osbot_serverless_flows.aws.s3.S3_DB__Flows import S3_DB__Flows
from osbot_utils.base_classes.Type_Safe import Type_Safe
from osbot_utils.base_classes.Type_Safe import Type_Safe


class OSBot_Serverless_Flows__Shared_Objects(Type_Safe):
class Serverless_Flows__Shared_Objects(Type_Safe):

def s3_db_flows(self):
s3_db_flows = S3_DB__Flows()
s3_db_flows.setup() # will create buckey if needed
return s3_db_flows

osbot_serverless_flows__shared_objects = OSBot_Serverless_Flows__Shared_Objects()
serverless_flows__shared_objects = Serverless_Flows__Shared_Objects()
2 changes: 1 addition & 1 deletion osbot_serverless_flows/aws/s3/S3_DB__Flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@

class S3_DB__Flows(S3__DB_Base): # todo: refactor the need of this generic class/bucket into specific buckets
bucket_name__suffix : str = 'flows'
bucket_name__prefix : str = 'osbot-serverless-flows'
bucket_name__prefix : str = 'serverless-flows'

20 changes: 19 additions & 1 deletion osbot_serverless_flows/fast_api/Fast_API__Serverless_Flows.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
from osbot_fast_api.api.Fast_API import Fast_API

from osbot_aws.flows.Flow_Events__To__S3 import Flow_Events__To__S3
from osbot_local_stack.local_stack.Local_Stack import Local_Stack
from osbot_prefect.flows.Flow_Events__To__Prefect_Server import Flow_Events__To__Prefect_Server
from osbot_serverless_flows.Serverless_Flows__Server_Config import serverless_flows__server_config
from osbot_serverless_flows.Serverless_Flows__Shared_Objects import serverless_flows__shared_objects
from osbot_serverless_flows.fast_api.routes.Routes__Debug import Routes__Debug
from osbot_serverless_flows.fast_api.routes.Routes__Dev import Routes__Dev
from osbot_serverless_flows.fast_api.routes.Routes__GSuite import Routes__GSuite
Expand All @@ -9,9 +14,11 @@


class Fast_API__Serverless_Flows(Fast_API):
prefect_enabled : bool = False
prefect_enabled : bool = False
flow_events_to_s3_enabled : bool = False

def setup(self):
self.setup__local_stack()
self.setup__prefect_cloud()
super().setup()
return self
Expand All @@ -20,12 +27,23 @@ def setup(self):
def flow_events_to_prefect_server(self):
return Flow_Events__To__Prefect_Server()

@cache_on_self
def flow_events_to_s3(self):
return Flow_Events__To__S3()

def setup__prefect_cloud(self):
with self.flow_events_to_prefect_server() as _:
if _.prefect_cloud_api.prefect_rest_api.prefect_is_server_online():
_.add_event_listener()
self.prefect_enabled = True

def setup__local_stack(self):
if serverless_flows__server_config.use_local_stack:
Local_Stack().activate()
with self.flow_events_to_s3() as _:
_.s3_bucket = serverless_flows__shared_objects.s3_db_flows().s3_bucket()
_.start()
self.flow_events_to_s3_enabled = True


def setup_routes(self):
Expand Down
21 changes: 1 addition & 20 deletions osbot_serverless_flows/lambdas/handler.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,10 @@
from fastapi import FastAPI
from mangum import Mangum

from mangum import Mangum
from osbot_serverless_flows.fast_api.Fast_API__Serverless_Flows import Fast_API__Serverless_Flows
from osbot_serverless_flows.utils.Version import version__osbot_serverless_flows

# app = FastAPI()
#
# @app.get("/")
# def root():
# return {"message": "Hello World!"}
#
# @app.get("/ping")
# def ping():
# return {"pong": "42"}
#
# @app.get("/version")
# def version():
# return version__osbot_serverless_flows


serverless_flows = Fast_API__Serverless_Flows().setup()
app = serverless_flows.app()
run = Mangum(app)

if __name__ == "__main__":
import uvicorn

uvicorn.run(app, host="0.0.0.0", port=8080)

This file was deleted.

23 changes: 13 additions & 10 deletions tests/integration/aws/s3/test_S3_DB__Flows.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,24 @@
from unittest import TestCase

from tests.integration.fast_api_objs_for_tests import osbot_serverless__flows_local_stack, DEFAULT_TEST__AWS_ACCOUNT_ID
from osbot_serverless_flows.utils.OSBot_Serverless_Flows__Shared_Objects import osbot_serverless_flows__shared_objects
from unittest import TestCase

from osbot_utils.utils.Dev import pprint

from osbot_local_stack.testing.TestCase__Local_Stack import TestCase__Local_Stack
from osbot_serverless_flows.aws.s3.S3_DB__Flows import S3_DB__Flows
from tests.integration.fast_api_objs_for_tests import fast_api__local_stack
from osbot_local_stack.local_stack.Local_Stack import Local_Stack
from osbot_serverless_flows.Serverless_Flows__Server_Config import DEFAULT__SERVERLESS_FLOWS__AWS_ACCOUNT_ID
from osbot_serverless_flows.Serverless_Flows__Shared_Objects import serverless_flows__shared_objects
from osbot_serverless_flows.aws.s3.S3_DB__Flows import S3_DB__Flows


class test_S3_DB__Flows(TestCase):

@classmethod
def setUpClass(cls):
cls.local_stack = osbot_serverless__flows_local_stack.local_stack
cls.s3_db_flows = osbot_serverless_flows__shared_objects.s3_db_flows()
cls.local_stack = fast_api__local_stack
cls.s3_db_flows = serverless_flows__shared_objects.s3_db_flows()

def test__setup_class(self):
assert self.s3_db_flows.bucket_exists() is True
assert self.s3_db_flows.s3_bucket () == f'osbot-serverless-flows-{DEFAULT_TEST__AWS_ACCOUNT_ID}-flows'
assert type(self.local_stack) is Local_Stack
assert type(self.s3_db_flows) == S3_DB__Flows
assert self.local_stack.is_local_stack_configured_and_available() is True
assert self.s3_db_flows.bucket_exists() is True
assert self.s3_db_flows.s3_bucket () == f'serverless-flows-{DEFAULT__SERVERLESS_FLOWS__AWS_ACCOUNT_ID}-flows'
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from fastapi import FastAPI
from osbot_fast_api.utils.Fast_API_Server import Fast_API_Server
from osbot_fast_api.utils.Version import version__osbot_fast_api

from osbot_serverless_flows.Serverless_Flows__Shared_Objects import serverless_flows__shared_objects
from osbot_utils.utils.Misc import list_set
from tests.integration.fast_api_objs_for_tests import fast_api__serverless_flows
from osbot_serverless_flows.fast_api.Fast_API__Serverless_Flows import Fast_API__Serverless_Flows
Expand Down Expand Up @@ -46,3 +48,11 @@ def test__dev__flow_testing_tasks(self):
assert response.json() == {'flow_result': f'flow completed: {flow_run_id} ',
'post_data' : {'answer': 42 }}


# confirm logs where saved to Local_Stack's S3
with fast_api__serverless_flows.flow_events_to_s3() as _:
s3_path_segments = _.s3_key_generator.create_path_elements__from_when()
s3_path_segments.append(flow_run_id)
s3_folder = '/'.join(s3_path_segments)

assert len(_.s3().find_files(_.s3_bucket, s3_folder)) > 0
28 changes: 19 additions & 9 deletions tests/integration/fast_api_objs_for_tests.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,29 @@
from osbot_serverless_flows.fast_api.Fast_API__Serverless_Flows import Fast_API__Serverless_Flows
from osbot_serverless_flows.utils.OSBot_Serverless_Flows__Local_Stack import OSBot_Serverless_Flows__Local_Stack
from osbot_utils.context_managers.capture_duration import capture_duration
from osbot_aws.testing.Temp__Random__AWS_Credentials import Temp_AWS_Credentials

DEFAULT_TEST__AWS_ACCOUNT_ID = '0000111100001111'
from osbot_utils.utils.Dev import pprint

with OSBot_Serverless_Flows__Local_Stack() as _:
osbot_serverless__flows_local_stack = _
_.temp_asw_credentials.env_vars['AWS_ACCOUNT_ID'] = DEFAULT_TEST__AWS_ACCOUNT_ID
_.activate() # todo : see side effects of putting this here
from osbot_serverless_flows.Serverless_Flows__Server_Config import DEFAULT__SERVERLESS_FLOWS__AWS_ACCOUNT_ID, \
ENV_NAME__SERVERLESS_FLOWS__USE_LOCAL_STACK, serverless_flows__server_config
from osbot_utils.utils.Env import set_env

from osbot_local_stack.local_stack.Local_Stack import Local_Stack
from osbot_serverless_flows.fast_api.Fast_API__Serverless_Flows import Fast_API__Serverless_Flows
from osbot_utils.context_managers.capture_duration import capture_duration
# todo : see side effects of putting this here

def setup_env_vars():
Temp_AWS_Credentials().set_vars()
set_env('AWS_ACCOUNT_ID' , DEFAULT__SERVERLESS_FLOWS__AWS_ACCOUNT_ID)
set_env(ENV_NAME__SERVERLESS_FLOWS__USE_LOCAL_STACK, 'True')
serverless_flows__server_config.setup()

with capture_duration() as duration:
setup_env_vars()
fast_api__local_stack = Local_Stack()
fast_api__serverless_flows = Fast_API__Serverless_Flows()
fast_api__serverless_flows.setup() # setup_server
fast_api__serverless_flows.setup() # setup_server
client__serverless_flows = fast_api__serverless_flows.client()
assert fast_api__local_stack.is_local_stack_configured_and_available() is True

assert duration.seconds < 1 # make sure the setup time is less than 1 second

Expand Down
30 changes: 30 additions & 0 deletions tests/integration/test_Serverless_Flows__Server_Config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from unittest import TestCase

from osbot_serverless_flows.Serverless_Flows__Server_Config import serverless_flows__server_config, \
DEFAULT__SERVERLESS_FLOWS__AWS_ACCOUNT_ID, ENV_NAME__SERVERLESS_FLOWS__USE_LOCAL_STACK
from osbot_utils.testing.Temp_Env_Vars import Temp_Env_Vars
from osbot_utils.utils.Objects import __


class test_OSBot_Serverless_Flows__Server_Config(TestCase):

def test_setup(self):
original_values = serverless_flows__server_config.obj()

def reset_values():
serverless_flows__server_config.reset()
assert serverless_flows__server_config.obj() == __(aws_account_id=DEFAULT__SERVERLESS_FLOWS__AWS_ACCOUNT_ID, use_local_stack=False)
serverless_flows__server_config.setup()
assert serverless_flows__server_config.obj() == original_values

with Temp_Env_Vars(env_vars={'AWS_ACCOUNT_ID': '1234567890'}):
assert serverless_flows__server_config.setup().aws_account_id == '1234567890'
reset_values()

with Temp_Env_Vars(env_vars={ENV_NAME__SERVERLESS_FLOWS__USE_LOCAL_STACK: 'True'}):
assert serverless_flows__server_config.setup().use_local_stack == True
reset_values()

with Temp_Env_Vars(env_vars={ENV_NAME__SERVERLESS_FLOWS__USE_LOCAL_STACK: 'False'}):
assert serverless_flows__server_config.setup().use_local_stack == original_values.use_local_stack
reset_values()
13 changes: 13 additions & 0 deletions tests/integration/test_Serverless_Flows__Shared_Objects.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from unittest import TestCase
from integration.fast_api_objs_for_tests import fast_api__local_stack
from osbot_serverless_flows.Serverless_Flows__Server_Config import DEFAULT__SERVERLESS_FLOWS__AWS_ACCOUNT_ID
from osbot_serverless_flows.Serverless_Flows__Shared_Objects import serverless_flows__shared_objects

class test_Serverless_Flows__Shared_Objects(TestCase):

def test_s3_db_flows(self):
assert fast_api__local_stack.is_local_stack_configured_and_available() is True
s3_db_flows = serverless_flows__shared_objects.s3_db_flows()

assert s3_db_flows.s3_bucket() == F'serverless-flows-{DEFAULT__SERVERLESS_FLOWS__AWS_ACCOUNT_ID}-flows'
assert s3_db_flows.bucket_exists() is True
1 change: 0 additions & 1 deletion tests/integration/test__for_integration_tests.py

This file was deleted.

This file was deleted.

0 comments on commit ea91703

Please sign in to comment.