Skip to content

dataflow_view

This module defines the collectors for the DataFlowDiagram.

COLLECTORS: dict[modeltypes.DiagramType, cabc.Callable] = {modeltypes.DiagramType.OAIB: collector_portless, modeltypes.DiagramType.SDFB: collector_default} module-attribute 🔗

Collector registry.

collector(diagram, params, exchange_filter=only_involved) 🔗

Main collector that calls either default or portless collectors.

Source code in capellambse_context_diagrams/collectors/dataflow_view.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def collector(
    diagram: context.ContextDiagram,
    params: dict[str, t.Any],
    exchange_filter: cabc.Callable[
        [
            cabc.Iterable[fa.FunctionalExchange],
            cabc.Iterable[fa.FunctionalExchange],
            tuple[str, str],
        ],
        cabc.Iterable[fa.FunctionalExchange],
    ] = only_involved,
) -> _elkjs.ELKInputData:
    """Main collector that calls either default or portless collectors."""
    return COLLECTORS[diagram.type](diagram, params, exchange_filter)

collector_default(diagram, params, exchange_filter, attribute='involved_functions') 🔗

Collector for all other layers than operational architecture.

Source code in capellambse_context_diagrams/collectors/dataflow_view.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
def collector_default(
    diagram: context.ContextDiagram,
    params: dict[str, t.Any],
    exchange_filter: cabc.Callable[
        [
            cabc.Iterable[fa.FunctionalExchange],
            cabc.Iterable[fa.FunctionalExchange],
            tuple[str, str],
        ],
        cabc.Iterable[fa.FunctionalExchange],
    ],
    attribute: str = "involved_functions",
) -> _elkjs.ELKInputData:
    """Collector for all other layers than operational architecture."""
    data = makers.make_diagram(diagram)
    functions = getattr(diagram.target, attribute)
    filter = functools.partial(
        exchange_filter,
        functions=functions,
        attributes=("source.owner", "target.owner"),
    )
    made_edges: set[str] = set()
    for fnc in functions:
        data["children"].append(fnc_box := makers.make_box(fnc))
        _ports = default.port_collector(fnc, diagram.type)
        connections = default.port_exchange_collector(_ports, filter=filter)
        in_ports: dict[str, fa.FunctionPort] = {}
        out_ports: dict[str, fa.FunctionPort] = {}
        for edge in (edges := list(chain.from_iterable(connections.values()))):
            if edge.source.owner == fnc:
                out_ports.setdefault(edge.source.uuid, edge.source)
            else:
                in_ports.setdefault(edge.target.uuid, edge.target)

        fnc_box["ports"] = [
            makers.make_port(i.uuid) for i in (in_ports | out_ports).values()
        ]
        fnc_box["height"] += (
            makers.PORT_SIZE + 2 * makers.PORT_PADDING
        ) * max(len(in_ports), len(out_ports))

        ex_datas: list[generic.ExchangeData] = []
        for ex in edges:
            if ex.uuid in made_edges:
                continue

            ex_data = generic.ExchangeData(ex, data, diagram.filters, params)
            generic.exchange_data_collector(ex_data)
            made_edges.add(ex.uuid)
            ex_datas.append(ex_data)
    return data

collector_portless(diagram, params, exchange_filter, attribute='involved_activities') 🔗

Collector function for the operational layer.

Source code in capellambse_context_diagrams/collectors/dataflow_view.py
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def collector_portless(
    diagram: context.ContextDiagram,
    params: dict[str, t.Any],
    exchange_filter: cabc.Callable[
        [
            cabc.Iterable[fa.FunctionalExchange],
            cabc.Iterable[fa.FunctionalExchange],
            tuple[str, str],
        ],
        cabc.Iterable[fa.FunctionalExchange],
    ],
    attribute: str = "involved_activities",
) -> _elkjs.ELKInputData:
    """Collector function for the operational layer."""
    data = makers.make_diagram(diagram)
    activities = getattr(diagram.target, attribute)
    filter = functools.partial(
        exchange_filter,
        functions=activities,
        attributes=("source", "target"),
    )
    made_edges: set[str] = set()
    for act in activities:
        data["children"].append(act_box := makers.make_box(act))
        connections = list(portless.get_exchanges(act, filter=filter))

        in_act: dict[str, oa.OperationalActivity] = {}
        out_act: dict[str, oa.OperationalActivity] = {}
        for edge in connections:
            if edge.source == act:
                out_act.setdefault(edge.source.uuid, edge.source)
            else:
                in_act.setdefault(edge.target.uuid, edge.target)

        act_box["height"] += (
            makers.PORT_SIZE + 2 * makers.PORT_PADDING
        ) * max(len(in_act), len(out_act))

        ex_datas: list[generic.ExchangeData] = []
        for ex in connections:
            if ex.uuid in made_edges:
                continue

            ex_data = generic.ExchangeData(ex, data, diagram.filters, params)
            generic.exchange_data_collector(ex_data)
            made_edges.add(ex.uuid)
            ex_datas.append(ex_data)

    return data

only_involved(exchanges, functions, attributes) 🔗

Exchange filter function for collecting edges.

Source code in capellambse_context_diagrams/collectors/dataflow_view.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def only_involved(
    exchanges: cabc.Iterable[fa.FunctionalExchange],
    functions: cabc.Iterable[fa.FunctionalExchange],
    attributes: tuple[str, str],
) -> cabc.Iterable[fa.FunctionalExchange]:
    """Exchange filter function for collecting edges."""
    src_attr, trg_attr = attributes
    src_getter = operator.attrgetter(src_attr)
    trg_getter = operator.attrgetter(trg_attr)
    return [
        ex
        for ex in exchanges
        if src_getter(ex) in functions and trg_getter(ex) in functions
    ]