# coding=utf-8
# Copyright 2018 The Batfish Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import re
from abc import ABCMeta, abstractmethod
from typing import Any, Dict, Iterable, List, Optional, Sequence, Text # noqa: F401
import attr
from pybatfish.util import escape_html, escape_name
from .primitives import DataModelElement, Edge
from .route import NextHop
__all__ = [
"ArpErrorStepDetail",
"DelegatedToNextVrf",
"DeliveredStepDetail",
"Discarded",
"EnterInputIfaceStepDetail",
"ExitOutputIfaceStepDetail",
"FilterStepDetail",
"ForwardedIntoVxlanTunnel",
"ForwardedOutInterface",
"ForwardingDetail",
"Flow",
"HeaderConstraints",
"Hop",
"InboundStepDetail",
"MatchSessionStepDetail",
"MatchTcpFlags",
"OriginateStepDetail",
"RoutingStepDetail",
"SetupSessionStepDetail",
"PathConstraints",
"TcpFlags",
"Trace",
"TransformationStepDetail",
]
def _optional_int(x):
# type: (Any) -> Optional[int]
if x is None:
return None
return int(x)
[docs]@attr.s(frozen=True)
class Flow(DataModelElement):
"""A concrete IPv4 flow.
Noteworthy attributes for flow inspection/filtering:
:ivar srcIP: Source IP of the flow
:ivar dstIP: Destination IP of the flow
:ivar srcPort: Source port of the flow
:ivar dstPort: Destination port of the flow
:ivar ipProtocol: the IP protocol of the flow either as its name (e.g., TCP) for well-known protocols or a string like UNNAMED_168
:ivar ingressNode: the node where the flow started (or entered the network)
:ivar ingressInterface: the interface name where the flow started (or entered the network)
:ivar ingressVrf: the VRF name where the flow started (or entered the network)
"""
dscp = attr.ib(type=int, converter=int)
dstIp = attr.ib(type=str, converter=str)
dstPort = attr.ib(type=Optional[int], converter=_optional_int)
ecn = attr.ib(type=int, converter=int)
fragmentOffset = attr.ib(type=int, converter=int)
icmpCode = attr.ib(type=Optional[int], converter=_optional_int)
icmpVar = attr.ib(type=Optional[int], converter=_optional_int)
ingressInterface = attr.ib(type=Optional[str])
ingressNode = attr.ib(type=Optional[str])
ingressVrf = attr.ib(type=Optional[str])
ipProtocol = attr.ib(type=str)
packetLength = attr.ib(type=str)
srcIp = attr.ib(type=str, converter=str)
srcPort = attr.ib(type=Optional[int], converter=_optional_int)
tcpFlagsAck = attr.ib(type=Optional[int], converter=_optional_int)
tcpFlagsCwr = attr.ib(type=Optional[int], converter=_optional_int)
tcpFlagsEce = attr.ib(type=Optional[int], converter=_optional_int)
tcpFlagsFin = attr.ib(type=Optional[int], converter=_optional_int)
tcpFlagsPsh = attr.ib(type=Optional[int], converter=_optional_int)
tcpFlagsRst = attr.ib(type=Optional[int], converter=_optional_int)
tcpFlagsSyn = attr.ib(type=Optional[int], converter=_optional_int)
tcpFlagsUrg = attr.ib(type=Optional[int], converter=_optional_int)
IP_PROTOCOL_PATTERN = re.compile("^UNNAMED_([0-9]+)$", flags=re.IGNORECASE)
@classmethod
def from_dict(cls, json_dict: Dict[str, Any]) -> "Flow":
return Flow(
json_dict["dscp"],
json_dict["dstIp"],
json_dict.get("dstPort"),
json_dict["ecn"],
json_dict["fragmentOffset"],
json_dict.get("icmpCode"),
json_dict.get("icmpVar"),
json_dict.get("ingressInterface"),
json_dict.get("ingressNode"),
json_dict.get("ingressVrf"),
json_dict["ipProtocol"],
json_dict["packetLength"],
json_dict["srcIp"],
json_dict.get("srcPort"),
json_dict.get("tcpFlagsAck"),
json_dict.get("tcpFlagsCwr"),
json_dict.get("tcpFlagsEce"),
json_dict.get("tcpFlagsFin"),
json_dict.get("tcpFlagsPsh"),
json_dict.get("tcpFlagsRst"),
json_dict.get("tcpFlagsSyn"),
json_dict.get("tcpFlagsUrg"),
)
def __str__(self):
# type: () -> str
iface_str = self._iface_str()
vrf_str = self._vrf_str()
return (
"start={node}{iface}{vrf} [{src}->{dst}"
" {ip_proto}{dscp}{ecn}{offset}{length}]".format(
node=self.ingressNode,
iface=iface_str,
vrf=vrf_str,
src=self._ip_port(self.srcIp, self.srcPort),
dst=self._ip_port(self.dstIp, self.dstPort),
ip_proto=self.get_ip_protocol_str(),
dscp=(" dscp={}".format(self.dscp) if self.dscp != 0 else ""),
ecn=(" ecn={}".format(self.ecn) if self.ecn != 0 else ""),
offset=(
" fragmentOffset={}".format(self.fragmentOffset)
if self.fragmentOffset != 0
else ""
),
length=(
" length={}".format(self.packetLength)
if self.packetLength != 512 # Batfish default
else ""
),
)
)
def _vrf_str(self):
vrf_str = (
" vrf={}".format(self.ingressVrf)
if self.ingressVrf not in ["default", None]
else ""
)
return vrf_str
def _iface_str(self):
iface_str = (
" interface={}".format(self.ingressInterface)
if self.ingressInterface is not None
else ""
)
return iface_str
[docs] def get_flag_str(self):
# type: () -> str
"""
Returns a print friendly version of all set TCP flags.
"""
flags = []
# ordering heuristics: common flags first, common combinations (SYN-ACK, FIN-ACK) print nicely
if self.tcpFlagsSyn:
flags.append("SYN")
if self.tcpFlagsFin:
flags.append("FIN")
if self.tcpFlagsAck:
flags.append("ACK")
if self.tcpFlagsRst:
flags.append("RST")
if self.tcpFlagsCwr:
flags.append("CWR")
if self.tcpFlagsEce:
flags.append("ECE")
if self.tcpFlagsPsh:
flags.append("PSH")
if self.tcpFlagsUrg:
flags.append("URG")
return "-".join(flags) if len(flags) > 0 else "no flags set"
[docs] def get_ip_protocol_str(self):
# type: () -> str
"""Returns a print-friendly version of IP protocol and any protocol-specific information (e.g., flags for TCP, type/code for ICMP."""
match = self.IP_PROTOCOL_PATTERN.match(self.ipProtocol)
if match:
return "ipProtocol=" + match.group(1)
if self.ipProtocol.lower() == "tcp":
return "TCP ({})".format(self.get_flag_str())
if self.ipProtocol.lower() == "icmp":
return "ICMP (type={}, code={})".format(self.icmpVar, self.icmpCode)
return self.ipProtocol
def _has_ports(self):
# type: () -> bool
return (
self.ipProtocol in ["TCP", "UDP", "DCCP", "SCTP"]
and self.srcPort is not None
and self.dstPort is not None
)
def _repr_html_(self):
# type: () -> str
return "<br>".join(self._repr_html_lines())
def _repr_html_lines(self):
# type: () -> List[str]
lines = []
lines.append(
"Start Location: {node}{iface}{vrf}".format(
node=self.ingressNode, iface=self._iface_str(), vrf=self._vrf_str()
)
)
lines.append("Src IP: %s" % self.srcIp)
if self._has_ports():
assert self.srcPort is not None
lines.append("Src Port: %d" % self.srcPort)
lines.append("Dst IP: %s" % self.dstIp)
if self._has_ports():
assert self.dstPort is not None
lines.append("Dst Port: %d" % self.dstPort)
lines.append("IP Protocol: %s" % self.get_ip_protocol_str())
if self.dscp != 0:
lines.append("DSCP: %s" % self.dscp)
if self.ecn != 0:
lines.append("ECN: %s" % self.ecn)
if self.fragmentOffset != 0:
lines.append("Fragment Offset: %d" % self.fragmentOffset)
if self.packetLength != 512:
lines.append("Packet Length: %s" % self.packetLength)
return lines
def _ip_port(self, ip, port):
# type: (str, Optional[int]) -> str
if self._has_ports():
assert port is not None
return "{ip}:{port}".format(ip=ip, port=port)
else:
return ip
@attr.s(frozen=True)
class FlowDiff(DataModelElement):
"""A difference between two Flows.
:ivar fieldName: A Flow field name that has changed.
:ivar oldValue: The old value of the field.
:ivar newValue: The new value of the field.
"""
fieldName = attr.ib(type=str)
oldValue = attr.ib(type=str)
newValue = attr.ib(type=str)
@classmethod
def from_dict(cls, json_dict):
# type: (Dict) -> FlowDiff
return FlowDiff(
json_dict["fieldName"], json_dict["oldValue"], json_dict["newValue"]
)
def __str__(self):
# type: () -> str
return "{fieldName}: {oldValue} -> {newValue}".format(
fieldName=self.fieldName, oldValue=self.oldValue, newValue=self.newValue
)
@attr.s(frozen=True)
class FlowTrace(DataModelElement):
"""A trace of a flow through the network.
A flowTrace is a combination of hops and flow fate (i.e., disposition).
:ivar disposition: Flow disposition
:ivar hops: A list of hops (:py:class:`FlowTraceHop`) the flow took
:ivar notes: Additional notes that help explain the disposition, if applicable.
"""
disposition = attr.ib(type=str)
hops = attr.ib(type=Sequence)
notes = attr.ib(type=Any)
@classmethod
def from_dict(cls, json_dict):
# type: (Dict) -> FlowTrace
return FlowTrace(
json_dict["disposition"],
[FlowTraceHop.from_dict(hop) for hop in json_dict.get("hops", [])],
json_dict.get("notes"),
)
def __str__(self):
# type: () -> str
return "{hops}\n{notes}".format(
hops="\n".join(
["{} {}".format(num, hop) for num, hop in enumerate(self.hops, start=1)]
),
notes=self.notes,
)
def __len__(self):
return len(self.hops)
def __getitem__(self, item):
return self.hops[item]
def _repr_html_(self):
# type: () -> str
return "{notes}<br>{hops}".format(
notes=self.format_notes_html(),
hops="<br><br>".join(
[
"<strong>{num}</strong> {hop}".format(
num=num, hop=hop._repr_html_()
)
for num, hop in enumerate(self.hops, start=1)
]
),
)
def format_notes_html(self):
# type: () -> str
return '<span style="color:{color}; text-weight:bold;">{notes}</span>'.format(
color=_get_color_for_disposition(self.disposition),
notes=escape_html(self.notes),
)
@attr.s(frozen=True)
class FlowTraceHop(DataModelElement):
"""A single hop in a flow trace.
:ivar edge: The :py:class:`~Edge` identifying the hop/link
:ivar routes: The routes which caused this hop
:ivar transformedFlow: The transformed version of the flow (if NAT is present)
"""
edge = attr.ib(type=Edge)
routes = attr.ib(type=List[Any])
transformedFlow = attr.ib(type=Optional[Flow])
@classmethod
def from_dict(cls, json_dict):
# type: (Dict) -> FlowTraceHop
transformed_flow = json_dict.get("transformedFlow")
return FlowTraceHop(
Edge.from_dict(json_dict["edge"]),
list(json_dict.get("routes", [])),
Flow.from_dict(transformed_flow) if transformed_flow else None,
)
def __str__(self):
# type: () -> str
ret_str = "{}\n Route(s):\n {}".format(
self.edge, "\n ".join(self.routes)
)
if self.transformedFlow:
ret_str += "\n Transformed flow: {}".format(self.transformedFlow)
return ret_str
def _repr_html_(self):
# type: () -> str
indent = " " * 4
result = "{edge}<br>Route(s):<br>{routes}".format(
edge=self.edge._repr_html_(),
routes=indent
+ ("<br>" + indent).join([escape_html(r) for r in self.routes]),
)
if self.transformedFlow:
result += "<br>Transformed flow: {}".format(
self.transformedFlow._repr_html_()
)
return result
class SessionAction(DataModelElement):
"""An action that a firewall session takes for return traffic matching the session."""
@classmethod
def from_dict(cls, json_dict):
# type: (Dict) -> SessionAction
action = json_dict.get("type")
if action == "Accept":
return Accept()
if action == "PreNatFibLookup":
return PreNatFibLookup()
if action == "PostNatFibLookup" or action == "FibLookup":
# action == "FibLookup" supported for backwards compatibility
return PostNatFibLookup()
if action == "ForwardOutInterface":
return ForwardOutInterface.from_dict(json_dict)
raise ValueError("Invalid session action type: {}".format(action))
@attr.s(frozen=True)
class Accept(SessionAction):
"""A SessionAction whereby return traffic is accepted by the node from which it
originated.
"""
def __str__(self):
# type: () -> str
return "Accept"
@attr.s(frozen=True)
class PreNatFibLookup(SessionAction):
"""A SessionAction whereby return traffic is forwarded according to the result of a lookup
on the FIB of the interface on which the return traffic is received before NAT is applied.
"""
def __str__(self):
# type: () -> str
return "PreNatFibLookup"
@attr.s(frozen=True)
class PostNatFibLookup(SessionAction):
"""A SessionAction whereby return traffic is forwarded according to the result of a lookup
on the FIB of the interface on which the return traffic is received after NAT is applied.
"""
def __str__(self):
# type: () -> str
return "PostNatFibLookup"
@attr.s(frozen=True)
class ForwardOutInterface(SessionAction):
"""A SessionAction whereby a return flow is forwarded out a specified interface to a
specified next hop with neither FIB resolution nor ARP lookup.
:ivar nextHopHostname: Hostname of the next hop
:ivar nextHopInterface: Interface that the next hop receives
:ivar outgoingInterface: Interface of the outgoing traffic from this hop
"""
nextHopHostname = attr.ib(type=str)
nextHopInterface = attr.ib(type=str)
outgoingInterface = attr.ib(type=str)
@classmethod
def from_dict(cls, json_dict):
# type: (Dict) -> ForwardOutInterface
next_hop = json_dict.get("nextHop", {})
return ForwardOutInterface(
next_hop.get("hostname", ""),
next_hop.get("interface", ""),
json_dict.get("outgoingInterface", ""),
)
def __str__(self):
# type: () -> str
return "ForwardOutInterface(Next Hop: {}, Next Hop Interface: {}, Outgoing Interface: {})".format(
self.nextHopHostname, self.nextHopInterface, self.outgoingInterface
)
@attr.s(frozen=True)
class SessionMatchExpr(DataModelElement):
"""
Represents a match criteria for a firewall session.
:ivar ipProtocol: IP protocol of the flow
:ivar srcIp: Source IP of the flow
:ivar dstIp: Destination IP of the flow
:ivar srcPort: Source port of the flow
:ivar dstPort: Destination port of the flow
"""
ipProtocol = attr.ib(type=str)
srcIp = attr.ib(type=str)
dstIp = attr.ib(type=str)
srcPort = attr.ib(type=Optional[int], default=None)
dstPort = attr.ib(type=Optional[int], default=None)
@classmethod
def from_dict(cls, json_dict):
# type: (Dict) -> SessionMatchExpr
return SessionMatchExpr(
json_dict.get("ipProtocol", ""),
json_dict.get("srcIp", ""),
json_dict.get("dstIp", ""),
json_dict.get("srcPort"),
json_dict.get("dstPort"),
)
def __str__(self):
# type: () -> str
matchers = ["ipProtocol", "srcIp", "dstIp"]
if self.srcPort is not None and self.dstPort is not None:
matchers.extend(["srcPort", "dstPort"])
strings = ["{}={}".format(field, getattr(self, field)) for field in matchers]
return "[{}]".format(", ".join(strings))
class SessionScope(DataModelElement):
"""
Represents the scope of a firewall session.
"""
@classmethod
def from_dict(cls, json_dict):
# type: (Dict) -> SessionScope
if "incomingInterfaces" in json_dict:
return IncomingSessionScope.from_dict(json_dict)
elif "originatingVrf" in json_dict:
return OriginatingSessionScope.from_dict(json_dict)
raise ValueError("Invalid session scope: {}".format(json_dict))
@attr.s(frozen=True)
class IncomingSessionScope(SessionScope):
"""
Represents scope of a firewall session established by traffic leaving specific interfaces.
:ivar incomingInterfaces: Interfaces where exiting traffic can cause a session to be established
"""
incomingInterfaces = attr.ib(type=List[str])
@classmethod
def from_dict(cls, json_dict):
# type: (Dict) -> IncomingSessionScope
return IncomingSessionScope(json_dict.get("incomingInterfaces", ""))
def __str__(self):
# type: () -> str
return "Incoming Interfaces: [{}]".format(", ".join(self.incomingInterfaces))
@attr.s(frozen=True)
class OriginatingSessionScope(SessionScope):
"""
Represents scope of a firewall session established by traffic accepted into a specific VRF.
:ivar originatingVrf: VRF where accepted traffic can cause a session to be established
"""
originatingVrf = attr.ib(type=str)
@classmethod
def from_dict(cls, json_dict):
# type: (Dict) -> OriginatingSessionScope
return OriginatingSessionScope(json_dict.get("originatingVrf", ""))
def __str__(self):
# type: () -> str
return "Originating VRF: {}".format(self.originatingVrf)
[docs]@attr.s(frozen=True)
class ArpErrorStepDetail(DataModelElement):
"""Details of a step representing the arp error of a flow when sending out of a Hop.
:ivar outputInterface: Interface of the Hop from which the flow exits
:ivar resolvedNexthopIp: Resolve next hop Ip address
"""
outputInterface = attr.ib(type=Optional[str])
resolvedNexthopIp = attr.ib(type=Optional[str])
@classmethod
def from_dict(cls, json_dict):
# type: (Dict) -> ArpErrorStepDetail
return ArpErrorStepDetail(
json_dict.get("outputInterface", {}).get("interface"),
json_dict.get("resolvedNexthopIp"),
)
def __str__(self):
# type: () -> str
detail_info = []
if self.outputInterface:
detail_info.append("Output Interface: {}".format(self.outputInterface))
if self.resolvedNexthopIp:
detail_info.append(
"Resolved Next Hop IP: {}".format(self.resolvedNexthopIp)
)
return ", ".join(detail_info)
[docs]@attr.s(frozen=True)
class DeliveredStepDetail(DataModelElement):
"""Details of a step representing the flow is delivered or exiting the network.
:ivar outputInterface: Interface of the Hop from which the flow exits
:ivar resolvedNexthopIp: Resolve next hop Ip address
"""
outputInterface = attr.ib(type=Optional[str])
resolvedNexthopIp = attr.ib(type=Optional[str])
@classmethod
def from_dict(cls, json_dict):
# type: (Dict) -> DeliveredStepDetail
return DeliveredStepDetail(
json_dict.get("outputInterface", {}).get("interface"),
json_dict.get("resolvedNexthopIp"),
)
def __str__(self):
# type: () -> str
detail_info = []
if self.outputInterface:
detail_info.append("Output Interface: {}".format(self.outputInterface))
if self.resolvedNexthopIp:
detail_info.append(
"Resolved Next Hop IP: {}".format(self.resolvedNexthopIp)
)
return ", ".join(detail_info)
[docs]@attr.s(frozen=True)
class ExitOutputIfaceStepDetail(DataModelElement):
"""Details of a step representing the exiting of a flow out of a Hop.
:ivar outputInterface: Interface of the Hop from which the flow exits
:ivar transformedFlow: Transformed Flow if a source NAT was applied on the Flow
"""
outputInterface = attr.ib(type=str)
transformedFlow = attr.ib(type=Optional[str])
@classmethod
def from_dict(cls, json_dict):
# type: (Dict) -> ExitOutputIfaceStepDetail
return ExitOutputIfaceStepDetail(
json_dict.get("outputInterface", {}).get("interface"),
json_dict.get("transformedFlow"),
)
def __str__(self):
# type: () -> str
return str(self.outputInterface)
[docs]@attr.s(frozen=True)
class InboundStepDetail(DataModelElement):
"""Details of a step representing the receiving (acceptance) of a flow into a Hop.
:ivar interface: interface that owns the destination IP
"""
interface = attr.ib(type=str)
@classmethod
def from_dict(cls, json_dict):
# type: (Dict) -> InboundStepDetail
return InboundStepDetail(json_dict.get("interface", ""))
def __str__(self):
return str(self.interface)
@attr.s(frozen=True)
class LoopStepDetail(DataModelElement):
"""Details of a step representing a forwarding loop being detected."""
@classmethod
def from_dict(cls, json_dict):
return LoopStepDetail()
def __str__(self):
return ""
[docs]@attr.s(frozen=True)
class MatchSessionStepDetail(DataModelElement):
"""Details of a step for when a flow matches a firewall session.
:ivar sessionScope: Scope of flows session can match (incoming interfaces or originating VRF)
:ivar sessionAction: A SessionAction that the firewall takes for a matching session
:ivar matchCriteria: A SessionMatchExpr that describes the match criteria of the session
:ivar transformation: List of FlowDiffs that will be applied after session match
"""
sessionScope = attr.ib(type=SessionScope)
sessionAction = attr.ib(type=SessionAction)
matchCriteria = attr.ib(type=SessionMatchExpr)
transformation = attr.ib(type=Optional[List[FlowDiff]], factory=list)
@classmethod
def from_dict(cls, json_dict):
# type: (Dict) -> MatchSessionStepDetail
# backward compatibility: if sessionScope is missing, look for
# incomingInterfaces instead
if "sessionScope" in json_dict:
sessionScope = SessionScope.from_dict(json_dict.get("sessionScope", {}))
else:
sessionScope = IncomingSessionScope.from_dict(json_dict)
return MatchSessionStepDetail(
sessionScope,
SessionAction.from_dict(json_dict.get("sessionAction", {})),
SessionMatchExpr.from_dict(json_dict.get("matchCriteria", {})),
[FlowDiff.from_dict(diff) for diff in json_dict.get("transformation", [])],
)
def __str__(self):
# type: () -> str
strings = [
"{}".format(self.sessionScope),
"Action: {}".format(self.sessionAction),
"Match Criteria: {}".format(self.matchCriteria),
]
if self.transformation:
strings.append(
"Transformation: [{}]".format(", ".join(map(str, self.transformation)))
)
return ", ".join(strings)
class ForwardingDetail(DataModelElement, metaclass=ABCMeta):
def _repr_html_(self) -> str:
return escape_html(str(self))
@abstractmethod
def __str__(self) -> str:
raise NotImplementedError("ForwardingDetail elements must implement __str__")
@classmethod
def from_dict(cls, json_dict: Dict) -> "ForwardingDetail":
if "type" not in json_dict:
raise ValueError(
"Unknown type of ForwardingDetail, missing the type property in: {}".format(
json.dumps(json_dict)
)
)
fd_type = json_dict["type"]
if fd_type == "DelegatedToNextVrf":
return DelegatedToNextVrf.from_dict(json_dict)
elif fd_type == "ForwardedIntoVxlanTunnel":
return ForwardedIntoVxlanTunnel.from_dict(json_dict)
elif fd_type == "ForwardedOutInterface":
return ForwardedOutInterface.from_dict(json_dict)
elif fd_type == "Discarded":
return Discarded.from_dict(json_dict)
else:
raise ValueError(
"Unhandled ForwardingDetail type: {} in: {}".format(
json.dumps(fd_type), json.dumps(json_dict)
)
)
[docs]@attr.s(frozen=True)
class DelegatedToNextVrf(ForwardingDetail):
"""A flow being delegated to a different VRF for further processing."""
nextVrf = attr.ib(type=str)
type = attr.ib(type=str, default="DelegatedToNextVrf")
@type.validator
def check(self, _attribute, value):
if value != "DelegatedToNextVrf":
raise ValueError('type must be "DelegatedToNextVrf"')
def __str__(self) -> str:
return "Delegated to next VRF: {}".format(escape_name(self.nextVrf))
@classmethod
def from_dict(cls, json_dict: Dict[str, Any]) -> "DelegatedToNextVrf":
assert set(json_dict.keys()) == {"type", "nextVrf"}
assert json_dict["type"] == "DelegatedToNextVrf"
next_vrf = json_dict["nextVrf"]
assert isinstance(next_vrf, str)
return DelegatedToNextVrf(next_vrf)
[docs]@attr.s(frozen=True)
class ForwardedIntoVxlanTunnel(ForwardingDetail):
"""A flow being forwarded into a VXLAN tunnel."""
vni = attr.ib(type=int)
vtep = attr.ib(type=str)
type = attr.ib(type=str, default="ForwardedIntoVxlanTunnel")
@type.validator
def check(self, _attribute, value):
if value != "ForwardedIntoVxlanTunnel":
raise ValueError('type must be "ForwardedIntoVxlanTunnel"')
def __str__(self) -> str:
return "Forwarded into VXLAN tunnel with VNI: {vni} and VTEP: {vtep}".format(
vni=self.vni, vtep=self.vtep
)
@classmethod
def from_dict(cls, json_dict: Dict[str, Any]) -> "ForwardedIntoVxlanTunnel":
assert set(json_dict.keys()) == {"type", "vni", "vtep"}
assert json_dict["type"] == "ForwardedIntoVxlanTunnel"
vni = json_dict["vni"]
vtep = json_dict["vtep"]
assert isinstance(vni, int)
assert isinstance(vtep, str)
return ForwardedIntoVxlanTunnel(vni, vtep)
[docs]@attr.s(frozen=True)
class ForwardedOutInterface(ForwardingDetail):
"""A flow being forwarded out an interface.
If there is no resolved next-hop IP and this is the final step on this node, the destination IP of the flow will be used as the next gateway IP."""
outputInterface = attr.ib(type=str)
resolvedNextHopIp = attr.ib(type=Optional[str], default=None)
type = attr.ib(type=str, default="ForwardedOutInterface")
@type.validator
def check(self, _attribute, value):
if value != "ForwardedOutInterface":
raise ValueError('type must be "ForwardedOutInterface"')
def __str__(self) -> str:
return (
"Forwarded out interface: {iface} with resolved next-hop IP: {nhip}".format(
iface=escape_name(self.outputInterface), nhip=self.resolvedNextHopIp
)
if self.resolvedNextHopIp
else "Forwarded out interface: {iface}".format(
iface=escape_name(self.outputInterface)
)
)
@classmethod
def from_dict(cls, json_dict: Dict[str, Any]) -> "ForwardedOutInterface":
assert set(json_dict.keys()) == {
"type",
"outputInterface",
"resolvedNextHopIp",
} or set(json_dict.keys()) == {"type", "outputInterface"}
assert json_dict["type"] == "ForwardedOutInterface"
output_interface = json_dict["outputInterface"]
resolved_next_hop_ip = None
assert isinstance(output_interface, str)
if "resolvedNextHopIp" in json_dict:
resolved_next_hop_ip = json_dict["resolvedNextHopIp"]
assert resolved_next_hop_ip is None or isinstance(resolved_next_hop_ip, str)
return ForwardedOutInterface(output_interface, resolved_next_hop_ip)
[docs]@attr.s(frozen=True)
class Discarded(ForwardingDetail):
"""A flow being discarded."""
type = attr.ib(type=str, default="Discarded")
@type.validator
def check(self, _attribute, value):
if value != "Discarded":
raise ValueError('type must be "Discarded"')
def __str__(self) -> str:
return "Discarded"
@classmethod
def from_dict(cls, json_dict: Dict[str, Any]) -> "Discarded":
assert json_dict == {"type": "Discarded"}
return Discarded()
[docs]@attr.s(frozen=True)
class OriginateStepDetail(DataModelElement):
"""Details of a step representing the originating of a flow in a Hop.
:ivar originatingVrf: VRF from which the Flow originates
"""
originatingVrf = attr.ib(type=str)
@classmethod
def from_dict(cls, json_dict):
# type: (Dict) -> OriginateStepDetail
return OriginateStepDetail(json_dict.get("originatingVrf", ""))
def __str__(self):
# type: () -> str
return str(self.originatingVrf)
@attr.s(frozen=True)
class RouteInfo(DataModelElement):
"""Contains information about the routes which led to the selection of the forwarding action for the ExitOutputIfaceStep"""
protocol = attr.ib(type=str)
network = attr.ib(type=str)
# TODO: make nextHop mandatory after sufficient period
nextHop = attr.ib(type=Optional[NextHop])
# nextHopIp populated only in absence of nextHop
# TODO: remove nextHopIp after sufficient period
nextHopIp = attr.ib(type=Optional[str])
admin = attr.ib(type=int)
metric = attr.ib(type=int)
def _old_str(self) -> str:
return "{protocol} (Network: {network}, Next Hop IP:{next_hop_ip})".format(
protocol=self.protocol,
network=self.network,
next_hop_ip=self.nextHopIp,
)
def __str__(self) -> str:
if not self.nextHop:
return self._old_str()
return "{protocol} (Network: {network}, Next Hop: {next_hop})".format(
protocol=self.protocol,
network=self.network,
next_hop=str(self.nextHop),
)
@classmethod
def from_dict(cls, json_dict: Dict[str, Any]) -> "RouteInfo":
assert set(json_dict.keys()) - {"nextHop", "nextHopIp", "nextVrf"} == {
"protocol",
"network",
"admin",
"metric",
}
protocol = json_dict.get("protocol")
assert isinstance(protocol, str)
network = json_dict.get("network")
assert isinstance(network, str)
next_hop = None
next_hop_ip = None
if "nextHop" in json_dict:
next_hop_dict = json_dict.get("nextHop")
assert isinstance(next_hop_dict, Dict)
next_hop = NextHop.from_dict(next_hop_dict)
else:
# legacy
assert "nextHopIp" in json_dict
next_hop_ip = json_dict.get("nextHopIp")
assert isinstance(next_hop_ip, str)
admin = json_dict.get("admin")
assert isinstance(admin, int)
metric = json_dict.get("metric")
assert isinstance(metric, int)
return RouteInfo(protocol, network, next_hop, next_hop_ip, admin, metric)
[docs]@attr.s(frozen=True)
class RoutingStepDetail(DataModelElement):
"""Details of a step representing the routing from input interface to output interface.
:ivar routes: List of routes which were considered to select the forwarding action
"""
routes = attr.ib(type=List[RouteInfo])
# TODO: make forwardingDetail mandatory after sufficient period
forwardingDetail = attr.ib(type=Optional[ForwardingDetail])
# TODO: remove arpIp after sufficient period
arpIp = attr.ib(type=Optional[str])
# TODO: remove outputInteface after sufficient period
outputInterface = attr.ib(type=Optional[str])
@classmethod
def from_dict(cls, json_dict: Dict) -> "RoutingStepDetail":
routes = []
routes_json_list = json_dict.get("routes", [])
assert isinstance(routes_json_list, List)
for route_json in routes_json_list:
assert isinstance(route_json, Dict)
routes.append(RouteInfo.from_dict(route_json))
forwarding_detail = None
if "forwardingDetail" in json_dict:
forwarding_detail_json = json_dict.get("forwardingDetail")
assert isinstance(forwarding_detail_json, Dict)
forwarding_detail = ForwardingDetail.from_dict(forwarding_detail_json)
arp_ip = json_dict.get("arpIp")
if arp_ip is not None:
assert isinstance(arp_ip, str)
output_interface = json_dict.get("outputInterface")
if output_interface is not None:
assert isinstance(output_interface, str)
return RoutingStepDetail(
routes,
forwarding_detail,
arp_ip,
output_interface,
)
def _old_str(self) -> str:
output = []
if self.arpIp is not None:
output.append("ARP IP: " + self.arpIp)
if self.outputInterface is not None:
output.append("Output Interface: " + self.outputInterface)
if self.routes:
output.append(
"Routes: " + "[" + ",".join([str(route) for route in self.routes]) + "]"
)
return ", ".join(output)
def __str__(self) -> str:
if not self.forwardingDetail:
return self._old_str()
output = [str(self.forwardingDetail)]
if self.routes:
output.append(
"Routes: " + "[" + ",".join([str(route) for route in self.routes]) + "]"
)
return ", ".join(output)
[docs]@attr.s(frozen=True)
class SetupSessionStepDetail(DataModelElement):
"""Details of a step for when a firewall session is created.
:ivar sessionScope: Scope of flows session can match (incoming interfaces or originating VRF)
:ivar sessionAction: A SessionAction that the firewall takes for a return traffic matching the session
:ivar matchCriteria: A SessionMatchExpr that describes the match criteria of the session
:ivar transformation: List of FlowDiffs that will be applied on the return traffic matching the session
"""
sessionScope = attr.ib(type=SessionScope)
sessionAction = attr.ib(type=SessionAction)
matchCriteria = attr.ib(type=SessionMatchExpr)
transformation = attr.ib(type=Optional[List[FlowDiff]], factory=list)
@classmethod
def from_dict(cls, json_dict):
# type: (Dict) -> SetupSessionStepDetail
# backward compatibility: if sessionScope is missing, look for
# incomingInterfaces instead
if "sessionScope" in json_dict:
sessionScope = SessionScope.from_dict(json_dict.get("sessionScope", {}))
else:
sessionScope = IncomingSessionScope.from_dict(json_dict)
return SetupSessionStepDetail(
sessionScope,
SessionAction.from_dict(json_dict.get("sessionAction", {})),
SessionMatchExpr.from_dict(json_dict.get("matchCriteria", {})),
[FlowDiff.from_dict(diff) for diff in json_dict.get("transformation", [])],
)
def __str__(self):
# type: () -> str
strings = [
"{}".format(self.sessionScope),
"Action: {}".format(self.sessionAction),
"Match Criteria: {}".format(self.matchCriteria),
]
if self.transformation:
strings.append(
"Transformation: [{}]".format(", ".join(map(str, self.transformation)))
)
return ", ".join(strings)
[docs]@attr.s(frozen=True)
class FilterStepDetail(DataModelElement):
"""Details of a step representing a filter step.
:ivar filter: filter name
:ivar type: filter type
:ivar inputInterface: input interface of the flow
:ivar flow: current flow
"""
filter = attr.ib(type=str)
filterType = attr.ib(type=str)
inputInterface = attr.ib(type=str)
flow = attr.ib(type=Optional[Flow])
@classmethod
def from_dict(cls, json_dict):
# type: (Dict) -> FilterStepDetail
flowObj = json_dict.get("flow", {})
return FilterStepDetail(
json_dict.get("filter", ""),
json_dict.get("type", ""),
json_dict.get("inputInterface", ""),
Flow.from_dict(flowObj) if flowObj else None,
)
def __str__(self):
# type: () -> str
return "{} ({})".format(self.filter, self.filterType)
@attr.s(frozen=True)
class PolicyStepDetail(DataModelElement):
"""Details of a step representing a generic policy processing step
(e.g., PBR or equivalent).
:ivar policy: policy name
:ivar type: filter type
"""
policy = attr.ib(type=str)
@classmethod
def from_dict(cls, json_dict: Dict[str, Any]) -> "PolicyStepDetail":
return PolicyStepDetail(json_dict.get("policy", ""))
def __str__(self) -> str:
return "{}".format(self.policy)
@attr.s(frozen=True)
class Step(DataModelElement):
"""Represents a step in a hop.
:ivar detail: Details about the step
:ivar action: Action taken in this step
"""
detail = attr.ib(type=Any)
action = attr.ib(type=str, converter=str)
@classmethod
def from_dict(cls, json_dict):
# type: (Dict) -> Optional[Step]
from_dicts = {
"ArpError": ArpErrorStepDetail.from_dict,
"Delivered": DeliveredStepDetail.from_dict,
"EnterInputInterface": EnterInputIfaceStepDetail.from_dict,
"ExitOutputInterface": ExitOutputIfaceStepDetail.from_dict,
"Inbound": InboundStepDetail.from_dict,
"Loop": LoopStepDetail.from_dict,
"MatchSession": MatchSessionStepDetail.from_dict,
"Originate": OriginateStepDetail.from_dict,
"Routing": RoutingStepDetail.from_dict,
"SetupSession": SetupSessionStepDetail.from_dict,
"Transformation": TransformationStepDetail.from_dict,
"Policy": PolicyStepDetail.from_dict,
"Filter": FilterStepDetail.from_dict,
}
action = json_dict.get("action")
detail = json_dict.get("detail", {})
type = json_dict.get("type")
if type not in from_dicts:
return None
else:
return Step(from_dicts[type](detail), action)
def __str__(self):
# type: () -> str
action_str = str(self.action)
detail_str = str(self.detail) if self.detail else None
if detail_str:
return "{}({})".format(action_str, detail_str)
else:
return action_str
def _repr_html_(self):
# type: () -> str
return str(self)
[docs]@attr.s(frozen=True)
class Hop(DataModelElement):
"""A single hop in a flow trace.
:ivar node: Name of node considered as the Hop
:ivar steps: List of steps taken at this Hop
"""
node = attr.ib(type=str)
steps = attr.ib(type=List[Step])
@classmethod
def from_dict(cls, json_dict):
# type: (Dict) -> Hop
steps = [] # type: List[Step]
for step in json_dict["steps"]:
step_obj = Step.from_dict(step)
if step_obj is not None:
steps.append(step_obj)
return Hop(json_dict.get("node", {}).get("name"), steps)
def __len__(self):
return len(self.steps)
def __getitem__(self, item):
return self.steps[item]
def __str__(self):
# type: () -> str
return "node: {node}\n {steps}".format(
node=self.node, steps="\n ".join(map(str, self.steps))
)
def _repr_html_(self):
# type: () -> str
return "node: {node}<br> {steps}".format(
node=self.node,
steps="<br> ".join([step._repr_html_() for step in self.steps]),
)
@staticmethod
def _get_routes_data(routes):
# type: (List[Dict]) -> List[str]
routes_str = [] # type: List[str]
for route in routes:
routes_str.append(
"{protocol} [Network: {network}, Next Hop IP:{next_hop_ip}]".format(
protocol=route.get("protocol"),
network=route.get("network"),
next_hop_ip=route.get("nextHopIp"),
)
)
return routes_str
[docs]@attr.s(frozen=True)
class Trace(DataModelElement):
"""A trace of a flow through the network.
A Trace is a combination of hops and flow fate (i.e., disposition).
:ivar disposition: Flow disposition
:ivar hops: A list of hops (:py:class:`Hop`) the flow took
"""
disposition = attr.ib(type=str)
hops = attr.ib(type=List[Hop])
@classmethod
def from_dict(cls, json_dict):
# type: (Dict) -> Trace
return Trace(
json_dict["disposition"],
[Hop.from_dict(hop) for hop in json_dict.get("hops", [])],
)
def __len__(self):
return len(self.hops)
def __getitem__(self, item):
return self.hops[item]
def __str__(self):
# type: () -> str
return "{disposition}\n{hops}".format(
disposition=self.disposition,
hops="\n".join(
[
"{num}. {hop}".format(num=num, hop=hop)
for num, hop in enumerate(self.hops, start=1)
]
),
)
def _repr_html_(self):
# type: () -> str
disposition_span = '<span style="color:{color}; text-weight:bold;">{disposition}</span>'.format(
color=_get_color_for_disposition(self.disposition),
disposition=self.disposition,
)
return "{disposition_span}<br>{hops}".format(
disposition_span=disposition_span,
hops="<br>".join(
[
"<strong>{num}</strong>. {hop}".format(
num=num, hop=hop._repr_html_()
)
for num, hop in enumerate(self.hops, start=1)
]
),
)
[docs]@attr.s(frozen=True)
class TcpFlags(DataModelElement):
"""
Represents a set of TCP flags in a packet.
:ivar ack:
:ivar cwr:
:ivar ece:
:ivar fin:
:ivar psh:
:ivar rst:
:ivar syn:
:ivar urg:
"""
ack = attr.ib(default=False, type=bool)
cwr = attr.ib(default=False, type=bool)
ece = attr.ib(default=False, type=bool)
fin = attr.ib(default=False, type=bool)
psh = attr.ib(default=False, type=bool)
rst = attr.ib(default=False, type=bool)
syn = attr.ib(default=False, type=bool)
urg = attr.ib(default=False, type=bool)
@classmethod
def from_dict(cls, json_dict):
return TcpFlags(
ack=json_dict["ack"],
cwr=json_dict["cwr"],
ece=json_dict["ece"],
fin=json_dict["fin"],
psh=json_dict["psh"],
rst=json_dict["rst"],
syn=json_dict["syn"],
urg=json_dict["urg"],
)
[docs]@attr.s(frozen=True)
class MatchTcpFlags(DataModelElement):
"""
Match given :py:class:`TcpFlags`.
For each bit in the TCP flags, a `useX`
must be set to true, otherwise the bit is treated as "don't care".
:ivar tcpFlags: tcp flags to match
:ivar useAck:
:ivar useCwr:
:ivar useEce:
:ivar useFin:
:ivar usePsh:
:ivar useRst:
:ivar useSyn:
:ivar useUrg:
"""
tcpFlags = attr.ib(type=TcpFlags)
useAck = attr.ib(default=True, type=bool)
useCwr = attr.ib(default=True, type=bool)
useEce = attr.ib(default=True, type=bool)
useFin = attr.ib(default=True, type=bool)
usePsh = attr.ib(default=True, type=bool)
useRst = attr.ib(default=True, type=bool)
useSyn = attr.ib(default=True, type=bool)
useUrg = attr.ib(default=True, type=bool)
@classmethod
def from_dict(cls, json_dict):
return MatchTcpFlags(
TcpFlags.from_dict(json_dict["tcpFlags"]),
json_dict["useAck"],
json_dict["useCwr"],
json_dict["useEce"],
json_dict["useFin"],
json_dict["usePsh"],
json_dict["useRst"],
json_dict["useSyn"],
json_dict["useUrg"],
)
[docs] @staticmethod
def match_ack():
# type: () -> MatchTcpFlags
"""Return match conditions checking that ACK bit is set.
Other bits may take any value.
"""
return MatchTcpFlags(TcpFlags(ack=True), useAck=True)
[docs] @staticmethod
def match_rst():
# type: () -> MatchTcpFlags
"""Return match conditions checking that RST bit is set.
Other bits may take any value.
"""
return MatchTcpFlags(TcpFlags(rst=True), useRst=True)
[docs] @staticmethod
def match_syn():
# type: () -> MatchTcpFlags
"""Return match conditions checking that the SYN bit is set.
Other bits may take any value.
"""
return MatchTcpFlags(TcpFlags(syn=True), useSyn=True)
[docs] @staticmethod
def match_synack():
# type: () -> MatchTcpFlags
"""Return match conditions checking that both the SYN and ACK bits are set.
Other bits may take any value.
"""
return MatchTcpFlags(TcpFlags(ack=True, syn=True), useAck=True, useSyn=True)
[docs] @staticmethod
def match_established():
# type: () -> List[MatchTcpFlags]
"""Return a list of match conditions matching an established flow (ACK or RST bit set).
Other bits may take any value.
"""
return [MatchTcpFlags.match_ack(), MatchTcpFlags.match_rst()]
[docs] @staticmethod
def match_not_established():
# type: () -> List[MatchTcpFlags]
"""Return a list of match conditions matching a non-established flow.
Meaning both ACK and RST bits are unset.
Other bits may take any value.
"""
return [
MatchTcpFlags(
useAck=True, useRst=True, tcpFlags=TcpFlags(ack=False, rst=False)
)
]
def _get_color_for_disposition(disposition):
# type: (str) -> str
success_dispositions = {"ACCEPTED", "DELIVERED_TO_SUBNET", "EXITS_NETWORK"}
if disposition in success_dispositions:
return "#019612"
else:
return "#7c020e"
def _normalize_phc_intspace(value):
# type: (Any) -> Optional[Text]
if value is None or isinstance(value, str):
return value
if isinstance(value, int):
return str(value)
if isinstance(value, Iterable):
result = ",".join(str(v) for v in value)
return result
raise ValueError("Invalid value {}".format(value))
def _normalize_phc_list(value):
# type: (Any) -> Optional[List[Text]]
if value is None or isinstance(value, list):
return value
elif isinstance(value, str):
# only collect truthy values
alist = [v for v in [v.strip() for v in value.split(",")] if v]
if not alist:
# reject empty list values
raise ValueError("Invalid value {}".format(value))
return alist
raise ValueError("Invalid value {}".format(value))
def _normalize_phc_tcpflags(value):
# type: (Any) -> Optional[List[MatchTcpFlags]]
if value is None or isinstance(value, list):
return value
elif isinstance(value, MatchTcpFlags):
return [value]
raise ValueError("Invalid value {}".format(value))
def _normalize_phc_strings(value):
# type: (Any) -> Optional[Text]
if value is None or isinstance(value, str):
return value
if isinstance(value, Iterable):
result = ",".join(value) # type: Text
return result
raise ValueError("Invalid value {}".format(value))
[docs]@attr.s(frozen=True)
class PathConstraints(DataModelElement):
"""
Constraints on the path of a flow.
:ivar startLocation: Location specification for where a flow is allowed to start
:ivar endLocation: Node specification for where a flow is allowed to terminate
:ivar transitLocations: Node specification for where a flow must transit
:ivar forbiddenLocations: Node specification for where a flow is *not* allowed to transit
"""
startLocation = attr.ib(default=None, type=Optional[str])
endLocation = attr.ib(default=None, type=Optional[str])
transitLocations = attr.ib(default=None, type=Optional[str])
forbiddenLocations = attr.ib(default=None, type=Optional[str])
@classmethod
def from_dict(cls, json_dict):
return PathConstraints(
startLocation=json_dict.get("startLocation"),
endLocation=json_dict.get("endLocation"),
transitLocations=json_dict.get("transitLocations"),
forbiddenLocations=json_dict.get("forbiddenLocations"),
)