Skip to content

Commit

Permalink
connect: use the public /v1/tasks/{id} endpoint (#609)
Browse files Browse the repository at this point in the history
* connect: use the public /v1/tasks/{id} endpoint

The v0 task is still returned by the v0 deploy endpoint.

* poll-wait is an integer, not a float, and is used to configure the long-poll
  task requests against the Connect server.

* task_get() calls /v1/tasks/{id}. It takes first and wait arguments,
  indicating the starting point for requested output and the duration for
  long-poll requests.

* wait_for_task() does no sleeping, but asks task_get() to perform a long-poll
  with its wait argument.

* output_task_log() sends every output line to the log callback. It previously
  used data abut the last-line to determine when to send lines, but that is
  unnecessary, as wait_for_task() requests only new lines from task_get().

* RSConnectExecutor.delete_runtime_cache() returns its delete result and task
  for easier testing. It no longer stores state on the object.

fixes #608

* backwards-compatible fields for rsconnect-jupyter
  • Loading branch information
aronatkins authored Aug 26, 2024
1 parent 2c12229 commit 4ffef39
Show file tree
Hide file tree
Showing 8 changed files with 120 additions and 103 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

### Changed

- The `rsconnect content build run --poll-wait` argument specifies an integral
number of seconds. It previously allowed fractional seconds. (#608)

- Uses the public Connect server API endpoint `/v1/tasks/{id}` to poll task
progress. (#608)

### Removed

- Uncalled `RSConnectClient.app_publish()` function, which referenced an
Expand Down
10 changes: 5 additions & 5 deletions rsconnect/actions_content.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def build_start(
running: bool = False,
retry: bool = False,
all: bool = False,
poll_wait: float = 2,
poll_wait: int = 1,
debug: bool = False,
):
build_store = ensure_content_build_store(connect_server)
Expand Down Expand Up @@ -302,7 +302,7 @@ def _monitor_build(connect_server: RSConnectServer, content_items: list[ContentI
return True


def _build_content_item(connect_server: RSConnectServer, content: ContentItemWithBuildState, poll_wait: float):
def _build_content_item(connect_server: RSConnectServer, content: ContentItemWithBuildState, poll_wait: int):
build_store = ensure_content_build_store(connect_server)
with RSConnectClient(connect_server) as client:
# Pending futures will still try to execute when ThreadPoolExecutor.shutdown() is called
Expand Down Expand Up @@ -333,7 +333,7 @@ def _build_content_item(connect_server: RSConnectServer, content: ContentItemWit
def write_log(line: str):
log.write("%s\n" % line)

_, _, task_status = emit_task_log(
_, _, task = emit_task_log(
connect_server,
guid,
task_id,
Expand All @@ -347,8 +347,8 @@ def write_log(line: str):
if build_store.aborted():
return

build_store.set_content_item_last_build_task_result(guid, task_status)
if task_status["code"] != 0:
build_store.set_content_item_last_build_task_result(guid, task)
if task["code"] != 0:
logger.error("Build failed: %s" % guid)
build_store.set_content_item_build_status(guid, BuildStatus.ERROR)
else:
Expand Down
116 changes: 54 additions & 62 deletions rsconnect/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
PyInfo,
ServerSettings,
TaskStatusV0,
TaskStatusV1,
UserRecord,
)
from .timeouts import get_task_timeout, get_task_timeout_help_message
Expand Down Expand Up @@ -393,12 +394,26 @@ def system_caches_runtime_delete(self, target: DeleteInputDTO) -> DeleteOutputDT
response = self._server.handle_bad_response(response)
return response

def task_get(self, task_id: str, first_status: Optional[int] = None) -> TaskStatusV0:
def task_get(
self,
task_id: str,
first: Optional[int] = None,
wait: Optional[int] = None,
) -> TaskStatusV1:
params = None
if first_status is not None:
params = {"first_status": first_status}
response = cast(Union[TaskStatusV0, HTTPResponse], self.get("tasks/%s" % task_id, query_params=params))
if first is not None or wait is not None:
params = {}
if first is not None:
params["first"] = first
if wait is not None:
params["wait"] = wait
response = cast(Union[TaskStatusV1, HTTPResponse], self.get("v1/tasks/%s" % task_id, query_params=params))
response = self._server.handle_bad_response(response)

# compatibility with rsconnect-jupyter
response["status"] = response["output"]
response["last_status"] = response["last"]

return response

def deploy(
Expand Down Expand Up @@ -467,76 +482,55 @@ def wait_for_task(
log_callback: Optional[Callable[[str], None]],
abort_func: Callable[[], bool] = lambda: False,
timeout: int = get_task_timeout(),
poll_wait: float = 0.5,
poll_wait: int = 1,
raise_on_error: bool = True,
) -> tuple[list[str] | None, TaskStatusV0]:
) -> tuple[list[str] | None, TaskStatusV1]:
if log_callback is None:
log_lines: list[str] | None = []
log_callback = log_lines.append
else:
log_lines = None

last_status: int | None = None
first: int | None = None
start_time = time.time()
sleep_duration = 0.5
time_slept = 0.0
while True:
if (time.time() - start_time) > timeout:
raise RSConnectException(get_task_timeout_help_message(timeout))
elif abort_func():
raise RSConnectException("Task aborted.")

# we continue the loop so that we can re-check abort_func() in case there was an interrupt (^C),
# otherwise the user would have to wait a full poll_wait cycle before the program would exit.
if time_slept <= poll_wait:
time_slept += sleep_duration
time.sleep(sleep_duration)
continue
else:
time_slept = 0
task_status = self.task_get(task_id, last_status)
last_status = self.output_task_log(task_status, last_status, log_callback)
if task_status["finished"]:
result = task_status.get("result")
if isinstance(result, dict):
data = result.get("data", "")
type = result.get("type", "")
if data or type:
log_callback("%s (%s)" % (data, type))

err = task_status.get("error")
if err:
log_callback("Error from Connect server: " + err)

exit_code = task_status["code"]
if exit_code != 0:
exit_status = "Task exited with status %d." % exit_code
if raise_on_error:
raise RSConnectException(exit_status)
else:
log_callback("Task failed. %s" % exit_status)
return log_lines, task_status
task = self.task_get(task_id, first=first, wait=poll_wait)
self.output_task_log(task, log_callback)
first = task["last"]
if task["finished"]:
result = task.get("result")
if isinstance(result, dict):
data = result.get("data", "")
type = result.get("type", "")
if data or type:
log_callback("%s (%s)" % (data, type))

err = task.get("error")
if err:
log_callback("Error from Connect server: " + err)

exit_code = task["code"]
if exit_code != 0:
exit_status = "Task exited with status %d." % exit_code
if raise_on_error:
raise RSConnectException(exit_status)
else:
log_callback("Task failed. %s" % exit_status)
return log_lines, task

@staticmethod
def output_task_log(
task_status: TaskStatusV0,
last_status: int | None,
task: TaskStatusV1,
log_callback: Callable[[str], None],
):
"""Pipe any new output through the log_callback.
Returns an updated last_status which should be passed into
the next call to output_task_log.
Raises RSConnectException on task failure.
"""
new_last_status = last_status
if task_status["last_status"] != last_status:
for line in task_status["status"]:
log_callback(line)
new_last_status = task_status["last_status"]

return new_last_status
"""Pipe any new output through the log_callback."""
for line in task["output"]:
log_callback(line)


# for backwards compatibility with rsconnect-jupyter
Expand Down Expand Up @@ -601,8 +595,6 @@ def __init__(

self.bundle: IO[bytes] | None = None
self.deployed_info: RSConnectClientDeployResult | None = None
self.result: DeleteOutputDTO | None = None
self.task_status: TaskStatusV0 | None = None

self.logger: logging.Logger | None = logger
self.ctx = ctx
Expand Down Expand Up @@ -954,7 +946,7 @@ def emit_task_log(
log_callback: logging.Logger = connect_logger,
abort_func: Callable[[], bool] = lambda: False,
timeout: int = get_task_timeout(),
poll_wait: float = 0.5,
poll_wait: int = 1,
raise_on_error: bool = True,
):
"""
Expand Down Expand Up @@ -1207,10 +1199,10 @@ def delete_runtime_cache(self, language: Optional[str], version: Optional[str],
self.result = result
if result["task_id"] is None:
print("Dry run finished")
return result, None
else:
(_, task_status) = self.client.wait_for_task(result["task_id"], connect_logger.info, raise_on_error=False)
self.task_status = task_status
return self
(_, task) = self.client.wait_for_task(result["task_id"], connect_logger.info, raise_on_error=False)
return result, task


class S3Client(HTTPServer):
Expand Down Expand Up @@ -1825,7 +1817,7 @@ def emit_task_log(
log_callback: Optional[Callable[[str], None]],
abort_func: Callable[[], bool] = lambda: False,
timeout: int = get_task_timeout(),
poll_wait: float = 0.5,
poll_wait: int = 1,
raise_on_error: bool = True,
):
"""
Expand Down
8 changes: 4 additions & 4 deletions rsconnect/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2690,9 +2690,9 @@ def get_build_logs(
@click.option("--all", is_flag=True, help="Build all content, even if it is already marked as COMPLETE.")
@click.option(
"--poll-wait",
type=click.FloatRange(min=0.5, clamp=True),
default=2,
help="Defines the number of seconds between polls when polling for build output. Defaults to 2.",
type=click.IntRange(min=1, clamp=True),
default=1,
help="Defines the number of seconds between polls when polling for build output. Defaults to 1.",
)
@click.option(
"--format",
Expand Down Expand Up @@ -2720,7 +2720,7 @@ def start_content_build(
running: bool,
retry: bool,
all: bool,
poll_wait: float,
poll_wait: int,
format: LogOutputFormat.All,
debug: bool,
verbose: int,
Expand Down
14 changes: 6 additions & 8 deletions rsconnect/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

from .exception import RSConnectException
from .log import logger
from .models import AppMode, AppModes, ContentItemV1, TaskStatusResult, TaskStatusV0
from .models import AppMode, AppModes, ContentItemV1, TaskStatusResult, TaskStatusV1

T = TypeVar("T", bound=Mapping[str, object])

Expand Down Expand Up @@ -544,21 +544,20 @@ def resolve(self, server: str, app_id: Optional[str], app_mode: Optional[AppMode
DEFAULT_BUILD_DIR = join(os.getcwd(), "rsconnect-build")


# A trimmed version of TaskStatusV0 which doesn't contain `status` and `last_status` fields.
class TaskStatusV0Trimmed(TypedDict):
# A trimmed version of TaskStatusV1 which doesn't contain `output` and `last` fields.
class TaskStatusV1Trimmed(TypedDict):
id: str
finished: bool
code: int
error: str
user_id: int
result: TaskStatusResult | None


class ContentItemWithBuildState(ContentItemV1, TypedDict):
rsconnect_build_status: str
rsconnect_last_build_time: NotRequired[str]
rsconnect_last_build_log: NotRequired[str | None]
rsconnect_build_task_result: NotRequired[TaskStatusV0Trimmed]
rsconnect_build_task_result: NotRequired[TaskStatusV1Trimmed]


class ContentBuildStoreData(TypedDict):
Expand Down Expand Up @@ -745,7 +744,7 @@ def update_content_item_last_build_log(self, guid: str, log_file: str | None, de
if not defer_save:
self.save()

def set_content_item_last_build_task_result(self, guid: str, task: TaskStatusV0, defer_save: bool = False) -> None:
def set_content_item_last_build_task_result(self, guid: str, task: TaskStatusV1, defer_save: bool = False) -> None:
"""
Set the latest task_result for a content build
"""
Expand All @@ -754,12 +753,11 @@ def set_content_item_last_build_task_result(self, guid: str, task: TaskStatusV0,
# status contains the log lines for the build. We have already recorded these in the
# log file on disk so we can remove them from the task result before storing it
# to reduce the data stored in our state-file.
task_copy: TaskStatusV0Trimmed = {
task_copy: TaskStatusV1Trimmed = {
"id": task["id"],
"finished": task["finished"],
"code": task["code"],
"error": task["error"],
"user_id": task["user_id"],
"result": task["result"],
}
content["rsconnect_build_task_result"] = task_copy
Expand Down
4 changes: 4 additions & 0 deletions rsconnect/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,10 @@ class TaskStatusV1(TypedDict):
last: int
result: TaskStatusResult | None

# redundant fields for compatibility with rsconnect-python.
last_status: int
status: list[str]


class BootstrapOutputDTO(TypedDict):
api_key: str
Expand Down
Loading

0 comments on commit 4ffef39

Please sign in to comment.