diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b3ec7d5 --- /dev/null +++ b/.gitignore @@ -0,0 +1,220 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[codz] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py.cover +*.lcov +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +# Pipfile.lock + +# UV +# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# uv.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +# poetry.lock +# poetry.toml + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +# pdm recommends including project-wide configuration in pdm.toml, but excluding .pdm-python. +# https://pdm-project.org/en/latest/usage/project/#working-with-version-control +# pdm.lock +# pdm.toml +.pdm-python +.pdm-build/ + +# pixi +# Similar to Pipfile.lock, it is generally recommended to include pixi.lock in version control. +# pixi.lock +# Pixi creates a virtual environment in the .pixi directory, just like venv module creates one +# in the .venv directory. It is recommended not to include this directory in version control. +.pixi/* +!.pixi/config.toml + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule* +celerybeat.pid + +# Redis +*.rdb +*.aof +*.pid + +# RabbitMQ +mnesia/ +rabbitmq/ +rabbitmq-data/ + +# ActiveMQ +activemq-data/ + +# SageMath parsed files +*.sage.py + +# Environments +.env +.envrc +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +# .idea/ + +# Abstra +# Abstra is an AI-powered process automation framework. +# Ignore directories containing user credentials, local state, and settings. +# Learn more at https://abstra.io/docs +.abstra/ + +# Visual Studio Code +# Visual Studio Code specific template is maintained in a separate VisualStudioCode.gitignore +# that can be found at https://github.com/github/gitignore/blob/main/Global/VisualStudioCode.gitignore +# and can be added to the global gitignore or merged into this file. However, if you prefer, +# you could uncomment the following to ignore the entire vscode folder +# .vscode/ +# Temporary file for partial code execution +tempCodeRunnerFile.py + +# Ruff stuff: +.ruff_cache/ + +# PyPI configuration file +.pypirc + +# Marimo +marimo/_static/ +marimo/_lsp/ +__marimo__/ + +# Streamlit +.streamlit/secrets.toml diff --git a/README.md b/README.md index cae61ce..00c02cd 100644 --- a/README.md +++ b/README.md @@ -1,41 +1,85 @@ # dynamixel-controller -dynamixel-controller is a Python library designed from the ground up to work with any Dynamixel motor on the market with few to no modifications. +Python library for controlling Dynamixel motors with JSON-defined control tables and a simple per-motor API. + +**This repository is a fork of [UGA-BSAIL/dynamixel-controller](https://github.com/UGA-BSAIL/dynamixel-controller)** by Hunter Halloran and the [University of Georgia Bio-Sensing and Instrumentation Lab](https://github.com/UGA-BSAIL). The original project is licensed under [Apache 2.0](LICENSE). This fork extends it with Protocol 2 group I/O (sync/bulk read and write), additional motor control tables, and related API updates. See [docs.md](docs.md) for the full API reference. ## Installation -Use the package manager [pip](https://pip.pypa.io/en/stable/) to install dynamixel-controller. +### From this repository (recommended for this fork) + +Clone and install in editable mode so local changes are picked up immediately: + +```bash +git clone https://github.com/smpdl/dynamixel-controller.git +cd dynamixel-controller +pip install -e . +``` + +Or install directly from GitHub without cloning: + +```bash +pip install git+https://github.com/smpdl/dynamixel-controller.git +``` + +**Requirements:** Python 3, [pyserial](https://pypi.org/project/pyserial/), and [deprecation](https://pypi.org/project/deprecation/) (installed automatically). + +### Original package on PyPI + +The upstream project is also published as `dynamixel-controller`: ```bash pip install dynamixel-controller ``` +That release does not include the fork-specific features below (group I/O, newer control tables). Use the install options above for this codebase. + ## Usage -#### Import: + +### Import + ```python -from dynio import * +from dynio import DynamixelIO, DynamixelMotor, DynamixelCommError, control_table_path, dxl ``` -#### Port handling: +`dxl` is an alias for `dynio.dynamixel_controller` if you prefer the original import style. + +### Open the bus + +```python +dxl_io = DynamixelIO('/dev/ttyUSB0', 57600) # port and baud rate for U2D2 or other adapter +``` + +On Windows, use a COM port (for example `COM3`) instead of `/dev/ttyUSB0`. + +### Create motors + +Pre-defined helpers load bundled control-table JSON from `dynio/DynamixelJSON/`: + ```python -dxl_io = dxl.DynamixelIO('portname') # your port for U2D2 or other serial device +ax_12 = dxl_io.new_ax12(1) # AX-12, Protocol 1 +mx_64_1 = dxl_io.new_mx64(2, protocol=1) # MX-64, Protocol 1 +mx_64_2 = dxl_io.new_mx64(3, protocol=2) # MX-64, Protocol 2 + +# Protocol 2 models added in this fork +xc330 = dxl_io.new_xc330m288t(4) +xl430 = dxl_io.new_xl430w250t(5) +xm430 = dxl_io.new_xm430w350t(6) +xm540 = dxl_io.new_xm540w270t(7) ``` -#### Pre-made motor declarations: -See [documentation](https://github.com/UGA-BSAIL/dynamixel-controller/blob/moreJSON/docs.md) for full list of motors. -See below for sample: +Custom models can use any JSON control table: + ```python -ax_12 = dxl_io.new_ax12(1) # AX-12 protocol 1 with ID 1 -mx_64_1 = dxl_io.new_mx64(2, 1) # MX-64 protocol 1 with ID 2 -mx_64_2 = dxl_io.new_mx64(3, 2) # MX-64 protocol 2 with ID 3 +motor = dxl_io.new_motor(7, control_table_path('MyMotor.json'), protocol=2) ``` -#### Motor Control: -See [documentation](https://github.com/UGA-BSAIL/dynamixel-controller/blob/moreJSON/docs.md) for full list of functions -and their usage. +See [docs.md](docs.md) for all `new_*` helpers and motor APIs. + +### Per-motor control + +Common operations are methods on `DynamixelMotor`: -The most prevalent functions are pre-implemented as methods for the motor objects. -These include: ```python motor.torque_enable() motor.torque_disable() @@ -44,23 +88,71 @@ motor.set_velocity_mode() motor.set_velocity(velocity) motor.set_position_mode() motor.set_position(position) -motor.set_angle(angle) +motor.set_angle(angle) motor.set_extended_position_mode() -motor.set_position(position) position = motor.get_position() angle = motor.get_angle() current = motor.get_current() ``` -All other values can be easily read from or written to using their [control table](http://emanual.robotis.com/) name. Example: + +Any control-table field can be read or written by name (see [ROBOTIS e-Manual](http://emanual.robotis.com/) for register definitions): + ```python -motor.write_control_table("LED", 1) # Turns the LED on -speed = motor.read_control_table("Present_Speed") # Returns present velocity +motor.write_control_table("LED", 1) +speed = motor.read_control_table("Present_Speed") ``` +### Group I/O (Protocol 2, this fork) + +Read or write the same register on multiple motors in one bus transaction with **sync** operations, or mix different registers per motor with **bulk** operations. All motors in a group transaction must use the same protocol (typically Protocol 2). + +```python +m1 = dxl_io.new_xm430w350t(1) +m2 = dxl_io.new_xm430w350t(2) +m3 = dxl_io.new_xm430w350t(3) + +# Sync read: one register, many motors +positions = dxl_io.sync_read([m1, m2, m3], "Present_Position") +# {1: 2048, 2: 1024, 3: 3072} + +# Sync write: one register, many motors +dxl_io.sync_write( + {m1: 2048, m2: 1024, m3: 3072}, + register_name="Goal_Position", +) + +# Bulk read: different registers per motor in one transaction +readings = dxl_io.bulk_read([ + (m1, "Present_Position"), + (m2, "Present_Current"), + (m3, "Hardware_Error_Status"), +]) + +# Bulk write: mixed registers (Protocol 2 only) +dxl_io.bulk_write([ + (m1, "Goal_Position", 2048), + (m2, "Goal_Position", 1024), + (m3, "LED", 1), +]) +``` + +Bus failures raise `DynamixelCommError` (for example port not open or a failed group transaction). + +## Documentation + +Full API documentation: [docs.md](docs.md) + +## Acknowledgments + +- **Original library:** [UGA-BSAIL/dynamixel-controller](https://github.com/UGA-BSAIL/dynamixel-controller) — Hunter Halloran (Jyumpp), University of Georgia Bio-Sensing and Instrumentation Lab. Copyright 2020 UGA BSAIL, Apache License 2.0. +- **Dynamixel SDK:** vendored from [ROBOTIS-GIT/DynamixelSDK](https://github.com/ROBOTIS-GIT/DynamixelSDK) (see [NOTICE.md](NOTICE.md)). + ## Contributing + Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to change. -Especially encouraged is new control tables to be published as part of the package. +Especially encouraged: new control-table JSON files under `dynio/DynamixelJSON/`. ## License -[Apache 2.0](https://choosealicense.com/licenses/apache-2.0/) \ No newline at end of file + +[Apache 2.0](https://choosealicense.com/licenses/apache-2.0/) — see [LICENSE](LICENSE). diff --git a/docs.md b/docs.md index 5a4554e..3ba2218 100644 --- a/docs.md +++ b/docs.md @@ -1,10 +1,24 @@ # dynio +Public exports from `dynio`: + +- `DynamixelIO`, `DynamixelMotor` — motor bus and per-motor API +- `DynamixelCommError` — raised when group transactions or connection checks fail +- `control_table_path` — resolve bundled control-table JSON paths +- `dxl` — alias for `dynio.dynamixel_controller` + # dynio.dynamixel_controller +## control_table_path +```python +control_table_path(filename) +``` +Returns the filesystem path to a control-table JSON file bundled in `dynio/DynamixelJSON/`. Raises `FileNotFoundError` if the file is missing. + + ## DynamixelIO ```python DynamixelIO(self, device_name='/dev/ttyUSB0', baud_rate=57600) @@ -62,6 +76,117 @@ DynamixelIO.new_mx106(dxl_id, protocol=1, control_table_protocol=None) ``` Returns a new DynamixelMotor object for an MX106 +### new_xc330m288t +```python +DynamixelIO.new_xc330m288t(dxl_id, protocol=2, control_table_protocol=None) +``` +Returns a new DynamixelMotor object for an XC330-M288-T (Protocol 2) + +### new_xl430w250t +```python +DynamixelIO.new_xl430w250t(dxl_id, protocol=2, control_table_protocol=None) +``` +Returns a new DynamixelMotor object for an XL430-W250-T (Protocol 2) + +### new_xm430w350t +```python +DynamixelIO.new_xm430w350t(dxl_id, protocol=2, control_table_protocol=None) +``` +Returns a new DynamixelMotor object for an XM430-W350-T (Protocol 2) + +### new_xm540w270t +```python +DynamixelIO.new_xm540w270t(dxl_id, protocol=2, control_table_protocol=None) +``` +Returns a new DynamixelMotor object for an XM540-W270-T (Protocol 2) + +### sync_read +```python +DynamixelIO.sync_read(motors, register_name, protocol=None) +``` +Reads the same register from multiple motors in one Sync Read transaction (Protocol 2 only). All motors must share the same protocol and register layout. Returns a dict mapping {`dxl_id`: raw register value}. Raises `DynamixelCommError` on bus failure. + +Example usage: +```python +from dynio import DynamixelIO + +dxl_io = DynamixelIO('/dev/ttyUSB0', 57600) +m1 = dxl_io.new_xm430w350t(1) +m2 = dxl_io.new_xm430w350t(2) +m3 = dxl_io.new_xm430w350t(3) + +# One transaction reads the same register from every motor (Protocol 2 only) +positions = dxl_io.sync_read([m1, m2, m3], "Present_Position") +# {1: 2048, 2: 1024, 3: 3072} — raw register values keyed by motor ID +``` + +### sync_write +```python +DynamixelIO.sync_write(writes, register_name=None, protocol=None) +``` +Writes the same register on multiple motors in one Sync Write transaction. `writes` maps each `DynamixelMotor` to an integer value, or to `(register_name, value)` when `register_name` is omitted. Raises `DynamixelCommError` on bus failure. + +Example usage: +```python +# All motors, same register — pass values keyed by motor object +dxl_io.sync_write( + {m1: 2048, m2: 1024, m3: 3072}, + register_name="Goal_Position", +) + +# Or omit register_name and give (register_name, value) per motor +dxl_io.sync_write({ + m1: ("Goal_Position", 2048), + m2: ("Goal_Position", 1024), + m3: ("Goal_Position", 3072), +}) +``` + +### bulk_read +```python +DynamixelIO.bulk_read(specs, protocol=None) +``` +Reads registers via Fast Bulk Read; each motor may use a different address and length. Each entry in `specs` is `(motor, register_name)` or `(motor, address, size)`. Returns a dict mapping `dxl_id` → raw register value. Raises `DynamixelCommError` on bus failure. + +Example usage: +```python +# Each motor can read a different register in one transaction +readings = dxl_io.bulk_read([ + (m1, "Present_Position"), + (m2, "Present_Current"), + (m3, "Hardware_Error_Status"), +]) +# {1: 2048, 2: 42, 3: 0} + +# Raw address/size when you already know the control-table layout +readings = dxl_io.bulk_read([ + (m1, 132, 4), # Present_Position on XM430-W350-T + (m2, 126, 2), # Present_Current +]) +``` + +### bulk_write +```python +DynamixelIO.bulk_write(specs, protocol=None) +``` +Writes registers via Bulk Write (Protocol 2 only). Each entry is `(motor, register_name, value)` or `(motor, address, size, value)`. Raises `DynamixelCommError` on bus failure. + +Example usage: +```python +# Mixed registers across motors in one transaction (Protocol 2 only) +dxl_io.bulk_write([ + (m1, "Goal_Position", 2048), + (m2, "Goal_Position", 1024), + (m3, "LED", 1), +]) + +# Raw address/size form +dxl_io.bulk_write([ + (m1, 116, 4, 2048), # Goal_Position on XM430-W350-T + (m2, 116, 4, 1024), +]) +``` + ### new_ax12_1 ```python DynamixelIO.new_ax12_1(*args, **kwargs) @@ -200,7 +325,7 @@ Returns the motor position as an angle in degrees ```python DynamixelMotor.get_current() ``` -Returns the current motor load +Returns the current motor load. On Protocol 2, reads `Present_Current` when available, otherwise `Present_Load` (e.g. XL430-W250-T). ### torque_enable ```python @@ -213,3 +338,43 @@ Enables motor torque DynamixelMotor.torque_disable() ``` Disables motor torque + +### get_model_number +```python +DynamixelMotor.get_model_number() +``` +Returns the model number of the motor + +### get_id +```python +DynamixelMotor.get_id() +``` +Returns the ID of the motor + +### get_baud_rate +```python +DynamixelMotor.get_baud_rate() +``` +Returns the baud rate of the motor + +### get_present_temperature +```python +DynamixelMotor.get_present_temperature() +``` +Returns the present temperature of the motor + +### hardware_error_status +```python +DynamixelMotor.hardware_error_status() +``` +Returns the hardware error status of the motor + + +# dynio.group_io + + +## DynamixelCommError +```python +DynamixelCommError(message, comm_result=None) +``` +Exception raised when a group transaction fails on the bus or when `DynamixelIO` is used without an open serial port. The `comm_result` attribute holds the SDK communication result code when available. diff --git a/dynio/DynamixelJSON/XC330M288T.json b/dynio/DynamixelJSON/XC330M288T.json new file mode 100644 index 0000000..a78c36d --- /dev/null +++ b/dynio/DynamixelJSON/XC330M288T.json @@ -0,0 +1,77 @@ +{ + "Protocol_2": { + "Control_Table": { + "Model_Number": [0, 2], + "Model_Information": [2, 4], + "Firmware_Version": [6, 1], + "ID": [7, 1], + "Baud_Rate": [8, 1], + "Return_Delay_Time": [9, 1], + "Drive_Mode": [10, 1], + "Operating_Mode": [11, 1], + "Secondary_Shadow_ID": [12, 1], + "Protocol_Type": [13, 1], + "Homing_Offset": [20, 4], + "Moving_Threshold": [24, 4], + "Temperature_Limit": [31, 1], + "Max_Voltage_Limit": [32, 2], + "Min_Voltage_Limit": [34, 2], + "PWM_Limit": [36, 2], + "Current_Limit": [38, 2], + "Velocity_Limit": [44, 4], + "Max_Position_Limit": [48, 4], + "Min_Position_Limit": [52, 4], + "Startup_Configuration": [60, 1], + "PWM_Slope": [62, 1], + "Shutdown": [63, 1], + "Torque_Enable": [64, 1], + "LED": [65, 1], + "Status_Return_Level": [68, 1], + "Registered_Instruction": [69, 1], + "Hardware_Error_Status": [70, 1], + "Velocity_I_Gain": [76, 2], + "Velocity_P_Gain": [78, 2], + "Position_D_Gain": [80, 2], + "Position_I_Gain": [82, 2], + "Position_P_Gain": [84, 2], + "Feedforward_2nd_Gain": [88, 2], + "Feedforward_1st_Gain": [90, 2], + "Bus_Watchdog": [98, 1], + "Goal_PWM": [100, 2], + "Goal_Current": [102, 2], + "Goal_Velocity": [104, 4], + "Profile_Acceleration": [108, 4], + "Profile_Velocity": [112, 4], + "Goal_Position": [116, 4], + "Realtime_Tick": [120, 2], + "Moving": [122, 1], + "Moving_Status": [123, 1], + "Present_PWM": [124, 2], + "Present_Current": [126, 2], + "Present_Velocity": [128, 4], + "Present_Position": [132, 4], + "Velocity_Trajectory": [136, 4], + "Position_Trajectory": [140, 4], + "Present_Input_Voltage": [144, 2], + "Present_Temperature": [146, 1], + "Backup_Ready": [147, 1], + "Indirect_Address_1": [168, 2], + "Indirect_Address_2": [170, 2], + "Indirect_Address_3": [172, 2], + "Indirect_Address_26": [218, 2], + "Indirect_Address_27": [220, 2], + "Indirect_Address_28": [222, 2], + "Indirect_Data_1": [224, 1], + "Indirect_Data_2": [225, 1], + "Indirect_Data_3": [226, 1], + "Indirect_Data_26": [249, 1], + "Indirect_Data_27": [250, 1], + "Indirect_Data_28": [251, 1] + }, + "Values": { + "Min_Position": 0, + "Max_Position": 4095, + "Max_Angle": 360 + } + } +} \ No newline at end of file diff --git a/dynio/DynamixelJSON/XL430W250T.json b/dynio/DynamixelJSON/XL430W250T.json new file mode 100644 index 0000000..2dea37b --- /dev/null +++ b/dynio/DynamixelJSON/XL430W250T.json @@ -0,0 +1,86 @@ +{ + "Protocol_2": { + "Control_Table": { + "Model_Number": [0, 2], + "Model_Information": [2, 4], + "Firmware_Version": [6, 1], + "ID": [7, 1], + "Baud_Rate": [8, 1], + "Return_Delay_Time": [9, 1], + "Drive_Mode": [10, 1], + "Operating_Mode": [11, 1], + "Secondary_Shadow_ID": [12, 1], + "Protocol_Type": [13, 1], + "Homing_Offset": [20, 4], + "Moving_Threshold": [24, 4], + "Temperature_Limit": [31, 1], + "Max_Voltage_Limit": [32, 2], + "Min_Voltage_Limit": [34, 2], + "PWM_Limit": [36, 2], + "Velocity_Limit": [44, 4], + "Max_Position_Limit": [48, 4], + "Min_Position_Limit": [52, 4], + "Startup_Configuration": [60, 1], + "Shutdown": [63, 1], + "Torque_Enable": [64, 1], + "LED": [65, 1], + "Status_Return_Level": [68, 1], + "Registered_Instruction": [69, 1], + "Hardware_Error_Status": [70, 1], + "Velocity_I_Gain": [76, 2], + "Velocity_P_Gain": [78, 2], + "Position_D_Gain": [80, 2], + "Position_I_Gain": [82, 2], + "Position_P_Gain": [84, 2], + "Feedforward_2nd_Gain": [88, 2], + "Feedforward_1st_Gain": [90, 2], + "Bus_Watchdog": [98, 1], + "Goal_PWM": [100, 2], + "Goal_Velocity": [104, 4], + "Profile_Acceleration": [108, 4], + "Profile_Velocity": [112, 4], + "Goal_Position": [116, 4], + "Realtime_Tick": [120, 2], + "Moving": [122, 1], + "Moving_Status": [123, 1], + "Present_PWM": [124, 2], + "Present_Load": [126, 2], + "Present_Velocity": [128, 4], + "Present_Position": [132, 4], + "Velocity_Trajectory": [136, 4], + "Position_Trajectory": [140, 4], + "Present_Input_Voltage": [144, 2], + "Present_Temperature": [146, 1], + "Backup_Ready": [147, 1], + "Indirect_Address_1": [168, 2], + "Indirect_Address_2": [170, 2], + "Indirect_Address_3": [172, 2], + "Indirect_Address_26": [218, 2], + "Indirect_Address_27": [220, 2], + "Indirect_Address_28": [222, 2], + "Indirect_Data_1": [224, 1], + "Indirect_Data_2": [225, 1], + "Indirect_Data_3": [226, 1], + "Indirect_Data_26": [249, 1], + "Indirect_Data_27": [250, 1], + "Indirect_Data_28": [251, 1], + "Indirect_Address_29": [578, 2], + "Indirect_Address_30": [580, 2], + "Indirect_Address_31": [582, 2], + "Indirect_Address_54": [628, 2], + "Indirect_Address_55": [630, 2], + "Indirect_Address_56": [632, 2], + "Indirect_Data_29": [634, 1], + "Indirect_Data_30": [635, 1], + "Indirect_Data_31": [636, 1], + "Indirect_Data_54": [659, 1], + "Indirect_Data_55": [660, 1], + "Indirect_Data_56": [661, 1] + }, + "Values": { + "Min_Position": 0, + "Max_Position": 4095, + "Max_Angle": 360 + } + } +} \ No newline at end of file diff --git a/dynio/DynamixelJSON/XM430W350T.json b/dynio/DynamixelJSON/XM430W350T.json new file mode 100644 index 0000000..cf374c7 --- /dev/null +++ b/dynio/DynamixelJSON/XM430W350T.json @@ -0,0 +1,88 @@ +{ + "Protocol_2": { + "Control_Table": { + "Model_Number": [0, 2], + "Model_Information": [2, 4], + "Firmware_Version": [6, 1], + "ID": [7, 1], + "Baud_Rate": [8, 1], + "Return_Delay_Time": [9, 1], + "Drive_Mode": [10, 1], + "Operating_Mode": [11, 1], + "Secondary_Shadow_ID": [12, 1], + "Protocol_Type": [13, 1], + "Homing_Offset": [20, 4], + "Moving_Threshold": [24, 4], + "Temperature_Limit": [31, 1], + "Max_Voltage_Limit": [32, 2], + "Min_Voltage_Limit": [34, 2], + "PWM_Limit": [36, 2], + "Current_Limit": [38, 2], + "Velocity_Limit": [44, 4], + "Max_Position_Limit": [48, 4], + "Min_Position_Limit": [52, 4], + "Startup_Configuration": [60, 1], + "Shutdown": [63, 1], + "Torque_Enable": [64, 1], + "LED": [65, 1], + "Status_Return_Level": [68, 1], + "Registered_Instruction": [69, 1], + "Hardware_Error_Status": [70, 1], + "Velocity_I_Gain": [76, 2], + "Velocity_P_Gain": [78, 2], + "Position_D_Gain": [80, 2], + "Position_I_Gain": [82, 2], + "Position_P_Gain": [84, 2], + "Feedforward_2nd_Gain": [88, 2], + "Feedforward_1st_Gain": [90, 2], + "Bus_Watchdog": [98, 1], + "Goal_PWM": [100, 2], + "Goal_Current": [102, 2], + "Goal_Velocity": [104, 4], + "Profile_Acceleration": [108, 4], + "Profile_Velocity": [112, 4], + "Goal_Position": [116, 4], + "Realtime_Tick": [120, 2], + "Moving": [122, 1], + "Moving_Status": [123, 1], + "Present_PWM": [124, 2], + "Present_Current": [126, 2], + "Present_Velocity": [128, 4], + "Present_Position": [132, 4], + "Velocity_Trajectory": [136, 4], + "Position_Trajectory": [140, 4], + "Present_Input_Voltage": [144, 2], + "Present_Temperature": [146, 1], + "Backup_Ready": [147, 1], + "Indirect_Address_1": [168, 2], + "Indirect_Address_2": [170, 2], + "Indirect_Address_3": [172, 2], + "Indirect_Address_26": [218, 2], + "Indirect_Address_27": [220, 2], + "Indirect_Address_28": [222, 2], + "Indirect_Data_1": [224, 1], + "Indirect_Data_2": [225, 1], + "Indirect_Data_3": [226, 1], + "Indirect_Data_26": [249, 1], + "Indirect_Data_27": [250, 1], + "Indirect_Data_28": [251, 1], + "Indirect_Address_29": [578, 2], + "Indirect_Address_30": [580, 2], + "Indirect_Address_31": [582, 2], + "Indirect_Address_54": [628, 2], + "Indirect_Address_55": [630, 2], + "Indirect_Address_56": [632, 2], + "Indirect_Data_29": [634, 1], + "Indirect_Data_30": [635, 1], + "Indirect_Data_31": [636, 1], + "Indirect_Data_54": [659, 1], + "Indirect_Data_55": [660, 1], + "Indirect_Data_56": [661, 1] + }, + "Values": { + "Min_Position": 0, + "Max_Position": 4095, + "Max_Angle": 360 + } + } +} \ No newline at end of file diff --git a/dynio/DynamixelJSON/XM540W270.json b/dynio/DynamixelJSON/XM540W270.json index ff344ac..458e1d0 100644 --- a/dynio/DynamixelJSON/XM540W270.json +++ b/dynio/DynamixelJSON/XM540W270.json @@ -62,7 +62,12 @@ "External_Port_Data_3": [156, 2], "Indirect_Address_1": [168, 2], "Indirect_Address_2": [170, 2], - "Indirect_Address_3": [172, 2], + "Indirect_Address_3": [172, 2] + }, + "Values": { + "Min_Position": 0, + "Max_Position": 4095, + "Max_Angle": 360 } } } diff --git a/dynio/__init__.py b/dynio/__init__.py index 060cef5..ebab739 100644 --- a/dynio/__init__.py +++ b/dynio/__init__.py @@ -17,3 +17,17 @@ # Author: Hunter Halloran (Jyumpp) import dynio.dynamixel_controller as dxl +from dynio.dynamixel_controller import ( + DynamixelIO, + DynamixelMotor, + control_table_path, +) +from dynio.group_io import DynamixelCommError + +__all__ = [ + "DynamixelIO", + "DynamixelMotor", + "DynamixelCommError", + "control_table_path", + "dxl", +] diff --git a/dynio/dynamixel_controller.py b/dynio/dynamixel_controller.py index a7b22d4..a0f3ad3 100644 --- a/dynio/dynamixel_controller.py +++ b/dynio/dynamixel_controller.py @@ -18,9 +18,26 @@ from dynamixel_sdk import * import json +import os import pkg_resources from deprecation import deprecated +from dynio.group_io import ( + DynamixelCommError, + encode_register_value, + normalize_motor_list, + normalize_sync_write_targets, + parse_bulk_read_specs, + parse_bulk_write_specs, + resolve_sync_register, +) + +def control_table_path(filename): + """Resolve a control-table JSON file bundled with this package.""" + path = pkg_resources.resource_filename(__name__, f"DynamixelJSON/{filename}") + if not os.path.isfile(path): + raise FileNotFoundError(f"Control table JSON not found: {path}") + return path class DynamixelIO: """Creates communication handler for Dynamixel motors""" @@ -29,6 +46,7 @@ def __init__(self, device_name='/dev/ttyUSB0', baud_rate=57600): if device_name is None: + self._sync_read_groups = {} return self.port_handler = PortHandler(device_name) self.packet_handler = [PacketHandler(1), PacketHandler(2)] @@ -38,6 +56,17 @@ def __init__(self, if not self.port_handler.openPort(): raise (NameError("PortOpenError")) + self._sync_read_groups = {} + + def _require_connected(self): + if not getattr(self, "port_handler", None): + raise DynamixelCommError("DynamixelIO is not connected to a serial port.") + + def _raise_on_comm_failure(self, protocol, comm_result): + if comm_result != COMM_SUCCESS: + handler = self.packet_handler[protocol - 1] + raise DynamixelCommError(handler.getTxRxResult(comm_result), comm_result=comm_result) + def __check_error(self, protocol, dxl_comm_result, dxl_error): """Prints the error message when not successful""" if dxl_comm_result != COMM_SUCCESS: @@ -117,6 +146,156 @@ def new_mx106(self, dxl_id, protocol=1, control_table_protocol=None): pkg_resources.resource_filename(__name__, "DynamixelJSON/MX106.json"), protocol=protocol, control_table_protocol=control_table_protocol) + def new_xc330m288t(self, dxl_id, protocol=2, control_table_protocol=None): + """Returns a new DynamixelMotor object for an XC330-M288-T (Protocol 2).""" + return self.new_motor( + dxl_id, pkg_resources.resource_filename(__name__, "DynamixelJSON/XC330M288T.json"), protocol, control_table_protocol + ) + + def new_xl430w250t(self, dxl_id, protocol=2, control_table_protocol=None): + """Returns a new DynamixelMotor object for an XL430-W250-T (Protocol 2).""" + return self.new_motor( + dxl_id, pkg_resources.resource_filename(__name__, "DynamixelJSON/XL430W250T.json"), protocol, control_table_protocol + ) + + def new_xm430w350t(self, dxl_id, protocol=2, control_table_protocol=None): + """Returns a new DynamixelMotor object for an XM430-W350-T (Protocol 2).""" + return self.new_motor( + dxl_id, pkg_resources.resource_filename(__name__, "DynamixelJSON/XM430W350T.json"), protocol, control_table_protocol + ) + + def new_xm540w270t(self, dxl_id, protocol=2, control_table_protocol=None): + """Returns a new DynamixelMotor object for an XM540-W270-T (Protocol 2).""" + return self.new_motor( + dxl_id, pkg_resources.resource_filename(__name__, "DynamixelJSON/XM540W270.json"), protocol, control_table_protocol + ) + + def sync_read(self, motors, register_name, protocol=None): + """Read the same register from multiple motors in one Sync Read (Protocol 2). + + Returns a dict mapping dxl_id -> raw register value. + """ + self._require_connected() + motors = normalize_motor_list(motors) + resolved_protocol, address, size = resolve_sync_register(motors, register_name) + if protocol is not None and protocol != resolved_protocol: + raise ValueError( + f"Requested protocol {protocol} does not match motors (use {resolved_protocol})." + ) + protocol = resolved_protocol + if protocol != 2: + raise ValueError("sync_read requires Protocol 2 motors.") + + motor_ids = tuple(motor.dxl_id for motor in motors) + cache_key = (protocol, address, size, motor_ids) + group = self._sync_read_groups.get(cache_key) + if group is None: + handler = self.packet_handler[protocol - 1] + group = GroupSyncRead(self.port_handler, handler, address, size) + self._sync_read_groups[cache_key] = group + + group.clearParam() + for motor in motors: + group.addParam(motor.dxl_id) + + comm_result = group.txRxPacket() + self._raise_on_comm_failure(protocol, comm_result) + + return { + motor.dxl_id: group.getData(motor.dxl_id, address, size) for motor in motors + } + + def sync_write(self, writes, register_name=None, protocol=None): + """Write the same register on multiple motors in one Sync Write. + + ``writes`` maps each :class:`DynamixelMotor` to an integer value, or to + ``(register_name, value)`` when ``register_name`` is omitted. + """ + self._require_connected() + motors, values, register_name = normalize_sync_write_targets(writes, register_name) + resolved_protocol, address, size = resolve_sync_register(motors, register_name) + if protocol is not None and protocol != resolved_protocol: + raise ValueError( + f"Requested protocol {protocol} does not match motors (use {resolved_protocol})." + ) + protocol = resolved_protocol + handler = self.packet_handler[protocol - 1] + + group = GroupSyncWrite(self.port_handler, handler, address, size) + for motor, value in zip(motors, values): + if not group.addParam(motor.dxl_id, encode_register_value(value, size)): + raise DynamixelCommError( + f"Failed to add sync write param for motor id={motor.dxl_id}." + ) + + comm_result = group.txPacket() + self._raise_on_comm_failure(protocol, comm_result) + + def bulk_read(self, specs, protocol=None): + """Read registers via Fast Bulk Read; each motor may use a different address/length. + + Each entry in ``specs`` is ``(motor, register_name)`` or ``(motor, address, size)``. + Returns a dict mapping dxl_id -> raw register value. + """ + self._require_connected() + parsed = parse_bulk_read_specs(specs) + if protocol is None: + protocol = parsed[0][0].PROTOCOL + for motor, address, size in parsed: + if motor.PROTOCOL != protocol: + raise ValueError( + f"All motors must use protocol {protocol} (id={motor.dxl_id} uses " + f"{motor.PROTOCOL})." + ) + + handler = self.packet_handler[protocol - 1] + group = GroupBulkRead(self.port_handler, handler) + read_targets: list[tuple[int, int, int]] = [] + + for motor, address, size in parsed: + if not group.addParam(motor.dxl_id, address, size): + raise DynamixelCommError( + f"Failed to add bulk read param for motor id={motor.dxl_id}." + ) + read_targets.append((motor.dxl_id, address, size)) + + comm_result = group.txRxPacket() + self._raise_on_comm_failure(protocol, comm_result) + + return { + dxl_id: group.getData(dxl_id, address, size) + for dxl_id, address, size in read_targets + } + + def bulk_write(self, specs, protocol=None): + """Write registers via Bulk Write; each motor may use a different address/length. + + Each entry is ``(motor, register_name, value)`` or ``(motor, address, size, value)``. + """ + self._require_connected() + parsed = parse_bulk_write_specs(specs) + if protocol is None: + protocol = parsed[0][0].PROTOCOL + if protocol != 2: + raise ValueError("bulk_write requires Protocol 2.") + + for motor, _, _ in parsed: + if motor.PROTOCOL != protocol: + raise ValueError( + f"All motors must use protocol {protocol} (id={motor.dxl_id} uses " + f"{motor.PROTOCOL})." + ) + + handler = self.packet_handler[protocol - 1] + group = GroupBulkWrite(self.port_handler, handler) + for motor, address, size, data in parsed: + if not group.addParam(motor.dxl_id, address, size, data): + raise DynamixelCommError( + f"Failed to add bulk write param for motor id={motor.dxl_id}." + ) + + comm_result = group.txPacket() + self._raise_on_comm_failure(protocol, comm_result) # the following functions are deprecated and will be removed in version 1.0 release. They have been restructured # to continue to function for the time being, but are the result of an older system of JSON config files which # initially stored less information about each motor, causing a different initialization function to be needed @@ -305,7 +484,12 @@ def get_current(self): current *= -1 return current elif self.CONTROL_TABLE_PROTOCOL == 2: - return self.read_control_table("Present_Current") + table = self.CONTROL_TABLE + if "Present_Current" in table: + return self.read_control_table("Present_Current") + if "Present_Load" in table: + # XL430-W250-T reports load at address 126 (0.1% units, signed). + return self.read_control_table("Present_Load") def torque_enable(self): """Enables motor torque""" @@ -314,3 +498,23 @@ def torque_enable(self): def torque_disable(self): """Disables motor torque""" self.write_control_table("Torque_Enable", 0) + + def get_model_number(self): + """Returns the model number of the motor""" + return self.read_control_table("Model_Number") + + def get_id(self): + """Returns the ID of the motor""" + return self.read_control_table("ID") + + def get_baud_rate(self): + """Returns the baud rate of the motor""" + return self.read_control_table("Baud_Rate") + + def get_present_temperature(self): + """Returns the present temperature of the motor""" + return self.read_control_table("Present_Temperature") + + def hardware_error_status(self): + """Returns the hardware error status of the motor""" + return self.read_control_table("Hardware_Error_Status") \ No newline at end of file diff --git a/dynio/group_io.py b/dynio/group_io.py new file mode 100644 index 0000000..9d1f3d8 --- /dev/null +++ b/dynio/group_io.py @@ -0,0 +1,180 @@ +"""Helpers for Dynamixel group (sync/bulk) communication.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any, Iterable, Union + +from dynamixel_sdk import ( + DXL_HIBYTE, + DXL_HIWORD, + DXL_LOBYTE, + DXL_LOWORD, +) + +if TYPE_CHECKING: + from dynio.dynamixel_controller import DynamixelMotor + +BulkReadSpec = Union[ + "DynamixelMotor", + tuple["DynamixelMotor", str], + tuple["DynamixelMotor", int, int], +] + +BulkWriteSpec = Union[ + tuple["DynamixelMotor", str, int], + tuple["DynamixelMotor", int, int, int], +] + +WriteTarget = Union["DynamixelMotor", int] + + +class DynamixelCommError(Exception): + """Raised when a group transaction fails on the bus.""" + + def __init__(self, message: str, comm_result: int | None = None): + super().__init__(message) + self.comm_result = comm_result + + +def motor_register(motor: DynamixelMotor, register_name: str) -> tuple[int, int]: + """Return (address, size) for a named control-table entry.""" + table = motor.CONTROL_TABLE + if register_name not in table: + raise ValueError( + f"Register '{register_name}' is not in the control table for motor id={motor.dxl_id}." + ) + address, size = table[register_name] + return int(address), int(size) + + +def encode_register_value(value: int, size: int) -> list[int]: + """Encode an integer as little-endian register bytes for sync/bulk write.""" + value = int(value) + # just get the lower 8 bits for a 1-byte register + if size == 1: + return [value & 0xFF] + if size == 2: + return [DXL_LOBYTE(value), DXL_HIBYTE(value)] + if size == 4: + return [ + DXL_LOBYTE(DXL_LOWORD(value)), + DXL_HIBYTE(DXL_LOWORD(value)), + DXL_LOBYTE(DXL_HIWORD(value)), + DXL_HIBYTE(DXL_HIWORD(value)), + ] + raise ValueError(f"Unsupported register size: {size}") + + +def normalize_motor_list(motors: Iterable[DynamixelMotor]) -> list[DynamixelMotor]: + """Normalize a list of motors to a list of DynamixelMotor objects.""" + motors = list(motors) + if not motors: + raise ValueError("At least one motor is required.") + return motors + + +def resolve_sync_register( + motors: list[DynamixelMotor], register_name: str +) -> tuple[int, int, int]: + """Validate motors share protocol and register layout; return (protocol, address, size).""" + protocol = motors[0].PROTOCOL + address, size = motor_register(motors[0], register_name) + for motor in motors[1:]: + if motor.PROTOCOL != protocol: + raise ValueError( + f"All motors must use the same bus protocol (id={motors[0].dxl_id} uses " + f"{protocol}, id={motor.dxl_id} uses {motor.PROTOCOL})." + ) + other_address, other_size = motor_register(motor, register_name) + if (other_address, other_size) != (address, size): + raise ValueError( + f"Register '{register_name}' must share the same address and size for sync " + f"operations (id={motors[0].dxl_id}: {address}/{size}, " + f"id={motor.dxl_id}: {other_address}/{other_size})." + ) + return protocol, address, size + + +def parse_bulk_read_specs( + specs: Iterable[BulkReadSpec], +) -> list[tuple[DynamixelMotor, int, int]]: + parsed: list[tuple[DynamixelMotor, int, int]] = [] + for spec in specs: + if isinstance(spec, tuple): + if len(spec) == 2: + motor, register_name = spec + parsed.append((motor, *motor_register(motor, register_name))) + elif len(spec) == 3: + motor, address, size = spec + parsed.append((motor, int(address), int(size))) + else: + raise ValueError(f"Invalid bulk read spec: {spec!r}") + else: + raise ValueError( + "Bulk read requires (motor, register_name) or (motor, address, size) specs." + ) + if not parsed: + raise ValueError("At least one bulk read spec is required.") + return parsed + + +def parse_bulk_write_specs( + specs: Iterable[BulkWriteSpec], +) -> list[tuple[DynamixelMotor, int, int, list[int]]]: + parsed: list[tuple[DynamixelMotor, int, int, list[int]]] = [] + for spec in specs: + if len(spec) == 3: + motor, register_name, value = spec + address, size = motor_register(motor, register_name) + parsed.append((motor, address, size, encode_register_value(value, size))) + elif len(spec) == 4: + motor, address, size, value = spec + size = int(size) + parsed.append( + (motor, int(address), size, encode_register_value(value, size)) + ) + else: + raise ValueError(f"Invalid bulk write spec: {spec!r}") + if not parsed: + raise ValueError("At least one bulk write spec is required.") + return parsed + + +def normalize_sync_write_targets( + writes: dict[WriteTarget, Any], + register_name: str | None, +) -> tuple[list[DynamixelMotor], list[int], str | None]: + """Return (motors, values, register_name) from a sync_write mapping.""" + if not writes: + raise ValueError("writes must not be empty.") + + motors: list[DynamixelMotor] = [] + values: list[int] = [] + resolved_register: str | None = register_name + + for target, payload in writes.items(): + motor = target if hasattr(target, "CONTROL_TABLE") else None + if motor is None: + raise ValueError("sync_write keys must be DynamixelMotor instances.") + + if isinstance(payload, tuple) and len(payload) == 2: + reg_name, value = payload + value = int(value) + if resolved_register is None: + resolved_register = reg_name + elif resolved_register != reg_name: + raise ValueError("All sync_write entries must use the same register.") + else: + if resolved_register is None and register_name is None: + raise ValueError( + "register_name is required when write values are plain integers." + ) + value = int(payload) + + motors.append(motor) + values.append(value) + + if resolved_register is None: + raise ValueError("register_name could not be determined from writes.") + + return motors, values, resolved_register