implement ssh key support

This commit is contained in:
Micky 2024-12-26 13:46:44 +11:00
parent 2f155ec1e4
commit 106c3c8cd8
3 changed files with 80 additions and 64 deletions

77
app/functions/netmiko.py Normal file
View File

@ -0,0 +1,77 @@
import logging, os
from netmiko import ConnectHandler, NetmikoTimeoutException, NetmikoAuthenticationException
logger = logging.getLogger(__name__)
# Establish connection to network device
def establish_connection(device_config):
try:
return ConnectHandler(**device_config)
except Exception as e:
logger.error(f"Failed to establish connection to {device_config['host']}: {e}")
raise
# Execute command on network device
def execute_command(device, command_format, target, ip_version):
device_config = {
'device_type': device['type'],
'host': device['host'],
'port': device['port'],
'username': device['username'],
'timeout': 10,
'session_timeout': 60,
'conn_timeout': 10,
'auth_timeout': 10,
}
# Use SSH key if provided
if "ssh_key" in device:
key_path = os.path.join("/instance/ssh-keys", device['ssh_key'])
if not os.path.exists(key_path):
logger.error(f"SSH file not found: {key_path} for {device['host']}")
return {'error': True, 'message': 'Authentication failed'}
device_config['use_keys'] = True
device_config['key_file'] = os.path.join("/instance/ssh-keys", device['ssh_key'])
else:
device_config['password'] = device['password']
command_timeout = 30
try:
with establish_connection(device_config) as connection:
# Format the command
command = str(command_format.format(ip_version=ip_version, target=target).strip())
# Execute the command
output = connection.send_command(
command,
read_timeout=command_timeout,
strip_command=True,
strip_prompt=True,
max_loops=int(command_timeout * 10)
)
# Clean output
output = output.strip() if output else ""
if not output:
logger.error(f"No response from {device['host']}")
return {'error': True, 'message': 'No response from device'}
return {'error': False, 'message': output}
except NetmikoTimeoutException as e:
logger.error(f"Timeout error on {device['host']}: {e}")
return {'error': True, 'message': 'Timeout error'}
except NetmikoAuthenticationException as e:
logger.error(f"Authentication failed for {device['host']}: {e}")
return {'error': True, 'message': 'Authentication failed'}
except Exception as e:
logger.error(f"An unexpected error occurred on {device['host']}: {e}")
return {'error': True, 'message': 'Unexpected error'}

View File

@ -1,6 +1,5 @@
import os, yaml, logging, requests import os, yaml, logging, requests
from functools import wraps from functools import wraps
from netmiko import ConnectHandler, NetmikoTimeoutException, NetmikoAuthenticationException
from flask import request from flask import request
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -70,65 +69,4 @@ def send_webhook(webhook_url, text_data):
def get_client_ip(): def get_client_ip():
if not request.headers.getlist("X-Forwarded-For"): if not request.headers.getlist("X-Forwarded-For"):
return request.remote_addr return request.remote_addr
return request.headers.getlist("X-Forwarded-For")[0] return request.headers.getlist("X-Forwarded-For")[0]
# Establish connection to network device
def establish_connection(device_config):
try:
return ConnectHandler(**device_config)
except Exception as e:
logger.error(f"Failed to establish connection to {device_config['host']}: {e}")
raise
# Execute command on network device
def execute_command(device, command_format, target, ip_version):
device_config = {
'device_type': device['type'],
'host': device['host'],
'port': device['port'],
'username': device['username'],
'password': device['password'],
'timeout': 10,
'session_timeout': 60,
'conn_timeout': 10,
'auth_timeout': 10,
}
command_timeout = 30
try:
with establish_connection(device_config) as connection:
# Format the command
command = str(command_format.format(ip_version=ip_version, target=target).strip())
# Execute the command
output = connection.send_command(
command,
read_timeout=command_timeout,
strip_command=True,
strip_prompt=True,
max_loops=int(command_timeout * 10)
)
# Clean output
output = output.strip() if output else ""
if not output:
logger.error(f"No response from {device['host']}")
return {'error': True, 'message': 'No response from device'}
return {'error': False, 'message': output}
except NetmikoTimeoutException as e:
logger.error(f"Timeout error on {device['host']}: {e}")
return {'error': True, 'message': 'Timeout error'}
except NetmikoAuthenticationException as e:
logger.error(f"Authentication failed for {device['host']}: {e}")
return {'error': True, 'message': 'Authentication failed'}
except Exception as e:
logger.error(f"An unexpected error occurred on {device['host']}: {e}")
return {'error': True, 'message': 'Unexpected error'}

View File

@ -2,7 +2,8 @@ import logging
from flask import Blueprint, request, render_template from flask import Blueprint, request, render_template
from app.functions.utils import exception_handler, load_yaml, send_webhook, get_client_ip, execute_command from app.functions.utils import exception_handler, load_yaml, send_webhook, get_client_ip
from app.functions.netmiko import execute_command
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)