YR Direct Tensor Transport¶
Last updated: 04/20/2026
OpenYuanrong (YR) direct transport enables efficient zero-copy transfer of both CPU and NPU tensors between Ray actors.
Features¶
- Zero-copy transfer: Efficient memory sharing via OpenYuanrong DataSystem
- Dual device support: Works with both CPU and NPU tensors
- One-sided communication: Pull-based model for efficient transfers
- Automatic garbage collection: Cleans up resources when no longer needed
Prerequisites¶
Before using YR transport, ensure you have:
- Installed ray-ascend with YR support:
pip install "ray-ascend[yr]" - Installed dscli:
pip install "openyuanrong-datasystem>=0.8.0"
Ray Cluster Setup¶
# Head node
ray start --head --resources='{"NPU": 8}'
# Worker nodes
ray start --address <head_ip>:6379 --resources='{"NPU": 8}'
Quick Start¶
import torch
import ray
from ray_ascend import register_yr_tensor_transport
ray.init()
# Initialize YR backend (uses default metastore mode, no env vars needed)
register_yr_tensor_transport(["npu", "cpu"])
@ray.remote(resources={"NPU": 1})
class NPUActor:
def __init__(self):
register_yr_tensor_transport(["npu", "cpu"])
@ray.method(tensor_transport="YR")
def get_tensor(self):
import torch
import torch_npu
return torch.randn(1024, 1024, device="npu")
# Create actors and transfer tensor
sender = NPUActor.remote()
receiver = NPUActor.remote()
tensor = ray.get(sender.get_tensor.remote())
print("Tensor shape:", tensor.shape)
Initialization Modes¶
YR backend supports two initialization modes with different metadata storage backends,
controlled by YR_DS_INIT_MODE environment variable. For details, see
OpenYuanrong DataSystem Cluster Management.
| Mode | Description | Use Case |
|---|---|---|
metastore |
Built-in metastore service via YR DS worker | Default, no external infra |
etcd |
External etcd for metadata storage | Existing etcd infrastructure |
Environment Variables¶
Set these in driver node only. YR backend coordinator will propagate configuration to all worker nodes via Ray placement group.
| Variable | Default | Description | Mode |
|---|---|---|---|
YR_DS_INIT_MODE |
metastore |
Initialization mode (metastore or etcd) |
Both |
YR_DS_WORKER_PORT |
31501 |
YR DS worker port | Both |
YR_DS_METASTORE_PORT |
2379 |
Metastore service port | metastore |
YR_DS_ETCD_ADDRESS |
- | Etcd address (e.g., 10.0.0.1:2379) |
etcd |
YR_DS_WORKER_ARGS |
- | Additional dscli arguments | Both |
Worker Args Options for NPU:
--shared_memory_size_mb: Shared memory size in MB for tensor storage--remote_h2d_device_ids: Enable RH2D for cross-node transfer (comma-separated NPU IDs)--enable_huge_tlb: Enable huge page memory (required for >21GB on 910B)
For more YR DS worker arguments, see dscli documentation.
Set in Python (before ray.init()):
import os
os.environ["YR_DS_WORKER_ARGS"] = "--shared_memory_size_mb 16384 --remote_h2d_device_ids 0,1,2,3 --enable_huge_tlb true"
import ray
ray.init() # env vars must be set before ray.init()
Set in Bash:
export YR_DS_WORKER_ARGS="--shared_memory_size_mb 16384 --remote_h2d_device_ids 0,1,2,3 --enable_huge_tlb true"
python your_script.py
Metastore Mode Example¶
Metastore mode is the default and recommended for most use cases. No external etcd service required - a YR DS worker automatically starts the metastore service.
import os
import ray
from ray_ascend import register_yr_tensor_transport
os.environ["YR_DS_INIT_MODE"] = "metastore"
os.environ["YR_DS_WORKER_PORT"] = 31501
os.environ["YR_DS_METASTORE_PORT"] = 2379
os.environ["YR_DS_WORKER_ARGS"] = "--shared_memory_size_mb 16384 --remote_h2d_device_ids 0,1,2,3"
ray.init()
register_yr_tensor_transport(["npu", "cpu"])
Etcd Mode Example¶
Etcd mode requires an external etcd service. Ensure etcd is running and accessible before starting YR backend. For etcd installation, see Installation.
import os
import ray
from ray_ascend import register_yr_tensor_transport
os.environ["YR_DS_INIT_MODE"] = "etcd"
os.environ["YR_DS_WORKER_PORT"] = 31501
os.environ["YR_DS_ETCD_ADDRESS"] = "10.0.0.1:2379"
os.environ["YR_DS_WORKER_ARGS"] = "--shared_memory_size_mb 16384 --remote_h2d_device_ids 0,1,2,3"
ray.init()
register_yr_tensor_transport(["npu", "cpu"])
Detailed Example¶
This example demonstrates full YR transport usage with NPU and CPU actors, including tensor transfer via HCCS (NPU) and RDMA (CPU if available).
import os
import ray
from ray_ascend import register_yr_tensor_transport
ray.init()
register_yr_tensor_transport(["npu", "cpu"])
@ray.remote(resources={"NPU": 1})
class NPUActor:
def __init__(self):
register_yr_tensor_transport(["npu", "cpu"])
@ray.method(tensor_transport="YR")
def get_npu_tensor(self):
"""Return an NPU tensor via YR transport."""
import torch
import torch_npu
return torch.randn(1024, 1024, device="npu")
@ray.method(tensor_transport="YR")
def process_npu_tensor(self, tensor):
"""Receive an NPU tensor via YR transport and process it."""
return tensor.sum().item()
@ray.remote
class CPUActor:
def __init__(self):
register_yr_tensor_transport(["cpu"])
@ray.method(tensor_transport="YR")
def get_cpu_tensor(self):
"""Return a CPU tensor via YR transport."""
import torch
return torch.randn(1024, 1024)
# Create actors
npu_sender = NPUActor.remote()
npu_receiver = NPUActor.remote()
cpu_worker = CPUActor.remote()
# Transfer NPU tensor via HCCS
npu_tensor = ray.get(npu_sender.get_npu_tensor.remote())
result = ray.get(npu_receiver.process_npu_tensor.remote(npu_tensor))
print("NPU tensor sum:", result)
# Transfer CPU tensor via RDMA (if available)
cpu_tensor = ray.get(cpu_worker.get_cpu_tensor.remote())
print("CPU tensor shape:", cpu_tensor.shape)
ray.shutdown()
Combined HCCL + YR Transport¶
For advanced use cases combining HCCL collective communication with YR direct transport:
import os
import ray
from ray.util.collective import create_collective_group
from ray_ascend import register_yr_tensor_transport
from ray_ascend.collective import HCCLGroup
ray.init(address="auto")
# Register both backends
ray.register_collective_backend("HCCL", HCCLGroup)
register_yr_tensor_transport(["npu", "cpu"])
@ray.remote(resources={"NPU": 1})
class RayActor:
def __init__(self):
import torch
import torch_npu
register_yr_tensor_transport(["npu", "cpu"])
@ray.method(tensor_transport="YR")
def random_tensor(self):
"""Return an NPU tensor via YR transport."""
import torch
return torch.zeros(1024, device="npu")
def sum(self, tensor):
"""Process a received tensor."""
return tensor.sum()
# Create actors
sender, receiver = RayActor.remote(), RayActor.remote()
# Create HCCL collective group
group = create_collective_group([sender, receiver], backend="HCCL")
# Use YR transport for tensor transfer
tensor = sender.random_tensor.remote()
result = receiver.sum.remote(tensor)
print(ray.get(result))
Decorator Usage¶
Use @ray.method(tensor_transport="YR") on actor methods that return or receive tensors
to be transported via YR.
Single Device Type Constraint¶
All tensors in one RDT object must have the same device type (all CPU or all NPU).
Cleanup¶
Use cleanup_yr_resources() for explicit cleanup, or ray stop will automatically
destroy YR backend: