#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Copyright (C) 2018 Andy Stewart # # Author: Andy Stewart # Maintainer: Andy Stewart # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . from PyQt5 import QtCore from PyQt5.QtGui import QClipboard from PyQt5.QtWidgets import QApplication from epc.client import EPCClient from functools import wraps from ast import literal_eval import base64 import functools import os import socket import subprocess import sys import threading import re class PostGui(QtCore.QObject): through_thread = QtCore.pyqtSignal(object, object) def __init__(self, inclass=True): super(PostGui, self).__init__() self.through_thread.connect(self.on_signal_received) self.inclass = inclass def __call__(self, func): self._func = func @functools.wraps(func) def obj_call(*args, **kwargs): self.emit_signal(args, kwargs) return obj_call def emit_signal(self, args, kwargs): self.through_thread.emit(args, kwargs) def on_signal_received(self, args, kwargs): try: if self.inclass: obj, args = args[0], args[1:] self._func(obj, *args, **kwargs) else: self._func(*args, **kwargs) except Exception: import traceback traceback.print_exc() def touch(path): if not os.path.exists(path): basedir = os.path.dirname(path) if not os.path.exists(basedir): os.makedirs(basedir) with open(path, 'a'): os.utime(path) def get_free_port(): """ Determines a free port using sockets. """ free_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) free_socket.bind(('0.0.0.0', 0)) free_socket.listen(5) port = free_socket.getsockname()[1] free_socket.close() return port def is_port_in_use(port): import socket with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: return s.connect_ex(('localhost', port)) == 0 def string_to_base64(text): return str(base64.b64encode(str(text).encode("utf-8")), "utf-8") def get_local_ip(): try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(("8.8.8.8", 80)) return s.getsockname()[0] except OSError: print("Network is unreachable") sys.exit() def popen_and_call(popen_args, on_exit, stdout_file=None): """ Runs the given args in a subprocess.Popen, and then calls the function on_exit when the subprocess completes. on_exit is a callable object, and popen_args is a list/tuple of args that would give to subprocess.Popen. """ def run_in_thread(on_exit, popen_args): proc = subprocess.Popen(popen_args, stdout=stdout_file) proc.wait() on_exit() return thread = threading.Thread(target=run_in_thread, args=(on_exit, popen_args)) thread.start() # returns immediately after the thread starts return thread def call_and_check_code(popen_args, on_exit, stdout_file=None): """ Runs the given args in a subprocess.Popen, and then calls the function on_exit when the subprocess completes. on_exit is a callable object, and popen_args is a list/tuple of args that would give to subprocess.Popen. """ def run_in_thread(on_exit, popen_args): retcode = subprocess.call(popen_args, stdout=stdout_file) on_exit(retcode) return thread = threading.Thread(target=run_in_thread, args=(on_exit, popen_args)) thread.start() # returns immediately after the thread starts return thread def get_clipboard_text(): ''' Get text from system clipboard.''' clipboard = QApplication.clipboard() text = clipboard.text() if text: return text if clipboard.supportsSelection(): return clipboard.text(QClipboard.Selection) return "" def set_clipboard_text(text): ''' Set text to system clipboard.''' clipboard = QApplication.clipboard() clipboard.setText(text) if clipboard.supportsSelection(): clipboard.setText(text, QClipboard.Selection) def interactive(insert_or_do = False, msg_emacs = None, new_name = None): """ Defines an interactive command invoked from Emacs. """ def wrap(f, insert_or_do = insert_or_do, msg_emacs = msg_emacs, new_name = new_name): f.interactive = True f.insert_or_do = insert_or_do f.msg_emacs = msg_emacs f.new_name = new_name @wraps(f) def wrapped_f(*args, **kwargs): return f(*args, **kwargs) return wrapped_f # Support both @interactive and @interactive() as valid syntax. if callable(insert_or_do): return wrap(insert_or_do, insert_or_do = False, msg_emacs = None, new_name = None) else: return wrap def abstract(f): """ Add a `abstract` flag to a method, We don't use abs.abstractmethod cause we don't need strict implementation check. """ f.abstract = True @wraps(f) def wrap(*args, **kwargs): return f(*args, **kwargs) return wrap epc_client = None def init_epc_client(emacs_server_port): global epc_client if epc_client == None: try: epc_client = EPCClient(("localhost", emacs_server_port), log_traceback=True) except ConnectionRefusedError: import traceback traceback.print_exc() def close_epc_client(): global epc_client if epc_client != None: epc_client.close() def convert_arg_to_str(arg): if type(arg) == str: return arg elif type(arg) == bool: arg = str(arg).upper() elif type(arg) == list: new_arg = "" for a in arg: new_arg = new_arg + " " + convert_arg_to_str(a) arg = "(" + new_arg[1:] + ")" return arg def eval_in_emacs(method_name, args): global epc_client if epc_client == None: print("Please call init_epc_client first before callling eval_in_emacs.") else: args = list(map(convert_arg_to_str, args)) # Make argument encode with Base64, avoid string quote problem pass to elisp side. args = list(map(string_to_base64, args)) args.insert(0, method_name) # Call eval-in-emacs elisp function. epc_client.call("eval-in-emacs", args) def get_emacs_theme_mode(): return get_emacs_func_result("eaf-get-theme-mode", []) def get_emacs_theme_background(): return get_emacs_func_result("eaf-get-theme-background-color", []) def get_emacs_theme_foreground(): return get_emacs_func_result("eaf-get-theme-foreground-color", []) def get_emacs_func_result(method_name, args): global epc_client if epc_client == None: print("Please call init_epc_client first before callling eval_in_emacs.") else: args = list(map(convert_arg_to_str, args)) # Make argument encode with Base64, avoid string quote problem pass to elisp side. args = list(map(string_to_base64, args)) args.insert(0, method_name) # Call eval-in-emacs elisp function synchronously and return the result result = epc_client.call_sync("eval-in-emacs", args) return result if result != [] else False def message_to_emacs(message, prefix=True, logging=True): eval_in_emacs('eaf--show-message', [message, prefix, logging]) def clear_emacs_message(): eval_in_emacs('eaf--clear-message', []) def set_emacs_var(var_name, var_value): eval_in_emacs('eaf--set-emacs-var', [var_name, var_value]) def open_url_in_background_tab(url): eval_in_emacs('eaf-open-browser-in-background', [url]) def duplicate_page_in_new_tab(url): eval_in_emacs('eaf-browser--duplicate-page-in-new-tab', [url]) def open_url_in_new_tab(url): eval_in_emacs('eaf-open-browser', [url]) def open_url_in_new_tab_other_window(url): eval_in_emacs('eaf-open-browser-other-window', [url]) def translate_text(text): eval_in_emacs('eaf-translate-text', [text]) def input_message(buffer_id, message, callback_tag, input_type, input_content): eval_in_emacs('eaf--input-message', [buffer_id, message, callback_tag, input_type, input_content]) def focus_emacs_buffer(buffer_id): eval_in_emacs('eaf-focus-buffer', [buffer_id]) def atomic_edit(buffer_id, focus_text): eval_in_emacs('eaf--atomic-edit', [buffer_id, focus_text]) def list_string_to_list(list_string): '''Convert the list string from emacs var to list type.''' list_var = list_string.removeprefix('(').removesuffix(')') quote = 0 extra_char_num = 0 for x in range(len(list_var)): x += extra_char_num if list_var[x] == '"': if quote == 1: quote = 0 else: quote = 1 if (list_var[x] == '(') and (quote == 0): list_var = list_var[:x] + '[' + list_var[x + 1:] elif (list_var[x] == ')') and (quote == 0): list_var = list_var[:x] + ']' + list_var[x + 1:] elif (list_var[x] == ' ') and (quote == 0): list_var = list_var[:x] + '{split}' + list_var[x + 1:] extra_char_num += 6 elif (list_var[x] == '.') and (quote == 0): list_var = list_var[:x] + '' + list_var[x + 2:] extra_char_num -= 2 list_var = str(list_var.split('{split}')).replace("', '[", "', ['").replace("]'", "']").replace("'[", "['").replace("'\"", "'").replace("\"'", "'") if list_var[0] == "'" and list_var[1] == "[": list_var = "['" + list_var[2:] list_var = literal_eval(str(list_var)) return list_var def convert_emacs_bool(symbol_value, symbol_is_boolean): if symbol_is_boolean == "t": return symbol_value == True else: return symbol_value def get_emacs_vars(args): global epc_client return list(map(lambda result: convert_emacs_bool(result[0], result[1]) if result != [] else False, epc_client.call_sync("get-emacs-vars", args))) def get_emacs_var(var_name): global epc_client (symbol_value, symbol_is_boolean) = epc_client.call_sync("get-emacs-var", [var_name]) return convert_emacs_bool(symbol_value, symbol_is_boolean) emacs_config_dir = "" def get_emacs_config_dir(): global emacs_config_dir if emacs_config_dir == "": emacs_config_dir = os.path.join(os.path.expanduser(get_emacs_var("eaf-config-location")), '') return emacs_config_dir def to_camel_case(string): components = string.split('_') return components[0] + ''.join(x.title() for x in components[1:])