import functools
import json
import logging
import os
import pwd
import secrets
import shutil
import string
import subprocess
from pathlib import Path
from typing import Any, Callable, Type

import yaml

from defence360agent.utils import LazyLock

logger = logging.getLogger(__name__)

# Lowest UID treated as a tenant account: _assert_non_system_user() rejects
# any existing account whose UID is below this value.
MIN_TENANT_UID = 1000

# Reserved accounts that the password-reset RPCs must never target. Kept in
# sync with the proxy-side IsSystemUsername blocklist in src/proxy/auth/jwt.go.
# Entries must be lowercase: _assert_non_system_user() lowercases the
# requested username before testing membership.
SYSTEM_USERNAMES = frozenset(
    {
        "root",
        "daemon",
        "bin",
        "sys",
        "adm",
        "nobody",
        "_imunify",
        "mysql",
        "postgres",
        "redis",
        "memcached",
        "apache",
        "nginx",
        "httpd",
        "www-data",
        "www",
        "postfix",
        "mail",
        "mailman",
        "dovecot",
        "named",
        "bind",
        "dnsmasq",
        "nscd",
        "dbus",
        "polkitd",
        "chrony",
        "ntp",
        "sshd",
        "ftp",
        "vsftpd",
        "proftpd",
        "sync",
        "shutdown",
        "halt",
        "games",
        "operator",
        "lp",
        "news",
        "uucp",
        "man",
        "squid",
        "clamav",
        "clamupdate",
        "clamscan",
        "cpanel",
        "cpaneleximfilter",
        "cpses",
        "diradmin",
        "plesk",
        "psaadm",
        "psaftp",
        "varnish",
        "haproxy",
        "docker",
        "containerd",
        "systemd-network",
        "systemd-resolve",
        "systemd-timesync",
    }
)

# JSON file with the feature flags looked up by _feature_flag_enabled().
_FEATURE_FLAGS_PATH = Path("/var/imunify360/feature_flags.json")
# Merged agent config (YAML), parsed directly by
# _use_feature_flags_from_yaml().
_MERGED_CONFIG_PATH = Path(
    "/etc/sysconfig/imunify360/imunify360-merged.config"
)


class RulesLock(LazyLock):
    """
    Distinct ``LazyLock`` subclass that adds no behaviour of its own.

    NOTE(review): presumably guards rules-related operations -- confirm
    against the call sites.
    """


class ModSecLock(LazyLock):
    """
    Distinct ``LazyLock`` subclass that adds no behaviour of its own.

    NOTE(review): presumably guards ModSecurity-related operations --
    confirm against the call sites.
    """


def return_value_on_error(error: Type[Exception], value: Any) -> Callable:
    """
    Build a decorator that makes a coroutine function fall back to *value*
    whenever awaiting it raises *error*.

    Usage:

       return await return_value_on_error(FileNotFoundError, X)(coro)(*args)

    behaves like:

       try:
           return await coro(*args)
       except FileNotFoundError:
           return X

    Exceptions that are not instances of *error* propagate unchanged. Each
    substitution is logged at INFO level.
    """

    def wrap_coroutine(func):
        @functools.wraps(func)
        async def guarded(*call_args, **call_kwargs):
            try:
                outcome = await func(*call_args, **call_kwargs)
            except error as exc:
                logger.info("Replacing %s from %s with %s", exc, func, value)
                return value
            return outcome

        return guarded

    return wrap_coroutine


class PasswordChangeError(Exception):
    """
    Raised when the password-changing command exits with a failure status.

    The raw *stderr* and *returncode* passed to the constructor are kept on
    the instance (``.stderr`` / ``.returncode``) so callers can inspect them
    without parsing the message.
    """

    def __init__(self, stderr, returncode):
        self.stderr = stderr
        self.returncode = returncode
        # Output captured by subprocess without text mode is bytes; decode
        # it so the message does not embed a ``b'...'`` repr (which also
        # triggers BytesWarning when Python runs with -b).
        if isinstance(stderr, (bytes, bytearray)):
            text = bytes(stderr).decode(errors="replace")
        else:
            text = stderr
        super().__init__(
            f"Password change process exited with code {returncode}: {text}"
        )


def generate_strong_password(
    length=20,
    required_char_groups=(
        string.ascii_lowercase,
        string.ascii_uppercase,
        string.digits,
        string.punctuation,
    ),
):
    """
    Return a random password of *length* characters.

    Characters are drawn with the ``secrets`` module from the concatenation
    of *required_char_groups*. The result is guaranteed to contain at least
    one character from each of the given groups.

    Raises ValueError if *length* is smaller than the number of groups, or
    if any group is empty.
    """
    if length < len(required_char_groups):
        raise ValueError(
            f"Given {length=} is too short. "
            "Can't get chars from all required groups "
            f"{len(required_char_groups)=}"
        )
    # Explicit check instead of ``assert``: assertions are stripped under
    # ``python -O``, and an empty group can never be satisfied, so the
    # retry loop below would spin forever.
    if any(len(group) == 0 for group in required_char_groups):
        raise ValueError("got empty char group")

    alphabet = "".join(required_char_groups)
    while True:
        # Draw every character uniformly from the whole alphabet ...
        password = "".join(secrets.choice(alphabet) for _ in range(length))
        # ... and retry if some required group is not represented.
        if all(
            any(c in group for c in password) for group in required_char_groups
        ):
            return password


def _assert_non_system_user(username):
    """
    Raise ValidationError unless *username* may have its password changed.

    Rejected are: an empty username, a name listed (case-insensitively) in
    SYSTEM_USERNAMES, and any existing account whose UID is below
    MIN_TENANT_UID. A name with no passwd database entry passes this check.
    """
    from defence360agent.rpc_tools import ValidationError

    if not username or username.lower() in SYSTEM_USERNAMES:
        logger.warning(
            "Rejected password change for reserved account %r", username
        )
        raise ValidationError(
            "Refusing to change password for reserved system "
            f"account: {username!r}"
        )

    try:
        account = pwd.getpwnam(username)
    except KeyError:
        # No such account in the passwd database: nothing to check by UID.
        return

    uid = account.pw_uid
    if uid >= MIN_TENANT_UID:
        return

    logger.warning(
        "Rejected password change for non-tenant account %r (uid %d)",
        username,
        uid,
    )
    raise ValidationError(
        "Refusing to change password for non-tenant account "
        f"(uid {uid} < {MIN_TENANT_UID}): {username!r}"
    )


def change_system_password(username, new_password, *, passwd_cmd=None):
    """
    Set the system password of *username* to *new_password*.

    Runs *passwd_cmd* (a list holding the command and its leading arguments)
    with *username* appended as the last argument, and feeds *new_password*
    twice on stdin, separated by a newline. When *passwd_cmd* is None the
    ``passwd`` executable found on ``os.defpath`` is used.

    Raises:
      ValidationError -- *username* is rejected by _assert_non_system_user()
        or starts with "-".
      TypeError -- *passwd_cmd* is given but is not a list.
      FileNotFoundError -- no ``passwd`` executable on ``os.defpath``.
      PasswordChangeError -- the command exited with a non-zero status.
    """
    from defence360agent.rpc_tools import ValidationError

    _assert_non_system_user(username)
    # The username becomes a bare argv element, so a leading dash could be
    # parsed by the command as an option instead of an account name.
    if username.startswith("-"):
        logger.warning(
            "Rejected password change for option-like username %r", username
        )
        raise ValidationError(
            f"Refusing to change password for invalid username: {username!r}"
        )

    if passwd_cmd is None:
        passwd_path = shutil.which("passwd", path=os.defpath)
        if passwd_path is None:
            # Fail with a clear error rather than passing None to subprocess.
            raise FileNotFoundError(
                f"passwd executable not found in {os.defpath!r}"
            )
        passwd_cmd = [passwd_path]
    elif not isinstance(passwd_cmd, list):
        # Explicit check: an ``assert`` would be stripped under ``python -O``
        # and a plain string would then be splatted character by character.
        raise TypeError(
            f"passwd_cmd must be a list, got {type(passwd_cmd).__name__}"
        )

    try:
        subprocess.run(
            [*passwd_cmd, username],
            # Password entry followed by its confirmation.
            input=b"\n".join([new_password.encode()] * 2),
            capture_output=True,
            check=True,
        )
    except subprocess.CalledProcessError as e:
        raise PasswordChangeError(e.stderr, e.returncode) from e


def is_apache2nginx_enabled() -> bool:
    """
    Tell whether apache2nginx is switched on.

    True only when /var/lib/apache2nginx/state is a regular file whose
    content, stripped of surrounding whitespace, is exactly "on". A missing
    file or an OSError while reading it (logged) yields False.
    """
    state_path = Path("/var/lib/apache2nginx/state")
    if not state_path.is_file():
        return False
    try:
        return state_path.read_text().strip() == "on"
    except OSError:
        logger.exception("Failed to read apache2nginx state file")
        return False


def _use_feature_flags_from_yaml() -> bool:
    """
    Return the ``WEBSHIELD.use_feature_flags`` value from the merged config.

    The YAML file at _MERGED_CONFIG_PATH is parsed directly (see the comment
    below for why). The result is True when the file is missing, cannot be
    read or parsed, does not have the expected mapping structure, or does not
    contain the key; otherwise it is the truthiness of the stored value.
    """
    # Read WEBSHIELD.use_feature_flags directly from the merged config YAML
    # instead of going through the Webshield.USE_FEATURE_FLAGS FromConfig
    # descriptor. Reading the descriptor is what we'd normally do, but in
    # this code path it would cause unbounded recursion at agent startup:
    #
    #     use_coraza4cpanel()
    #       -> Webshield.USE_FEATURE_FLAGS              # FromConfig.__get__
    #         -> config_to_dict()
    #           -> cerberus Validator.normalized()      # normalizes the
    #              whole schema, including FIREWALL
    #             -> default_setter lambda for the      # firewall.py:70
    #                FIREWALL.TCP_IN_IPv4 etc. ports
    #               -> get_default_ports()              # firewall.py:28
    #                 -> HostingPanel().OPEN_PORTS
    #                   -> get_hosting_panel()
    #                     -> cPanel.__new__()           # cpanel/panel.py:100
    #                       -> use_coraza4cpanel()      # <-- back to start
    #
    # The cycle exists because HostingPanel ports defaults are resolved
    # lazily through HostingPanel() at normalization time, and cPanel's
    # __new__ branches on use_coraza4cpanel() to decide whether to return
    # the Coraza-flavored subclass. Adding a config dependency inside
    # use_coraza4cpanel closed the loop.
    #
    # Reading the YAML directly skips cerberus normalization and breaks the
    # cycle. The merged config is the file the agent writes after merging
    # defaults with user overrides, so it reflects the effective value the
    # descriptor would have returned. If the file is missing (fresh install,
    # or pre-feature-flags agent that hasn't rewritten it yet), we fall back
    # to True to match the schema default.
    try:
        with _MERGED_CONFIG_PATH.open() as fp:
            merged = yaml.safe_load(fp) or {}
    except FileNotFoundError:
        return True
    except (OSError, UnicodeDecodeError, yaml.YAMLError) as error:
        logger.warning(
            "Failed to read %s: %s; assuming use_feature_flags=True",
            _MERGED_CONFIG_PATH,
            error,
        )
        return True

    # A syntactically valid file can still have the wrong shape (a list or
    # scalar at the top level, or a non-mapping "WEBSHIELD:" value). This
    # runs during startup, so fall back to the default instead of raising
    # AttributeError.
    webshield = merged.get("WEBSHIELD") if isinstance(merged, dict) else None
    if webshield is None and isinstance(merged, dict):
        # Section absent or empty: same as the key not being set.
        return True
    if not isinstance(webshield, dict):
        logger.warning(
            "Unexpected structure in %s; assuming use_feature_flags=True",
            _MERGED_CONFIG_PATH,
        )
        return True
    return bool(webshield.get("use_feature_flags", True))


def _feature_flag_enabled(flag_name: str) -> bool:
    """
    Look up *flag_name* in the JSON file at _FEATURE_FLAGS_PATH.

    Returns False when feature flags are turned off in the merged config,
    when the flags file does not exist, when the flag is not listed, or when
    reading/parsing/lookup fails (the failure is logged as an error).
    Otherwise the stored value is returned.
    """
    if _use_feature_flags_from_yaml():
        try:
            raw = _FEATURE_FLAGS_PATH.read_text()
            flags = json.loads(raw)
            return flags.get(flag_name, False)
        except FileNotFoundError:
            # No flags file simply means no flag is enabled.
            pass
        except Exception as error:
            logger.error("Failed to read feature flags file: %s", error)
    return False


def use_coraza4cpanel() -> bool:
    """Return the state of the ``coraza_for_cpanel_apache`` feature flag."""
    flag_name = "coraza_for_cpanel_apache"
    return _feature_flag_enabled(flag_name)


def coraza_for_cloudways_hybrid_flag() -> bool:
    """Return the state of the ``coraza_for_cloudways_hybrid`` feature flag."""
    flag_name = "coraza_for_cloudways_hybrid"
    return _feature_flag_enabled(flag_name)
