import os import sys import json import warnings import platform import shutil import subprocess import re from threading import Thread warnings.filterwarnings("ignore") def detect_encoding(file_path): import chardet import charset_normalizer with open(file_path, "rb") as f: raw_data = f.read() detected_encoding = None for detectors in (charset_normalizer, chardet): try: result = detectors.detect(raw_data)["encoding"] except Exception: continue if result is not None: detected_encoding = result break encoding = detected_encoding if detected_encoding else "utf-8" return encoding.lower() def get_resource_path(package, resource): """ Get the path to a resource file, with fallback to local file system. Args: package (str): Package name containing the resource (e.g., 'abogen.assets') resource (str): Resource filename (e.g., 'icon.ico') Returns: str: Path to the resource file, or None if not found """ from importlib import resources # Try using importlib.resources first try: with resources.path(package, resource) as resource_path: if os.path.exists(resource_path): return str(resource_path) except (ImportError, FileNotFoundError): pass # Always try to resolve as a relative path from this file parts = package.split(".") rel_path = os.path.join( os.path.dirname(os.path.abspath(__file__)), *parts[1:], resource ) if os.path.exists(rel_path): return rel_path # Fallback to local file system try: # Extract the subdirectory from package name (e.g., 'assets' from 'abogen.assets') subdir = package.split(".")[-1] if "." in package else package local_path = os.path.join( os.path.dirname(os.path.abspath(__file__)), subdir, resource ) if os.path.exists(local_path): return local_path except Exception: pass return None def get_version(): """Return the current version of the application.""" try: with open(get_resource_path("/", "VERSION"), "r") as f: return f.read().strip() except Exception: return "Unknown" # Define config path def get_user_config_path(): from platformdirs import user_config_dir # TODO Config directory is changed for Linux and MacOS. But if old config exists, it will be used. # On non‑Windows, prefer ~/.config/abogen if it already exists if platform.system() != "Windows": custom_dir = os.path.join(os.path.expanduser("~"), ".config", "abogen") if os.path.exists(custom_dir): config_dir = custom_dir else: config_dir = user_config_dir( "abogen", appauthor=False, roaming=True, ensure_exists=True ) else: # Windows and fallback case config_dir = user_config_dir( "abogen", appauthor=False, roaming=True, ensure_exists=True ) return os.path.join(config_dir, "config.json") # Define cache path def get_user_cache_path(folder=None): from platformdirs import user_cache_dir cache_dir = user_cache_dir( "abogen", appauthor=False, opinion=True, ensure_exists=True ) if folder: cache_dir = os.path.join(cache_dir, folder) # Ensure the directory exists os.makedirs(cache_dir, exist_ok=True) return cache_dir _sleep_procs = {"Darwin": None, "Linux": None} # Store sleep prevention processes def clean_text(text, *args, **kwargs): # Load replace_single_newlines from config cfg = load_config() replace_single_newlines = cfg.get("replace_single_newlines", False) # Collapse all whitespace (excluding newlines) into single spaces per line and trim edges lines = [re.sub(r"[^\S\n]+", " ", line).strip() for line in text.splitlines()] text = "\n".join(lines) # Standardize paragraph breaks (multiple newlines become exactly two) and trim overall whitespace text = re.sub(r"\n{3,}", "\n\n", text).strip() # Optionally replace single newlines with spaces, but preserve double newlines if replace_single_newlines: text = re.sub(r"(?>", "", text) # Ignore metadata patterns text = re.sub(r"<]*>>", "", text) # Ignore newlines text = text.replace("\n", "") # Ignore leading/trailing spaces text = text.strip() # Calculate character count char_count = len(text) return char_count def get_gpu_acceleration(enabled): """ Check GPU acceleration availability. Note: On Windows, torch DLLs must be pre-loaded in main.py before PyQt6 to avoid DLL initialization errors. """ try: import torch from torch.cuda import is_available as cuda_available if not enabled: return "GPU available but using CPU.", False # Check for Apple Silicon MPS if platform.system() == "Darwin" and platform.processor() == "arm": if torch.backends.mps.is_available(): return "MPS GPU available and enabled.", True else: return "MPS GPU not available on Apple Silicon. Using CPU.", False # Check for CUDA if cuda_available(): return "CUDA GPU available and enabled.", True # Gather CUDA diagnostic info if not available try: cuda_devices = torch.cuda.device_count() cuda_error = ( torch.cuda.get_device_name(0) if cuda_devices > 0 else "No devices found" ) except Exception as e: cuda_error = str(e) return f"CUDA GPU is not available. Using CPU. ({cuda_error})", False except Exception as e: return f"Error checking GPU: {e}", False def prevent_sleep_start(): from abogen.constants import PROGRAM_NAME system = platform.system() if system == "Windows": import ctypes ctypes.windll.kernel32.SetThreadExecutionState( 0x80000000 | 0x00000001 | 0x00000040 ) elif system == "Darwin": _sleep_procs["Darwin"] = create_process(["caffeinate"]) elif system == "Linux": # Add program name and reason for inhibition program_name = PROGRAM_NAME reason = "Prevent sleep during abogen process" # Only attempt to use systemd-inhibit if it's available on the system. if shutil.which("systemd-inhibit"): _sleep_procs["Linux"] = create_process( [ "systemd-inhibit", f"--who={program_name}", f"--why={reason}", "--what=sleep", "--mode=block", "sleep", "infinity", ] ) else: # Non-systemd distro or systemd tools not installed: skip inhibition rather than crash print( "systemd-inhibit not found: skipping sleep inhibition on this Linux system." ) def prevent_sleep_end(): system = platform.system() if system == "Windows": import ctypes ctypes.windll.kernel32.SetThreadExecutionState(0x80000000) # ES_CONTINUOUS elif system in ("Darwin", "Linux") and _sleep_procs[system]: try: _sleep_procs[system].terminate() _sleep_procs[system] = None except Exception: pass def load_numpy_kpipeline(): import numpy as np from kokoro import KPipeline return np, KPipeline class LoadPipelineThread(Thread): def __init__(self, callback): super().__init__() self.callback = callback def run(self): try: np_module, kpipeline_class = load_numpy_kpipeline() self.callback(np_module, kpipeline_class, None) except Exception as e: self.callback(None, None, str(e))