Skip to content

Commit

Permalink
search provider: Port to async
Browse files Browse the repository at this point in the history
  • Loading branch information
rafaelmardojai committed Sep 16, 2024
1 parent 137af42 commit 6761195
Showing 1 changed file with 118 additions and 118 deletions.
236 changes: 118 additions & 118 deletions dialect/search_provider/search_provider.in
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,25 @@
import gettext
import locale
import sys
from typing import Callable
import inspect
import logging
from typing import Any, Callable, Coroutine

import gi

gi.require_version("Secret", "1")
gi.require_version("Soup", "3.0")
from gi.repository import Gio, GLib

from dialect.providers import TRANSLATORS
from dialect.providers.base import ProviderErrorCode
from dialect.asyncio import create_background_task, glib_event_loop_policy
from dialect.providers import (
TRANSLATORS,
TranslationRequest,
ProviderError,
RequestError,
APIKeyInvalid,
APIKeyRequired,
)
from dialect.settings import Settings

CLIPBOARD_PREFIX = "copy-to-clipboard"
Expand Down Expand Up @@ -77,17 +86,12 @@ class TranslateServiceApplication(Gio.Application):
self.search_interface = Gio.DBusNodeInfo.new_for_xml(dbus_interface_description).interfaces[0]

self.loaded = False
self.load_failed = False

# Translations store
self.translations = {}
self.translations = {} # Translations store
self.src_language = "auto"
self.dest_language = None

# Translator
self._load_translator()
Settings.get().connect("changed", self._on_settings_changed)
Settings.get().connect("translator-changed", self._on_translator_changed)
Settings.get().connect("provider-changed::translator", self._on_translator_changed)

def do_dbus_register(self, connection, object_path):
try:
Expand All @@ -112,7 +116,8 @@ class TranslateServiceApplication(Gio.Application):
parameters: GLib.Variant,
invocation: Gio.DBusMethodInvocation,
):
def return_value(results):

def wrap_results(results: Any) -> GLib.Variant:
results = (results,)
if results == (None,):
results = ()
Expand All @@ -126,78 +131,82 @@ class TranslateServiceApplication(Gio.Application):
)
+ ")"
)
wrapped_results = GLib.Variant(results_type, results)

invocation.return_value(wrapped_results)
return GLib.Variant(results_type, results)

async def return_async_value(method: Callable[..., Coroutine], *args):
results = wrap_results(await method(*args))
self.release()
invocation.return_value(results)

method = getattr(self, method_name)
arguments = list(parameters.unpack())
arguments.append(return_value)
args = list(parameters.unpack())

method(*arguments)
if inspect.iscoroutinefunction(method): # Async methods
create_background_task(return_async_value(method, *args))
self.hold()
else: # Sync methods
results = wrap_results(method(*args))
invocation.return_value(results)

@property
def live_enabled(self) -> bool:
return Settings.get().live_translation and Settings.get().sp_translation

def GetInitialResultSet(self, terms: list[str], callback: Callable[[list[str]], None]):
async def GetInitialResultSet(self, terms: list[str]) -> list[str]:
"""
Join separate terms in one ID line, start translation and send this line back
on start of input
"""

def on_done(translation):
self.translations[text] = translation.text
callback([text, CLIPBOARD_PREFIX + text])
self.release()

def on_fail(error):
match error.code:
case ProviderErrorCode.NETWORK:
self.translations[error_id] = _("Translation failed, check for network issues")
case ProviderErrorCode.API_KEY_INVALID:
self.translations[error_id] = _("The provided API key is invalid")
case ProviderErrorCode.API_KEY_REQUIRED:
self.translations[error_id] = _("API key is required to use the service")
case _:
self.translations[error_id] = _("Translation failed")
callback([error_id])
self.release()

text = " ".join(terms)

if self.live_enabled:
error_id = ERROR_PREFIX + text

if self.load_failed:
self.translations[error_id] = _("Failed loading the translation service")
callback([error_id])
elif not self.loaded:
return self.GetInitialResultSet(terms, callback)
# Load the translator if needed
# TODO: Verify API key when needed
if not self.loaded:
try:
await self._load_translator()
except Exception:
self.translations[error_id] = _("Failed loading the translation service")
return [error_id]

# If the two languages are the same, nothing is done
if self.dest_language and self.src_language != self.dest_language and text != "":
src, dest = self.translator.denormalize_lang(self.src_language, self.dest_language)
self.translator.translate(text, src, dest, on_done, on_fail)
self.hold()

request = TranslationRequest(text, src, dest)

try:
translation = await self.translator.translate(request)
self.translations[text] = translation.text
return [text, CLIPBOARD_PREFIX + text]
except (RequestError, ProviderError) as exc:
logging.error(exc)

if isinstance(exc, RequestError):
self.translations[error_id] = _("Translation failed, check for network issues")
elif isinstance(exc, APIKeyInvalid):
self.translations[error_id] = _("The provided API key is invalid")
elif isinstance(exc, APIKeyRequired):
self.translations[error_id] = _("API key is required to use the service")
else:
self.translations[error_id] = _("Translation failed")

return [error_id]
else:
return []
else:
provider = Settings.get().active_translator

callback(
[
_("Translate “{text}” with {provider_name}").format(
text=text, provider_name=TRANSLATORS[provider].prettyname
)
]
)
return [
_("Translate “{text}” with {provider_name}").format(
text=text, provider_name=TRANSLATORS[Settings.get().active_translator].prettyname
)
]

def GetSubsearchResultSet(
self, _previous_results: list[str], new_terms: list[str], callback: Callable[[list[str]], None]
):
self.GetInitialResultSet(new_terms, callback)
async def GetSubsearchResultSet(self, _previous_results: list[str], new_terms: list[str]) -> list[str]:
return await self.GetInitialResultSet(new_terms)

def GetResultMetas(self, ids: list[str], callback: Callable[[list[dict[str, GLib.Variant]]], None]):
def GetResultMetas(self, ids: list[str]) -> list[dict[str, GLib.Variant]]:
"""Send translated text"""

translate_id = ids[0]
Expand All @@ -207,14 +216,12 @@ class TranslateServiceApplication(Gio.Application):
if translate_id in self.translations:
text = self.translations[translate_id]

callback(
[
{
"id": GLib.Variant("s", translate_id),
"name": GLib.Variant("s", text),
}
]
)
return [
{
"id": GLib.Variant("s", translate_id),
"name": GLib.Variant("s", text),
}
]

elif len(ids) == 2 and translate_id in self.translations and ids[1] == CLIPBOARD_PREFIX + ids[0]:
text = self.translations[translate_id]
Expand All @@ -227,80 +234,73 @@ class TranslateServiceApplication(Gio.Application):

self.translations.clear()

callback(
[
{
"id": GLib.Variant("s", translate_id),
"name": GLib.Variant("s", text),
"description": GLib.Variant("s", description),
},
{
"id": GLib.Variant("s", ids[1]),
"name": GLib.Variant("s", _("Copy")),
"description": GLib.Variant("s", _("Copy translation to clipboard")),
"clipboardText": GLib.Variant("s", text),
},
]
)
return [
{
"id": GLib.Variant("s", translate_id),
"name": GLib.Variant("s", text),
"description": GLib.Variant("s", description),
},
{
"id": GLib.Variant("s", ids[1]),
"name": GLib.Variant("s", _("Copy")),
"description": GLib.Variant("s", _("Copy translation to clipboard")),
"clipboardText": GLib.Variant("s", text),
},
]

else:
# Probably never needed, just in case
callback(
[
dict(
id=GLib.Variant("s", id),
name=GLib.Variant("s", id),
)
for id in ids
]
)
return [
dict(
id=GLib.Variant("s", id),
name=GLib.Variant("s", id),
)
for id in ids
]

def ActivateResult(self, result_id: str, terms: list[str], timestamp: int, callback: Callable):
def ActivateResult(self, result_id: str, terms: list[str], timestamp: int):
if not result_id.startswith(CLIPBOARD_PREFIX):
self.LaunchSearch(terms, timestamp, callback)
else:
callback((None,))
self.LaunchSearch(terms, timestamp)

def LaunchSearch(self, terms: list[str], _timestamp: int, callback: Callable):
def LaunchSearch(self, terms: list[str], _timestamp: int):
text = " ".join(terms)
GLib.spawn_async_with_pipes(None, ["@BIN@", "--text", text], None, GLib.SpawnFlags.SEARCH_PATH, None)

callback((None,))
async def _load_translator(self):
if self.loaded:
return

self.translator = TRANSLATORS[Settings.get().active_translator]()

# Init translator
try:
await self.translator.init_trans()

def _load_translator(self):
def on_done():
self.loaded = True
self.load_failed = False
self.dest_language = self.translator.recent_dest_langs[0]

self.translator.settings.connect("changed", self._on_translator_settings_changed)

def on_fail(_error):
self.loaded = False
self.load_failed = True
except Exception:
self.dest_language = None

self.loaded = False
provider = Settings.get().active_translator
self.translator = TRANSLATORS[provider]()

# Init translator
self.translator.init_trans(on_done, on_fail)

def _on_settings_changed(self, _settings, key: str):
if key.startswith("translator-"):
self._load_translator()
raise

def _on_translator_changed(self, *args):
self._load_translator()
self.loaded = False

def _on_translator_settings_changed(self, _settings, key: str):
if key == "src-langs" or key == "dest-langs":
self.dest_language = self.translator.recent_dest_langs[0]
else:
self._load_translator()
self.loaded = False


if __name__ == "__main__":
def main():
app = TranslateServiceApplication()
sys.exit(app.run(None))
exit_code = 0
with glib_event_loop_policy():
exit_code = app.run(None)
return exit_code


if __name__ == "__main__":
sys.exit(main())

0 comments on commit 6761195

Please sign in to comment.