# Source code for bgphoria.bgpclient

# -*- coding: utf-8 -*-
#
# This file is part of BGPhoria.
#
# Copyright (C) 2025 Interstellio IO (PTY) LTD.
#
# BGPhoria is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or any later version.
#
# BGPhoria is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with BGPhoria. If not, see https://www.gnu.org/licenses/.
#
import json
import asyncio
import logging
import traceback

from bgphoria.exceptions import BGPConnectionError
from bgphoria.constants import (BGP_STATE,
                                MESSAGE_TYPE,
                                ADD_PATH)
from bgphoria.connection import TCPClient
from bgphoria.messages import (read_message,
                               parse_notification,
                               create_open_message,
                               parse_open_message,
                               create_keepalive_message,
                               parse_update_message)

log = logging.getLogger(__name__)


[docs] class BGPClient(object): # BGP FSM based on IETF RFC4271 # Under-construction / Planning Phase def __init__(self, peer_id, peer_ip, peer_as, local_as, local_bgp_id, update_callback=None, ipv4_unicast=True, ipv6_unicast=False, ipv4_unicast_add_path=None, ipv6_unicast_add_path=None): self.__conn = TCPClient() self.__state = BGP_STATE.IDLE self.__active_keepalive_loop = None self.__active_state_task = None self.__hold_time = 90 self.__peer_id = peer_id self.__peer_ip = peer_ip self.__peer_as = peer_as self.__local_as = local_as self.__local_bgp_id = local_bgp_id self.__update_callback = update_callback self.__ipv4_unicast = ipv4_unicast self.__ipv6_unicast = ipv6_unicast self.__ipv4_unicast_add_path = ipv4_unicast_add_path self.__ipv6_unicast_add_path = ipv6_unicast_add_path self.__session = {"ipv4_unicast": True, "ipv4_unicast_add_path_recv": False, "ipv6_unicast": False, "ipv6_unicast_add_path_recv": False} async def connect(self): asyncio.create_task(self.__start_event()) async def stop(self): if self.__active_state_task: self.__active_state_task.cancel() try: await self.__active_state_task except asyncio.CancelledError: pass self.__state == BGP_STATE.IDLE if self.__active_keepalive_loop: self.__active_keepalive_loop.cancel() try: await self.__active_keepalive_loop except asyncio.CancelledError: pass self._conn.close() @property def state(self): match self.__state: case BGP_STATE.IDLE: return "idle" case BGP_STATE.CONNECT: return "connect" case BGP_STATE.ACTIVE: return "active" case BGP_STATE.OPENSENT: return "opensent" case BGP_STATE.OPENCONFIRM: return "openconfirm" case BGP_STATE.ESTABLISHED: return "established" case _: return "unknown_state" def __log(self, level, message): """Logs messages with neighbor-specific context.""" full_message = ( f"BGP Neigbour: {self.__peer_ip} PEER-AS: {self.__peer_as}" f" ({self.__peer_id}): {message}" ) log.log(level, full_message) async def __start_event(self): if self.__state == BGP_STATE.IDLE: 
asyncio.create_task(self.__connect_state()) else: self.__log(logging.CRITICAL, "Start Event failed," " not in idle state") async def __fail_event(self, msg): self.__log(logging.ERROR, msg) await self.__conn.close() if self.__active_keepalive_loop: self.__active_keepalive_loop.cancel() try: await self.__active_keepalive_loop except asyncio.CancelledError: pass asyncio.create_task(self.__active_state()) async def __keepalive_loop(self, holdtime): keep_alive_interval = int(holdtime / 3) while 1: try: await asyncio.sleep(keep_alive_interval) except asyncio.CancelledError: break if (self.__conn.connected and self.__state == BGP_STATE.ESTABLISHED): # Send Keepalive message await self.__conn.send(create_keepalive_message()) self.__log(logging.INFO, "Sent KEEPALIVE") else: break async def __process_notification(self, payload): error_code, error_subcode, msg = parse_notification(payload) await self.__fail_event(f"Received Notification message:" f" Error Code {error_code}," f" Subcode {error_subcode}, Message: {msg}") async def __connect_state(self): self.__log(logging.INFO, f"Transitioning from {self.state} to Connect") self.__state = BGP_STATE.CONNECT for i in range(3): try: await self.__conn.connect(self.__peer_ip, 179, timeout=5) self.__conn.timeout(self.__hold_time) asyncio.create_task(self.__open_sent_state()) break except BGPConnectionError as e: self.__log(logging.ERROR, str(e)) if i == 2: self.__log(logging.INFO, "Transitioning from" " Connect to Active") asyncio.create_task(self.__active_state()) break async def __active_state(self): self.__log(logging.INFO, f"Transitioning from {self.state} to Active") self.__state = BGP_STATE.ACTIVE # Sleep 5 seconds. 
await asyncio.sleep(5) asyncio.create_task(self.__connect_state()) async def __open_sent_state(self): self.__log(logging.INFO, f"Transitioning from {self.state}" "to OpenSent") self.__state = BGP_STATE.OPENSENT # Send Open Message msg = create_open_message( self.__local_as, self.__hold_time, self.__local_bgp_id, ipv4_unicast=self.__ipv4_unicast, ipv6_unicast=self.__ipv6_unicast, ipv4_unicast_add_path=self.__ipv4_unicast_add_path, ipv6_unicast_add_path=self.__ipv6_unicast_add_path) try: await self.__conn.send(msg) except BGPConnectionError as e: await self.__fail_event(str(e)) return # short circuit # Receive Open Message try: length, message_type, payload = await read_message(self.__conn) except BGPConnectionError as e: await self.__fail_event(str(e)) return # short circuit if message_type == MESSAGE_TYPE.OPEN.value: self.__peer = parse_open_message(payload) # Set Socket Timeout to Hold-Time from peer in open message. self.__conn.settimeout(self.__peer['hold_time']) if (self.__ipv4_unicast and self.__peer['capabilities']['ipv4_unicast']): self.__session['ipv4_unicast'] = True if (self.__ipv4_unicast_add_path in (ADD_PATH.RECEIVE_ONLY, ADD_PATH.SEND_AND_RECEIVE) and (self.__peer['capabilities'] ['ipv4_unicast_add_path_send'])): self.__session['ipv4_unicast_add_path_recv'] = True else: self.__session['ipv4_unicast_add_path_recv'] = False else: self.__session['ipv4_unicast'] = False if (self.__ipv6_unicast and self.__peer['capabilities']['ipv6_unicast']): self.__session['ipv6_unicast'] = True if (self.__ipv6_unicast_add_path in (ADD_PATH.RECEIVE_ONLY, ADD_PATH.SEND_AND_RECEIVE) and (self.__peer['capabilities'] ['ipv6_unicast_add_path_send'])): self.__session['ipv6_unicast_add_path_recv'] = True else: self.__session['ipv6_unicast_add_path_recv'] = False else: self.__session['ipv6_unicast'] = False asyncio.create_task(self.__open_confirm_state()) elif message_type == MESSAGE_TYPE.NOTIFICATION.value: await self.__process_notification(payload) else: await 
self.__fail_event("Unexpected message type" " during session setup") async def __open_confirm_state(self): self.__log(logging.INFO, f"Transitioning from {self.state}" " to OpenConfirm") self.__state = BGP_STATE.OPENCONFIRM # Send Keepalive. try: await self.__conn.send(create_keepalive_message()) except BGPConnectionError as e: await self.__fail_event(f"Sending Keepalive {str(e)}") return # short circuit # Receive Keepalive. try: length, message_type, payload = await read_message(self.__conn) except BGPConnectionError as e: await self.__fail_event(f"Receive Keepalive {str(e)}") return # short circuit if message_type == MESSAGE_TYPE.KEEPALIVE.value: asyncio.create_task(self.__established_state()) elif message_type == MESSAGE_TYPE.NOTIFICATION.value: await self.__process_notification(payload) else: await self.__fail_event(f"Unexpected message type {message_type}" " during OpenConfirm received") async def __process_update(self, payload): try: update = parse_update_message( self.__peer_id, self.__peer_ip, self.__peer_as, payload, as4byte=self.__peer['capabilities']['asn_4byte'], ipv4_unicast=self.__session['ipv4_unicast'], ipv6_unicast=self.__session['ipv6_unicast'], ipv4_unicast_add_path=(self.__session ['ipv4_unicast_add_path_recv']), ipv6_unicast_add_path=(self.__session ['ipv6_unicast_add_path_recv'])) if self.__update_callback: self.__update_callback(update) else: self.__log( logging.INFO, "\n" + json.dumps( {"afi": update["afi"], "afi_name": update["afi_name"], "safi": update["safi"], "safi_name": update["safi_name"], "widthdrawn_routes": update["withdrawn_routes"], "as_path": update["as_path"], "next_hop": update["next_hop"], "communities": update["communities"], "ext_communities": update["ext_communities"], "large_communities": update["large_communities"], "multi_exit_disc": update["multi_exit_disc"], "local_pref": update["local_pref"], "nlri": update["nlri"]}, indent=4)) except Exception as e: trace = str(traceback.format_exc()) self.__log(logging.ERROR, 
f"Error: {e}\n{trace}") async def __established_state(self): self.__log(logging.INFO, f"Transitioning from {self.state}" " to Established") self.__state = BGP_STATE.ESTABLISHED self.__active_keepalive_loop = \ asyncio.create_task( self.__keepalive_loop(self.__peer['hold_time'])) # Receive and process messages while (self.__conn.connected and self.__state == BGP_STATE.ESTABLISHED): try: length, message_type, payload = await read_message(self.__conn) except BGPConnectionError as e: await self.__fail_event(f"{str(e)}") if message_type == MESSAGE_TYPE.UPDATE.value: # Process update... await self.__process_update(payload) continue elif message_type == MESSAGE_TYPE.NOTIFICATION.value: await self.__process_notification(payload) break elif message_type == MESSAGE_TYPE.KEEPALIVE.value: self.__log(logging.INFO, "Received keep-alive") else: self.__log(logging.ERROR, f"Received unhandled message type: {message_type}")