diff --git a/include/infinicore/context/context.hpp b/include/infinicore/context/context.hpp index 1612db830..67cd73678 100644 --- a/include/infinicore/context/context.hpp +++ b/include/infinicore/context/context.hpp @@ -29,6 +29,7 @@ void memcpyH2D(void *dst, const void *src, size_t size, bool async = true); void memcpyD2H(void *dst, const void *src, size_t size); void memcpyD2D(void *dst, const void *src, size_t size, bool async = true); void memcpyH2H(void *dst, const void *src, size_t size); +void memcpyD2DPeer(void *dst, int dst_device, const void *src, int src_device, size_t size, bool async = true); // Timing APIs for performance measurement infinirtEvent_t createEvent(); diff --git a/include/infinirt.h b/include/infinirt.h index ba16c19b2..11f0838a6 100644 --- a/include/infinirt.h +++ b/include/infinirt.h @@ -59,6 +59,9 @@ __C __export infiniStatus_t infinirtFreeHost(void *ptr); __C __export infiniStatus_t infinirtMemcpy(void *dst, const void *src, size_t size, infinirtMemcpyKind_t kind); __C __export infiniStatus_t infinirtMemcpyAsync(void *dst, const void *src, size_t size, infinirtMemcpyKind_t kind, infinirtStream_t stream); +__C __export infiniStatus_t infinirtMemcpyPeer(void *dst, int dst_device, const void *src, int src_device, size_t size); +__C __export infiniStatus_t infinirtMemcpyPeerAsync(void *dst, int dst_device, const void *src, int src_device, size_t size, infinirtStream_t stream); + // Stream-ordered memory __C __export infiniStatus_t infinirtMallocAsync(void **p_ptr, size_t size, infinirtStream_t stream); __C __export infiniStatus_t infinirtFreeAsync(void *ptr, infinirtStream_t stream); diff --git a/src/infinicore/context/context_impl.cc b/src/infinicore/context/context_impl.cc index c5ed7acd1..dbd3e8656 100644 --- a/src/infinicore/context/context_impl.cc +++ b/src/infinicore/context/context_impl.cc @@ -145,6 +145,10 @@ void memcpyH2H(void *dst, const void *src, size_t size) { return ContextImpl::singleton().getCpuRuntime()->memcpyD2D(dst, 
src, size); } +void memcpyD2DPeer(void *dst, int dst_device, const void *src, int src_device, size_t size, bool async) { + return ContextImpl::singleton().getCurrentRuntime()->memcpyD2DPeer(dst, dst_device, src, src_device, size, async); +} + // Timing API implementations infinirtEvent_t createEvent() { return ContextImpl::singleton().getCurrentRuntime()->createEvent(); diff --git a/src/infinicore/context/runtime/runtime.cc b/src/infinicore/context/runtime/runtime.cc index ecacb9207..788920977 100644 --- a/src/infinicore/context/runtime/runtime.cc +++ b/src/infinicore/context/runtime/runtime.cc @@ -96,6 +96,14 @@ void Runtime::memcpyD2D(void *dst, const void *src, size_t size, bool async) { } } +void Runtime::memcpyD2DPeer(void *dst, int dst_device, const void *src, int src_device, size_t size, bool async) { + if (async) { + INFINICORE_CHECK_ERROR(infinirtMemcpyPeerAsync(dst, dst_device, src, src_device, size, stream_)); + } else { + INFINICORE_CHECK_ERROR(infinirtMemcpyPeer(dst, dst_device, src, src_device, size)); + } +} + // Timing method implementations infinirtEvent_t Runtime::createEvent() { infinirtEvent_t event; diff --git a/src/infinicore/context/runtime/runtime.hpp b/src/infinicore/context/runtime/runtime.hpp index c731b7804..8b209421d 100644 --- a/src/infinicore/context/runtime/runtime.hpp +++ b/src/infinicore/context/runtime/runtime.hpp @@ -37,6 +37,7 @@ class Runtime { void memcpyH2D(void *dst, const void *src, size_t size, bool async = true); void memcpyD2H(void *dst, const void *src, size_t size); void memcpyD2D(void *dst, const void *src, size_t size, bool async = true); + void memcpyD2DPeer(void *dst, int dst_device, const void *src, int src_device, size_t size, bool async = true); // Timing methods infinirtEvent_t createEvent(); diff --git a/src/infinicore/tensor/copy.cc b/src/infinicore/tensor/copy.cc index 5ff0e1c61..b23f17b70 100644 --- a/src/infinicore/tensor/copy.cc +++ b/src/infinicore/tensor/copy.cc @@ -21,7 +21,9 @@ void 
TensorImpl::copy_from(Tensor src) { if (src->shape() != this->shape()) { throw std::runtime_error("Cannot copy from tensor with different shape"); } - if (this->device() == src->device()) { + auto this_device = this->device(); + auto src_device = src->device(); + if (this_device == src_device) { op::rearrange_(Tensor(const_cast(this)->shared_from_this()), src); } else { if (!src->is_contiguous()) { @@ -30,23 +32,39 @@ void TensorImpl::copy_from(Tensor src) { // Use nbytes() to get the actual tensor size, not the full memory size size_t copy_size = std::min(this->nbytes(), src->nbytes()); - if (this->device().getType() == Device::Type::CPU) { - context::setDevice(src->device()); + if (this_device.getType() == src_device.getType()) { + context::setDevice(this_device); + // Same device type, e.g., NVIDIA to NVIDIA, different indices; won't be CPU if (this->is_contiguous()) { - context::memcpyD2H(this->data(), src->data(), copy_size); + context::memcpyD2DPeer(this->data(), this_device.getIndex(), + src->data(), src_device.getIndex(), + copy_size); } else { auto local_src = Tensor::empty(this->shape(), this->dtype(), this->device()); - context::memcpyD2H(local_src->data(), src->data(), this->data_.memory->size()); + context::memcpyD2DPeer(local_src->data(), this_device.getIndex(), + src->data(), src_device.getIndex(), + copy_size); op::rearrange_(Tensor(const_cast(this)->shared_from_this()), local_src); } - } else if (src->device().getType() == Device::Type::CPU) { - context::setDevice(this->device()); - if (this->is_contiguous()) { - context::memcpyH2D(this->data(), src->data(), copy_size); - } else { - auto local_src = Tensor::empty(this->shape(), this->dtype(), this->device()); - context::memcpyH2D(local_src->data(), src->data(), copy_size); - op::rearrange_(Tensor(const_cast(this)->shared_from_this()), local_src); + } else { + if (this_device.getType() == Device::Type::CPU) { + context::setDevice(src_device); + if (this->is_contiguous()) { + 
context::memcpyD2H(this->data(), src->data(), copy_size); + } else { + auto local_src = Tensor::empty(this->shape(), this->dtype(), this->device()); + context::memcpyD2H(local_src->data(), src->data(), this->data_.memory->size()); + op::rearrange_(Tensor(const_cast(this)->shared_from_this()), local_src); + } + } else if (src_device.getType() == Device::Type::CPU) { + context::setDevice(this_device); + if (this->is_contiguous()) { + context::memcpyH2D(this->data(), src->data(), copy_size); + } else { + auto local_src = Tensor::empty(this->shape(), this->dtype(), this->device()); + context::memcpyH2D(local_src->data(), src->data(), copy_size); + op::rearrange_(Tensor(const_cast(this)->shared_from_this()), local_src); + } } } } diff --git a/src/infinirt/ascend/infinirt_ascend.cc b/src/infinirt/ascend/infinirt_ascend.cc index 4731f086a..8d489ab4d 100644 --- a/src/infinirt/ascend/infinirt_ascend.cc +++ b/src/infinirt/ascend/infinirt_ascend.cc @@ -96,6 +96,9 @@ infiniStatus_t eventDestroy(infinirtEvent_t event) { infiniStatus_t eventElapsedTime(float *ms_ptr, infinirtEvent_t start, infinirtEvent_t end) { return INFINI_STATUS_NOT_IMPLEMENTED; + // Commented before validation + // CHECK_ACLRT(aclrtEventElapsedTime(ms_ptr, (aclrtEvent)start, (aclrtEvent)end)); + // return INFINI_STATUS_SUCCESS; } infiniStatus_t mallocDevice(void **p_ptr, size_t size) { @@ -143,6 +146,30 @@ infiniStatus_t memcpyAsync(void *dst, const void *src, size_t size, infinirtMemc return INFINI_STATUS_SUCCESS; } +infiniStatus_t memcpyPeer(void *dst, int dst_device, const void *src, int src_device, size_t size) { + return INFINI_STATUS_NOT_IMPLEMENTED; + // Commented before validation + // int32_t can_access_peer = 0; + // CHECK_ACLRT(aclrtDeviceCanAccessPeer(&can_access_peer, dst_device, src_device)); + // if (!can_access_peer) { + // CHECK_ACLRT(aclrtDeviceEnablePeerAccess(src_device, 0)); + // } + // CHECK_ACLRT(aclrtMemcpy(dst, size, src, size, ACL_MEMCPY_DEVICE_TO_DEVICE)); + // return 
INFINI_STATUS_SUCCESS; +} + +infiniStatus_t memcpyPeerAsync(void *dst, int dst_device, const void *src, int src_device, size_t size, infinirtStream_t stream) { + return INFINI_STATUS_NOT_IMPLEMENTED; + // Commented before validation + // int32_t can_access_peer = 0; + // CHECK_ACLRT(aclrtDeviceCanAccessPeer(&can_access_peer, dst_device, src_device)); + // if (!can_access_peer) { + // CHECK_ACLRT(aclrtDeviceEnablePeerAccess(src_device, 0)); + // } + // CHECK_ACLRT(aclrtMemcpyAsync(dst, size, src, size, ACL_MEMCPY_DEVICE_TO_DEVICE, (aclrtStream)stream)); + // return INFINI_STATUS_SUCCESS; +} + infiniStatus_t mallocAsync(void **p_ptr, size_t size, infinirtStream_t stream) { return mallocDevice(p_ptr, size); } diff --git a/src/infinirt/bang/infinirt_bang.cc b/src/infinirt/bang/infinirt_bang.cc index bccbb7c19..70ca7807c 100644 --- a/src/infinirt/bang/infinirt_bang.cc +++ b/src/infinirt/bang/infinirt_bang.cc @@ -52,7 +52,19 @@ infiniStatus_t eventCreate(infinirtEvent_t *event_ptr) { } infiniStatus_t eventCreateWithFlags(infinirtEvent_t *event_ptr, uint32_t flags) { - return INFINI_STATUS_NOT_IMPLEMENTED; + cnrtNotifier_t notifier; + unsigned int cnrt_flags = CNRT_NOTIFIER_DEFAULT; + + if (flags & INFINIRT_EVENT_DISABLE_TIMING) { + cnrt_flags |= CNRT_NOTIFIER_DISABLE_TIMING_ALL; + } + if (flags & INFINIRT_EVENT_BLOCKING_SYNC) { + cnrt_flags |= CNRT_NOTIFIER_DISABLE_TIMING_SW; + } + + CHECK_BANGRT(cnrtNotifierCreateWithFlags(¬ifier, cnrt_flags)); + *event_ptr = notifier; + return INFINI_STATUS_SUCCESS; } infiniStatus_t eventRecord(infinirtEvent_t event, infinirtStream_t stream) { @@ -83,7 +95,8 @@ infiniStatus_t eventDestroy(infinirtEvent_t event) { } infiniStatus_t eventElapsedTime(float *ms_ptr, infinirtEvent_t start, infinirtEvent_t end) { - return INFINI_STATUS_NOT_IMPLEMENTED; + CHECK_BANGRT(cnrtNotifierElapsedTime((cnrtNotifier_t)start, (cnrtNotifier_t)end, ms_ptr)); + return INFINI_STATUS_SUCCESS; } infiniStatus_t mallocDevice(void **p_ptr, size_t size) { @@ -131,6 
+144,16 @@ infiniStatus_t memcpyAsync(void *dst, const void *src, size_t size, infinirtMemc return INFINI_STATUS_SUCCESS; } +infiniStatus_t memcpyPeer(void *dst, int dst_device, const void *src, int src_device, size_t size) { + CHECK_BANGRT(cnrtMemcpyPeer(dst, dst_device, (void *)src, src_device, size)); + return INFINI_STATUS_SUCCESS; +} + +infiniStatus_t memcpyPeerAsync(void *dst, int dst_device, const void *src, int src_device, size_t size, infinirtStream_t stream) { + CHECK_BANGRT(cnrtMemcpyPeerAsync(dst, dst_device, (void *)src, src_device, size, (cnrtQueue_t)stream)); + return INFINI_STATUS_SUCCESS; +} + // Does not support async malloc. Use blocking-style malloc instead infiniStatus_t mallocAsync(void **p_ptr, size_t size, infinirtStream_t stream) { CHECK_BANGRT(cnrtMalloc(p_ptr, size)); diff --git a/src/infinirt/cpu/infinirt_cpu.cc b/src/infinirt/cpu/infinirt_cpu.cc index c8709b1d4..fc56a6032 100644 --- a/src/infinirt/cpu/infinirt_cpu.cc +++ b/src/infinirt/cpu/infinirt_cpu.cc @@ -108,6 +108,14 @@ infiniStatus_t memcpyAsync(void *dst, const void *src, size_t size, infinirtMemc return memcpy(dst, src, size, kind); } +infiniStatus_t memcpyPeer(void *dst, int dst_device, const void *src, int src_device, size_t size) { + return INFINI_STATUS_INTERNAL_ERROR; +} + +infiniStatus_t memcpyPeerAsync(void *dst, int dst_device, const void *src, int src_device, size_t size, infinirtStream_t stream) { + return INFINI_STATUS_INTERNAL_ERROR; +} + infiniStatus_t mallocAsync(void **p_ptr, size_t size, infinirtStream_t stream) { return mallocDevice(p_ptr, size); } diff --git a/src/infinirt/cuda/infinirt_cuda.cu b/src/infinirt/cuda/infinirt_cuda.cu index 3e32b113f..0f55f6c85 100644 --- a/src/infinirt/cuda/infinirt_cuda.cu +++ b/src/infinirt/cuda/infinirt_cuda.cu @@ -159,6 +159,16 @@ infiniStatus_t memcpyAsync(void *dst, const void *src, size_t size, infinirtMemc return INFINI_STATUS_SUCCESS; } +infiniStatus_t memcpyPeer(void *dst, int dst_device, const void *src, int 
src_device, size_t size) { + CHECK_CUDART(cudaMemcpyPeer(dst, dst_device, src, src_device, size)); + return INFINI_STATUS_SUCCESS; +} + +infiniStatus_t memcpyPeerAsync(void *dst, int dst_device, const void *src, int src_device, size_t size, infinirtStream_t stream) { + CHECK_CUDART(cudaMemcpyPeerAsync(dst, dst_device, src, src_device, size, (cudaStream_t)stream)); + return INFINI_STATUS_SUCCESS; +} + infiniStatus_t mallocAsync(void **p_ptr, size_t size, infinirtStream_t stream) { CHECK_CUDART(cudaMallocAsync(p_ptr, size, (cudaStream_t)stream)); return INFINI_STATUS_SUCCESS; diff --git a/src/infinirt/infinirt.cc b/src/infinirt/infinirt.cc index 82bb5ab28..5defa38c0 100644 --- a/src/infinirt/infinirt.cc +++ b/src/infinirt/infinirt.cc @@ -185,6 +185,14 @@ __C infiniStatus_t infinirtMemcpyAsync(void *dst, const void *src, size_t size, INFINIRT_CALL_DEVICE_API(memcpyAsync, (dst, src, size, kind, stream)); } +__C infiniStatus_t infinirtMemcpyPeer(void *dst, int dst_device, const void *src, int src_device, size_t size) { + INFINIRT_CALL_DEVICE_API(memcpyPeer, (dst, dst_device, src, src_device, size)); +} + +__C infiniStatus_t infinirtMemcpyPeerAsync(void *dst, int dst_device, const void *src, int src_device, size_t size, infinirtStream_t stream) { + INFINIRT_CALL_DEVICE_API(memcpyPeerAsync, (dst, dst_device, src, src_device, size, stream)); +} + __C infiniStatus_t infinirtMallocAsync(void **p_ptr, size_t size, infinirtStream_t stream) { INFINIRT_CALL_DEVICE_API(mallocAsync, (p_ptr, size, stream)); } diff --git a/src/infinirt/infinirt_impl.h b/src/infinirt/infinirt_impl.h index 8ae4347c5..3cafd2f2b 100644 --- a/src/infinirt/infinirt_impl.h +++ b/src/infinirt/infinirt_impl.h @@ -3,33 +3,36 @@ #include "infinirt.h" #include -#define INFINIRT_DEVICE_API(INLINE, IMPL, COUNT) \ - INLINE infiniStatus_t getDeviceCount(int *count) COUNT; \ - INLINE infiniStatus_t setDevice(int device_id) IMPL; \ - INLINE infiniStatus_t deviceSynchronize() IMPL; \ - \ - INLINE infiniStatus_t 
streamCreate(infinirtStream_t *stream_ptr) IMPL; \ - INLINE infiniStatus_t streamDestroy(infinirtStream_t stream) IMPL; \ - INLINE infiniStatus_t streamSynchronize(infinirtStream_t stream) IMPL; \ - INLINE infiniStatus_t streamWaitEvent(infinirtStream_t stream, infinirtEvent_t event) IMPL; \ - \ - INLINE infiniStatus_t eventCreate(infinirtEvent_t *event_ptr) IMPL; \ - INLINE infiniStatus_t eventCreateWithFlags(infinirtEvent_t *event_ptr, uint32_t flags) IMPL; \ - INLINE infiniStatus_t eventRecord(infinirtEvent_t event, infinirtStream_t stream) IMPL; \ - INLINE infiniStatus_t eventQuery(infinirtEvent_t event, infinirtEventStatus_t *status_ptr) IMPL; \ - INLINE infiniStatus_t eventSynchronize(infinirtEvent_t event) IMPL; \ - INLINE infiniStatus_t eventDestroy(infinirtEvent_t event) IMPL; \ - INLINE infiniStatus_t eventElapsedTime(float *ms_ptr, infinirtEvent_t start, infinirtEvent_t end) IMPL; \ - \ - INLINE infiniStatus_t mallocDevice(void **p_ptr, size_t size) IMPL; \ - INLINE infiniStatus_t mallocHost(void **p_ptr, size_t size) IMPL; \ - INLINE infiniStatus_t freeDevice(void *ptr) IMPL; \ - INLINE infiniStatus_t freeHost(void *ptr) IMPL; \ - \ - INLINE infiniStatus_t memcpy(void *dst, const void *src, size_t size, infinirtMemcpyKind_t kind) IMPL; \ - INLINE infiniStatus_t memcpyAsync(void *dst, const void *src, size_t size, infinirtMemcpyKind_t kind, infinirtStream_t stream) IMPL; \ - \ - INLINE infiniStatus_t mallocAsync(void **p_ptr, size_t size, infinirtStream_t stream) IMPL; \ +#define INFINIRT_DEVICE_API(INLINE, IMPL, COUNT) \ + INLINE infiniStatus_t getDeviceCount(int *count) COUNT; \ + INLINE infiniStatus_t setDevice(int device_id) IMPL; \ + INLINE infiniStatus_t deviceSynchronize() IMPL; \ + \ + INLINE infiniStatus_t streamCreate(infinirtStream_t *stream_ptr) IMPL; \ + INLINE infiniStatus_t streamDestroy(infinirtStream_t stream) IMPL; \ + INLINE infiniStatus_t streamSynchronize(infinirtStream_t stream) IMPL; \ + INLINE infiniStatus_t 
streamWaitEvent(infinirtStream_t stream, infinirtEvent_t event) IMPL; \ + \ + INLINE infiniStatus_t eventCreate(infinirtEvent_t *event_ptr) IMPL; \ + INLINE infiniStatus_t eventCreateWithFlags(infinirtEvent_t *event_ptr, uint32_t flags) IMPL; \ + INLINE infiniStatus_t eventRecord(infinirtEvent_t event, infinirtStream_t stream) IMPL; \ + INLINE infiniStatus_t eventQuery(infinirtEvent_t event, infinirtEventStatus_t *status_ptr) IMPL; \ + INLINE infiniStatus_t eventSynchronize(infinirtEvent_t event) IMPL; \ + INLINE infiniStatus_t eventDestroy(infinirtEvent_t event) IMPL; \ + INLINE infiniStatus_t eventElapsedTime(float *ms_ptr, infinirtEvent_t start, infinirtEvent_t end) IMPL; \ + \ + INLINE infiniStatus_t mallocDevice(void **p_ptr, size_t size) IMPL; \ + INLINE infiniStatus_t mallocHost(void **p_ptr, size_t size) IMPL; \ + INLINE infiniStatus_t freeDevice(void *ptr) IMPL; \ + INLINE infiniStatus_t freeHost(void *ptr) IMPL; \ + \ + INLINE infiniStatus_t memcpy(void *dst, const void *src, size_t size, infinirtMemcpyKind_t kind) IMPL; \ + INLINE infiniStatus_t memcpyAsync(void *dst, const void *src, size_t size, infinirtMemcpyKind_t kind, infinirtStream_t stream) IMPL; \ + \ + INLINE infiniStatus_t memcpyPeer(void *dst, int dst_device, const void *src, int src_device, size_t size) IMPL; \ + INLINE infiniStatus_t memcpyPeerAsync(void *dst, int dst_device, const void *src, int src_device, size_t size, infinirtStream_t stream) IMPL; \ + \ + INLINE infiniStatus_t mallocAsync(void **p_ptr, size_t size, infinirtStream_t stream) IMPL; \ INLINE infiniStatus_t freeAsync(void *ptr, infinirtStream_t stream) IMPL; #define INFINIRT_DEVICE_API_IMPL INFINIRT_DEVICE_API(, , ) diff --git a/src/infinirt/kunlun/infinirt_kunlun.cc b/src/infinirt/kunlun/infinirt_kunlun.cc index f2fe43680..8abaf6a50 100644 --- a/src/infinirt/kunlun/infinirt_kunlun.cc +++ b/src/infinirt/kunlun/infinirt_kunlun.cc @@ -141,6 +141,21 @@ infiniStatus_t memcpyAsync(void *dst, const void *src, size_t size, 
infinirtMemc } } +infiniStatus_t memcpyPeer(void *dst, int dst_device, const void *src, int src_device, size_t size) { + return INFINI_STATUS_NOT_IMPLEMENTED; + // Commented before validation + // CHECK_KUNLUNRT(xpu_memcpy_peer(dst_device, dst, src_device, src, size)); + // return INFINI_STATUS_SUCCESS; +} + +infiniStatus_t memcpyPeerAsync(void *dst, int dst_device, const void *src, int src_device, size_t size, infinirtStream_t stream) { + return INFINI_STATUS_NOT_IMPLEMENTED; + // Commented before validation + // kunlunxin p series does not support async peer memcpy + // CHECK_KUNLUNRT(xpu_memcpy_peer(dst_device, dst, src_device, src, size)); + // return INFINI_STATUS_SUCCESS; +} + infiniStatus_t mallocAsync(void **p_ptr, size_t size, infinirtStream_t stream) { // kunlun3 does not support async memory allocation // TODO: support async malloc diff --git a/src/infinirt/metax/infinirt_metax.cc b/src/infinirt/metax/infinirt_metax.cc index bd7131e9d..caa3bec07 100644 --- a/src/infinirt/metax/infinirt_metax.cc +++ b/src/infinirt/metax/infinirt_metax.cc @@ -143,6 +143,20 @@ infiniStatus_t memcpyAsync(void *dst, const void *src, size_t size, infinirtMemc return INFINI_STATUS_SUCCESS; } +infiniStatus_t memcpyPeer(void *dst, int dst_device, const void *src, int src_device, size_t size) { + return INFINI_STATUS_NOT_IMPLEMENTED; + // Commented before validation + // CHECK_MACART(hcMemcpyPeer(dst, dst_device, src, src_device, size)); + // return INFINI_STATUS_SUCCESS; +} + +infiniStatus_t memcpyPeerAsync(void *dst, int dst_device, const void *src, int src_device, size_t size, infinirtStream_t stream) { + return INFINI_STATUS_NOT_IMPLEMENTED; + // Commented before validation + // CHECK_MACART(hcMemcpyPeerAsync(dst, dst_device, src, src_device, size, (hcStream_t)stream)); + // return INFINI_STATUS_SUCCESS; +} + infiniStatus_t mallocAsync(void **p_ptr, size_t size, infinirtStream_t stream) { CHECK_MACART(hcMallocAsync(p_ptr, size, (hcStream_t)stream)); return 
INFINI_STATUS_SUCCESS; diff --git a/src/infinirt/moore/infinirt_moore.cc b/src/infinirt/moore/infinirt_moore.cc index b1861c0f2..5f10b046f 100644 --- a/src/infinirt/moore/infinirt_moore.cc +++ b/src/infinirt/moore/infinirt_moore.cc @@ -51,7 +51,21 @@ infiniStatus_t eventCreate(infinirtEvent_t *event_ptr) { } infiniStatus_t eventCreateWithFlags(infinirtEvent_t *event_ptr, uint32_t flags) { - return INFINI_STATUS_NOT_IMPLEMENTED; + musaEvent_t event; + unsigned int musa_flags = musaEventDefault; + + // Map InfiniCore event flags to MUSA runtime flags + if (flags & INFINIRT_EVENT_DISABLE_TIMING) { + musa_flags |= musaEventDisableTiming; + } + if (flags & INFINIRT_EVENT_BLOCKING_SYNC) { + musa_flags |= musaEventBlockingSync; + } + + CHECK_MUSART(musaEventCreateWithFlags(&event, musa_flags)); + + *event_ptr = event; + return INFINI_STATUS_SUCCESS; } infiniStatus_t eventRecord(infinirtEvent_t event, infinirtStream_t stream) { @@ -131,6 +145,16 @@ infiniStatus_t memcpyAsync(void *dst, const void *src, size_t size, infinirtMemc return INFINI_STATUS_SUCCESS; } +infiniStatus_t memcpyPeer(void *dst, int dst_device, const void *src, int src_device, size_t size) { + CHECK_MUSART(musaMemcpyPeer(dst, dst_device, src, src_device, size)); + return INFINI_STATUS_SUCCESS; +} + +infiniStatus_t memcpyPeerAsync(void *dst, int dst_device, const void *src, int src_device, size_t size, infinirtStream_t stream) { + CHECK_MUSART(musaMemcpyPeerAsync(dst, dst_device, src, src_device, size, (musaStream_t)stream)); + return INFINI_STATUS_SUCCESS; +} + infiniStatus_t mallocAsync(void **p_ptr, size_t size, infinirtStream_t stream) { return mallocDevice(p_ptr, size); } diff --git a/test/infinicore/test.py b/test/infinicore/test.py index 36aeffe4e..aa0bd1542 100644 --- a/test/infinicore/test.py +++ b/test/infinicore/test.py @@ -265,6 +265,44 @@ def func6_initialize_device_relationship(): z_infini.debug() +def test7_p2p(): + print("测试P2P功能") + if infinicore.get_device_count("cuda") < 2: + 
print("跳过p2p测试,原因:当前cuda设备小于2个") + return + + dev0 = infinicore.device("cuda", 0) + dev1 = infinicore.device("cuda", 1) + + shape = [1024, 1024] + dtype = infinicore.float16 + + torch_0 = torch.ones(shape, dtype=torch.float16, device="cuda:0") + torch_1 = torch.zeros(shape, dtype=torch.float16, device="cuda:1") + torch_1_ref = torch.ones(shape, dtype=torch.float16, device="cuda:1") + + infinicore_0 = infinicore.from_blob( + torch_0.data_ptr(), + shape, + dtype=dtype, + device=dev0, + ) + + infinicore_1 = infinicore.from_blob( + torch_1.data_ptr(), + shape, + dtype=dtype, + device=dev1, + ) + + assert not torch.allclose(torch_1, torch_1_ref) + + infinicore_1.copy_(infinicore_0) + + assert torch.allclose(torch_1, torch_1_ref) + print("P2P test passed!") + + if __name__ == "__main__": test() test2() @@ -272,3 +310,4 @@ def func6_initialize_device_relationship(): test4_to() test5_bf16() func6_initialize_device_relationship() + test7_p2p()