467 lines
18 KiB
Python
467 lines
18 KiB
Python
import re
|
|
from bot.module import Module
|
|
from bot import loaded_modules_dict
|
|
from bot.logger import get_logger
|
|
from bot.constants import TELNET_TIMEOUT_NORMAL, TELNET_TIMEOUT_RECONNECT
|
|
from time import time
|
|
from collections import deque
|
|
import telnetlib
|
|
|
|
logger = get_logger("telnet")
|
|
|
|
|
|
class Telnet(Module):
|
|
tn = object
|
|
|
|
telnet_buffer = str
|
|
valid_telnet_lines = deque
|
|
|
|
telnet_lines_to_process = deque
|
|
telnet_command_queue = deque
|
|
|
|
def __init__(self):
|
|
self.telnet_command_queue = deque()
|
|
setattr(self, "default_options", {
|
|
"module_name": self.get_module_identifier()[7:],
|
|
"host": "127.0.0.1",
|
|
"port": 8081,
|
|
"password": "thisissecret",
|
|
"web_username": "",
|
|
"web_password": "",
|
|
"max_queue_length": 100,
|
|
"run_observer_interval": 3,
|
|
"run_observer_interval_idle": 10,
|
|
"max_telnet_buffer": 16384,
|
|
"max_command_queue_execution": 6,
|
|
"match_types_generic": {
|
|
'log_start': [
|
|
r"\A(?P<datetime>\d{4}.+?)\s(?P<gametime_in_seconds>.+?)\sINF .*",
|
|
r"\ATime:\s(?P<servertime_in_minutes>.*)m\s",
|
|
],
|
|
'log_end': [
|
|
r"\r\n$",
|
|
r"\sby\sTelnet\sfrom\s(.*)\:(\d.*)\s*$"
|
|
]
|
|
}
|
|
})
|
|
setattr(self, "required_modules", [
|
|
"module_dom",
|
|
"module_webserver"
|
|
])
|
|
|
|
self.next_cycle = 0
|
|
self.telnet_response = ""
|
|
|
|
Module.__init__(self)
|
|
|
|
@staticmethod
|
|
def get_module_identifier():
|
|
return "module_telnet"
|
|
|
|
def on_socket_connect(self, steamid):
|
|
Module.on_socket_connect(self, steamid)
|
|
|
|
def on_socket_disconnect(self, steamid):
|
|
Module.on_socket_disconnect(self, steamid)
|
|
|
|
# region Standard module stuff
|
|
def setup(self, options=dict):
|
|
Module.setup(self, options)
|
|
|
|
self.telnet_lines_to_process = deque(maxlen=self.options["max_queue_length"])
|
|
self.valid_telnet_lines = deque(maxlen=self.options["max_queue_length"])
|
|
self.run_observer_interval = self.options.get(
|
|
"run_observer_interval", self.default_options.get("run_observer_interval", None)
|
|
)
|
|
self.run_observer_interval_idle = self.options.get(
|
|
"run_observer_interval_idle", self.default_options.get("run_observer_interval_idle", None)
|
|
)
|
|
self.max_command_queue_execution = self.options.get(
|
|
"max_command_queue_execution", self.default_options.get("max_command_queue_execution", None)
|
|
)
|
|
self.telnet_buffer = ""
|
|
|
|
self.last_execution_time = 0.0
|
|
|
|
setattr(self, "last_connection_loss", None)
|
|
setattr(self, "recent_telnet_response", None)
|
|
# endregion
|
|
|
|
# region Handling telnet initialization and authentication
|
|
def setup_telnet(self):
|
|
try:
|
|
connection = telnetlib.Telnet(
|
|
self.options.get("host"),
|
|
self.options.get("port"),
|
|
timeout=TELNET_TIMEOUT_NORMAL
|
|
)
|
|
self.tn = self.authenticate(connection, self.options.get("password"))
|
|
except Exception as error:
|
|
logger.error("telnet_connection_failed",
|
|
host=self.options.get("host"),
|
|
port=self.options.get("port"),
|
|
error=str(error),
|
|
error_type=type(error).__name__)
|
|
raise IOError
|
|
|
|
return True
|
|
|
|
def authenticate(self, connection, password):
|
|
try:
|
|
# Waiting for the prompt.
|
|
found_prompt = False
|
|
while found_prompt is not True:
|
|
telnet_response = connection.read_until(b"\r\n", timeout=TELNET_TIMEOUT_NORMAL).decode("utf-8")
|
|
if re.match(r"Please enter password:\r\n", telnet_response):
|
|
found_prompt = True
|
|
else:
|
|
raise IOError
|
|
|
|
# Sending password.
|
|
full_auth_response = ''
|
|
authenticated = False
|
|
connection.write(password.encode('ascii') + b"\r\n")
|
|
while authenticated is not True: # loop until authenticated, it's required
|
|
telnet_response = connection.read_until(b"\r\n").decode("utf-8")
|
|
full_auth_response += telnet_response.rstrip()
|
|
# last 'welcome' line from the games telnet. it might change with a new game-version
|
|
if re.match(r"Password incorrect, please enter password:\r\n", telnet_response) is not None:
|
|
logger.error("telnet_auth_failed",
|
|
host=self.options.get("host"),
|
|
port=self.options.get("port"),
|
|
reason="incorrect password")
|
|
raise ValueError
|
|
if re.match(r"Logon successful.\r\n", telnet_response) is not None:
|
|
authenticated = True
|
|
|
|
# Waiting for banner.
|
|
full_banner = ''
|
|
displayed_welcome = False
|
|
while displayed_welcome is not True: # loop until ready, it's required
|
|
telnet_response = connection.read_until(b"\r\n").decode("utf-8")
|
|
full_banner += telnet_response.rstrip("\r\n")
|
|
if re.match(
|
|
r"Press 'help' to get a list of all commands. Press 'exit' to end session.",
|
|
telnet_response
|
|
):
|
|
displayed_welcome = True
|
|
|
|
except Exception as e:
|
|
raise IOError
|
|
|
|
# Connection successful - no log needed
|
|
return connection
|
|
# endregion
|
|
|
|
# region handling and preparing telnet-lines
|
|
def is_a_valid_line(self, telnet_line):
|
|
telnet_response_is_a_valid_line = False
|
|
if self.has_valid_start(telnet_line) and self.has_valid_end(telnet_line):
|
|
telnet_response_is_a_valid_line = True
|
|
|
|
return telnet_response_is_a_valid_line
|
|
|
|
def has_valid_start(self, telnet_response):
|
|
telnet_response_has_valid_start = False
|
|
for match_type in self.options.get("match_types_generic").get("log_start"):
|
|
if re.match(match_type, telnet_response):
|
|
telnet_response_has_valid_start = True
|
|
|
|
return telnet_response_has_valid_start
|
|
|
|
def has_valid_end(self, telnet_response):
|
|
telnet_response_has_valid_end = False
|
|
for match_type in self.options.get("match_types_generic").get("log_end"):
|
|
if re.search(match_type, telnet_response):
|
|
telnet_response_has_valid_end = True
|
|
|
|
return telnet_response_has_valid_end
|
|
|
|
def has_mutliple_lines(self, telnet_response):
|
|
telnet_response_has_multiple_lines = False
|
|
telnet_response_count = telnet_response.count(b"\r\n")
|
|
if telnet_response_count >= 1:
|
|
telnet_response_has_multiple_lines = telnet_response_count
|
|
|
|
return telnet_response_has_multiple_lines
|
|
|
|
@staticmethod
|
|
def extract_lines(telnet_response):
|
|
return [telnet_line for telnet_line in telnet_response.splitlines(True)]
|
|
|
|
def get_a_bunch_of_lines_from_queue(self, this_many_lines):
|
|
telnet_lines = []
|
|
current_queue_length = 0
|
|
done = False
|
|
while (current_queue_length < this_many_lines) and not done:
|
|
try:
|
|
telnet_lines.append(self.telnet_lines_to_process.popleft())
|
|
current_queue_length += 1
|
|
except IndexError:
|
|
done = True
|
|
|
|
if len(telnet_lines) >= 1:
|
|
return telnet_lines
|
|
else:
|
|
return []
|
|
|
|
def add_telnet_command_to_queue(self, command):
|
|
if command not in self.telnet_command_queue:
|
|
self.telnet_command_queue.appendleft(command)
|
|
return True
|
|
|
|
return False
|
|
|
|
def execute_telnet_command_queue(self, this_many_lines):
|
|
telnet_command_list = []
|
|
current_queue_length = 0
|
|
done = False
|
|
initial_queue_length = len(self.telnet_command_queue)
|
|
while (current_queue_length < this_many_lines) and not done:
|
|
try:
|
|
telnet_command_list.append(self.telnet_command_queue.popleft())
|
|
current_queue_length += 1
|
|
except IndexError:
|
|
done = True
|
|
|
|
remaining_queue_length = len(self.telnet_command_queue)
|
|
# print(initial_queue_length, ":", remaining_queue_length)
|
|
|
|
for telnet_command in reversed(telnet_command_list):
|
|
command = "{command}{line_end}".format(command=telnet_command, line_end="\r\n")
|
|
|
|
try:
|
|
self.tn.write(command.encode('ascii'))
|
|
|
|
except Exception as error:
|
|
logger.error("telnet_command_send_failed",
|
|
command=telnet_command,
|
|
error=str(error),
|
|
error_type=type(error).__name__,
|
|
queue_size=remaining_queue_length)
|
|
# endregion
|
|
|
|
# ==================== Line Processing Helper Methods ====================
|
|
|
|
def _should_exclude_from_logs(self, telnet_line: str) -> bool:
|
|
"""Check if a telnet line should be excluded from logs."""
|
|
elements_excluded_from_logs = [
|
|
"'lp'", "'gettime'", "'listents'", # system calls
|
|
"INF Time: ", "SleeperVolume", " killed by " # irrelevant lines for now
|
|
]
|
|
return any(exclude in telnet_line for exclude in elements_excluded_from_logs)
|
|
|
|
def _store_valid_line(self, valid_line: str) -> None:
|
|
"""Store a valid telnet line in DOM."""
|
|
# Store in DOM if clients are connected and line is relevant
|
|
if not self._should_exclude_from_logs(valid_line):
|
|
if len(self.webserver.connected_clients) >= 1:
|
|
self.dom.data.append({
|
|
self.get_module_identifier(): {
|
|
"telnet_lines": valid_line
|
|
}
|
|
}, maxlen=150)
|
|
|
|
# Debug log only (disabled by default to avoid spam)
|
|
# Uncomment next line and enable debug logging if needed for troubleshooting
|
|
# logger.debug("telnet_line_received", line=valid_line[:100])
|
|
|
|
self.valid_telnet_lines.append(valid_line)
|
|
|
|
def _process_first_component(self, component: str) -> str:
|
|
"""
|
|
Process the first component of telnet response.
|
|
|
|
This might be the remainder of the previous run combined with new data.
|
|
Returns the validated line or None.
|
|
"""
|
|
if self.recent_telnet_response is not None:
|
|
# Try to combine with previous incomplete response
|
|
combined_line = f"{self.recent_telnet_response}{component}"
|
|
if self.is_a_valid_line(combined_line):
|
|
self.recent_telnet_response = None
|
|
return combined_line.rstrip("\r\n")
|
|
else:
|
|
# Combined line still doesn't make sense
|
|
stripped = combined_line.rstrip("\r\n")
|
|
logger.warn("telnet_invalid_line_combined", line=stripped)
|
|
self.recent_telnet_response = None
|
|
return None
|
|
else:
|
|
# No previous response - check if this is an incomplete line to store
|
|
if self.has_valid_start(component):
|
|
self.recent_telnet_response = component
|
|
else:
|
|
# Invalid start - warn if not empty
|
|
stripped = component.rstrip("\r\n")
|
|
if len(stripped) != 0:
|
|
logger.warn("telnet_invalid_line_start", line=stripped)
|
|
return None
|
|
|
|
def _process_last_component(self, component: str) -> str:
|
|
"""
|
|
Process the last component of telnet response.
|
|
|
|
This might be the start of the next run.
|
|
Returns the validated line or None.
|
|
"""
|
|
if self.has_valid_start(component):
|
|
# Store for next run
|
|
self.recent_telnet_response = component
|
|
# else: part of a telnet-command response, ignore
|
|
return None
|
|
|
|
def _process_middle_component(self, component: str) -> str:
|
|
"""
|
|
Process a middle component (neither first nor last).
|
|
|
|
These are usually incomplete lines or command responses.
|
|
Returns None as these are typically not valid complete lines.
|
|
"""
|
|
# Middle components are usually fragmented, ignore them
|
|
return None
|
|
|
|
def _process_line_component(
|
|
self,
|
|
component: str,
|
|
component_index: int,
|
|
total_components: int
|
|
) -> str:
|
|
"""
|
|
Process a single component of the telnet response.
|
|
|
|
Args:
|
|
component: The line component to process
|
|
component_index: 1-based index of this component
|
|
total_components: Total number of components
|
|
|
|
Returns:
|
|
Validated telnet line or None
|
|
"""
|
|
# Check if it's a complete, valid line first
|
|
if self.is_a_valid_line(component):
|
|
return component.rstrip("\r\n")
|
|
|
|
# Handle incomplete lines based on position
|
|
is_first = (component_index == 1)
|
|
is_last = (component_index == total_components)
|
|
is_single = (total_components == 1)
|
|
|
|
if is_first and is_single:
|
|
# Single incomplete component - special handling
|
|
return self._process_first_component(component)
|
|
elif is_first:
|
|
# First of multiple - might combine with previous
|
|
return self._process_first_component(component)
|
|
elif is_last:
|
|
# Last component - might be start of next
|
|
return self._process_last_component(component)
|
|
else:
|
|
# Middle component - usually fragmented
|
|
return self._process_middle_component(component)
|
|
|
|
def _process_telnet_response_lines(self) -> None:
|
|
"""
|
|
Process telnet response and extract valid lines.
|
|
|
|
Handles line fragmentation across multiple reads and stores
|
|
valid lines for further processing.
|
|
"""
|
|
telnet_response_components = self.extract_lines(self.telnet_response)
|
|
total_components = len(telnet_response_components)
|
|
|
|
for index, component in enumerate(telnet_response_components, start=1):
|
|
valid_line = self._process_line_component(
|
|
component,
|
|
index,
|
|
total_components
|
|
)
|
|
|
|
if valid_line is not None:
|
|
self.telnet_lines_to_process.append(valid_line)
|
|
self._store_valid_line(valid_line)
|
|
|
|
def _handle_connection_error(self, error: Exception) -> None:
|
|
"""Handle telnet connection errors and attempt reconnection."""
|
|
try:
|
|
self.setup_telnet()
|
|
self.dom.data.upsert({
|
|
self.get_module_identifier(): {
|
|
"server_is_online": True
|
|
}
|
|
})
|
|
except (OSError, Exception, ConnectionRefusedError) as error:
|
|
self.dom.data.upsert({
|
|
self.get_module_identifier(): {
|
|
"server_is_online": False
|
|
}
|
|
})
|
|
self.telnet_buffer = ""
|
|
self.telnet_response = ""
|
|
|
|
# Only log on first connection loss, not on every retry
|
|
if self.last_connection_loss is None:
|
|
logger.error("telnet_server_unreachable",
|
|
host=self.options.get("host"),
|
|
port=self.options.get("port"),
|
|
error=str(error),
|
|
error_type=type(error).__name__,
|
|
note="will retry every 10 seconds")
|
|
|
|
self.last_connection_loss = time()
|
|
|
|
def _update_telnet_buffer(self) -> None:
|
|
"""Update the telnet buffer with new response data."""
|
|
self.telnet_buffer += self.telnet_response.lstrip()
|
|
max_telnet_buffer = self.options.get(
|
|
"max_telnet_buffer",
|
|
self.default_options.get("max_telnet_buffer", 12288)
|
|
)
|
|
# Trim buffer to max size
|
|
self.telnet_buffer = self.telnet_buffer[-max_telnet_buffer:]
|
|
|
|
# Expose buffer to other modules via DOM
|
|
self.dom.data.upsert({
|
|
self.get_module_identifier(): {
|
|
"telnet_buffer": self.telnet_buffer
|
|
}
|
|
})
|
|
|
|
# ==================== Main Run Loop ====================
|
|
|
|
def run(self):
|
|
while not self.stopped.wait(self.next_cycle):
|
|
profile_start = time()
|
|
|
|
# Throttle connection attempts: only try if connected or timeout passed since last failure
|
|
can_attempt_connection = (
|
|
self.last_connection_loss is None or
|
|
profile_start > self.last_connection_loss + TELNET_TIMEOUT_RECONNECT
|
|
)
|
|
|
|
if can_attempt_connection:
|
|
try:
|
|
self.telnet_response = self.tn.read_very_eager().decode("utf-8")
|
|
except (AttributeError, EOFError, ConnectionAbortedError, ConnectionResetError) as error:
|
|
self._handle_connection_error(error)
|
|
except Exception as error:
|
|
logger.error("telnet_unforeseen_error",
|
|
error=str(error),
|
|
error_type=type(error).__name__,
|
|
host=self.options.get("host"),
|
|
port=self.options.get("port"))
|
|
|
|
# Process any telnet response data
|
|
if len(self.telnet_response) > 0:
|
|
self._update_telnet_buffer()
|
|
self._process_telnet_response_lines()
|
|
|
|
if self.dom.data.get(self.get_module_identifier()).get("server_is_online") is True:
|
|
self.execute_telnet_command_queue(self.max_command_queue_execution)
|
|
|
|
self.last_execution_time = time() - profile_start
|
|
self.next_cycle = self.run_observer_interval - self.last_execution_time
|
|
|
|
|
|
loaded_modules_dict[Telnet().get_module_identifier()] = Telnet()
|