Source code for pybatfish.datamodel.route

# coding=utf-8
#   Copyright 2018 The Batfish Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
import json
from abc import ABCMeta, abstractmethod
from typing import Any, Dict, Iterable, List, Optional, Text  # noqa: F401

import attr

from pybatfish.datamodel.primitives import DataModelElement
from pybatfish.util import escape_html, escape_name

__all__ = [
    "BgpRoute",
    "BgpRouteConstraints",
    "BgpRouteDiff",
    "BgpRouteDiffs",
    "NextHop",
    "NextHopBgpPeerAddress",
    "NextHopConcrete",
    "NextHopDiscard",
    "NextHopInterface",
    "NextHopIp",
    "NextHopResult",
    "NextHopSelf",
    "NextHopVrf",
    "NextHopVtep",
]

# convert a string IP into a next-hop object
def _nextHop_br_converter(value):
    # type: (Any) -> Any
    if isinstance(value, str):
        return NextHopConcrete(NextHopIp(value))
    elif isinstance(value, NextHop):
        return NextHopConcrete(value)
    return value


def _nextHop_display_value(value):
    if value["type"] == "concrete":
        return value["nextHop"]
    return value


[docs]@attr.s(frozen=True) class BgpRoute(DataModelElement): """A BGP routing advertisement. :ivar network: The network prefix advertised by the route. :ivar asPath: The AS path of the route. :ivar communities: The communities of the route. :ivar localPreference: The local preference of the route. :ivar metric: The metric of the route. :ivar nextHop: The next hop of the route. :ivar protocol: The protocol of the route. :ivar originatorIp: The IP address of the originator of the route. :ivar originType: The origin type of the route. :ivar sourceProtocol: The source protocol of the route. :ivar tag: The tag of the route. :ivar weight: The weight of the route. """ # originMechanism is not included network = attr.ib(type=str) originatorIp = attr.ib(type=str) originType = attr.ib(type=str) protocol = attr.ib(type=str) asPath = attr.ib(type=list, default=[]) communities = attr.ib(type=list, default=[]) localPreference = attr.ib(type=int, default=0) metric = attr.ib(type=int, default=0) nextHop = attr.ib(type=str, default=None, converter=_nextHop_br_converter) sourceProtocol = attr.ib(type=str, default=None) tag = attr.ib(type=int, default=0) weight = attr.ib(type=int, default=0) @classmethod def from_dict(cls, json_dict): # type: (Dict) -> BgpRoute return BgpRoute( json_dict["network"], json_dict["originatorIp"], json_dict["originType"], json_dict["protocol"], json_dict.get("asPath", []), json_dict.get("communities", []), json_dict.get("localPreference", 0), json_dict.get("metric", 0), json_dict.get("nextHop", None), json_dict.get("srcProtocol", None), json_dict.get("tag", 0), json_dict.get("weight", 0), ) def dict(self): # type: () -> Dict return { # used to be needed for batfish jackson deserialization "class": "org.batfish.datamodel.questions.BgpRoute", "network": self.network, "asPath": self.asPath, "communities": self.communities, "localPreference": self.localPreference, "metric": self.metric, "nextHop": self.nextHop, "originatorIp": self.originatorIp, "originType": self.originType, "protocol": self.protocol, "srcProtocol": self.sourceProtocol, "tag": self.tag, "weight": self.weight, } def _repr_html_(self): # type: () -> str return "<br>".join(self._repr_html_lines()) def _repr_html_lines(self): # type: () -> List[str] lines = [] lines.append("Network: {node}".format(node=self.network)) lines.append("AS Path: {asPath}".format(asPath=self.asPath)) # using a join on strings removes quotes around individual communities lines.append("Communities: [%s]" % ", ".join(map(str, self.communities))) lines.append("Local Preference: %s" % self.localPreference) lines.append("Metric: %s" % self.metric) lines.append("Next Hop: %s" % _nextHop_display_value(self.nextHop)) lines.append("Originator IP: %s" % self.originatorIp) lines.append("Origin Type: %s" % self.originType) lines.append("Protocol: %s" % self.protocol) lines.append("Source Protocol: %s" % self.sourceProtocol) lines.append("Tag: %s" % self.tag) lines.append("Weight: %s" % self.weight) return lines
# convert a list of strings into a single comma-separated string def _longspace_brc_converter(value): # type: (Any) -> Optional[Text] if value is None or isinstance(value, str): return value if isinstance(value, Iterable): result = ",".join(value) # type: Text return result raise ValueError("Invalid value {}".format(value)) # convert a string into a singleton list def _string_list_brc_converter(value): # type: (Any) -> Optional[List[Text]] if value is None or isinstance(value, list): return value elif isinstance(value, str): return [value] raise ValueError("Invalid value {}".format(value))
[docs]@attr.s(frozen=True) class BgpRouteConstraints(DataModelElement): """Constraints on a BGP route announcement. Specify constraints on route announcements by specifying allowed values in each field of the announcement. :ivar prefix: Allowed prefixes as a list of prefix ranges (e.g., "0.0.0.0/0:0-32") :ivar complementPrefix: A flag indicating that all prefixes except the ones in prefix are allowed :ivar localPreference: List of allowed local preference integer ranges, as a string :ivar med: List of allowed MED integer ranges, as a string :ivar communities: List of allowed and disallowed community regexes :ivar asPath: List of allowed and disallowed AS-path regexes """ prefix = attr.ib( default=None, type=Optional[List[str]], converter=_string_list_brc_converter ) complementPrefix = attr.ib(default=None, type=Optional[bool]) localPreference = attr.ib( default=None, type=Optional[str], converter=_longspace_brc_converter ) med = attr.ib(default=None, type=Optional[str], converter=_longspace_brc_converter) communities = attr.ib( default=None, type=Optional[List[str]], converter=_string_list_brc_converter ) asPath = attr.ib( default=None, type=Optional[List[str]], converter=_string_list_brc_converter ) @classmethod def from_dict(cls, json_dict): return BgpRouteConstraints( prefix=json_dict.get("prefix"), complementPrefix=json_dict.get("complementPrefix"), localPreference=json_dict.get("localPreference"), med=json_dict.get("med"), communities=json_dict.get("communities"), asPath=json_dict.get("asPath"), )
[docs]@attr.s(frozen=True) class BgpRouteDiff(DataModelElement): """A difference between two BGP routes. :ivar fieldName: A Flow field name that has changed. :ivar oldValue: The old value of the field. :ivar newValue: The new value of the field. """ fieldName = attr.ib(type=str) oldValue = attr.ib(type=str) newValue = attr.ib(type=str) @classmethod def from_dict(cls, json_dict): # type: (Dict) -> BgpRouteDiff return BgpRouteDiff( json_dict["fieldName"], json_dict["oldValue"], json_dict["newValue"] ) def _repr_html_(self): # type: () -> str # special pretty printing for certain field names prettyNames = { "asPath": "AS Path", "localPreference": "Local Preference", "metric": "Metric", "nextHop": "Next Hop", "originatorIp": "Originator IP", "originType": "Origin Type", "sourceProtocol": "Source Protocol", "tag": "Tag", "weight": "Weight", } if self.fieldName in prettyNames: prettyFieldName = prettyNames[self.fieldName] else: # by default, just capitalize the field name prettyFieldName = self.fieldName.capitalize() return "{fieldName}: {oldValue} --> {newValue}".format( fieldName=prettyFieldName, oldValue=self.oldValue, newValue=self.newValue )
[docs]@attr.s(frozen=True) class BgpRouteDiffs(DataModelElement): """A set of differences between two BGP routes. :ivar diffs: The set of BgpRouteDiff objects. """ diffs = attr.ib(type=List[BgpRouteDiff]) @classmethod def from_dict(cls, json_dict): # type: (Dict) -> BgpRouteDiffs return BgpRouteDiffs( [ BgpRouteDiff.from_dict(route_dict) for route_dict in json_dict.get("diffs", []) ] ) def _repr_html_(self): # type: () -> str return "<br>".join(diff._repr_html_() for diff in self.diffs)
[docs]@attr.s(frozen=True) class NextHopResult(DataModelElement): """The representation of a route's next-hop for output from questions""" def _repr_html_(self) -> str: return escape_html(str(self)) @abstractmethod def __str__(self) -> str: raise NotImplementedError("NextHopResult elements must implement __str__") @classmethod def from_dict(cls, json_dict: Dict[str, Any]) -> "NextHopResult": if "type" not in json_dict: raise ValueError( "Unknown type of NextHopResult, missing the type property in: {}".format( json.dumps(json_dict) ) ) nh_type = json_dict["type"] if nh_type == "concrete": return NextHopConcrete.from_dict(json_dict) elif nh_type == "peer": return NextHopBgpPeerAddress.from_dict(json_dict) elif nh_type == "self": return NextHopSelf.from_dict(json_dict) else: raise ValueError( "Unhandled NextHopResult type: {} in: {}".format( json.dumps(nh_type), json.dumps(json_dict) ) )
[docs]@attr.s(frozen=True) class NextHopBgpPeerAddress(NextHopResult): """Indicates the next-hop should be set to the address of the BGP peer""" type = attr.ib(type=str, default="peer") @type.validator def check(self, _attribute, value): if value != "peer": raise ValueError('type must be "peer"') def dict(self) -> Dict[str, Any]: return {"type": "peer"} def __str__(self) -> str: return "peer" @classmethod def from_dict(cls, json_dict: Dict[str, Any]) -> "NextHopBgpPeerAddress": assert json_dict == {"type": "peer"} return NextHopBgpPeerAddress()
[docs]@attr.s(frozen=True) class NextHopSelf(NextHopResult): """Indicates the next-hop should be set to the local IP address""" type = attr.ib(type=str, default="self") @type.validator def check(self, _attribute, value): if value != "self": raise ValueError('type must be "self"') def dict(self) -> Dict[str, Any]: return {"type": "self"} def __str__(self) -> str: return "self" @classmethod def from_dict(cls, json_dict: Dict[str, Any]) -> "NextHopSelf": assert json_dict == {"type": "self"} return NextHopSelf()
[docs]@attr.s(frozen=True) class NextHopConcrete(NextHopResult): """A wrapper for the various concrete kinds of next-hops that a route may have""" nextHop = attr.ib(type="NextHop") type = attr.ib(type=str, default="concrete") @type.validator def check(self, _attribute, value): if value != "concrete": raise ValueError('type must be "concrete"') def __str__(self) -> str: return "nextHop {}".format(self.nextHop) @classmethod def from_dict(cls, json_dict: Dict[str, Any]) -> "NextHopConcrete": assert set(json_dict.keys()) == {"type", "nextHop"} assert json_dict["type"] == "concrete" nextHop = json_dict["nextHop"] return NextHopConcrete(nextHop)
[docs]class NextHop(DataModelElement, metaclass=ABCMeta): """A next-hop of a route""" def _repr_html_(self) -> str: return escape_html(str(self)) @abstractmethod def __str__(self) -> str: raise NotImplementedError("NextHop elements must implement __str__") @classmethod def from_dict(cls, json_dict: Dict[str, Any]) -> "NextHop": if "type" not in json_dict: raise ValueError( "Unknown type of NextHop, missing the type property in: {}".format( json.dumps(json_dict) ) ) nh_type = json_dict["type"] if nh_type == "discard": return NextHopDiscard.from_dict(json_dict) elif nh_type == "interface": return NextHopInterface.from_dict(json_dict) elif nh_type == "ip": return NextHopIp.from_dict(json_dict) elif nh_type == "vrf": return NextHopVrf.from_dict(json_dict) elif nh_type == "vtep": return NextHopVtep.from_dict(json_dict) else: raise ValueError( "Unhandled NextHop type: {} in: {}".format( json.dumps(nh_type), json.dumps(json_dict) ) )
[docs]@attr.s(frozen=True) class NextHopDiscard(NextHop): """Indicates the packet should be dropped""" type = attr.ib(type=str, default="discard") @type.validator def check(self, _attribute, value): if value != "discard": raise ValueError('type must be "discard"') def dict(self) -> Dict[str, Any]: return {"type": "discard"} def __str__(self) -> str: return "discard" @classmethod def from_dict(cls, json_dict: Dict[str, Any]) -> "NextHopDiscard": assert json_dict == {"type": "discard"} return NextHopDiscard()
[docs]@attr.s(frozen=True) class NextHopInterface(NextHop): """A next-hop of a route with a fixed output interface and optional next gateway IP. If there is no IP, the destination IP of the packet will be used as the next gateway IP.""" interface = attr.ib(type=str) ip = attr.ib(type=Optional[str], default=None) type = attr.ib(type=str, default="interface") @type.validator def check(self, _attribute, value): if value != "interface": raise ValueError('type must be "interface"') def __str__(self) -> str: return ( "interface {} ip {}".format(escape_name(self.interface), self.ip) if self.ip else "interface {}".format(escape_name(self.interface)) ) @classmethod def from_dict(cls, json_dict: Dict[str, Any]) -> "NextHopInterface": assert set(json_dict.keys()) == {"type", "interface", "ip"} or set( json_dict.keys() ) == {"type", "interface"} assert json_dict["type"] == "interface" interface = json_dict["interface"] ip = None assert isinstance(interface, str) if "ip" in json_dict: ip = json_dict["ip"] assert ip is None or isinstance(ip, str) return NextHopInterface(interface, ip)
[docs]@attr.s(frozen=True) class NextHopIp(NextHop): """A next-hop of a route including the next gateway IP""" ip = attr.ib(type=str) type = attr.ib(type=str, default="ip") @type.validator def check(self, _attribute, value): if value != "ip": raise ValueError('type must be "ip"') def __str__(self) -> str: return "ip {}".format(self.ip) @classmethod def from_dict(cls, json_dict: Dict[str, Any]) -> "NextHopIp": assert set(json_dict.keys()) == {"type", "ip"} assert json_dict["type"] == "ip" ip = json_dict["ip"] assert isinstance(ip, str) return NextHopIp(ip)
[docs]@attr.s(frozen=True) class NextHopVrf(NextHop): """A next-hop of a route indicating the destination IP should be resolved in another VRF""" vrf = attr.ib(type=str) type = attr.ib(type=str, default="vrf") @type.validator def check(self, _attribute, value): if value != "vrf": raise ValueError('type must be "vrf"') def __str__(self) -> str: return "vrf {}".format(escape_name(self.vrf)) @classmethod def from_dict(cls, json_dict: Dict[str, Any]) -> "NextHopVrf": assert set(json_dict.keys()) == {"type", "vrf"} assert json_dict["type"] == "vrf" vrf = json_dict["vrf"] assert isinstance(vrf, str) return NextHopVrf(vrf)
[docs]@attr.s(frozen=True) class NextHopVtep(NextHop): """A next-hop of a route indicating the packet should be routed through a VXLAN tunnel""" vni = attr.ib(type=int) vtep = attr.ib(type=str) type = attr.ib(type=str, default="vtep") @type.validator def check(self, _attribute, value): if value != "vtep": raise ValueError('type must be "vtep"') def __str__(self) -> str: return "vni {} vtep {}".format(self.vni, self.vtep) @classmethod def from_dict(cls, json_dict: Dict[str, Any]) -> "NextHopVtep": assert set(json_dict.keys()) == {"type", "vni", "vtep"} assert json_dict["type"] == "vtep" vni = json_dict["vni"] vtep = json_dict["vtep"] assert isinstance(vni, int) assert isinstance(vtep, str) return NextHopVtep(vni, vtep)