Skip to content

API Reference¤

This document is for developers of recommonmark; it contains the API functions.

---
CLASS NAME
---
`Class Name`
`Function Name`
args:
return:

hal¤


A hardware abstraction layer (HAL) is a layer of programming that allows a computer operating system to interact with a hardware device at a general or abstract level rather than at a detailed hardware level.

In general, the HAL is the layer of programming that is close to the physical hardware, but allows a device driver to be written for a specific hardware device. The HAL provides a consistent interface for hardware components, and provides a layer of protection between the operating system and the hardware.

PLC¤

PLC ¤

Hardware abstraction class for reading and writing variables from and to PLC.

Source code in src/dcs_dev/hal/plc.py
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
@define
class PLC:
    """Hardware abstraction class for reading and writing variables from and to PLC.

    The ADS connection object is created in ``__attrs_post_init__`` from
    ``netid``; it is only opened by ``connect``. ``lock_ads`` guards the
    connection handshake, ``lock_dict`` guards lookups in the variable lists.
    Entries of the variable lists are expected to expose ``active``,
    ``var_name``, ``var_name_IN`` and ``id`` (the attributes read below).
    """

    netid: str = field(validator=validators.instance_of(str))
    ip: str

    plc_vars_input: List = field(factory=list)
    plc_vars_output: List = field(factory=list)

    connection: Any = field(default=None)
    lock_dict: Lock = field(factory=Lock)
    lock_ads: Lock = field(factory=Lock)

    def __attrs_post_init__(self):
        """Create (but do not open) the ADS connection for ``netid``."""
        self.connection = pyads.Connection(self.netid, pyads.PORT_TC3PLC1)

    def close(self):
        """Close the connection to the PLC."""
        if self.connection.is_open:
            self.connection.close()
        print("PLC connection closed")

    def connect(self) -> bool:
        """Connect to the PLC.

        Opens the connection if necessary and probes it with
        ``read_device_info``. Returns ``True`` when the probe succeeds and
        ``False`` when it raises ``pyads.ADSError``.
        """
        with self.lock_ads:
            if not self.connection.is_open:
                self.connection.open()
            try:
                self.connection.read_device_info()
            except pyads.ADSError as e:
                print(f"Error: {e}")
                return False
            else:
                print(f"Connection: {self.connection.is_open}")
                return True

    def set_plc_vars_input_list(self, plc_vars_input: List):
        """Add the given variables to ``plc_vars_input``.

        An empty list is replaced by a fresh copy of the argument; a
        non-empty list is extended in place.
        """
        if not self.plc_vars_input:
            self.plc_vars_input = list(plc_vars_input)
        else:
            self.plc_vars_input.extend(list(plc_vars_input))

    def set_plc_vars_output_list(self, plc_vars_output: List):
        """Add the given variables to ``plc_vars_output``.

        An empty list is replaced by a fresh copy of the argument; a
        non-empty list is extended in place.
        """
        if not self.plc_vars_output:
            self.plc_vars_output = list(plc_vars_output)
        else:
            self.plc_vars_output.extend(list(plc_vars_output))

    def read_variables(self):
        """Reads all structs from PLC and stores inside class.

        Raises ``AdsConnectionError`` when ``connect`` fails; the actual bulk
        read is not implemented yet and raises ``NotImplementedError``.
        """
        if not self.connect():
            raise AdsConnectionError(
                "Could not read variable from PLC, PLC connection failed."
            )
        with self.lock_ads:
            raise NotImplementedError

    def write_variables(self):
        """Writes all variables that have been set.

        Raises ``AdsConnectionError`` when ``connect`` fails; the actual bulk
        write is not implemented yet and raises ``NotImplementedError``.
        """
        if not self.connect():
            # The message used to say "read", which was misleading on the write path.
            raise AdsConnectionError(
                "Could not write variable to PLC, PLC connection failed."
            )
        with self.lock_ads:
            raise NotImplementedError

    def check_variables_active(self):
        """Not implemented yet."""
        raise NotImplementedError

    def get_variable(self, variable_name: str) -> Any:
        """Get a variable from the PLC.

        Searches outputs first, then inputs, for an entry that is not marked
        ``"false"`` and whose ``var_name_IN`` equals ``variable_name``, and
        returns the value read through the connection. Returns ``None`` when
        no entry matches. A ``KeyError`` from the read is re-raised as
        ``VariableNotFoundInRepositoryError``.
        """
        with self.lock_dict:
            for data in chain(self.plc_vars_output, self.plc_vars_input):
                if data.active != "false" and variable_name == str(data.var_name_IN):
                    try:
                        value = self.connection.read_by_name(data.var_name_IN)
                        print(f"Variable {variable_name}:{value} read from plc.")
                        return value
                    except KeyError as exc:
                        error_msg = f"Error{variable_name}, Error number: {data.id}"
                        raise VariableNotFoundInRepositoryError(error_msg) from exc
        return None

    def set_variable(self, variable_name: str, value: Any) -> Any:
        """Set a variable on the PLC.

        Looks in ``plc_vars_input`` for an entry that is not marked
        ``"false"`` and whose ``var_name`` equals ``variable_name``, writes
        ``value`` to that entry's ``var_name_IN`` and returns whatever the
        connection's ``write_by_name`` returns. Returns ``None`` when no entry
        matches. A ``KeyError`` from the write is re-raised as
        ``VariableNotFoundInRepositoryError``.
        """
        with self.lock_dict:
            for data in self.plc_vars_input:
                if data.active != "false" and variable_name == str(data.var_name):
                    try:
                        # Keep the write result separate so the log below shows
                        # the value that was actually written.
                        result = self.connection.write_by_name(data.var_name_IN, value)
                        print(f"Variable {variable_name}:{value} write to plc.")
                        return result
                    except KeyError as exc:
                        error_msg = f"Error{variable_name}, Error number: {data.id}"
                        raise VariableNotFoundInRepositoryError(error_msg) from exc
        return None
close() ¤

Close the connection to the PLC.

Source code in src/dcs_dev/hal/plc.py
24
25
26
27
28
def close(self):
    """Shut the PLC connection if it is currently open, then report it."""
    link = self.connection
    if link.is_open:
        link.close()
    print("PLC connection closed")
connect() ¤

Connect to the PLC.

Source code in src/dcs_dev/hal/plc.py
30
31
32
33
34
35
36
37
38
39
40
41
42
def connect(self) -> bool:
    """Open the PLC connection when needed and probe it.

    Returns ``True`` when ``read_device_info`` succeeds and ``False`` when it
    raises ``pyads.ADSError``. The whole handshake runs under ``lock_ads``.
    """
    with self.lock_ads:
        link = self.connection
        if not link.is_open:
            link.open()
        try:
            link.read_device_info()
        except pyads.ADSError as exc:
            print(f"Error: {exc}")
            return False
        print(f"Connection: {link.is_open}")
        return True
get_variable(variable_name) ¤

Get a variable from the PLC.

Source code in src/dcs_dev/hal/plc.py
79
80
81
82
83
84
85
86
87
88
89
90
def get_variable(self, variable_name: str) -> Any:
    """Read one variable from the PLC by its ``var_name_IN``.

    Outputs are searched before inputs. Entries marked ``"false"`` are
    skipped. Returns ``None`` when nothing matches; a ``KeyError`` from the
    read is turned into ``VariableNotFoundInRepositoryError``.
    """
    with self.lock_dict:
        for entry in chain(self.plc_vars_output, self.plc_vars_input):
            # Guard clause: skip inactive entries and non-matching names.
            if entry.active == "false" or variable_name != str(entry.var_name_IN):
                continue
            try:
                result = self.connection.read_by_name(entry.var_name_IN)
                print(f"Variable {variable_name}:{result} read from plc.")
                return result
            except KeyError:
                error_msg = f"Error{variable_name}, Error number: {entry.id}"
                raise VariableNotFoundInRepositoryError(error_msg)
read_variables() ¤

Reads all structs from PLC and stores inside class.

Source code in src/dcs_dev/hal/plc.py
58
59
60
61
62
63
64
65
def read_variables(self):
    """Read all structs from the PLC (bulk read not implemented yet).

    Raises ``AdsConnectionError`` when ``connect`` reports failure, otherwise
    raises ``NotImplementedError`` while holding ``lock_ads``.
    """
    connected = self.connect()
    if not connected:
        failure = "Could not read variable from PLC, PLC connection failed."
        raise AdsConnectionError(failure)
    with self.lock_ads:
        raise NotImplementedError
set_plc_vars_input_list(plc_vars_input) ¤

Add the given machine variables to the PLC input variable list.

Source code in src/dcs_dev/hal/plc.py
44
45
46
47
48
49
def set_plc_vars_input_list(self, plc_vars_input: List):
    """Add the given variables to ``plc_vars_input``.

    An empty list is replaced by a fresh copy of the argument; a non-empty
    list is extended in place.
    """
    incoming = list(plc_vars_input)
    if self.plc_vars_input:
        self.plc_vars_input.extend(incoming)
    else:
        self.plc_vars_input = incoming
set_plc_vars_output_list(plc_vars_output) ¤

Add the given machine variables to the PLC output variable list.

Source code in src/dcs_dev/hal/plc.py
51
52
53
54
55
56
def set_plc_vars_output_list(self, plc_vars_output: List):
    """Add the given variables to ``plc_vars_output``.

    An empty list is replaced by a fresh copy of the argument; a non-empty
    list is extended in place.
    """
    incoming = list(plc_vars_output)
    if self.plc_vars_output:
        self.plc_vars_output.extend(incoming)
    else:
        self.plc_vars_output = incoming
set_variable(variable_name, value) ¤

Set a variable on the PLC.

Source code in src/dcs_dev/hal/plc.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
def set_variable(self, variable_name: str, value: Any) -> Any:
    """Set a variable on the PLC.

    Looks in ``plc_vars_input`` for an entry that is not marked ``"false"``
    and whose ``var_name`` equals ``variable_name``, writes ``value`` to that
    entry's ``var_name_IN`` and returns whatever the connection's
    ``write_by_name`` returns. Returns ``None`` when no entry matches. A
    ``KeyError`` from the write is re-raised as
    ``VariableNotFoundInRepositoryError``.
    """
    with self.lock_dict:
        for data in self.plc_vars_input:
            if data.active != "false" and variable_name == str(data.var_name):
                try:
                    # Keep the write result separate so the log below shows
                    # the value that was actually written.
                    result = self.connection.write_by_name(data.var_name_IN, value)
                    print(f"Variable {variable_name}:{value} write to plc.")
                    return result
                except KeyError as exc:
                    error_msg = f"Error{variable_name}, Error number: {data.id}"
                    raise VariableNotFoundInRepositoryError(error_msg) from exc
    return None
write_variables() ¤

Writes all variables that have been set.

Source code in src/dcs_dev/hal/plc.py
67
68
69
70
71
72
73
74
def write_variables(self):
    """Writes all variables that have been set.

    Raises ``AdsConnectionError`` when ``connect`` fails; the actual bulk
    write is not implemented yet and raises ``NotImplementedError`` while
    holding ``lock_ads``.
    """
    if not self.connect():
        # The message used to say "read", which was misleading on the write path.
        raise AdsConnectionError(
            "Could not write variable to PLC, PLC connection failed."
        )
    with self.lock_ads:
        raise NotImplementedError

Device¤

ConcretePump dataclass ¤

Bases: Machine

ConcretePump is a class that represents the Concrete Pump machine.

Source code in src/dcs_dev/hal/device.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
@dataclass
class ConcretePump(Machine):
    """
    ConcretePump is a class that represents the Concrete Pump machine.

    ``machine_input`` and ``machine_output`` hold parameter objects; the
    methods below read their ``id``, ``var_name``, ``var_name_IN``, ``active``
    and ``data_type`` attributes.
    """
    machine_id: int
    machine_input: List
    machine_output: List
    machine_error_num: int = 0

    def device_id(self) -> int:
        """Return ``machine_id``."""
        return self.machine_id

    def parameter_id(self, param_name: str) -> int:
        """Return the ``id`` of the first parameter named ``param_name``.

        Inputs are searched before outputs. A match whose ``id`` is ``None``
        yields 0; when nothing matches the result is ``None``.
        """
        for params in (self.machine_input, self.machine_output):
            for param in params:
                if param_name == param.var_name:
                    return param.id if param.id is not None else 0
        return None

    def input_list(self) -> List[object]:
        """Return the list of input parameters (the stored list itself)."""
        return self.machine_input

    def output_list(self) -> List[object]:
        """Return the list of output parameters (the stored list itself)."""
        return self.machine_output

    def get_input_var_name(self) -> Any:
        """Yield the ``var_name`` of every input parameter."""
        for param in self.machine_input:
            yield param.var_name

    def get_output_var_name(self) -> Any:
        """Yield the ``var_name`` of every output parameter."""
        for param in self.machine_output:
            yield param.var_name

    def set_input_dict(self):
        """Yield one ``{var_name: [var_name_IN, "pyads.<data_type>", 1]}`` dict per active input."""
        # NOTE(review): reads ``data_type`` from each parameter - confirm the
        # parameter objects actually provide that attribute.
        for param in self.machine_input:
            if param.active:
                yield {param.var_name: [param.var_name_IN, "pyads." + param.data_type, 1]}

    def set_output_dict(self):
        """Yield one ``{var_name: [var_name_IN, "pyads.<data_type>", 1]}`` dict per active output."""
        # NOTE(review): reads ``data_type`` from each parameter - confirm the
        # parameter objects actually provide that attribute.
        for param in self.machine_output:
            if param.active:
                yield {param.var_name: [param.var_name_IN, "pyads." + param.data_type, 1]}

    def __str__(self) -> str:
        """Describe the machine with its id and its input/output lists."""
        # Call the accessors; interpolating the bound methods printed their repr.
        return f"Machine ID: {self.machine_id}, Machine Input: {self.input_list()}, Machine Output: {self.output_list()}"

Controller dataclass ¤

Bases: Machine

Controller is a class that represents the parameters in controller.

Source code in src/dcs_dev/hal/device.py
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
@dataclass
class Controller(Machine):
    """
    Controller is a class that represents the parameters in controller.

    ``machine_input`` and ``machine_output`` hold parameter objects; the
    methods below read their ``id``, ``var_name``, ``var_name_IN``, ``active``
    and ``data_type`` attributes.
    """
    machine_id: int
    machine_input: List
    machine_output: List
    machine_error_num: int = 0

    def device_id(self) -> int:
        """Return ``machine_id``."""
        return self.machine_id

    def parameter_id(self, param_name: str) -> int:
        """Return the ``id`` of the first parameter named ``param_name``.

        Inputs are searched before outputs. A match whose ``id`` is ``None``
        yields 0; when nothing matches the result is ``None``.
        """
        for params in (self.machine_input, self.machine_output):
            for param in params:
                if param_name == param.var_name:
                    return param.id if param.id is not None else 0
        return None

    def input_list(self) -> List[object]:
        """Return the list of input parameters (the stored list itself)."""
        return self.machine_input

    def output_list(self) -> List[object]:
        """Return the list of output parameters (the stored list itself)."""
        return self.machine_output

    def get_input_var_name(self) -> Any:
        """Yield the ``var_name`` of every input parameter."""
        for param in self.machine_input:
            yield param.var_name

    def get_output_var_name(self) -> Any:
        """Yield the ``var_name`` of every output parameter."""
        for param in self.machine_output:
            yield param.var_name

    def set_input_dict(self):
        """Yield one ``{var_name: [var_name_IN, "pyads.<data_type>", 1]}`` dict per active input."""
        # NOTE(review): reads ``data_type`` from each parameter - confirm the
        # parameter objects actually provide that attribute.
        for param in self.machine_input:
            if param.active:
                yield {param.var_name: [param.var_name_IN, "pyads." + param.data_type, 1]}

    def set_output_dict(self):
        """Yield one ``{var_name: [var_name_IN, "pyads.<data_type>", 1]}`` dict per active output."""
        # NOTE(review): reads ``data_type`` from each parameter - confirm the
        # parameter objects actually provide that attribute.
        for param in self.machine_output:
            if param.active:
                yield {param.var_name: [param.var_name_IN, "pyads." + param.data_type, 1]}

    def __str__(self) -> str:
        """Describe the machine with its id and its input/output lists."""
        # Call the accessors; interpolating the bound methods printed their repr.
        return f"Machine ID: {self.machine_id}, Machine Input: {self.input_list()}, Machine Output: {self.output_list()}"

DosingPumpHigh dataclass ¤

Bases: Machine

DosingPumpHigh is a class that represents the Dosing pump machine with high dosing.

Source code in src/dcs_dev/hal/device.py
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
@dataclass
class DosingPumpHigh(Machine):
    """
    DosingPumpHigh is a class that represents the Dosing pump machine with high dosing.

    ``machine_input`` and ``machine_output`` hold parameter objects; the
    methods below read their ``id``, ``var_name``, ``var_name_IN``, ``active``
    and ``data_type`` attributes.
    """
    machine_id: int
    machine_input: List
    machine_output: List
    machine_error_num: int = 0

    def device_id(self) -> int:
        """Return ``machine_id``."""
        return self.machine_id

    def parameter_id(self, param_name: str) -> int:
        """Return the ``id`` of the first parameter named ``param_name``.

        Inputs are searched before outputs. A match whose ``id`` is ``None``
        yields 0; when nothing matches the result is ``None``.
        """
        for params in (self.machine_input, self.machine_output):
            for param in params:
                if param_name == param.var_name:
                    return param.id if param.id is not None else 0
        return None

    def input_list(self) -> List[object]:
        """Return the list of input parameters (the stored list itself)."""
        return self.machine_input

    def output_list(self) -> List[object]:
        """Return the list of output parameters (the stored list itself)."""
        return self.machine_output

    def get_input_var_name(self) -> Any:
        """Yield the ``var_name`` of every input parameter."""
        for param in self.machine_input:
            yield param.var_name

    def get_output_var_name(self) -> Any:
        """Yield the ``var_name`` of every output parameter."""
        for param in self.machine_output:
            yield param.var_name

    def set_input_dict(self):
        """Yield one ``{var_name: [var_name_IN, "pyads.<data_type>", 1]}`` dict per active input."""
        # NOTE(review): reads ``data_type`` from each parameter - confirm the
        # parameter objects actually provide that attribute.
        for param in self.machine_input:
            if param.active:
                yield {param.var_name: [param.var_name_IN, "pyads." + param.data_type, 1]}

    def set_output_dict(self):
        """Yield one ``{var_name: [var_name_IN, "pyads.<data_type>", 1]}`` dict per active output."""
        # NOTE(review): reads ``data_type`` from each parameter - confirm the
        # parameter objects actually provide that attribute.
        for param in self.machine_output:
            if param.active:
                yield {param.var_name: [param.var_name_IN, "pyads." + param.data_type, 1]}

    def __str__(self) -> str:
        """Describe the machine with its id and its input/output lists."""
        # Call the accessors; interpolating the bound methods printed their repr.
        return f"Machine ID: {self.machine_id}, Machine Input: {self.input_list()}, Machine Output: {self.output_list()}"

DosingPumpLow dataclass ¤

Bases: Machine

DosingPumpLow is a class that represents the Dosing pump machine with low dosing.

Source code in src/dcs_dev/hal/device.py
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
@dataclass
class DosingPumpLow(Machine):
    """
    DosingPumpLow is a class that represents the Dosing pump machine with low dosing.

    ``machine_input`` and ``machine_output`` hold parameter objects; the
    methods below read their ``id``, ``var_name``, ``var_name_IN``, ``active``
    and ``data_type`` attributes.
    """
    machine_id: int
    machine_input: List
    machine_output: List
    machine_error_num: int = 0

    def device_id(self) -> int:
        """Return ``machine_id``."""
        return self.machine_id

    def parameter_id(self, param_name: str) -> int:
        """Return the ``id`` of the first parameter named ``param_name``.

        Inputs are searched before outputs. A match whose ``id`` is ``None``
        yields 0; when nothing matches the result is ``None``.
        """
        for params in (self.machine_input, self.machine_output):
            for param in params:
                if param_name == param.var_name:
                    return param.id if param.id is not None else 0
        return None

    def input_list(self) -> List[object]:
        """Return the list of input parameters (the stored list itself)."""
        return self.machine_input

    def output_list(self) -> List[object]:
        """Return the list of output parameters (the stored list itself)."""
        return self.machine_output

    def get_input_var_name(self) -> Any:
        """Yield the ``var_name`` of every input parameter."""
        for param in self.machine_input:
            yield param.var_name

    def get_output_var_name(self) -> Any:
        """Yield the ``var_name`` of every output parameter."""
        for param in self.machine_output:
            yield param.var_name

    def set_input_dict(self):
        """Yield one ``{var_name: [var_name_IN, "pyads.<data_type>", 1]}`` dict per active input."""
        # NOTE(review): reads ``data_type`` from each parameter - confirm the
        # parameter objects actually provide that attribute.
        for param in self.machine_input:
            if param.active:
                yield {param.var_name: [param.var_name_IN, "pyads." + param.data_type, 1]}

    def set_output_dict(self):
        """Yield one ``{var_name: [var_name_IN, "pyads.<data_type>", 1]}`` dict per active output."""
        # NOTE(review): reads ``data_type`` from each parameter - confirm the
        # parameter objects actually provide that attribute.
        for param in self.machine_output:
            if param.active:
                yield {param.var_name: [param.var_name_IN, "pyads." + param.data_type, 1]}

    def __str__(self) -> str:
        """Describe the machine with its id and its input/output lists."""
        # Call the accessors; interpolating the bound methods printed their repr.
        return f"Machine ID: {self.machine_id}, Machine Input: {self.input_list()}, Machine Output: {self.output_list()}"

InlineMixer dataclass ¤

Bases: Machine

InlineMixer is a class that represents the Inline Mixer machine.

Source code in src/dcs_dev/hal/device.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
@dataclass
class InlineMixer(Machine):
    """
    InlineMixer is a class that represents the Inline Mixer machine.

    ``machine_input`` and ``machine_output`` hold parameter objects; the
    methods below read their ``id``, ``var_name``, ``var_name_IN``, ``active``
    and ``data_type`` attributes.
    """
    machine_id: int
    machine_input: List
    machine_output: List
    machine_error_num: int = 0

    def device_id(self) -> int:
        """Return ``machine_id``."""
        return self.machine_id

    def parameter_id(self, param_name: str) -> int:
        """Return the ``id`` of the first parameter named ``param_name``.

        Inputs are searched before outputs. A match whose ``id`` is ``None``
        yields 0; when nothing matches the result is ``None``.
        """
        for params in (self.machine_input, self.machine_output):
            for param in params:
                if param_name == param.var_name:
                    return param.id if param.id is not None else 0
        return None

    def input_list(self) -> List[object]:
        """Return the list of input parameters (the stored list itself)."""
        return self.machine_input

    def output_list(self) -> List[object]:
        """Return the list of output parameters (the stored list itself)."""
        return self.machine_output

    def get_input_var_name(self) -> Any:
        """Yield the ``var_name`` of every input parameter."""
        for param in self.machine_input:
            yield param.var_name

    def get_output_var_name(self) -> Any:
        """Yield the ``var_name`` of every output parameter."""
        for param in self.machine_output:
            yield param.var_name

    def set_input_dict(self):
        """Yield one ``{var_name: [var_name_IN, "pyads.<data_type>", 1]}`` dict per active input."""
        # NOTE(review): reads ``data_type`` from each parameter - confirm the
        # parameter objects actually provide that attribute.
        for param in self.machine_input:
            if param.active:
                yield {param.var_name: [param.var_name_IN, "pyads." + param.data_type, 1]}

    def set_output_dict(self):
        """Yield one ``{var_name: [var_name_IN, "pyads.<data_type>", 1]}`` dict per active output."""
        # NOTE(review): reads ``data_type`` from each parameter - confirm the
        # parameter objects actually provide that attribute.
        for param in self.machine_output:
            if param.active:
                yield {param.var_name: [param.var_name_IN, "pyads." + param.data_type, 1]}

    def __str__(self) -> str:
        """Describe the machine with its id and its input/output lists."""
        # Call the accessors; interpolating the bound methods printed their repr.
        return f"Machine ID: {self.machine_id}, Machine Input: {self.input_list()}, Machine Output: {self.output_list()}"

Interface¤

DeviceStruct dataclass ¤

Define the parameter info from JSON.

Source code in src/dcs_dev/hal/interface.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
@dataclass(slots=True)
class DeviceStruct:
    """define the paramters info from json."""

    id: str = ""
    var_name: str = ""
    var_name_IN: str = ""
    type: str = ""
    active: bool = False

    def _to_dict(self) -> dict:
        return dict(asdict(self).items())

abb_rob¤


AbbConfig¤

A class that loads the configuration parameters from a JSON file.

Source code in src/dcs_dev/abb_rob/abb_config.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class AbbConfig():
    """Load the timeout settings from ``../config/setup.json``.

    The path is resolved relative to the directory of this module. Both
    values are converted with ``int`` and stored as ``TIMEOUT`` and
    ``TIMEOUT_LONG``.
    """

    def __init__(self):
        config = self._load_from_json()
        # [s] avoid freezing the main thread forever in case controller
        # (virtual or real) is not available.
        self.TIMEOUT = int(config["TIMEOUT"])
        # [s] avoid freezing the main thread forever, for time consuming
        # processes (slow motions)
        self.TIMEOUT_LONG = int(config["TIMEOUT_LONG"])

    def _load_from_json(self):
        """Load the configuration parameters from a json file.

        Returns the decoded JSON content. If the file is not valid JSON the
        failure is reported on stdout and the ``ValueError`` is re-raised;
        previously the error was swallowed and the following ``return``
        failed with ``UnboundLocalError``.
        """
        dir_path = os.path.dirname(os.path.realpath(__file__))
        filename = os.path.join(dir_path, "..", "config", "setup.json")
        with open(filename, 'r') as f:
            try:
                return json.load(f)
            except ValueError:
                print('Decoding JSON has failed')
                raise

_load_from_json() ¤

Load the configuration parameters from a json file.

Source code in src/dcs_dev/abb_rob/abb_config.py
16
17
18
19
20
21
22
23
24
25
26
def _load_from_json(self):
    """Load the configuration parameters from a json file.

    Reads ``../config/setup.json`` relative to this module's directory and
    returns the decoded content. If the file is not valid JSON the failure is
    reported on stdout and the ``ValueError`` is re-raised; previously the
    error was swallowed and the following ``return`` failed with
    ``UnboundLocalError``.
    """
    dir_path = os.path.dirname(os.path.realpath(__file__))
    filename = os.path.join(dir_path, "..", "config", "setup.json")
    with open(filename, 'r') as f:
        try:
            return json.load(f)
        except ValueError:
            print('Decoding JSON has failed')
            raise

DcsRosClient¤

Source code in src/dcs_dev/abb_rob/ros_client.py
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
class DcsRosClient:
    """Thin wrapper around the ``rrc`` ROS and ABB clients.

    ``_init_ros_client`` must be called before any other method: it creates
    the ``ros`` and ``abb`` attributes that every command below uses.
    """

    def __init__(self):
        self._ros = None
        self._abb = None

    def _init_ros_client(self) -> None:
        """Start the ROS client, attach the ABB client on ``/rob1`` and greet the robot."""
        self.ros = rrc.RosClient()
        self.ros.run()
        self.abb = rrc.AbbClient(self.ros, "/rob1")

        print("Connected:", self.ros.is_connected)
        self.abb.send_and_wait(rrc.PrintText("Wellcome to digital casting sytsem"))

    def _close_ros_client(self) -> None:
        """Notify the robot, then close and terminate the ROS client."""
        # The farewell must be sent while the connection is still open;
        # it used to be sent after close()/terminate().
        self.abb.send_and_wait(rrc.PrintText("Disconected to ROS"))
        self.ros.close()
        self.ros.terminate()

        print("Connected:", self.ros.is_connected)

    # IO functions
    def _set_digital_output(self, io_name: str, value: int) -> None:
        """Set digital signal ``io_name`` to ``value`` and wait for the robot."""
        self.abb.send_and_wait(rrc.SetDigital(io_name, value))
        print(f"{io_name} is set to {value}")

    def _get_digital_input(self, io_name: str) -> None:
        """Read digital signal ``io_name`` and print it (nothing is returned)."""
        get_di = self.abb.send_and_wait(rrc.ReadDigital(io_name))
        print(f"{io_name} is {get_di}")

    def _set_group_output(self, io_name: str, value: int) -> None:
        """Set group signal ``io_name`` to ``value`` and wait for the robot."""
        self.abb.send_and_wait(rrc.SetGroup(io_name, value))
        print(f"{io_name} is {value}")

    def _get_group_input(self, io_name: str) -> None:
        """Read group signal ``io_name`` and print it (nothing is returned)."""
        get_gi = self.abb.send_and_wait(rrc.ReadGroup(io_name))
        print(f"{io_name} is {get_gi}")

    def _get_analog_output(self, io_name: str, value: int) -> None:
        """Send ``SetAnalog(io_name, value)`` and print the reply.

        Despite the ``_get`` prefix this method sets the analog signal.
        """
        get_ao = self.abb.send_and_wait(rrc.SetAnalog(io_name, value))
        print(f"{io_name} is {get_ao}")

    def _get_analog_input(self, io_name: str) -> None:
        """Read analog signal ``io_name`` and print it (nothing is returned)."""
        get_ai = self.abb.send_and_wait(rrc.ReadAnalog(io_name))
        print(f"{io_name} is {get_ai}")

    # movement functions
    def _move_to_frame(self, frame, speed: int, zone: int) -> None:
        """Send a linear move to ``frame`` without waiting for completion."""
        self.abb.send(rrc.MoveToFrame(frame, speed, zone, rrc.Motion.LINEAR))
        print(f"Robot is moving to {frame}")

    def _move_to_robotarget(self):
        """Not implemented yet."""
        raise NotImplementedError

    def _move_to_joints(self, joints: list, external_axes, speed: int, zone: int) -> None:
        """Send a joint move without waiting for completion."""
        self.abb.send(rrc.MoveToJoints(joints, external_axes, speed, zone))
        print(f"Robot is moving to {joints}")

    def _wait(self, time: int) -> None:
        """Send a ``WaitTime(time)`` instruction to the robot."""
        self.abb.send(rrc.WaitTime(time))

    # Robot config

    def _set_move_zone(self, zone: int) -> None:
        """Not implemented yet."""
        raise NotImplementedError

    def _set_acceleration(self, acc: int, ramp: int) -> None:
        """Send ``SetAcceleration(acc, ramp)`` to the robot."""
        self.abb.send(rrc.SetAcceleration(acc, ramp))

    def _set_max_speed(self, overide: int, max_tcp: int) -> None:
        """
        override: Unit [%]
        max_tcp: Unit [mm/s]
        """
        self.abb.send(rrc.SetMaxSpeed(overide, max_tcp))

    def _set_tool(self, tool_name: str) -> None:
        """Send ``SetTool(tool_name)`` to the robot."""
        self.abb.send(rrc.SetTool(tool_name))
        print(f"Tool is set to {tool_name}")

    def _get_tool(self):
        """Not implemented yet."""
        raise NotImplementedError

    def _set_workobject(self, workobject: str) -> None:
        """Send ``SetWorkObject(workobject)`` to the robot."""
        self.abb.send(rrc.SetWorkObject(workobject))
        print(f"Workobject is set to {workobject}")

    def _get_workobject(self):
        """Not implemented yet."""
        raise NotImplementedError

    def _get_robotarget(self) -> tuple:
        """Return ``(frame, external_axes)`` as reported by ``GetRobtarget``."""
        frame, external_axes = self.abb.send_and_wait(rrc.GetRobtarget())
        return frame, external_axes

    def _print_text(self, text: str) -> None:
        """Send ``text`` to the robot as ``PrintText`` and echo it on stdout."""
        self.abb.send(rrc.PrintText(text))
        print(text)

_set_max_speed(overide, max_tcp) ¤

override: Unit [%] max_tcp: Unit [mm/s]

Source code in src/dcs_dev/abb_rob/ros_client.py
71
72
73
74
75
76
def _set_max_speed(self, overide: int, max_tcp: int) -> None:
    """Send a ``SetMaxSpeed`` instruction to the robot.

    override: Unit [%]
    max_tcp: Unit [mm/s]
    """
    instruction = rrc.SetMaxSpeed(overide, max_tcp)
    self.abb.send(instruction)

Path¤

Source code in src/dcs_dev/abb_rob/path.py
2
3
4
5
6
7
8
class Path:
    """Placeholder for a robot path; no behaviour is implemented yet."""

    def __init__(self):
        """Create the object; nothing is initialised."""
        return None

    def set_frae_list(self):
        """Stub: does nothing and returns ``None``."""
        return None

data_processing¤



gui¤



utilities¤



visualization¤