fileDescriptor = control.openFile(file, true);
+
+ MemorySegment bufferWrite = LibaioContext.newAlignedBuffer(4096, 4096);
+
+ try {
+ for (int i = 0; i < 4096; i++) {
+ bufferWrite.asByteBuffer().put((byte) 'B');
+ }
+
+ for (int j = 0; j < LIBAIO_QUEUE_SIZE * 2; j++) {
+ for (int i = 0; i < LIBAIO_QUEUE_SIZE; i++) {
+ TestInfo countClass = new TestInfo();
+ fileDescriptor.write(i * 4096, 4096, bufferWrite.asByteBuffer(), countClass);
+ }
+
+ Assert.assertEquals(LIBAIO_QUEUE_SIZE, control.poll(callbacks, LIBAIO_QUEUE_SIZE, LIBAIO_QUEUE_SIZE));
+
+ for (int i = 0; i < LIBAIO_QUEUE_SIZE; i++) {
+ Assert.assertNotNull(callbacks[i]);
+ callbacks[i] = null;
+ }
+ }
+
+ TestInfo.checkLeaks();
+ } finally {
+ LibaioContext.freeBuffer(bufferWrite);
+ }
+ }
+
+ @Test
+ public void testLock() throws Exception {
+ File file = temporaryFolder.newFile("test.bin");
+
+ LibaioFile fileDescriptor = control.openFile(file, true);
+ fileDescriptor.lock();
+
+ fileDescriptor.close();
+ }
+
+ @Test
+ public void testAlloc() throws Exception {
+ File file = temporaryFolder.newFile("test.bin");
+
+ LibaioFile fileDescriptor = control.openFile(file, true);
+ fileDescriptor.fill(fileDescriptor.getBlockSize(), 10 * 1024 * 1024);
+
+ fileDescriptor.close();
+ }
+
+ @Test
+ public void testReleaseNullBuffer() throws Exception {
+ boolean failed = false;
+ try {
+ LibaioContext.freeBuffer(null);
+ } catch (Exception expected) {
+ failed = true;
+ }
+
+ Assert.assertTrue("Exception happened!", failed);
+
+ }
+
+ @Test
+ public void testMemset() throws Exception {
+
+ MemorySegment memorySegment = LibaioContext.newAlignedBuffer(4096 * 8, 4096);
+ ByteBuffer buffer = memorySegment.asByteBuffer();
+
+ for (int i = 0; i < buffer.capacity(); i++) {
+ buffer.put((byte) 'z');
+ }
+
+ buffer.position(0);
+
+ for (int i = 0; i < buffer.capacity(); i++) {
+ Assert.assertEquals((byte) 'z', buffer.get());
+ }
+
+ control.memsetBuffer(buffer);
+
+ buffer.position(0);
+
+ for (int i = 0; i < buffer.capacity(); i++) {
+ Assert.assertEquals((byte) 0, buffer.get());
+ }
+
+ LibaioContext.freeBuffer(memorySegment);
+
+ }
+
+ @Test
+ @Ignore
+ public void testIOExceptionConditions() throws Exception {
+ boolean exceptionThrown = false;
+
+ control.close();
+ control = new LibaioContext<>(LIBAIO_QUEUE_SIZE, false, true);
+ try {
+ // There is no space for a queue this huge, the native layer should throw the exception
+ LibaioContext newController = new LibaioContext(Integer.MAX_VALUE, false, true);
+ } catch (RuntimeException e) {
+ exceptionThrown = true;
+ }
+
+ Assert.assertTrue(exceptionThrown);
+ exceptionThrown = false;
+
+ try {
+ // this should throw an exception, we shouldn't be able to open a directory!
+ control.openFile(temporaryFolder.getRoot(), true);
+ } catch (IOException expected) {
+ exceptionThrown = true;
+ }
+
+ Assert.assertTrue(exceptionThrown);
+
+ exceptionThrown = false;
+
+ LibaioFile fileDescriptor = control.openFile(temporaryFolder.newFile(), true);
+ fileDescriptor.close();
+ try {
+ fileDescriptor.close();
+ } catch (IOException expected) {
+ exceptionThrown = true;
+ }
+
+ Assert.assertTrue(exceptionThrown);
+
+ fileDescriptor = control.openFile(temporaryFolder.newFile(), true);
+
+ MemorySegment memorySegment = fileDescriptor.newBuffer(4096);
+ ByteBuffer buffer = memorySegment.asByteBuffer();
+
+ try {
+ for (int i = 0; i < 4096; i++) {
+ buffer.put((byte) 'a');
+ }
+
+ for (int i = 0; i < LIBAIO_QUEUE_SIZE; i++) {
+ fileDescriptor.write(i * 4096, 4096, buffer, new TestInfo());
+ }
+
+ boolean ex = false;
+ try {
+ fileDescriptor.write(0, 4096, buffer, new TestInfo());
+ } catch (Exception e) {
+ ex = true;
+ }
+
+ Assert.assertTrue(ex);
+
+ TestInfo[] callbacks = new TestInfo[LIBAIO_QUEUE_SIZE];
+ Assert.assertEquals(LIBAIO_QUEUE_SIZE, control.poll(callbacks, LIBAIO_QUEUE_SIZE, LIBAIO_QUEUE_SIZE));
+
+ // it should be possible to write now after queue space being released
+ fileDescriptor.write(0, 4096, buffer, new TestInfo());
+ Assert.assertEquals(1, control.poll(callbacks, 1, 100));
+
+ TestInfo errorCallback = new TestInfo();
+ // odd positions will have failures through O_DIRECT
+ fileDescriptor.read(3, 4096, buffer, errorCallback);
+ Assert.assertEquals(1, control.poll(callbacks, 1, 50));
+ Assert.assertTrue(callbacks[0].isError());
+ Assert.assertSame(errorCallback, (callbacks[0]));
+
+ // to help GC and the checkLeaks
+ callbacks = null;
+ errorCallback = null;
+
+ TestInfo.checkLeaks();
+
+ exceptionThrown = false;
+ try {
+ LibaioContext.newAlignedBuffer(300, 4096);
+ } catch (RuntimeException e) {
+ exceptionThrown = true;
+ }
+
+ Assert.assertTrue(exceptionThrown);
+
+ exceptionThrown = false;
+ try {
+ LibaioContext.newAlignedBuffer(-4096, 4096);
+ } catch (RuntimeException e) {
+ exceptionThrown = true;
+ }
+
+ Assert.assertTrue(exceptionThrown);
+ } finally {
+ LibaioContext.freeBuffer(memorySegment);
+ }
+ }
+
+ @Test
+ public void testBlockedCallback() throws Exception {
+ final LibaioContext blockedContext = new LibaioContext(LIBAIO_QUEUE_SIZE, true, true);
+ Thread t = new Thread() {
+ @Override
+ public void run() {
+ blockedContext.poll();
+ }
+ };
+
+ t.start();
+
+ int NUMBER_OF_BLOCKS = LIBAIO_QUEUE_SIZE * 10;
+
+ final CountDownLatch latch = new CountDownLatch(NUMBER_OF_BLOCKS);
+
+ File file = temporaryFolder.newFile("sub-file.txt");
+ LibaioFile aioFile = blockedContext.openFile(file, true);
+ aioFile.fill(aioFile.getBlockSize(), NUMBER_OF_BLOCKS * 4096);
+
+ final AtomicInteger errors = new AtomicInteger(0);
+
+ class MyCallback implements SubmitInfo {
+
+ @Override
+ public void onError(int errno, String message) {
+ errors.incrementAndGet();
+ }
+
+ @Override
+ public void done() {
+ latch.countDown();
+ }
+ }
+
+ MyCallback callback = new MyCallback();
+
+ MemorySegment memorySegment = LibaioContext.newAlignedBuffer(4096, 4096);
+ ByteBuffer buffer = memorySegment.asByteBuffer();
+
+ for (int i = 0; i < 4096; i++) {
+ buffer.put((byte) 'a');
+ }
+
+ long start = System.currentTimeMillis();
+
+ for (int i = 0; i < NUMBER_OF_BLOCKS; i++) {
+ aioFile.write(i * 4096, 4096, buffer, callback);
+ }
+
+ long end = System.currentTimeMillis();
+
+ latch.await();
+
+ logger.debug("time = " + (end - start) + " writes/second=" + NUMBER_OF_BLOCKS * 1000L / (end - start));
+
+ blockedContext.close();
+ t.join();
+ }
+
+ private void fillupFile(File file, int blocks) throws IOException {
+ FileOutputStream fileOutputStream = new FileOutputStream(file);
+ byte[] bufferWrite = new byte[4096];
+ for (int i = 0; i < 4096; i++) {
+ bufferWrite[i] = (byte) 0;
+ }
+
+ for (int i = 0; i < blocks; i++) {
+ fileOutputStream.write(bufferWrite);
+ }
+
+ fileOutputStream.close();
+ }
+
+ static class TestInfo implements SubmitInfo {
+
+ static final Cleaner cleaner;
+
+ static {
+ Cleaner tempCleaner;
+ try {
+ tempCleaner = Cleaner.create();
+ } catch (Throwable e) {
+ e.printStackTrace();
+ tempCleaner = null;
+ }
+ cleaner = tempCleaner;
+ }
+
+ static AtomicInteger count = new AtomicInteger();
+
+ public static void checkLeaks() throws InterruptedException {
+ for (int i = 0; count.get() != 0 && i < 50; i++) {
+ WeakReference reference = new WeakReference(new Object());
+ while (reference.get() != null) {
+ System.gc();
+ Thread.sleep(100);
+ }
+ }
+ Assert.assertEquals(0, count.get());
+ }
+
+ boolean error = false;
+ String errorMessage;
+ int errno;
+
+ TestInfo() {
+ count.incrementAndGet();
+ cleaner.register(this, count::decrementAndGet);
+
+ }
+
+ @Override
+ public void onError(int errno, String message) {
+ this.errno = errno;
+ this.errorMessage = message;
+ this.error = true;
+ }
+
+ @Override
+ public void done() {
+ }
+
+ public int getErrno() {
+ return errno;
+ }
+
+ public void setErrno(int errno) {
+ this.errno = errno;
+ }
+
+ public boolean isError() {
+ return error;
+ }
+
+ public void setError(boolean error) {
+ this.error = error;
+ }
+
+ public String getErrorMessage() {
+ return errorMessage;
+ }
+
+ public void setErrorMessage(String errorMessage) {
+ this.errorMessage = errorMessage;
+ }
+ }
+}
diff --git a/artemis-ffm/src/test/java/org/apache/artemis/nativo/jlibaio/test/LoadedTest.java b/artemis-ffm/src/test/java/org/apache/artemis/nativo/jlibaio/test/LoadedTest.java
new file mode 100644
index 00000000000..c531fd9a858
--- /dev/null
+++ b/artemis-ffm/src/test/java/org/apache/artemis/nativo/jlibaio/test/LoadedTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.artemis.nativo.jlibaio.test;
+
+import org.apache.artemis.nativo.jlibaio.LibaioContext;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+
+public class LoadedTest {
+
+ private static final String OS = System.getProperty("os.name").toLowerCase();
+ private static final boolean IS_LINUX = OS.startsWith("linux");
+
+ @Test
+ public void testValidateIsLoaded() {
+ Assume.assumeTrue(IS_LINUX);
+ Assert.assertTrue(LibaioContext.isLoaded());
+ }
+
+}
diff --git a/artemis-ffm/src/test/java/org/apache/artemis/nativo/jlibaio/test/OpenCloseContextTest.java b/artemis-ffm/src/test/java/org/apache/artemis/nativo/jlibaio/test/OpenCloseContextTest.java
new file mode 100644
index 00000000000..88a28a36233
--- /dev/null
+++ b/artemis-ffm/src/test/java/org/apache/artemis/nativo/jlibaio/test/OpenCloseContextTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.artemis.nativo.jlibaio.test;
+
+import org.apache.artemis.nativo.jlibaio.LibaioContext;
+import org.apache.artemis.nativo.jlibaio.LibaioFile;
+import org.apache.artemis.nativo.jlibaio.SubmitInfo;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.lang.foreign.MemorySegment;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class OpenCloseContextTest {
+
+ Logger logger = LoggerFactory.getLogger(OpenCloseContextTest.class);
+
+ @BeforeClass
+ public static void testAIO() {
+ Assume.assumeTrue(LibaioContext.isLoaded());
+ }
+
+ @Rule
+ public TemporaryFolder folder;
+
+ public OpenCloseContextTest() {
+ folder = new TemporaryFolder(new File("./target"));
+ }
+
+ @Test
+ public void testRepeatOpenCloseContext() throws Exception {
+ MemorySegment memorySegment = LibaioContext.newAlignedBuffer(512, 512);
+ ByteBuffer buffer = memorySegment.asByteBuffer();
+ for (int i = 0; i < 512; i++) {
+ buffer.put((byte) 'x');
+ }
+
+ for (int i = 0; i < 10; i++) {
+ logger.debug("#test " + i);
+ final LibaioContext control = new LibaioContext<>(5, true, true);
+ Thread t = new Thread() {
+ @Override
+ public void run() {
+ control.poll();
+ }
+ };
+ t.start();
+ LibaioFile file = control.openFile(folder.newFile(), true);
+ file.fill(file.getBlockSize(), 4 * 1024);
+ final CountDownLatch insideMethod = new CountDownLatch(1);
+ final CountDownLatch awaitInside = new CountDownLatch(1);
+ file.write(0, 512, buffer, new SubmitInfo() {
+ @Override
+ public void onError(int errno, String message) {
+
+ }
+
+ @Override
+ public void done() {
+ insideMethod.countDown();
+ try {
+ awaitInside.await();
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ logger.debug("done");
+ }
+ });
+
+ insideMethod.await();
+
+ file.write(512, 512, buffer, new SubmitInfo() {
+ @Override
+ public void onError(int errno, String message) {
+ }
+
+ @Override
+ public void done() {
+ }
+ });
+
+ awaitInside.countDown();
+ control.close();
+
+ t.join();
+ }
+
+ }
+
+ @Test
+ public void testRepeatOpenCloseContext2() throws Exception {
+ MemorySegment memorySegment = LibaioContext.newAlignedBuffer(512, 512);
+ ByteBuffer buffer = memorySegment.asByteBuffer();
+ for (int i = 0; i < 512; i++) {
+ buffer.put((byte) 'x');
+ }
+
+ for (int i = 0; i < 10; i++) {
+ logger.debug("#test " + i);
+ final LibaioContext control = new LibaioContext<>(5, true, true);
+ Thread t = new Thread() {
+ @Override
+ public void run() {
+ control.poll();
+ }
+ };
+ t.start();
+ LibaioFile file = control.openFile(folder.newFile(), true);
+ file.fill(file.getBlockSize(), 4 * 1024);
+ final CountDownLatch insideMethod = new CountDownLatch(1);
+ final CountDownLatch awaitInside = new CountDownLatch(1);
+ file.write(0, 512, buffer, new SubmitInfo() {
+ @Override
+ public void onError(int errno, String message) {
+
+ }
+
+ @Override
+ public void done() {
+ insideMethod.countDown();
+ try {
+ awaitInside.await(100, TimeUnit.MILLISECONDS);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ logger.debug("done");
+ }
+ });
+
+ insideMethod.await();
+
+ file.write(512, 512, buffer, new SubmitInfo() {
+ @Override
+ public void onError(int errno, String message) {
+ }
+
+ @Override
+ public void done() {
+ }
+ });
+
+ awaitInside.countDown();
+
+ control.close();
+
+ t.join();
+ }
+
+ }
+
+ @Test
+ public void testCloseAndStart() throws Exception {
+ final LibaioContext control2 = new LibaioContext<>(5, true, true);
+
+ final LibaioContext control = new LibaioContext<>(5, true, true);
+ control.close();
+ control.poll();
+
+ control2.close();
+ control2.poll();
+ }
+
+}
diff --git a/artemis-ffm/src/test/java/org/apache/artemis/nativo/jlibaio/test/ReusableLatch.java b/artemis-ffm/src/test/java/org/apache/artemis/nativo/jlibaio/test/ReusableLatch.java
new file mode 100644
index 00000000000..9d88e6189b1
--- /dev/null
+++ b/artemis-ffm/src/test/java/org/apache/artemis/nativo/jlibaio/test/ReusableLatch.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.artemis.nativo.jlibaio.test;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+
+/**
+ * This class will use the framework provided to by AbstractQueuedSynchronizer.
+ * AbstractQueuedSynchronizer is the framework for any sort of concurrent synchronization, such as Semaphores, events, etc, based on AtomicIntegers.
+ *
+ * This class works just like CountDownLatch, with the difference you can also increase the counter
+ *
+ * It could be used for sync points when one process is feeding the latch while another will wait when everything is done. (e.g. waiting IO completions to finish)
+ *
+ * On ActiveMQ Artemis we have the requirement of increment and decrement a counter until the user fires a ready event (commit). At that point we just act as a regular countDown.
+ *
+ * Note: This latch is reusable. Once it reaches zero, you can call up again, and reuse it on further waits.
+ *
+ * For example: prepareTransaction will wait for the current completions, and further adds will be called on the latch. Later on when commit is called you can reuse the same latch.
+ */
+public class ReusableLatch {
+
+ /**
+ * Look at the doc and examples provided by AbstractQueuedSynchronizer for more information
+ *
+ * @see AbstractQueuedSynchronizer
+ */
+ @SuppressWarnings("serial")
+ private static class CountSync extends AbstractQueuedSynchronizer {
+
+ private CountSync(int count) {
+ setState(count);
+ }
+
+ public int getCount() {
+ return getState();
+ }
+
+ public void setCount(final int count) {
+ setState(count);
+ }
+
+ @Override
+ public int tryAcquireShared(final int numberOfAqcquires) {
+ return getState() == 0 ? 1 : -1;
+ }
+
+ public void add() {
+ for (; ; ) {
+ int actualState = getState();
+ int newState = actualState + 1;
+ if (compareAndSetState(actualState, newState)) {
+ return;
+ }
+ }
+ }
+
+ @Override
+ public boolean tryReleaseShared(final int numberOfReleases) {
+ for (; ; ) {
+ int actualState = getState();
+ if (actualState == 0) {
+ return true;
+ }
+
+ int newState = actualState - numberOfReleases;
+
+ if (newState < 0) {
+ newState = 0;
+ }
+
+ if (compareAndSetState(actualState, newState)) {
+ return newState == 0;
+ }
+ }
+ }
+ }
+
+ private final CountSync control;
+
+ public ReusableLatch() {
+ this(0);
+ }
+
+ public ReusableLatch(final int count) {
+ control = new CountSync(count);
+ }
+
+ public int getCount() {
+ return control.getCount();
+ }
+
+ public void setCount(final int count) {
+ control.setCount(count);
+ }
+
+ public void countUp() {
+ control.add();
+ }
+
+ public void countDown() {
+ control.releaseShared(1);
+ }
+
+ public void countDown(final int count) {
+ control.releaseShared(count);
+ }
+
+ public void await() throws InterruptedException {
+ control.acquireSharedInterruptibly(1);
+ }
+
+ public boolean await(final long milliseconds) throws InterruptedException {
+ return control.tryAcquireSharedNanos(1, TimeUnit.MILLISECONDS.toNanos(milliseconds));
+ }
+
+ public boolean await(final long timeWait, TimeUnit timeUnit) throws InterruptedException {
+ return control.tryAcquireSharedNanos(1, timeUnit.toNanos(timeWait));
+ }
+}
diff --git a/artemis-ffm/src/test/java/org/apache/artemis/nativo/jlibaio/test/ffm/FFMNativeHelperTest.java b/artemis-ffm/src/test/java/org/apache/artemis/nativo/jlibaio/test/ffm/FFMNativeHelperTest.java
new file mode 100644
index 00000000000..a5fce2e46d0
--- /dev/null
+++ b/artemis-ffm/src/test/java/org/apache/artemis/nativo/jlibaio/test/ffm/FFMNativeHelperTest.java
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.artemis.nativo.jlibaio.test.ffm;
+
+import org.apache.artemis.nativo.jlibaio.SubmitInfo;
+import org.apache.artemis.nativo.jlibaio.ffm.FFMNativeHelper;
+import org.apache.artemis.nativo.jlibaio.ffm.IOControl;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.foreign.Arena;
+import java.lang.foreign.MemorySegment;
+import java.lang.foreign.SymbolLookup;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.artemis.nativo.jlibaio.ffm.FFMHandles.LIBAIO;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class FFMNativeHelperTest {
+
+ private static final Logger logger = LoggerFactory.getLogger(FFMNativeHelperTest.class);
+
+ @Test
+ @EnabledOnOs(OS.LINUX)
+ public void libLoadInittest() {
+ logger.trace("@Test:: libLoadInittest");
+ String libName = System.getProperty("libaio.path", "libaio.so.1");
+ SymbolLookup libaio = SymbolLookup.libraryLookup(libName, Arena.global());
+ assertTrue(libaio.find("io_setup").isPresent());
+ }
+
+ @Test
+ @EnabledOnOs(OS.LINUX)
+ public void libLoadtest() {
+ logger.trace("@Test:: libLoadtest");
+ assertTrue(LIBAIO.find("io_setup").isPresent());
+ }
+
+ @Test
+ @EnabledOnOs(OS.LINUX)
+ public void testOpenCloseLifecycle() throws IOException, InterruptedException {
+ logger.trace("@Test:: testOpenCloseLifecycle");
+ Path testFile = Path.of("libaio-test.bin");
+ logger.info("Testing file: {}", testFile.toAbsolutePath());
+ try {
+ int fd = FFMNativeHelper.open(testFile.toString(), false);
+ long allocate = 16 * 1024 * 1024L;
+ FFMNativeHelper.fallocate(fd, allocate);
+ long size = FFMNativeHelper.getSize(fd);
+ assertEquals(allocate, size, "file size mismatch");
+
+ fd = FFMNativeHelper.open(testFile.toString(), true);
+ assertTrue(fd >= 0, "Failed to open with O_DIRECT");
+ logger.info("Opened fd={} WITH O_DIRECT", fd);
+
+ FFMNativeHelper.close(fd);
+ } finally {
+ // Cleanup
+ Files.deleteIfExists(testFile);
+ }
+ }
+
+ @Test
+ @EnabledOnOs(OS.LINUX)
+ public void getBlockSizeFDTest() throws IOException {
+ logger.trace("@Test:: getBlockSizeFDTest");
+ Path testFile = Path.of("libaio-test.bin");
+ logger.info("Testing file: {}", testFile.toAbsolutePath());
+ try {
+ int fd = FFMNativeHelper.open(testFile.toString(), false);
+ long blockSize = FFMNativeHelper.getBlockSizeFD(fd);
+ assertTrue(blockSize > 512 && blockSize < 65536, "Invalid blockSize = " + blockSize);
+ FFMNativeHelper.close(fd);
+ } finally {
+ // Cleanup
+ Files.deleteIfExists(testFile);
+ }
+ }
+
+ @Test
+ @EnabledOnOs(OS.LINUX)
+ public void getBlockSizeTest() throws IOException {
+ logger.trace("@Test:: getBlockSizeTest");
+ Path testFile = Path.of("libaio-test.bin");
+ Files.write(testFile, new byte[4096]);
+ logger.info("Testing file: {}", testFile.toAbsolutePath());
+ try {
+ int fd = FFMNativeHelper.open(testFile.toString(), false);
+ int fdBlockSize = FFMNativeHelper.getBlockSizeFD(fd);
+ FFMNativeHelper.close(fd);
+
+ int pathBlockSize = FFMNativeHelper.getBlockSize(testFile.toString());
+ assertEquals(fdBlockSize, pathBlockSize, "FD vs Path block size mismatch");
+ } finally {
+ // Cleanup
+ Files.deleteIfExists(testFile);
+ }
+ }
+
+ @Test
+ @EnabledOnOs(OS.LINUX)
+ public void memsetBufferTest() throws IOException {
+ logger.trace("@Test:: memsetBufferTest");
+ int blockSize = 4096;
+ ByteBuffer buffer = ByteBuffer.allocateDirect(blockSize * 2);
+ byte[] garbage = new byte[blockSize];
+ new Random().nextBytes(garbage);
+ buffer.put(garbage);
+
+ FFMNativeHelper.memsetBuffer(buffer, blockSize);
+ for (int i = 0; i < blockSize; i++) {
+ assertEquals(0, buffer.get(i), "Byte " + i + " is not Zeroed");
+ }
+ }
+
+ @Test
+ @EnabledOnOs(OS.LINUX)
+ public void newAlignedBufferTest() throws IOException {
+ logger.trace("@Test:: newAlignedBufferTest");
+ int alignment = 4096;
+ int size = alignment * 4;
+
+ ByteBuffer buffer = FFMNativeHelper.newAlignedBuffer(size, alignment).asByteBuffer();
+ assertTrue(buffer.isDirect());
+ assertEquals(size, buffer.capacity());
+
+ MemorySegment addr = MemorySegment.ofBuffer(buffer);
+ long address = addr.address();
+
+ assertEquals(0, address % alignment, "Buffer not aligned to " + alignment);
+ }
+
+ @Test
+ @EnabledOnOs(OS.LINUX)
+ public void testNewContextDeleteContextLifecycle() throws IOException {
+ logger.trace("@Test:: testNewContextDeleteContextLifecycle");
+ FFMNativeHelper helper = new FFMNativeHelper<>(null);
+ IOControl context = null;
+ context = helper.newContext(10);
+ assertNotNull(context, "Context should not be null");
+ logger.info("Created context = {}", context);
+
+ helper.deleteContext(context);
+ logger.info("Context deleted successfully");
+ }
+
+ @Test
+ @EnabledOnOs(OS.LINUX)
+ public void testSubmitWriteReadFullCycle() throws IOException, InterruptedException {
+ logger.trace("@Test:: testSubmitWriteReadFullCycle");
+ Path testFile = Path.of("aio-cycle-test.bin");
+ FFMNativeHelper helper = new FFMNativeHelper<>(null);
+ IOControl context = null;
+ int fd = -1;
+ ByteBuffer writeBuffer = null, readBuffer = null;
+ try {
+ Files.deleteIfExists(testFile);
+ fd = FFMNativeHelper.open(testFile.toString(), true);
+ FFMNativeHelper.fallocate(fd, 4096);
+
+ context = helper.newContext(4);
+
+ byte[] testData = new byte[4096];
+ new Random(12345).nextBytes(testData);
+
+ writeBuffer = FFMNativeHelper.newAlignedBuffer(4096, 4096).asByteBuffer();
+ writeBuffer.put(testData).flip();
+
+ readBuffer = FFMNativeHelper.newAlignedBuffer(4096, 4096).asByteBuffer();
+
+ //Write
+ TestSubmitInfo writeCb = new TestSubmitInfo();
+ helper.submitWrite(fd, context, 0, 4096, writeBuffer, writeCb);
+
+ int events = helper.poll(context, new TestSubmitInfo[1], 1, 1);
+ assertEquals(1, events);
+ assertTrue(writeCb.isDone());
+ assertFalse(writeCb.hasError());
+
+ //Read
+ TestSubmitInfo readCb = new TestSubmitInfo();
+ helper.submitRead(fd, context, 0, 4096, readBuffer, readCb);
+
+ events = helper.poll(context, new TestSubmitInfo[1], 1, 1);
+ assertEquals(1, events);
+ assertTrue(readCb.isDone());
+ assertFalse(readCb.hasError());
+
+ //verify data
+ readBuffer.position(0);
+ byte[] readData = new byte[4096];
+ readBuffer.get(readData);
+ assertArrayEquals(testData, readData);
+ } finally {
+ if (context != null) {
+ helper.deleteContext(context);
+ }
+ if (fd >= 0) {
+ FFMNativeHelper.close(fd);
+ }
+ Files.deleteIfExists(testFile);
+ }
+ }
+
+ @Test
+ @EnabledOnOs(OS.LINUX)
+ public void testPollMultipleEvents() throws IOException, InterruptedException {
+ logger.trace("@Test:: testPollMultipleEvents");
+ Path testFile = Path.of("multi-poll-test.bin");
+ FFMNativeHelper helper = new FFMNativeHelper<>(null);
+ IOControl context = null;
+ int fd = -1;
+ TestSubmitInfo[] callBacks = new TestSubmitInfo[4];
+ MemorySegment[] nativeBuffers = new MemorySegment[4];
+ ByteBuffer[] buffers = new ByteBuffer[4];
+
+ try {
+ Files.deleteIfExists(testFile);
+ fd = FFMNativeHelper.open(testFile.toString(), true);
+ FFMNativeHelper.fallocate(fd, 8192);
+
+ context = helper.newContext(8);
+
+ for (int i = 0; i < 4; i++) {
+ callBacks[i] = new TestSubmitInfo();
+ nativeBuffers[i] = FFMNativeHelper.newAlignedBuffer(4096, 4096);
+ buffers[i] = nativeBuffers[i].asByteBuffer();
+ byte[] data = new byte[2048];
+ new Random(12345 + i).nextBytes(data);
+ buffers[i].put(data).flip();
+ helper.submitWrite(fd, context, i * 2048, 2048, buffers[i], callBacks[i]);
+ }
+
+ int events = helper.poll(context, callBacks, 2, 4);
+ assertTrue(events >= 2 && events <= 4, "Expected 2-4 events, got = " + events);
+
+ for (TestSubmitInfo cb : callBacks) {
+ assertTrue(cb.isDone());
+ assertFalse(cb.hasError());
+ }
+
+ } finally {
+ if (context != null) {
+ helper.deleteContext(context);
+ }
+ if (fd >= 0) {
+ FFMNativeHelper.close(fd);
+ }
+ Files.deleteIfExists(testFile);
+ for (MemorySegment buf : nativeBuffers) {
+ if (buf != null) {
+ FFMNativeHelper.freeBuffer(buf);
+ }
+ }
+ }
+ }
+
+ @Test
+ @EnabledOnOs(OS.LINUX)
+ public void blockedPollTest() throws IOException, InterruptedException {
+ logger.trace("@Test:: blockedPollTest");
+ Path testFile = Path.of("blocked-poll-test.bin");
+ FFMNativeHelper helper = new FFMNativeHelper<>(null);
+ IOControl context = null;
+ int fd = -1;
+ MemorySegment nativeBuffer = null;
+ ByteBuffer buffer = null;
+
+ try {
+ Files.deleteIfExists(testFile);
+ fd = FFMNativeHelper.open(testFile.toString(), true);
+ FFMNativeHelper.fallocate(fd, 4096);
+
+ context = helper.newContext(2);
+
+ TestSubmitInfo callBack = new TestSubmitInfo();
+ nativeBuffer = FFMNativeHelper.newAlignedBuffer(4096, 4096);
+ buffer = nativeBuffer.asByteBuffer();
+ buffer.put((byte) 42).flip();
+
+ helper.submitWrite(fd, context, 0, 4096, buffer, callBack);
+ final IOControl contextRef = context;
+ Thread pollThread = new Thread(() -> {
+ try {
+ helper.blockedPoll(contextRef, false);
+ } catch (Throwable e) {
+ logger.error("BlockedPoll failed", e);
+ }
+ });
+
+ pollThread.start();
+ Thread.sleep(100);
+ pollThread.join(1000);
+
+ assertTrue(callBack.isDone());
+ } finally {
+ if (context != null) {
+ helper.deleteContext(context);
+ }
+ if (fd >= 0) {
+ FFMNativeHelper.close(fd);
+ }
+ Files.deleteIfExists(testFile);
+ if (buffer != null) {
+ FFMNativeHelper.freeBuffer(nativeBuffer);
+ }
+ }
+ }
+
+ @Test
+ @EnabledOnOs(OS.LINUX)
+ public void fillMethodTest() throws IOException {
+ logger.trace("@Test:: fillMethodTest");
+ Path testFile = Path.of("fill-test.bin");
+ int fd = -1;
+
+ try {
+ Files.deleteIfExists(testFile);
+ fd = FFMNativeHelper.open(testFile.toString(), false);
+ long size = 3 * 1024 * 1024L;
+
+ FFMNativeHelper.fill(fd, 4096, size);
+ long actualSize = FFMNativeHelper.getSize(fd);
+ assertEquals(size, actualSize);
+ } finally {
+ if (fd >= 0) {
+ FFMNativeHelper.close(fd);
+ }
+ Files.deleteIfExists(testFile);
+ }
+ }
+
+ @Test
+ @EnabledOnOs(OS.LINUX)
+ public void lockUnlockTest() throws IOException {
+ logger.trace("@Test:: lockUnlockTest");
+ Path testFile = Path.of("lock-test.bin");
+ int fd = -1;
+
+ try {
+ Files.deleteIfExists(testFile);
+ fd = FFMNativeHelper.open(testFile.toString(), false);
+
+ assertTrue(FFMNativeHelper.lock(fd));
+ int fd2 = FFMNativeHelper.open(testFile.toString(), false);
+ try {
+ assertFalse(FFMNativeHelper.lock(fd2));
+ } finally {
+ FFMNativeHelper.close(fd2);
+ }
+ } finally {
+ if (fd >= 0) {
+ FFMNativeHelper.close(fd);
+ Files.deleteIfExists(testFile);
+ }
+ }
+ }
+
+ @Test
+ @EnabledOnOs(OS.LINUX)
+ public void iocbPoolExhaustionTest() throws IOException {
+ logger.trace("@Test:: iocbPoolExhaustionTest");
+ FFMNativeHelper helper = new FFMNativeHelper<>(null);
+ IOControl context = helper.newContext(1);
+ int fd = FFMNativeHelper.open("pool-test.bin", false);
+ MemorySegment nativeBuffer = FFMNativeHelper.newAlignedBuffer(4096, 4096);
+ ByteBuffer buffer = nativeBuffer.asByteBuffer();
+ try {
+ TestSubmitInfo cb1 = new TestSubmitInfo();
+ helper.submitWrite(fd, context, 0, 4096, buffer, cb1);
+
+ TestSubmitInfo cb2 = new TestSubmitInfo();
+ assertThrows(IOException.class, () -> helper.submitWrite(fd, context, 4096, 4096, buffer, cb2));
+
+ helper.poll(context, new TestSubmitInfo[1], 1, 1);
+
+ TestSubmitInfo cb3 = new TestSubmitInfo();
+ helper.submitWrite(fd, context, 8192, 4096, buffer, cb3);
+ } finally {
+ FFMNativeHelper.freeBuffer(nativeBuffer);
+ helper.deleteContext(context);
+ FFMNativeHelper.close(fd);
+ Files.deleteIfExists(Path.of("pool-test.bin"));
+ }
+ }
+
+ private static class TestSubmitInfo implements SubmitInfo {
+
+ private final AtomicBoolean done = new AtomicBoolean(false);
+ private final AtomicBoolean error = new AtomicBoolean(false);
+ private final AtomicReference errorCode = new AtomicReference<>(0);
+ private final AtomicReference errorMessage = new AtomicReference<>("");
+
+ @Override
+ public void onError(int errno, String message) {
+ error.set(true);
+ this.errorCode.set(errno);
+ this.errorMessage.set(message);
+ }
+
+ @Override
+ public void done() {
+ done.set(true);
+ }
+
+ public boolean isDone() {
+ return done.get();
+ }
+
+ public boolean hasError() {
+ return error.get();
+ }
+ }
+}
diff --git a/artemis-ffm/src/test/java/org/apache/artemis/nativo/jlibaio/test/ffm/IOCBLayoutTest.java b/artemis-ffm/src/test/java/org/apache/artemis/nativo/jlibaio/test/ffm/IOCBLayoutTest.java
new file mode 100644
index 00000000000..71731650357
--- /dev/null
+++ b/artemis-ffm/src/test/java/org/apache/artemis/nativo/jlibaio/test/ffm/IOCBLayoutTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.artemis.nativo.jlibaio.test.ffm;
+
+import org.apache.artemis.nativo.jlibaio.ffm.IOCBInit;
+import org.junit.jupiter.api.Test;
+
+import java.lang.foreign.Arena;
+import java.lang.foreign.MemorySegment;
+
+import static org.apache.artemis.nativo.jlibaio.ffm.IOCBInit.IOCB_LAYOUT;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class IOCBLayoutTest {
+
+ @Test
+ public void iocbLayoutSizeTest() {
+ assertEquals(64, (int) IOCB_LAYOUT.byteSize(), "Expected 64-byte iocb");
+ }
+
+ @Test
+ public void iocbLayoutValueTest() {
+ try (Arena arena = Arena.ofConfined()) {
+ MemorySegment iocb = arena.allocate(IOCB_LAYOUT);
+ IOCBInit.setAioKey(iocb, 123);
+ IOCBInit.setAioFildes(iocb, 42);
+ IOCBInit.setAioBuf(iocb, 0x7f1234567890L);
+ IOCBInit.setAioNbytes(iocb, 4096);
+ IOCBInit.setAioFlags(iocb, 0);
+
+ assertEquals(123, IOCBInit.getAioKey(iocb));
+ assertEquals(42, IOCBInit.getAioFildes(iocb));
+ assertEquals(0x7f1234567890L, IOCBInit.getAioBuf(iocb));
+ assertEquals(4096, IOCBInit.getAioNbytes(iocb));
+ assertEquals(0, IOCBInit.getAioFlags(iocb));
+ }
+ }
+}
diff --git a/artemis-ffm/src/test/java/org/apache/artemis/nativo/jlibaio/test/ffm/IOControlTest.java b/artemis-ffm/src/test/java/org/apache/artemis/nativo/jlibaio/test/ffm/IOControlTest.java
new file mode 100644
index 00000000000..2bc78fa757a
--- /dev/null
+++ b/artemis-ffm/src/test/java/org/apache/artemis/nativo/jlibaio/test/ffm/IOControlTest.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.artemis.nativo.jlibaio.test.ffm;
+
+import org.apache.artemis.nativo.jlibaio.ffm.IOCBInit;
+import org.apache.artemis.nativo.jlibaio.ffm.IOControl;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import java.lang.foreign.Arena;
+import java.lang.foreign.MemorySegment;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@DisplayName("IOControl lifecycle and concurrency tests")
+public class IOControlTest {
+
+ private Arena arena;
+ private IOControl ioControl;
+
+ @BeforeEach
+ void setUp() {
+ arena = Arena.ofConfined();
+
+ ioControl = new IOControl();
+ ioControl.setIoContext(arena.allocate(8));
+ ioControl.setEvents(arena.allocate(8));
+ ioControl.setQueueSize(8);
+
+ MemorySegment[] pool = new MemorySegment[8];
+ for (int i = 0; i < pool.length; i++) {
+ pool[i] = arena.allocate(IOCBInit.IOCB_LAYOUT);
+ }
+ ioControl.setIocbPool(pool);
+ }
+
+ @AfterEach
+ void tearDown() {
+ if (arena != null) {
+ arena.close();
+ }
+ }
+
+ @Test
+ void isValidShouldBeTrueForProperlyInitializedControl() {
+ assertTrue(ioControl.isValid());
+ }
+
+ @Test
+ void isValidShouldFailForNullContext() {
+ ioControl.setIoContext(MemorySegment.NULL);
+ assertFalse(ioControl.isValid());
+ }
+
+ @Test
+ void isValidShouldFailForNullEvents() {
+ ioControl.setEvents(MemorySegment.NULL);
+ assertFalse(ioControl.isValid());
+ }
+
+ @Test
+ void isValidShouldFailForZeroQueueSize() {
+ ioControl.setQueueSize(0);
+ assertFalse(ioControl.isValid());
+ }
+
+ @Test
+ void getIOCBShouldReturnDifferentSegmentsUntilQueueIsExhausted() {
+ Set addresses = new HashSet<>();
+
+ for (int i = 0; i < 8; i++) {
+ MemorySegment iocb = ioControl.getIOCB();
+ assertNotNull(iocb);
+ assertTrue(iocb.address() != 0L);
+ addresses.add(iocb.address());
+ }
+
+ assertEquals(8, addresses.size());
+ assertEquals(8, ioControl.used());
+ assertEquals(0, ioControl.iocbGet());
+
+ assertNull(ioControl.getIOCB());
+ }
+
+ @Test
+ void putIOCBShouldReturnIOCBToPoolAndDecrementUsed() {
+ MemorySegment first = ioControl.getIOCB();
+ MemorySegment second = ioControl.getIOCB();
+
+ assertNotNull(first);
+ assertNotNull(second);
+ assertEquals(2, ioControl.used());
+
+ ioControl.putIOCB(first);
+ assertEquals(1, ioControl.used());
+
+ ioControl.putIOCB(second);
+ assertEquals(0, ioControl.used());
+ }
+
+ @Test
+ void getIOCBShouldWrapAround() {
+ for (int i = 0; i < 8; i++) {
+ assertNotNull(ioControl.getIOCB());
+ }
+
+ for (int i = 0; i < 8; i++) {
+ ioControl.putIOCB(ioControl.iocbPool()[i]);
+ }
+
+ assertEquals(0, ioControl.used());
+ assertEquals(0, ioControl.iocbGet());
+ assertEquals(0, ioControl.iocbPut());
+
+ MemorySegment again = ioControl.getIOCB();
+ assertNotNull(again);
+ assertEquals(1, ioControl.used());
+ assertEquals(1, ioControl.iocbGet());
+ }
+
+ @Test
+ void putIOCBShouldIgnoreNullAndInvalidSegments() {
+ assertDoesNotThrow(() -> ioControl.putIOCB(null));
+ assertDoesNotThrow(() -> ioControl.putIOCB(MemorySegment.NULL));
+ assertEquals(0, ioControl.used());
+ }
+
+ @Test
+ void getIOCBShouldReturnNullWhenPoolIsEmpty() {
+ for (int i = 0; i < 8; i++) {
+ assertNotNull(ioControl.getIOCB());
+ }
+
+ assertNull(ioControl.getIOCB());
+ assertEquals(8, ioControl.used());
+ }
+
+ @Test
+ void concurrentGetAndPutShouldPreserveInvariant() throws Exception {
+ final int threads = 8;
+ final int iterationsPerThread = 5_000;
+
+ ExecutorService executor = Executors.newFixedThreadPool(threads);
+ CountDownLatch start = new CountDownLatch(1);
+ List> tasks = new ArrayList<>();
+
+ for (int t = 0; t < threads; t++) {
+ tasks.add(() -> {
+ start.await();
+
+ for (int i = 0; i < iterationsPerThread; i++) {
+ MemorySegment iocb = ioControl.getIOCB();
+ if (iocb != null) {
+ ioControl.putIOCB(iocb);
+ }
+ }
+ return null;
+ });
+ }
+
+ try {
+ List> futures = new ArrayList<>();
+ for (Callable task : tasks) {
+ futures.add(executor.submit(task));
+ }
+
+ start.countDown();
+
+ for (Future future : futures) {
+ future.get(30, TimeUnit.SECONDS);
+ }
+
+ assertTrue(ioControl.isValid());
+ assertEquals(0, ioControl.used());
+ assertEquals(0, ioControl.iocbGet());
+ assertEquals(0, ioControl.iocbPut());
+
+ MemorySegment[] pool = ioControl.iocbPool();
+ assertNotNull(pool);
+ assertEquals(8, pool.length);
+
+ Set addresses = new HashSet<>();
+ for (MemorySegment seg : pool) {
+ assertNotNull(seg);
+ assertTrue(seg.address() != 0L);
+ addresses.add(seg.address());
+ }
+ assertEquals(8, addresses.size());
+ } finally {
+ executor.shutdownNow();
+ assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS));
+ }
+ }
+
+ @Test
+ void concurrentGetShouldNeverReturnSameIOCBTwiceWithoutPut() throws Exception {
+ final int threads = 8;
+ ExecutorService executor = Executors.newFixedThreadPool(threads);
+ CountDownLatch start = new CountDownLatch(1);
+
+ try {
+ List> futures = new ArrayList<>();
+ for (int i = 0; i < threads; i++) {
+ futures.add(executor.submit(() -> {
+ start.await();
+ return ioControl.getIOCB();
+ }));
+ }
+
+ start.countDown();
+
+ Set addresses = new HashSet<>();
+ int nonNullCount = 0;
+
+ for (Future future : futures) {
+ MemorySegment seg = future.get(10, TimeUnit.SECONDS);
+ if (seg != null) {
+ nonNullCount++;
+ addresses.add(seg.address());
+ }
+ }
+
+ assertEquals(nonNullCount, addresses.size());
+ assertTrue(ioControl.used() <= ioControl.queueSize());
+ assertTrue(ioControl.isValid());
+ } finally {
+ executor.shutdownNow();
+ assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS));
+ }
+ }
+
+ @Test
+ void concurrentPutShouldBeSafeAfterPreallocation() throws Exception {
+ MemorySegment[] taken = new MemorySegment[8];
+ for (int i = 0; i < 8; i++) {
+ taken[i] = ioControl.getIOCB();
+ assertNotNull(taken[i]);
+ }
+ assertEquals(8, ioControl.used());
+
+ final int threads = 8;
+ ExecutorService executor = Executors.newFixedThreadPool(threads);
+ CountDownLatch start = new CountDownLatch(1);
+
+ try {
+ List> futures = new ArrayList<>();
+ for (int i = 0; i < threads; i++) {
+ final MemorySegment seg = taken[i];
+ futures.add(executor.submit(() -> {
+ start.await();
+ ioControl.putIOCB(seg);
+ return null;
+ }));
+ }
+
+ start.countDown();
+
+ for (Future future : futures) {
+ future.get(10, TimeUnit.SECONDS);
+ }
+
+ assertEquals(0, ioControl.used());
+ assertEquals(0, ioControl.iocbGet());
+ assertEquals(0, ioControl.iocbPut());
+ assertTrue(ioControl.isValid());
+ } finally {
+ executor.shutdownNow();
+ assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS));
+ }
+ }
+}
diff --git a/artemis-ffm/src/test/java/org/apache/artemis/nativo/jlibaio/test/jmh/AioCompareBenchmark.java b/artemis-ffm/src/test/java/org/apache/artemis/nativo/jlibaio/test/jmh/AioCompareBenchmark.java
new file mode 100644
index 00000000000..329dc3886ba
--- /dev/null
+++ b/artemis-ffm/src/test/java/org/apache/artemis/nativo/jlibaio/test/jmh/AioCompareBenchmark.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.artemis.nativo.jlibaio.test.jmh;
+
+import org.apache.artemis.nativo.jlibaio.LibaioContext;
+import org.apache.artemis.nativo.jlibaio.LibaioFile;
+import org.apache.artemis.nativo.jlibaio.SubmitInfo;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.io.File;
+import java.lang.foreign.MemorySegment;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+@State(Scope.Benchmark)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+@Fork(value = 2)
+@Warmup(iterations = 5, time = 200, timeUnit = TimeUnit.MILLISECONDS)
+@Measurement(iterations = 10, time = 200, timeUnit = TimeUnit.MILLISECONDS)
+public class AioCompareBenchmark {
+
+ private static final int FILE_SIZE = 10000 * 4096;
+ private static final int BLOCK_SIZE = 4096;
+
+ @Param({"2048"})
+ private int LIBAIO_QUEUE_SIZE;
+
+ @Param({"10000"})
+ private int recordCount;
+
+ private File file;
+ private LibaioContext control;
+ private LibaioFile libaioFile;
+
+ private MemorySegment headerSegment;
+ private ByteBuffer headerBuffer;
+
+ private MemorySegment recordSegment;
+ private ByteBuffer recordBuffer;
+
+ private final AtomicReference currentLatch = new AtomicReference<>();
+
+ private Thread pollThread;
+ private volatile boolean polling = true;
+
+ private final SubmitInfo callback = new SubmitInfo() {
+ @Override
+ public void onError(int errno, String message) {
+ //ignore
+ }
+
+ @Override
+ public void done() {
+ CountDownLatch latch = currentLatch.get();
+ if (latch != null) {
+ latch.countDown();
+ }
+ }
+ };
+
+ private long fileId = 1L;
+
+ @Setup(Level.Trial)
+ public void setuo() throws Exception {
+ file = File.createTempFile("aio-bench-", ".dat");
+
+ control = new LibaioContext<>(LIBAIO_QUEUE_SIZE, true, true);
+ libaioFile = control.openFile(file, true);
+
+ //one-time file initialization
+ libaioFile.fallocate(FILE_SIZE);
+
+ headerSegment = LibaioContext.newAlignedBuffer(BLOCK_SIZE, BLOCK_SIZE);
+ headerBuffer = headerSegment.asByteBuffer();
+
+ recordSegment = LibaioContext.newAlignedBuffer(BLOCK_SIZE, BLOCK_SIZE);
+ recordBuffer = recordSegment.asByteBuffer();
+
+ initRecord(headerBuffer); // filling the record clock with 1
+ initRecord(recordBuffer); // filling the record clock with 1
+
+ fillHeader(fileId);
+ updateRecord(recordBuffer, fileId, 0L);
+
+ polling = true;
+ pollThread = new Thread(() -> {
+ while (polling && !Thread.currentThread().isInterrupted()) {
+ try {
+ control.poll();
+ } catch (Throwable e) {
+ if (polling) {
+ throw new RuntimeException(e);
+ }
+ break;
+ }
+ }
+ }, "aio-jmh-poll-thread");
+ pollThread.setDaemon(true);
+ pollThread.start();
+ }
+
+ @TearDown(Level.Trial)
+ public void tearDown() throws Exception {
+ polling = false;
+ if (pollThread != null) {
+ pollThread.interrupt();
+ pollThread.join(TimeUnit.SECONDS.toMillis(10));
+ }
+
+ if (libaioFile != null) {
+ libaioFile.close();
+ }
+ if (control != null) {
+ control.close();
+ }
+ if (headerSegment != null && headerSegment.address() != 0) {
+ LibaioContext.freeBuffer(headerSegment);
+ }
+ if (recordSegment != null && recordSegment.address() != 0) {
+ LibaioContext.freeBuffer(recordSegment);
+ }
+ if (file != null) {
+ file.delete();
+ }
+ }
+
+ @Benchmark
+ public void writeHeaderAndRecord() throws Exception {
+ CountDownLatch latch = new CountDownLatch(recordCount * 100);
+ currentLatch.set(latch);
+
+ try {
+ // fillHeader(fileId);
+ // libaioFile.write(0L, BLOCK_SIZE, headerBuffer, callback);
+
+ for (int j = 0; j < 100; j++) {
+ for (int i = 0; i < recordCount; i++) {
+ updateRecord(recordBuffer, fileId, i);
+ long offset = BLOCK_SIZE + ((long) i * BLOCK_SIZE);
+ libaioFile.write(offset, BLOCK_SIZE, recordBuffer, callback);
+ }
+ }
+
+ latch.await();
+ } finally {
+ currentLatch.compareAndSet(latch, null);
+ }
+ }
+
+ private void fillHeader(long fileId) {
+ headerBuffer.putLong(0, fileId);
+ }
+
+ private void updateRecord(ByteBuffer buffer, long fileId, long recordId) {
+ buffer.putLong(0, fileId);
+ buffer.putLong(8, recordId);
+ }
+
+ private void initRecord(ByteBuffer record) {
+ while (record.position() < BLOCK_SIZE) {
+ record.put((byte) 1);
+ }
+ }
+}
diff --git a/artemis-ffm/src/test/java/org/apache/artemis/nativo/jlibaio/test/jmh/BenchmarkRunner.java b/artemis-ffm/src/test/java/org/apache/artemis/nativo/jlibaio/test/jmh/BenchmarkRunner.java
new file mode 100644
index 00000000000..29b66d6a245
--- /dev/null
+++ b/artemis-ffm/src/test/java/org/apache/artemis/nativo/jlibaio/test/jmh/BenchmarkRunner.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.artemis.nativo.jlibaio.test.jmh;
+
+import org.openjdk.jmh.Main;
+
+import java.io.IOException;
+
+public class BenchmarkRunner {
+
+ public static void main(String[] args) throws IOException {
+ Main.main(args);
+ }
+}
diff --git a/pom.xml b/pom.xml
index 64a973ca016..51bf01a8ccc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -74,6 +74,7 @@
artemis-lockmanager
artemis-image
artemis-image/examples
+ artemis-ffm
Apache Artemis
@@ -750,6 +751,15 @@
+
+ jdk22-plus
+
+ [22,)
+
+
+ artemis-ffm
+
+