whitelist_rbl.py
| 7.4 KB | Satir:
0
| py
Geri
""" This module provides functions for exporting whitelist for Real-time Blackhole List (RBL). """ import itertools import ipaddress import logging import os import time from pathlib import Path from typing import Optional from defence360agent.contracts.config import bool_from_envvar, int_from_envvar from defence360agent.subsys import persistent_state from defence360agent.subsys.panels.base import PanelException from defence360agent.utils import ( COPY_TO_MODSEC_MAXTRIES, check_run, CheckRunError, log_failed_to_copy_to_modsec, recurring_check, retry_on, ) from defence360agent.subsys.web_server import safe_update_config from im360.subsys.int_config import generic_panel_uses_coraza from im360.subsys.panels.base import use_modsec_lock from im360.subsys.panels.hosting_panel import HostingPanel from im360.model.global_whitelist import GlobalWhitelist from im360.model.custom_lists import CustomWhitelist from im360.internals.core.ipset.ip import IPSetWhiteFullAccess, IPSetWhite logger = logging.getLogger(__name__) async def reload_wafd(): if Path("/usr/bin/imunify360-wsctl").is_file(): args = ["imunify360-wsctl", "reload"] else: args = ["systemctl", "reload", "imunify360-wafd"] try: await check_run(args) except CheckRunError: logger.warning("Failed to reload 'imunify360-wafd'") #: how often to check the rbl_whitelist file POLLING_PERIOD = 60 # seconds #: default minimum seconds between Apache/LiteSpeed reloads triggered by #: create_rbl_whitelist; tunable via IM360_RBL_RELOAD_MIN_PERIOD (<=0 disables) DEFAULT_RBL_RELOAD_MIN_PERIOD = 600 _PERSISTENT_STATE_NAME = "whitelist_rbl" _LAST_RELOAD_TS_KEY = "last_reload_ts" async def _get_whitelists_data(): global_white_list = await GlobalWhitelist.load() full_access_white_list = ( item["ip"] for item in IPSetWhiteFullAccess().query_all() ) # ignore "whitelisted by passing captcha" ips (DEF-13665) manual_white_list = IPSetWhite().get_non_captcha_passed_ips() custom_white_list = await CustomWhitelist.load() return itertools.chain( global_white_list, full_access_white_list, manual_white_list, custom_white_list, ) def _rbl_reload_throttled(rbl_whitelist_path) -> bool: """Whether to skip the rebuild within the persisted reload window. Persisted (not in-memory) to survive socket-activated restarts that reset the in-process coalescer; an empty/missing file or a bad env/state value never throttles (fail open), so updates are never blocked. """ try: min_period = int_from_envvar( "IM360_RBL_RELOAD_MIN_PERIOD", DEFAULT_RBL_RELOAD_MIN_PERIOD ) except ValueError as e: logger.warning( "%s; using default %ds", e, DEFAULT_RBL_RELOAD_MIN_PERIOD ) min_period = DEFAULT_RBL_RELOAD_MIN_PERIOD if min_period <= 0: return False try: if os.path.getsize(str(rbl_whitelist_path)) == 0: return False last_reload_ts = float( persistent_state.load_state(_PERSISTENT_STATE_NAME).get( _LAST_RELOAD_TS_KEY, 0 ) ) except (OSError, TypeError, ValueError, AttributeError): return False elapsed = time.time() - last_reload_ts if elapsed < min_period: logger.info( "RBL whitelist reload throttled: %ds since last reload < %ds," " skipping rebuild", int(elapsed), min_period, ) return True return False @use_modsec_lock @retry_on( FileNotFoundError, max_tries=COPY_TO_MODSEC_MAXTRIES, on_error=log_failed_to_copy_to_modsec, silent=True, ) async def create_rbl_whitelist(): # IM360_RBL_RELOAD_DISABLED: freeze rbl_whitelist (no write, no reload). if bool_from_envvar("IM360_RBL_RELOAD_DISABLED", False): logger.info( "create_rbl_whitelist skipped: IM360_RBL_RELOAD_DISABLED is set" ) return rbl_whitelist_path = await _get_rbl_whitelist_path() if not rbl_whitelist_path: return # String compare avoids importing cPanelCoraza (circular dependency). use_coraza = ( generic_panel_uses_coraza() or HostingPanel().__class__.__name__ == "cPanelCoraza" ) if not use_coraza and _rbl_reload_throttled(rbl_whitelist_path): return whitelist_chain = await _get_whitelists_data() whitelist_chain = _convert_ip_addresses(whitelist_chain) new_whitelist = list(whitelist_chain) current_whitelist = _read_whitelist_from_file(rbl_whitelist_path) if set(new_whitelist) != set(current_whitelist): logger.info("Create RBL whitelist: %s", rbl_whitelist_path) text = "\n".join(sorted(new_whitelist)) if await safe_update_config(rbl_whitelist_path, text): logger.info("RBL whitelist was successfully updated") if use_coraza: logger.info( "Reloading 'imunify360-wafd' as coraza ruleset is in" " action" ) await reload_wafd() else: persistent_state.save_state( _PERSISTENT_STATE_NAME, {_LAST_RELOAD_TS_KEY: time.time()}, ) else: logger.info("No changes in RBL whitelist, no restart required") @recurring_check(POLLING_PERIOD) async def ensure_rbl_whitelist(): """Make sure rbl_whitelist is not empty.""" rbl_whitelist_path = await _get_rbl_whitelist_path() if not rbl_whitelist_path: return # do nothing at this time try: empty = not os.path.getsize(str(rbl_whitelist_path)) except FileNotFoundError: return else: if empty: await create_rbl_whitelist() # recreate def _convert_ip_addresses(iterable): for ip in iterable: ip = ipaddress.ip_network(ip) # ModSec @ipMatchFromFile refuses prefix length 0 and the whole # rule file fails to load, taking Apache down on reload. if ip.prefixlen == 0: logger.warning( "Skipping wildcard whitelist entry %s in rbl_whitelist", ip ) continue # RBL whitelist can't handle /32 nets. # we need to convert /32 nets to ips if ip.num_addresses == 1: ip = ipaddress.ip_address(ip.network_address) yield str(ip) async def _get_rbl_whitelist_path() -> Optional[Path]: """RBL whitelist stored in ModSec ruleset directory, returns Path for RBL whitelist file, or None if panel errors, or modsec rulest dir doesn't exists. """ try: rbl_whitelist_path = await HostingPanel().get_rbl_whitelist_path() except PanelException as e: logging.warning("Can't create rbl whitelist: %s", e) return None if not rbl_whitelist_path: logger.info("RBL whitelist path is undefined. Creation skipped") return rbl_whitelist_path def _read_whitelist_from_file(rbl_whitelist_path): logger.info("Read RBL whitelist: %s", rbl_whitelist_path) try: # `rbl_whitelist_path.open()` does not have to raise FileNotFoundError, # it might be, for example, OSError in the case of `py.path.local` # with open(str(rbl_whitelist_path), "r") as f: yield from map(str.strip, f) except FileNotFoundError: logger.info("RBL whitelist doest not exist: %s", rbl_whitelist_path)
Kaydet
Ctrl+S ile kaydet