diff --git a/.gitignore b/.gitignore index cd6596b34..161dbd515 100644 --- a/.gitignore +++ b/.gitignore @@ -21,3 +21,5 @@ conf/application.conf sbt-launch.jar .vscode +.claude +.site diff --git a/CHANGELOG.md b/CHANGELOG.md index 077c8d118..5536b9709 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,15 @@ # Change Log +## Unreleased + + - [DL-5871] fix: run analyzer/responder jobs on dedicated thread pools to keep the HTTP API responsive under heavy job load + + **Upgrade note:** the `analyzer` and `responder` thread pools now use a `thread-pool-executor` + (`fixed-pool-size`) instead of a `fork-join-executor`. Any custom + `analyzer.fork-join-executor` / `responder.fork-join-executor` tuning in `application.conf` is + no longer applied — switch to `analyzer.thread-pool-executor.fixed-pool-size` + (see `conf/application.sample`). + ## 3.2.0 (2025-06-02) - [DL-1231] Add support of Kubernetes diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 000000000..49e8aa5ce --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,84 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Stack + +- **Backend:** Scala 2.13.17, Play Framework 3.0.x, Pekko 1.2 (cluster + typed), Guice DI (via `scala-guice`). +- **Build:** sbt 1.11.7 (use the wrapper `./sbt`). JDK 11 (Amazon Corretto in CI). +- **Search/storage:** ElasticSearch 8.19.x via `elastic4s` 8.19. No relational DB. Default HTTP port: `9001`. +- **Frontend:** AngularJS 1.7 + Bootstrap 3, bundled with webpack 3, lives in `www/`. Node 18.16 in CI. Requires `npm install --legacy-peer-deps (--ignore-scripts for macos zsh)` (pinned legacy deps). +- **Job execution:** runs analyzers/responders as Docker containers, Kubernetes pods, or local subprocesses. + +## Commands + +### Build / run +- `./sbt run` — start the Cortex Play app (port 9001). First run also builds the front-end via the `FrontEnd` sbt plugin (`npm install --legacy-peer-deps && npm run build` inside `www/`). +- `./sbt compile` — backend only. +- `./sbt clean stage` — produces `target/universal/stage` runnable layout. +- `./sbt Universal/packageBin` — full distributable zip (this is what CI runs alongside tests). +- `./sbt Debian/packageBin Rpm/packageBin Docker/publishLocal` — OS packages and local Docker image. `DockerSettings` produces two images: `cortex` (slim) and `cortexWithDeps` (the `target/docker-withdeps` virtual project, used when you want bundled deps; see `build.sbt`). +- Opt-in sbt plugins: `./sbt -Dplugins=sbom,depcheck …` enables `sbt-sbom` and `sbt-dependency-check` (off by default — see `project/plugins.sbt`). + +### Frontend (standalone) +- `cd www && npm install --legacy-peer-deps (--ignore-scripts for macos zsh)` +- `npm run dev` — webpack-dev-server with hot reload. +- `npm run build` — production bundle into `www/dist`, which `FrontEnd.scala` then packages into the Play assets. + +### Tests +- `./sbt test` — runs the whole suite. Tests are **forked** and **non-parallel** (`Test / fork := true`, `Test / parallelExecution := false` in `project/Common.scala`). +- Single spec: `./sbt "testOnly org.thp.cortex.services.JobRunnerSrvSpec"` (Specs2 with `@RunWith(classOf[JUnitRunner])`). +- Specs2 example filter: `./sbt "testOnly *JobRunnerSrvSpec -- only \"return the original image when prefix is empty\""`. + +### Formatting +- `./sbt scalafmtAll` (config in `.scalafmt.conf`, maxColumn 150, sorts imports/modifiers, rewrites unicode arrows). CI does not auto-format; run before committing. + +## Architecture + +### Multi-project sbt layout +`build.sbt` defines three projects: +- **`cortex`** (root, `app/`) — the Play app. Enables `PlayScala` + packaging plugins. +- **`elastic4play`** (subdir `elastic4play/`) — an in-tree library wrapping ElasticSearch as a Play-friendly data layer: `ModelDef`/`EntityDef`/`AttributeDef` DSL, `CreateSrv`/`UpdateSrv`/`FindSrv`/`AttachmentSrv`, `MigrationCtrl`, and `auth/` provider scaffolding. Cortex `dependsOn(elastic4play)`. Changes to ES models or query plumbing usually live here, not in `app/`. +- **`cortexWithDeps`** — virtual project at `target/docker-withdeps` purely to produce the `cortex-withdeps` Docker tag from the main project's mappings. + +### Backend package layout (`app/org/thp/cortex`) +- `Module.scala` — Guice bindings; registered via `play.modules.enabled += org.thp.cortex.Module` in `conf/reference.conf`. +- `controllers/` — Play actions, one file per resource (Analyzer, Responder, Job, User, Organization, Stream, Misp, Auth, …). Routes wired in `conf/routes`. +- `models/` — ES-backed entities built on the elastic4play attribute DSL (`Job`, `Worker`, `Organization`, `User`, `Report`, `Artifact`, `Audit`, `WorkerConfig`, `WorkerDefinition`). +- `services/` — business logic. Most controllers are thin wrappers around a `*Srv`. +- `services/mappers/` — group/role mappers used by external auth (LDAP/AD/OAuth2 → org+role mapping). + +### Worker model (analyzers + responders) +- A **Worker** is the running instance of a **WorkerDefinition** (analyzer or responder catalog entry, loaded from URLs in `analyzer.urls` / `responder.urls`). `WorkerSrv` loads definitions on startup and on demand, with `worker.updateDockerImage = true` triggering image refreshes when the catalog changes. +- A **Job** is one execution of a Worker against an artifact; it produces a **Report** and possibly child **Artifacts** (extracted IoCs). See `models/Job.scala` for the `JobStatus` enum (`Waiting`, `InProgress`, `Success`, `Failure`, `Deleted`) and the attribute schema. +- **Caching:** identical `(worker, data)` jobs within `cache.job` (default 10 min) reuse the previous report — the cache key is `cacheTag` on the Job. + +### Job runner selection (`JobRunnerSrv`) +At startup, `job.runners` in config (default `[kubernetes, docker, process]`) is filtered down to actually-available runners: +- `kubernetes` requires the fabric8 client to detect a cluster (`K8sJobRunnerSrv.isAvailable`). +- `docker` requires a reachable Docker daemon (`DockerJobRunnerSrv.isAvailable`). +- `process` requires `cortexutils` Python package ≥ 2.0 to be installed (probed for `python`, `python2`, `python3`). + +Runners are tried **in the configured order** for each job — first one able to run the worker wins. When editing runner logic, keep in mind the docker image name can be rewritten through `docker.imageRegistryPrefix` (see `JobRunnerSrv.applyImagePrefix` and its spec). + +### Auth +`auth.provider` is a **list** evaluated in order (`local`, `ad`, `ldap`, `oauth2`, `key`). `CortexAuthSrv` composes them; multi-valued is the supported way to migrate users between providers. API-key auth (`KeyAuthSrv`) is always available alongside whatever interactive providers are configured. + +### Configuration +- `conf/reference.conf` ships defaults; operators override via `conf/application.conf` (template: `conf/application.sample`). +- Job runner, cache TTLs, ES connection, auth providers and per-provider config, and analyzer/responder catalogs all live in HOCON. + +## Conventions specific to this codebase + +- **Models are not plain case classes** — they extend `ModelDef[…]` + a `*Attributes` trait from elastic4play. Adding a field means editing both the trait and (often) a migration. Look at `models/Job.scala` for the canonical pattern. +- **No DB migrations file** — schema lives in code; the `MigrationCtrl` endpoint (`POST /api/maintenance/migrate`) runs version-aware migrations registered through elastic4play. +- **`organization` is the tenant boundary.** Almost every model carries an `organization` attribute and `*Srv` queries scope by it via `AuthContext`. Don't add cross-organization queries without explicit ACL handling. +- **Routes file is the source of truth** for the public API surface (`conf/routes`) — there's no annotation-based routing. +- **The front-end is legacy AngularJS 1.x** and is *not* under active framework upgrades; keep changes minimal and idiomatic to the existing module structure (`www/src/app/{components,pages,core}`). + +## Repository docs / refs + +- `README.md` — high-level product description and links to external docs. +- `CHANGELOG.md` — release-by-release feature/fix list (DL-xxxx ticket prefixes match the team's Jira). +- External docs site: (generated from `docs/`, built via `.github/workflows/build.docs.yaml`). +- This repo is mirrored to the public OSS repo at . The release process is **manual** — pushes to the mirror and version bumps are not automated. \ No newline at end of file diff --git a/app/org/thp/cortex/controllers/AnalyzerConfigCtrl.scala b/app/org/thp/cortex/controllers/AnalyzerConfigCtrl.scala index e948f1af2..5fef4452d 100644 --- a/app/org/thp/cortex/controllers/AnalyzerConfigCtrl.scala +++ b/app/org/thp/cortex/controllers/AnalyzerConfigCtrl.scala @@ -1,21 +1,20 @@ package org.thp.cortex.controllers -import javax.inject.{Inject, Singleton} -import scala.concurrent.{ExecutionContext, Future} - +import org.elastic4play.BadRequestError +import org.elastic4play.controllers.{Authenticated, Fields, FieldsBodyParser, Renderer} +import org.thp.cortex.models.{BaseConfig, Roles} +import org.thp.cortex.services.AnalyzerConfigSrv +import play.api.Logger import play.api.libs.json.JsObject import play.api.mvc.{AbstractController, Action, AnyContent, ControllerComponents} -import org.thp.cortex.models.{BaseConfig, Roles} -import org.thp.cortex.services.{AnalyzerConfigSrv, UserSrv} - -import org.elastic4play.BadRequestError -import org.elastic4play.controllers.{Authenticated, Fields, FieldsBodyParser, Renderer} +import javax.inject.{Inject, Singleton} +import scala.concurrent.{ExecutionContext, Future} +import scala.util.chaining.scalaUtilChainingOps @Singleton class AnalyzerConfigCtrl @Inject() ( analyzerConfigSrv: AnalyzerConfigSrv, - userSrv: UserSrv, authenticated: Authenticated, fieldsBodyParser: FieldsBodyParser, renderer: Renderer, @@ -23,6 +22,8 @@ class AnalyzerConfigCtrl @Inject() ( implicit val ec: ExecutionContext ) extends AbstractController(components) { + private lazy val logger: Logger = Logger(getClass.getName) + def get(analyzerConfigName: String): Action[AnyContent] = authenticated(Roles.orgAdmin).async { request => analyzerConfigSrv .getForUser(request.userId, analyzerConfigName) @@ -50,6 +51,7 @@ class AnalyzerConfigCtrl @Inject() ( analyzerConfigSrv .updateOrCreate(request.userId, analyzerConfigName, config) .map(renderer.toOutput(OK, _)) + .tap(_ => logger.info(s"Analyzer $analyzerConfigName updated with $config by user id ${request.userId}")) case None => Future.failed(BadRequestError("attribute config has invalid format")) } } diff --git a/app/org/thp/cortex/controllers/AssetCtrl.scala b/app/org/thp/cortex/controllers/AssetCtrl.scala index 149eef1c4..4e617999d 100644 --- a/app/org/thp/cortex/controllers/AssetCtrl.scala +++ b/app/org/thp/cortex/controllers/AssetCtrl.scala @@ -13,7 +13,9 @@ trait AssetCtrl { } @Singleton -class AssetCtrlProd @Inject() (errorHandler: HttpErrorHandler, meta: AssetsMetadata, env: Environment) extends Assets(errorHandler, meta, env) with AssetCtrl { +class AssetCtrlProd @Inject() (errorHandler: HttpErrorHandler, meta: AssetsMetadata, env: Environment) + extends Assets(errorHandler, meta, env) + with AssetCtrl { def get(file: String): Action[AnyContent] = at("/www", file) } diff --git a/app/org/thp/cortex/controllers/OrganizationCtrl.scala b/app/org/thp/cortex/controllers/OrganizationCtrl.scala index 29d721151..308623d8a 100644 --- a/app/org/thp/cortex/controllers/OrganizationCtrl.scala +++ b/app/org/thp/cortex/controllers/OrganizationCtrl.scala @@ -1,26 +1,22 @@ package org.thp.cortex.controllers -import javax.inject.{Inject, Singleton} - -import scala.concurrent.{ExecutionContext, Future} - -import play.api.Logger -import play.api.http.Status -import play.api.mvc._ - -import org.thp.cortex.models.Roles -import org.thp.cortex.services.{OrganizationSrv, UserSrv} - -import org.elastic4play.{BadRequestError, NotFoundError} import org.elastic4play.controllers.{Authenticated, Fields, FieldsBodyParser, Renderer} import org.elastic4play.models.JsonFormat.baseModelEntityWrites import org.elastic4play.services.JsonFormat.{aggReads, queryReads} import org.elastic4play.services.{UserSrv => _, _} +import org.elastic4play.{BadRequestError, NotFoundError} +import org.thp.cortex.models.Roles +import org.thp.cortex.services.{OrganizationSrv, UserSrv} +import play.api.Logger +import play.api.http.Status +import play.api.mvc._ + +import javax.inject.{Inject, Singleton} +import scala.concurrent.{ExecutionContext, Future} @Singleton class OrganizationCtrl @Inject() ( organizationSrv: OrganizationSrv, - authSrv: AuthSrv, auxSrv: AuxSrv, userSrv: UserSrv, authenticated: Authenticated, @@ -36,7 +32,10 @@ class OrganizationCtrl @Inject() ( def create: Action[Fields] = authenticated(Roles.superAdmin).async(fieldsBodyParser) { implicit request => organizationSrv .create(request.body) - .map(organization => renderer.toOutput(CREATED, organization)) + .map { organization => + logger.info(s"Organization ${organization.id} created by user ${request.userId}") + renderer.toOutput(CREATED, organization) + } } def get(organizationId: String): Action[Fields] = authenticated(Roles.superAdmin, Roles.orgAdmin).async(fieldsBodyParser) { implicit request => @@ -55,9 +54,12 @@ class OrganizationCtrl @Inject() ( if (organizationId == "cortex") Future.failed(BadRequestError("Cortex organization can't be updated")) else - organizationSrv.update(organizationId, request.body).map { organization => - renderer.toOutput(OK, organization) - } + organizationSrv + .update(organizationId, request.body) + .map { organization => + logger.info(s"Organization ${organization.id} updated by user ${request.userId}") + renderer.toOutput(OK, organization) + } } def delete(organizationId: String): Action[AnyContent] = authenticated(Roles.superAdmin).async { implicit request => @@ -66,7 +68,10 @@ class OrganizationCtrl @Inject() ( else organizationSrv .delete(organizationId) - .map(_ => NoContent) + .map { organization => + logger.info(s"Organization ${organization.id} deleted by user ${request.userId}") + NoContent + } } def find: Action[Fields] = authenticated(Roles.superAdmin).async(fieldsBodyParser) { implicit request => diff --git a/app/org/thp/cortex/controllers/ResponderConfigCtrl.scala b/app/org/thp/cortex/controllers/ResponderConfigCtrl.scala index a4f232ba0..bab165091 100644 --- a/app/org/thp/cortex/controllers/ResponderConfigCtrl.scala +++ b/app/org/thp/cortex/controllers/ResponderConfigCtrl.scala @@ -1,21 +1,20 @@ package org.thp.cortex.controllers -import scala.concurrent.{ExecutionContext, Future} - +import org.elastic4play.BadRequestError +import org.elastic4play.controllers.{Authenticated, Fields, FieldsBodyParser, Renderer} +import org.thp.cortex.models.{BaseConfig, Roles} +import org.thp.cortex.services.ResponderConfigSrv +import play.api.Logger import play.api.libs.json.JsObject import play.api.mvc.{AbstractController, Action, AnyContent, ControllerComponents} import javax.inject.{Inject, Singleton} -import org.thp.cortex.models.{BaseConfig, Roles} -import org.thp.cortex.services.{ResponderConfigSrv, UserSrv} - -import org.elastic4play.BadRequestError -import org.elastic4play.controllers.{Authenticated, Fields, FieldsBodyParser, Renderer} +import scala.concurrent.{ExecutionContext, Future} +import scala.util.chaining.scalaUtilChainingOps @Singleton class ResponderConfigCtrl @Inject() ( responderConfigSrv: ResponderConfigSrv, - userSrv: UserSrv, authenticated: Authenticated, fieldsBodyParser: FieldsBodyParser, renderer: Renderer, @@ -23,9 +22,11 @@ class ResponderConfigCtrl @Inject() ( implicit val ec: ExecutionContext ) extends AbstractController(components) { - def get(analyzerConfigName: String): Action[AnyContent] = authenticated(Roles.orgAdmin).async { request => + private lazy val logger: Logger = Logger(getClass.getName) + + def get(responderConfigName: String): Action[AnyContent] = authenticated(Roles.orgAdmin).async { request => responderConfigSrv - .getForUser(request.userId, analyzerConfigName) + .getForUser(request.userId, responderConfigName) .map(renderer.toOutput(OK, _)) } @@ -44,12 +45,13 @@ class ResponderConfigCtrl @Inject() ( } } - def update(analyzerConfigName: String): Action[Fields] = authenticated(Roles.orgAdmin).async(fieldsBodyParser) { implicit request => + def update(responderConfigName: String): Action[Fields] = authenticated(Roles.orgAdmin).async(fieldsBodyParser) { implicit request => request.body.getValue("config").flatMap(_.asOpt[JsObject]) match { case Some(config) => responderConfigSrv - .updateOrCreate(request.userId, analyzerConfigName, config) + .updateOrCreate(request.userId, responderConfigName, config) .map(renderer.toOutput(OK, _)) + .tap(_ => logger.info(s"Responder $responderConfigName updated with $config by user id ${request.userId}")) case None => Future.failed(BadRequestError("attribute config has invalid format")) } } diff --git a/app/org/thp/cortex/controllers/StreamCtrl.scala b/app/org/thp/cortex/controllers/StreamCtrl.scala index 33e292556..fab34fbf1 100644 --- a/app/org/thp/cortex/controllers/StreamCtrl.scala +++ b/app/org/thp/cortex/controllers/StreamCtrl.scala @@ -1,27 +1,24 @@ package org.thp.cortex.controllers -import javax.inject.{Inject, Singleton} - -import scala.collection.immutable -import scala.concurrent.{ExecutionContext, Future} -import scala.concurrent.duration.{DurationLong, FiniteDuration} -import scala.util.Random - -import play.api.{Configuration, Logger} -import play.api.http.Status -import play.api.libs.json.Json -import play.api.mvc.{AbstractController, Action, AnyContent, ControllerComponents} - import org.apache.pekko.actor.{ActorSystem, Props} -import org.apache.pekko.util.Timeout import org.apache.pekko.pattern.ask +import org.apache.pekko.util.Timeout +import org.elastic4play.Timed +import org.elastic4play.controllers._ +import org.elastic4play.services.{AuxSrv, EventSrv, MigrationSrv} import org.thp.cortex.models.Roles -import org.thp.cortex.services.StreamActor import org.thp.cortex.services.StreamActor.StreamMessages +import org.thp.cortex.services.{StreamActor, UserSrv} +import play.api.http.Status +import play.api.libs.json.Json +import play.api.mvc.{AbstractController, Action, AnyContent, ControllerComponents} +import play.api.{Configuration, Logger} -import org.elastic4play.Timed -import org.elastic4play.controllers._ -import org.elastic4play.services.{AuxSrv, EventSrv, MigrationSrv, UserSrv} +import java.security.SecureRandom +import javax.inject.{Inject, Singleton} +import scala.collection.immutable +import scala.concurrent.duration.{DurationLong, FiniteDuration} +import scala.concurrent.{ExecutionContext, Future} @Singleton class StreamCtrl( @@ -70,17 +67,32 @@ class StreamCtrl( ) private[StreamCtrl] lazy val logger = Logger(getClass) + // The bootstrap user has no User document in ES yet (initial setup / migration), so there is no + // organization to look up. Such a stream is left unbound (None) instead of being tied to an org. + private val initialUserId = "init" + + private def organizationId(userId: String): Future[Option[String]] = + if (userId == initialUserId) Future.successful(None) + else userSrv.getOrganizationId(userId).map(Some(_)) + /** Create a new stream entry with the event head */ @Timed("controllers.StreamCtrl.create") - def create: Action[AnyContent] = authenticated(Roles.read) { - val id = generateStreamId() - system.actorOf(Props(classOf[StreamActor], cacheExpiration, refresh, nextItemMaxWait, globalMaxWait, eventSrv, auxSrv), s"stream-$id") - Ok(id) + def create: Action[AnyContent] = authenticated(Roles.read).async { request => + // the stream is bound to the creator's organization + organizationId(request.userId).map { organizationId => + val id = generateStreamId() + system.actorOf( + Props(classOf[StreamActor], cacheExpiration, refresh, nextItemMaxWait, globalMaxWait, eventSrv, auxSrv, userSrv, organizationId), + s"stream-$id" + ) + Ok(id) + } } val alphanumeric: immutable.IndexedSeq[Char] = ('a' to 'z') ++ ('A' to 'Z') ++ ('0' to '9') - private[controllers] def generateStreamId() = Seq.fill(10)(alphanumeric(Random.nextInt(alphanumeric.size))).mkString + private val random = new SecureRandom() + private[controllers] def generateStreamId() = Seq.fill(10)(alphanumeric(random.nextInt(alphanumeric.size))).mkString private[controllers] def isValidStreamId(streamId: String): Boolean = streamId.length == 10 && streamId.forall(alphanumeric.contains) @@ -94,18 +106,28 @@ class StreamCtrl( if (!isValidStreamId(id)) { Future.successful(BadRequest("Invalid stream id")) } else { - val futureStatus = authenticated.expirationStatus(request) match { - case ExpirationError if !migrationSrv.isMigrating => - userSrv.getInitialUser(request).recoverWith { case _ => authenticated.getFromApiKey(request) }.map(_ => OK) - case _: ExpirationWarning => Future.successful(220) - case _ => Future.successful(OK) - } + val futureOrganizationAndStatus: Future[(Option[String], Status)] = + if (migrationSrv.isMigrating) + Future.successful((None, Ok)) + else + authenticated.expirationStatus(request) match { + case ExpirationError => + userSrv + .getInitialUser(request) + .recoverWith { case _ => authenticated.getFromApiKey(request) } + .flatMap(authContext => organizationId(authContext.userId).map(_ -> Ok)) + case _: ExpirationWarning => + authenticated.getContext(request).flatMap(authContext => organizationId(authContext.userId).map(_ -> new Status(220))) + case _ => + authenticated.getContext(request).flatMap(authContext => organizationId(authContext.userId).map(_ -> Ok)) + } - futureStatus.flatMap { status => - (system.actorSelection(s"/user/stream-$id") ? StreamActor.GetOperations) map { - case StreamMessages(operations) => renderer.toOutput(status, operations) - case m => InternalServerError(s"Unexpected message : $m (${m.getClass})") - } + futureOrganizationAndStatus.flatMap { + case (organizationId, status) => + (system.actorSelection(s"/user/stream-$id") ? StreamActor.GetOperations(organizationId)) map { + case StreamMessages(operations) => renderer.toOutput(status.header.status, operations) + case m => InternalServerError(s"Unexpected message : $m (${m.getClass})") + } } } } @@ -114,7 +136,7 @@ class StreamCtrl( def status: Action[AnyContent] = Action { implicit request => val status = authenticated.expirationStatus(request) match { case ExpirationWarning(duration) => Json.obj("remaining" -> duration.toSeconds, "warning" -> true) - case ExpirationError => Json.obj("remaining" -> 0, "warning" -> true) + case ExpirationError => Json.obj("remaining" -> 0, "warning" -> true) case ExpirationOk(duration) => Json.obj("remaining" -> duration.toSeconds, "warning" -> false) } Ok(status) diff --git a/app/org/thp/cortex/controllers/UserCtrl.scala b/app/org/thp/cortex/controllers/UserCtrl.scala index a8ff26def..0e102d9d5 100644 --- a/app/org/thp/cortex/controllers/UserCtrl.scala +++ b/app/org/thp/cortex/controllers/UserCtrl.scala @@ -1,23 +1,20 @@ package org.thp.cortex.controllers -import javax.inject.{Inject, Singleton} - -import scala.concurrent.{ExecutionContext, Future} -import scala.util.Try - +import org.elastic4play._ +import org.elastic4play.controllers.{Authenticated, Fields, FieldsBodyParser, Renderer} +import org.elastic4play.models.JsonFormat.baseModelEntityWrites +import org.elastic4play.services.JsonFormat.queryReads +import org.elastic4play.services.{AuthContext, AuthSrv, QueryDSL, QueryDef} +import org.thp.cortex.models.{OrganizationStatus, Roles} +import org.thp.cortex.services.{OrganizationSrv, UserSrv} import play.api.Logger import play.api.http.Status import play.api.libs.json.{JsObject, Json} import play.api.mvc._ -import org.thp.cortex.models.{OrganizationStatus, Roles} -import org.thp.cortex.services.{OrganizationSrv, UserSrv} - -import org.elastic4play.models.JsonFormat.baseModelEntityWrites -import org.elastic4play.services.JsonFormat.queryReads -import org.elastic4play.controllers.{Authenticated, Fields, FieldsBodyParser, Renderer} -import org.elastic4play.services.{AuthContext, AuthSrv, QueryDSL, QueryDef} -import org.elastic4play._ +import javax.inject.{Inject, Singleton} +import scala.concurrent.{ExecutionContext, Future} +import scala.util.Try @Singleton class UserCtrl @Inject() ( @@ -32,7 +29,7 @@ class UserCtrl @Inject() ( ) extends AbstractController(components) with Status { - private[UserCtrl] lazy val logger = Logger(getClass) + private[UserCtrl] val logger = Logger(getClass) @Timed def create: Action[Fields] = authenticated(Roles.orgAdmin, Roles.superAdmin).async(fieldsBodyParser) { implicit request => @@ -46,6 +43,7 @@ class UserCtrl @Inject() ( (userOrganizationId == organizationId && !request.body.getStrings("roles").getOrElse(Nil).contains(Roles.superAdmin.name))) user <- userSrv.create(request.body.set("organization", organizationId)) + _ = logger.info(s"User ${user.id} created by user ${request.userId}") } yield renderer.toOutput(CREATED, user)) .recoverWith { case _: NoSuchElementException => Future.failed(AuthorizationError("You are not authorized to perform this action")) @@ -123,6 +121,7 @@ class UserCtrl @Inject() ( else Future.failed(AuthorizationError("You are not permitted to change user settings")) _ <- authChecks user <- userSrv.update(userId, request.body) + _ = logger.info(s"User ${user.id} updated by user ${request.userId}") } yield renderer.toOutput(OK, user) } @@ -168,6 +167,7 @@ class UserCtrl @Inject() ( else Future.failed(NotFoundError(s"user $userId not found")) _ <- if (userId != request.userId) Future.successful(()) else Future.failed(BadRequestError(s"You cannot disable your own account")) _ <- userSrv.delete(userId) + _ = logger.info(s"User $userId deleted by user ${request.userId}") } yield NoContent } @@ -249,7 +249,7 @@ class UserCtrl @Inject() ( @Timed def setKey(userId: String): Action[Fields] = authenticated(Roles.orgAdmin, Roles.superAdmin).async(fieldsBodyParser) { implicit request => for { - _ <- checkUserOrganization(userId) + _ <- checkUserOrganization(userId) keyInput <- request.body.getString("key").fold(Future.failed[String](MissingAttributeError("key")))(Future.successful) key <- authSrv.setKey(userId, keyInput) } yield Ok(key) diff --git a/app/org/thp/cortex/models/BaseConfig.scala b/app/org/thp/cortex/models/BaseConfig.scala index 119c6a589..527675e76 100644 --- a/app/org/thp/cortex/models/BaseConfig.scala +++ b/app/org/thp/cortex/models/BaseConfig.scala @@ -6,7 +6,8 @@ import play.api.Configuration import play.api.libs.json._ case class BaseConfig(name: String, workerNames: Seq[String], items: Seq[ConfigurationDefinitionItem], config: Option[WorkerConfig]) { - def +(other: BaseConfig): BaseConfig = BaseConfig(name, workerNames ++ other.workerNames, (items ++ other.items).distinctBy(_.name), config.orElse(other.config)) + def +(other: BaseConfig): BaseConfig = + BaseConfig(name, workerNames ++ other.workerNames, (items ++ other.items).distinctBy(_.name), config.orElse(other.config)) } object BaseConfig { diff --git a/app/org/thp/cortex/models/Job.scala b/app/org/thp/cortex/models/Job.scala index 90e8f1672..b86d8fc16 100644 --- a/app/org/thp/cortex/models/Job.scala +++ b/app/org/thp/cortex/models/Job.scala @@ -11,7 +11,7 @@ import org.elastic4play.models.{AttributeDef, EntityDef, HiveEnumeration, ModelD object JobStatus extends Enumeration with HiveEnumeration { type Type = Value val Waiting, InProgress, Success, Failure, Deleted = Value - implicit val reads: Format[Value] = enumFormat(this) + implicit val reads: Format[Value] = enumFormat(this) } trait JobAttributes { diff --git a/app/org/thp/cortex/models/Organization.scala b/app/org/thp/cortex/models/Organization.scala index 6c2d4c139..b22ab4ff5 100644 --- a/app/org/thp/cortex/models/Organization.scala +++ b/app/org/thp/cortex/models/Organization.scala @@ -10,7 +10,7 @@ import org.elastic4play.services.FindSrv object OrganizationStatus extends Enumeration with HiveEnumeration { type Type = Value - val Active, Locked = Value + val Active, Locked = Value implicit val reads: Format[Value] = enumFormat(this) } diff --git a/app/org/thp/cortex/models/User.scala b/app/org/thp/cortex/models/User.scala index 1c298b2c9..6b2319d50 100644 --- a/app/org/thp/cortex/models/User.scala +++ b/app/org/thp/cortex/models/User.scala @@ -8,7 +8,7 @@ import org.elastic4play.services.{User => EUser} object UserStatus extends Enumeration with HiveEnumeration { type Type = Value - val Ok, Locked = Value + val Ok, Locked = Value implicit val reads: Format[Value] = enumFormat(this) } diff --git a/app/org/thp/cortex/models/Worker.scala b/app/org/thp/cortex/models/Worker.scala index e99e11cee..849cfefe6 100644 --- a/app/org/thp/cortex/models/Worker.scala +++ b/app/org/thp/cortex/models/Worker.scala @@ -12,11 +12,11 @@ import scala.util.Try object RateUnit extends Enumeration with HiveEnumeration { type Type = Value - val Second = Value(1) - val Minute = Value(60) - val Hour = Value(60 * 60) - val Day = Value(60 * 60 * 24) - val Month = Value(60 * 60 * 24 * 30) + val Second = Value(1) + val Minute = Value(60) + val Hour = Value(60 * 60) + val Day = Value(60 * 60 * 24) + val Month = Value(60 * 60 * 24 * 30) implicit val reads: Format[Value] = enumFormat(this) } diff --git a/app/org/thp/cortex/services/AuditActor.scala b/app/org/thp/cortex/services/AuditActor.scala index ac812741e..0f66cadec 100644 --- a/app/org/thp/cortex/services/AuditActor.scala +++ b/app/org/thp/cortex/services/AuditActor.scala @@ -23,7 +23,7 @@ class AuditActor @Inject() (eventSrv: EventSrv, implicit val ec: ExecutionContex private object EntityExtractor { def unapply(e: BaseEntity): Option[(BaseModelDef, String, String)] = Some((e.model, e.id, e.routing)) } - private var registration = Map.empty[String, Seq[ActorRef]] + private var registration = Map.empty[String, Seq[ActorRef]] private[AuditActor] lazy val logger = Logger(getClass) override def preStart(): Unit = { diff --git a/app/org/thp/cortex/services/ErrorHandler.scala b/app/org/thp/cortex/services/ErrorHandler.scala index 5fb22295b..8c1e4583c 100644 --- a/app/org/thp/cortex/services/ErrorHandler.scala +++ b/app/org/thp/cortex/services/ErrorHandler.scala @@ -30,11 +30,11 @@ class ErrorHandler extends HttpErrorHandler { def toErrorResult(ex: Throwable): (Int, JsValue) = ex match { case AuthenticationError(message) => Status.UNAUTHORIZED -> Json.obj("type" -> "AuthenticationError", "message" -> message) - case AuthorizationError(message) => Status.FORBIDDEN -> Json.obj("type" -> "AuthorizationError", "message" -> message) + case AuthorizationError(message) => Status.FORBIDDEN -> Json.obj("type" -> "AuthorizationError", "message" -> message) case UpdateError(_, message, attributes) => Status.INTERNAL_SERVER_ERROR -> Json.obj("type" -> "UpdateError", "message" -> message, "object" -> attributes) case rle: RateLimitExceeded => Status.TOO_MANY_REQUESTS -> Json.obj("type" -> "RateLimitExceeded", "message" -> rle.getMessage) - case InternalError(message) => Status.INTERNAL_SERVER_ERROR -> Json.obj("type" -> "InternalError", "message" -> message) + case InternalError(message) => Status.INTERNAL_SERVER_ERROR -> Json.obj("type" -> "InternalError", "message" -> message) case nfe: NumberFormatException => Status.BAD_REQUEST -> Json.obj("type" -> "NumberFormatException", "message" -> ("Invalid format " + nfe.getMessage)) case NotFoundError(message) => Status.NOT_FOUND -> Json.obj("type" -> "NotFoundError", "message" -> message) @@ -46,7 +46,7 @@ class ErrorHandler extends HttpErrorHandler { Status.INTERNAL_SERVER_ERROR -> Json.obj("type" -> "NoNodeAvailable", "message" -> "ElasticSearch cluster is unreachable") case CreateError(_, message, attributes) => Status.INTERNAL_SERVER_ERROR -> Json.obj("type" -> "CreateError", "message" -> message, "object" -> attributes) - case ErrorWithObject(tpe, message, obj) => Status.BAD_REQUEST -> Json.obj("type" -> tpe, "message" -> message, "object" -> obj) + case ErrorWithObject(tpe, message, obj) => Status.BAD_REQUEST -> Json.obj("type" -> tpe, "message" -> message, "object" -> obj) case GetError(message) => Status.INTERNAL_SERVER_ERROR -> Json.obj("type" -> "GetError", "message" -> message) case MultiError(message, exceptions) => val suberrors = exceptions.map(e => toErrorResult(e)) diff --git a/app/org/thp/cortex/services/JobRunnerSrv.scala b/app/org/thp/cortex/services/JobRunnerSrv.scala index f572a2590..088ce2258 100644 --- a/app/org/thp/cortex/services/JobRunnerSrv.scala +++ b/app/org/thp/cortex/services/JobRunnerSrv.scala @@ -1,12 +1,11 @@ package org.thp.cortex.services -import org.apache.pekko.actor.ActorSystem import org.apache.pekko.stream.Materializer import org.apache.pekko.stream.scaladsl.FileIO import org.elastic4play.BadRequestError import org.elastic4play.controllers.{Fields, FileInputValue} import org.elastic4play.database.ModifyConfig -import org.elastic4play.services.{AttachmentSrv, AuthContext, CreateSrv, UpdateSrv} +import org.elastic4play.services.{AttachmentSrv, AuthContext, CreateSrv, ExecutionContextSrv, UpdateSrv} import org.thp.cortex.models._ import play.api.libs.json._ import play.api.{Configuration, Logger} @@ -32,16 +31,20 @@ class JobRunnerSrv @Inject() ( createSrv: CreateSrv, updateSrv: UpdateSrv, attachmentSrv: AttachmentSrv, - akkaSystem: ActorSystem, + executionContextSrv: ExecutionContextSrv, implicit val ec: ExecutionContext, implicit val mat: Materializer ) { val logger: Logger = Logger(getClass) - private lazy val analyzerExecutionContext: ExecutionContext = akkaSystem.dispatchers.lookup("analyzer") - private lazy val responderExecutionContext: ExecutionContext = akkaSystem.dispatchers.lookup("responder") + private lazy val analyzerExecutionContext: ExecutionContext = executionContextSrv.get("analyzer") + private lazy val responderExecutionContext: ExecutionContext = executionContextSrv.get("responder") val jobDirectory: Path = Paths.get(config.get[String]("job.directory")) private val globalKeepJobFolder: Boolean = config.get[Boolean]("job.keepJobFolder") + private val imageRegistryPrefix: String = config.getOptional[String]("docker.imageRegistryPrefix").getOrElse("") + + private def applyImagePrefix(dockerImage: String): String = + JobRunnerSrv.applyImagePrefix(imageRegistryPrefix, dockerImage) private val runners: Seq[String] = config .getOptional[Seq[String]]("job.runners") @@ -164,7 +167,7 @@ class JobRunnerSrv @Inject() ( } } - private def extractReport(jobFolder: Path, job: Job)(implicit authContext: AuthContext): Future[Job] = { + private def extractReport(jobFolder: Path, job: Job)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Job] = { val outputFile = jobFolder.resolve("output").resolve("output.json") if (Files.exists(outputFile)) { logger.debug(s"Job output: ${new String(Files.readAllBytes(outputFile))}") @@ -216,7 +219,7 @@ class JobRunnerSrv @Inject() ( } def run(worker: Worker, job: Job)(implicit authContext: AuthContext): Future[Job] = { - val executionContext = worker.tpe() match { + val executionContext: ExecutionContext = worker.tpe() match { case WorkerType.analyzer => analyzerExecutionContext case WorkerType.responder => responderExecutionContext case x => @@ -234,6 +237,7 @@ class JobRunnerSrv @Inject() ( case (None, "kubernetes") => worker .dockerImage() + .map(applyImagePrefix) .map(dockerImage => k8sJobRunnerSrv.run(jobFolder, dockerImage, job, worker.jobTimeout().map(_.minutes))) .orElse { logger.warn(s"worker ${worker.id} can't be run with kubernetes (doesn't have image)") @@ -242,6 +246,7 @@ class JobRunnerSrv @Inject() ( case (None, "docker") => worker .dockerImage() + .map(applyImagePrefix) .map(dockerImage => dockerJobRunnerSrv.run(jobFolder, dockerImage, worker.jobTimeout().map(_.minutes))) .orElse { logger.warn(s"worker ${worker.id} can't be run with docker (doesn't have image)") @@ -267,17 +272,20 @@ class JobRunnerSrv @Inject() ( .flatten .transformWith { case _: Success[_] => - extractReport(maybeJobFolder.get /* can't be none */, job) + extractReport(maybeJobFolder.get /* can't be none */, job)(authContext, executionContext) case Failure(error) => logger.error(s"Worker execution fails", error) - endJob(job, JobStatus.Failure, Option(error.getMessage), maybeJobFolder.map(jf => readFile(jf.resolve("input").resolve("input.json")))) + endJob(job, JobStatus.Failure, Option(error.getMessage), maybeJobFolder.map(jf => readFile(jf.resolve("input").resolve("input.json"))))( + authContext, + executionContext + ) - } + }(executionContext) .andThen { case _ => if (!(job.params \ "keepJobFolder").asOpt[Boolean].contains(true) || globalKeepJobFolder) maybeJobFolder.foreach(delete) - } + }(executionContext) } private def readFile(input: Path): String = new String(Files.readAllBytes(input), StandardCharsets.UTF_8) @@ -294,7 +302,8 @@ class JobRunnerSrv @Inject() ( private def endJob(job: Job, status: JobStatus.Type, errorMessage: Option[String] = None, input: Option[String] = None)( implicit - authContext: AuthContext + authContext: AuthContext, + ec: ExecutionContext ): Future[Job] = { val fields = Fields .empty @@ -305,3 +314,9 @@ class JobRunnerSrv @Inject() ( updateSrv(job, fields, ModifyConfig.default) } } + +object JobRunnerSrv { + def applyImagePrefix(prefix: String, dockerImage: String): String = + if (prefix.isEmpty) dockerImage + else prefix + dockerImage +} diff --git a/app/org/thp/cortex/services/K8sJobRunnerSrv.scala b/app/org/thp/cortex/services/K8sJobRunnerSrv.scala index 588d82331..1b0a6f68a 100644 --- a/app/org/thp/cortex/services/K8sJobRunnerSrv.scala +++ b/app/org/thp/cortex/services/K8sJobRunnerSrv.scala @@ -1,9 +1,10 @@ package org.thp.cortex.services -import org.apache.pekko.actor.ActorSystem -import io.fabric8.kubernetes.api.model.{PersistentVolumeClaimVolumeSourceBuilder, StatusDetails} import io.fabric8.kubernetes.api.model.batch.v1.{JobBuilder => KJobBuilder} +import io.fabric8.kubernetes.api.model.{PersistentVolumeClaimVolumeSourceBuilder, PodSecurityContext, PodSecurityContextBuilder, StatusDetails} import io.fabric8.kubernetes.client.DefaultKubernetesClient +import org.apache.pekko.actor.ActorSystem +import org.elastic4play.utils.Hasher import org.thp.cortex.models._ import org.thp.cortex.util.FunctionalCondition._ import play.api.{Configuration, Logger} @@ -20,6 +21,8 @@ class K8sJobRunnerSrv( client: DefaultKubernetesClient, jobBaseDirectory: Path, persistentVolumeClaimName: Option[String], // if not provided k8s runner is unavailable + extraLabels: Map[String, String], + podSecurityContext: Option[PodSecurityContext], implicit val system: ActorSystem ) { @@ -29,20 +32,26 @@ class K8sJobRunnerSrv( new DefaultKubernetesClient(), Paths.get(config.get[String]("job.directory")), config.getOptional[String]("job.kubernetes.persistentVolumeClaimName"), + K8sJobRunnerSrv.parseLabels(config), + K8sJobRunnerSrv.parsePodSecurityContext(config), system: ActorSystem ) lazy val logger: Logger = Logger(getClass) + import K8sJobRunnerSrv.{maxNameLen, sanitizeLabelValue} + lazy val isAvailable: Boolean = Try { val ver = client.getVersion logger.info(s"Kubernetes is available: major ${ver.getMajor} minor ${ver.getMinor} git ${ver.getGitCommit}") } match { case _: Success[_] if persistentVolumeClaimName.isDefined => true - case _: Success[_] => - logger.info(s"Kubernetes is not available because no persistent volume claim is provided. " + - "Please add `job.kubernetes.persistentVolumeClaimName=...` in the configuration") + case _: Success[_] => + logger.info( + s"Kubernetes is not available because no persistent volume claim is provided. " + + "Please add `job.kubernetes.persistentVolumeClaimName=...` in the configuration" + ) false case Failure(error) => logger.info(s"Kubernetes is not available", error) @@ -50,35 +59,49 @@ class K8sJobRunnerSrv( } def run(jobDirectory: Path, dockerImage: String, job: Job, timeout: Option[FiniteDuration]): Try[Unit] = { - val cacertsFile = jobDirectory.resolve("input").resolve("cacerts") + val cacertsFile = jobDirectory.resolve("input").resolve("cacerts") val relativeJobDirectory = jobBaseDirectory.relativize(jobDirectory).toString // make the default longer than likely values, but still not infinite val timeout_or_default = timeout.getOrElse(8.hours) // https://kubernetes.io/docs/concepts/overview/working-with-objects/names/ - // FIXME: this collapses case, jeopardizing the uniqueness of the identifier. - // LDH: lowercase, digits, hyphens. - val ldh_jobid = job.id.toLowerCase().replace('_', '-') - val kjobName = "neuron-job-" + ldh_jobid - val pvcvs = new PersistentVolumeClaimVolumeSourceBuilder() + // K8s names must be RFC 1123 DNS labels: lowercase, digits, hyphens, max 63 chars. + // To preserve uniqueness after case-folding and char replacement, we append a short + // hash of the original ID when the sanitized name would need truncation. + val sanitizedJobId = sanitizeLabelValue(job.id) + val ldhJobId = job.id.toLowerCase().replace('_', '-').replaceAll("[^a-z0-9-]", "").replaceAll("^-+|-+$", "") + val prefix = "neuron-job-" + // FIXME to be tested by Sonny + val kJobName = if ((prefix + ldhJobId).length <= maxNameLen) { + prefix + ldhJobId + } else { + val hash = Hasher("SHA-256").fromString(job.id).head.toString.take(8) // 8 hex chars + val maxIdLen = maxNameLen - prefix.length - 1 - hash.length // 1 for the separator hyphen + prefix + ldhJobId.take(maxIdLen).stripSuffix("-") + "-" + hash + } + val persistentVolumeClaimVolumeSource = new PersistentVolumeClaimVolumeSourceBuilder() .withClaimName(persistentVolumeClaimName.get) .withReadOnly(false) .build() + val cortexLabels = Map("cortex-job-id" -> sanitizedJobId, "cortex-worker-id" -> sanitizeLabelValue(job.workerId()), "cortex-neuron-job" -> "true") + val allLabels = K8sJobRunnerSrv.buildJobLabels(cortexLabels, extraLabels) + val kjob = new KJobBuilder() .withApiVersion("batch/v1") .withNewMetadata() - .withName(kjobName) - .withLabels(Map( - "cortex-job-id" -> job.id, - "cortex-worker-id" -> job.workerId(), - "cortex-neuron-job" -> "true").asJava) + .withName(kJobName) + .withLabels(allLabels.asJava) .endMetadata() .withNewSpec() .withNewTemplate() + .withNewMetadata() + .withLabels(allLabels.asJava) + .endMetadata() .withNewSpec() + .when(podSecurityContext.isDefined)(_.withSecurityContext(podSecurityContext.get)) .addNewVolume() .withName("job-directory") - .withPersistentVolumeClaim(pvcvs) + .withPersistentVolumeClaim(persistentVolumeClaimVolumeSource) .endVolume() .addNewContainer() .withName("neuron") @@ -89,10 +112,10 @@ class K8sJobRunnerSrv( .withValue(relativeJobDirectory) .endEnv() .when(Files.exists(cacertsFile))( - _.addNewEnv() - .withName("REQUESTS_CA_BUNDLE") - .withValue("/job/input/cacerts") - .endEnv() + _.addNewEnv() + .withName("REQUESTS_CA_BUNDLE") + .withValue("/job/input/cacerts") + .endEnv() ) .addNewVolumeMount() .withName("job-directory") @@ -109,39 +132,55 @@ class K8sJobRunnerSrv( .endContainer() .withRestartPolicy("Never") .endSpec() - .endTemplate() - .endSpec() - .build() + .endTemplate() + .endSpec() + .build() val execution = Try { val created_kjob = client.batch().v1().jobs().create(kjob) val created_env = created_kjob - .getSpec.getTemplate.getSpec.getContainers.get(0) - .getEnv.asScala + .getSpec + .getTemplate + .getSpec + .getContainers + .get(0) + .getEnv + .asScala logger.info( s"Created Kubernetes Job ${created_kjob.getMetadata.getName}\n" + - s" timeout: ${timeout_or_default.toString}\n" + - s" image : $dockerImage\n" + - s" mount : pvc $persistentVolumeClaimName subdir $relativeJobDirectory as /job" + - created_env.map(ev => s"\n env : ${ev.getName} = ${ev.getValue}").mkString) - val ended_kjob = client.batch().v1().jobs().withLabel("cortex-job-id", job.id) - .waitUntilCondition(x => Option(x).flatMap(j => - Option(j.getStatus).flatMap(s => - Some(s.getConditions.asScala.map(_.getType).exists(t => - t.equals("Complete") || t.equals("Failed"))))) - .getOrElse(false), - timeout_or_default.length, timeout_or_default.unit) - if(ended_kjob != null) { - logger.info(s"Kubernetes Job ${ended_kjob.getMetadata.getName} " + - s"(for job ${job.id}) status is now ${ended_kjob.getStatus.toString}") + s" timeout: ${timeout_or_default.toString}\n" + + s" image : $dockerImage\n" + + s" mount : pvc $persistentVolumeClaimName subdir $relativeJobDirectory as /job" + + created_env.map(ev => s"\n env : ${ev.getName} = ${ev.getValue}").mkString + ) + val endedKJob = client + .batch() + .v1() + .jobs() + .withLabel("cortex-job-id", sanitizedJobId) + .waitUntilCondition( + x => + Option(x) + .flatMap(j => + Option(j.getStatus).flatMap(s => Some(s.getConditions.asScala.map(_.getType).exists(t => t.equals("Complete") || t.equals("Failed")))) + ) + .getOrElse(false), + timeout_or_default.length, + timeout_or_default.unit + ) + if (endedKJob != null) { + logger.info( + s"Kubernetes Job ${endedKJob.getMetadata.getName} " + + s"(for job ${job.id}) status is now ${endedKJob.getStatus.toString}" + ) } else { logger.info(s"Kubernetes Job for ${job.id} no longer exists") } } // let's find the job by the attribute we know is fundamentally // unique, rather than one constructed from it - val deleted: util.List[StatusDetails] = client.batch().v1().jobs().withLabel("cortex-job-id", job.id).delete() - if(!deleted.isEmpty) { + val deleted: util.List[StatusDetails] = client.batch().v1().jobs().withLabel("cortex-job-id", sanitizedJobId).delete() + if (!deleted.isEmpty) { logger.info(s"Deleted Kubernetes Job for job ${job.id}") } else { logger.info(s"While trying to delete Kubernetes Job for ${job.id}, the job was not found; this is OK") @@ -149,3 +188,77 @@ class K8sJobRunnerSrv( execution } } + +object K8sJobRunnerSrv { + private[services] val maxNameLen = 63 + private[services] val maxKeyLen = 253 + + /** Sanitize a string for use as a K8s label value: max 63 chars, alphanumeric/.-_ only, + * must start and end with an alphanumeric character. */ + private[services] def sanitizeLabelValue(s: String): String = + s.replaceAll("[^a-zA-Z0-9._-]", "").replaceAll("^[^a-zA-Z0-9]+", "").replaceAll("[^a-zA-Z0-9]+$", "").take(maxNameLen) + + private val labelKeyPattern = "^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?" + + "([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9]$" + private val labelValuePattern = "^(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])?$" + private val labelKeyRegex = labelKeyPattern.r + private val labelValueRegex = labelValuePattern.r + private val logger: Logger = Logger(classOf[K8sJobRunnerSrv]) + + def parseLabels(config: Configuration): Map[String, String] = { + val labels = config + .getOptional[Configuration]("job.kubernetes.labels") + .fold(Map.empty[String, String])( + _.underlying.root().asScala.map { case (k, v) => k -> v.unwrapped().toString }.toMap + ) + labels.foreach { + case (k, v) => + if (k.length > maxKeyLen || labelKeyRegex.findFirstIn(k).isEmpty) + logger.warn(s"job.kubernetes.labels: key '$k' may be rejected by Kubernetes (invalid label key format)") + if (v.length > maxNameLen || labelValueRegex.findFirstIn(v).isEmpty) + logger.warn(s"job.kubernetes.labels: value '$v' for key '$k' may be rejected by Kubernetes (invalid label value format)") + } + labels + } + + def buildJobLabels(cortexLabels: Map[String, String], extraLabels: Map[String, String]): Map[String, String] = + cortexLabels ++ extraLabels + + private object SecCtxKey { + val RunAsUser = "runAsUser" + val RunAsGroup = "runAsGroup" + val FsGroup = "fsGroup" + val RunAsNonRoot = "runAsNonRoot" + val all: Set[String] = Set(RunAsUser, RunAsGroup, FsGroup, RunAsNonRoot) + } + + private def warnIfNegative(name: String, value: Long): Unit = + if (value < 0L) logger.warn(s"job.kubernetes.securityContext.$name=$value is negative; Kubernetes may reject the pod") + + def parsePodSecurityContext(config: Configuration): Option[PodSecurityContext] = + config.getOptional[Configuration]("job.kubernetes.securityContext").flatMap { sc => + val configuredKeys = sc.underlying.root().asScala.keys.toSet + configuredKeys.diff(SecCtxKey.all).foreach { unknown => + logger.warn(s"job.kubernetes.securityContext: unknown key '$unknown' (supported: ${SecCtxKey.all.mkString(", ")})") + } + + val runAsUser = sc.getOptional[Long](SecCtxKey.RunAsUser) + val runAsGroup = sc.getOptional[Long](SecCtxKey.RunAsGroup) + val fsGroup = sc.getOptional[Long](SecCtxKey.FsGroup) + val runAsNonRoot = sc.getOptional[Boolean](SecCtxKey.RunAsNonRoot) + + runAsUser.foreach(warnIfNegative(SecCtxKey.RunAsUser, _)) + runAsGroup.foreach(warnIfNegative(SecCtxKey.RunAsGroup, _)) + fsGroup.foreach(warnIfNegative(SecCtxKey.FsGroup, _)) + + if (runAsUser.isEmpty && runAsGroup.isEmpty && fsGroup.isEmpty && runAsNonRoot.isEmpty) None + else { + val builder = new PodSecurityContextBuilder() + runAsUser.foreach(v => builder.withRunAsUser(v)) + runAsGroup.foreach(v => builder.withRunAsGroup(v)) + fsGroup.foreach(v => builder.withFsGroup(v)) + runAsNonRoot.foreach(v => builder.withRunAsNonRoot(v)) + Some(builder.build()) + } + } +} diff --git a/app/org/thp/cortex/services/KeyAuthSrv.scala b/app/org/thp/cortex/services/KeyAuthSrv.scala index 41880549b..fe6665899 100644 --- a/app/org/thp/cortex/services/KeyAuthSrv.scala +++ b/app/org/thp/cortex/services/KeyAuthSrv.scala @@ -8,18 +8,20 @@ import org.elastic4play.{AuthenticationError, BadRequestError} import play.api.libs.json.JsArray import play.api.mvc.RequestHeader +import java.security.SecureRandom import java.util.Base64 import javax.inject.{Inject, Singleton} import scala.concurrent.{ExecutionContext, Future} -import scala.util.Random @Singleton class KeyAuthSrv @Inject() (userSrv: UserSrv, implicit val ec: ExecutionContext, implicit val mat: Materializer) extends AuthSrv { override val name = "key" + private val secureRandom = new SecureRandom() + final protected def generateKey(): String = { val bytes = Array.ofDim[Byte](24) - Random.nextBytes(bytes) + secureRandom.nextBytes(bytes) Base64.getEncoder.encodeToString(bytes) } diff --git a/app/org/thp/cortex/services/LocalAuthSrv.scala b/app/org/thp/cortex/services/LocalAuthSrv.scala index 57308188a..6d92fc197 100644 --- a/app/org/thp/cortex/services/LocalAuthSrv.scala +++ b/app/org/thp/cortex/services/LocalAuthSrv.scala @@ -1,15 +1,17 @@ package org.thp.cortex.services -import javax.inject.{Inject, Singleton} -import scala.concurrent.{ExecutionContext, Future} -import scala.util.Random -import play.api.mvc.RequestHeader import org.apache.pekko.stream.Materializer -import org.thp.cortex.models.User import org.elastic4play.controllers.Fields import org.elastic4play.services.{AuthCapability, AuthContext, AuthSrv} import org.elastic4play.utils.Hasher import org.elastic4play.{AuthenticationError, AuthorizationError} +import org.thp.cortex.models.User +import play.api.mvc.RequestHeader + +import java.security.SecureRandom +import javax.inject.{Inject, Singleton} +import scala.concurrent.{ExecutionContext, Future} +import scala.util.Random.javaRandomToRandom @Singleton class LocalAuthSrv @Inject() (userSrv: UserSrv, implicit val ec: ExecutionContext, implicit val mat: Materializer) extends AuthSrv { @@ -17,6 +19,8 @@ class LocalAuthSrv @Inject() (userSrv: UserSrv, implicit val ec: ExecutionContex val name: String = "local" override val capabilities: Set[AuthCapability.Type] = Set(AuthCapability.changePassword, AuthCapability.setPassword) + private val random = new SecureRandom() + private[services] def doAuthenticate(user: User, password: String): Boolean = user.password().map(_.split(",", 2)).fold(false) { case Array(seed, pwd) => @@ -38,7 +42,7 @@ class LocalAuthSrv @Inject() (userSrv: UserSrv, implicit val ec: ExecutionContex } override def setPassword(username: String, newPassword: String)(implicit authContext: AuthContext): Future[Unit] = { - val seed = Random.nextString(10).replace(',', '!') + val seed = random.nextString(10).replace(',', '!') val newHash = seed + "," + Hasher("SHA-256").fromString(seed + newPassword).head.toString userSrv.update(username, Fields.empty.set("password", newHash)).map(_ => ()) } diff --git a/app/org/thp/cortex/services/MispSrv.scala b/app/org/thp/cortex/services/MispSrv.scala index 9b41a635f..02a6c0b00 100644 --- a/app/org/thp/cortex/services/MispSrv.scala +++ b/app/org/thp/cortex/services/MispSrv.scala @@ -91,7 +91,7 @@ class MispSrv @Inject() ( val message = job.errorMessage().getOrElse("This job has failed, without message!") Future.successful(Json.obj("error" -> message)) case JobStatus.InProgress => Future.successful(Json.obj("error" -> "This job hasn't finished yet")) - case x => Future.successful(Json.obj("error" -> s"Unhandled JobStatus $x")) + case x => Future.successful(Json.obj("error" -> s"Unhandled JobStatus $x")) } private def toArtifact(mispType: String, data: String): Either[String, Attachment] = diff --git a/app/org/thp/cortex/services/ProcessJobRunnerSrv.scala b/app/org/thp/cortex/services/ProcessJobRunnerSrv.scala index db441d72c..4f72c73a1 100644 --- a/app/org/thp/cortex/services/ProcessJobRunnerSrv.scala +++ b/app/org/thp/cortex/services/ProcessJobRunnerSrv.scala @@ -29,7 +29,8 @@ class ProcessJobRunnerSrv @Inject() (implicit val system: ActorSystem) { } }.getOrElse(None) - def run(jobDirectory: Path, command: String, job: Job, timeout: Option[FiniteDuration])(implicit + def run(jobDirectory: Path, command: String, job: Job, timeout: Option[FiniteDuration])( + implicit ec: ExecutionContext ): Try[Unit] = { val baseDirectory = Paths.get(command).getParent.getParent diff --git a/app/org/thp/cortex/services/StreamSrv.scala b/app/org/thp/cortex/services/StreamSrv.scala index 3d1114dc8..1af9021cc 100644 --- a/app/org/thp/cortex/services/StreamSrv.scala +++ b/app/org/thp/cortex/services/StreamSrv.scala @@ -2,7 +2,7 @@ package org.thp.cortex.services import org.apache.pekko.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Cancellable, DeadLetter, PoisonPill} import org.apache.pekko.stream.Materializer -import org.elastic4play.services._ +import org.elastic4play.services.{UserSrv => _, _} import org.elastic4play.utils.Instance import play.api.Logger import play.api.libs.json.JsObject @@ -29,7 +29,7 @@ class DeadLetterMonitoringActor @Inject() (system: ActorSystem) extends Actor { } override def receive: Receive = { - case DeadLetter(StreamActor.GetOperations, sender, recipient) => + case DeadLetter(_: StreamActor.GetOperations, sender, recipient) => logger.warn(s"receive dead GetOperations message, $sender → $recipient") sender ! StreamActor.StreamNotFound case other => @@ -42,13 +42,16 @@ object StreamActor { case class Initialize(requestId: String) extends EventMessage /* Request process has finished, prepare to send associated messages */ case class Commit(requestId: String) extends EventMessage - /* Ask messages, wait if there is no ready messages*/ - case object GetOperations + /* Ask messages, wait if there is no ready messages. Carries the polling user's + organization (None for the unbound bootstrap stream) so the actor can authorize the access. */ + case class GetOperations(organizationId: Option[String]) /* Pending messages must be sent to sender */ case object Submit /* List of ready messages */ case class StreamMessages(messages: Seq[JsObject]) case object StreamNotFound + /* Internal: an audit operation that has passed the organization filter and may be buffered */ + private case class FilteredAuditOperation(operation: AuditOperation) } class StreamActor( @@ -57,7 +60,9 @@ class StreamActor( nextItemMaxWait: FiniteDuration, globalMaxWait: FiniteDuration, eventSrv: EventSrv, - auxSrv: AuxSrv + auxSrv: AuxSrv, + userSrv: UserSrv, + streamOrganizationId: Option[String] ) extends Actor with ActorLogging { import context.dispatcher @@ -98,11 +103,11 @@ class StreamActor( } } - var killCancel: Cancellable = FakeCancellable + private var killCancel: Cancellable = FakeCancellable /** renew global timer and rearm it */ - def renewExpiration(): Unit = + private def renewExpiration(): Unit = if (killCancel.cancel()) killCancel = context.system.scheduler.scheduleOnce(cacheExpiration, self, PoisonPill) @@ -154,8 +159,19 @@ class StreamActor( case EndOfMigrationEvent => context.become(receiveWithState(waitingRequest.map(_.renew), currentMessages + ("end" -> Some(MigrationEventGroup.endOfMigration)))) - /* */ case operation: AuditOperation => + // AuditOperation is delivered through the event bus with no sender, so a foreign-org + // operation is simply ignored (there is no poller waiting for a reply here). + userSrv + .getOrganizationId(operation.authContext.userId) + .foreach { organizationId => + // An unbound (bootstrap) stream owns no organization, so it never buffers audit operations. + if (streamOrganizationId.contains(organizationId)) { + self ! FilteredAuditOperation(operation) + } + } + + case FilteredAuditOperation(operation) => val requestId = operation.authContext.requestId val normalizedOperation = normalizeOperation(operation) logger.debug(s"Receiving audit operation : $operation ⇒ $normalizedOperation") @@ -175,13 +191,20 @@ class StreamActor( } context.become(receiveWithState(waitingRequest.map(_.renew), currentMessages + (requestId -> Some(updatedOperationGroup)))) - case GetOperations => + case GetOperations(organizationId) => renewExpiration() - waitingRequest.foreach { wr => - wr.submit(Nil) - logger.error("Multiple requests !") + // A poller may read this stream only if it belongs to the organization that owns the + // stream. Otherwise behave as if the stream did not exist, to avoid leaking its existence. + if (streamOrganizationId == organizationId) { + waitingRequest.foreach { wr => + wr.submit(Nil) + logger.error("Multiple requests !") + } + context.become(receiveWithState(Some(new WaitingRequest(sender())), currentMessages)) + } else { + logger.warn(s"Unauthorized stream access attempt from organization '${organizationId.getOrElse("")}'") + sender() ! StreamNotFound } - context.become(receiveWithState(Some(new WaitingRequest(sender())), currentMessages)) case Submit => waitingRequest match { diff --git a/app/org/thp/cortex/util/docker/DockerClient.scala b/app/org/thp/cortex/util/docker/DockerClient.scala index bd57bdb51..b16671f16 100644 --- a/app/org/thp/cortex/util/docker/DockerClient.scala +++ b/app/org/thp/cortex/util/docker/DockerClient.scala @@ -8,7 +8,6 @@ import play.api.{Configuration, Logger} import java.nio.file.{Files, Path} import java.time.Duration -import java.util.Collections import java.util.concurrent.{Executors, TimeUnit} import scala.concurrent.blocking import scala.concurrent.duration.FiniteDuration @@ -46,6 +45,7 @@ class DockerClient(config: Configuration) { val containerCmd = underlyingClient .createContainerCmd(image) .withHostConfig(configure(jobDirectory, jobBaseDirectory, dockerJobBaseDirectory)) + .withEnv("JAVA_TOOL_OPTIONS=-Dfile.encoding=UTF8") if (Files.exists(jobDirectory.resolve("input").resolve("cacerts"))) containerCmd.withEnv(s"REQUESTS_CA_BUNDLE=/job/input/cacerts") val containerResponse = containerCmd.exec() @@ -113,13 +113,13 @@ class DockerClient(config: Configuration) { } private def getContainerStatus(containerId: String): Option[String] = - underlyingClient - .listContainersCmd() - .withIdFilter(Collections.singletonList(containerId)) - .exec() - .asScala - .headOption - .map(_.getStatus) + Try( + underlyingClient + .inspectContainerCmd(containerId) + .exec() + .getState + .getStatus + ).toOption private def isContainerRunning(containerId: String): Boolean = underlyingClient diff --git a/build.sbt b/build.sbt index 189bf0668..ea5feeb7f 100644 --- a/build.sbt +++ b/build.sbt @@ -1,23 +1,65 @@ import Common.* -import Dependencies.{nettyVersion, pekkoVersion} +import Dependencies.{bouncyCastleVersion, jacksonVersion, nettyVersion, pekkoVersion, vertxVersion} ThisBuild / scalaVersion := Dependencies.scalaVersion ThisBuild / evictionErrorLevel := util.Level.Warn +// CVE-2025-12183 / CVE-2025-66566: the org.lz4:lz4-java project was archived and +// fixes ship under at.yawk.lz4:lz4-java (added below in elastic4play's deps). +// Strip the vulnerable transitive everywhere to avoid two copies on the classpath. +ThisBuild / excludeDependencies += ExclusionRule("org.lz4", "lz4-java") + ThisBuild / dependencyOverrides ++= Seq( + // jackson-module-scala 2.21.x declares scala-library 2.13.18; keep it pinned to our + // compiler version to avoid the SIP-51 "scala-library newer than compiler" build error. + "org.scala-lang" % "scala-library" % Dependencies.scalaVersion, Dependencies.Play.twirl, - "com.fasterxml.jackson.core" % "jackson-databind" % "2.19.2", - "org.apache.commons" % "commons-compress" % "1.28.0", - "com.google.guava" % "guava" % "32.1.1-jre", - "org.apache.pekko" %% "pekko-actor" % pekkoVersion, - "org.apache.pekko" %% "pekko-serialization-jackson" % pekkoVersion, - "org.apache.pekko" %% "pekko-actor-typed" % pekkoVersion, - "org.apache.pekko" %% "pekko-slf4j" % pekkoVersion, - "ch.qos.logback" % "logback-core" % "1.5.20", - "io.netty" % "netty-handler-proxy" % nettyVersion, - "io.netty" % "netty-resolver-dns" % nettyVersion, - "io.netty" % "netty-codec-http" % nettyVersion, - "io.netty" % "netty-codec-http2" % nettyVersion + // CVE-2026-54512 / CVE-2026-54513: PolymorphicTypeValidator bypass via generic type + // parameters, fixed in 2.21.4. Pulled in (as a family) via pekko-serialization-jackson. + // Bumped to 2.22.0 (latest) to stay ahead of the 2.x advisory line. + // Keep the whole jackson family aligned to avoid mixed-version deserialization issues. + "com.fasterxml.jackson.core" % "jackson-databind" % jacksonVersion, + "com.fasterxml.jackson.core" % "jackson-core" % jacksonVersion, + "com.fasterxml.jackson.dataformat" % "jackson-dataformat-cbor" % jacksonVersion, + "com.fasterxml.jackson.dataformat" % "jackson-dataformat-yaml" % jacksonVersion, + "com.fasterxml.jackson.datatype" % "jackson-datatype-jdk8" % jacksonVersion, + "com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % jacksonVersion, + "com.fasterxml.jackson.module" % "jackson-module-parameter-names" % jacksonVersion, + "com.fasterxml.jackson.module" %% "jackson-module-scala" % jacksonVersion, + "org.apache.commons" % "commons-compress" % "1.28.0", + "com.google.guava" % "guava" % "32.1.1-jre", + "org.apache.pekko" %% "pekko-actor" % pekkoVersion, + "org.apache.pekko" %% "pekko-serialization-jackson" % pekkoVersion, + "org.apache.pekko" %% "pekko-actor-typed" % pekkoVersion, + "org.apache.pekko" %% "pekko-slf4j" % pekkoVersion, + "ch.qos.logback" % "logback-core" % "1.5.32", + "ch.qos.logback" % "logback-classic" % "1.5.32", + // 4.1.135.Final fixes a batch of 2026 netty CVEs across codec-http (CVE-2026-42587, + // CVE-2026-50020), codec-http2 (CVE-2026-48043, CVE-2026-50560, CVE-2026-47244), + // handler (CVE-2026-44249, CVE-2026-50010, CVE-2026-45416) and resolver-dns + // (CVE-2026-45674, CVE-2026-47691, CVE-2026-45673). Keep the whole family aligned. + "io.netty" % "netty-buffer" % nettyVersion, + "io.netty" % "netty-common" % nettyVersion, + "io.netty" % "netty-transport" % nettyVersion, + "io.netty" % "netty-transport-native-unix-common" % nettyVersion, + "io.netty" % "netty-resolver" % nettyVersion, + "io.netty" % "netty-resolver-dns" % nettyVersion, + "io.netty" % "netty-codec" % nettyVersion, + "io.netty" % "netty-codec-dns" % nettyVersion, + "io.netty" % "netty-codec-socks" % nettyVersion, + "io.netty" % "netty-codec-http" % nettyVersion, + "io.netty" % "netty-codec-http2" % nettyVersion, + "io.netty" % "netty-handler" % nettyVersion, + "io.netty" % "netty-handler-proxy" % nettyVersion, + // CVE-2026-5588: bcpkix <= 1.83 accepts empty composite signature sequences. Pulled in via docker-java-core. + "org.bouncycastle" % "bcpkix-jdk18on" % bouncyCastleVersion, + "org.bouncycastle" % "bcprov-jdk18on" % bouncyCastleVersion, + "org.bouncycastle" % "bcutil-jdk18on" % bouncyCastleVersion, + // CVE-2025-49574 (fixed in 4.5.16) and CVE-2026-1002 (fixed in 4.5.24). Pulled in via io.fabric8:kubernetes-client. + "io.vertx" % "vertx-core" % vertxVersion, + "io.vertx" % "vertx-auth-common" % vertxVersion, + "io.vertx" % "vertx-web-client" % vertxVersion, + "io.vertx" % "vertx-web-common" % vertxVersion ) lazy val cortex = (project in file(".")) .enablePlugins(PlayScala) @@ -69,12 +111,13 @@ lazy val elastic4play = (project in file("elastic4play")) .settings( libraryDependencies ++= Seq( cacheApi, - "nl.gn0s1s" %% "elastic4s-core" % elastic4sVersion, + "nl.gn0s1s" %% "elastic4s-core" % elastic4sVersion, "nl.gn0s1s" %% "elastic4s-reactivestreams-pekko" % elastic4sVersion, - "nl.gn0s1s" %% "elastic4s-client-esjava" % elastic4sVersion, - "org.apache.pekko" %% "pekko-stream-testkit" % pekkoVersion % Test, - "org.scalactic" %% "scalactic" % "3.2.19", - specs2 % Test + "nl.gn0s1s" %% "elastic4s-client-esjava" % elastic4sVersion, + "org.apache.pekko" %% "pekko-stream-testkit" % pekkoVersion % Test, + "org.scalactic" %% "scalactic" % "3.2.19", + Dependencies.lz4Java, + specs2 % Test ) ) diff --git a/conf/application.sample b/conf/application.sample index ae601da22..fe2b70f5b 100644 --- a/conf/application.sample +++ b/conf/application.sample @@ -54,6 +54,32 @@ search { # analyzer. The cache is used only if the second job occurs within cache.job (the default is 10 minutes). cache.job = 10 minutes +## KUBERNETES JOB SETTINGS +# +# job.kubernetes { +# # Extra labels applied to all neuron Job and Pod resources. +# # Useful for ArgoCD tracking, cost allocation, or team ownership. +# # Keys must be valid K8s label keys (max 253 chars prefix + 63 chars name). +# # Values must be max 63 chars, alphanumeric start/end, with [-_.] allowed. +# # Example: make neuron jobs visible in ArgoCD application tree: +# labels { +# "app.kubernetes.io/instance" = "cortex" +# "app.kubernetes.io/managed-by" = "Helm" +# "app.kubernetes.io/part-of" = "cortex" +# } +# +# # Pod-level security context applied to all neuron Job pods. +# # Useful for restricted environments requiring non-root execution. +# # WARNING: many analyzer/responder images expect to run as root. +# # Test your analyzers/responders before configuring securityContext. +# securityContext { +# runAsUser = 1000 +# runAsGroup = 1000 +# fsGroup = 1000 +# runAsNonRoot = true +# } +# } + ## Authentication auth { # "provider" parameter contains the authentication provider(s). It can be multi-valued, which is useful @@ -182,15 +208,18 @@ analyzer { #"/absolute/path/of/analyzers" ] - # Sane defaults. Do not change unless you know what you are doing. - fork-join-executor { - # Min number of threads available for analysis. - parallelism-min = 2 - # Parallelism (threads) ... ceil(available processors * factor). - parallelism-factor = 2.0 - # Max number of threads available for analysis. - parallelism-max = 4 + # Dedicated thread pool that runs analyzer jobs. Jobs block for their whole + # lifetime (container/pod execution, file & ES I/O), so a thread-pool-executor + # is used instead of a fork-join pool. fixed-pool-size = max analyzer jobs run + # concurrently; extra jobs queue. Tune to your runner: kubernetes/docker mostly + # wait on the pod/container (can be high, e.g. dozens); the process runner spawns + # local subprocesses (size more conservatively). + type = Dispatcher + executor = "thread-pool-executor" + thread-pool-executor { + fixed-pool-size = 20 // 20 jobs in parallel, size according to your k8s or server resources } + throughput = 1 } # RESPONDERS @@ -202,15 +231,13 @@ responder { #"/absolute/path/of/responders" ] - # Sane defaults. Do not change unless you know what you are doing. - fork-join-executor { - # Min number of threads available for analysis. - parallelism-min = 2 - # Parallelism (threads) ... ceil(available processors * factor). - parallelism-factor = 2.0 - # Max number of threads available for analysis. - parallelism-max = 4 + # Dedicated thread pool that runs responder jobs (see analyzer above). + type = Dispatcher + executor = "thread-pool-executor" + thread-pool-executor { + fixed-pool-size = 20 } + throughput = 1 } # Proxy configuration to retrieve catalogs @@ -256,6 +283,11 @@ responder { # memorySwappiness = 0 # } # autoUpdate = false +# # Prepend a registry prefix to all neuron docker images at runtime. +# # Useful for private registries (e.g., Harbor proxy cache). +# # The value is prepended as-is; include a trailing '/' if needed. +# # Example: "harbor.example.com/docker.io/" +# imageRegistryPrefix = "" # pullImageTimeout = 10 minutes # } diff --git a/conf/reference.conf b/conf/reference.conf index d5dd5a488..beb60b24a 100644 --- a/conf/reference.conf +++ b/conf/reference.conf @@ -15,6 +15,10 @@ job { directory = ${java.io.tmpdir} dockerDirectory = ${job.directory} keepJobFolder = false + kubernetes { + labels {} + securityContext {} + } } # HTTP filters @@ -99,14 +103,15 @@ analyzer { # Directory that holds analyzers urls = [] - fork-join-executor { - # Min number of threads available for analyze - parallelism-min = 2 - # Parallelism (threads) ... ceil(available processors * factor) - parallelism-factor = 2.0 - # Max number of threads available for analyze - parallelism-max = 4 + # Dedicated thread pool that runs analyzer jobs. Jobs block for their whole + # lifetime (container/pod execution, file & ES I/O), so a thread-pool-executor + # is used instead of a fork-join pool. + type = Dispatcher + executor = "thread-pool-executor" + thread-pool-executor { + fixed-pool-size = 20 // 20 jobs in parallel, size according to your k8s or server resources } + throughput = 1 } @@ -114,14 +119,14 @@ responder { # Directory that holds responders urls = [] - fork-join-executor { - # Min number of threads available for analyze - parallelism-min = 2 - # Parallelism (threads) ... ceil(available processors * factor) - parallelism-factor = 2.0 - # Max number of threads available for analyze - parallelism-max = 4 + # Dedicated thread pool that runs responder jobs (see analyzer above). + type = Dispatcher + executor = "thread-pool-executor" + thread-pool-executor { + fixed-pool-size = 20 } + throughput = 1 } docker.pullImageTimeout = 10 minutes +docker.imageRegistryPrefix = "" diff --git a/package/docker/entrypoint b/package/docker/entrypoint index 9df157fb1..eba838234 100755 --- a/package/docker/entrypoint +++ b/package/docker/entrypoint @@ -21,24 +21,26 @@ DAEMON_USER=${daemon_user:-1001:1001} JOB_DIRECTORY=${job_directory:-/tmp/cortex-jobs} DOCKER_JOB_DIRECTORY=${docker_job_directory:-} KUBERNETES_JOB_PVC=${kubernetes_job_pvc:-} +DOCKER_IMAGE_REGISTRY_PREFIX=${docker_image_registry_prefix:-} function usage { cat <<- _EOF_ Available options: - --no-config | do not configure TheHive (add secret and elasticsearch) - --no-config-secret | do not add random secret to configuration - --no-config-es | do not add elasticsearch hosts to configuration - --es-uri | use this string to configure elasticsearch hosts (format: http(s)://host:port,host:port(/prefix)?querystring) - --es-hostname | resolve this hostname to find elasticsearch instances - --secret | secret to secure sessions - --show-secret | show the generated secret - --job-directory | use this directory to store job files - --docker-job-directory | indicate the job directory in the host (not inside container) - --kubernetes-job-pvc | indicate the ReadWriteMany persistent volume claim holding job directory - --analyzer-url | where analyzers are located (url or path) - --responder-url | where responders are located (url or path) - --start-docker | start a internal docker (inside container) to run analyzers/responders - --daemon-user [:] | run cortex using this user + --no-config | do not configure TheHive (add secret and elasticsearch) + --no-config-secret | do not add random secret to configuration + --no-config-es | do not add elasticsearch hosts to configuration + --es-uri | use this string to configure elasticsearch hosts (format: http(s)://host:port,host:port(/prefix)?querystring) + --es-hostname | resolve this hostname to find elasticsearch instances + --secret | secret to secure sessions + --show-secret | show the generated secret + --job-directory | use this directory to store job files + --docker-job-directory | indicate the job directory in the host (not inside container) + --kubernetes-job-pvc | indicate the ReadWriteMany persistent volume claim holding job directory + --docker-image-registry-prefix | prepend this prefix to all neuron docker images + --analyzer-url | where analyzers are located (url or path) + --responder-url | where responders are located (url or path) + --start-docker | start a internal docker (inside container) to run analyzers/responders + --daemon-user [:] | run cortex using this user _EOF_ exit 1 } @@ -47,28 +49,29 @@ STOP=0 while test $# -gt 0 -o $STOP = 1 do case "$1" in - "--no-config") CONFIG=0;; - "--no-config-secret") CONFIG_SECRET=0;; - "--no-config-es") CONFIG_ES=0;; - "--es-hosts") echo "--es-hosts is deprecated, please use --es-uri" - usage;; - "--es-uri") shift; ES_URI=$1;; - "--es-hostname") shift; ES_HOSTNAME=$1;; - "--secret") shift; SECRET=$1;; - "--show-secret") SHOW_SECRET=1;; - "--job-directory") shift; JOB_DIRECTORY=$1;; - "--docker-job-directory") shift; DOCKER_JOB_DIRECTORY=$1;; - "--kubernetes-job-pvc") shift; KUBERNETES_JOB_PVC=$1;; - "--analyzer-path") echo "--analyzer-path is deprecated, please use --analyzer-url" - shift; ANALYZER_URLS+=("$1");; - "--responder-path") echo "--responder-path is deprecated, please use --responder-url" - shift; RESPONDER_URLS+=("$1");; - "--analyzer-url") shift; ANALYZER_URLS+=("$1");; - "--responder-url") shift; RESPONDER_URLS+=("$1");; - "--start-docker") START_DOCKER=1;; - "--daemon-user") shift; DAEMON_USER=$1;; - "--") STOP=1;; - *) echo "unrecognized option: $1"; usage;; + "--no-config") CONFIG=0;; + "--no-config-secret") CONFIG_SECRET=0;; + "--no-config-es") CONFIG_ES=0;; + "--es-hosts") echo "--es-hosts is deprecated, please use --es-uri" + usage;; + "--es-uri") shift; ES_URI=$1;; + "--es-hostname") shift; ES_HOSTNAME=$1;; + "--secret") shift; SECRET=$1;; + "--show-secret") SHOW_SECRET=1;; + "--job-directory") shift; JOB_DIRECTORY=$1;; + "--docker-job-directory") shift; DOCKER_JOB_DIRECTORY=$1;; + "--kubernetes-job-pvc") shift; KUBERNETES_JOB_PVC=$1;; + "--docker-image-registry-prefix") shift; DOCKER_IMAGE_REGISTRY_PREFIX=$1;; + "--analyzer-path") echo "--analyzer-path is deprecated, please use --analyzer-url" + shift; ANALYZER_URLS+=("$1");; + "--responder-path") echo "--responder-path is deprecated, please use --responder-url" + shift; RESPONDER_URLS+=("$1");; + "--analyzer-url") shift; ANALYZER_URLS+=("$1");; + "--responder-url") shift; RESPONDER_URLS+=("$1");; + "--start-docker") START_DOCKER=1;; + "--daemon-user") shift; DAEMON_USER=$1;; + "--") STOP=1;; + *) echo "unrecognized option: $1"; usage;; esac shift done @@ -116,6 +119,7 @@ then test -n "$JOB_DIRECTORY" && echo "job.directory=\"$JOB_DIRECTORY\"" >> "$CONFIG_FILE" test -n "$DOCKER_JOB_DIRECTORY" && echo "job.dockerDirectory=\"$DOCKER_JOB_DIRECTORY\"" >> "$CONFIG_FILE" test -n "$KUBERNETES_JOB_PVC" && echo "job.kubernetes.persistentVolumeClaimName=\"$KUBERNETES_JOB_PVC\"" >> "$CONFIG_FILE" + test -n "$DOCKER_IMAGE_REGISTRY_PREFIX" && echo "docker.imageRegistryPrefix=\"$DOCKER_IMAGE_REGISTRY_PREFIX\"" >> "$CONFIG_FILE" function join_urls { echo -n "\"$1\"" diff --git a/project/Dependencies.scala b/project/Dependencies.scala index d79fd3f83..e48794c1f 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -1,10 +1,14 @@ import sbt.* object Dependencies { - val scalaVersion = "2.13.17" - val dockerJavaVersion = "3.6.0" - val pekkoVersion = "1.2.1" - val nettyVersion = "4.1.128.Final" + val scalaVersion = "2.13.17" + val dockerJavaVersion = "3.6.0" + val pekkoVersion = "1.2.1" + val nettyVersion = "4.1.135.Final" + val bouncyCastleVersion = "1.84" + val vertxVersion = "4.5.27" + val lz4Version = "1.11.0" + val jacksonVersion = "2.22.0" object Play { val version: String = play.core.PlayVersion.current @@ -24,6 +28,7 @@ object Dependencies { val dockerJavaClient = "com.github.docker-java" % "docker-java-core" % dockerJavaVersion val dockerJavaTransport = "com.github.docker-java" % "docker-java-transport-zerodep" % dockerJavaVersion val k8sClient = "io.fabric8" % "kubernetes-client" % "7.4.0" + val lz4Java = "at.yawk.lz4" % "lz4-java" % lz4Version val pekkoCluster = "org.apache.pekko" %% "pekko-cluster" % pekkoVersion val pekkoClusterTyped = "org.apache.pekko" %% "pekko-cluster-typed" % pekkoVersion } diff --git a/project/plugins.sbt b/project/plugins.sbt index a23618210..cf63769ca 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -9,7 +9,7 @@ def maybeEnable(pair: (String, ModuleID)): Seq[Setting[?]] = Seq[(String, ModuleID)]( "sbom" -> "com.github.sbt" %% "sbt-sbom" % "0.5.0", - "depcheck" -> "net.nmoncho" % "sbt-dependency-check" % "1.8.3" + "depcheck" -> "net.nmoncho" % "sbt-dependency-check" % "1.9.0" ).flatMap(maybeEnable) // The Play plugin diff --git a/test/org/thp/cortex/services/JobRunnerSrvSpec.scala b/test/org/thp/cortex/services/JobRunnerSrvSpec.scala new file mode 100644 index 000000000..ad34c2f64 --- /dev/null +++ b/test/org/thp/cortex/services/JobRunnerSrvSpec.scala @@ -0,0 +1,31 @@ +package org.thp.cortex.services + +import org.junit.runner.RunWith +import org.specs2.runner.JUnitRunner +import play.api.test.PlaySpecification + +@RunWith(classOf[JUnitRunner]) +class JobRunnerSrvSpec extends PlaySpecification { + + "JobRunnerSrv.applyImagePrefix" should { + + "return the original image when prefix is empty" in { + JobRunnerSrv.applyImagePrefix("", "cortexneurons/abuseipdb:1") must_== "cortexneurons/abuseipdb:1" + } + + "prepend the prefix to the image" in { + JobRunnerSrv.applyImagePrefix("harbor.example.com/docker.io/", "cortexneurons/abuseipdb:1") must_== + "harbor.example.com/docker.io/cortexneurons/abuseipdb:1" + } + + "prepend the prefix to an image that already includes docker.io" in { + JobRunnerSrv.applyImagePrefix("harbor.example.com/", "docker.io/cortexneurons/abuseipdb:1") must_== + "harbor.example.com/docker.io/cortexneurons/abuseipdb:1" + } + + "handle prefix without trailing slash" in { + JobRunnerSrv.applyImagePrefix("harbor.example.com", "cortexneurons/abuseipdb:1") must_== + "harbor.example.comcortexneurons/abuseipdb:1" + } + } +} diff --git a/test/org/thp/cortex/services/K8sJobRunnerSrvSpec.scala b/test/org/thp/cortex/services/K8sJobRunnerSrvSpec.scala new file mode 100644 index 000000000..e95c3904f --- /dev/null +++ b/test/org/thp/cortex/services/K8sJobRunnerSrvSpec.scala @@ -0,0 +1,161 @@ +package org.thp.cortex.services + +import com.typesafe.config.ConfigFactory +import io.fabric8.kubernetes.api.model.PodSecurityContext +import org.junit.runner.RunWith +import org.specs2.runner.JUnitRunner +import play.api.Configuration +import play.api.test.PlaySpecification + +@RunWith(classOf[JUnitRunner]) +class K8sJobRunnerSrvSpec extends PlaySpecification { + + "K8sJobRunnerSrv.parseLabels" should { + + "parse HOCON keys with slashes and dots without extra quotes" in { + val config = Configuration( + ConfigFactory.parseString( + """job.kubernetes.labels { + | "app.kubernetes.io/instance" = "cortex" + | "app.kubernetes.io/managed-by" = "Helm" + |}""".stripMargin + ) + ) + val labels = K8sJobRunnerSrv.parseLabels(config) + labels must_== Map("app.kubernetes.io/instance" -> "cortex", "app.kubernetes.io/managed-by" -> "Helm") + forall(labels.keys)(k => k must not(startWith("\""))) + } + + "return empty map when labels config is absent" in { + val config = Configuration(ConfigFactory.parseString("job.kubernetes {}")) + K8sJobRunnerSrv.parseLabels(config) must_== Map.empty[String, String] + } + + "return empty map when labels block is empty" in { + val config = Configuration(ConfigFactory.parseString("job.kubernetes.labels {}")) + K8sJobRunnerSrv.parseLabels(config) must_== Map.empty[String, String] + } + } + + "K8sJobRunnerSrv.buildJobLabels" should { + + val cortexLabels = Map( + "cortex-job-id" -> "abc123", + "cortex-worker-id" -> "worker1", + "cortex-neuron-job" -> "true" + ) + + "return only cortex labels when extra labels is empty" in { + K8sJobRunnerSrv.buildJobLabels(cortexLabels, Map.empty) must_== cortexLabels + } + + "merge extra labels with cortex labels" in { + val extra = Map( + "app.kubernetes.io/instance" -> "cortex", + "app.kubernetes.io/managed-by" -> "Helm" + ) + val result = K8sJobRunnerSrv.buildJobLabels(cortexLabels, extra) + result must_== cortexLabels ++ extra + } + + "allow extra labels to override cortex labels" in { + val extra = Map("cortex-neuron-job" -> "custom-value") + val result = K8sJobRunnerSrv.buildJobLabels(cortexLabels, extra) + result("cortex-neuron-job") must_== "custom-value" + } + + "preserve all cortex labels when extra labels don't overlap" in { + val extra = Map("team" -> "soc") + val result = K8sJobRunnerSrv.buildJobLabels(cortexLabels, extra) + result("cortex-job-id") must_== "abc123" + result("cortex-worker-id") must_== "worker1" + result("cortex-neuron-job") must_== "true" + result("team") must_== "soc" + } + } + + "K8sJobRunnerSrv.parsePodSecurityContext" should { + + "return None when securityContext config is absent" in { + val config = Configuration(ConfigFactory.parseString("job.kubernetes {}")) + K8sJobRunnerSrv.parsePodSecurityContext(config) must beNone + } + + "parse runAsUser, runAsGroup, and fsGroup" in { + val config = Configuration( + ConfigFactory.parseString( + """job.kubernetes.securityContext { + | runAsUser = 1000 + | runAsGroup = 1000 + | fsGroup = 1000 + |}""".stripMargin + ) + ) + val ctx = K8sJobRunnerSrv.parsePodSecurityContext(config) + ctx must beSome[PodSecurityContext] + ctx.get.getRunAsUser must_== 1000L + ctx.get.getRunAsGroup must_== 1000L + ctx.get.getFsGroup must_== 1000L + } + + "parse runAsNonRoot" in { + val config = Configuration( + ConfigFactory.parseString( + """job.kubernetes.securityContext { + | runAsNonRoot = true + |}""".stripMargin + ) + ) + val ctx = K8sJobRunnerSrv.parsePodSecurityContext(config) + ctx must beSome[PodSecurityContext] + ctx.get.getRunAsNonRoot must_== true + } + + "handle partial config with only runAsUser" in { + val config = Configuration( + ConfigFactory.parseString( + """job.kubernetes.securityContext { + | runAsUser = 65534 + |}""".stripMargin + ) + ) + val ctx = K8sJobRunnerSrv.parsePodSecurityContext(config) + ctx must beSome[PodSecurityContext] + ctx.get.getRunAsUser must_== 65534L + ctx.get.getRunAsGroup must beNull + ctx.get.getFsGroup must beNull + } + + "return None when securityContext block is empty" in { + val config = Configuration(ConfigFactory.parseString("job.kubernetes.securityContext {}")) + K8sJobRunnerSrv.parsePodSecurityContext(config) must beNone + } + + "still build the context when an unknown key is present (ignored with a warning)" in { + val config = Configuration( + ConfigFactory.parseString( + """job.kubernetes.securityContext { + | runAsUser = 1000 + | unknownField = "something" + |}""".stripMargin + ) + ) + val ctx = K8sJobRunnerSrv.parsePodSecurityContext(config) + ctx must beSome[PodSecurityContext] + ctx.get.getRunAsUser must_== 1000L + } + + "still build the context when given a negative UID (logs a warning but does not reject)" in { + val config = Configuration( + ConfigFactory.parseString( + """job.kubernetes.securityContext { + | runAsUser = -1 + |}""".stripMargin + ) + ) + val ctx = K8sJobRunnerSrv.parsePodSecurityContext(config) + ctx must beSome[PodSecurityContext] + ctx.get.getRunAsUser must_== -1L + } + } +} diff --git a/version.sbt b/version.sbt index d3b09bd42..ccaba4a0c 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -ThisBuild / version := "4.0.1-1" +ThisBuild / version := "4.1.0-1" diff --git a/www/package.json b/www/package.json old mode 100755 new mode 100644 index 68585da47..a7fd5042b --- a/www/package.json +++ b/www/package.json @@ -76,6 +76,6 @@ "style-loader": "^0.13.1", "url-loader": "^0.5.9", "webpack": "3.5.0", - "webpack-dev-server": "2.2.0" + "webpack-dev-server": "5.2.5" } } diff --git a/www/src/app/core/core.module.js b/www/src/app/core/core.module.js index 55bd52a2d..f7f68a618 100755 --- a/www/src/app/core/core.module.js +++ b/www/src/app/core/core.module.js @@ -10,6 +10,7 @@ import userAvatarDirective from './directives/user-avatar/user-avatar.directive' import tlpDirective from './directives/tlp/tlp.directive'; import taxonomieDirective from './directives/taxonomie/taxonomie.directive'; import autofocusDirective from './directives/autofocus/autofocus.directive'; +import nullIfEmpty from './directives/null-if-empty/null-if-empty.directive'; import constants from './services/constants'; @@ -44,6 +45,7 @@ userAvatarDirective(core) tlpDirective(core); taxonomieDirective(core); autofocusDirective(core); +nullIfEmpty(core); /* Common services */ diff --git a/www/src/app/core/directives/null-if-empty/null-if-empty.directive.js b/www/src/app/core/directives/null-if-empty/null-if-empty.directive.js new file mode 100644 index 000000000..a767b18d0 --- /dev/null +++ b/www/src/app/core/directives/null-if-empty/null-if-empty.directive.js @@ -0,0 +1,20 @@ +'use strict'; + +export default function (app) { + app.directive('nullIfEmpty', nullIfEmpty); + + function nullIfEmpty() { + return { + require: 'ngModel', + link: function (scope, el, attrs, ngModel) { + ngModel.$parsers.push(function (value) { + return value === '' ? null : value; + }); + + ngModel.$formatters.push(function (value) { + return !value ? '' : value; + }); + } + }; + } +} diff --git a/www/src/app/pages/admin/organizations/components/analyzers/analyzer-config-form.html b/www/src/app/pages/admin/organizations/components/analyzers/analyzer-config-form.html index a907ccf10..687b1cbe0 100644 --- a/www/src/app/pages/admin/organizations/components/analyzers/analyzer-config-form.html +++ b/www/src/app/pages/admin/organizations/components/analyzers/analyzer-config-form.html @@ -63,13 +63,13 @@

- +
- +
diff --git a/www/src/app/pages/admin/organizations/components/analyzers/config-list.html b/www/src/app/pages/admin/organizations/components/analyzers/config-list.html index dd068cd71..a803c6af3 100644 --- a/www/src/app/pages/admin/organizations/components/analyzers/config-list.html +++ b/www/src/app/pages/admin/organizations/components/analyzers/config-list.html @@ -32,7 +32,7 @@

{{config.configurationItems.length}}

-
+

{{config.name === 'global' ? 'Global Configuration' : config.name}}

diff --git a/www/src/app/pages/admin/organizations/components/config-form.html b/www/src/app/pages/admin/organizations/components/config-form.html index 1af0d81a2..a25376dcd 100644 --- a/www/src/app/pages/admin/organizations/components/config-form.html +++ b/www/src/app/pages/admin/organizations/components/config-form.html @@ -7,7 +7,7 @@
- +
diff --git a/www/src/app/pages/admin/organizations/components/responders/config-list.html b/www/src/app/pages/admin/organizations/components/responders/config-list.html index b14dc77e6..e91140083 100644 --- a/www/src/app/pages/admin/organizations/components/responders/config-list.html +++ b/www/src/app/pages/admin/organizations/components/responders/config-list.html @@ -32,7 +32,7 @@

{{config.configurationItems.length}}

-
+

{{config.name === 'global' ? 'Global Configuration' : config.name}}

diff --git a/www/src/app/pages/admin/organizations/components/responders/responder-config-form.html b/www/src/app/pages/admin/organizations/components/responders/responder-config-form.html index 0972d6cfd..b2bb90878 100644 --- a/www/src/app/pages/admin/organizations/components/responders/responder-config-form.html +++ b/www/src/app/pages/admin/organizations/components/responders/responder-config-form.html @@ -70,13 +70,13 @@

- +
- +
diff --git a/www/src/app/pages/admin/organizations/components/responders/responders-list.controller.js b/www/src/app/pages/admin/organizations/components/responders/responders-list.controller.js index b9c794909..8320d06ff 100644 --- a/www/src/app/pages/admin/organizations/components/responders/responders-list.controller.js +++ b/www/src/app/pages/admin/organizations/components/responders/responders-list.controller.js @@ -33,9 +33,7 @@ export default class OrganizationRespondersController { $onInit() { this.activeResponders = _.keyBy(this.responders, 'workerDefinitionId'); this.definitionsIds = _.keys(this.responderDefinitions).sort(); - this.invalidResponders = _.filter(this.responders, a => - _.isEmpty(a.dataTypeList) - ); + this.obsoleteResponders = _.filter(this.responders, a => !this.definitionsIds.includes(a.workerDefinitionId)); } openModal(mode, definition, responder) {