diff --git a/dialect/asyncio.py b/dialect/asyncio.py
new file mode 100644
index 00000000..ebe67103
--- /dev/null
+++ b/dialect/asyncio.py
@@ -0,0 +1,50 @@
+import asyncio
+import contextlib
+import functools
+from typing import Callable, Coroutine
+
+from gi.events import GLibEventLoopPolicy
+
+
+@contextlib.contextmanager
+def glib_event_loop_policy():
+ original = asyncio.get_event_loop_policy()
+ policy = GLibEventLoopPolicy()
+ asyncio.set_event_loop_policy(policy)
+ try:
+ yield policy
+ finally:
+ asyncio.set_event_loop_policy(original)
+
+
+_background_tasks: set[asyncio.Task] = set()
+
+
+def create_background_task(coro: Coroutine) -> asyncio.Task:
+ """
+ Create and track a task.
+
+ Normally tasks are weak-referenced by asyncio.
+ We keep track of them, so they can be completed before GC kicks in.
+ """
+ task = asyncio.create_task(coro)
+ _background_tasks.add(task)
+ task.add_done_callback(_background_tasks.discard)
+ return task
+
+
+def background_task(f: Callable[..., Coroutine]):
+ """
+ Wraps an async function to be run using ``create_background_task``.
+
+ Useful to use async functions like signal handlers or GTK template callbacks.
+
+ Note: The return value will be lost, so this is not suitable when you need to
+ return something from the coroutine, what might be needed in some signal handlers.
+ """
+
+ @functools.wraps(f)
+ def decor(*args, **kwargs):
+ create_background_task(f(*args, **kwargs))
+
+ return decor
diff --git a/dialect/main.py b/dialect/main.py
index 0fb16d7f..77411c64 100755
--- a/dialect/main.py
+++ b/dialect/main.py
@@ -21,6 +21,7 @@
except ImportError or ValueError:
logging.error("Error: GObject dependencies not met.")
+from dialect.asyncio import glib_event_loop_policy
from dialect.define import APP_ID, RES_PATH, VERSION
from dialect.preferences import DialectPreferencesDialog
from dialect.settings import Settings
@@ -168,10 +169,7 @@ def _on_pronunciation(self, action: Gio.SimpleAction, value: GLib.Variant):
# Update UI
if self.window:
- if self.window.trans_src_pron is not None:
- self.window.src_pron_revealer.props.reveal_child = value # type: ignore
- if self.window.trans_dest_pron is not None:
- self.window.dest_pron_revealer.props.reveal_child = value # type: ignore
+ self.window._check_pronunciation()
def _on_preferences(self, _action, _param):
"""Show preferences window"""
@@ -198,4 +196,9 @@ def _on_quit(self, _action, _param):
def main():
# Run the Application
app = Dialect()
- return app.run(sys.argv)
+ exit_code = 0
+
+ with glib_event_loop_policy():
+ exit_code = app.run(sys.argv)
+
+ return exit_code
diff --git a/dialect/meson.build b/dialect/meson.build
index 11c7c209..79c4b6ea 100644
--- a/dialect/meson.build
+++ b/dialect/meson.build
@@ -59,6 +59,7 @@ subdir('search_provider')
# Python sources
sources = [
'__init__.py',
+ 'asyncio.py',
'languages.py',
'main.py',
'preferences.py',
diff --git a/dialect/providers/__init__.py b/dialect/providers/__init__.py
index aa8a50b3..832b5a8b 100644
--- a/dialect/providers/__init__.py
+++ b/dialect/providers/__init__.py
@@ -12,9 +12,22 @@
from dialect.providers.base import ( # noqa
BaseProvider,
ProviderCapability,
- ProviderError,
- ProviderErrorCode,
ProviderFeature,
+ Translation,
+ TranslationMistake,
+ TranslationPronunciation,
+ TranslationRequest,
+)
+from dialect.providers.errors import ( # noqa
+ APIKeyInvalid,
+ APIKeyRequired,
+ BatchSizeExceeded,
+ CharactersLimitExceeded,
+ InvalidLangCode,
+ ProviderError,
+ RequestError,
+ ServiceLimitReached,
+ UnexpectedError,
)
MODULES: dict[str, type[BaseProvider]] = {}
diff --git a/dialect/providers/base.py b/dialect/providers/base.py
index 1bcf2fa5..1b371f43 100644
--- a/dialect/providers/base.py
+++ b/dialect/providers/base.py
@@ -3,9 +3,9 @@
# SPDX-License-Identifier: GPL-3.0-or-later
import urllib.parse
-from dataclasses import dataclass
+from dataclasses import dataclass, field
from enum import Enum, Flag, auto
-from typing import IO, Callable
+from typing import IO
from dialect.define import LANG_ALIASES
from dialect.languages import get_lang_name
@@ -55,35 +55,32 @@ class ProvideLangModel(Enum):
"""
-class ProviderErrorCode(Enum):
- UNEXPECTED = auto()
- NETWORK = auto()
- EMPTY = auto()
- API_KEY_REQUIRED = auto()
- API_KEY_INVALID = auto()
- INVALID_LANG_CODE = auto()
- BATCH_SIZE_EXCEEDED = auto()
- CHARACTERS_LIMIT_EXCEEDED = auto()
- SERVICE_LIMIT_REACHED = auto()
- TRANSLATION_FAILED = auto()
- TTS_FAILED = auto()
+@dataclass
+class TranslationRequest:
+ text: str
+ src: str
+ dest: str
+
+@dataclass
+class TranslationMistake:
+ markup: str
+ text: str
-class ProviderError:
- """Helper error handing class to be passed between callbacks"""
- def __init__(self, code: ProviderErrorCode, message: str = "") -> None:
- self.code = code # Serves for quick error matching
- self.message = message # More detailed error info if needed
+@dataclass
+class TranslationPronunciation:
+ src: str | None
+ dest: str | None
@dataclass
class Translation:
text: str
- original: tuple[str, str, str]
+ original: TranslationRequest
detected: str | None = None
- mistakes: tuple[str | None, str | None] = (None, None)
- pronunciation: tuple[str | None, str | None] = (None, None)
+ mistakes: TranslationMistake | None = None
+ pronunciation: TranslationPronunciation = field(default_factory=lambda: TranslationPronunciation(None, None))
class BaseProvider:
@@ -131,119 +128,93 @@ def __init__(self):
Providers API methods
"""
- def validate_instance(self, url: str, on_done: Callable[[bool], None], on_fail: Callable[[ProviderError], None]):
+ async def validate_instance(self, url: str) -> bool:
"""
Validate an instance of the provider.
Args:
- url: The instance URL to test, only hostname and tld, e.g. libretranslate.com, localhost
- on_done: Called when the validation is done, argument is the result of the validation
- on_fail: Called when there's a fail in the validation process
+ url: The instance URL to test, only hostname and tld, e.g. ``libretranslate.com``, ``localhost``.
+
+ Returns:
+ If the URL is a valid instance of the provider ot not.
"""
raise NotImplementedError()
- def validate_api_key(self, key: str, on_done: Callable[[bool], None], on_fail: Callable[[ProviderError], None]):
+ async def validate_api_key(self, key: str) -> bool:
"""
Validate an API key.
Args:
- key: The API key to validate
- on_done: Called when the validation is done, argument is the result of the validation
- on_fail: Called when there's a fail in the validation process
- """
- raise NotImplementedError()
+ key: The API key to validate.
- def init_trans(self, on_done: Callable[[], None], on_fail: Callable[[ProviderError], None]):
+ Returns:
+ If the API key is valid or not.
"""
- Initializes the provider translation capabilities.
-
- Args:
- on_done: Called after the provider was successfully initialized
- on_fail: Called after any error on initialization
- """
- on_done()
+ raise NotImplementedError()
- def init_tts(self, on_done: Callable[[], None], on_fail: Callable[[ProviderError], None]):
- """
- Initializes the provider text-to-speech capabilities.
+ async def init_trans(self) -> None:
+ """Initializes the provider translation capabilities."""
+ raise NotImplementedError()
- Args:
- on_done: Called after the provider was successfully initialized
- on_fail: Called after any error on initialization
- """
- on_done()
+ async def init_tts(self) -> None:
+ """Initializes the provider text-to-speech capabilities."""
+ raise NotImplementedError()
- def translate(
- self,
- text: str,
- src: str,
- dest: str,
- on_done: Callable[[Translation], None],
- on_fail: Callable[[ProviderError], None],
- ):
+ async def translate(self, request: TranslationRequest) -> Translation:
"""
Translates text in the provider.
+ Providers are expected to use ``BaseProvider.denormalize_lang`` because
+ ``request`` will use normalized lang codes.
+
Args:
- text: The text to translate
- src: The lang code of the source text
- dest: The lang code to translate the text to
- on_done: Called after the text was successfully translated
- on_fail: Called after any error on translation
+ request: The translation request.
+
+ Returns:
+ A new translation object.
"""
raise NotImplementedError()
- def suggest(
- self,
- text: str,
- src: str,
- dest: str,
- suggestion: str,
- on_done: Callable[[bool], None],
- on_fail: Callable[[ProviderError], None],
- ):
+ async def suggest(self, text: str, src: str, dest: str, suggestion: str) -> bool:
"""
Sends a translation suggestion to the provider.
+ Providers are expected to use ``BaseProvider.denormalize_lang`` because
+ ``src`` and ``dest`` will use normalized lang codes.
+
Args:
- text: Original text without translation
- src: The lang code of the original text
- dest: The lang code of the translated text
- suggestion: Suggested translation for text
- on_done: Called after the suggestion was successfully send, argument means if it was accepted or rejected
- on_fail: Called after any error on the suggestion process
+ text: Original text without translation.
+ src: The lang code of the original text.
+ dest: The lang code of the translated text.
+ suggestion: Suggested translation for text.
+
+ Returns:
+ If the suggestion was successful or not.
"""
raise NotImplementedError()
- def speech(
- self,
- text: str,
- language: str,
- on_done: Callable[[IO], None],
- on_fail: Callable[[ProviderError], None],
- ):
+ async def speech(self, text: str, language: str) -> IO:
"""
- Generate speech audio from text
+ Generate speech audio from text.
+
+ Providers are expected to use ``BaseProvider.denormalize_lang`` because
+ ``language`` will use normalized lang codes.
Args:
- text: Text to generate speech from
- language: The lang code of text
- on_done: Called after the process successful
- on_fail: Called after any error on the speech process
+ text: Text to generate speech from.
+ language: The lang code of text.
+
+ Returns:
+ The file object with the speech audio written.
"""
raise NotImplementedError()
- def api_char_usage(
- self,
- on_done: Callable[[int, int], None],
- on_fail: Callable[[ProviderError], None],
- ):
+ async def api_char_usage(self) -> tuple[int, int]:
"""
- Retrieves the API usage status
+ Retrieves the API usage status.
- Args:
- on_done: Called after the process successful, with the usage and limit as args
- on_fail: Called after any error on the speech process
+ Returns:
+ The current usage and limit.
"""
raise NotImplementedError()
@@ -251,13 +222,17 @@ def cmp_langs(self, a: str, b: str) -> bool:
"""
Compare two language codes.
- It assumes that the codes have been normalized by `normalize_lang_code`.
+ It assumes that the codes have been normalized by ``BaseProvider.normalize_lang_code``
+ so providers might need to use ``BaseProvider.denormalize_lang`` on ``a`` and ``b``.
This method exists so providers can add additional comparison logic.
Args:
- a: First lang to compare
- b: Second lang to compare
+ a: First lang to compare.
+ b: Second lang to compare.
+
+ Returns:
+ Whether both languages are equals in some way or not.
"""
return a == b
@@ -265,6 +240,9 @@ def cmp_langs(self, a: str, b: str) -> bool:
def dest_langs_for(self, code: str) -> list[str]:
"""
Get the available destination languages for a source language.
+
+ Returns:
+ The codes of available languages.
"""
raise NotImplementedError()
@@ -273,25 +251,61 @@ def lang_aliases(self) -> dict[str, str]:
"""
Mapping of Dialect/CLDR's lang codes to the provider ones.
- Some providers might use different lang codes from the ones used by Dialect to for example get localized
- language names.
+ Some providers might use different lang codes from the ones used by Dialect.
- This dict is used by `add_lang` so lang codes can later be denormalized with `denormalize_lang`.
+ This dict is used by ``BaseProvider.add_lang`` so lang codes can later be denormalized with
+ ``BaseProvider.denormalize_lang``.
- Codes must be formatted with the criteria from normalize_lang_code, because this value would be used by
- `add_lang` after normalization.
+ Codes must be formatted with the criteria from ``BaseProvider.normalize_lang_code``, because this value would
+ be used by ``BaseProvider.add_lang`` after normalization.
- Check `dialect.define.LANG_ALIASES` for reference mappings.
+ Check ``dialect.define.LANG_ALIASES`` for reference mappings.
"""
return {}
+ """
+ Provider features helpers
+ """
+
+ @property
+ def supports_instances(self) -> bool:
+ return ProviderFeature.INSTANCES in self.features
+
+ @property
+ def supports_api_key(self) -> bool:
+ return ProviderFeature.API_KEY in self.features
+
+ @property
+ def api_key_required(self) -> bool:
+ return ProviderFeature.API_KEY_REQUIRED in self.features
+
+ @property
+ def supports_api_usage(self) -> bool:
+ return ProviderFeature.API_KEY_USAGE in self.features
+
+ @property
+ def supports_detection(self) -> bool:
+ return ProviderFeature.DETECTION in self.features
+
+ @property
+ def supports_mistakes(self) -> bool:
+ return ProviderFeature.MISTAKES in self.features
+
+ @property
+ def supports_pronunciation(self) -> bool:
+ return ProviderFeature.PRONUNCIATION in self.features
+
+ @property
+ def supports_suggestions(self) -> bool:
+ return ProviderFeature.SUGGESTIONS in self.features
+
"""
Provider settings helpers and properties
"""
@property
def instance_url(self) -> str:
- """Instance url saved on settings."""
+ """Instance url saved on settings"""
return self.settings.instance_url
@instance_url.setter
@@ -299,12 +313,12 @@ def instance_url(self, url: str):
self.settings.instance_url = url
def reset_instance_url(self):
- """Resets saved instance url."""
+ """Resets saved instance url"""
self.instance_url = ""
@property
def api_key(self) -> str:
- """API key saved on settings."""
+ """API key saved on settings"""
return self.settings.api_key
@api_key.setter
@@ -317,7 +331,7 @@ def reset_api_key(self):
@property
def recent_src_langs(self) -> list[str]:
- """Saved recent source langs of the user."""
+ """Saved recent source langs of the user"""
return self.settings.src_langs
@recent_src_langs.setter
@@ -330,7 +344,7 @@ def reset_src_langs(self):
@property
def recent_dest_langs(self) -> list[str]:
- """Saved recent destination langs of the user."""
+ """Saved recent destination langs of the user"""
return self.settings.dest_langs
@recent_dest_langs.setter
@@ -346,17 +360,20 @@ def reset_dest_langs(self):
"""
@staticmethod
- def format_url(url: str, path: str = "", params: dict = {}, http: bool = False):
+ def format_url(url: str, path: str = "", params: dict = {}, http: bool = False) -> str:
"""
Compose a HTTP url with the given pieces.
- If url is localhost, `http` is ignored and HTTP protocol is forced.
+ If url is "localhost", ``http`` is ignored and HTTP protocol is forced.
Args:
- url: Base url, hostname and tld
- path: Path of the url
- params: Params to populate a url query
- http: If HTTP should be used instead of HTTPS
+ url: Base url, hostname and tld.
+ path: Path of the url.
+ params: Params to populate a url query.
+ http: If HTTP should be used instead of HTTPS.
+
+ Returns:
+ The new formatted URL.
"""
if not path.startswith("/"):
@@ -372,21 +389,25 @@ def format_url(url: str, path: str = "", params: dict = {}, http: bool = False):
return protocol + url + path + params_str
- def normalize_lang_code(self, code: str):
+ def normalize_lang_code(self, code: str) -> str:
"""
- Normalice a language code to Dialect's criteria.
+ Normalice a language code with Dialect's criteria.
- Criteria:
- - Codes must be lowercase, e.g. ES => es
- - Codes can have a second code delimited by a hyphen, e.g. zh_CN => zh-CN
- - If second code is two chars long it's considered a country code and must be uppercase, e.g. zh-cn => zh-CN
- - If second code is four chars long it's considered a script code and must be capitalized,
- e.g. zh-HANS => zh-Hans
+ This method also maps to lang codes aliases using ``BaseProvider.lang_aliases`` and
+ ``dialect.define.LANG_ALIASES``.
- This method also maps lang codes aliases using `lang_aliases` and `dialect.define.LANG_ALIASES`.
+ Criteria:
+ - Codes must be lowercase, e.g. ES => es
+ - Codes can have a second code delimited by a hyphen, e.g. zh_CN => zh-CN
+ - If second code is two chars long it's considered a country code and must be uppercase, e.g. zh-cn => zh-CN
+ - If second code is four chars long it's considered a script code and must be capitalized,
+ e.g. zh-HANS => zh-Hans
Args:
- code: Language ISO code
+ code: Language ISO code.
+
+ Returns:
+ The normalize language code.
"""
code = code.replace("_", "-").lower() # Normalize separator
codes = code.split("-")
@@ -413,11 +434,12 @@ def add_lang(
trans_src: bool = True,
trans_dest: bool = True,
tts: bool = False,
- ):
+ ) -> None:
"""
- Add lang supported by provider after normalization.
+ Register lang supported by the provider.
- Normalized lang codes are saved for latter denormalization using `denormalize_lang`.
+ Lang codes are normalized and saved for latter denormalization using
+ ``BaseProvider.denormalize_lang``.
Args:
original_code: Lang code to add
@@ -441,21 +463,19 @@ def add_lang(
self._nonstandard_langs[code] = original_code
if name is not None and code not in self._languages_names:
- # Save name provider by the service
+ # Save name provided by the service
self._languages_names[code] = name
- def denormalize_lang(self, *codes: str) -> str | tuple[str, ...]:
+ def denormalize_lang(self, *codes: str) -> tuple[str, ...]:
"""
Get denormalized lang code if available.
- This method will return a tuple with the same length of given codes or a str if only one code was passed.
-
Args:
- *codes: Lang codes to denormalize
- """
+ *codes: Lang codes to denormalize
- if len(codes) == 1:
- return self._nonstandard_langs.get(codes[0], codes[0])
+ Returns:
+ The same amount of given codes but denormalized.
+ """
result = []
for code in codes:
@@ -466,10 +486,14 @@ def get_lang_name(self, code: str) -> str | None:
"""
Get a localized language name.
- Fallback to a name provided by the provider if available or ultimately just the code.
+ Fallback to a name provided by the provider if available or ultimately
+ just the code.
Args:
code: Language to get a name for
+
+ Returns:
+ The language name.
"""
name = get_lang_name(code) # Try getting translated name from Dialect
diff --git a/dialect/providers/errors.py b/dialect/providers/errors.py
new file mode 100644
index 00000000..4d12b5d6
--- /dev/null
+++ b/dialect/providers/errors.py
@@ -0,0 +1,34 @@
+class RequestError(Exception):
+ """Exception raised when request fails."""
+
+
+class ProviderError(Exception):
+ """Exception raised when provider fails."""
+
+
+class UnexpectedError(ProviderError):
+ """Exception raised when provider fails."""
+
+
+class APIKeyRequired(ProviderError):
+ """Exception raised when provider fails."""
+
+
+class APIKeyInvalid(ProviderError):
+ """Exception raised when provider fails."""
+
+
+class InvalidLangCode(ProviderError):
+ """Exception raised when provider fails."""
+
+
+class BatchSizeExceeded(ProviderError):
+ """Exception raised when provider fails."""
+
+
+class CharactersLimitExceeded(ProviderError):
+ """Exception raised when provider fails."""
+
+
+class ServiceLimitReached(ProviderError):
+ """Exception raised when provider fails."""
diff --git a/dialect/providers/local.py b/dialect/providers/local.py
index 50d88044..c38e9e61 100644
--- a/dialect/providers/local.py
+++ b/dialect/providers/local.py
@@ -2,8 +2,9 @@
# Copyright 2023 Rafael Mardojai CM
# SPDX-License-Identifier: GPL-3.0-or-later
-import threading
-from typing import Callable
+import asyncio
+import concurrent.futures
+from typing import Callable, TypeVar
from dialect.providers.base import BaseProvider
@@ -11,12 +12,18 @@
class LocalProvider(BaseProvider):
"""Base class for providers needing local threaded helpers"""
- def launch_thread(self, worker: Callable, *args):
+ _T = TypeVar("_T")
+
+ async def run_async(self, worker: Callable[..., _T], *args) -> _T:
"""
- Launches a thread using Python's threading.
+ Runs worker in a ThreadPoolExecutor.
Args:
- worker: Function to execute on the thread
- *args: Args for the worker
+ worker: Function to execute on the thread.
+ *args: Args for the worker function.
"""
- threading.Thread(target=worker, args=args, daemon=True).start()
+
+ loop = asyncio.get_running_loop()
+
+ with concurrent.futures.ThreadPoolExecutor() as pool:
+ return await loop.run_in_executor(pool, worker, *args)
diff --git a/dialect/providers/modules/bing.py b/dialect/providers/modules/bing.py
index 0e3bb4d8..e8dfe431 100644
--- a/dialect/providers/modules/bing.py
+++ b/dialect/providers/modules/bing.py
@@ -1,18 +1,12 @@
# Copyright 2023 Rafael Mardojai CM
# SPDX-License-Identifier: GPL-3.0-or-later
-import logging
import re
from bs4 import BeautifulSoup, Tag
-from dialect.providers.base import (
- ProviderCapability,
- ProviderError,
- ProviderErrorCode,
- ProviderFeature,
- Translation,
-)
+from dialect.providers.base import ProviderCapability, ProviderFeature, Translation, TranslationPronunciation
+from dialect.providers.errors import ProviderError, UnexpectedError
from dialect.providers.soup import SoupProvider
@@ -58,77 +52,45 @@ def translate_url(self):
}
return self.format_url("www.bing.com", "/ttranslatev3", params)
- def init_trans(self, on_done, on_fail):
- def on_response(data):
- if data:
- try:
- soup = BeautifulSoup(data, "html.parser")
+ async def init_trans(self):
+ response = await self.get(self.html_url, self._headers, check_common=False, json=False)
- # Get Langs
- langs = soup.find("optgroup", {"id": "t_tgtAllLang"})
- if isinstance(langs, Tag):
- for child in langs.findChildren():
- if child.name == "option":
- self.add_lang(child["value"], child.contents[0])
-
- # Get IID
- iid = soup.find("div", {"id": "rich_tta"})
- if isinstance(iid, Tag):
- self._iid = iid["data-iid"]
-
- # Decode response bytes
- data = data.decode("utf-8")
-
- # Look for abuse prevention data
- params = re.findall(r"var params_AbusePreventionHelper = \[(.*?)\];", data)[0] # noqa
- abuse_params = params.replace('"', "").split(",")
- self._key = abuse_params[0]
- self._token = abuse_params[1]
-
- # Look for IG
- self._ig = re.findall('IG:"(.*?)",', data)[0]
-
- on_done()
-
- except Exception as exc:
- error = "Failed parsing HTML from bing.com"
- logging.warning(error, exc)
- on_fail(ProviderError(ProviderErrorCode.NETWORK, error))
+ if response:
+ try:
+ soup = BeautifulSoup(response, "html.parser")
- else:
- on_fail(ProviderError(ProviderErrorCode.EMPTY, "Could not get HTML from bing.com"))
+ # Get Langs
+ langs = soup.find("optgroup", {"id": "t_tgtAllLang"})
+ if isinstance(langs, Tag):
+ for child in langs.findChildren():
+ if child.name == "option":
+ self.add_lang(child["value"], child.contents[0])
- # Message request to get bing's website html
- message = self.create_message("GET", self.html_url, headers=self._headers)
+ # Get IID
+ iid = soup.find("div", {"id": "rich_tta"})
+ if isinstance(iid, Tag):
+ self._iid = iid["data-iid"]
- # Do async request
- self.send_and_read_and_process_response(message, on_response, on_fail, False, False)
+ # Decode response bytes
+ text = response.decode("utf-8")
- def translate(self, text, src, dest, on_done, on_fail):
- def on_response(data):
- try:
- data = data[0]
- detected = None
- pronunciation = None
+ # Look for abuse prevention data
+ params = re.findall(r"var params_AbusePreventionHelper = \[(.*?)\];", text)[0] # noqa
+ abuse_params = params.replace('"', "").split(",")
+ self._key = abuse_params[0]
+ self._token = abuse_params[1]
- if "translations" in data:
- if "detectedLanguage" in data:
- detected = data["detectedLanguage"]["language"]
+ # Look for IG
+ self._ig = re.findall('IG:"(.*?)",', text)[0]
- if "transliteration" in data["translations"][0]:
- pronunciation = data["translations"][0]["transliteration"]["text"]
+ except Exception as exc:
+ raise UnexpectedError("Failed parsing HTML from bing.com") from exc
- translation = Translation(
- data["translations"][0]["text"],
- (text, src, dest),
- detected=detected,
- pronunciation=(None, pronunciation),
- )
- on_done(translation)
+ else:
+ raise UnexpectedError("Could not get HTML from bing.com")
- except Exception as exc:
- logging.warning(exc)
- on_fail(ProviderError(ProviderErrorCode.TRANSLATION_FAILED, str(exc)))
+ async def translate(self, request):
+ src, dest = self.denormalize_lang(request.src, request.dest)
# Increment requests count
self._count += 1
@@ -136,19 +98,40 @@ def on_response(data):
# Form data
data = {
"fromLang": "auto-detect" if src == "auto" else src,
- "text": text,
+ "text": request.text,
"to": dest,
"token": self._token,
"key": self._key,
}
- # Request message
- message = self.create_message("POST", self.translate_url, data, self._headers, True)
- # Do async request
- self.send_and_read_and_process_response(message, on_response, on_fail)
+
+ # Do request
+ response = await self.post(self.translate_url, data, self._headers, True)
+
+ try:
+ data = response[0]
+ detected = None
+ pronunciation = None
+
+ if "translations" in data:
+ if "detectedLanguage" in data:
+ detected = data["detectedLanguage"]["language"]
+
+ if "transliteration" in data["translations"][0]:
+ pronunciation = data["translations"][0]["transliteration"]["text"]
+
+ return Translation(
+ data["translations"][0]["text"],
+ request,
+ detected=detected,
+ pronunciation=TranslationPronunciation(None, pronunciation),
+ )
+
+ except Exception as exc:
+ raise UnexpectedError from exc
def check_known_errors(self, _status, data):
if not data:
- return ProviderError(ProviderErrorCode.EMPTY, "Response is empty!")
+ raise UnexpectedError("Response is empty!")
if "errorMessage" in data:
error = data["errorMessage"]
@@ -156,6 +139,6 @@ def check_known_errors(self, _status, data):
match code:
case _:
- return ProviderError(ProviderErrorCode.UNEXPECTED, error)
+ raise ProviderError(error)
return None
diff --git a/dialect/providers/modules/deepl.py b/dialect/providers/modules/deepl.py
index 8e66c96d..7f55bc07 100644
--- a/dialect/providers/modules/deepl.py
+++ b/dialect/providers/modules/deepl.py
@@ -2,15 +2,8 @@
# Copyright 2024 Rafael Mardojai CM
# SPDX-License-Identifier: GPL-3.0-or-later
-import logging
-
-from dialect.providers.base import (
- ProviderCapability,
- ProviderError,
- ProviderErrorCode,
- ProviderFeature,
- Translation,
-)
+from dialect.providers.base import ProviderCapability, ProviderFeature, Translation
+from dialect.providers.errors import APIKeyInvalid, APIKeyRequired, ServiceLimitReached, UnexpectedError
from dialect.providers.soup import SoupProvider
API_V = "v2"
@@ -69,102 +62,64 @@ def usage_url(self):
def headers(self):
return {"Authorization": f"DeepL-Auth-Key {self.api_key}"}
- def init_trans(self, on_done, on_fail):
- def check_finished():
- self._init_count -= 1
+ async def init_trans(self):
+ # Get languages
+ src_langs = await self.get(self.source_lang_url, headers=self.headers)
+ dest_langs = await self.get(self.target_lang_url, headers=self.headers)
- if self._init_count == 0:
- if self._init_error:
- on_fail(self._init_error)
- else:
- on_done()
-
- def on_failed(error):
- self._init_error = error
- check_finished()
-
- def on_languages_response(data, type_):
- try:
- trans_src = type_ == "src"
- trans_dest = type_ == "dest"
-
- for lang in data:
- self.add_lang(lang["language"], lang["name"], trans_src=trans_src, trans_dest=trans_dest)
-
- check_finished()
-
- except Exception as exc:
- print(type_)
- logging.warning(exc)
- on_failed(ProviderError(ProviderErrorCode.UNEXPECTED, str(exc)))
-
- # Keep state of multiple request
- self._init_count = 2
- self._init_error = None
-
- # Request messages
- src_langs_message = self.create_message("GET", self.source_lang_url, headers=self.headers)
- dest_langs_message = self.create_message("GET", self.target_lang_url, headers=self.headers)
-
- # Do async requests
- self.send_and_read_and_process_response(src_langs_message, lambda d: on_languages_response(d, "src"), on_failed)
- self.send_and_read_and_process_response(
- dest_langs_message, lambda d: on_languages_response(d, "dest"), on_failed
- )
-
- def validate_api_key(self, key, on_done, on_fail):
- def on_response(_data):
- on_done(True)
+ if src_langs and dest_langs and isinstance(src_langs, list) and isinstance(dest_langs, list):
+ for lang in src_langs:
+ self.add_lang(lang["language"], lang["name"], trans_dest=False)
+ for lang in src_langs:
+ self.add_lang(lang["language"], lang["name"], trans_src=False)
+ async def validate_api_key(self, key):
api_url = self.__get_api_url(key)
url = self.format_url(api_url, f"/{API_V}/languages", {"type": "source"})
- # Headers
headers = {"Authorization": f"DeepL-Auth-Key {key}"}
- # Request messages
- languages_message = self.create_message("GET", url, headers=headers)
- # Do async requests
- self.send_and_read_and_process_response(languages_message, on_response, on_fail)
-
- def translate(self, text, src, dest, on_done, on_fail):
- def on_response(data):
- try:
- translations = data.get("translations")
- detected = translations[0].get("detected_source_language")
- translation = Translation(translations[0]["text"], (text, src, dest), detected)
- on_done(translation)
- except Exception as exc:
- logging.warning(exc)
- on_fail(ProviderError(ProviderErrorCode.TRANSLATION_FAILED, str(exc)))
+ try:
+ await self.get(url, headers=headers)
+ return True
+ except (APIKeyInvalid, APIKeyRequired):
+ return False
+ except Exception:
+ raise
+
+ async def translate(self, request):
+ src, dest = self.denormalize_lang(request.src, request.dest)
# Request body
data = {
- "text": [text],
+ "text": [request.text],
"target_lang": dest,
}
if src != "auto":
data["source_lang"] = src
- # Request message
- message = self.create_message("POST", self.translate_url, data, self.headers)
- # Do async request
- self.send_and_read_and_process_response(message, on_response, on_fail)
+ response = await self.post(self.translate_url, data, self.headers)
+
+ # Read translation
+ if response and isinstance(response, dict):
+ translations: list[dict[str, str]] | None = response.get("translations")
+ if translations:
+ detected = translations[0].get("detected_source_language")
+ translation = Translation(translations[0]["text"], request, detected)
+ return translation
+
+ raise UnexpectedError
- def api_char_usage(self, on_done, on_fail):
- def on_response(data):
- try:
- usage = data.get("character_count")
- limit = data.get("character_limit")
- on_done(usage, limit)
+ async def api_char_usage(self):
+ response = await self.get(self.usage_url, headers=self.headers)
- except Exception as exc:
- logging.warning(exc)
- on_fail(ProviderError(ProviderErrorCode.UNEXPECTED, str(exc)))
+ try:
+ usage = response.get("character_count")
+ limit = response.get("character_limit")
- # Request message
- message = self.create_message("GET", self.usage_url, headers=self.headers)
- # Do async request
- self.send_and_read_and_process_response(message, on_response, on_fail)
+ return usage, limit
+
+ except Exception as exc:
+ raise UnexpectedError from exc
def cmp_langs(self, a, b):
# Early return if both langs are just the same
@@ -186,10 +141,13 @@ def check_known_errors(self, status, data):
match status:
case 403:
if not self.api_key:
- return ProviderError(ProviderErrorCode.API_KEY_REQUIRED, message)
- return ProviderError(ProviderErrorCode.API_KEY_INVALID, message)
+ raise APIKeyRequired(message)
+ raise APIKeyInvalid(message)
case 456:
- return ProviderError(ProviderErrorCode.SERVICE_LIMIT_REACHED, message)
+ raise ServiceLimitReached(message)
if status != 200:
- return ProviderError(ProviderErrorCode.UNEXPECTED, message)
+ raise UnexpectedError(message)
+
+ if not data:
+ raise UnexpectedError
diff --git a/dialect/providers/modules/google.py b/dialect/providers/modules/google.py
index 1f489c7b..ebe09648 100644
--- a/dialect/providers/modules/google.py
+++ b/dialect/providers/modules/google.py
@@ -4,7 +4,6 @@
import html
import json
-import logging
import random
import re
from tempfile import NamedTemporaryFile
@@ -13,11 +12,12 @@
from dialect.providers.base import (
ProviderCapability,
- ProviderError,
- ProviderErrorCode,
ProviderFeature,
Translation,
+ TranslationMistake,
+ TranslationPronunciation,
)
+from dialect.providers.errors import UnexpectedError
from dialect.providers.local import LocalProvider
from dialect.providers.soup import SoupProvider
@@ -250,7 +250,7 @@ def __init__(self, **kwargs):
self.chars_limit = 2000
- def init_trans(self, on_done, on_fail):
+ async def init_trans(self):
languages = [
"af",
"sq",
@@ -365,14 +365,10 @@ def init_trans(self, on_done, on_fail):
for code in languages:
self.add_lang(code)
- on_done()
-
- def init_tts(self, on_done, on_fail):
+ async def init_tts(self):
for code in lang.tts_langs().keys():
self.add_lang(code, trans_src=False, trans_dest=False, tts=True)
- on_done()
-
@staticmethod
def _build_rpc_request(text: str, src: str, dest: str):
return json.dumps(
@@ -408,153 +404,144 @@ def translate_url(self):
return self.format_url(url, params=params)
- def translate(self, text, src_lang, dest_lang, on_done, on_fail):
- def on_response(data):
- try:
- token_found = False
- square_bracket_counts = [0, 0]
- resp = ""
- data = data.decode("utf-8")
-
- for line in data.split("\n"):
- token_found = token_found or f'"{RPC_ID}"' in line[:30]
- if not token_found:
- continue
-
- is_in_string = False
- for index, char in enumerate(line):
- if char == '"' and line[max(0, index - 1)] != "\\":
- is_in_string = not is_in_string
- if not is_in_string:
- if char == "[":
- square_bracket_counts[0] += 1
- elif char == "]":
- square_bracket_counts[1] += 1
-
- resp += line
- if square_bracket_counts[0] == square_bracket_counts[1]:
- break
-
- data = json.loads(resp)
- parsed = json.loads(data[0][2])
- translated_parts = None
- translated = None
- try:
- translated_parts = list(
- map(
- lambda part: TranslatedPart(
- part[0] if len(part) > 0 else "", part[1] if len(part) >= 2 else []
- ),
- parsed[1][0][0][5],
- )
- )
- except TypeError:
- translated_parts = [TranslatedPart(parsed[1][0][1][0], [parsed[1][0][0][0], parsed[1][0][1][0]])]
-
- first_iter = True
- translated = ""
- for part in translated_parts:
- if not part.text.isspace() and not first_iter:
- translated += " "
- if first_iter:
- first_iter = False
- translated += part.text
-
- src = None
- try:
- src = parsed[1][-1][1]
- except (IndexError, TypeError):
- pass
-
- if not src == src_lang:
- on_fail(ProviderError(ProviderErrorCode.TRANSLATION_FAILED, "source language mismatch"))
- return
-
- if src == "auto":
- try:
- if parsed[0][2] in self.src_languages:
- src = parsed[0][2]
- except (IndexError, TypeError):
- pass
+ async def translate(self, request):
+ src_lang, dest_lang = self.denormalize_lang(request.src, request.dest)
- dest = None
- try:
- dest = parsed[1][-1][2]
- except (IndexError, TypeError):
- pass
-
- if not dest == dest_lang:
- on_fail(ProviderError(ProviderErrorCode.TRANSLATION_FAILED, "destination language mismatch"))
- return
+ # Form data
+ data = {
+ "f.req": self._build_rpc_request(request.text, src_lang, dest_lang),
+ }
- origin_pronunciation = None
- try:
- origin_pronunciation = parsed[0][0]
- except (IndexError, TypeError):
- pass
+ # Do request
+ response = await self.post(self.translate_url, data, self._headers, True, False, False)
+
+ try:
+ token_found = False
+ square_bracket_counts = [0, 0]
+ resp = ""
+ data = response.decode("utf-8")
+
+ for line in data.split("\n"):
+ token_found = token_found or f'"{RPC_ID}"' in line[:30]
+ if not token_found:
+ continue
+
+ is_in_string = False
+ for index, char in enumerate(line):
+ if char == '"' and line[max(0, index - 1)] != "\\":
+ is_in_string = not is_in_string
+ if not is_in_string:
+ if char == "[":
+ square_bracket_counts[0] += 1
+ elif char == "]":
+ square_bracket_counts[1] += 1
+
+ resp += line
+ if square_bracket_counts[0] == square_bracket_counts[1]:
+ break
+
+ data = json.loads(resp)
+ parsed = json.loads(data[0][2])
+ translated_parts = None
+ translated = None
+ try:
+ translated_parts = list(
+ map(
+ lambda part: TranslatedPart(
+ part[0] if len(part) > 0 else "", part[1] if len(part) >= 2 else []
+ ),
+ parsed[1][0][0][5],
+ )
+ )
+ except TypeError:
+ translated_parts = [TranslatedPart(parsed[1][0][1][0], [parsed[1][0][0][0], parsed[1][0][1][0]])]
+
+ first_iter = True
+ translated = ""
+ for part in translated_parts:
+ if not part.text.isspace() and not first_iter:
+ translated += " "
+ if first_iter:
+ first_iter = False
+ translated += part.text
+
+ src = None
+ try:
+ src = parsed[1][-1][1]
+ except (IndexError, TypeError):
+ pass
- pronunciation = None
- try:
- pronunciation = parsed[1][0][0][1]
- except (IndexError, TypeError):
- pass
+ if not src == src_lang:
+ raise UnexpectedError("source language mismatch")
- mistake = None
+ if src == "auto":
try:
- mistake = parsed[0][1][0][0][1]
- # Convert to pango markup
- mistake = mistake.replace("", "").replace("", "")
+ if parsed[0][2] in self.src_languages:
+ src = parsed[0][2]
except (IndexError, TypeError):
pass
- result = Translation(
- translated,
- (text, src_lang, dest_lang),
- src,
- (mistake, self._strip_html_tags(mistake)),
- (origin_pronunciation, pronunciation),
- )
- on_done(result)
-
- except Exception as exc:
- logging.warning(exc)
- on_fail(ProviderError(ProviderErrorCode.TRANSLATION_FAILED, str(exc)))
+ dest = None
+ try:
+ dest = parsed[1][-1][2]
+ except (IndexError, TypeError):
+ pass
- # Form data
- data = {
- "f.req": self._build_rpc_request(text, src_lang, dest_lang),
- }
+ if not dest == dest_lang:
+ raise UnexpectedError("destination language mismatch")
- # Request message
- message = self.create_message("POST", self.translate_url, data, self._headers, True)
+ origin_pronunciation = None
+ try:
+ origin_pronunciation = parsed[0][0]
+ except (IndexError, TypeError):
+ pass
- # Do async request
- self.send_and_read_and_process_response(message, on_response, on_fail, False, False)
+ pronunciation = None
+ try:
+ pronunciation = parsed[1][0][0][1]
+ except (IndexError, TypeError):
+ pass
- def _strip_html_tags(self, text):
+ mistake = None
+ try:
+ mistake = parsed[0][1][0][0][1]
+ # Convert to pango markup
+ mistake = mistake.replace("", "").replace("", "")
+ except (IndexError, TypeError):
+ pass
+
+ return Translation(
+ translated,
+ request,
+ src,
+ TranslationMistake(mistake, self._strip_html_tags(mistake)) if mistake else None,
+ TranslationPronunciation(origin_pronunciation, pronunciation),
+ )
+
+ except Exception as exc:
+ raise UnexpectedError from exc
+
+ def _strip_html_tags(self, text: str):
"""Strip html tags"""
- if text is None:
- return None
-
tags_re = re.compile(r"(|<[^>]*>)")
tags_removed = tags_re.sub("", text)
escaped = html.escape(tags_removed)
return escaped
- def speech(self, text, language, on_done, on_fail):
- def work():
+ async def speech(self, text, language):
+ def get_speech():
try:
file = NamedTemporaryFile()
- tts = gTTS(text, lang=language, lang_check=False)
+ (lang,) = self.denormalize_lang(language)
+ tts = gTTS(text, lang=lang, lang_check=False)
tts.write_to_fp(file)
file.seek(0)
- on_done(file)
- except Exception as exc:
- logging.warning(exc)
- on_fail(ProviderError(ProviderErrorCode.TTS_FAILED, str(exc)))
+ return file
+ except Exception:
+ raise
- self.launch_thread(work)
+ return await self.run_async(get_speech)
class TranslatedPart:
diff --git a/dialect/providers/modules/libretrans.py b/dialect/providers/modules/libretrans.py
index 05f5f64d..a9435343 100644
--- a/dialect/providers/modules/libretrans.py
+++ b/dialect/providers/modules/libretrans.py
@@ -2,15 +2,19 @@
# Copyright 2021 Rafael Mardojai CM
# SPDX-License-Identifier: GPL-3.0-or-later
-import logging
-
from dialect.providers.base import (
ProviderCapability,
- ProviderError,
- ProviderErrorCode,
ProviderFeature,
Translation,
)
+from dialect.providers.errors import (
+ APIKeyInvalid,
+ APIKeyRequired,
+ BatchSizeExceeded,
+ CharactersLimitExceeded,
+ InvalidLangCode,
+ UnexpectedError,
+)
from dialect.providers.soup import SoupProvider
@@ -33,22 +37,6 @@ def __init__(self, **kwargs):
self.chars_limit = 0
- def validate_instance(self, url, on_done, on_fail):
- def on_response(data):
- valid = False
-
- try:
- valid = data["info"]["title"] == "LibreTranslate"
- except: # noqa
- pass
-
- on_done(valid)
-
- # Message request to LT API spec endpoint
- message = self.create_message("GET", self.format_url(url, "/spec"))
- # Do async request
- self.send_and_read_and_process_response(message, on_response, on_fail, False)
-
@property
def frontend_settings_url(self):
return self.format_url(self.instance_url, "/frontend/settings")
@@ -69,104 +57,74 @@ def suggest_url(self):
def translate_url(self):
return self.format_url(self.instance_url, "/translate")
- def init_trans(self, on_done, on_fail):
- def check_finished():
- self._init_count -= 1
-
- if self._init_count == 0:
- if self._init_error:
- on_fail(self._init_error)
- else:
- on_done()
-
- def on_failed(error: ProviderError):
- self._init_error = error
- check_finished()
-
- def on_languages_response(data):
- try:
- for lang in data:
- self.add_lang(lang["code"], lang["name"])
-
- check_finished()
-
- except Exception as exc:
- logging.warning(exc)
- on_failed(ProviderError(ProviderErrorCode.UNEXPECTED, str(exc)))
-
- def on_settings_response(data):
- try:
- if data.get("suggestions", False):
- self.features ^= ProviderFeature.SUGGESTIONS
- if data.get("apiKeys", False):
- self.features ^= ProviderFeature.API_KEY
- if data.get("keyRequired", False):
- self.features ^= ProviderFeature.API_KEY_REQUIRED
-
- self.chars_limit = int(data.get("charLimit", 0))
+ async def validate_instance(self, url):
+ response = await self.get(self.format_url(url, "/spec"), check_common=False)
+ valid = False
- check_finished()
+ try:
+ valid = response["info"]["title"] == "LibreTranslate"
+ except: # noqa
+ pass
- except Exception as exc:
- logging.warning(exc)
- on_failed(ProviderError(ProviderErrorCode.UNEXPECTED, str(exc)))
-
- # Keep state of multiple request
- self._init_count = 2
- self._init_error: ProviderError | None = None
-
- # Request messages
- languages_message = self.create_message("GET", self.lang_url)
- settings_message = self.create_message("GET", self.frontend_settings_url)
-
- # Do async requests
- self.send_and_read_and_process_response(languages_message, on_languages_response, on_failed)
- self.send_and_read_and_process_response(settings_message, on_settings_response, on_failed)
-
- def validate_api_key(self, key, on_done, on_fail):
- def on_response(data):
- valid = False
- try:
- valid = "confidence" in data[0]
- except: # noqa
- pass
-
- on_done(valid)
+ return valid
+ async def validate_api_key(self, key):
# Form data
data = {
"q": "hello",
"api_key": key,
}
- # Request message
- message = self.create_message("POST", self.detect_url, data, form=True)
- # Do async request
- self.send_and_read_and_process_response(message, on_response, on_fail)
+ try:
+ response = await self.post(self.detect_url, data, form=True)
+ return "confidence" in response[0]
+ except (APIKeyInvalid, APIKeyRequired):
+ return False
+ except Exception:
+ raise
+
+ async def init_trans(self):
+ languages = await self.get(self.lang_url)
+ settings = await self.get(self.frontend_settings_url)
- def translate(self, text, src, dest, on_done, on_fail):
- def on_response(data):
- detected = data.get("detectedLanguage", {}).get("language", None)
- translation = Translation(data["translatedText"], (text, src, dest), detected)
- on_done(translation)
+ try:
+ for lang in languages:
+ self.add_lang(lang["code"], lang["name"])
+
+ if settings.get("suggestions", False):
+ self.features ^= ProviderFeature.SUGGESTIONS
+ if settings.get("apiKeys", False):
+ self.features ^= ProviderFeature.API_KEY
+ if settings.get("keyRequired", False):
+ self.features ^= ProviderFeature.API_KEY_REQUIRED
+
+ self.chars_limit = int(settings.get("charLimit", 0))
+
+ except Exception as exc:
+ raise UnexpectedError from exc
+
+ async def translate(self, request):
+ src, dest = self.denormalize_lang(request.src, request.dest)
# Request body
data = {
- "q": text,
+ "q": request.text,
"source": src,
"target": dest,
}
if self.api_key and ProviderFeature.API_KEY in self.features:
data["api_key"] = self.api_key
- # Request message
- message = self.create_message("POST", self.translate_url, data)
- # Do async request
- self.send_and_read_and_process_response(message, on_response, on_fail)
+ # Do request
+ response = await self.post(self.translate_url, data)
+ try:
+ detected = response.get("detectedLanguage", {}).get("language", None)
+ return Translation(response["translatedText"], request, detected)
+ except Exception as exc:
+ raise UnexpectedError from exc
- def suggest(self, text, src, dest, suggestion, on_done, on_fail):
- def on_response(data):
- on_done(data.get("success", False))
+ async def suggest(self, text, src, dest, suggestion):
+ src, dest = self.denormalize_lang(src, dest)
# Form data
data = {
@@ -178,28 +136,29 @@ def on_response(data):
if self.api_key and ProviderFeature.API_KEY in self.features:
data["api_key"] = self.api_key
- # Request message
- message = self.create_message("POST", self.suggest_url, data, form=True)
- # Do async request
- self.send_and_read_and_process_response(message, on_response, on_fail)
+ # Do request
+ response = await self.post(self.suggest_url, data, form=True)
+ try:
+ return response.get("success", False)
+ except: # noqa
+ return False
def check_known_errors(self, _status, data):
if not data:
- return ProviderError(ProviderErrorCode.EMPTY, "Response is empty!")
+ raise UnexpectedError("Response is empty!")
+
if "error" in data:
error = data["error"]
if error == "Please contact the server operator to obtain an API key":
- return ProviderError(ProviderErrorCode.API_KEY_REQUIRED, error)
+ raise APIKeyRequired(error)
elif error == "Invalid API key":
- return ProviderError(ProviderErrorCode.API_KEY_INVALID, error)
+ raise APIKeyInvalid(error)
elif "is not supported" in error:
- return ProviderError(ProviderErrorCode.INVALID_LANG_CODE, error)
+ raise InvalidLangCode(error)
elif "exceeds text limit" in error:
- return ProviderError(ProviderErrorCode.BATCH_SIZE_EXCEEDED, error)
+ raise BatchSizeExceeded(error)
elif "exceeds character limit" in error:
- return ProviderError(ProviderErrorCode.CHARACTERS_LIMIT_EXCEEDED, error)
- elif "Cannot translate text" in error or "format is not supported" in error:
- return ProviderError(ProviderErrorCode.TRANSLATION_FAILED, error)
+ raise CharactersLimitExceeded(error)
else:
- return ProviderError(ProviderErrorCode.UNEXPECTED, error)
+ raise UnexpectedError(error)
diff --git a/dialect/providers/modules/lingva.py b/dialect/providers/modules/lingva.py
index e87bbeb0..aaa33909 100644
--- a/dialect/providers/modules/lingva.py
+++ b/dialect/providers/modules/lingva.py
@@ -2,17 +2,17 @@
# Copyright 2021 Rafael Mardojai CM
# SPDX-License-Identifier: GPL-3.0-or-later
-import logging
from tempfile import NamedTemporaryFile
from urllib.parse import quote
from dialect.providers.base import (
ProviderCapability,
- ProviderError,
- ProviderErrorCode,
ProviderFeature,
Translation,
+ TranslationMistake,
+ TranslationPronunciation,
)
+from dialect.providers.errors import InvalidLangCode, UnexpectedError
from dialect.providers.soup import SoupProvider
@@ -37,21 +37,6 @@ def __init__(self, **kwargs):
self.chars_limit = 5000
- def validate_instance(self, url, on_done, on_fail):
- def on_response(data):
- valid = False
- try:
- valid = "translation" in data
- except: # noqa
- pass
-
- on_done(valid)
-
- # Lingva translation endpoint
- message = self.create_message("GET", self.format_url(url, "/api/v1/en/es/hello"))
- # Do async request
- self.send_and_read_and_process_response(message, on_response, on_fail, False)
-
@property
def lang_url(self):
return self.format_url(self.instance_url, "/api/v1/languages/")
@@ -64,89 +49,87 @@ def translate_url(self):
def speech_url(self):
return self.format_url(self.instance_url, "/api/v1/audio/{lang}/{text}")
- def init(self, on_done, on_fail):
- def on_response(data):
- if "languages" in data:
- for lang in data["languages"]:
+ async def validate_instance(self, url):
+ request = await self.get(self.format_url(url, "/api/v1/en/es/hello"), check_common=False)
+
+ valid = False
+ try:
+ valid = "translation" in request
+ except: # noqa
+ pass
+
+ return valid
+
+ async def init(self) -> None:
+ response = await self.get(self.lang_url)
+
+ try:
+ if "languages" in response:
+ for lang in response["languages"]:
if lang["code"] != "auto":
self.add_lang(lang["code"], lang["name"], tts=True)
- on_done()
else:
- on_fail(ProviderError(ProviderErrorCode.UNEXPECTED, "No langs found in server."))
-
- # Languages message request
- message = self.create_message("GET", self.lang_url)
- # Do async request
- self.send_and_read_and_process_response(message, on_response, on_fail)
-
- def init_trans(self, on_done, on_fail):
- self.init(on_done, on_fail)
-
- def init_tts(self, on_done, on_fail):
- self.init(on_done, on_fail)
-
- def translate(self, text, src, dest, on_done, on_fail):
- def on_response(data):
- try:
- detected = data.get("info", {}).get("detectedSource", None)
- mistakes = data.get("info", {}).get("typo", None)
- src_pronunciation = data.get("info", {}).get("pronunciation", {}).get("query", None)
- dest_pronunciation = data.get("info", {}).get("pronunciation", {}).get("translation", None)
-
- translation = Translation(
- data["translation"],
- (text, src, dest),
- detected,
- (mistakes, mistakes),
- (src_pronunciation, dest_pronunciation),
- )
-
- on_done(translation)
-
- except Exception as exc:
- error = "Failed reading the translation data"
- logging.warning(error, exc)
- on_fail(ProviderError(ProviderErrorCode.TRANSLATION_FAILED, error))
+ raise UnexpectedError("No langs found in server.")
+ except Exception as exc:
+ raise UnexpectedError from exc
- # Format url query data
- text = quote(text, safe="")
- url = self.translate_url.format(text=text, src=src, dest=dest)
+ async def init_trans(self):
+ await self.init()
- # Request message
- message = self.create_message("GET", url)
+ async def init_tts(self):
+ await self.init()
- # Do async request
- self.send_and_read_and_process_response(message, on_response, on_fail)
-
- def speech(self, text, language, on_done, on_fail):
- def on_response(data):
- if "audio" in data:
- file = NamedTemporaryFile()
- audio = bytearray(data["audio"])
- file.write(audio)
- file.seek(0)
-
- on_done(file)
- else:
- on_fail(ProviderError(ProviderErrorCode.TTS_FAILED, "No audio was found."))
+ async def translate(self, request):
+ src, dest = self.denormalize_lang(request.src, request.dest)
+ # Format url query data
+ text = quote(request.text, safe="")
+ url = self.translate_url.format(text=text, src=src, dest=dest)
+ # Do request
+ response = await self.get(url)
+ try:
+ detected = response.get("info", {}).get("detectedSource", None)
+ mistakes = response.get("info", {}).get("typo", None)
+ src_pronunciation = response.get("info", {}).get("pronunciation", {}).get("query", None)
+ dest_pronunciation = response.get("info", {}).get("pronunciation", {}).get("translation", None)
+
+ return Translation(
+ response["translation"],
+ request,
+ detected,
+ TranslationMistake(mistakes, mistakes) if mistakes else None,
+ TranslationPronunciation(src_pronunciation, dest_pronunciation),
+ )
+
+ except Exception as exc:
+ raise UnexpectedError("Failed reading the translation data") from exc
+
+ async def speech(self, text, language):
+ (language,) = self.denormalize_lang(language)
# Format url query data
url = self.speech_url.format(text=text, lang=language)
-
- # Request message
- message = self.create_message("GET", url)
-
- # Do async request
- self.send_and_read_and_process_response(message, on_response, on_fail)
+ # Do request
+ response = await self.get(url)
+
+ try:
+ file = NamedTemporaryFile()
+ audio = bytearray(response["audio"])
+ file.write(audio)
+ file.seek(0)
+ return file
+ except Exception as exc:
+ file.close()
+ raise UnexpectedError from exc
def check_known_errors(self, _status, data):
"""Raises a proper Exception if an error is found in the data."""
if not data:
- return ProviderError(ProviderErrorCode.EMPTY, "Response is empty!")
+ raise UnexpectedError("Response is empty!")
+
if "error" in data:
error = data["error"]
if error == "Invalid target language" or error == "Invalid source language":
- return ProviderError(ProviderErrorCode.INVALID_LANG_CODE, error)
+ raise InvalidLangCode(error)
else:
- return ProviderError(ProviderErrorCode.UNEXPECTED, error)
+ raise UnexpectedError(error)
diff --git a/dialect/providers/modules/yandex.py b/dialect/providers/modules/yandex.py
index c0805778..bb2388f2 100644
--- a/dialect/providers/modules/yandex.py
+++ b/dialect/providers/modules/yandex.py
@@ -1,16 +1,14 @@
# Copyright 2023 Rafael Mardojai CM
# SPDX-License-Identifier: GPL-3.0-or-later
-import logging
from uuid import uuid4
from dialect.providers.base import (
ProviderCapability,
- ProviderError,
- ProviderErrorCode,
ProviderFeature,
Translation,
)
+from dialect.providers.errors import ProviderError, UnexpectedError
from dialect.providers.soup import SoupProvider
@@ -39,7 +37,7 @@ def __init__(self, **kwargs):
self._uuid = str(uuid4()).replace("-", "")
- def init_trans(self, on_done, on_fail):
+ async def init_trans(self):
languages = [
"af",
"sq",
@@ -145,43 +143,28 @@ def init_trans(self, on_done, on_fail):
for code in languages:
self.add_lang(code)
- on_done()
-
@property
def translate_url(self):
path = f"/api/v1/tr.json/translate?id={self._uuid}-0-0&srv=android"
return self.format_url("translate.yandex.net", path)
- def translate(self, text, src, dest, on_done, on_fail):
- def on_response(data):
- try:
- detected = None
- if "code" in data and data["code"] == 200:
- if "lang" in data:
- detected = data["lang"].split("-")[0]
-
- if "text" in data:
- translation = Translation(data["text"][0], (text, src, dest), detected)
- on_done(translation)
-
- else:
- on_fail(ProviderError(ProviderErrorCode.TRANSLATION_FAILED, "Translation failed"))
-
- else:
- error = data["message"] if "message" in data else ""
- on_fail(ProviderError(ProviderErrorCode.TRANSLATION_FAILED, error))
- except Exception as exc:
- error = "Failed reading the translation data"
- logging.warning(error, exc)
- on_fail(ProviderError(ProviderErrorCode.TRANSLATION_FAILED, error))
-
+ async def translate(self, request):
+ src, dest = self.denormalize_lang(request.src, request.dest)
# Form data
- data = {"lang": dest, "text": text}
+ data = {"lang": dest, "text": request.text}
if src != "auto":
data["lang"] = f"{src}-{dest}"
- # Request message
- message = self.create_message("POST", self.translate_url, data, self._headers, True)
-
- # Do async request
- self.send_and_read_and_process_response(message, on_response, on_fail)
+ # Do request
+ response = await self.post(self.translate_url, data, self._headers, True)
+ try:
+ detected = None
+ if "code" in response and response["code"] == 200:
+ if "lang" in response:
+ detected = response["lang"].split("-")[0]
+ return Translation(response["text"][0], request, detected)
+ else:
+ error = response["message"] if "message" in response else ""
+ raise ProviderError(error)
+ except Exception as exc:
+ raise UnexpectedError from exc
diff --git a/dialect/providers/settings.py b/dialect/providers/settings.py
index 27cd6ac8..2ff3475e 100644
--- a/dialect/providers/settings.py
+++ b/dialect/providers/settings.py
@@ -36,6 +36,7 @@ def __init__(self, name: str, defaults: ProviderDefaults):
self.name = name
self.defaults = defaults # set of per-provider defaults
self._secret_attr = {"provider": name}
+ self._api_key: str | None = None
@property
def instance_url(self) -> str:
@@ -50,6 +51,9 @@ def instance_url(self, url: str):
def api_key(self) -> str:
"""API key."""
+ if self._api_key:
+ return self._api_key
+
# Check if we have an old API KEY in GSettings for migration
if gsettings := self.get_string("api-key"):
self.api_key = gsettings # Save to secret
@@ -57,7 +61,10 @@ def api_key(self) -> str:
return gsettings
try:
- return Secret.password_lookup_sync(SECRETS_SCHEMA, self._secret_attr, None) or self.defaults["api_key"]
+ self._api_key = (
+ Secret.password_lookup_sync(SECRETS_SCHEMA, self._secret_attr, None) or self.defaults["api_key"]
+ )
+ return self._api_key
except GLib.Error as exc:
logging.warning(exc)
@@ -75,7 +82,9 @@ def api_key(self, api_key: str):
api_key,
None,
)
+ self._api_key = api_key
else: # Remove secret
+ self._api_key = None
Secret.password_clear_sync(SECRETS_SCHEMA, self._secret_attr, None)
# Fake change in api-key setting
diff --git a/dialect/providers/soup.py b/dialect/providers/soup.py
index acbfba03..68e53403 100644
--- a/dialect/providers/soup.py
+++ b/dialect/providers/soup.py
@@ -4,11 +4,12 @@
import json
import logging
-from typing import Callable
+from typing import Any
-from gi.repository import Gio, GLib, Soup
+from gi.repository import GLib, Soup
-from dialect.providers.base import BaseProvider, ProviderError, ProviderErrorCode
+from dialect.providers.base import BaseProvider
+from dialect.providers.errors import RequestError
from dialect.session import Session
@@ -18,12 +19,15 @@ class SoupProvider(BaseProvider):
def __init__(self, **kwargs):
super().__init__(**kwargs)
- def encode_data(self, data) -> GLib.Bytes | None:
+ def encode_data(self, data: Any) -> GLib.Bytes | None:
"""
Convert Python data to JSON and bytes.
Args:
- data: Data to encode, anything json.dumps can handle
+ data: Data to encode, anything json.dumps can handle.
+
+ Returns:
+ The GLib Bytes or None if something failed.
"""
data_glib_bytes = None
try:
@@ -33,19 +37,23 @@ def encode_data(self, data) -> GLib.Bytes | None:
logging.warning(exc)
return data_glib_bytes
- def create_message(self, method: str, url: str, data={}, headers: dict = {}, form: bool = False) -> Soup.Message:
+ def create_message(
+ self, method: str, url: str, data: Any = {}, headers: dict = {}, form: bool = False
+ ) -> Soup.Message:
"""
- Create a libsoup's message.
+ Create a Soup's message.
Encodes data and adds it to the message as the request body.
- If form is true, data is encoded as application/x-www-form-urlencoded.
Args:
- method: HTTP method of the message
- url: Url of the message
- data: Request body or form data
- headers: HTTP headers of the message
- form: If the data should be encoded as a form
+ method: HTTP method of the message.
+ url: Url of the message.
+ data: Request body or form data.
+ headers: HTTP headers of the message.
+ form: If the data should be encoded as ``application/x-www-form-urlencoded``.
+
+ Returns:
+ The Soup Message for the given parameters.
"""
if form and data:
@@ -66,119 +74,149 @@ def create_message(self, method: str, url: str, data={}, headers: dict = {}, for
return message # type: ignore
- def send_and_read(self, message: Soup.Message, callback: Callable[[Session, Gio.AsyncResult], None]):
+ async def send_and_read(self, message: Soup.Message) -> bytes | None:
"""
- Helper method for libsoup's send_and_read_async.
-
- Useful when priority and cancellable is not needed.
+ Helper method for Soup's send_and_read_async.
Args:
- message: Message to send
- callback: Callback called from send_and_read_async to finish request
- """
- Session.get().send_and_read_async(message, 0, None, callback)
+ message: Message to send.
- def read_data(self, data: bytes | None) -> dict:
+ Returns:
+ The bytes of the response or None.
"""
- Get JSON data from bytes.
+ response: GLib.Bytes = await Session.get().send_and_read_async(message, 0) # type: ignore
+ return response.get_data()
- Args:
- data: Bytes to read
- """
- return json.loads(data) if data else {}
-
- def read_response(self, session: Session, result: Gio.AsyncResult) -> dict:
+ async def send_and_read_json(self, message: Soup.Message) -> Any:
"""
- Get JSON data from session result.
-
- Finishes request from send_and_read_async and gets body dict.
+ Like ``SoupProvider.send_and_read`` but returns JSON parsed.
Args:
- session: Session where the request wa sent
- result: Result of send_and_read_async callback
+ message: Message to send.
+
+ Returns:
+ The JSON of the response deserialized to a python object.
"""
- response = session.get_response(session, result)
- return self.read_data(response)
+ response = await self.send_and_read(message)
+ return json.loads(response) if response else {}
- def check_known_errors(self, status: Soup.Status, data: dict | bytes | None) -> None | ProviderError:
+ def check_known_errors(self, status: Soup.Status, data: Any) -> None:
"""
- Checks data for possible response errors and return a found error if any.
+ Checks data for possible response errors and raises appropriated exceptions.
This should be implemented by subclases.
Args:
- data: Response body data
+ status: HTTP status.
+ data: Response body data.
"""
- return None
- def process_response(
+ async def send_and_read_and_process(
self,
- session: Session,
- result: Gio.AsyncResult,
message: Soup.Message,
- on_continue: Callable[[dict | bytes | None], None],
- on_fail: Callable[[ProviderError], None],
check_common: bool = True,
json: bool = True,
- ):
+ ) -> Any:
"""
- Helper method for the most common workflow for processing soup responses.
-
- Checks for soup errors, then checks for common errors on data and calls on_fail
- if any, otherwise calls on_continue where the provider will finish the process.
+ Helper mixing ``SoupProvider.send_and_read``, ``SoupProvider.send_and_read_json``
+ and ``SoupProvider.check_known_errors``.
- If json is false check_common is ignored and the data isn't processed as JSON and bites are passed to
- on_continue.
+ Converts `GLib.Error` to `RequestError`.
Args:
- session: Session where the request wa sent
- result: Result of send_and_read_async callback
- message: The message that was sent
- on_continue: Called after data was got successfully
- on_fail: Called after any error on request or in check_known_errors
- check_common: If response data should be checked for errors using check_known_errors
- json: If data should be processed as JSON using read_response
+ message: Message to send.
+ check_common: If response data should be checked for errors using check_known_errors.
+ json: If data should be processed as JSON.
+
+ Returns:
+ The JSON deserialized to a python object or bytes if ``json`` is ``False``.
"""
try:
if json:
- data = self.read_response(session, result)
+ response = await self.send_and_read_json(message)
else:
- data = Session.get_response(session, result)
+ response = await self.send_and_read(message)
if check_common:
- error = self.check_known_errors(message.get_status(), data)
- if error:
- on_fail(error)
- return
+ self.check_known_errors(message.get_status(), response)
- on_continue(data)
+ return response
+ except GLib.Error as exc:
+ raise RequestError(exc.message)
- except Exception as exc:
- logging.warning(exc)
- on_fail(ProviderError(ProviderErrorCode.NETWORK, str(exc)))
+ async def request(
+ self,
+ method: str,
+ url: str,
+ data: Any = {},
+ headers: dict = {},
+ form: bool = False,
+ check_common: bool = True,
+ json: bool = True,
+ ) -> Any:
+ """
+ Helper for regular HTTP request.
+
+ Args:
+ method: HTTP method of the request.
+ url: Url of the request.
+ data: Request body or form data.
+ headers: HTTP headers of the message.
+ form: If the data should be encoded as a form.
+ check_common: If response data should be checked for errors using check_known_errors.
+ json: If data should be processed as JSON.
- def send_and_read_and_process_response(
+ Returns:
+ The JSON deserialized to a python object or bytes if ``json`` is ``False``.
+ """
+ message = self.create_message(method, url, data, headers, form)
+ return await self.send_and_read_and_process(message, check_common, json)
+
+ async def get(
self,
- message: Soup.Message,
- on_continue: Callable[[dict | bytes | None], None],
- on_fail: Callable[[ProviderError], None],
+ url: str,
+ headers: dict = {},
+ form: bool = False,
check_common: bool = True,
json: bool = True,
- ):
+ ) -> Any:
"""
- Helper packaging send_and_read and process_response.
+ Helper for GET HTTP request.
- Avoids providers having to deal with many callbacks.
+ Args:
+ url: Url of the request.
+ headers: HTTP headers of the message.
+ form: If the data should be encoded as a form.
+ check_common: If response data should be checked for errors using check_known_errors.
+ json: If data should be processed as JSON.
- message: Message to send
- on_continue: Called after data was got successfully
- on_fail: Called after any error on request or in check_known_errors
- check_common: If response data should be checked for errors using check_known_errors
- json: If data should be processed as JSON using read_response
+ Returns:
+ The JSON deserialized to a python object or bytes if ``json`` is ``False``.
"""
+ return await self.request("GET", url, headers=headers, form=form, check_common=check_common, json=json)
- def on_response(session: Session, result: Gio.AsyncResult):
- self.process_response(session, result, message, on_continue, on_fail, check_common, json)
+ async def post(
+ self,
+ url: str,
+ data: Any = {},
+ headers: dict = {},
+ form: bool = False,
+ check_common: bool = True,
+ json: bool = True,
+ ) -> Any:
+ """
+ Helper for POST HTTP request.
- self.send_and_read(message, on_response)
+ Args:
+ url: Url of the request.
+ data: Request body or form data.
+ headers: HTTP headers of the message.
+ form: If the data should be encoded as a form.
+ check_common: If response data should be checked for errors using check_known_errors.
+ json: If data should be processed as JSON.
+
+ Returns:
+ The JSON deserialized to a python object or bytes if ``json`` is ``False``.
+ """
+ return await self.request("POST", url, data, headers, form, check_common, json)
diff --git a/dialect/search_provider/search_provider.in b/dialect/search_provider/search_provider.in
index 27320e03..84db8786 100755
--- a/dialect/search_provider/search_provider.in
+++ b/dialect/search_provider/search_provider.in
@@ -9,7 +9,9 @@
import gettext
import locale
import sys
-from typing import Callable
+import inspect
+import logging
+from typing import Any, Callable, Coroutine
import gi
@@ -17,8 +19,15 @@ gi.require_version("Secret", "1")
gi.require_version("Soup", "3.0")
from gi.repository import Gio, GLib
-from dialect.providers import TRANSLATORS
-from dialect.providers.base import ProviderErrorCode
+from dialect.asyncio import create_background_task, glib_event_loop_policy
+from dialect.providers import (
+ TRANSLATORS,
+ TranslationRequest,
+ ProviderError,
+ RequestError,
+ APIKeyInvalid,
+ APIKeyRequired,
+)
from dialect.settings import Settings
CLIPBOARD_PREFIX = "copy-to-clipboard"
@@ -77,17 +86,12 @@ class TranslateServiceApplication(Gio.Application):
self.search_interface = Gio.DBusNodeInfo.new_for_xml(dbus_interface_description).interfaces[0]
self.loaded = False
- self.load_failed = False
-
- # Translations store
- self.translations = {}
+ self.translations = {} # Translations store
self.src_language = "auto"
self.dest_language = None
# Translator
- self._load_translator()
- Settings.get().connect("changed", self._on_settings_changed)
- Settings.get().connect("translator-changed", self._on_translator_changed)
+ Settings.get().connect("provider-changed::translator", self._on_translator_changed)
def do_dbus_register(self, connection, object_path):
try:
@@ -112,7 +116,8 @@ class TranslateServiceApplication(Gio.Application):
parameters: GLib.Variant,
invocation: Gio.DBusMethodInvocation,
):
- def return_value(results):
+
+ def wrap_results(results: Any) -> GLib.Variant:
results = (results,)
if results == (None,):
results = ()
@@ -126,78 +131,82 @@ class TranslateServiceApplication(Gio.Application):
)
+ ")"
)
- wrapped_results = GLib.Variant(results_type, results)
- invocation.return_value(wrapped_results)
+ return GLib.Variant(results_type, results)
+
+ async def return_async_value(method: Callable[..., Coroutine], *args):
+ results = wrap_results(await method(*args))
+ self.release()
+ invocation.return_value(results)
method = getattr(self, method_name)
- arguments = list(parameters.unpack())
- arguments.append(return_value)
+ args = list(parameters.unpack())
- method(*arguments)
+ if inspect.iscoroutinefunction(method): # Async methods
+ create_background_task(return_async_value(method, *args))
+ self.hold()
+ else: # Sync methods
+ results = wrap_results(method(*args))
+ invocation.return_value(results)
@property
def live_enabled(self) -> bool:
return Settings.get().live_translation and Settings.get().sp_translation
- def GetInitialResultSet(self, terms: list[str], callback: Callable[[list[str]], None]):
+ async def GetInitialResultSet(self, terms: list[str]) -> list[str]:
"""
Join separate terms in one ID line, start translation and send this line back
on start of input
"""
-
- def on_done(translation):
- self.translations[text] = translation.text
- callback([text, CLIPBOARD_PREFIX + text])
- self.release()
-
- def on_fail(error):
- match error.code:
- case ProviderErrorCode.NETWORK:
- self.translations[error_id] = _("Translation failed, check for network issues")
- case ProviderErrorCode.API_KEY_INVALID:
- self.translations[error_id] = _("The provided API key is invalid")
- case ProviderErrorCode.API_KEY_REQUIRED:
- self.translations[error_id] = _("API key is required to use the service")
- case _:
- self.translations[error_id] = _("Translation failed")
- callback([error_id])
- self.release()
-
text = " ".join(terms)
if self.live_enabled:
error_id = ERROR_PREFIX + text
- if self.load_failed:
- self.translations[error_id] = _("Failed loading the translation service")
- callback([error_id])
- elif not self.loaded:
- return self.GetInitialResultSet(terms, callback)
+ # Load the translator if needed
+ # TODO: Verify API key when needed
+ if not self.loaded:
+ try:
+ await self._load_translator()
+ except Exception:
+ self.translations[error_id] = _("Failed loading the translation service")
+ return [error_id]
# If the two languages are the same, nothing is done
if self.dest_language and self.src_language != self.dest_language and text != "":
src, dest = self.translator.denormalize_lang(self.src_language, self.dest_language)
- self.translator.translate(text, src, dest, on_done, on_fail)
- self.hold()
-
+ request = TranslationRequest(text, src, dest)
+
+ try:
+ translation = await self.translator.translate(request)
+ self.translations[text] = translation.text
+ return [text, CLIPBOARD_PREFIX + text]
+ except (RequestError, ProviderError) as exc:
+ logging.error(exc)
+
+ if isinstance(exc, RequestError):
+ self.translations[error_id] = _("Translation failed, check for network issues")
+ elif isinstance(exc, APIKeyInvalid):
+ self.translations[error_id] = _("The provided API key is invalid")
+ elif isinstance(exc, APIKeyRequired):
+ self.translations[error_id] = _("API key is required to use the service")
+ else:
+ self.translations[error_id] = _("Translation failed")
+
+ return [error_id]
+ else:
+ return []
else:
- provider = Settings.get().active_translator
-
- callback(
- [
- _("Translate “{text}” with {provider_name}").format(
- text=text, provider_name=TRANSLATORS[provider].prettyname
- )
- ]
- )
+ return [
+ _("Translate “{text}” with {provider_name}").format(
+ text=text, provider_name=TRANSLATORS[Settings.get().active_translator].prettyname
+ )
+ ]
- def GetSubsearchResultSet(
- self, _previous_results: list[str], new_terms: list[str], callback: Callable[[list[str]], None]
- ):
- self.GetInitialResultSet(new_terms, callback)
+ async def GetSubsearchResultSet(self, _previous_results: list[str], new_terms: list[str]) -> list[str]:
+ return await self.GetInitialResultSet(new_terms)
- def GetResultMetas(self, ids: list[str], callback: Callable[[list[dict[str, GLib.Variant]]], None]):
+ def GetResultMetas(self, ids: list[str]) -> list[dict[str, GLib.Variant]]:
"""Send translated text"""
translate_id = ids[0]
@@ -207,14 +216,12 @@ class TranslateServiceApplication(Gio.Application):
if translate_id in self.translations:
text = self.translations[translate_id]
- callback(
- [
- {
- "id": GLib.Variant("s", translate_id),
- "name": GLib.Variant("s", text),
- }
- ]
- )
+ return [
+ {
+ "id": GLib.Variant("s", translate_id),
+ "name": GLib.Variant("s", text),
+ }
+ ]
elif len(ids) == 2 and translate_id in self.translations and ids[1] == CLIPBOARD_PREFIX + ids[0]:
text = self.translations[translate_id]
@@ -227,80 +234,73 @@ class TranslateServiceApplication(Gio.Application):
self.translations.clear()
- callback(
- [
- {
- "id": GLib.Variant("s", translate_id),
- "name": GLib.Variant("s", text),
- "description": GLib.Variant("s", description),
- },
- {
- "id": GLib.Variant("s", ids[1]),
- "name": GLib.Variant("s", _("Copy")),
- "description": GLib.Variant("s", _("Copy translation to clipboard")),
- "clipboardText": GLib.Variant("s", text),
- },
- ]
- )
+ return [
+ {
+ "id": GLib.Variant("s", translate_id),
+ "name": GLib.Variant("s", text),
+ "description": GLib.Variant("s", description),
+ },
+ {
+ "id": GLib.Variant("s", ids[1]),
+ "name": GLib.Variant("s", _("Copy")),
+ "description": GLib.Variant("s", _("Copy translation to clipboard")),
+ "clipboardText": GLib.Variant("s", text),
+ },
+ ]
else:
# Probably never needed, just in case
- callback(
- [
- dict(
- id=GLib.Variant("s", id),
- name=GLib.Variant("s", id),
- )
- for id in ids
- ]
- )
+ return [
+ dict(
+ id=GLib.Variant("s", id),
+ name=GLib.Variant("s", id),
+ )
+ for id in ids
+ ]
- def ActivateResult(self, result_id: str, terms: list[str], timestamp: int, callback: Callable):
+ def ActivateResult(self, result_id: str, terms: list[str], timestamp: int):
if not result_id.startswith(CLIPBOARD_PREFIX):
- self.LaunchSearch(terms, timestamp, callback)
- else:
- callback((None,))
+ self.LaunchSearch(terms, timestamp)
- def LaunchSearch(self, terms: list[str], _timestamp: int, callback: Callable):
+ def LaunchSearch(self, terms: list[str], _timestamp: int):
text = " ".join(terms)
GLib.spawn_async_with_pipes(None, ["@BIN@", "--text", text], None, GLib.SpawnFlags.SEARCH_PATH, None)
- callback((None,))
+ async def _load_translator(self):
+ if self.loaded:
+ return
+
+ self.translator = TRANSLATORS[Settings.get().active_translator]()
+
+ # Init translator
+ try:
+ await self.translator.init_trans()
- def _load_translator(self):
- def on_done():
self.loaded = True
- self.load_failed = False
self.dest_language = self.translator.recent_dest_langs[0]
-
self.translator.settings.connect("changed", self._on_translator_settings_changed)
- def on_fail(_error):
- self.loaded = False
- self.load_failed = True
+ except Exception:
self.dest_language = None
-
- self.loaded = False
- provider = Settings.get().active_translator
- self.translator = TRANSLATORS[provider]()
-
- # Init translator
- self.translator.init_trans(on_done, on_fail)
-
- def _on_settings_changed(self, _settings, key: str):
- if key.startswith("translator-"):
- self._load_translator()
+ raise
def _on_translator_changed(self, *args):
- self._load_translator()
+ self.loaded = False
def _on_translator_settings_changed(self, _settings, key: str):
if key == "src-langs" or key == "dest-langs":
self.dest_language = self.translator.recent_dest_langs[0]
else:
- self._load_translator()
+ self.loaded = False
-if __name__ == "__main__":
+def main():
app = TranslateServiceApplication()
- sys.exit(app.run(None))
+ exit_code = 0
+ with glib_event_loop_policy():
+ exit_code = app.run(None)
+ return exit_code
+
+
+if __name__ == "__main__":
+ sys.exit(main())
diff --git a/dialect/settings.py b/dialect/settings.py
index b2cea08c..08cf6586 100644
--- a/dialect/settings.py
+++ b/dialect/settings.py
@@ -43,11 +43,8 @@ def get() -> Settings:
Settings.instance = Settings.new()
return Settings.instance
- @GObject.Signal(arg_types=(str,))
- def translator_changed(self, _name: str): ...
-
- @GObject.Signal(arg_types=(str,))
- def tts_changed(self, _name: str): ...
+ @GObject.Signal(flags=GObject.SignalFlags.DETAILED, arg_types=(str, str))
+ def provider_changed(self, name: str): ...
@property
def translators_list(self) -> list[str]:
@@ -70,7 +67,7 @@ def active_translator(self) -> str:
@active_translator.setter
def active_translator(self, translator: str):
self._translators.set_string("active", translator)
- self.emit("translator-changed", translator)
+ self.emit("provider-changed::translator", "translator", translator)
@property
def window_size(self) -> tuple[int, int]:
@@ -143,7 +140,7 @@ def active_tts(self) -> str:
def active_tts(self, tts: str):
"""Set the user's preferred TTS service."""
self._tts.set_string("active", tts)
- self.emit("tts-changed", tts)
+ self.emit("provider-changed::tts", "tts", tts)
@property
def color_scheme(self) -> str:
diff --git a/dialect/widgets/provider_preferences.py b/dialect/widgets/provider_preferences.py
index e498b886..3baaacab 100644
--- a/dialect/widgets/provider_preferences.py
+++ b/dialect/widgets/provider_preferences.py
@@ -4,13 +4,15 @@
from __future__ import annotations
+import logging
import re
import typing
from gi.repository import Adw, GObject, Gtk
+from dialect.asyncio import create_background_task
from dialect.define import RES_PATH
-from dialect.providers import ProviderCapability, ProviderFeature
+from dialect.providers import ProviderCapability, RequestError
if typing.TYPE_CHECKING:
from dialect.window import DialectWindow
@@ -64,50 +66,37 @@ def __init__(self, scope: str, dialog: Adw.PreferencesDialog, window: DialectWin
self.window.connect("notify::translator-loading", self._on_translator_loading)
def _check_settings(self):
- def on_usage(usage, limit):
+ if not self.provider:
+ return
+
+ self.instance_entry.props.visible = self.provider.supports_instances
+ self.api_key_entry.props.visible = self.provider.supports_api_key
+
+ self.api_usage_group.props.visible = False
+ if self.provider.supports_api_usage:
+ create_background_task(self._load_api_usage())
+
+ async def _load_api_usage(self):
+ if not self.provider:
+ return
+
+ try:
+ usage, limit = await self.provider.api_char_usage()
level = usage / limit
label = _("{usage:n} of {limit:n} characters").format(usage=usage, limit=limit)
self.api_usage.props.value = level
self.api_usage_label.props.label = label
self.api_usage_group.props.visible = True
-
- def on_usage_fail(_error):
- pass
-
- if self.provider:
- self.instance_entry.props.visible = ProviderFeature.INSTANCES in self.provider.features
- self.api_key_entry.props.visible = ProviderFeature.API_KEY in self.provider.features
-
- self.api_usage_group.props.visible = False
- if ProviderFeature.API_KEY_USAGE in self.provider.features:
- self.provider.api_char_usage(on_usage, on_usage_fail)
+ except Exception as exc:
+ logging.error(exc)
@Gtk.Template.Callback()
def _on_instance_apply(self, _row):
"""Called on self.instance_entry::apply signal"""
+ create_background_task(self._instance_apply())
- def on_done(valid):
- if not self.provider:
- return
-
- if valid:
- self.provider.instance_url = self.new_instance_url
- self.provider.reset_src_langs()
- self.provider.reset_dest_langs()
- self.instance_entry.remove_css_class("error")
- self.instance_entry.props.text = self.provider.instance_url
- else:
- self.instance_entry.add_css_class("error")
- error_text = _("Not a valid {provider} instance")
- error_text = error_text.format(provider=self.provider.prettyname)
- toast = Adw.Toast(title=error_text)
- self.dialog.add_toast(toast)
-
- self.instance_entry.props.sensitive = True
- self.api_key_entry.props.sensitive = True
- self.instance_stack.props.visible_child_name = "reset"
-
+ async def _instance_apply(self):
if not self.provider:
return
@@ -124,8 +113,27 @@ def on_done(valid):
self.api_key_entry.props.sensitive = False
self.instance_stack.props.visible_child_name = "spinner"
- # TODO: Use on_fail to notify network error
- self.provider.validate_instance(self.new_instance_url, on_done, lambda _: on_done(False))
+ try:
+ if await self.provider.validate_instance(self.new_instance_url):
+ self.provider.instance_url = self.new_instance_url
+ self.provider.reset_src_langs()
+ self.provider.reset_dest_langs()
+ self.instance_entry.remove_css_class("error")
+ self.instance_entry.props.text = self.provider.instance_url
+ else:
+ self.instance_entry.add_css_class("error")
+ error_text = _("Not a valid {provider} instance")
+ error_text = error_text.format(provider=self.provider.prettyname)
+ toast = Adw.Toast(title=error_text)
+ self.dialog.add_toast(toast)
+ except RequestError as exc:
+ logging.error(exc)
+ toast = Adw.Toast(title=_("Failed validating instance, check for network issues"))
+ self.dialog.add_toast(toast)
+ finally:
+ self.instance_entry.props.sensitive = True
+ self.api_key_entry.props.sensitive = True
+ self.instance_stack.props.visible_child_name = "reset"
else:
self.instance_entry.remove_css_class("error")
@@ -154,26 +162,9 @@ def _on_reset_instance(self, _button):
@Gtk.Template.Callback()
def _on_api_key_apply(self, _row):
"""Called on self.api_key_entry::apply signal"""
+ create_background_task(self._api_key_apply())
- def on_done(valid):
- if not self.provider:
- return
-
- if valid:
- self.provider.api_key = self.new_api_key
- self.api_key_entry.remove_css_class("error")
- self.api_key_entry.props.text = self.provider.api_key
- else:
- self.api_key_entry.add_css_class("error")
- error_text = _("Not a valid {provider} API key")
- error_text = error_text.format(provider=self.provider.prettyname)
- toast = Adw.Toast(title=error_text)
- self.dialog.add_toast(toast)
-
- self.instance_entry.props.sensitive = True
- self.api_key_entry.props.sensitive = True
- self.api_key_stack.props.visible_child_name = "reset"
-
+ async def _api_key_apply(self):
if not self.provider:
return
@@ -187,8 +178,25 @@ def on_done(valid):
self.api_key_entry.props.sensitive = False
self.api_key_stack.props.visible_child_name = "spinner"
- # TODO: Use on_fail to notify network error
- self.provider.validate_api_key(self.new_api_key, on_done, lambda _: on_done(False))
+ try:
+ if await self.provider.validate_api_key(self.new_api_key):
+ self.provider.api_key = self.new_api_key
+ self.api_key_entry.remove_css_class("error")
+ self.api_key_entry.props.text = self.provider.api_key
+ else:
+ self.api_key_entry.add_css_class("error")
+ error_text = _("Not a valid {provider} API key")
+ error_text = error_text.format(provider=self.provider.prettyname)
+ toast = Adw.Toast(title=error_text)
+ self.dialog.add_toast(toast)
+ except RequestError as exc:
+ logging.error(exc)
+ toast = Adw.Toast(title=_("Failed validating API key, check for network issues"))
+ self.dialog.add_toast(toast)
+ finally:
+ self.instance_entry.props.sensitive = True
+ self.api_key_entry.props.sensitive = True
+ self.api_key_stack.props.visible_child_name = "reset"
else:
self.api_key_entry.remove_css_class("error")
diff --git a/dialect/window.blp b/dialect/window.blp
index 20129dec..d7512178 100644
--- a/dialect/window.blp
+++ b/dialect/window.blp
@@ -90,7 +90,7 @@ template $DialectWindow : Adw.ApplicationWindow {
Button {
label: _("Retry");
- clicked => $retry_load_translator();
+ clicked => $_on_retry_load_translator_clicked();
styles [
"pill",
@@ -113,7 +113,7 @@ template $DialectWindow : Adw.ApplicationWindow {
orientation: vertical;
Button error_retry_btn {
- clicked => $retry_load_translator();
+ clicked => $_on_retry_load_translator_clicked();
styles [
"pill",
@@ -165,7 +165,7 @@ template $DialectWindow : Adw.ApplicationWindow {
Button rmv_key_btn {
visible: false;
label: _("Remove Key and Retry");
- clicked => $remove_key_and_reload();
+ clicked => $_on_remove_key_and_reload_clicked();
styles [
"pill",
@@ -189,7 +189,7 @@ template $DialectWindow : Adw.ApplicationWindow {
Button error_api_key_btn {
visible: false;
- clicked => $remove_key_and_reload();
+ clicked => $_on_remove_key_and_reload_clicked();
styles [
"pill",
@@ -240,7 +240,7 @@ template $DialectWindow : Adw.ApplicationWindow {
$LangSelector src_lang_selector {
notify::selected => $_on_src_lang_changed();
- user-selection-changed => $translation();
+ user-selection-changed => $_on_translation();
tooltip-text: _("Change Source Language");
}
@@ -252,7 +252,7 @@ template $DialectWindow : Adw.ApplicationWindow {
$LangSelector dest_lang_selector {
notify::selected => $_on_dest_lang_changed();
- user-selection-changed => $translation();
+ user-selection-changed => $_on_translation();
tooltip-text: _("Change Destination Language");
}
};
@@ -565,7 +565,7 @@ template $DialectWindow : Adw.ApplicationWindow {
selected: bind src_lang_selector.selected bidirectional;
tooltip-text: _("Change Source Language");
- user-selection-changed => $translation();
+ user-selection-changed => $_on_translation();
}
[center]
@@ -581,7 +581,7 @@ template $DialectWindow : Adw.ApplicationWindow {
selected: bind dest_lang_selector.selected bidirectional;
tooltip-text: _("Change Destination Language");
- user-selection-changed => $translation();
+ user-selection-changed => $_on_translation();
}
}
}
diff --git a/dialect/window.py b/dialect/window.py
index b3dbedde..09b59120 100644
--- a/dialect/window.py
+++ b/dialect/window.py
@@ -5,20 +5,24 @@
# SPDX-License-Identifier: GPL-3.0-or-later
import logging
-from typing import IO, Literal, TypedDict
+from typing import Literal, TypedDict
from gi.repository import Adw, Gdk, Gio, GLib, GObject, Gst, Gtk
+from dialect.asyncio import background_task
from dialect.define import APP_ID, PROFILE, RES_PATH, TRANS_NUMBER
from dialect.languages import LanguagesListModel
from dialect.providers import (
TRANSLATORS,
TTS,
+ APIKeyInvalid,
+ APIKeyRequired,
+ BaseProvider,
ProviderError,
- ProviderErrorCode,
- ProviderFeature,
+ RequestError,
+ Translation,
+ TranslationRequest,
)
-from dialect.providers.base import BaseProvider, Translation
from dialect.settings import Settings
from dialect.shortcuts import DialectShortcutsWindow
from dialect.utils import find_item_match, first_exclude
@@ -107,14 +111,11 @@ class DialectWindow(Adw.ApplicationWindow):
current_history = 0 # for history management
# Translation-related variables
- next_trans = {} # for ongoing translation
- ongoing_trans = False # for ongoing translation
- trans_mistakes: tuple[str | None, str | None] = (None, None) # "mistakes" suggestions
- # Pronunciations
- trans_src_pron = None
- trans_dest_pron = None
+ next_translation: TranslationRequest | None = None # for ongoing translation
+ translation_loading = False # for ongoing translation
+
# Suggestions
- before_suggest = None
+ before_suggest: str | None = None
def __init__(self, **kwargs):
super().__init__(**kwargs)
@@ -136,74 +137,74 @@ def __init__(self, **kwargs):
def setup_actions(self):
back = Gio.SimpleAction(name="back")
back.props.enabled = False
- back.connect("activate", self.ui_return)
+ back.connect("activate", self._on_back_action)
self.add_action(back)
forward_action = Gio.SimpleAction(name="forward")
forward_action.props.enabled = False
- forward_action.connect("activate", self.ui_forward)
+ forward_action.connect("activate", self._on_forward_action)
self.add_action(forward_action)
switch_action = Gio.SimpleAction(name="switch")
- switch_action.connect("activate", self.ui_switch)
+ switch_action.connect("activate", self._on_switch_action)
self.add_action(switch_action)
from_action = Gio.SimpleAction(name="from")
- from_action.connect("activate", self.ui_from)
+ from_action.connect("activate", self._on_from_action)
self.add_action(from_action)
to_action = Gio.SimpleAction(name="to")
- to_action.connect("activate", self.ui_to)
+ to_action.connect("activate", self._on_to_action)
self.add_action(to_action)
clear_action = Gio.SimpleAction(name="clear")
clear_action.props.enabled = False
- clear_action.connect("activate", self.ui_clear)
+ clear_action.connect("activate", self._on_clear_action)
self.add_action(clear_action)
font_size_inc_action = Gio.SimpleAction(name="font-size-inc")
- font_size_inc_action.connect("activate", self.ui_font_size_inc)
+ font_size_inc_action.connect("activate", self._on_font_size_inc_action)
self.add_action(font_size_inc_action)
font_size_dec_action = Gio.SimpleAction(name="font-size-dec")
- font_size_dec_action.connect("activate", self.ui_font_size_dec)
+ font_size_dec_action.connect("activate", self._on_font_size_dec_action)
self.add_action(font_size_dec_action)
paste_action = Gio.SimpleAction(name="paste")
- paste_action.connect("activate", self.ui_paste)
+ paste_action.connect("activate", self._on_paste_action)
self.add_action(paste_action)
copy_action = Gio.SimpleAction(name="copy")
copy_action.props.enabled = False
- copy_action.connect("activate", self.ui_copy)
+ copy_action.connect("activate", self._on_copy_action)
self.add_action(copy_action)
listen_dest_action = Gio.SimpleAction(name="listen-dest")
- listen_dest_action.connect("activate", self.ui_dest_listen)
+ listen_dest_action.connect("activate", self._on_dest_listen_action)
listen_dest_action.props.enabled = False
self.add_action(listen_dest_action)
suggest_action = Gio.SimpleAction(name="suggest")
suggest_action.props.enabled = False
- suggest_action.connect("activate", self.ui_suggest)
+ suggest_action.connect("activate", self._on_suggest_action)
self.add_action(suggest_action)
suggest_ok_action = Gio.SimpleAction(name="suggest-ok")
- suggest_ok_action.connect("activate", self.ui_suggest_ok)
+ suggest_ok_action.connect("activate", self._on_suggest_ok_action)
self.add_action(suggest_ok_action)
suggest_cancel_action = Gio.SimpleAction(name="suggest-cancel")
- suggest_cancel_action.connect("activate", self.ui_suggest_cancel)
+ suggest_cancel_action.connect("activate", self._on_suggest_cancel_action)
self.add_action(suggest_cancel_action)
listen_src_action = Gio.SimpleAction(name="listen-src")
- listen_src_action.connect("activate", self.ui_src_listen)
+ listen_src_action.connect("activate", self._on_src_listen_action)
listen_src_action.props.enabled = False
self.add_action(listen_src_action)
translation_action = Gio.SimpleAction(name="translation")
translation_action.props.enabled = False
- translation_action.connect("activate", self.translation)
+ translation_action.connect("activate", self._on_translation)
self.add_action(translation_action)
def setup(self):
@@ -231,8 +232,7 @@ def setup(self):
self.load_tts()
# Listen to active providers changes
- Settings.get().connect("translator-changed", self._on_active_provider_changed, "trans")
- Settings.get().connect("tts-changed", self._on_active_provider_changed, "tts")
+ Settings.get().connect("provider-changed", self._on_active_provider_changed)
# Bind text views font size
self.src_text.bind_property("font-size", self.dest_text, "font-size", GObject.BindingFlags.BIDIRECTIONAL)
@@ -272,47 +272,67 @@ def lang_names_func(code: str):
def setup_translation(self):
# Src buffer
self.src_buffer = self.src_text.props.buffer
- self.src_buffer.connect("changed", self.on_src_text_changed)
- self.src_buffer.connect("end-user-action", self.user_action_ended)
+ self.src_buffer.connect("changed", self._on_src_text_changed)
+ self.src_buffer.connect("end-user-action", self._on_user_action_ended)
# Dest buffer
self.dest_buffer = self.dest_text.props.buffer
self.dest_buffer.props.text = ""
- self.dest_buffer.connect("changed", self.on_dest_text_changed)
- # Translation progress spinner
+ self.dest_buffer.connect("changed", self._on_dest_text_changed)
+
+ # Translation progress
self.trans_spinner.hide()
self.trans_warning.hide()
- def load_translator(self):
- def on_done():
- if not self.provider["trans"]:
- return
+ def reload_provider(self, kind: str):
+ match kind:
+ case "translator":
+ self.load_translator()
+ case "tts":
+ self.load_tts()
- # Mistakes support
- if ProviderFeature.MISTAKES not in self.provider["trans"].features:
- self.mistakes.props.reveal_child = False
+ @background_task
+ async def load_translator(self):
+ self.translator_loading = True
- # Suggestions support
- self.ui_suggest_cancel(None, None)
- if ProviderFeature.SUGGESTIONS not in self.provider["trans"].features:
- self.edit_btn.props.visible = False
- else:
- self.edit_btn.props.visible = True
+ provider = Settings.get().active_translator
- # Pronunciation support
- if ProviderFeature.PRONUNCIATION not in self.provider["trans"].features:
- self.src_pron_revealer.props.reveal_child = False
- self.dest_pron_revealer.props.reveal_child = False
- self.app.lookup_action("pronunciation").props.enabled = False # type: ignore
- else:
- self.app.lookup_action("pronunciation").props.enabled = True # type: ignore
+ # Show loading view
+ self.main_stack.props.visible_child_name = "loading"
+
+ # Translator object
+ self.provider["trans"] = TRANSLATORS[provider]()
+ # Get saved languages
+ self.src_langs = self.provider["trans"].recent_src_langs
+ self.dest_langs = self.provider["trans"].recent_dest_langs
+ # Connect to provider settings changes
+ self.provider["trans"].settings.connect(
+ "changed::instance-url", self._on_provider_changed, self.provider["trans"].name
+ )
+ self.provider["trans"].settings.connect(
+ "changed::api-key", self._on_provider_changed, self.provider["trans"].name
+ )
+
+ try:
+ # Do provider init
+ await self.provider["trans"].init_trans()
+
+ # Update navigation UI
+ self._check_navigation_enabled()
+ # Check mistakes support
+ self._check_mistakes()
+ # Check pronunciation support
+ self._check_pronunciation()
+ # Check suggestions support and update UI
+ self._on_suggest_cancel_action()
+ self.edit_btn.props.visible = self.provider["trans"].supports_suggestions
# Update langs
self.src_lang_model.set_langs(self.provider["trans"].src_languages)
self.dest_lang_model.set_langs(self.provider["trans"].dest_languages)
# Update selected langs
- set_auto = Settings.get().src_auto and ProviderFeature.DETECTION in self.provider["trans"].features
+ set_auto = Settings.get().src_auto and self.provider["trans"].supports_detection
src_lang = self.provider["trans"].src_languages[0]
if self.src_langs and self.src_langs[0] in self.provider["trans"].src_languages:
src_lang = self.src_langs[0]
@@ -330,106 +350,81 @@ def on_done():
count = f"{str(self.src_buffer.get_char_count())}/{self.provider['trans'].chars_limit}"
self.char_counter.props.label = count
- self.translator_loading = False
-
- self.check_apikey()
-
- def on_fail(error: ProviderError):
- self.translator_loading = False
- self.loading_failed(error)
-
- provider = Settings.get().active_translator
-
- # Show loading view
- self.main_stack.props.visible_child_name = "loading"
-
- # Translator object
- self.provider["trans"] = TRANSLATORS[provider]()
- # Get saved languages
- self.src_langs = self.provider["trans"].recent_src_langs
- self.dest_langs = self.provider["trans"].recent_dest_langs
- # Do provider init
- self.provider["trans"].init_trans(on_done, on_fail)
-
- # Connect to provider settings changes
- self.provider["trans"].settings.connect(
- "changed::instance-url", self._on_provider_changed, self.provider["trans"].name
- )
- self.provider["trans"].settings.connect(
- "changed::api-key", self._on_provider_changed, self.provider["trans"].name
- )
-
- def check_apikey(self):
- def on_done(valid: bool):
- if valid:
- self.main_stack.props.visible_child_name = "translate"
- else:
- self.api_key_failed()
-
- def on_fail(error: ProviderError):
- self.loading_failed(error)
-
- if not self.provider["trans"]:
- return
-
- if ProviderFeature.API_KEY in self.provider["trans"].features:
- if self.provider["trans"].api_key:
- self.provider["trans"].validate_api_key(self.provider["trans"].api_key, on_done, on_fail)
- elif (
- not self.provider["trans"].api_key
- and ProviderFeature.API_KEY_REQUIRED in self.provider["trans"].features
- ):
- self.api_key_failed(required=True)
+ # Check API key
+ if self.provider["trans"].supports_api_key:
+ if self.provider["trans"].api_key:
+ try:
+ if await self.provider["trans"].validate_api_key(self.provider["trans"].api_key):
+ self.main_stack.props.visible_child_name = "translate"
+ else:
+ self.show_translator_api_key_view()
+ except ProviderError or RequestError as exc:
+ self.show_translator_error_view(detail=str(exc))
+ elif not self.provider["trans"].api_key and self.provider["trans"].api_key_required:
+ self.show_translator_api_key_view(required=True)
+ else:
+ self.main_stack.props.visible_child_name = "translate"
else:
self.main_stack.props.visible_child_name = "translate"
- else:
- self.main_stack.props.visible_child_name = "translate"
-
- def loading_failed(self, error: ProviderError):
- if not self.provider["trans"]:
- return
- # Api Key error
- if error.code in (ProviderErrorCode.API_KEY_INVALID, ProviderErrorCode.API_KEY_REQUIRED):
- self.api_key_failed(error.code == ProviderErrorCode.API_KEY_REQUIRED)
+ # Loading failed
+ except (RequestError, ProviderError) as exc:
+ logging.error(exc)
- # Other errors
- else:
- self.main_stack.props.visible_child_name = "error"
-
- service = self.provider["trans"].prettyname
- url = self.provider["trans"].instance_url
-
- title = _("Failed loading the translation service")
- description = _("Please report this in the Dialect bug tracker if the issue persists.")
- if ProviderFeature.INSTANCES in self.provider["trans"].features:
- description = _(
- (
- 'Failed loading "{url}", check if the instance address is correct or report in the Dialect bug tracker'
- " if the issue persists."
- )
- )
- description = description.format(url=url)
+ # API key error
+ if isinstance(exc, APIKeyRequired):
+ self.show_translator_api_key_view(required=True)
+ elif isinstance(exc, APIKeyInvalid):
+ self.show_translator_api_key_view()
- if error.code == ProviderErrorCode.NETWORK:
- title = _("Couldn’t connect to the translation service")
- description = _("We can’t connect to the server. Please check for network issues.")
- if ProviderFeature.INSTANCES in self.provider["trans"].features:
+ # Other errors
+ else:
+ service = self.provider["trans"].prettyname
+ url = self.provider["trans"].instance_url
+ detail = str(exc)
+
+ if isinstance(exc, RequestError):
+ title = _("Couldn’t connect to the translation service")
+ description = _("We can’t connect to the server. Please check for network issues.")
+ if self.provider["trans"].supports_instances:
+ description = _(
+ (
+ "We can’t connect to the {service} instance “{url}“.\n"
+ "Please check for network issues or if the address is correct."
+ )
+ ).format(service=service, url=url)
+ self.show_translator_error_view(title, description, detail)
+ elif self.provider["trans"].supports_instances:
description = _(
(
- "We can’t connect to the {service} instance “{url}”.\n"
- "Please check for network issues or if the address is correct."
+ "Failed loading “{url}“, check if the instance address is correct or report in the Dialect bug tracker"
+ " if the issue persists."
)
- )
- description = description.format(service=service, url=url)
+ ).format(url=url)
+ self.show_translator_error_view(description=description, detail=detail)
+ else:
+ self.show_translator_error_view(detail=detail)
+
+ finally:
+ self.translator_loading = False
- if error.message:
- description = description + "\n\n" + error.message + ""
+ def show_translator_error_view(
+ self,
+ title: str = _("Failed loading the translation service"),
+ description: str = _("Please report this in the Dialect bug tracker if the issue persists."),
+ detail: str | None = None,
+ ):
+ if detail: # Add detail bellow description
+ if description:
+ description += "\n\n"
+ description += "" + detail + ""
- self.error_page.props.title = title
- self.error_page.props.description = description
+ self.error_page.props.title = title
+ self.error_page.props.description = description
- def api_key_failed(self, required=False):
+ self.main_stack.props.visible_child_name = "error"
+
+ def show_translator_api_key_view(self, required=False):
if not self.provider["trans"]:
return
@@ -439,7 +434,7 @@ def api_key_failed(self, required=False):
else:
self.key_page.props.title = _("The provided API key is invalid")
- if ProviderFeature.API_KEY_REQUIRED in self.provider["trans"].features:
+ if self.provider["trans"].api_key_required:
self.key_page.props.description = _("Please set a valid API key in the preferences.")
else:
self.key_page.props.description = _(
@@ -450,35 +445,8 @@ def api_key_failed(self, required=False):
self.main_stack.props.visible_child_name = "api-key"
- @Gtk.Template.Callback()
- def retry_load_translator(self, _button):
- self.load_translator()
-
- @Gtk.Template.Callback()
- def remove_key_and_reload(self, _button):
- if self.provider["trans"]:
- self.provider["trans"].reset_api_key()
- self.load_translator()
-
- def load_tts(self):
- def on_done():
- self.speech_provider_failed = False
- self.src_speech_btn.ready()
- self.dest_speech_btn.ready()
- self._check_speech_enabled()
-
- def on_fail(error: ProviderError):
- button_text = _("Failed loading the text-to-speech service. Retry?")
- toast_text = _("Failed loading the text-to-speech service")
- if error.code == ProviderErrorCode.NETWORK:
- toast_text = _("Failed loading the text-to-speech service, check for network issues")
-
- self.speech_provider_failed = True
- self.src_speech_btn.error(button_text)
- self.dest_speech_btn.error(button_text)
- self.send_notification(toast_text)
- self._check_speech_enabled()
-
+ @background_task
+ async def load_tts(self):
self.src_speech_btn.loading()
self.dest_speech_btn.loading()
@@ -492,8 +460,6 @@ def on_fail(error: ProviderError):
# TTS Object
self.provider["tts"] = TTS[provider]()
- self.provider["tts"].init_tts(on_done, on_fail)
-
# Connect to provider settings changes
self.provider["tts"].settings.connect(
"changed::instance-url", self._on_provider_changed, self.provider["tts"].name
@@ -501,6 +467,31 @@ def on_fail(error: ProviderError):
self.provider["tts"].settings.connect(
"changed::api-key", self._on_provider_changed, self.provider["tts"].name
)
+
+ try:
+ # Do TTS init
+ await self.provider["tts"].init_tts()
+
+ self.speech_provider_failed = False
+ self.src_speech_btn.ready()
+ self.dest_speech_btn.ready()
+ self._check_speech_enabled()
+
+ # Loading failed
+ except (RequestError, ProviderError) as exc:
+ logging.error(exc)
+
+ button_text = _("Failed loading the text-to-speech service. Retry?")
+ toast_text = _("Failed loading the text-to-speech service")
+ if isinstance(exc, RequestError):
+ toast_text = _("Failed loading the text-to-speech service, check for network issues")
+
+ self.speech_provider_failed = True
+ self.src_speech_btn.error(button_text)
+ self.dest_speech_btn.error(button_text)
+ self.send_notification(toast_text)
+ self._check_speech_enabled()
+
else:
self.provider["tts"] = None
self.src_speech_btn.props.visible = False
@@ -525,15 +516,15 @@ def translate(self, text: str, src_lang: str | None, dest_lang: str | None):
# Set text to src buffer
self.src_buffer.props.text = text
# Run translation
- self.translation()
-
- def translate_selection(self, src_lang: str | None, dest_lang: str | None):
- def on_paste(clipboard, result):
- text = clipboard.read_text_finish(result)
- self.translate(text, src_lang, dest_lang)
+ self._on_translation()
+ @background_task
+ async def translate_selection(self, src_lang: str | None, dest_lang: str | None):
+ """Runs `translate` with the selection clipboard text"""
if display := Gdk.Display.get_default():
- display.get_primary_clipboard().read_text_async(None, on_paste)
+ clipboard = display.get_primary_clipboard()
+ if text := await clipboard.read_text_async(): # type: ignore
+ self.translate(text, src_lang, dest_lang)
def save_settings(self, *args, **kwargs):
if not self.is_maximized():
@@ -576,6 +567,73 @@ def toast_dismissed(_toast: Adw.Toast):
self.toast.props.priority = priority
self.toast_overlay.add_toast(self.toast)
+ def set_font_size(self, size: int):
+ self.src_text.font_size = size
+
+ def add_history_entry(self, translation: Translation):
+ """Add a history entry to the history list."""
+ if not self.provider["trans"]:
+ return
+
+ if self.current_history > 0:
+ del self.provider["trans"].history[: self.current_history]
+ self.current_history = 0
+ if len(self.provider["trans"].history) == TRANS_NUMBER:
+ self.provider["trans"].history.pop()
+ self.provider["trans"].history.insert(0, translation)
+ self._check_navigation_enabled()
+
+ @property
+ def current_translation(self) -> Translation | None:
+ """Get the current active translation, respecting the history navigation"""
+ if not self.provider["trans"]:
+ return None
+
+ try:
+ return self.provider["trans"].history[self.current_history]
+ except IndexError:
+ return None
+
+ def _check_navigation_enabled(self):
+ self.lookup_action("back").props.enabled = self.current_history < len(self.provider["trans"].history) - 1 # type: ignore
+ self.lookup_action("forward").props.enabled = self.current_history > 0 # type: ignore
+
+ def _check_mistakes(self):
+ if not self.provider["trans"]:
+ return
+
+ translation = self.current_translation
+ if self.provider["trans"].supports_mistakes and translation and translation.mistakes:
+ self.mistakes_label.set_markup(_("Did you mean: ") + f'{translation.mistakes.markup}')
+ self.mistakes.props.reveal_child = True
+ elif self.mistakes.props.reveal_child:
+ self.mistakes.props.reveal_child = False
+
+ def _check_pronunciation(self):
+ if not self.provider["trans"]:
+ return
+
+ if not self.provider["trans"].supports_pronunciation:
+ self.src_pron_revealer.props.reveal_child = False
+ self.dest_pron_revealer.props.reveal_child = False
+ self.app.lookup_action("pronunciation").props.enabled = False # type: ignore
+ else:
+ self.app.lookup_action("pronunciation").props.enabled = True # type: ignore
+ reveal = Settings.get().show_pronunciation
+ translation = self.current_translation
+
+ if translation and translation.pronunciation.src and not translation.mistakes:
+ self.src_pron_label.props.label = translation.pronunciation.src
+ self.src_pron_revealer.props.reveal_child = reveal
+ elif self.src_pron_revealer.props.reveal_child:
+ self.src_pron_revealer.props.reveal_child = False
+
+ if translation and translation.pronunciation.dest:
+ self.dest_pron_label.props.label = translation.pronunciation.dest
+ self.dest_pron_revealer.props.reveal_child = reveal
+ elif self.dest_pron_revealer.props.reveal_child:
+ self.dest_pron_revealer.props.reveal_child = False
+
def _check_speech_enabled(self):
if not self.provider["tts"]:
return
@@ -603,79 +661,6 @@ def _check_speech_enabled(self):
and not src_playing
)
- @Gtk.Template.Callback()
- def _on_src_lang_changed(self, _obj, _param):
- """Called on self.src_lang_selector::notify::selected signal"""
- if not self.provider["trans"]:
- return
-
- code = self.src_lang_selector.selected
- dest_code = self.dest_lang_selector.selected
-
- if self.provider["trans"].cmp_langs(code, dest_code):
- # Get first lang from saved src langs that is not current dest
- if valid := first_exclude(self.src_langs, dest_code):
- # Check if it's a valid dest lang
- valid = find_item_match([valid], self.provider["trans"].dest_languages)
- if not valid: # If not, just get the first lang from the list that is not selected
- valid = first_exclude(self.provider["trans"].dest_languages, dest_code)
-
- self.dest_lang_selector.selected = valid or ""
-
- if code in self.provider["trans"].src_languages:
- # Update saved src langs list
- if code in self.src_langs:
- # Bring lang to the top
- self.src_langs.remove(code)
- elif code.lower() in self.src_langs:
- # Bring lang to the top
- self.src_langs.remove(code.lower())
- elif len(self.src_langs) == 4:
- self.src_langs.pop()
- self.src_langs.insert(0, code)
-
- # Rewrite recent langs
- self.src_recent_lang_model.set_langs(self.src_langs, auto=True)
-
- self._check_switch_enabled()
- self._check_speech_enabled()
-
- @Gtk.Template.Callback()
- def _on_dest_lang_changed(self, _obj, _param):
- """Called on self.dest_lang_selector::notify::selected signal"""
- if not self.provider["trans"]:
- return
-
- code = self.dest_lang_selector.selected
- src_code = self.src_lang_selector.selected
-
- if self.provider["trans"].cmp_langs(code, src_code):
- # Get first lang from saved dest langs that is not current src
- if valid := first_exclude(self.dest_langs, src_code):
- # Check if it's a valid src lang
- valid = find_item_match([valid], self.provider["trans"].src_languages)
- if not valid: # If not, just get the first lang from the list that is not selected
- valid = first_exclude(self.provider["trans"].src_languages, src_code)
-
- self.src_lang_selector.selected = valid or ""
-
- # Update saved dest langs list
- if code in self.dest_langs:
- # Bring lang to the top
- self.dest_langs.remove(code)
- elif code.lower() in self.dest_langs:
- # Bring lang to the top
- self.dest_langs.remove(code.lower())
- elif len(self.src_langs) == 4:
- self.dest_langs.pop()
- self.dest_langs.insert(0, code)
-
- # Rewrite recent langs
- self.dest_recent_lang_model.set_langs(self.dest_langs)
-
- self._check_switch_enabled()
- self._check_speech_enabled()
-
def _check_switch_enabled(self):
if not self.provider["trans"]:
return
@@ -690,43 +675,33 @@ def _check_switch_enabled(self):
User interface functions
"""
- def ui_return(self, _action, _param):
+ def _on_back_action(self, *_args):
"""Go back one step in history."""
if self.current_history != TRANS_NUMBER:
self.current_history += 1
- self.history_update()
+ self._history_update()
- def ui_forward(self, _action, _param):
+ def _on_forward_action(self, *_args):
"""Go forward one step in history."""
if self.current_history != 0:
self.current_history -= 1
- self.history_update()
+ self._history_update()
- def add_history_entry(self, translation: Translation):
- """Add a history entry to the history list."""
+ def _history_update(self):
if not self.provider["trans"]:
return
- if self.current_history > 0:
- del self.provider["trans"].history[: self.current_history]
- self.current_history = 0
- if len(self.provider["trans"].history) == TRANS_NUMBER:
- self.provider["trans"].history.pop()
- self.provider["trans"].history.insert(0, translation)
- GLib.idle_add(self.reset_return_forward_btns)
+ if translation := self.current_translation:
+ self.src_lang_selector.selected = translation.original.src
+ self.dest_lang_selector.selected = translation.original.dest
+ self.src_buffer.props.text = translation.original.text
+ self.dest_buffer.props.text = translation.text
- def switch_all(self, src_language: str, dest_language: str, src_text: str, dest_text: str):
- self.src_lang_selector.selected = dest_language
- self.dest_lang_selector.selected = src_language
- self.src_buffer.props.text = dest_text
- self.dest_buffer.props.text = src_text
- self.add_history_entry(Translation(src_text, (dest_text, src_language, dest_language)))
-
- # Re-enable widgets
- self.langs_button_box.props.sensitive = True
- self.lookup_action("translation").props.enabled = self.src_buffer.get_char_count() != 0 # type: ignore
+ self._check_navigation_enabled()
+ self._check_mistakes()
+ self._check_pronunciation()
- def ui_switch(self, _action, _param):
+ def _on_switch_action(self, *_args):
# Get variables
self.langs_button_box.props.sensitive = False
self.lookup_action("translation").props.enabled = False # type: ignore
@@ -738,125 +713,158 @@ def ui_switch(self, _action, _param):
return
# Switch all
- self.switch_all(src_language, dest_language, src_text, dest_text)
+ self.src_lang_selector.selected = dest_language
+ self.dest_lang_selector.selected = src_language
+ self.src_buffer.props.text = dest_text
+ self.dest_buffer.props.text = src_text
+ self.add_history_entry(Translation(src_text, TranslationRequest(dest_text, src_language, dest_language)))
+
+ # Re-enable widgets
+ self.langs_button_box.props.sensitive = True
+ self.lookup_action("translation").props.enabled = self.src_buffer.get_char_count() != 0 # type: ignore
- def ui_from(self, _action, _param):
+ def _on_from_action(self, *_args):
self.src_lang_selector.button.popup()
- def ui_to(self, _action, _param):
+ def _on_to_action(self, *_args):
self.dest_lang_selector.button.popup()
- def ui_clear(self, _action, _param):
+ def _on_clear_action(self, *_args):
self.src_buffer.props.text = ""
self.src_buffer.emit("end-user-action")
- def set_font_size(self, size: int):
- self.src_text.font_size = size
-
- def ui_font_size_inc(self, _action, _param):
+ def _on_font_size_inc_action(self, *_args):
self.src_text.font_size_inc()
- def ui_font_size_dec(self, _action, _param):
+ def _on_font_size_dec_action(self, *_args):
self.src_text.font_size_dec()
- def ui_copy(self, _action, _param):
+ def _on_copy_action(self, *_args):
dest_text = self.dest_buffer.get_text(self.dest_buffer.get_start_iter(), self.dest_buffer.get_end_iter(), True)
if display := Gdk.Display.get_default():
display.get_clipboard().set(dest_text)
self.send_notification(_("Copied to clipboard"), timeout=1)
- def ui_paste(self, _action, _param):
- def on_paste(clipboard: Gdk.Clipboard, result: Gio.AsyncResult):
- text = clipboard.read_text_finish(result)
- if text is not None:
+ @background_task
+ async def _on_paste_action(self, *_args):
+ if display := Gdk.Display.get_default():
+ clipboard = display.get_clipboard()
+ if text := await clipboard.read_text_async(): # type: ignore
end_iter = self.src_buffer.get_end_iter()
self.src_buffer.insert(end_iter, text)
self.src_buffer.emit("end-user-action")
- if display := Gdk.Display.get_default():
- display.get_clipboard().read_text_async(None, on_paste)
-
- def ui_suggest(self, _action, _param):
+ def _on_suggest_action(self, *_args):
self.dest_toolbar_stack.props.visible_child_name = "edit"
self.before_suggest = self.dest_buffer.get_text(
self.dest_buffer.get_start_iter(), self.dest_buffer.get_end_iter(), True
)
self.dest_text.props.editable = True
- def ui_suggest_ok(self, _action, _param):
- def on_done(success):
- self.dest_toolbar_stack.props.visible_child_name = "default"
-
- if success:
- self.send_notification(_("New translation has been suggested!"))
- else:
- self.send_notification(_("Suggestion failed."))
-
- self.dest_text.props.editable = False
-
- def on_fail(error: ProviderError):
- self.dest_toolbar_stack.props.visible_child_name = "default"
- self.send_notification(_("Suggestion failed."))
- self.dest_text.props.editable = False
-
+ @background_task
+ async def _on_suggest_ok_action(self, *_args):
if not self.provider["trans"]:
return
- dest_text = self.dest_buffer.get_text(self.dest_buffer.get_start_iter(), self.dest_buffer.get_end_iter(), True)
-
- src, dest = self.provider["trans"].denormalize_lang(
- self.provider["trans"].history[self.current_history].original[1],
- self.provider["trans"].history[self.current_history].original[2],
- )
+ try:
+ dest_text = self.dest_buffer.get_text(
+ self.dest_buffer.get_start_iter(), self.dest_buffer.get_end_iter(), True
+ )
+ if translation := self.current_translation:
+ if await self.provider["trans"].suggest(
+ translation.original.text, translation.original.src, translation.original.dest, dest_text
+ ):
+ self.send_notification(_("New translation has been suggested!"))
+ else:
+ self.send_notification(_("Suggestion failed."))
- self.provider["trans"].suggest(
- self.provider["trans"].history[self.current_history].original[0], src, dest, dest_text, on_done, on_fail
- )
+ except (RequestError, ProviderError) as exc:
+ logging.error(exc)
+ self.send_notification(_("Suggestion failed."))
- self.before_suggest = None
+ finally:
+ self.dest_toolbar_stack.props.visible_child_name = "default"
+ self.dest_text.props.editable = False
+ self.before_suggest = None
- def ui_suggest_cancel(self, _action, _param):
+ def _on_suggest_cancel_action(self, *_args):
self.dest_toolbar_stack.props.visible_child_name = "default"
if self.before_suggest is not None:
self.dest_buffer.props.text = self.before_suggest
self.before_suggest = None
self.dest_text.props.editable = False
- def ui_src_listen(self, _action, _param):
+ def _on_src_listen_action(self, *_args):
if self.current_speech:
self._speech_reset()
return
src_text = self.src_buffer.get_text(self.src_buffer.get_start_iter(), self.src_buffer.get_end_iter(), True)
src_language = self.src_lang_selector.selected
- self._pre_speech(src_text, src_language, "src")
+ self._on_speech(src_text, src_language, "src")
- def ui_dest_listen(self, _action, _param):
+ def _on_dest_listen_action(self, *_args):
if self.current_speech:
self._speech_reset()
return
dest_text = self.dest_buffer.get_text(self.dest_buffer.get_start_iter(), self.dest_buffer.get_end_iter(), True)
dest_language = self.dest_lang_selector.selected
- self._pre_speech(dest_text, dest_language, "dest")
+ self._on_speech(dest_text, dest_language, "dest")
- def _pre_speech(self, text: str, lang: str, called_from: Literal["src", "dest"]):
- if text != "":
- self.speech_loading = True
- self.current_speech = {"text": text, "lang": lang, "called_from": called_from}
- self._check_speech_enabled()
+ @background_task
+ async def _on_speech(self, text: str, lang: str, called_from: Literal["src", "dest"]):
+ # Retry loading TTS provider
+ if self.speech_provider_failed:
+ self.load_tts()
+ return
- if self.speech_provider_failed:
- self.load_tts()
- else:
- self._download_speech()
+ if not text or not self.provider["tts"] or not self.player:
+ return
- if called_from == "src": # Show spinner on button
- self.src_speech_btn.loading()
- else:
- self.dest_speech_btn.loading()
- elif self.speech_provider_failed:
- self.load_tts()
+ # Set loading state and current speech to update UI
+ self.speech_loading = True
+ self.current_speech = {"text": text, "lang": lang, "called_from": called_from}
+ self._check_speech_enabled()
+
+ if called_from == "src": # Show spinner on button
+ self.src_speech_btn.loading()
+ else:
+ self.dest_speech_btn.loading()
+
+ # Download speech
+ try:
+ file_ = await self.provider["tts"].speech(self.current_speech["text"], self.current_speech["lang"])
+ uri = "file://" + file_.name
+ self.player.set_property("uri", uri)
+ self.player.set_state(Gst.State.PLAYING)
+ self.add_tick_callback(self._gst_progress_timeout)
+ file_.close()
+
+ except (RequestError, ProviderError) as exc:
+ logging.error(exc)
+
+ text = _("Text-to-Speech failed")
+ action: _NotificationAction | None = None
+
+ if isinstance(exc, RequestError):
+ text = _("Text-to-Speech failed, check for network issues")
+
+ if self.current_speech:
+ called_from = self.current_speech["called_from"]
+ action = {
+ "label": _("Retry"),
+ "name": "win.listen-src" if called_from == "src" else "win.listen-dest",
+ }
+
+ button_text = _("Text-to-Speech failed. Retry?")
+ if called_from == "src":
+ self.src_speech_btn.error(button_text)
+ else:
+ self.dest_speech_btn.error(button_text)
+
+ self.send_notification(text, action=action)
+ self._speech_reset(False)
def _speech_reset(self, set_ready: bool = True):
if not self.player:
@@ -871,57 +879,6 @@ def _speech_reset(self, set_ready: bool = True):
self.src_speech_btn.ready()
self.dest_speech_btn.ready()
- def _download_speech(self):
- def on_done(file: IO):
- try:
- self._play_audio(file.name)
- file.close()
- except Exception as exc:
- logging.error(exc)
- self._on_speech_failed()
-
- def on_fail(error: ProviderError):
- self._on_speech_failed(error)
-
- if not self.provider["tts"]:
- return
-
- if self.current_speech:
- lang: str = self.provider["tts"].denormalize_lang(self.current_speech["lang"]) # type: ignore
- self.provider["tts"].speech(self.current_speech["text"], lang, on_done, on_fail)
-
- def _on_speech_failed(self, error: ProviderError | None = None):
- text = _("Text-to-Speech failed")
- action: _NotificationAction | None = None
-
- if error and error.code == ProviderErrorCode.NETWORK:
- text = _("Text-to-Speech failed, check for network issues")
-
- if self.current_speech:
- called_from = self.current_speech["called_from"]
- action = {
- "label": _("Retry"),
- "name": "win.listen-src" if called_from == "src" else "win.listen-dest",
- }
-
- button_text = _("Text-to-Speech failed. Retry?")
- if called_from == "src":
- self.src_speech_btn.error(button_text)
- else:
- self.dest_speech_btn.error(button_text)
-
- self.send_notification(text, action=action)
- self._speech_reset(False)
-
- def _play_audio(self, path: str):
- if not self.player:
- return
-
- uri = "file://" + path
- self.player.set_property("uri", uri)
- self.player.set_state(Gst.State.PLAYING)
- self.add_tick_callback(self._gst_progress_timeout)
-
def _on_gst_message(self, _bus, message: Gst.Message):
if message.type == Gst.MessageType.EOS or message.type == Gst.MessageType.ERROR:
if message.type == Gst.MessageType.ERROR:
@@ -950,42 +907,7 @@ def _gst_progress_timeout(self, _widget, _clock):
return False
- @Gtk.Template.Callback()
- def _on_key_event(self, _ctrl, keyval: int, _keycode: int, state: Gdk.ModifierType):
- """Called on self.win_key_ctrlr::key-pressed signal"""
- modifiers = state & Gtk.accelerator_get_default_mod_mask()
- shift_mask = Gdk.ModifierType.SHIFT_MASK
- unicode_key_val = Gdk.keyval_to_unicode(keyval)
- if (
- GLib.unichar_isgraph(chr(unicode_key_val))
- and modifiers in (shift_mask, 0)
- and not self.dest_text.props.editable
- and not self.src_text.is_focus()
- ):
- self.src_text.grab_focus()
- end_iter = self.src_buffer.get_end_iter()
- self.src_buffer.insert(end_iter, chr(unicode_key_val))
- return Gdk.EVENT_STOP
- return Gdk.EVENT_PROPAGATE
-
- @Gtk.Template.Callback()
- def _on_src_activated(self, _texview):
- """Called on self.src_text::active signal"""
- if not Settings.get().live_translation:
- self.translation()
-
- @Gtk.Template.Callback()
- def _on_mistakes_clicked(self, _button, _data):
- """Called on self.mistakes_label::activate-link signal"""
- self.mistakes.props.reveal_child = False
- if self.trans_mistakes[1]:
- self.src_buffer.props.text = self.trans_mistakes[1]
- # Run translation again
- self.translation()
-
- return Gdk.EVENT_STOP
-
- def on_src_text_changed(self, buffer: Gtk.TextBuffer):
+ def _on_src_text_changed(self, buffer: Gtk.TextBuffer):
if not self.provider["trans"]:
return
@@ -1007,213 +929,284 @@ def on_src_text_changed(self, buffer: Gtk.TextBuffer):
self.lookup_action("clear").props.enabled = sensitive # type: ignore
self._check_speech_enabled()
- def on_dest_text_changed(self, buffer: Gtk.TextBuffer):
+ def _on_dest_text_changed(self, buffer: Gtk.TextBuffer):
if not self.provider["trans"]:
return
sensitive = buffer.get_char_count() != 0
self.lookup_action("copy").props.enabled = sensitive # type: ignore
self.lookup_action("suggest").set_enabled( # type: ignore
- ProviderFeature.SUGGESTIONS in self.provider["trans"].features and sensitive
+ self.provider["trans"].supports_suggestions and sensitive
)
self._check_speech_enabled()
- def user_action_ended(self, _buffer):
+ def _on_user_action_ended(self, _buffer):
if Settings.get().live_translation:
- self.translation()
+ self._on_translation()
- # The history part
- def reset_return_forward_btns(self):
- self.lookup_action("back").props.enabled = self.current_history < len(self.provider["trans"].history) - 1 # type: ignore
- self.lookup_action("forward").props.enabled = self.current_history > 0 # type: ignore
+ @Gtk.Template.Callback()
+ def _on_retry_load_translator_clicked(self, *_args):
+ self.reload_provider("translator")
- # Retrieve translation history
- def history_update(self):
+ @Gtk.Template.Callback()
+ def _on_remove_key_and_reload_clicked(self, *_args):
+ if self.provider["trans"]:
+ self.provider["trans"].reset_api_key()
+ self.reload_provider("translator")
+
+ @Gtk.Template.Callback()
+ def _on_src_lang_changed(self, *_args):
+ """Called on self.src_lang_selector::notify::selected signal"""
if not self.provider["trans"]:
return
- self.reset_return_forward_btns()
- translation = self.provider["trans"].history[self.current_history]
- self.src_lang_selector.selected = translation.original[1]
- self.dest_lang_selector.selected = translation.original[2]
- self.src_buffer.props.text = translation.original[0]
- self.dest_buffer.props.text = translation.text
+ code = self.src_lang_selector.selected
+ dest_code = self.dest_lang_selector.selected
- def appeared_before(self):
- if not self.provider["trans"]:
- return
+ if self.provider["trans"].cmp_langs(code, dest_code):
+ # Get first lang from saved src langs that is not current dest
+ if valid := first_exclude(self.src_langs, dest_code):
+ # Check if it's a valid dest lang
+ valid = find_item_match([valid], self.provider["trans"].dest_languages)
+ if not valid: # If not, just get the first lang from the list that is not selected
+ valid = first_exclude(self.provider["trans"].dest_languages, dest_code)
- src_language = self.src_lang_selector.selected
- dest_language = self.dest_lang_selector.selected
- src_text = self.src_buffer.get_text(self.src_buffer.get_start_iter(), self.src_buffer.get_end_iter(), True)
- if (
- len(self.provider["trans"].history) >= self.current_history + 1
- and (self.provider["trans"].history[self.current_history].original[1] == src_language or "auto")
- and self.provider["trans"].history[self.current_history].original[2] == dest_language
- and self.provider["trans"].history[self.current_history].original[0] == src_text
- ):
- return True
- return False
+ self.dest_lang_selector.selected = valid or ""
+
+ if code in self.provider["trans"].src_languages:
+ # Update saved src langs list
+ if code in self.src_langs:
+ # Bring lang to the top
+ self.src_langs.remove(code)
+ elif code.lower() in self.src_langs:
+ # Bring lang to the top
+ self.src_langs.remove(code.lower())
+ elif len(self.src_langs) == 4:
+ self.src_langs.pop()
+ self.src_langs.insert(0, code)
+
+ # Rewrite recent langs
+ self.src_recent_lang_model.set_langs(self.src_langs, auto=True)
+
+ self._check_switch_enabled()
+ self._check_speech_enabled()
@Gtk.Template.Callback()
- def translation(self, _action=None, _param=None):
+ def _on_dest_lang_changed(self, *_args):
+ """Called on self.dest_lang_selector::notify::selected signal"""
if not self.provider["trans"]:
return
- # If it's like the last translation then it's useless to continue
- if not self.appeared_before():
- src_text = self.src_buffer.get_text(self.src_buffer.get_start_iter(), self.src_buffer.get_end_iter(), True)
- src_language = self.src_lang_selector.selected
- dest_language = self.dest_lang_selector.selected
+ code = self.dest_lang_selector.selected
+ src_code = self.src_lang_selector.selected
- if self.ongoing_trans:
- self.next_trans = {"text": src_text, "src": src_language, "dest": dest_language}
- return
+ if self.provider["trans"].cmp_langs(code, src_code):
+ # Get first lang from saved dest langs that is not current src
+ if valid := first_exclude(self.dest_langs, src_code):
+ # Check if it's a valid src lang
+ valid = find_item_match([valid], self.provider["trans"].src_languages)
+ if not valid: # If not, just get the first lang from the list that is not selected
+ valid = first_exclude(self.provider["trans"].src_languages, src_code)
- if self.next_trans:
- src_text = self.next_trans["text"]
- src_language = self.next_trans["src"]
- dest_language = self.next_trans["dest"]
- self.next_trans = {}
+ self.src_lang_selector.selected = valid or ""
- # Show feedback for start of translation.
- self.translation_loading()
+ # Update saved dest langs list
+ if code in self.dest_langs:
+ # Bring lang to the top
+ self.dest_langs.remove(code)
+ elif code.lower() in self.dest_langs:
+ # Bring lang to the top
+ self.dest_langs.remove(code.lower())
+ elif len(self.src_langs) == 4:
+ self.dest_langs.pop()
+ self.dest_langs.insert(0, code)
- # If the two languages are the same, nothing is done
- if src_language != dest_language:
- if src_text != "":
- self.ongoing_trans = True
+ # Rewrite recent langs
+ self.dest_recent_lang_model.set_langs(self.dest_langs)
- src, dest = self.provider["trans"].denormalize_lang(src_language, dest_language)
- self.provider["trans"].translate(
- src_text, src, dest, self.on_translation_success, self.on_translation_fail
- )
- else:
- self.trans_mistakes = (None, None)
- self.trans_src_pron = None
- self.trans_dest_pron = None
- self.dest_buffer.props.text = ""
+ self._check_switch_enabled()
+ self._check_speech_enabled()
+
+ @Gtk.Template.Callback()
+ def _on_key_event(self, _ctrl, keyval: int, _keycode: int, state: Gdk.ModifierType):
+ """Called on self.win_key_ctrlr::key-pressed signal"""
+ modifiers = state & Gtk.accelerator_get_default_mod_mask()
+ shift_mask = Gdk.ModifierType.SHIFT_MASK
+ unicode_key_val = Gdk.keyval_to_unicode(keyval)
+ if (
+ GLib.unichar_isgraph(chr(unicode_key_val))
+ and modifiers in (shift_mask, 0)
+ and not self.dest_text.props.editable
+ and not self.src_text.is_focus()
+ ):
+ self.src_text.grab_focus()
+ end_iter = self.src_buffer.get_end_iter()
+ self.src_buffer.insert(end_iter, chr(unicode_key_val))
+ return Gdk.EVENT_STOP
+ return Gdk.EVENT_PROPAGATE
- if not self.ongoing_trans:
- self.translation_finish()
+ @Gtk.Template.Callback()
+ def _on_src_activated(self, _texview):
+ """Called on self.src_text::active signal"""
+ if not Settings.get().live_translation:
+ self._on_translation()
- def on_translation_success(self, translation: Translation):
- if not self.provider["trans"]:
+ @Gtk.Template.Callback()
+ def _on_mistakes_clicked(self, *_args):
+ """Called on self.mistakes_label::activate-link signal"""
+ self.mistakes.props.reveal_child = False
+
+ translation = self.current_translation
+ if translation and translation.mistakes:
+ self.src_buffer.props.text = translation.mistakes.text
+ # Ensure we're in the same languages
+ self.src_lang_selector.selected = translation.original.src
+ self.dest_lang_selector.selected = translation.original.dest
+
+ # Run translation again
+ self._on_translation()
+
+ return Gdk.EVENT_STOP
+
+ @Gtk.Template.Callback()
+ @background_task
+ async def _on_translation(self, *_args):
+ if not self.provider["trans"] or self._appeared_before():
+ # If it's like the last translation then it's useless to continue
return
- self.trans_warning.props.visible = False
+ # Run translation
+ if self.next_translation:
+ request = self.next_translation
+ self.next_translation = None
+ else:
+ text = self.src_buffer.get_text(self.src_buffer.get_start_iter(), self.src_buffer.get_end_iter(), True)
+ request = TranslationRequest(text, self.src_lang_selector.selected, self.dest_lang_selector.selected)
- if translation.detected and self.src_lang_selector.selected == "auto":
- if Settings.get().src_auto:
- self.src_lang_selector.set_insight(self.provider["trans"].normalize_lang_code(translation.detected))
- else:
- self.src_lang_selector.selected = translation.detected
+ if self.translation_loading:
+ self.next_translation = request
+ return
- self.dest_buffer.props.text = translation.text
+ # Show feedback for start of translation.
+ self.trans_spinner.show()
+ self.dest_box.props.sensitive = False
+ self.langs_button_box.props.sensitive = False
- self.trans_mistakes = translation.mistakes
- self.trans_src_pron = translation.pronunciation[0]
- self.trans_dest_pron = translation.pronunciation[1]
+ # If the two languages are the same, nothing is done
+ if request.src != request.dest or request.text != "":
+ self.translation_loading = True
- # Finally, translation is saved in history
- self.add_history_entry(translation)
+ try:
+ translation = await self.provider["trans"].translate(request)
- # Mistakes
- if ProviderFeature.MISTAKES in self.provider["trans"].features and not self.trans_mistakes == (None, None):
- self.mistakes_label.set_markup(_("Did you mean: ") + f'{self.trans_mistakes[0]}')
- self.mistakes.props.reveal_child = True
- elif self.mistakes.props.reveal_child:
- self.mistakes.props.reveal_child = False
+ if translation.detected and self.src_lang_selector.selected == "auto":
+ if Settings.get().src_auto:
+ self.src_lang_selector.set_insight(
+ self.provider["trans"].normalize_lang_code(translation.detected)
+ )
+ else:
+ self.src_lang_selector.selected = translation.detected
+
+ self.dest_buffer.props.text = translation.text
+
+ # Finally, translation is saved in history
+ self.add_history_entry(translation)
+
+ self._check_mistakes()
+ self._check_pronunciation()
+
+ # Translation failed
+ except (RequestError, ProviderError) as exc:
+ self.trans_warning.props.visible = True
+ self.lookup_action("copy").props.enabled = False # type: ignore
+ self.lookup_action("listen-src").props.enabled = False # type: ignore
+ self.lookup_action("listen-dest").props.enabled = False # type: ignore
+
+ if isinstance(exc, RequestError):
+ self.send_notification(
+ _("Translation failed, check for network issues"),
+ action={
+ "label": _("Retry"),
+ "name": "win.translation",
+ },
+ )
+ elif isinstance(exc, APIKeyInvalid):
+ self.send_notification(
+ _("The provided API key is invalid"),
+ action={
+ "label": _("Retry"),
+ "name": "win.translation",
+ },
+ )
+ elif isinstance(exc, APIKeyRequired):
+ self.send_notification(
+ _("API key is required to use the service"),
+ action={
+ "label": _("Preferences"),
+ "name": "app.preferences",
+ },
+ )
+ else:
+ self.send_notification(
+ _("Translation failed"),
+ action={
+ "label": _("Retry"),
+ "name": "win.translation",
+ },
+ )
- # Pronunciation
- reveal = Settings.get().show_pronunciation
- if ProviderFeature.PRONUNCIATION in self.provider["trans"].features:
- if self.trans_src_pron is not None and self.trans_mistakes == (None, None):
- self.src_pron_label.props.label = self.trans_src_pron
- self.src_pron_revealer.props.reveal_child = reveal
- elif self.src_pron_revealer.props.reveal_child:
- self.src_pron_revealer.props.reveal_child = False
+ else:
+ self.trans_warning.props.visible = False
- if self.trans_dest_pron is not None:
- self.dest_pron_label.props.label = self.trans_dest_pron
- self.dest_pron_revealer.props.reveal_child = reveal
- elif self.dest_pron_revealer.props.reveal_child:
- self.dest_pron_revealer.props.reveal_child = False
+ finally:
+ self.translation_loading = False
- self.ongoing_trans = False
- if self.next_trans:
- self.translation()
+ if self.next_translation:
+ self._on_translation()
+ else:
+ self._translation_finish()
else:
- self.translation_finish()
-
- def on_translation_fail(self, error: ProviderError):
- if not self.next_trans:
- self.translation_finish()
- self.trans_warning.props.visible = True
- self.ongoing_trans = False
-
- match error.code:
- case ProviderErrorCode.NETWORK:
- self.send_notification(
- _("Translation failed, check for network issues"),
- action={
- "label": _("Retry"),
- "name": "win.translation",
- },
- )
- case ProviderErrorCode.API_KEY_INVALID:
- self.send_notification(
- _("The provided API key is invalid"),
- action={
- "label": _("Retry"),
- "name": "win.translation",
- },
- )
- case ProviderErrorCode.API_KEY_REQUIRED:
- self.send_notification(
- _("API key is required to use the service"),
- action={
- "label": _("Preferences"),
- "name": "app.preferences",
- },
- )
- case _:
- self.send_notification(
- _("Translation failed"),
- action={
- "label": _("Retry"),
- "name": "win.translation",
- },
- )
+ self.trans_mistakes = None
+ self.dest_buffer.props.text = ""
- def translation_loading(self):
- self.trans_spinner.show()
- self.dest_box.props.sensitive = False
- self.langs_button_box.props.sensitive = False
+ if not self.translation_loading:
+ self._translation_finish()
+
+ def _appeared_before(self):
+ if not self.provider["trans"]:
+ return
+
+ src_language = self.src_lang_selector.selected
+ dest_language = self.dest_lang_selector.selected
+ src_text = self.src_buffer.get_text(self.src_buffer.get_start_iter(), self.src_buffer.get_end_iter(), True)
+ translation = self.current_translation
+ if (
+ len(self.provider["trans"].history) >= self.current_history + 1
+ and translation
+ and (translation.original.src == src_language or "auto")
+ and translation.original.dest == dest_language
+ and translation.original.text == src_text
+ ):
+ return True
+ return False
- def translation_finish(self):
+ def _translation_finish(self):
self.trans_spinner.hide()
self.dest_box.props.sensitive = True
self.langs_button_box.props.sensitive = True
- def reload_translator(self):
- self.translator_loading = True
-
- # Load translator
- self.load_translator()
+ """
+ Provider changes functions
+ """
- def _on_active_provider_changed(self, _settings: Gio.Settings, _provider: str, kind: str):
+ def _on_active_provider_changed(self, _settings: Settings, kind: str, _name: str):
self.save_settings()
- match kind:
- case "trans":
- self.reload_translator()
- case "tts":
- self.load_tts()
+ self.reload_provider(kind)
def _on_provider_changed(self, _settings: Gio.Settings, _key: str, name: str):
if not self.translator_loading:
if self.provider["trans"] and name == self.provider["trans"].name:
- self.reload_translator()
+ self.reload_provider("translator")
if self.provider["tts"] and name == self.provider["tts"].name:
- self.load_tts()
+ self.reload_provider("tts")