diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 64dae226d2..cdc3059553 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -37,8 +37,16 @@ jobs: if: matrix.target-platform == 'Native' run: | sudo apt-get update - sudo apt-get install libidn2-dev libcurl3-dev + sudo apt-get install libidn2-dev libcurl3-dev cmake libssl-dev echo "STTP_NATIVE=1" >> $GITHUB_ENV + - name: Install s2n-tls + if: matrix.target-platform == 'Native' + run: | + git clone https://github.com/aws/s2n-tls.git /tmp/s2n-tls + cd /tmp/s2n-tls + cmake . -Bbuild -DCMAKE_BUILD_TYPE=Release -DCMAKE_INSTALL_PREFIX=/usr/local + cmake --build build -j $(nproc) + sudo cmake --install build - name: Install scala-cli if: matrix.target-platform == 'JVM' uses: VirtusLab/scala-cli-setup@77834b5926f3eb70869d8009530c65585f7a039b # main, specifically v1.9.1 @@ -95,7 +103,14 @@ jobs: - name: Install libidn2-dev libcurl3-dev run: | sudo apt-get update - sudo apt-get install libidn2-dev libcurl3-dev + sudo apt-get install libidn2-dev libcurl3-dev cmake libssl-dev + - name: Install s2n-tls + run: | + git clone https://github.com/aws/s2n-tls.git /tmp/s2n-tls + cd /tmp/s2n-tls + cmake . -Bbuild -DCMAKE_BUILD_TYPE=Release -DCMAKE_INSTALL_PREFIX=/usr/local + cmake --build build -j $(nproc) + sudo cmake --install build - name: Enable Native-specific modules if: matrix.java == '11' run: echo "STTP_NATIVE=1" >> $GITHUB_ENV diff --git a/armeria-backend/fs2/src/main/scala/sttp/client4/armeria/fs2/ArmeriaFs2Backend.scala b/armeria-backend/fs2/src/main/scala/sttp/client4/armeria/fs2/ArmeriaFs2Backend.scala index df4fcc7921..48525db9fb 100644 --- a/armeria-backend/fs2/src/main/scala/sttp/client4/armeria/fs2/ArmeriaFs2Backend.scala +++ b/armeria-backend/fs2/src/main/scala/sttp/client4/armeria/fs2/ArmeriaFs2Backend.scala @@ -15,8 +15,8 @@ import sttp.client4.impl.cats.CatsMonadAsyncError import sttp.client4.{wrappers, BackendOptions, StreamBackend} import sttp.monad.MonadAsyncError import sttp.client4.compression.Compressor -import sttp.client4.impl.fs2.DeflateFs2Compressor -import sttp.client4.impl.fs2.GZipFs2Compressor +import sttp.client4.impl.fs2.PlatformDeflateFs2Compressor +import sttp.client4.impl.fs2.PlatformGZipFs2Compressor private final class ArmeriaFs2Backend[F[_]: Async](client: WebClient, closeFactory: Boolean, dispatcher: Dispatcher[F]) extends AbstractArmeriaBackend[F, Fs2Streams[F]](client, closeFactory, new CatsMonadAsyncError) { @@ -45,7 +45,7 @@ private final class ArmeriaFs2Backend[F[_]: Async](client: WebClient, closeFacto ) override protected def compressors: List[Compressor[R]] = - List(new GZipFs2Compressor[F, R](), new DeflateFs2Compressor[F, R]()) + List(new PlatformGZipFs2Compressor[F, R], new PlatformDeflateFs2Compressor[F, R]) override protected def ensureOnAbnormal[T](effect: F[T])(finalizer: => F[Unit]): F[T] = Async[F].guaranteeCase(effect) { outcome => diff --git a/build.sbt b/build.sbt index 9776f80eb1..e323bbe6f0 100644 --- a/build.sbt +++ b/build.sbt @@ -461,13 +461,23 @@ lazy val fs2 = (projectMatrix in file("effects/fs2")) libraryDependencies ++= Seq( "co.fs2" %%% "fs2-reactive-streams" % fs2_3_version, "co.fs2" %%% "fs2-io" % fs2_3_version - ) + ), + Compile / unmanagedSourceDirectories += (ThisBuild / baseDirectory).value / "effects" / "fs2" / "src" / "main" / "scalajvmnative" ) ) .jsPlatform( scalaVersions = scala2And3, settings = commonJsSettings ++ commonJsBackendSettings ++ browserChromeTestSettings ++ testServerSettings ) + .nativePlatform( + scalaVersions = scala2And3, + settings = commonNativeSettings ++ testServerSettings ++ Seq( + libraryDependencies ++= Seq( + "co.fs2" %%% "fs2-io" % fs2_3_version + ), + Compile / unmanagedSourceDirectories += (ThisBuild / baseDirectory).value / "effects" / "fs2" / "src" / "main" / "scalajvmnative" + ) + ) lazy val monix = (projectMatrix in file("effects/monix")) .settings( @@ -658,13 +668,23 @@ lazy val http4sBackend = (projectMatrix in file("http4s-backend")) .settings( name := "http4s-backend", libraryDependencies ++= Seq( - "org.http4s" %% "http4s-client" % http4s_ce3_version, - "org.http4s" %% "http4s-ember-client" % "0.23.34" % Optional, - "org.http4s" %% "http4s-blaze-client" % "0.23.17" % Optional + "org.http4s" %%% "http4s-client" % http4s_ce3_version, + "org.http4s" %%% "http4s-ember-client" % http4s_ce3_version % Optional ), evictionErrorLevel := Level.Info ) - .jvmPlatform(scalaVersions = scala2And3) + .jvmPlatform( + scalaVersions = scala2And3, + settings = commonJvmSettings ++ Seq( + libraryDependencies ++= Seq( + "org.http4s" %% "http4s-blaze-client" % "0.23.17" % Optional + ) + ) + ) + .nativePlatform( + scalaVersions = scala2And3, + settings = commonNativeSettings ++ testServerSettings + ) .dependsOn(cats % compileAndTest, core % compileAndTest, fs2 % compileAndTest) //-- finagle backend diff --git a/docs/backends/http4s.md b/docs/backends/http4s.md index deb554b68a..59b60f0226 100644 --- a/docs/backends/http4s.md +++ b/docs/backends/http4s.md @@ -28,7 +28,26 @@ Sending a request is a non-blocking, lazily-evaluated operation and results in a There are also [other cats-effect-based backends](catseffect.md), which don't depend on http4s. -Please note that: +## Scala Native Support + +When using the http4s backend on Scala Native, the `EmberClientBuilder` methods require a `Network[F]` constraint in addition to `Async[F]`: + +```scala mdoc:compile-only +import cats.effect.* +import fs2.io.net.Network +import sttp.capabilities.fs2.Fs2Streams +import sttp.client4.* +import sttp.client4.http4s.* + +// On Scala Native, this requires an implicit Network[IO] in scope +Http4sBackend.usingDefaultEmberClientBuilder[IO](): Resource[IO, StreamBackend[IO, Fs2Streams[IO]]] +``` + +The `Network[IO]` instance is provided automatically when extending `cats.effect.IOApp` or can be summoned explicitly using `Network[IO]` where needed. This constraint is specific to Scala Native; on the JVM, only the `Async[F]` constraint is required. + +Note that the Blaze client is JVM-only and not available on Scala Native. + +## Notes * the backend contains **optional** dependencies on `http4s-ember-client` and `http4s-blaze-client`, to provide the `Http4sBackend.usingEmberClientBuilder`, `Http4sBackend.usingBlazeClientBuilder`, `Http4sBackend.usingDefaultEmberClientBuilder` and `Http4sBackend.usingDefaultBlazeClientBuilder` methods. This makes the client usable with other http4s client implementations, without the need to depend on ember or blaze. * the backend does not support `SttpBackendOptions`, that is specifying proxy settings (proxies are not implemented in http4s, see [this issue](https://github.com/http4s/http4s/issues/251)), as well as configuring the connect timeout diff --git a/effects/fs2/src/main/scala/sttp/client4/impl/fs2/fs2Compressor.scala b/effects/fs2/src/main/scala/sttp/client4/impl/fs2/fs2Compressor.scala new file mode 100644 index 0000000000..ba745e377e --- /dev/null +++ b/effects/fs2/src/main/scala/sttp/client4/impl/fs2/fs2Compressor.scala @@ -0,0 +1,62 @@ +package sttp.client4.impl.fs2 + +import sttp.client4._ +import sttp.client4.GenericRequestBody +import fs2._ +import fs2.compression.Compression +import cats.syntax.all._ +import cats.effect.Sync +import sttp.capabilities.fs2.Fs2Streams +import fs2.compression.DeflateParams +import sttp.client4.compression.Compressor +import sttp.client4.internal.byteBufferToArray +import sttp.model.Encodings + +trait Fs2Compressor[F[_], R <: Fs2Streams[F]] extends Compressor[R] { + protected val fSync: Sync[F] + + override def apply[R2 <: R](body: GenericRequestBody[R2]): GenericRequestBody[R] = + body match { + case NoBody => NoBody + case StringBody(s, encoding, _) => + StreamBody(Fs2Streams[F])(compressStream(Stream.chunk(Chunk.array(s.getBytes(encoding))))) + case ByteArrayBody(b, _) => + StreamBody(Fs2Streams[F])(compressStream(Stream.chunk(Chunk.array(b)))) + case ByteBufferBody(b, _) => + StreamBody(Fs2Streams[F])(compressStream(Stream.chunk(Chunk.array(byteBufferToArray(b))))) + case InputStreamBody(b, _) => + compressInputStreamBody(b) + case StreamBody(b) => + StreamBody(Fs2Streams[F])(compressStream(b.asInstanceOf[Stream[F, Byte]])) + case fb @ FileBody(_, _) => + compressFileBody(fb) + case MultipartStreamBody(_) | BasicMultipartBody(_) => + throw new IllegalArgumentException("Multipart bodies cannot be compressed") + } + + protected def compressInputStreamBody(b: java.io.InputStream): GenericRequestBody[R] = + throw new UnsupportedOperationException("InputStream compression is not supported on this platform") + + protected def compressFileBody(f: FileBody): GenericRequestBody[R] = + throw new UnsupportedOperationException("File compression is not supported on this platform") + + def compressStream(stream: Stream[F, Byte]): Stream[F, Byte] +} + +class GZipFs2Compressor[F[_]: Compression: Sync, R <: Fs2Streams[F]] extends Fs2Compressor[F, R] { + + override protected val fSync: Sync[F] = implicitly + override val encoding: String = Encodings.Gzip + + def compressStream(stream: Stream[F, Byte]): Stream[F, Byte] = + stream.through(fs2.compression.Compression[F].gzip()) +} + +class DeflateFs2Compressor[F[_]: Compression: Sync, R <: Fs2Streams[F]] extends Fs2Compressor[F, R] { + + override protected val fSync: Sync[F] = implicitly + override val encoding: String = Encodings.Deflate + + def compressStream(stream: Stream[F, Byte]): Stream[F, Byte] = + stream.through(fs2.compression.Compression[F].deflate(DeflateParams())) +} diff --git a/effects/fs2/src/main/scalajvm/sttp/client4/impl/fs2/fs2Decompressors.scala b/effects/fs2/src/main/scala/sttp/client4/impl/fs2/fs2Decompressors.scala similarity index 100% rename from effects/fs2/src/main/scalajvm/sttp/client4/impl/fs2/fs2Decompressors.scala rename to effects/fs2/src/main/scala/sttp/client4/impl/fs2/fs2Decompressors.scala diff --git a/effects/fs2/src/main/scalajvm/sttp/client4/httpclient/fs2/HttpClientFs2Backend.scala b/effects/fs2/src/main/scalajvm/sttp/client4/httpclient/fs2/HttpClientFs2Backend.scala index 25203e3526..63b05b1072 100644 --- a/effects/fs2/src/main/scalajvm/sttp/client4/httpclient/fs2/HttpClientFs2Backend.scala +++ b/effects/fs2/src/main/scalajvm/sttp/client4/httpclient/fs2/HttpClientFs2Backend.scala @@ -19,10 +19,10 @@ import sttp.client4.compression.Compressor import sttp.client4.httpclient.HttpClientAsyncBackend import sttp.client4.httpclient.HttpClientBackend import sttp.client4.impl.cats.implicits._ -import sttp.client4.impl.fs2.DeflateFs2Compressor +import sttp.client4.impl.fs2.PlatformDeflateFs2Compressor import sttp.client4.impl.fs2.DeflateFs2Decompressor import sttp.client4.impl.fs2.Fs2SimpleQueue -import sttp.client4.impl.fs2.GZipFs2Compressor +import sttp.client4.impl.fs2.PlatformGZipFs2Compressor import sttp.client4.impl.fs2.GZipFs2Decompressor import sttp.client4.internal.httpclient.BodyFromHttpClient import sttp.client4.internal.httpclient.BodyToHttpClient @@ -136,7 +136,7 @@ class HttpClientFs2Backend[F[_]: Async] private ( object HttpClientFs2Backend { def defaultCompressionHandlers[F[_]: Async]: CompressionHandlers[Fs2Streams[F], Stream[F, Byte]] = CompressionHandlers( - List(new GZipFs2Compressor[F, Fs2Streams[F]](), new DeflateFs2Compressor[F, Fs2Streams[F]]()), + List(new PlatformGZipFs2Compressor[F, Fs2Streams[F]], new PlatformDeflateFs2Compressor[F, Fs2Streams[F]]), List(new GZipFs2Decompressor, new DeflateFs2Decompressor) ) diff --git a/effects/fs2/src/main/scalajvm/sttp/client4/impl/fs2/fs2Compressor.scala b/effects/fs2/src/main/scalajvm/sttp/client4/impl/fs2/fs2Compressor.scala deleted file mode 100644 index 15ab608cbe..0000000000 --- a/effects/fs2/src/main/scalajvm/sttp/client4/impl/fs2/fs2Compressor.scala +++ /dev/null @@ -1,48 +0,0 @@ -package sttp.client4.impl.fs2 - -import sttp.client4._ -import sttp.client4.GenericRequestBody -import fs2._ -import fs2.compression.Compression -import cats.syntax.all._ -import fs2.io.file.Files -import cats.effect.Sync -import sttp.capabilities.fs2.Fs2Streams -import fs2.compression.DeflateParams -import sttp.client4.compression.{Compressor, DeflateDefaultCompressor, GZipDefaultCompressor} - -trait Fs2Compressor[F[_], R <: Fs2Streams[F]] extends Compressor[R] { - protected val fSync: Sync[F] - protected val fFiles: Files[F] - - override abstract def apply[R2 <: R](body: GenericRequestBody[R2]): GenericRequestBody[R] = - body match { - case InputStreamBody(b, _) => - StreamBody(Fs2Streams[F])(compressStream(fs2.io.readInputStream(b.pure[F](fSync), 1024)(fSync))) - case StreamBody(b) => StreamBody(Fs2Streams[F])(compressStream(b.asInstanceOf[fs2.Stream[F, Byte]])) - case FileBody(f, _) => StreamBody(Fs2Streams[F])(compressStream(Files[F](fFiles).readAll(f.toPath, 1024))) - case _ => super.apply(body) - } - - def compressStream(stream: fs2.Stream[F, Byte]): fs2.Stream[F, Byte] -} - -class GZipFs2Compressor[F[_]: Compression: Sync: Files, R <: Fs2Streams[F]] - extends GZipDefaultCompressor[R] - with Fs2Compressor[F, R] { - - override protected val fSync: Sync[F] = implicitly - override protected val fFiles: Files[F] = implicitly - - def compressStream(stream: Stream[F, Byte]): Stream[F, Byte] = stream.through(fs2.compression.Compression[F].gzip()) -} - -class DeflateFs2Compressor[F[_]: Compression: Sync: Files, R <: Fs2Streams[F]] - extends DeflateDefaultCompressor[R] - with Fs2Compressor[F, R] { - override protected val fSync: Sync[F] = implicitly - override protected val fFiles: Files[F] = implicitly - - def compressStream(stream: Stream[F, Byte]): Stream[F, Byte] = - stream.through(fs2.compression.Compression[F].deflate(DeflateParams())) -} diff --git a/effects/fs2/src/main/scalajvmnative/sttp/client4/impl/fs2/fs2CompressorPlatform.scala b/effects/fs2/src/main/scalajvmnative/sttp/client4/impl/fs2/fs2CompressorPlatform.scala new file mode 100644 index 0000000000..d8f2bc9311 --- /dev/null +++ b/effects/fs2/src/main/scalajvmnative/sttp/client4/impl/fs2/fs2CompressorPlatform.scala @@ -0,0 +1,25 @@ +package sttp.client4.impl.fs2 + +import fs2.Stream +import fs2.io.file.Files +import cats.effect.Sync +import sttp.capabilities.fs2.Fs2Streams +import sttp.client4._ + +class PlatformGZipFs2Compressor[F[_]: Sync: Files, R <: Fs2Streams[F]] extends GZipFs2Compressor[F, R] { + + override protected def compressInputStreamBody(b: java.io.InputStream): GenericRequestBody[R] = + StreamBody(Fs2Streams[F])(compressStream(fs2.io.readInputStream(fSync.delay(b), 1024)(fSync))) + + override protected def compressFileBody(fb: FileBody): GenericRequestBody[R] = + StreamBody(Fs2Streams[F])(compressStream(Files[F].readAll(fb.f.toPath, 1024))) +} + +class PlatformDeflateFs2Compressor[F[_]: Sync: Files, R <: Fs2Streams[F]] extends DeflateFs2Compressor[F, R] { + + override protected def compressInputStreamBody(b: java.io.InputStream): GenericRequestBody[R] = + StreamBody(Fs2Streams[F])(compressStream(fs2.io.readInputStream(fSync.delay(b), 1024)(fSync))) + + override protected def compressFileBody(fb: FileBody): GenericRequestBody[R] = + StreamBody(Fs2Streams[F])(compressStream(Files[F].readAll(fb.f.toPath, 1024))) +} diff --git a/http4s-backend/src/main/scala/sttp/client4/http4s/Http4sBackend.scala b/http4s-backend/src/main/scala/sttp/client4/http4s/Http4sBackendBase.scala similarity index 79% rename from http4s-backend/src/main/scala/sttp/client4/http4s/Http4sBackend.scala rename to http4s-backend/src/main/scala/sttp/client4/http4s/Http4sBackendBase.scala index 7b3583f8d2..5d302da95b 100644 --- a/http4s-backend/src/main/scala/sttp/client4/http4s/Http4sBackend.scala +++ b/http4s-backend/src/main/scala/sttp/client4/http4s/Http4sBackendBase.scala @@ -2,19 +2,23 @@ package sttp.client4.http4s import java.io.InputStream import java.nio.charset.Charset -import cats.effect.{Async, Deferred, Resource} +import cats.effect.{Async, Deferred} import cats.implicits._ import cats.effect.implicits._ import fs2.io.file.Files import fs2.{Chunk, Stream} import org.http4s.{EntityBody, Request => Http4sRequest, Status} import org.http4s -import org.http4s.blaze.client.BlazeClientBuilder import org.http4s.client.Client -import org.http4s.ember.client.EmberClientBuilder import org.typelevel.ci.CIString import sttp.capabilities.fs2.Fs2Streams import sttp.client4.impl.cats.CatsMonadAsyncError +import sttp.client4.impl.fs2.{ + DeflateFs2Decompressor, + GZipFs2Decompressor, + PlatformDeflateFs2Compressor, + PlatformGZipFs2Compressor +} import sttp.client4.internal.{BodyFromResponseAs, IOBufferSize, SttpFile} import sttp.model._ import sttp.monad.MonadError @@ -22,21 +26,16 @@ import sttp.client4.testing.StreamBackendStub import sttp.client4.ws.{GotAWebSocketException, NotAWebSocketException} import sttp.client4._ import sttp.client4.wrappers.FollowRedirectsBackend -import sttp.client4.compression.Compressor -import sttp.client4.impl.fs2.GZipFs2Compressor -import sttp.client4.impl.fs2.DeflateFs2Compressor -import sttp.client4.compression.CompressionHandlers -import sttp.client4.impl.fs2.GZipFs2Decompressor -import sttp.client4.impl.fs2.DeflateFs2Decompressor -import sttp.client4.compression.Decompressor +import sttp.client4.compression.{CompressionHandlers, Compressor, Decompressor} import cats.effect.kernel.Resource.ExitCase // needs http4s using cats-effect -class Http4sBackend[F[_]: Async]( - client: Client[F], - customizeRequest: Http4sRequest[F] => Http4sRequest[F], - compressionHandlers: CompressionHandlers[Fs2Streams[F], EntityBody[F]] -) extends StreamBackend[F, Fs2Streams[F]] { +private[http4s] abstract class Http4sBackendBase[F[_]](implicit protected val asyncF: Async[F]) + extends StreamBackend[F, Fs2Streams[F]] { + protected def client: Client[F] + protected def customizeRequest: Http4sRequest[F] => Http4sRequest[F] + protected def compressionHandlers: CompressionHandlers[Fs2Streams[F], EntityBody[F]] + type R = Fs2Streams[F] with sttp.capabilities.Effect[F] override def send[T](r: GenericRequest[T, R]): F[Response[T]] = @@ -284,58 +283,20 @@ class Http4sBackend[F[_]: Async]( override def close(): F[Unit] = monad.unit(()) } -object Http4sBackend { +private[http4s] object Http4sBackendBase { + def defaultCompressionHandlers[F[_]: Async]: CompressionHandlers[Fs2Streams[F], Stream[F, Byte]] = CompressionHandlers( - List(new GZipFs2Compressor[F, Fs2Streams[F]](), new DeflateFs2Compressor[F, Fs2Streams[F]]()), + List(new PlatformGZipFs2Compressor[F, Fs2Streams[F]], new PlatformDeflateFs2Compressor[F, Fs2Streams[F]]), List(new GZipFs2Decompressor, new DeflateFs2Decompressor) ) def usingClient[F[_]: Async]( client: Client[F], - customizeRequest: Http4sRequest[F] => Http4sRequest[F] = identity[Http4sRequest[F]] _, - compressionHandlers: Async[F] => CompressionHandlers[Fs2Streams[F], EntityBody[F]] = - defaultCompressionHandlers[F](_: Async[F]) + customizeRequest: Http4sRequest[F] => Http4sRequest[F], + compressionHandlers: Async[F] => CompressionHandlers[Fs2Streams[F], EntityBody[F]] ): StreamBackend[F, Fs2Streams[F]] = FollowRedirectsBackend(new Http4sBackend[F](client, customizeRequest, compressionHandlers(implicitly))) - def usingBlazeClientBuilder[F[_]: Async]( - blazeClientBuilder: BlazeClientBuilder[F], - customizeRequest: Http4sRequest[F] => Http4sRequest[F] = identity[Http4sRequest[F]] _, - compressionHandlers: Async[F] => CompressionHandlers[Fs2Streams[F], EntityBody[F]] = - defaultCompressionHandlers[F](_: Async[F]) - ): Resource[F, StreamBackend[F, Fs2Streams[F]]] = - blazeClientBuilder.resource.map(c => usingClient(c, customizeRequest, compressionHandlers)) - - def usingDefaultBlazeClientBuilder[F[_]: Async]( - customizeRequest: Http4sRequest[F] => Http4sRequest[F] = identity[Http4sRequest[F]] _, - compressionHandlers: Async[F] => CompressionHandlers[Fs2Streams[F], EntityBody[F]] = - defaultCompressionHandlers[F](_: Async[F]) - ): Resource[F, StreamBackend[F, Fs2Streams[F]]] = - usingBlazeClientBuilder( - BlazeClientBuilder[F], - customizeRequest, - compressionHandlers - ) - - def usingEmberClientBuilder[F[_]: Async]( - emberClientBuilder: EmberClientBuilder[F], - customizeRequest: Http4sRequest[F] => Http4sRequest[F] = identity[Http4sRequest[F]] _, - compressionHandlers: Async[F] => CompressionHandlers[Fs2Streams[F], EntityBody[F]] = - defaultCompressionHandlers[F](_: Async[F]) - ): Resource[F, StreamBackend[F, Fs2Streams[F]]] = - emberClientBuilder.build.map(c => usingClient(c, customizeRequest, compressionHandlers)) - - def usingDefaultEmberClientBuilder[F[_]: Async]( - customizeRequest: Http4sRequest[F] => Http4sRequest[F] = identity[Http4sRequest[F]] _, - compressionHandlers: Async[F] => CompressionHandlers[Fs2Streams[F], EntityBody[F]] = - defaultCompressionHandlers[F](_: Async[F]) - ): Resource[F, StreamBackend[F, Fs2Streams[F]]] = - usingEmberClientBuilder(EmberClientBuilder.default[F], customizeRequest, compressionHandlers) - - /** Create a stub backend for testing, which uses the `F` response wrapper, and supports `Stream[F, Byte]` streaming. - * - * See [[StreamBackendStub]] for details on how to configure stub responses. - */ def stub[F[_]: Async]: StreamBackendStub[F, Fs2Streams[F]] = StreamBackendStub(new CatsMonadAsyncError) } diff --git a/http4s-backend/src/main/scalajvm/sttp/client4/http4s/Http4sBackend.scala b/http4s-backend/src/main/scalajvm/sttp/client4/http4s/Http4sBackend.scala new file mode 100644 index 0000000000..5adc4f2835 --- /dev/null +++ b/http4s-backend/src/main/scalajvm/sttp/client4/http4s/Http4sBackend.scala @@ -0,0 +1,83 @@ +package sttp.client4.http4s + +import cats.effect.{Async, Resource} +import fs2.Stream +import org.http4s.{EntityBody, Request => Http4sRequest} +import org.http4s.blaze.client.BlazeClientBuilder +import org.http4s.client.Client +import org.http4s.ember.client.EmberClientBuilder +import sttp.capabilities.fs2.Fs2Streams +import sttp.client4.StreamBackend +import sttp.client4.compression.CompressionHandlers +import sttp.client4.testing.StreamBackendStub + +class Http4sBackend[F[_]: Async]( + protected val client: Client[F], + protected val customizeRequest: Http4sRequest[F] => Http4sRequest[F], + protected val compressionHandlers: CompressionHandlers[Fs2Streams[F], EntityBody[F]] +) extends Http4sBackendBase[F] + +object Http4sBackend { + + def defaultCompressionHandlers[F[_]: Async]: CompressionHandlers[Fs2Streams[F], Stream[F, Byte]] = + Http4sBackendBase.defaultCompressionHandlers[F] + + def usingClient[F[_]: Async]( + client: Client[F], + customizeRequest: Http4sRequest[F] => Http4sRequest[F] = identity[Http4sRequest[F]] _ + ): StreamBackend[F, Fs2Streams[F]] = + Http4sBackendBase.usingClient(client, customizeRequest, defaultCompressionHandlers[F](_: Async[F])) + + def usingClient[F[_]: Async]( + client: Client[F], + customizeRequest: Http4sRequest[F] => Http4sRequest[F], + compressionHandlers: Async[F] => CompressionHandlers[Fs2Streams[F], EntityBody[F]] + ): StreamBackend[F, Fs2Streams[F]] = + Http4sBackendBase.usingClient(client, customizeRequest, compressionHandlers) + + def usingBlazeClientBuilder[F[_]: Async]( + blazeClientBuilder: BlazeClientBuilder[F], + customizeRequest: Http4sRequest[F] => Http4sRequest[F] = identity[Http4sRequest[F]] _ + ): Resource[F, StreamBackend[F, Fs2Streams[F]]] = + usingBlazeClientBuilder(blazeClientBuilder, customizeRequest, defaultCompressionHandlers[F](_: Async[F])) + + def usingBlazeClientBuilder[F[_]: Async]( + blazeClientBuilder: BlazeClientBuilder[F], + customizeRequest: Http4sRequest[F] => Http4sRequest[F], + compressionHandlers: Async[F] => CompressionHandlers[Fs2Streams[F], EntityBody[F]] + ): Resource[F, StreamBackend[F, Fs2Streams[F]]] = + blazeClientBuilder.resource.map(c => usingClient(c, customizeRequest, compressionHandlers)) + + def usingDefaultBlazeClientBuilder[F[_]: Async]( + customizeRequest: Http4sRequest[F] => Http4sRequest[F] = identity[Http4sRequest[F]] _, + compressionHandlers: Async[F] => CompressionHandlers[Fs2Streams[F], EntityBody[F]] = + defaultCompressionHandlers[F](_: Async[F]) + ): Resource[F, StreamBackend[F, Fs2Streams[F]]] = + usingBlazeClientBuilder(BlazeClientBuilder[F], customizeRequest, compressionHandlers) + + def usingEmberClientBuilder[F[_]: Async]( + emberClientBuilder: EmberClientBuilder[F], + customizeRequest: Http4sRequest[F] => Http4sRequest[F] = identity[Http4sRequest[F]] _ + ): Resource[F, StreamBackend[F, Fs2Streams[F]]] = + usingEmberClientBuilder(emberClientBuilder, customizeRequest, defaultCompressionHandlers[F](_: Async[F])) + + def usingEmberClientBuilder[F[_]: Async]( + emberClientBuilder: EmberClientBuilder[F], + customizeRequest: Http4sRequest[F] => Http4sRequest[F], + compressionHandlers: Async[F] => CompressionHandlers[Fs2Streams[F], EntityBody[F]] + ): Resource[F, StreamBackend[F, Fs2Streams[F]]] = + emberClientBuilder.build.map(c => usingClient(c, customizeRequest, compressionHandlers)) + + def usingDefaultEmberClientBuilder[F[_]: Async]( + customizeRequest: Http4sRequest[F] => Http4sRequest[F] = identity[Http4sRequest[F]] _, + compressionHandlers: Async[F] => CompressionHandlers[Fs2Streams[F], EntityBody[F]] = + defaultCompressionHandlers[F](_: Async[F]) + ): Resource[F, StreamBackend[F, Fs2Streams[F]]] = + usingEmberClientBuilder(EmberClientBuilder.default[F], customizeRequest, compressionHandlers) + + /** Create a stub backend for testing, which uses the `F` response wrapper, and supports `Stream[F, Byte]` streaming. + * + * See [[StreamBackendStub]] for details on how to configure stub responses. + */ + def stub[F[_]: Async]: StreamBackendStub[F, Fs2Streams[F]] = Http4sBackendBase.stub[F] +} diff --git a/http4s-backend/src/main/scalanative/sttp/client4/http4s/Http4sBackend.scala b/http4s-backend/src/main/scalanative/sttp/client4/http4s/Http4sBackend.scala new file mode 100644 index 0000000000..8b06186f97 --- /dev/null +++ b/http4s-backend/src/main/scalanative/sttp/client4/http4s/Http4sBackend.scala @@ -0,0 +1,63 @@ +package sttp.client4.http4s + +import cats.effect.{Async, Resource} +import fs2.Stream +import fs2.io.net.Network +import org.http4s.{EntityBody, Request => Http4sRequest} +import org.http4s.client.Client +import org.http4s.ember.client.EmberClientBuilder +import sttp.capabilities.fs2.Fs2Streams +import sttp.client4.StreamBackend +import sttp.client4.compression.CompressionHandlers +import sttp.client4.testing.StreamBackendStub + +class Http4sBackend[F[_]: Async]( + protected val client: Client[F], + protected val customizeRequest: Http4sRequest[F] => Http4sRequest[F], + protected val compressionHandlers: CompressionHandlers[Fs2Streams[F], EntityBody[F]] +) extends Http4sBackendBase[F] + +object Http4sBackend { + + def defaultCompressionHandlers[F[_]: Async]: CompressionHandlers[Fs2Streams[F], Stream[F, Byte]] = + Http4sBackendBase.defaultCompressionHandlers[F] + + def usingClient[F[_]: Async]( + client: Client[F], + customizeRequest: Http4sRequest[F] => Http4sRequest[F] = identity[Http4sRequest[F]] _ + ): StreamBackend[F, Fs2Streams[F]] = + Http4sBackendBase.usingClient(client, customizeRequest, defaultCompressionHandlers[F](_: Async[F])) + + def usingClient[F[_]: Async]( + client: Client[F], + customizeRequest: Http4sRequest[F] => Http4sRequest[F], + compressionHandlers: Async[F] => CompressionHandlers[Fs2Streams[F], EntityBody[F]] + ): StreamBackend[F, Fs2Streams[F]] = + Http4sBackendBase.usingClient(client, customizeRequest, compressionHandlers) + + def usingEmberClientBuilder[F[_]: Async: Network]( + emberClientBuilder: EmberClientBuilder[F], + customizeRequest: Http4sRequest[F] => Http4sRequest[F] = identity[Http4sRequest[F]] _ + ): Resource[F, StreamBackend[F, Fs2Streams[F]]] = + usingEmberClientBuilder(emberClientBuilder, customizeRequest, defaultCompressionHandlers[F](_: Async[F])) + + def usingEmberClientBuilder[F[_]: Async: Network]( + emberClientBuilder: EmberClientBuilder[F], + customizeRequest: Http4sRequest[F] => Http4sRequest[F], + compressionHandlers: Async[F] => CompressionHandlers[Fs2Streams[F], EntityBody[F]] + ): Resource[F, StreamBackend[F, Fs2Streams[F]]] = + emberClientBuilder.build.map(c => usingClient(c, customizeRequest, compressionHandlers)) + + def usingDefaultEmberClientBuilder[F[_]: Async: Network]( + customizeRequest: Http4sRequest[F] => Http4sRequest[F] = identity[Http4sRequest[F]] _, + compressionHandlers: Async[F] => CompressionHandlers[Fs2Streams[F], EntityBody[F]] = + defaultCompressionHandlers[F](_: Async[F]) + ): Resource[F, StreamBackend[F, Fs2Streams[F]]] = + usingEmberClientBuilder(EmberClientBuilder.default[F], customizeRequest, compressionHandlers) + + /** Create a stub backend for testing, which uses the `F` response wrapper, and supports `Stream[F, Byte]` streaming. + * + * See [[StreamBackendStub]] for details on how to configure stub responses. + */ + def stub[F[_]: Async]: StreamBackendStub[F, Fs2Streams[F]] = Http4sBackendBase.stub[F] +} diff --git a/http4s-backend/src/test/scala/sttp/client4/http4s/Http4sHttpStreamingTest.scala b/http4s-backend/src/test/scalajvm/sttp/client4/http4s/Http4sHttpStreamingTest.scala similarity index 100% rename from http4s-backend/src/test/scala/sttp/client4/http4s/Http4sHttpStreamingTest.scala rename to http4s-backend/src/test/scalajvm/sttp/client4/http4s/Http4sHttpStreamingTest.scala diff --git a/http4s-backend/src/test/scala/sttp/client4/http4s/Http4sHttpTest.scala b/http4s-backend/src/test/scalajvm/sttp/client4/http4s/Http4sHttpTest.scala similarity index 100% rename from http4s-backend/src/test/scala/sttp/client4/http4s/Http4sHttpTest.scala rename to http4s-backend/src/test/scalajvm/sttp/client4/http4s/Http4sHttpTest.scala diff --git a/http4s-backend/src/test/scalanative/sttp/client4/http4s/Http4sNativeHttpStreamingTest.scala b/http4s-backend/src/test/scalanative/sttp/client4/http4s/Http4sNativeHttpStreamingTest.scala new file mode 100644 index 0000000000..7d6f9547ce --- /dev/null +++ b/http4s-backend/src/test/scalanative/sttp/client4/http4s/Http4sNativeHttpStreamingTest.scala @@ -0,0 +1,20 @@ +package sttp.client4.http4s + +import cats.effect.IO +import sttp.client4.StreamBackend +import sttp.client4.impl.fs2.Fs2StreamingTest +import sttp.capabilities.fs2.Fs2Streams + +class Http4sNativeHttpStreamingTest extends Fs2StreamingTest { + + override val backend: StreamBackend[IO, Fs2Streams[IO]] = { + try { + Http4sBackend.usingDefaultEmberClientBuilder[IO]().allocated.unsafeRunSync()._1 + } catch { + case e: Throwable => + Console.err.println(s"[Http4sNativeHttpStreamingTest] Failed to create backend: $e ${e.getMessage()} ${e.getCause()}") + e.printStackTrace(Console.err) + throw e + } + } +} diff --git a/http4s-backend/src/test/scalanative/sttp/client4/http4s/Http4sNativeHttpTest.scala b/http4s-backend/src/test/scalanative/sttp/client4/http4s/Http4sNativeHttpTest.scala new file mode 100644 index 0000000000..2f0f6f7c80 --- /dev/null +++ b/http4s-backend/src/test/scalanative/sttp/client4/http4s/Http4sNativeHttpTest.scala @@ -0,0 +1,23 @@ +package sttp.client4.http4s + +import cats.effect.IO +import sttp.client4.Backend +import sttp.client4.impl.cats.{CatsRetryTest, CatsTestBase} +import sttp.client4.testing.HttpTest + +class Http4sNativeHttpTest extends HttpTest[IO] with CatsRetryTest with CatsTestBase { + + override val backend: Backend[IO] = { + try { + Http4sBackend.usingDefaultEmberClientBuilder[IO]().allocated.unsafeRunSync()._1 + } catch { + case e: Throwable => + Console.err.println("[Http4sNativeHttpTest] Failed to create backend:") + e.printStackTrace(Console.err) + throw e + } + } + + override protected def supportsRequestTimeout = false + override protected def supportsCustomMultipartContentType = false +}