mod_security.py
| 33.3 KB | Satir:
0
| py
Geri
import errno import logging import os import pwd import shutil import zipfile from contextlib import suppress from datetime import datetime, timezone from functools import lru_cache from pathlib import Path from typing import Dict, Optional from urllib.parse import urlparse from defence360agent.contracts.config import ConfigFile from defence360agent.subsys.panels.base import PanelException from defence360agent.subsys.panels.directadmin.panel import get_user_domains from defence360agent.utils import ( atomic_rewrite, check_run, ensure_line_in_file, remove_line_from_file, ) from im360.subsys.panels.base import ( APACHE, LITESPEED, MODSEC_NAME_TEMPLATE, NGINX, FilesVendor, FilesVendorList, ModSecSettingInterface, ModSecurityInterface, skip_if_not_installed_modsec, ) from defence360agent.subsys.web_server import ( apache_running, check_with_timeout, graceful_restart, litespeed_running, ) from .custombuild import CustomBuildOptions, build, custombuild2_only logger = logging.getLogger(__name__) MODSEC_CONF_DIR = "/etc/modsecurity.d" # Versioned snapshots of the live Apache modsec dir live here; MODSEC_CONF_DIR # becomes a symlink into this directory so deploys flip atomically (see # _publish_live_modsec_dir_atomic). Kept under /etc so the symlink rename in # _swap_conf_symlink_atomic stays on one filesystem. MODSEC_CONF_VERSIONS_DIR = "/etc/modsecurity.d.versions" # Recent live-dir snapshots kept after each publish so an Apache reload that # already resolved the symlink to the previous snapshot can finish reading it. _LIVE_SNAPSHOTS_RETAINED = 3 RULESET_FILENAME = "I360_RULESET" CB_MODSEC_CUSTOM_DIR = "/usr/local/directadmin/custombuild/custom/modsecurity" CB_MODSEC_CUSTOM_CONF_DIR = os.path.join(CB_MODSEC_CUSTOM_DIR, "conf") _BROKEN_CUSTOM_CONFIG_MSG = "User has a broken custom httpd config" _DIRADMIN_USER = "diradmin" _DIRECTADMIN_TASK_QUEUE = "/usr/local/directadmin/data/task.queue" _WEBSERVER_RESTART_CHECK_TIMEOUT = 60 WEB_SERVER_CHECKS = { APACHE: apache_running, LITESPEED: litespeed_running, } SUPPORTED_WEB_SERVERS = WEB_SERVER_CHECKS.keys() # type: Container[str] # DEF-41547: Atomic vendor deploy via versioned dir + symlink swap. # # Historical behavior rmtree'd CB_MODSEC_CUSTOM_CONF_DIR and re-extracted # ~37 vendor files in place, leaving `/etc/modsecurity.d/` in a half-written # state for seconds. An Apache graceful reload landing in that window (either # from our own agent or from DirectAdmin's dataskq) fails with AH00526. # We now extract into a fresh versioned sibling directory and flip a symlink # atomically, so readers always see a complete file set. _VERSIONED_CONF_PREFIX = "conf.v" def _new_versioned_conf_dir(base_dir: Optional[str] = None) -> str: """Return a fresh versioned dir path under base_dir. Name includes PID to stay unique if two apply()s ever collide at the same microsecond (belt-and-suspenders; @use_modsec_lock already serialises in-process). base_dir defaults to CB_MODSEC_CUSTOM_DIR, resolved at call time so tests can monkeypatch the module constant. """ if base_dir is None: base_dir = CB_MODSEC_CUSTOM_DIR ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H-%M-%S.%fZ") return os.path.join( base_dir, "{}{}.{}".format(_VERSIONED_CONF_PREFIX, ts, os.getpid()), ) def _fsync_dir(path: str) -> None: """Open `path` as a directory fd and fsync it. Used to persist directory-entry changes (renames, symlink creates/ deletes) against an unclean shutdown — without this, the change may only live in the page cache and the pre-change state can reappear on next boot. """ dir_fd = os.open(path, os.O_RDONLY | getattr(os, "O_DIRECTORY", 0)) try: os.fsync(dir_fd) finally: os.close(dir_fd) def _extract_vendor_zip(zip_path: str, target_dir: str) -> None: """Extract flat-layout vendor zip into target_dir, fsyncing each file and the directory entry so contents are durable before the symlink flip.""" with zipfile.ZipFile(zip_path) as zf: for member in zf.namelist(): filename = os.path.basename(member) if not filename: continue out_path = os.path.join(target_dir, filename) with zf.open(member) as src, open(out_path, "wb") as dst: shutil.copyfileobj(src, dst) dst.flush() os.fsync(dst.fileno()) _fsync_dir(target_dir) def _cleanup_staging_entry(path: str) -> None: """Remove a staging file/symlink/directory at `path` so the caller can recreate something there. Handles regular files, symlinks (including dangling), and real directories. No-op if the path does not exist. Portable across Linux (unlink on a dir returns EISDIR → IsADirectoryError) and BSD/macOS (returns EPERM → PermissionError), so we route on path type rather than exception class. Only FileNotFoundError is suppressed (covers the tight race between the type check and the removal). Other errors (permissions, busy, read-only FS) propagate so the failure surfaces with its specific cause instead of the caller later hitting FileExistsError with no context. """ if os.path.islink(path): # Covers live and dangling symlinks (even ones pointing at dirs — # shutil.rmtree would raise on these since Python 3.10). with suppress(FileNotFoundError): os.unlink(path) return if os.path.isdir(path): shutil.rmtree(path) return with suppress(FileNotFoundError): os.unlink(path) def _write_ruleset_marker(target_dir: str, vendor_id: str) -> None: with open(os.path.join(target_dir, RULESET_FILENAME), "w") as f: f.write(vendor_id) def _drop_rules_conf_main(target_dir: str) -> None: with suppress(FileNotFoundError): os.unlink(os.path.join(target_dir, "rules.conf.main")) def _swap_conf_symlink_atomic( new_target_dir: str, conf_path: Optional[str] = None, ) -> None: """Flip conf_path to point at new_target_dir in one rename. On Linux, os.rename is atomic when dst is a regular file or symlink on the same filesystem. It fails with ENOTEMPTY/EEXIST if dst is a non-empty directory, which is why _migrate_conf_to_symlink_if_needed() must run first on upgrade to convert any pre-existing real directory into a symlink. After the rename we fsync the directory that holds conf_path so the swap is durable against an unclean power/VM shutdown — without it the rename may only live in the page cache and the old target could reappear on next boot. The dentry that changes is conf_path's own, so the parent to fsync is os.path.dirname(conf_path), not new_target_dir's parent — for the live dir these differ (the symlink sits in /etc, its target under the versions dir). """ if conf_path is None: conf_path = CB_MODSEC_CUSTOM_CONF_DIR tmp_link = conf_path + ".new" # Tolerate any leftover at tmp_link from a crashed previous apply. _cleanup_staging_entry(tmp_link) os.symlink(new_target_dir, tmp_link) os.rename(tmp_link, conf_path) # Past this point the swap is committed and visible to readers. The # fsync is a pure durability concern — letting its failure propagate # would land in apply()'s `except BaseException: shutil.rmtree(new_dir)` # rollback, which would delete the now-active symlink target and # dangle the live symlink (Cursor Bugbot finding on f6317514b3). try: _fsync_dir(os.path.dirname(conf_path)) except OSError as exc: logger.warning( "fsync after modsec symlink swap failed (swap is visible to " "readers but may not survive an unclean shutdown): %s", exc, ) def _migrate_conf_to_symlink_if_needed( conf_path: Optional[str] = None, custom_dir: Optional[str] = None, ) -> None: """Convert a pre-upgrade real conf_path to a symlink under custom_dir. Idempotent: no-op if the path is already a symlink or does not exist. We pre-create the replacement symlink at `tmp_link` as a *dangling* link (its target `initial` doesn't exist yet), then do two back-to-back renames: os.rename(conf, initial) # conf dentry vanishes; initial # becomes the old real dir; tmp_link # becomes a valid symlink → initial os.rename(tmp_link, conf) # conf dentry reappears as the symlink The upgrade window where conf has no dentry is exactly the user-space return from the first rename plus the syscall entry to the second — on the order of a microsecond. This is a strictly one-shot event per agent upgrade; after the first apply() runs, conf is a symlink forever and the islink() check at the top turns this function into a no-op. Making the swap fully gap-free would need renameat2(RENAME_EXCHANGE) (not in stdlib; EL7's glibc 2.17 has no wrapper so we'd need a raw syscall shim + per-arch syscall-number table + filesystem-support fallback). The migration race was judged not worth that complexity: the only readers of CB_MODSEC_CUSTOM_CONF_DIR during the window are our own apply() (serialised under @use_modsec_lock) and DirectAdmin's build() if dataskq happens to trigger it concurrently — in which case build() would log "source not found" and skip, with no effect on Apache (which reads /etc/modsecurity.d/, not this path). """ if conf_path is None: conf_path = CB_MODSEC_CUSTOM_CONF_DIR if custom_dir is None: custom_dir = CB_MODSEC_CUSTOM_DIR if os.path.islink(conf_path): return if not os.path.isdir(conf_path): return # Fresh install; apply() will create the symlink directly. os.makedirs(custom_dir, exist_ok=True) # "0000-" prefix sorts the backup before any real (year-leading) timestamp, # so retention treats it as the oldest snapshot and prunes it first once # newer publishes exist — a plain "conf.vinitial" would instead lexically # outrank every timestamp and permanently occupy a retained live slot. initial = os.path.join(custom_dir, _VERSIONED_CONF_PREFIX + "0000-initial") tmp_link = conf_path + ".new" # shutil.rmtree raises on a top-level symlink since Python 3.10, so # route through _cleanup_staging_entry which handles symlink/file/dir. _cleanup_staging_entry(initial) _cleanup_staging_entry(tmp_link) # Pre-create the replacement symlink (dangling — `initial` doesn't # exist yet) so only the two renames below separate "conf is a real # directory" from "conf is a symlink pointing at initial". os.symlink(initial, tmp_link) try: os.rename(conf_path, initial) except OSError: # conf_path could not be moved aside (e.g. it is a mount point and # rename raises EBUSY); drop the dangling replacement link so conf_path # is left exactly as we found it for the caller to handle. _cleanup_staging_entry(tmp_link) raise os.rename(tmp_link, conf_path) # Post-commit fsync: best-effort, never propagate. After the second # rename the migration layout is visible and correct; a fsync error # means we didn't achieve durability, not that we should undo the # (successful) visible swap. Sync both the dir holding conf_path (where # the symlink dentry lands) and custom_dir (where `initial` lands); for # the live dir these are different directories. for sync_dir in {os.path.dirname(conf_path), custom_dir}: try: _fsync_dir(sync_dir) except OSError as exc: logger.warning( "fsync after modsec conf migration failed (migration is " "visible but may not survive an unclean shutdown): %s", exc, ) def _tear_down_conf_dir( conf_path: Optional[str] = None, custom_dir: Optional[str] = None, ) -> None: """Remove conf_path (symlink or real dir). For a symlink, also rmtree the versioned target if it lives under custom_dir (safety-bounded so a tampered symlink can't delete arbitrary paths). Safe to call when the path does not exist. Does NOT recreate anything — the caller decides whether to `makedirs` an empty real dir or `rename` a backup into place. """ if conf_path is None: conf_path = CB_MODSEC_CUSTOM_CONF_DIR if custom_dir is None: custom_dir = CB_MODSEC_CUSTOM_DIR if os.path.islink(conf_path): target = None with suppress(OSError): target = os.readlink(conf_path) os.unlink(conf_path) if target is not None: if not os.path.isabs(target): target = os.path.join(os.path.dirname(conf_path), target) # Use realpath on both sides so the containment check is # resilient to custom_dir itself being a symlink on exotic # installs (abspath is purely lexical and would miss logical # containment through an intermediate symlink). custom_prefix = os.path.realpath(custom_dir) + os.sep if os.path.realpath(target).startswith(custom_prefix): shutil.rmtree(target, ignore_errors=True) elif os.path.isdir(conf_path): shutil.rmtree(conf_path) def _cleanup_old_versioned_conf_dirs( keep: str, base_dir: Optional[str] = None, keep_recent: int = 1 ) -> None: """Best-effort removal of stale conf.v* dirs under base_dir. Always keeps `keep` (the just-published target). When keep_recent > 1 the newest keep_recent snapshots in total are retained — needed for the live Apache dir, where an in-flight reload may have already resolved the symlink to the previous snapshot and still be reading it; deleting that snapshot out from under the reload would re-create the missing-file crash. The versioned names are timestamped, so a reverse lexical sort is newest-first. """ if base_dir is None: base_dir = CB_MODSEC_CUSTOM_DIR try: names = os.listdir(base_dir) except FileNotFoundError: return keep_abs = os.path.abspath(keep) versioned = sorted( (n for n in names if n.startswith(_VERSIONED_CONF_PREFIX)), reverse=True, ) extra_retained = 0 for name in versioned: candidate = os.path.join(base_dir, name) if os.path.islink(candidate): continue if os.path.abspath(candidate) == keep_abs: continue if extra_retained < keep_recent - 1: extra_retained += 1 continue shutil.rmtree(candidate, ignore_errors=True) def _copy_tree_durable(source_dir: str, target_dir: str) -> None: """Copy source_dir's entries into target_dir, fsyncing each file and the directory entry so the full set is durable before the symlink flip. Mirrors _extract_vendor_zip's durability discipline but for an on-disk source. Symlinks are recreated as symlinks; nested dirs are recursed; file modes are preserved. The enclosing target_dir is created 0700 by the caller, so contents stay root-only regardless of individual file mode. """ for name in os.listdir(source_dir): src = os.path.join(source_dir, name) dst = os.path.join(target_dir, name) if os.path.islink(src): os.symlink(os.readlink(src), dst) elif os.path.isdir(src): os.makedirs(dst) _copy_tree_durable(src, dst) shutil.copymode(src, dst) else: with open(src, "rb") as fsrc, open(dst, "wb") as fdst: shutil.copyfileobj(fsrc, fdst) fdst.flush() os.fsync(fdst.fileno()) shutil.copymode(src, dst) _fsync_dir(target_dir) def _publish_live_modsec_dir_in_place(source_dir: str) -> None: """Populate MODSEC_CONF_DIR in place from source_dir. Fallback for hosts where MODSEC_CONF_DIR cannot be turned into a versioned symlink because it cannot be renamed aside (e.g. it is a mount point and os.rename raises EBUSY). The atomic-swap guarantee does not hold here — a reload landing mid-publish can still observe a partial set — but the rule set installs instead of the deploy failing outright, matching the pre-existing in-place behaviour on such hosts. """ os.makedirs(MODSEC_CONF_DIR, exist_ok=True) for name in os.listdir(MODSEC_CONF_DIR): _cleanup_staging_entry(os.path.join(MODSEC_CONF_DIR, name)) _copy_tree_durable(source_dir, MODSEC_CONF_DIR) def _publish_live_modsec_dir_atomic( source_dir: Optional[str] = None, ) -> None: """Publish the complete modsec rule set into the live Apache dir (MODSEC_CONF_DIR) atomically. DirectAdmin's `build modsecurity_ruleset` populates /etc/modsecurity.d by deleting every file then re-creating it in place, leaving the directory Apache reads half-written for tens of milliseconds on each deploy. A graceful reload landing in that window fails to compile a rule whose referenced file (phrase file, ipmatch file, lua script) is not yet present and crashes Apache. We instead copy the already-complete staging set into a fresh versioned dir and flip MODSEC_CONF_DIR (a symlink) in one rename, so a reload always observes a complete rule set. Only the symlink flip is atomic: the agent's own post-publish writes into the published dir (ip-record.db, *_Disable_WP_Redirect.conf, the global-disabled link) are re-applied on every deploy rather than crash-atomic against an externally triggered reload. """ if source_dir is None: source_dir = CB_MODSEC_CUSTOM_CONF_DIR os.makedirs(MODSEC_CONF_VERSIONS_DIR, exist_ok=True) try: _migrate_conf_to_symlink_if_needed( MODSEC_CONF_DIR, MODSEC_CONF_VERSIONS_DIR ) except OSError as exc: if exc.errno != errno.EBUSY: raise # MODSEC_CONF_DIR can't be renamed aside (it is a mount point or is # otherwise pinned by the system), so it cannot become the versioned # symlink the atomic swap relies on. Publish in place rather than # failing the whole vendor install — the rule set still goes live, # only without the atomic-swap guarantee on this host. logger.warning( "%s cannot be converted to a versioned symlink (%s); publishing " "the modsec rule set in place without the atomic swap", MODSEC_CONF_DIR, exc, ) _publish_live_modsec_dir_in_place(source_dir) return new_live = _new_versioned_conf_dir(MODSEC_CONF_VERSIONS_DIR) os.makedirs(new_live) os.chmod(new_live, 0o700) try: _copy_tree_durable(source_dir, new_live) _swap_conf_symlink_atomic(new_live, MODSEC_CONF_DIR) except BaseException: shutil.rmtree(new_live, ignore_errors=True) raise # Retain a few recent snapshots: a graceful reload that resolved the live # symlink just before the swap may still be reading the previous snapshot. _cleanup_old_versioned_conf_dirs( keep=new_live, base_dir=MODSEC_CONF_VERSIONS_DIR, keep_recent=_LIVE_SNAPSHOTS_RETAINED, ) def _restore_real_live_modsec_dir() -> None: """Undo the symlink layout: turn MODSEC_CONF_DIR back into an empty real directory so DirectAdmin's build repopulates a normal directory on ruleset revert/rollback (when the agent hands the dir back to DA).""" _tear_down_conf_dir(MODSEC_CONF_DIR, MODSEC_CONF_VERSIONS_DIR) # The symlink layout is being abandoned; drop the retained snapshots and # the versions dir too so a revert/uninstall leaves nothing behind. shutil.rmtree(MODSEC_CONF_VERSIONS_DIR, ignore_errors=True) os.makedirs(MODSEC_CONF_DIR, exist_ok=True) os.chmod(MODSEC_CONF_DIR, 0o700) class DirectAdminModSecException(PanelException): pass async def run_cmd(cmd): logger.debug("Running CMD: %s", cmd) data = await check_run(cmd.split(), raise_exc=DirectAdminModSecException) return data.decode().strip() class ModSecSettings(ModSecSettingInterface): INCLUDE_CONFIG = "/etc/httpd/conf/extra/httpd-includes.conf" I360_INCLUDE = 'Include "/etc/httpd/conf/extra/modsec2.imunify.conf"' config_key = "prev_settings" @classmethod async def revert(cls, **kwargs): remove_line_from_file(cls.INCLUDE_CONFIG, cls.I360_INCLUDE) # TODO: Revert SecRuleEngine value @classmethod async def apply(cls): ensure_line_in_file(cls.INCLUDE_CONFIG, cls.I360_INCLUDE) # TODO: Set SecRuleEngine value @lru_cache(1) def get_diradmin_pwnam(): return pwd.getpwnam(_DIRADMIN_USER) def rewrite_httpd_config(user: str): action = "action=rewrite&value=httpd&user={}\n".format(user) with open(_DIRECTADMIN_TASK_QUEUE, "a") as queue: queue.write(action) async def get_custombuild_webserver(): web_server = CustomBuildOptions("webserver").get() # NGINX is handling real requests in this combination proxying it to # Apache as needed. if web_server == "nginx_apache": web_server = NGINX return web_server async def get_outer_web_server(): """Return the name of the web server that handles incoming requests. For the purposes of ModSecurity we are interested in the one web server running on the machine that handles actual requests from users. This is related to the usage of Nginx as a reverse proxy to Apache. """ web_server = await get_custombuild_webserver() if web_server in SUPPORTED_WEB_SERVERS: return web_server return None class DirectAdminModSecurity(ModSecurityInterface): GLOBAL_DISABLED_RULES_CONFIG = ( "/etc/httpd/conf/extra/i360_modsec_disable.conf" ) GLOBAL_DISABLED_RULES_LINK = "zz_i360_modsec_disable.conf" USER_INCLUDE_PATH_TMPL = ( "/usr/local/directadmin/data/users/{user}/domains/{domain}.cust_httpd" ) USER_RULES_START_MARK = "# IMUNIFY360 CONFIG START" USER_RULES_END_MARK = "# IMUNIFY360 CONFIG END" CUSTOM_RULES_BACKUP = os.path.join(CB_MODSEC_CUSTOM_DIR, "conf.i360backup") @classmethod def _get_conf_dir(cls) -> str: return MODSEC_CONF_DIR @classmethod def detect_cwaf(cls): pass CWAF_INSTALLATION_DIR = "/usr/local/cwaf" @classmethod async def sync_disabled_rules_for_domains( cls, domain_rules_map: Dict[str, list] ): for domain, rule_list in domain_rules_map.items(): user = get_user_domains().get(domain) if user is None: raise DirectAdminModSecException( "Cannot find owner of domain " + domain ) cls._write_user_custom_httpd_conf(user, domain, rule_list) rewrite_httpd_config(user) @classmethod def _write_user_custom_httpd_conf(cls, user, domain, rule_list): filename = cls.USER_INCLUDE_PATH_TMPL.format(user=user, domain=domain) our_config = [cls.USER_RULES_START_MARK] our_config.extend( cls.generate_disabled_rules_config(rule_list).split("\n") ) our_config.append(cls.USER_RULES_END_MARK) uid, gid = get_diradmin_pwnam()[2:4] if not os.path.isfile(filename): with open(filename, "w") as cust_httpd: os.chown(cust_httpd.fileno(), uid, gid) content = [] with open(filename, "r") as cust_httpd: lines = cust_httpd.read().split("\n") start_idx = end_idx = None try: start_idx = lines.index(cls.USER_RULES_START_MARK) end_idx = lines.index(cls.USER_RULES_END_MARK) except ValueError: pass if start_idx is None and end_idx is None: content = lines content.extend(our_config) elif ( start_idx is not None and end_idx is not None and start_idx < end_idx ): content = lines[:] content[start_idx : end_idx + 1] = our_config else: raise DirectAdminModSecException(_BROKEN_CUSTOM_CONFIG_MSG) if content[-1] != "": content.append("") # ensure newline at EOF atomic_rewrite( filename, "\n".join(content), backup=False, uid=uid, gid=gid ) @classmethod def write_global_disabled_rules(cls, rule_list) -> bool: """ :param list rule_list: rule ids to sync :return: True if config was changed, False otherwise """ changed = atomic_rewrite( cls.GLOBAL_DISABLED_RULES_CONFIG, cls.generate_disabled_rules_config(rule_list), backup=False, ) cls._ensure_global_disabled_rules_link_present() return changed @classmethod def _ensure_global_disabled_rules_link_present(cls): linkname = os.path.join( MODSEC_CONF_DIR, cls.GLOBAL_DISABLED_RULES_LINK ) try: os.remove(linkname) except FileNotFoundError: pass os.symlink(cls.GLOBAL_DISABLED_RULES_CONFIG, linkname) @classmethod async def sync_global_disabled_rules(cls, rule_list) -> bool: """ just alias to write_global_disabled_rules() """ return cls.write_global_disabled_rules(rule_list) @classmethod def get_audit_log_path(cls): return "/var/log/httpd/modsec_audit.log" @classmethod def get_audit_logdir_path(cls): return "/var/log/modsec_audit" @classmethod async def installed_modsec(cls): return CustomBuildOptions("modsecurity").get() == "yes" @classmethod async def _get_web_server(cls) -> Optional[str]: """ Return the name of the web server for which ModSecurity rules will be applied. """ return await get_outer_web_server() async def _rollback(self): await DirectAdminFilesVendorList.revert() CustomBuildOptions("modsecurity_ruleset").set("no") # revert_settings() below already restores the live dir back to a real # directory and runs DA's build; doing it here too would have that # restore wipe this build's output and reopen an empty-dir window, so # leave the single restore+build to revert_settings. await self.revert_settings() @custombuild2_only async def _install_settings(self, reload_wafd=True): web_server = await self._get_web_server() if web_server is None: logger.warning( "ModSecurity rules installation is not supported" " for the running web server configuration." ) return prev_settings = CustomBuildOptions("modsecurity_ruleset").get() # backup custom mod_security settings, if any vendorlist = await DirectAdminModSecurity.modsec_vendor_list() if "unknown_custom" in vendorlist and not os.path.exists( self.CUSTOM_RULES_BACKUP ): shutil.copytree( CB_MODSEC_CUSTOM_CONF_DIR, self.CUSTOM_RULES_BACKUP ) await ModSecSettings.apply() # apply() publishes the rules atomically into the live dir; add the # global-disabled-rules link first, then reload, so the install-time # reload honours the Include, the rules and the rule exclusions # together (no DA build needed — it would re-copy the live dir # non-atomically). await DirectAdminFilesVendorList.apply() self._ensure_global_disabled_rules_link_present() await graceful_restart() config = ConfigFile() config.set("MOD_SEC", ModSecSettings.config_key, prev_settings) # checking if we did not break web server configuration is_server_running = WEB_SERVER_CHECKS[web_server] if not await check_with_timeout( is_server_running, _WEBSERVER_RESTART_CHECK_TIMEOUT ): await self._rollback() logger.warning("Web server failed to start, settings reverted") async def modsec_get_directive(self, directive_name, default=None): raise NotImplementedError async def reset_modsec_directives(self): raise NotImplementedError async def reset_modsec_rulesets(self): raise NotImplementedError @custombuild2_only async def revert_settings(self, reload_wafd=True): if not await self.installed_modsec(): logger.warning( "Skipping vendor removal, because ModSecurity isn't installed" ) return config = ConfigFile() await ModSecSettings.revert() await DirectAdminFilesVendorList.revert() prev_settings = config.get("MOD_SEC", ModSecSettings.config_key) if prev_settings in ("comodo", "owasp"): CustomBuildOptions("modsecurity_ruleset").set(prev_settings) if os.path.exists(self.CUSTOM_RULES_BACKUP): # apply() may have turned CB_MODSEC_CUSTOM_CONF_DIR into a symlink; # tear it down correctly before restoring backup. _tear_down_conf_dir() os.rename(self.CUSTOM_RULES_BACKUP, CB_MODSEC_CUSTOM_CONF_DIR) # apply() also makes the live /etc/modsecurity.d a symlink; hand it # back to DirectAdmin as a real directory so its build repopulates the # reverted ruleset normally. _restore_real_live_modsec_dir() await build("modsecurity_ruleset") # Clear the config marker before the reload so bookkeeping stays in # sync with on-disk state if graceful_restart raises. config.set("MOD_SEC", ModSecSettings.config_key, None) await graceful_restart() @classmethod async def enabled_modsec_vendor_list(cls): """Return a list of enabled ModSecurity vendors.""" # seems that on DirectAdmin all rulesets are always enabled return await cls.modsec_vendor_list() @classmethod async def modsec_vendor_list(cls): """Return a list of installed ModSecurity vendors.""" vendorlist = [] ruleset_file = os.path.join(MODSEC_CONF_DIR, RULESET_FILENAME) if os.path.exists(ruleset_file): # imunify360 ruleset installed with open(ruleset_file) as f: vendorlist.append(f.read()) elif os.path.isdir(CB_MODSEC_CUSTOM_CONF_DIR) and os.listdir( CB_MODSEC_CUSTOM_CONF_DIR ): vendorlist.append("unknown_custom") ruleset = CustomBuildOptions("modsecurity_ruleset").get() if ruleset and ruleset != "no": vendorlist.append(ruleset) return vendorlist @classmethod async def build_vendor_file_path(cls, vendor: str, filename: str) -> Path: return Path(MODSEC_CONF_DIR) / filename @classmethod @skip_if_not_installed_modsec async def _apply_modsec_files_update(cls): installed = await DirectAdminFilesVendorList.install_or_update() # Skip the reload when no vendor was installed (nothing to apply). if installed: # apply() already published the rules atomically into the live # dir; refresh the global-disabled link and reload so the new # rules take effect (standalone callers such as the ruleset # checker rely on this reload happening here). cls._ensure_global_disabled_rules_link_present() await graceful_restart() class DirectAdminFilesVendor(FilesVendor): modsec_interface = DirectAdminModSecurity async def _remove_vendor(self): _tear_down_conf_dir() os.makedirs(CB_MODSEC_CUSTOM_CONF_DIR, exist_ok=True) async def apply(self): # Atomic deploy in two layers so Apache never observes a half-written # rule set on reload: # 1. extract the vendor zip into a versioned staging dir and flip the # custombuild `conf` symlink; # 2. publish that complete set into the live Apache dir # (/etc/modsecurity.d) via a versioned dir + symlink swap, instead # of DirectAdmin's non-atomic delete-then-recopy build. _migrate_conf_to_symlink_if_needed() new_dir = _new_versioned_conf_dir() os.makedirs(new_dir) try: _extract_vendor_zip(self._item["local_path"], new_dir) _drop_rules_conf_main(new_dir) _write_ruleset_marker(new_dir, self._vendor_id()) _swap_conf_symlink_atomic(new_dir) except BaseException: shutil.rmtree(new_dir, ignore_errors=True) raise _cleanup_old_versioned_conf_dirs(keep=new_dir) CustomBuildOptions("modsecurity_ruleset").set("no") _publish_live_modsec_dir_atomic() def _vendor_id(self): basename = os.path.basename(urlparse(self._item["url"]).path) basename_no_zip, _ = os.path.splitext(basename) return basename_no_zip class DirectAdminFilesVendorList(FilesVendorList): files_vendor = DirectAdminFilesVendor modsec_interface = DirectAdminModSecurity @classmethod def vendor_fit_panel(cls, item): return item["name"].endswith("plesk") @classmethod async def _get_compatible_name(cls, installed_vendors): web_server = await get_outer_web_server() if web_server is None: raise cls.CompatiblityCheckFailed( "Imunify360 mod_security vendor does not support" " a running web server" ) return MODSEC_NAME_TEMPLATE.format( ruleset_suffix=cls.get_ruleset_suffix(), webserver=web_server, panel="plesk", )
Kaydet
Ctrl+S ile kaydet