Release 0.9.0

This commit is contained in:
2025-11-21 07:26:02 +01:00
committed by ecv
commit 472f0812e7
240 changed files with 20033 additions and 0 deletions

View File

@@ -0,0 +1,202 @@
from bot.module import Module
from bot import loaded_modules_dict
from bot.logger import get_logger
from bot.constants import (
PERMISSION_LEVEL_DEFAULT,
PERMISSION_LEVEL_BUILDER,
PERMISSION_LEVEL_PLAYER,
is_moderator_or_higher,
is_builder_or_higher
)
logger = get_logger("permissions")
class Permissions(Module):
def __init__(self):
setattr(self, "default_options", {
"module_name": self.get_module_identifier()[7:],
"default_player_password": None
})
setattr(self, "required_modules", [
'module_dom',
'module_players',
'module_locations',
'module_webserver'
])
self.next_cycle = 0
self.run_observer_interval = 5
self.all_available_actions_dict = {}
self.all_available_widgets_dict = {}
Module.__init__(self)
@staticmethod
def get_module_identifier():
return "module_permissions"
# region Standard module stuff
def setup(self, options=dict):
Module.setup(self, options)
# endregion
def start(self):
""" all modules have been loaded and initialized by now. we can bend the rules here."""
self.set_permission_hooks()
self.all_available_actions_dict = self.get_all_available_actions_dict()
self.all_available_widgets_dict = self.get_all_available_widgets_dict()
Module.start(self)
# endregion
# ==================== Permission Check Helpers ====================
@staticmethod
def _is_owner(steamid: str, event_data: list) -> bool:
"""Check if user is the owner of the element being modified."""
return str(steamid) == event_data[1].get("dom_element_owner", "")
@staticmethod
def _check_toggle_flag_permission(permission_level: int, steamid: str, event_data: list) -> bool:
"""Check permission for toggle_enabled_flag action."""
if event_data[0] != "toggle_enabled_flag":
return False
# Builders and below can only edit their own elements
if permission_level >= PERMISSION_LEVEL_BUILDER:
return not Permissions._is_owner(steamid, event_data)
return False
@staticmethod
def _check_widget_options_permission(permission_level: int, event_data: list) -> bool:
"""Check permission for widget options view."""
if not (event_data[0].startswith("toggle_") and event_data[0].endswith("_widget_view")):
return False
if event_data[1].get("action") == "show_options":
# Only moderators and admins can see options
return not is_moderator_or_higher(permission_level)
return False
@staticmethod
def _check_dom_management_permission(permission_level: int, steamid: str, event_data: list) -> bool:
"""Check permissions for DOM management actions."""
action_name = event_data[0]
sub_action = event_data[1].get("action", "")
if action_name not in ["delete", "select"]:
return False
# Select/deselect: builders and below can only modify their own elements
if sub_action in ["select_dom_element", "deselect_dom_element"]:
if permission_level >= PERMISSION_LEVEL_BUILDER:
return not Permissions._is_owner(steamid, event_data)
return False
# Delete: only moderators and admins
if sub_action == "delete_selected_dom_elements":
return permission_level >= PERMISSION_LEVEL_BUILDER
return False
@staticmethod
def _check_players_permission(permission_level: int, event_data: list) -> bool:
"""Check permissions for player management actions."""
if event_data[0] == "kick_player":
# Only builder and above can kick players
return permission_level > PERMISSION_LEVEL_BUILDER
return False
@staticmethod
def _check_locations_permission(permission_level: int, steamid: str, event_data: list) -> bool:
"""Check permissions for location management actions."""
action_name = event_data[0]
sub_action = event_data[1].get("action", "")
if action_name not in ["edit_location", "management_tools", "toggle_locations_widget_view"]:
return False
# Edit/enable/disable: builders and below can only modify their own locations
if sub_action in ["edit_location_entry", "enable_location_entry", "disable_location_entry"]:
if permission_level >= PERMISSION_LEVEL_BUILDER:
return not Permissions._is_owner(steamid, event_data)
return False
# Create new: only players and above
if sub_action == "show_create_new":
return permission_level > PERMISSION_LEVEL_PLAYER
return False
@staticmethod
def _check_telnet_permission(permission_level: int, event_data: list) -> bool:
"""Check permissions for telnet actions."""
if event_data[0] == "shutdown":
# Only moderators and admins can shutdown server
return permission_level >= PERMISSION_LEVEL_BUILDER
return False
# ==================== Main Permission Check ====================
@staticmethod
def trigger_action_with_permission(*args, **kwargs):
"""
Check permissions before triggering an action.
Permissions default to allowed if no specific rule matches.
Module-specific permission checks are delegated to helper methods.
"""
module = args[0]
event_data = kwargs.get("event_data", [])
dispatchers_steamid = kwargs.get("dispatchers_steamid", None)
# Default to allowing action
permission_denied = False
if dispatchers_steamid is not None:
# Get user's permission level
permission_level = int(
module.dom.data.get("module_players", {}).get("admins", {}).get(
dispatchers_steamid, PERMISSION_LEVEL_DEFAULT
)
)
module_identifier = module.get_module_identifier()
# Run permission checks based on action and module
permission_denied = (
Permissions._check_toggle_flag_permission(permission_level, dispatchers_steamid, event_data) or
Permissions._check_widget_options_permission(permission_level, event_data) or
(module_identifier == "module_dom_management" and
Permissions._check_dom_management_permission(permission_level, dispatchers_steamid, event_data)) or
(module_identifier == "module_players" and
Permissions._check_players_permission(permission_level, event_data)) or
(module_identifier == "module_locations" and
Permissions._check_locations_permission(permission_level, dispatchers_steamid, event_data)) or
(module_identifier == "module_telnet" and
Permissions._check_telnet_permission(permission_level, event_data))
)
if permission_denied:
logger.warn("permission_denied",
action=event_data[0],
user=dispatchers_steamid,
permission_level=permission_level)
event_data[1]["has_permission"] = not permission_denied
# Execute the action
return module.trigger_action(module, event_data=event_data, dispatchers_steamid=dispatchers_steamid)
@staticmethod
def template_render_hook_with_permission(*args, **kwargs):
module = args[0]
return module.template_render(*args, **kwargs)
def set_permission_hooks(self):
for identifier, module in loaded_modules_dict.items():
module.trigger_action_hook = self.trigger_action_with_permission
module.template_render_hook = self.template_render_hook_with_permission
loaded_modules_dict[Permissions().get_module_identifier()] = Permissions()

View File

@@ -0,0 +1,75 @@
from bot import loaded_modules_dict
from os import path, pardir
module_name = path.basename(path.normpath(path.join(path.abspath(__file__), pardir, pardir)))
action_name = path.basename(path.abspath(__file__))[:-3]
def main_function(module, event_data, dispatchers_steamid):
event_data[1]["action_identifier"] = action_name
player_steamid = event_data[1].get("player_steamid", None)
dataset = event_data[1].get("dataset", None)
entered_password = event_data[1].get("entered_password", None)
default_player_password = module.default_options.get("default_player_password", None)
if all([
dataset is not None,
player_steamid is not None,
entered_password is not None,
default_player_password is not None
]):
if default_player_password == entered_password:
is_authenticated = True
else:
is_authenticated = False
module.dom.data.upsert({
"module_players": {
"elements": {
dataset: {
player_steamid: {
"is_authenticated": is_authenticated
}
}
}
}
}, dispatchers_steamid=player_steamid)
if is_authenticated is True:
module.callback_success(callback_success, module, event_data, dispatchers_steamid)
return
module.callback_fail(callback_fail, module, event_data, dispatchers_steamid)
def callback_success(module, event_data, dispatchers_steamid, match=None):
player_steamid = event_data[1].get("player_steamid", None)
if player_steamid is not None:
event_data = ['say_to_player', {
'steamid': player_steamid,
'message': '[66FF66]Thank you for playing along[-][FFFFFF], you may now leave the Lobby-area[-]'
}]
module.trigger_action_hook(module.players, event_data=event_data)
def callback_fail(module, event_data, dispatchers_steamid):
player_steamid = event_data[1].get("player_steamid", None)
if player_steamid is not None:
event_data = ['say_to_player', {
'steamid': player_steamid,
'message': '[FF6666]Could not authenticate[-][FFFFFF], wrong password perhaps?[-]'
}]
module.trigger_action_hook(module.players, event_data=event_data)
pass
action_meta = {
"description": "sets a players authenticated flag",
"main_function": main_function,
"callback_success": callback_success,
"callback_fail": callback_fail,
"requires_telnet_connection": False,
"enabled": True
}
loaded_modules_dict["module_" + module_name].register_action(action_name, action_meta)

View File

@@ -0,0 +1,89 @@
from bot import loaded_modules_dict
from os import path, pardir
module_name = path.basename(path.normpath(path.join(path.abspath(__file__), pardir, pardir)))
action_name = path.basename(path.abspath(__file__))[:-3]
def main_function(module, event_data, dispatchers_steamid):
event_data[1]["action_identifier"] = action_name
player_steamid = event_data[1].get("player_steamid", None)
active_dataset = event_data[1].get("dataset", None)
flag_player_to_be_muted = event_data[1].get("is_muted", None)
if all([
active_dataset is not None,
player_steamid is not None,
flag_player_to_be_muted is not None,
]):
player_dict = (
module.dom.data
.get("module_players", {})
.get("elements", {})
.get(active_dataset, {})
.get(player_steamid, {})
)
player_is_currently_muted = player_dict.get("is_muted", False)
if not flag_player_to_be_muted:
default_player_password = module.default_options.get("default_player_password", None)
if player_is_currently_muted or default_player_password is None:
module.callback_success(callback_success, module, event_data, dispatchers_steamid)
return
module.callback_fail(callback_fail, module, event_data, dispatchers_steamid)
def callback_success(module, event_data, dispatchers_steamid, match=None):
player_steamid = event_data[1].get("player_steamid", None)
active_dataset = event_data[1].get("dataset", None)
if player_steamid is not None:
default_player_password = module.default_options.get("default_player_password", None)
if default_player_password is not None:
event_data = ['say_to_player', {
'steamid': player_steamid,
'message': '[66FF66]Free speech[-][FFFFFF], you may now chat. Say hello ^^[-]'
}]
module.trigger_action_hook(module.players, event_data=event_data)
event_data = ['toggle_player_mute', {
'steamid': player_steamid,
'mute_status': False,
'dataset': active_dataset
}]
module.trigger_action_hook(module.players, event_data=event_data)
def callback_fail(module, event_data, dispatchers_steamid):
player_steamid = event_data[1].get("player_steamid", None)
active_dataset = event_data[1].get("dataset", None)
if player_steamid is not None:
default_player_password = module.default_options.get("default_player_password", None)
if default_player_password is not None:
event_data = ['say_to_player', {
'steamid': player_steamid,
'message': '[FF6666]You have been automatically muted[-][FFFFFF], until you have authenticated![-]'
}]
module.trigger_action_hook(module.players, event_data=event_data)
event_data = ['toggle_player_mute', {
'steamid': player_steamid,
'mute_status': True,
'dataset': active_dataset
}]
module.trigger_action_hook(module.players, event_data=event_data)
action_meta = {
"description": "tools to help managing a players ability to chat (and speak?)",
"main_function": main_function,
"callback_success": callback_success,
"callback_fail": callback_fail,
"requires_telnet_connection": False,
"enabled": True
}
loaded_modules_dict["module_" + module_name].register_action(action_name, action_meta)

View File

@@ -0,0 +1,57 @@
from bot import loaded_modules_dict
from bot import telnet_prefixes
from os import path, pardir
import re
module_name = path.basename(path.normpath(path.join(path.abspath(__file__), pardir, pardir)))
trigger_name = path.basename(path.abspath(__file__))[:-3]
def main_function(origin_module, module, regex_result):
command = regex_result.group("command")
steamid = regex_result.group("player_steamid")
result = re.match(r"^.*password\s(?P<password>.*)", command)
if result:
entered_password = result.group("password")
else:
return
event_data = ['set_authentication', {
'dataset': module.dom.data.get("module_game_environment", {}).get("active_dataset", None),
'player_steamid': steamid,
'entered_password': entered_password
}]
module.trigger_action_hook(origin_module, event_data=event_data)
triggers = {
"password": r"\'(?P<player_name>.*)\'\:\s(?P<command>\/password.*)"
}
trigger_meta = {
"description": "validates a players password",
"main_function": main_function,
"triggers": [
{
"identifier": "password (Alloc)",
"regex": (
telnet_prefixes["telnet_log"]["timestamp"] +
telnet_prefixes["Allocs"]["chat"] +
triggers["password"]
),
"callback": main_function
},
{
"identifier": "password (BCM)",
"regex": (
telnet_prefixes["telnet_log"]["timestamp"] +
telnet_prefixes["BCM"]["chat"] +
triggers["password"]
),
"callback": main_function
}
]
}
loaded_modules_dict["module_" + module_name].register_trigger(trigger_name, trigger_meta)

View File

@@ -0,0 +1,68 @@
from bot import loaded_modules_dict
from bot import telnet_prefixes
from os import path, pardir
module_name = path.basename(path.normpath(path.join(path.abspath(__file__), pardir, pardir)))
trigger_name = path.basename(path.abspath(__file__))[:-3]
def main_function(origin_module, module, regex_result):
command = regex_result.group("command")
active_dataset = module.dom.data.get("module_game_environment", {}).get("active_dataset", None)
player_steamid = regex_result.group("player_steamid")
existing_player_dict = (
module.dom.data
.get("module_players", {})
.get("elements", {})
.get(active_dataset, {})
.get(player_steamid, None)
)
if command == "connected":
# we want to mute completely new players and players that are not authenticated on login.
if existing_player_dict is not None:
default_player_password = module.default_options.get("default_player_password", None)
player_is_authenticated = existing_player_dict.get("is_authenticated", False)
if not player_is_authenticated and default_player_password is not None:
is_muted = True
else:
is_muted = False
event_data = ['set_player_mute', {
'dataset': module.dom.data.get("module_game_environment", {}).get("active_dataset", None),
'player_steamid': player_steamid,
'is_muted': is_muted
}]
module.trigger_action_hook(origin_module, event_data=event_data)
trigger_meta = {
"description": "reacts to telnets player discovery messages for real time responses!",
"main_function": main_function,
"triggers": [
{
"regex": (
telnet_prefixes["telnet_log"]["timestamp"] +
r"\[Steamworks.NET\]\s"
r"(?P<command>.*)\s"
r"player:\s(?P<player_name>.*)\s"
r"SteamId:\s(?P<player_steamid>\d+)\s(.*)"
),
"callback": main_function
}, {
"regex": (
telnet_prefixes["telnet_log"]["timestamp"] +
r"Player\s"
r"(?P<command>.*), "
r"entityid=(?P<entity_id>.*), "
r"name=(?P<player_name>.*), "
r"steamid=(?P<player_steamid>.*), "
r"steamOwner=(?P<owner_id>.*), "
r"ip=(?P<player_ip>.*)"
),
"callback": main_function
}
]
}
loaded_modules_dict["module_" + module_name].register_trigger(trigger_name, trigger_meta)

View File

@@ -0,0 +1,43 @@
from bot import loaded_modules_dict
from os import path, pardir
module_name = path.basename(path.normpath(path.join(path.abspath(__file__), pardir, pardir)))
trigger_name = path.basename(path.abspath(__file__))[:-3]
def main_function(*args, **kwargs):
module = args[0]
updated_values_dict = kwargs.get("updated_values_dict", {})
player_steamid = kwargs.get("dispatchers_steamid", None)
is_authenticated = updated_values_dict.get("is_authenticated", None)
try:
if all([
is_authenticated is not None,
player_steamid is not None
]):
event_data = ['set_player_mute', {
'dataset': module.dom.data.get("module_game_environment", {}).get("active_dataset", None),
'player_steamid': player_steamid
}]
if is_authenticated:
event_data[1]["is_muted"] = False
else:
event_data[1]["is_muted"] = True
module.trigger_action_hook(module, event_data=event_data)
except AttributeError:
pass
trigger_meta = {
"description": "reacts to a players authentication change",
"main_function": main_function,
"handlers": {
"module_players/elements/%map_identifier%/%steamid%/is_authenticated": main_function,
}
}
loaded_modules_dict["module_" + module_name].register_trigger(trigger_name, trigger_meta)

View File

@@ -0,0 +1,62 @@
from bot import loaded_modules_dict
from os import path, pardir
module_name = path.basename(path.normpath(path.join(path.abspath(__file__), pardir, pardir)))
trigger_name = path.basename(path.abspath(__file__))[:-3]
def main_function(*args, **kwargs):
module = args[0]
original_values_dict = kwargs.get("original_values_dict", {})
updated_values_dict = kwargs.get("updated_values_dict", {})
dataset = module.dom.data.get("module_game_environment", {}).get("active_dataset", None)
found_lobby = False
for lobby in module.locations.get_elements_by_type("is_lobby"):
lobby_dict = lobby
found_lobby = True
if found_lobby is False:
return
# only dive into this when not authenticated
if original_values_dict.get("is_authenticated", False) is False and any([
original_values_dict.get("pos", {}).get("x") != updated_values_dict.get("pos", {}).get("x"),
original_values_dict.get("pos", {}).get("y") != updated_values_dict.get("pos", {}).get("y"),
original_values_dict.get("pos", {}).get("z") != updated_values_dict.get("pos", {}).get("z")
]):
on_the_move_player_dict = (
module.dom.data
.get("module_players", {})
.get("elements", {})
.get(dataset, {})
.get(updated_values_dict.get("steamid"), {})
)
pos_is_inside_coordinates = module.locations.position_is_inside_boundary(updated_values_dict, lobby_dict)
if pos_is_inside_coordinates is True:
# nothing to do, we are inside the lobby
return
# no early exits, seems like the player is outside an active lobby without any authentication!
# seems like we should port ^^
event_data = ['teleport_to_coordinates', {
'location_coordinates': {
"x": lobby_dict["coordinates"]["x"],
"y": lobby_dict["coordinates"]["y"],
"z": lobby_dict["coordinates"]["z"]
},
'steamid': on_the_move_player_dict.get("steamid")
}]
module.trigger_action_hook(module.locations, event_data=event_data)
trigger_meta = {
"description": "reacts to every players move!",
"main_function": main_function,
"handlers": {
"module_players/elements/%map_identifier%/%steamid%/pos": main_function,
}
}
loaded_modules_dict["module_" + module_name].register_trigger(trigger_name, trigger_meta)