diff --git a/.gitignore b/.gitignore index 9c5272c..444b667 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,5 @@ target/ target-shaded/ project/sbt-launch-*.jar .idea/ +.DS_Store .bsp/ diff --git a/.jvmopts b/.jvmopts index d4a3746..b2cb4a3 100644 --- a/.jvmopts +++ b/.jvmopts @@ -1,10 +1,4 @@ --Dfile.encoding=UTF8 --Xms1G --Xmx4G --XX:ReservedCodeCacheSize=250M --XX:+TieredCompilation --XX:-UseGCOverheadLimit -# effectively adds GC to Perm space --XX:+CMSClassUnloadingEnabled -# must be enabled for CMSClassUnloadingEnabled to work --XX:+UseConcMarkSweepGC +-Xms512M +-Xmx4096M +-Xss2M +-XX:MaxMetaspaceSize=1024M diff --git a/.scalafmt.conf b/.scalafmt.conf index 1818d46..63f357d 100644 --- a/.scalafmt.conf +++ b/.scalafmt.conf @@ -1,2 +1,3 @@ +version = "2.6.4" style = default maxColumn = 100 diff --git a/build.sbt b/build.sbt index 844c6dd..725b7c8 100644 --- a/build.sbt +++ b/build.sbt @@ -1,8 +1,8 @@ lazy val commonSettings = Seq( organization := "com.whisk", - version := "0.9.9", + version := "0.10.0-RC", scalaVersion := "2.13.6", - crossScalaVersions := Seq("2.13.6", "2.12.15", "2.11.12", "3.0.2"), + crossScalaVersions := Seq("2.13.6", "2.12.15", "2.11.12"), scalacOptions ++= Seq("-feature", "-deprecation"), Test / fork := true, licenses += ("MIT", url("http://opensource.org/licenses/MIT")), @@ -36,53 +36,29 @@ lazy val root = .in(file(".")) .settings(commonSettings: _*) .settings(publish := {}, publishLocal := {}, packagedArtifacts := Map.empty) - .aggregate(core, - testkitSpotifyImpl, - testkitSpotifyShadedImpl, - testkitDockerJavaImpl, - config, - scalatest, - specs2, - samples) + .aggregate(core, scalatest, samples, coreShaded) lazy val core = project - .settings(commonSettings: _*) - .settings(name := "docker-testkit-core", - libraryDependencies += "org.slf4j" % "slf4j-api" % "1.7.22") - -lazy val testkitSpotifyImpl = - project - .in(file("impl/spotify")) - .settings(commonSettings: _*) - .settings(name := "docker-testkit-impl-spotify", - libraryDependencies ++= - Seq("com.spotify" % "docker-client" % "8.11.5", - "com.google.code.findbugs" % "jsr305" % "3.0.1")) - .dependsOn(core) - -lazy val testkitSpotifyShadedImpl = - project - .in(file("impl/spotify")) .settings(commonSettings: _*) .settings( - name := "docker-testkit-impl-spotify-shaded", - libraryDependencies ++= - Seq("com.spotify" % "docker-client" % "8.11.5" classifier "shaded", - "com.google.code.findbugs" % "jsr305" % "3.0.1"), - target := baseDirectory.value / "target-shaded" + name := "docker-testkit-core", + libraryDependencies ++= Seq( + "org.slf4j" % "slf4j-api" % "1.7.25", + "com.spotify" % "docker-client" % "8.16.0", + "com.google.code.findbugs" % "jsr305" % "3.0.1" + ) ) - .dependsOn(core) -lazy val testkitDockerJavaImpl = +lazy val scalatest = project - .in(file("impl/docker-java")) .settings(commonSettings: _*) .settings( - name := "docker-testkit-impl-docker-java", - libraryDependencies ++= - Seq("com.github.docker-java" % "docker-java" % "3.2.3", - "com.google.code.findbugs" % "jsr305" % "3.0.1") + name := "docker-testkit-scalatest", + libraryDependencies ++= Seq( + "org.scalatest" %% "scalatest" % "3.1.2", + "ch.qos.logback" % "logback-classic" % "1.2.3" % "test" + ) ) .dependsOn(core) @@ -90,45 +66,30 @@ lazy val samples = project .settings(commonSettings: _*) .settings(name := "docker-testkit-samples") - .dependsOn(core) + .dependsOn(core, scalatest) -lazy val scalatest = +lazy val tests = project .settings(commonSettings: _*) .settings( - name := "docker-testkit-scalatest", - libraryDependencies ++= - Seq( - "org.scalatest" %% "scalatest" % "3.2.9", - "ch.qos.logback" % "logback-classic" % "1.2.1" % "test", - "org.postgresql" % "postgresql" % "9.4.1210" % "test", - "javax.activation" % "activation" % "1.1.1" % "test" - ) + name := "docker-testkit-tests", + libraryDependencies ++= Seq( + "org.postgresql" % "postgresql" % "42.1.4" % "test", + "mysql" % "mysql-connector-java" % "5.1.44" % "test" + ) ) - .dependsOn(core, testkitSpotifyShadedImpl % "test", testkitDockerJavaImpl % "test", samples % "test") + .dependsOn(core, scalatest, samples % "test") -lazy val specs2 = +lazy val coreShaded = project + .in(file("core")) .settings(commonSettings: _*) .settings( - name := "docker-testkit-specs2", + name := "docker-testkit-core-shaded", libraryDependencies ++= Seq( - "org.specs2" %% "specs2-core" % (if (scalaVersion.value.startsWith("2.1")) "4.5.1" - else "5.0.0-RC-11"), - "ch.qos.logback" % "logback-classic" % "1.2.1" % "test", - "org.postgresql" % "postgresql" % "9.4.1210" % "test", - "javax.activation" % "activation" % "1.1.1" % "test" - ) - ) - .dependsOn(core, samples % "test", testkitDockerJavaImpl % "test") - -lazy val config = - project - .settings(commonSettings: _*) - .settings( - name := "docker-testkit-config", - libraryDependencies ++= - Seq("com.typesafe" % "config" % "1.4.1", "org.scalatest" %% "scalatest" % "3.2.9" % "test") + "com.spotify" % "docker-client" % "8.16.0" classifier "shaded", + "com.google.code.findbugs" % "jsr305" % "3.0.1" + ), + target := baseDirectory.value / "target-shaded" ) - .dependsOn(core, testkitDockerJavaImpl) diff --git a/config/src/main/scala/com/whisk/docker/config/DockerKitConfig.scala b/config/src/main/scala/com/whisk/docker/config/DockerKitConfig.scala deleted file mode 100644 index 80c527f..0000000 --- a/config/src/main/scala/com/whisk/docker/config/DockerKitConfig.scala +++ /dev/null @@ -1,69 +0,0 @@ -package com.whisk.docker.config - -import scala.collection.JavaConverters._ - -import com.typesafe.config.{Config, ConfigFactory} -import com.whisk.docker.{DockerContainer, VolumeMapping} -import com.whisk.docker.config.DockerTypesafeConfig._ -import com.whisk.docker.impl.dockerjava.DockerKitDockerJava - -trait DockerKitConfig extends DockerKitDockerJava { - def dockerConfig = ConfigFactory.load() - - implicit class RichConfig(c: Config) { - def getOpt[T](fn: (Config, String) => T, path: String): Option[T] = - if (c.hasPath(path)) Some(fn(c, path)) else None - } - - private def toPortMapping(c: Config): DockerConfigPortMap = DockerConfigPortMap( - c.getInt("internal"), - c.getOpt(_ getInt _, "external") - ) - private def toPortMappings(c: Config): Map[String, DockerConfigPortMap] = - c.entrySet() - .asScala - .map { e => - val k = e.getKey.split('.').head - k -> toPortMapping(c.getConfig(k)) - } - .toMap - private def toReadyCheckerLooped(c: Config): DockerConfigReadyCheckerLooped = - DockerConfigReadyCheckerLooped(c.getInt("attempts"), c.getInt("delay")) - private def toReadyResponse(c: Config): DockerConfigHttpResponseReady = - DockerConfigHttpResponseReady( - c.getInt("port"), - c.getOpt(_ getString _, "path") getOrElse "/", - c.getOpt(_ getString _, "host"), - c.getOpt(_ getInt _, "code").getOrElse(200), - c.getOpt(_ getInt _, "within"), - c.getOpt((c, s) => toReadyCheckerLooped(c.getConfig(s)), "looped") - ) - private def toReadyChecker(c: Config): DockerConfigReadyChecker = DockerConfigReadyChecker( - c.getOpt(_ getString _, "log-line"), - c.getOpt((c, s) => toReadyResponse(c.getConfig(s)), "http-response-code") - ) - private def toVolumeMaps(c: Config): VolumeMapping = - VolumeMapping(c.getString("host"), - c.getString("container"), - c.getOpt(_ getBoolean _, "rw").getOrElse(false)) - private def toDockerConfig(c: Config): DockerConfig = DockerConfig( - c.getString("image-name"), - c.getOpt(_ getString _, "container-name"), - c.getOpt((c, s) => c.getStringList(s).asScala.toSeq, "command"), - c.getOpt((c, s) => c.getStringList(s).asScala.toSeq, "entrypoint"), - c.getOpt((c, s) => c.getStringList(s).asScala.toSeq, "environmental-variables") - .getOrElse(Nil), - c.getOpt((c, s) => toPortMappings(c.getConfig(s)), "port-maps"), - c.getOpt((c, s) => toReadyChecker(c.getConfig(s)), "ready-checker"), - c.getOpt((c, s) => c.getConfigList(s).asScala.toSeq.map(toVolumeMaps), "volume-maps") - .getOrElse(Nil), - c.getOpt(_ getLong _, "memory"), - c.getOpt(_ getLong _, "memory-reservation") - ) - def configureDockerContainer(configurationName: String): DockerContainer = { - val config = - if (configurationName == "." || configurationName == "") dockerConfig - else dockerConfig.getConfig(configurationName) - toDockerConfig(config).toDockerContainer() - } -} diff --git a/config/src/main/scala/com/whisk/docker/config/DockerTypesafeConfig.scala b/config/src/main/scala/com/whisk/docker/config/DockerTypesafeConfig.scala deleted file mode 100644 index 0b3a223..0000000 --- a/config/src/main/scala/com/whisk/docker/config/DockerTypesafeConfig.scala +++ /dev/null @@ -1,90 +0,0 @@ -package com.whisk.docker.config - -import com.whisk.docker.impl.dockerjava.DockerKitDockerJava -import com.whisk.docker.{ - DockerContainer, - DockerPortMapping, - DockerReadyChecker, - HostConfig, - VolumeMapping -} - -import scala.concurrent.duration._ - -object DockerTypesafeConfig extends DockerKitDockerJava { - val EmptyPortBindings: Map[Int, Option[Int]] = Map.empty - val AlwaysReady = DockerReadyChecker.Always - - case class DockerConfigPortMap(internal: Int, external: Option[Int]) { - def asTuple = (internal, external) - } - - case class DockerConfigReadyCheckerLooped(attempts: Int, delay: Int) - - case class DockerConfigHttpResponseReady(port: Int, - path: String = "/", - host: Option[String], - code: Int = 200, - within: Option[Int], - looped: Option[DockerConfigReadyCheckerLooped]) - - case class DockerConfigReadyChecker(`log-line`: Option[String], - `http-response-code`: Option[DockerConfigHttpResponseReady]) { - - def httpResponseCodeReadyChecker(rr: DockerConfigHttpResponseReady) = { - val codeChecker: DockerReadyChecker = - DockerReadyChecker.HttpResponseCode(rr.port, rr.path, rr.host, rr.code) - val within = rr.within.fold(codeChecker)(w => codeChecker.within(w.millis)) - rr.looped.fold(within)(l => within.looped(l.attempts, l.delay.millis)) - } - - // log line checker takes priority - def toReadyChecker = { - (`log-line`, `http-response-code`) match { - case (None, None) => DockerReadyChecker.Always - case (None, Some(rr)) => httpResponseCodeReadyChecker(rr) - case (Some(ll), _) => DockerReadyChecker.LogLineContains(ll) - } - } - } - - case class DockerConfig(`image-name`: String, - `container-name`: Option[String], - command: Option[Seq[String]], - entrypoint: Option[Seq[String]], - `environmental-variables`: Seq[String] = Seq.empty, - `port-maps`: Option[Map[String, DockerConfigPortMap]], - `ready-checker`: Option[DockerConfigReadyChecker], - `volume-maps`: Seq[VolumeMapping] = Seq.empty, - memory: Option[Long], - `memory-reservation`: Option[Long], - privileged: Boolean = false) { - - def toDockerContainer(): DockerContainer = { - val bindPorts = `port-maps`.fold(EmptyPortBindings) { _.values.map(_.asTuple).toMap }.toSeq.map { - case (internalPort, maybeHostPort) => - internalPort -> DockerPortMapping(maybeHostPort) - }.toMap - - val readyChecker = `ready-checker`.fold[DockerReadyChecker](AlwaysReady) { _.toReadyChecker } - - val hostConfig = HostConfig( - memory = memory, - memoryReservation = `memory-reservation`, - privileged = privileged - ) - - DockerContainer( - image = `image-name`, - name = `container-name`, - command = command, - entrypoint = entrypoint, - bindPorts = bindPorts, - env = `environmental-variables`, - readyChecker = readyChecker, - volumeMappings = `volume-maps`, - hostConfig = Some(hostConfig) - ) - } - } -} diff --git a/config/src/test/resources/application.conf b/config/src/test/resources/application.conf deleted file mode 100644 index afe4988..0000000 --- a/config/src/test/resources/application.conf +++ /dev/null @@ -1,123 +0,0 @@ -docker { - - postgres { - image-name = "postgres:9.4.4" - environmental-variables = ["POSTGRES_USER=nph", "POSTGRES_PASSWORD=suitup"] - ready-checker { - log-line = "database system is ready to accept connections" - } - port-maps { - default-postgres-port { - internal = 5432 - } - } - } - - zookeeper { - image-name = "jplock/zookeeper:3.4.6" - ready-checker { - log-line = "binding to port" - } - port-maps { - default-zookeeper-port { - internal = 2181 - } - } - } - - cassandra { - image-name = "whisk/cassandra:2.1.8" - ready-checker { - log-line = "Starting listening for CQL clients on" - } - port-maps { - default-cql-port { - internal = 9042 - } - } - volume-maps = [ - { - container = "/opt/data" - host = "/opt/docker/data" - }, - { - container = "/opt/log" - host = "/opt/docker/log" - rw = true - } - ] - } - - mongodb { - image-name = "mongo:3.0.6" - command = ["mongod", "--nojournal", "--smallfiles", "--syncdelay", "0"] - ready-checker { - log-line = "waiting for connections on port" - } - port-maps { - default-mongodb-port { - internal = 27017 - } - } - } - - neo4j { - image-name = "whisk/neo4j:2.1.8" - ready-checker { - http-response-code { - port = 7474 - path = "/db/data/" - within = 100 - looped { - attempts = 20 - delay = 1250 - } - } - } - port-maps { - default-neo4j-port { - internal = 7474 - } - } - } - - elasticsearch { - image-name = "elasticsearch:1.7.1" - entrypoint = [ "my", "custom", "entrypoint" ] - memory = 536870912 # 512MB - memory-reservation = 268435456 # 256MB - ready-checker { - http-response-code { - port = 9200 - path = "/" - within = 100 - looped { - attempts = 20 - delay = 1250 - } - } - } - port-maps { - default-elasticsearch-http-port { - internal = 9200 - } - default-elasticsearch-client-port { - internal = 9300 - } - } - } - - kafka { - image-name = "wurstmeister/kafka:0.8.2.1" - environmental-variables = ["KAFKA_ADVERTISED_PORT=9092", KAFKA_ADVERTISED_HOST_NAME"="${?DOCKER_IP}] - ready-checker { - log-line = "started (kafka.server.KafkaServer)" - } - port-maps { - default-kafka-port { - internal = 9092 - external = 9092 - } - } - } -} diff --git a/config/src/test/scala/com/whisk/docker/config/test/DockerConfigSpec.scala b/config/src/test/scala/com/whisk/docker/config/test/DockerConfigSpec.scala deleted file mode 100644 index 2ffeb82..0000000 --- a/config/src/test/scala/com/whisk/docker/config/test/DockerConfigSpec.scala +++ /dev/null @@ -1,56 +0,0 @@ -package com.whisk.docker.config.test - -import com.whisk.docker.{DockerContainer, DockerReadyChecker, HostConfig, VolumeMapping} -import com.whisk.docker.config.DockerKitConfig - -import scala.concurrent.duration._ -import org.scalatest.flatspec.AnyFlatSpec -import org.scalatest.matchers.should.Matchers - -class DockerConfigSpec extends AnyFlatSpec with Matchers with DockerKitConfig { - - "Config-based configurations" should "produce same containers as code-based ones" in { - val volumes = Seq( - VolumeMapping(container = "/opt/data", host = "/opt/docker/data", rw = false), - VolumeMapping(container = "/opt/log", host = "/opt/docker/log", rw = true) - ) - - val cassandraExpected = DockerContainer("whisk/cassandra:2.1.8") - .withPorts(9042 -> None) - .withReadyChecker(DockerReadyChecker.LogLineContains("Starting listening for CQL clients on")) - .withVolumes(volumes) - .withHostConfig(HostConfig(None, None, None)) - - configureDockerContainer("docker.cassandra") shouldBe cassandraExpected - - val postgresExpected = DockerContainer("postgres:9.4.4") - .withPorts((5432, None)) - .withEnv(s"POSTGRES_USER=nph", s"POSTGRES_PASSWORD=suitup") - .withReadyChecker( - DockerReadyChecker.LogLineContains("database system is ready to accept connections")) - .withHostConfig(HostConfig(None, None, None)) - - configureDockerContainer("docker.postgres") shouldBe postgresExpected - - val mongodbExpected = DockerContainer("mongo:3.0.6") - .withPorts(27017 -> None) - .withReadyChecker(DockerReadyChecker.LogLineContains("waiting for connections on port")) - .withCommand("mongod", "--nojournal", "--smallfiles", "--syncdelay", "0") - .withHostConfig(HostConfig(None, None, None)) - - configureDockerContainer("docker.mongodb") shouldBe mongodbExpected - - val elasticExpected = DockerContainer("elasticsearch:1.7.1") - .withEntrypoint("my", "custom", "entrypoint") - .withPorts(9200 -> None, 9300 -> None) - .withHostConfig(HostConfig(memory = Some(536870912), memoryReservation = Some(268435456))) - .withReadyChecker( - DockerReadyChecker - .HttpResponseCode(9200, "/") - .within(100.millis) - .looped(20, 1250.millis) - ) - - configureDockerContainer("docker.elasticsearch") shouldBe elasticExpected - } -} diff --git a/core/src/main/scala/com/whisk/docker/DockerCommandExecutor.scala b/core/src/main/scala/com/whisk/docker/DockerCommandExecutor.scala deleted file mode 100644 index 1f0b41e..0000000 --- a/core/src/main/scala/com/whisk/docker/DockerCommandExecutor.scala +++ /dev/null @@ -1,57 +0,0 @@ -package com.whisk.docker - -import scala.concurrent.duration.Duration -import scala.concurrent.{ExecutionContext, Future} - -object PortProtocol extends Enumeration { - val TCP, UDP = Value -} - -case class ContainerPort(port: Int, protocol: PortProtocol.Value) - -object ContainerPort { - def parse(str: String) = { - val Array(p, rest @ _*) = str.split("/") - val proto = rest.headOption - .flatMap(pr => PortProtocol.values.find(_.toString.equalsIgnoreCase(pr))) - .getOrElse(PortProtocol.TCP) - ContainerPort(p.toInt, proto) - } -} - -case class PortBinding(hostIp: String, hostPort: Int) - -case class InspectContainerResult(running: Boolean, - ports: Map[ContainerPort, Seq[PortBinding]], - name: String, - ipAddresses: Seq[String]) - -trait DockerCommandExecutor { - - def host: String - - def createContainer(spec: DockerContainer)(implicit ec: ExecutionContext, timeout: Duration): Future[String] - - def startContainer(id: String)(implicit ec: ExecutionContext, timeout: Duration): Future[Unit] - - def inspectContainer(id: String)( - implicit ec: ExecutionContext, timeout: Duration): Future[Option[InspectContainerResult]] - - def withLogStreamLines(id: String, withErr: Boolean)(f: String => Unit)( - implicit docker: DockerCommandExecutor, - ec: ExecutionContext, timeout: Duration - ): Unit - - def withLogStreamLinesRequirement(id: String, withErr: Boolean)(f: String => Boolean)( - implicit docker: DockerCommandExecutor, - ec: ExecutionContext, timeout: Duration): Future[Unit] - - def listImages()(implicit ec: ExecutionContext, timeout: Duration): Future[Set[String]] - - def pullImage(image: String)(implicit ec: ExecutionContext, timeout: Duration): Future[Unit] - - def remove(id: String, force: Boolean = true, removeVolumes: Boolean = true)( - implicit ec: ExecutionContext, timeout: Duration): Future[Unit] - - def close(): Unit -} diff --git a/core/src/main/scala/com/whisk/docker/DockerContainer.scala b/core/src/main/scala/com/whisk/docker/DockerContainer.scala deleted file mode 100644 index a08819e..0000000 --- a/core/src/main/scala/com/whisk/docker/DockerContainer.scala +++ /dev/null @@ -1,82 +0,0 @@ -package com.whisk.docker - -case class VolumeMapping(host: String, container: String, rw: Boolean = false) - -case class ContainerLink(container: DockerContainer, alias: String) { - require(container.name.nonEmpty, "Container must have a name") -} - -case class LogLineReceiver(withErr: Boolean, f: String => Unit) - -case class DockerPortMapping(hostPort: Option[Int] = None, address: String = "0.0.0.0") - -case class HostConfig( - tmpfs: Option[Map[String, String]] = None, - /** - * the hard limit on memory usage (in bytes) - */ - memory: Option[Long] = None, - /** - * the soft limit on memory usage (in bytes) - */ - memoryReservation: Option[Long] = None, - - /** - * whether to run in privileged mode - */ - privileged: Boolean = false - -) - -case class DockerContainer(image: String, - name: Option[String] = None, - command: Option[Seq[String]] = None, - entrypoint: Option[Seq[String]] = None, - bindPorts: Map[Int, DockerPortMapping] = Map.empty, - tty: Boolean = false, - stdinOpen: Boolean = false, - links: Seq[ContainerLink] = Seq.empty, - unlinkedDependencies: Seq[DockerContainer] = Seq.empty, - env: Seq[String] = Seq.empty, - networkMode: Option[String] = None, - readyChecker: DockerReadyChecker = DockerReadyChecker.Always, - volumeMappings: Seq[VolumeMapping] = Seq.empty, - logLineReceiver: Option[LogLineReceiver] = None, - user: Option[String] = None, - hostname: Option[String] = None, - hostConfig: Option[HostConfig] = None) { - - def withCommand(cmd: String*) = copy(command = Some(cmd)) - - def withEntrypoint(entrypoint: String*) = copy(entrypoint = Some(entrypoint)) - - def withPorts(ps: (Int, Option[Int])*) = - copy(bindPorts = ps.map { case (internalPort, hostPort) => internalPort -> DockerPortMapping(hostPort) }.toMap) - - def withPortMapping(ps: (Int, DockerPortMapping)*) = copy(bindPorts = ps.toMap) - - def withLinks(links: ContainerLink*) = copy(links = links.toSeq) - - def withUnlinkedDependencies(unlinkedDependencies: DockerContainer*) = - copy(unlinkedDependencies = unlinkedDependencies.toSeq) - - def dependencies: Seq[DockerContainer] = links.map(_.container) ++ unlinkedDependencies - - def withReadyChecker(checker: DockerReadyChecker) = copy(readyChecker = checker) - - def withEnv(env: String*) = copy(env = env) - - def withNetworkMode(networkMode: String) = copy(networkMode = Some(networkMode)) - - def withVolumes(volumeMappings: Seq[VolumeMapping]) = copy(volumeMappings = volumeMappings) - - def withLogLineReceiver(logLineReceiver: LogLineReceiver) = - copy(logLineReceiver = Some(logLineReceiver)) - - def withUser(user: String) = copy(user = Some(user)) - - def withHostname(hostname: String) = copy(hostname = Some(hostname)) - - def withHostConfig(hostConfig: HostConfig) = copy(hostConfig = Some(hostConfig)) - -} diff --git a/core/src/main/scala/com/whisk/docker/DockerContainerManager.scala b/core/src/main/scala/com/whisk/docker/DockerContainerManager.scala deleted file mode 100644 index 00a7128..0000000 --- a/core/src/main/scala/com/whisk/docker/DockerContainerManager.scala +++ /dev/null @@ -1,126 +0,0 @@ -package com.whisk.docker - -import org.slf4j.LoggerFactory - -import scala.annotation.tailrec -import scala.concurrent.{Await, ExecutionContext, Future} -import scala.language.postfixOps -import scala.concurrent.duration._ - -class DockerContainerManager(containers: Seq[DockerContainer], executor: DockerCommandExecutor)( - implicit ec: ExecutionContext) { - - private lazy val log = LoggerFactory.getLogger(this.getClass) - private implicit val dockerExecutor: DockerCommandExecutor = executor - - private val dockerStatesMap: Map[DockerContainer, DockerContainerState] = - containers.map(c => c -> new DockerContainerState(c)).toMap - - val states = dockerStatesMap.values.toList - - def getContainerState(container: DockerContainer): DockerContainerState = { - dockerStatesMap(container) - } - - def isReady(container: DockerContainer): Future[Boolean] = { - dockerStatesMap(container).isReady() - } - - def pullImages(timeout: Duration): Future[Seq[String]] = { - implicit val tout = timeout - executor.listImages().flatMap { images => - val imagesToPull: Seq[String] = containers.map(_.image).filterNot { image => - val cImage = if (image.contains(":")) image else image + ":latest" - images(cImage) - } - Future.traverse(imagesToPull)(i => executor.pullImage(i)).map(_ => imagesToPull) - } - } - - def initReadyAll( - containerStartTimeout: Duration): Future[Seq[(DockerContainerState, Boolean)]] = { - import DockerContainerManager._ - - implicit val timeout = containerStartTimeout - - @tailrec - def initGraph(graph: ContainerDependencyGraph, - previousInits: Future[Seq[DockerContainerState]] = Future.successful(Seq.empty)) - : Future[Seq[DockerContainerState]] = { - - val initializedContainers = previousInits.flatMap { prev => - Future.traverse(graph.containers.map(dockerStatesMap))(_.init()).map(prev ++ _) - } - - graph.dependants match { - case None => initializedContainers - case Some(dependants) => - val readyInits: Future[Seq[Future[Boolean]]] = - initializedContainers.map(_.map(state => state.isReady())) - val simplifiedReadyInits: Future[Seq[Boolean]] = readyInits.flatMap(Future.sequence(_)) - - Await.result(simplifiedReadyInits, containerStartTimeout) - initGraph(dependants, initializedContainers) - } - } - - initGraph(buildDependencyGraph(containers)).flatMap(Future.traverse(_) { c => - c.isReady().map(c -> _).recover { - case e => - log.error(e.getMessage, e) - c -> false - } - }) - } - - def stopRmAll(timeout: Duration): Future[Unit] = { - implicit val tout = timeout - val future = Future.traverse(states)(_.remove(force = true, removeVolumes = true)).map(_ => ()) - future.onComplete { _ => - executor.close() - } - future - } - -} - -object DockerContainerManager { - case class ContainerDependencyGraph(containers: Seq[DockerContainer], - dependants: Option[ContainerDependencyGraph] = None) - - def buildDependencyGraph(containers: Seq[DockerContainer]): ContainerDependencyGraph = { - @tailrec def buildDependencyGraph(graph: ContainerDependencyGraph): ContainerDependencyGraph = - graph match { - case ContainerDependencyGraph(containers, dependants) => - containers.partition(_.dependencies.isEmpty) match { - case (containersWithoutLinks, Nil) => graph - case (containersWithoutLinks, containersWithLinks) => - val linkedContainers = containers.foldLeft(Seq[DockerContainer]()) { - case (links, container) => (links ++ container.dependencies) - } - val (containersWithLinksAndLinked, containersWithLinksNotLinked) = - containersWithLinks.partition(linkedContainers.contains) - val (containersToBeLeftAtCurrentPosition, containersToBeMovedUpALevel) = dependants - .map(_.containers) - .getOrElse(List.empty) - .partition( - _.dependencies.exists(containersWithLinksNotLinked.contains) - ) - - buildDependencyGraph( - ContainerDependencyGraph( - containers = containersWithoutLinks ++ containersWithLinksAndLinked, - dependants = Some( - ContainerDependencyGraph( - containers = containersWithLinksNotLinked ++ containersToBeMovedUpALevel, - dependants = - dependants.map(_.copy(containers = containersToBeLeftAtCurrentPosition)) - )) - ) - ) - } - } - - buildDependencyGraph(ContainerDependencyGraph(containers)) - } -} diff --git a/core/src/main/scala/com/whisk/docker/DockerContainerState.scala b/core/src/main/scala/com/whisk/docker/DockerContainerState.scala deleted file mode 100644 index 7cbce04..0000000 --- a/core/src/main/scala/com/whisk/docker/DockerContainerState.scala +++ /dev/null @@ -1,117 +0,0 @@ -package com.whisk.docker - -import java.util.concurrent.atomic.AtomicBoolean - -import org.slf4j.LoggerFactory - -import scala.concurrent.duration.Duration -import scala.concurrent.{ExecutionContext, Future, Promise} - -class DockerContainerState(spec: DockerContainer) { - - private lazy val log = LoggerFactory.getLogger(this.getClass) - - class SinglePromise[T] { - val promise = Promise[T]() - - def future = promise.future - - val flag = new AtomicBoolean(false) - - def init(f: => Future[T]): Future[T] = { - if (!flag.getAndSet(true)) { - promise.tryCompleteWith(f) - } - future - } - } - - object SinglePromise { - def apply[T] = new SinglePromise[T] - } - - private val _id = SinglePromise[String] - - def id: Future[String] = _id.future - - private val _image = SinglePromise[Unit] - - private val _isReady = SinglePromise[Boolean] - - def isReady(): Future[Boolean] = _isReady.future - - def isRunning()(implicit docker: DockerCommandExecutor, ec: ExecutionContext, timeout: Duration): Future[Boolean] = { - getRunningContainer().map(_.isDefined) - } - - def init()(implicit docker: DockerCommandExecutor, ec: ExecutionContext, timeout: Duration): Future[this.type] = { - for { - s <- _id.init(docker.createContainer(spec)) - _ <- docker.startContainer(s) - } yield { - spec.logLineReceiver.foreach { - case LogLineReceiver(withErr, f) => docker.withLogStreamLines(s, withErr)(f) - } - runReadyCheck() - this - } - } - - private def runReadyCheck()(implicit docker: DockerCommandExecutor, - ec: ExecutionContext, timeout: Duration): Future[Boolean] = { - - _isReady.init( - (for { - r <- isRunning() if r - b <- spec.readyChecker(this) if b - } yield b) recoverWith { - case _: NoSuchElementException => - log.error("Not ready: " + { - spec.image - }) - Future.successful(false) - case e => - log.error(e.getMessage, e) - Future.successful(false) - } - ) - } - - protected def getRunningContainer()( - implicit docker: DockerCommandExecutor, - ec: ExecutionContext, timeout: Duration): Future[Option[InspectContainerResult]] = - id.flatMap(docker.inspectContainer) - - def getName()(implicit docker: DockerCommandExecutor, ec: ExecutionContext, timeout: Duration): Future[String] = - getRunningContainer().flatMap { - case Some(res) => Future.successful(res.name) - case None => Future.failed(new RuntimeException(s"Container ${spec.image} is not running")) - } - - def getIpAddresses()(implicit docker: DockerCommandExecutor, - ec: ExecutionContext, timeout: Duration): Future[Seq[String]] = getRunningContainer().flatMap { - case Some(res) => Future.successful(res.ipAddresses) - case None => Future.failed(new RuntimeException(s"Container ${spec.image} is not running")) - } - - private val _ports = SinglePromise[Map[Int, Int]] - - def getPorts()(implicit docker: DockerCommandExecutor, - ec: ExecutionContext, timeout: Duration): Future[Map[Int, Int]] = { - def portsFuture: Future[Map[Int, Int]] = getRunningContainer().flatMap { - case None => Future.failed(new RuntimeException(s"Container ${spec.image} is not running")) - case Some(c) => - val ports: Map[Int, Int] = c.ports.collect { - case (exposedPort, Seq(binding, _*)) => - exposedPort.port -> binding.hostPort - } - Future.successful(ports) - } - _ports.init(portsFuture) - } - - def remove(force: Boolean = true, removeVolumes: Boolean = true)( - implicit docker: DockerCommandExecutor, - ec: ExecutionContext, timeout: Duration): Future[Unit] = - id.flatMap(x => docker.remove(x, force, removeVolumes)) -} diff --git a/core/src/main/scala/com/whisk/docker/DockerFactory.scala b/core/src/main/scala/com/whisk/docker/DockerFactory.scala deleted file mode 100644 index 6673924..0000000 --- a/core/src/main/scala/com/whisk/docker/DockerFactory.scala +++ /dev/null @@ -1,6 +0,0 @@ -package com.whisk.docker - -trait DockerFactory { - - def createExecutor(): DockerCommandExecutor -} diff --git a/core/src/main/scala/com/whisk/docker/DockerKit.scala b/core/src/main/scala/com/whisk/docker/DockerKit.scala deleted file mode 100644 index 078dda9..0000000 --- a/core/src/main/scala/com/whisk/docker/DockerKit.scala +++ /dev/null @@ -1,72 +0,0 @@ -package com.whisk.docker - -import java.util.concurrent.Executors - -import org.slf4j.LoggerFactory - -import scala.concurrent.duration._ -import scala.concurrent.{Await, ExecutionContext, Future} -import scala.language.implicitConversions - -trait DockerKit { - implicit def dockerFactory: DockerFactory - - private lazy val log = LoggerFactory.getLogger(this.getClass) - - val PullImagesTimeout = 20.minutes - val StartContainersTimeout = 20.seconds - val StopContainersTimeout = 10.seconds - - def dockerContainers: List[DockerContainer] = Nil - - // we need ExecutionContext in order to run docker.init() / docker.stop() there - implicit lazy val dockerExecutionContext: ExecutionContext = { - // using Math.max to prevent unexpected zero length of docker containers - ExecutionContext.fromExecutor( - Executors.newFixedThreadPool(Math.max(1, dockerContainers.length * 2))) - } - implicit lazy val dockerExecutor: DockerCommandExecutor = dockerFactory.createExecutor() - - lazy val containerManager = new DockerContainerManager(dockerContainers, dockerExecutor) - - def isContainerReady(container: DockerContainer): Future[Boolean] = - containerManager.isReady(container) - - def getContainerState(container: DockerContainer): DockerContainerState = { - containerManager.getContainerState(container) - } - - implicit def containerToState(c: DockerContainer): DockerContainerState = { - getContainerState(c) - } - - def startAllOrFail(): Unit = { - Await.result(containerManager.pullImages(PullImagesTimeout), PullImagesTimeout) - val allRunning: Boolean = { - val future: Future[Boolean] = - containerManager.initReadyAll(StartContainersTimeout).map(_.map(_._2).forall(identity)).recover { - case e: Exception => - log.error("Exception during container initialization", e) - false - } - sys.addShutdownHook( - Await.ready(containerManager.stopRmAll(StopContainersTimeout), StopContainersTimeout) - ) - Await.result(future, StartContainersTimeout) - } - if (!allRunning) { - Await.ready(containerManager.stopRmAll(StopContainersTimeout), StopContainersTimeout) - throw new RuntimeException("Cannot run all required containers") - } - } - - def stopAllQuietly(): Unit = { - try { - Await.ready(containerManager.stopRmAll(StopContainersTimeout), StopContainersTimeout) - } catch { - case e: Throwable => - log.error(e.getMessage, e) - } - } - -} diff --git a/core/src/main/scala/com/whisk/docker/DockerReadyChecker.scala b/core/src/main/scala/com/whisk/docker/DockerReadyChecker.scala deleted file mode 100644 index 724737f..0000000 --- a/core/src/main/scala/com/whisk/docker/DockerReadyChecker.scala +++ /dev/null @@ -1,181 +0,0 @@ -package com.whisk.docker - -import java.net.{HttpURLConnection, URL} -import java.util.{Timer, TimerTask} - -import scala.concurrent.duration.{Duration, FiniteDuration} -import scala.concurrent.{ExecutionContext, Future, Promise, TimeoutException} - -trait DockerReadyChecker { - - def apply(container: DockerContainerState)(implicit docker: DockerCommandExecutor, - ec: ExecutionContext, timeout: Duration): Future[Boolean] - - @deprecated("this method will be removed. Use DockerReadyChecker.And(a, b)", "0.9.6") - def and(other: DockerReadyChecker): DockerReadyChecker = { - DockerReadyChecker.And(this, other) - } - - @deprecated("this method will be removed. Use DockerReadyChecker.Or(a, b)", "0.9.6") - def or(other: DockerReadyChecker): DockerReadyChecker = { - DockerReadyChecker.Or(this, other) - } - - def within(duration: FiniteDuration): DockerReadyChecker = { - DockerReadyChecker.TimeLimited(this, duration) - } - - def looped(attempts: Int, delay: FiniteDuration): DockerReadyChecker = { - DockerReadyChecker.Looped(this, attempts, delay) - } -} - -object RetryUtils { - - def withDelay[T](delay: Long)(f: => Future[T]): Future[T] = { - val timer = new Timer() - val promise = Promise[T]() - timer.schedule(new TimerTask { - override def run(): Unit = { - promise.completeWith(f) - timer.cancel() - } - }, delay) - promise.future - } - - def runWithin[T](future: => Future[T], deadline: FiniteDuration)( - implicit ec: ExecutionContext): Future[T] = { - val bail = Promise[T]() - withDelay(deadline.toMillis)( - bail - .tryCompleteWith(Future.failed(new TimeoutException(s"timed out after $deadline"))) - .future) - Future.firstCompletedOf(future :: bail.future :: Nil) - } - - def looped[T](future: => Future[T], attempts: Int, delay: FiniteDuration)( - implicit ec: ExecutionContext): Future[T] = { - def attempt(rest: Int): Future[T] = { - future.recoverWith { - case e => - rest match { - case 0 => - Future.failed(e match { - case _: NoSuchElementException => - new NoSuchElementException( - s"Ready checker returned false after $attempts attempts, delayed $delay each") - case _ => e - }) - case n => - withDelay(delay.toMillis)(attempt(n - 1)) - } - } - } - - attempt(attempts) - } -} - -object DockerReadyChecker { - - case class And(r1: DockerReadyChecker, r2: DockerReadyChecker) extends DockerReadyChecker { - override def apply(container: DockerContainerState)(implicit docker: DockerCommandExecutor, - ec: ExecutionContext, timeout: Duration): Future[Boolean] = { - val aF = r1(container) - val bF = r2(container) - for { - a <- aF - b <- bF - } yield a && b - } - } - - case class Or(r1: DockerReadyChecker, r2: DockerReadyChecker) extends DockerReadyChecker { - override def apply(container: DockerContainerState)(implicit docker: DockerCommandExecutor, - ec: ExecutionContext, timeout: Duration): Future[Boolean] = { - val aF = r1(container) - val bF = r2(container) - val p = Promise[Boolean]() - aF.map { - case true => p.trySuccess(true) - case _ => - } - bF.map { - case true => p.trySuccess(true) - case _ => - } - p.future - } - } - - object Always extends DockerReadyChecker { - override def apply(container: DockerContainerState)(implicit docker: DockerCommandExecutor, - ec: ExecutionContext, timeout: Duration): Future[Boolean] = - Future.successful(true) - } - - case class HttpResponseCode(port: Int, - path: String = "/", - host: Option[String] = None, - code: Int = 200) - extends DockerReadyChecker { - override def apply(container: DockerContainerState)(implicit docker: DockerCommandExecutor, - ec: ExecutionContext, timeout: Duration): Future[Boolean] = { - container.getPorts().map(_(port)).flatMap { p => - val url = new URL("http", host.getOrElse(docker.host), p, path) - Future { - val con = url.openConnection().asInstanceOf[HttpURLConnection] - try { - con.getResponseCode == code - } catch { - case e: java.net.ConnectException => - false - } - } - } - } - } - - case class LogLineContains(str: String) extends DockerReadyChecker { - override def apply(container: DockerContainerState)(implicit docker: DockerCommandExecutor, - ec: ExecutionContext, timeout: Duration): Future[Boolean] = { - for { - id <- container.id - _ <- docker.withLogStreamLinesRequirement(id, withErr = true)(_.contains(str)) - } yield { - true - } - } - } - - private[docker] case class TimeLimited(underlying: DockerReadyChecker, duration: FiniteDuration) - extends DockerReadyChecker { - - override def apply(container: DockerContainerState)(implicit docker: DockerCommandExecutor, - ec: ExecutionContext, timeout: Duration): Future[Boolean] = { - RetryUtils.runWithin(underlying(container), duration).recover { - case _: TimeoutException => - false - } - } - } - - private[docker] case class Looped(underlying: DockerReadyChecker, - attempts: Int, - delay: FiniteDuration) - extends DockerReadyChecker { - - override def apply(container: DockerContainerState)(implicit docker: DockerCommandExecutor, - ec: ExecutionContext, timeout: Duration): Future[Boolean] = { - RetryUtils.looped(underlying(container).filter(identity), attempts, delay) - } - } - - case class F(f: DockerContainerState => Future[Boolean]) extends DockerReadyChecker { - override def apply(container: DockerContainerState)(implicit docker: DockerCommandExecutor, - ec: ExecutionContext, timeout: Duration): Future[Boolean] = - f(container) - } - -} diff --git a/core/src/main/scala/com/whisk/docker/package.scala b/core/src/main/scala/com/whisk/docker/package.scala deleted file mode 100644 index e869ab5..0000000 --- a/core/src/main/scala/com/whisk/docker/package.scala +++ /dev/null @@ -1,55 +0,0 @@ -package com.whisk - -import java.util.TimerTask -import java.util.concurrent.{Callable, CancellationException, FutureTask} - -import scala.concurrent.{ExecutionContext, Future, Promise, TimeoutException} -import scala.concurrent.duration.{Duration, FiniteDuration} -import scala.util.{Failure, Try} - -/** - * General utility functions - */ -package object docker { - implicit class OptionalOps[A](val content: A) extends AnyVal { - def withOption[B](optional: Option[B])(f: (A, B) => A): A = optional match { - case None => content - case Some(x) => f(content, x) - } - } - - private[docker] object PerishableFuture { - - def apply[T](body: => T)(implicit ec: ExecutionContext, timeout: Duration): Future[T] = timeout match { - case finiteTimeout: FiniteDuration => - val promise = Promise[T] - - val futureTask = new FutureTask[T](new Callable[T] { - override def call(): T = body - }) { - override def done(): Unit = promise.tryComplete { - Try(get()).recoverWith { - case _: CancellationException => Failure(new TimeoutException()) - } - } - } - - val reaperTask = new TimerTask { - override def run(): Unit = { - futureTask.cancel(true) - promise.tryFailure(new TimeoutException()) - } - } - - timer.schedule(reaperTask, finiteTimeout.toMillis) - ec.execute(futureTask) - - promise.future - - case _ => Future.apply(body) - } - - private val timer = new java.util.Timer(true) - - } -} diff --git a/core/src/main/scala/com/whisk/docker/testkit/BaseContainer.scala b/core/src/main/scala/com/whisk/docker/testkit/BaseContainer.scala new file mode 100644 index 0000000..8590792 --- /dev/null +++ b/core/src/main/scala/com/whisk/docker/testkit/BaseContainer.scala @@ -0,0 +1,116 @@ +package com.whisk.docker.testkit + +import java.util.concurrent.atomic.AtomicReference + +import com.spotify.docker.client.messages.ContainerInfo +import org.slf4j.LoggerFactory + +import scala.collection.JavaConverters._ + +sealed trait ContainerState + +object ContainerState { + + trait HasId extends ContainerState { + val id: String + } + + trait IsRunning extends HasId { + val info: ContainerInfo + override val id: String = info.id + } + + object NotStarted extends ContainerState + case class Created(id: String) extends ContainerState with HasId + case class Starting(id: String) extends ContainerState with HasId + case class Running(info: ContainerInfo) extends ContainerState with IsRunning + case class Ready(info: ContainerInfo) extends ContainerState with IsRunning + case class Failed(id: String) extends ContainerState + object Stopped extends ContainerState +} + +abstract class BaseContainer { + + def spec: ContainerSpec + + private lazy val log = LoggerFactory.getLogger(this.getClass) + + private val _state = new AtomicReference[ContainerState](ContainerState.NotStarted) + + def state(): ContainerState = { + _state.get() + } + + private def updateState(state: ContainerState): Unit = { + _state.set(state) + } + + private[docker] def created(id: String): Unit = { + updateState(ContainerState.Created(id)) + } + + private[docker] def starting(id: String): Unit = { + updateState(ContainerState.Starting(id)) + } + + private[docker] def running(info: ContainerInfo): Unit = { + updateState(ContainerState.Running(info)) + } + + private[docker] def ready(info: ContainerInfo): Unit = { + updateState(ContainerState.Ready(info)) + } + + private def addresses(info: ContainerInfo): Seq[String] = { + val addrs: Iterable[String] = for { + networks <- Option(info.networkSettings().networks()).map(_.asScala).toSeq + (key, network) <- networks + ip <- Option(network.ipAddress) + } yield { + ip + } + addrs.toList + } + + private def portsFrom(info: ContainerInfo): Map[Int, Int] = { + info + .networkSettings() + .ports() + .asScala + .collect { + case (portStr, bindings) if Option(bindings).exists(!_.isEmpty) => + val port = ContainerPort.parsed(portStr).port + val hostPort = bindings.get(0).hostPort().toInt + port -> hostPort + } + .toMap + } + + def ipAddresses(): Seq[String] = { + state() match { + case s: ContainerState.IsRunning => + addresses(s.info) + case _ => + throw new Exception("can't get addresses of not running container") + } + } + + def mappedPorts(): Map[Int, Int] = { + state() match { + case s: ContainerState.IsRunning => + portsFrom(s.info) + case _ => + throw new Exception("can't get ports of not running container") + } + } + + def mappedPort(port: Int): Int = { + mappedPorts().apply(port) + } + + def mappedPortOpt(port: Int): Option[Int] = { + mappedPorts().get(port) + } + + def toManagedContainer: SingleContainer = SingleContainer(this) +} diff --git a/core/src/main/scala/com/whisk/docker/testkit/Container.scala b/core/src/main/scala/com/whisk/docker/testkit/Container.scala new file mode 100644 index 0000000..a58e078 --- /dev/null +++ b/core/src/main/scala/com/whisk/docker/testkit/Container.scala @@ -0,0 +1,3 @@ +package com.whisk.docker.testkit + +class Container(override val spec: ContainerSpec) extends BaseContainer diff --git a/core/src/main/scala/com/whisk/docker/testkit/ContainerCommandExecutor.scala b/core/src/main/scala/com/whisk/docker/testkit/ContainerCommandExecutor.scala new file mode 100644 index 0000000..a6307b4 --- /dev/null +++ b/core/src/main/scala/com/whisk/docker/testkit/ContainerCommandExecutor.scala @@ -0,0 +1,119 @@ +package com.whisk.docker.testkit + +import java.nio.charset.StandardCharsets +import java.util.concurrent.TimeUnit + +import com.google.common.io.Closeables +import com.spotify.docker.client.DockerClient.{AttachParameter, RemoveContainerParam} +import com.spotify.docker.client.messages._ +import com.spotify.docker.client.{DockerClient, LogMessage, LogStream} + +import scala.concurrent.{ExecutionContext, Future, Promise} + +class StartFailedException(msg: String) extends Exception(msg) + +class ContainerCommandExecutor(val client: DockerClient) { + + def createContainer( + spec: ContainerSpec + )(implicit ec: ExecutionContext): Future[ContainerCreation] = { + Future( + scala.concurrent.blocking(client.createContainer(spec.containerConfig(), spec.name.orNull)) + ) + } + + def startContainer(id: String)(implicit ec: ExecutionContext): Future[Unit] = { + Future(scala.concurrent.blocking(client.startContainer(id))) + } + + def runningContainer(id: String)(implicit ec: ExecutionContext): Future[ContainerInfo] = { + def inspect() = { + Future(scala.concurrent.blocking(client.inspectContainer(id))).flatMap { info => + val status = info.state().status() + val badStates = Set("removing", "paused", "exited", "dead") + if (status == "running") { + Future.successful(info) + } else if (badStates(status)) { + Future.failed(new StartFailedException("container is in unexpected state: " + status)) + } else { + Future.failed(new Exception("not running yet")) + } + } + } + + def attempt(rest: Int): Future[ContainerInfo] = { + inspect().recoverWith { + case e: StartFailedException => Future.failed(e) + case _ if rest > 0 => + RetryUtils.withDelay(TimeUnit.SECONDS.toMillis(1))(attempt(rest - 1)) + case _ => + Future.failed(new StartFailedException("failed to get container in running state")) + } + } + + attempt(10) + } + + private def logStreamFuture(id: String, withErr: Boolean)(implicit + ec: ExecutionContext + ): Future[LogStream] = { + val baseParams = List(AttachParameter.STDOUT, AttachParameter.STREAM, AttachParameter.LOGS) + val logParams = if (withErr) AttachParameter.STDERR :: baseParams else baseParams + Future(scala.concurrent.blocking(client.attachContainer(id, logParams: _*))) + } + + def withLogStreamLines(id: String, withErr: Boolean)( + f: String => Unit + )(implicit ec: ExecutionContext): Unit = { + + logStreamFuture(id, withErr).foreach { stream => + stream.forEachRemaining(new java.util.function.Consumer[LogMessage] { + + override def accept(t: LogMessage): Unit = { + val str = StandardCharsets.US_ASCII.decode(t.content()).toString + f(s"[$id] $str") + } + }) + } + } + + def withLogStreamLinesRequirement(id: String, withErr: Boolean)( + f: String => Boolean + )(implicit ec: ExecutionContext): Future[Unit] = { + + logStreamFuture(id, withErr).flatMap { stream => + val p = Promise[Unit]() + Future { + stream.forEachRemaining(new java.util.function.Consumer[LogMessage] { + + override def accept(t: LogMessage): Unit = { + val str = StandardCharsets.US_ASCII.decode(t.content()).toString + if (f(str)) { + p.trySuccess(()) + Closeables.close(stream, true) + } + } + }) + } + p.future + } + } + + def remove(id: String, force: Boolean, removeVolumes: Boolean)(implicit + ec: ExecutionContext + ): Future[Unit] = { + Future( + scala.concurrent.blocking( + client.removeContainer( + id, + RemoveContainerParam.forceKill(force), + RemoveContainerParam.removeVolumes(removeVolumes) + ) + ) + ) + } + + def close(): Unit = { + Closeables.close(client, true) + } +} diff --git a/core/src/main/scala/com/whisk/docker/testkit/ContainerPort.scala b/core/src/main/scala/com/whisk/docker/testkit/ContainerPort.scala new file mode 100644 index 0000000..b7d5d97 --- /dev/null +++ b/core/src/main/scala/com/whisk/docker/testkit/ContainerPort.scala @@ -0,0 +1,17 @@ +package com.whisk.docker.testkit + +object PortProtocol extends Enumeration { + val TCP, UDP = Value +} + +case class ContainerPort(port: Int, protocol: PortProtocol.Value) + +object ContainerPort { + def parsed(str: String): ContainerPort = { + val Array(p, rest @ _*) = str.split("/") + val proto = rest.headOption + .flatMap(pr => PortProtocol.values.find(_.toString.equalsIgnoreCase(pr))) + .getOrElse(PortProtocol.TCP) + ContainerPort(p.toInt, proto) + } +} diff --git a/core/src/main/scala/com/whisk/docker/testkit/ContainerSpec.scala b/core/src/main/scala/com/whisk/docker/testkit/ContainerSpec.scala new file mode 100644 index 0000000..a7a387c --- /dev/null +++ b/core/src/main/scala/com/whisk/docker/testkit/ContainerSpec.scala @@ -0,0 +1,85 @@ +package com.whisk.docker.testkit + +import java.util.Collections + +import com.spotify.docker.client.messages.{ContainerConfig, HostConfig, PortBinding} +import com.spotify.docker.client.messages.HostConfig.Bind + +import scala.collection.JavaConverters._ + +case class ContainerSpec(image: String) { + + private val builder: ContainerConfig.Builder = ContainerConfig.builder().image(image) + private val hostConfigBuilder: HostConfig.Builder = HostConfig.builder() + + private var _readyChecker: Option[DockerReadyChecker] = None + private var _name: Option[String] = None + + def withCommand(cmd: String*): ContainerSpec = { + builder.cmd(cmd: _*) + this + } + + def withExposedPorts(ports: Int*): ContainerSpec = { + val binds: Seq[(Int, PortBinding)] = + ports.map(p => p -> PortBinding.randomPort("0.0.0.0")).toSeq + withPortBindings(binds: _*) + } + + def withPortBindings(ps: (Int, PortBinding)*): ContainerSpec = { + val binds: Map[String, java.util.List[PortBinding]] = ps.map { + case (guestPort, binding) => + guestPort.toString -> Collections.singletonList(binding) + }.toMap + + hostConfigBuilder.portBindings(binds.asJava) + builder.exposedPorts(binds.keySet.asJava) + this + } + + def withVolumeBindings(vs: Bind*): ContainerSpec = { + hostConfigBuilder.binds(vs: _*) + this + } + + def withReadyChecker(checker: DockerReadyChecker): ContainerSpec = { + _readyChecker = Some(checker) + this + } + + def withName(name: String): ContainerSpec = { + _name = Some(name) + this + } + + def withEnv(env: String*): ContainerSpec = { + builder.env(env: _*) + this + } + + def withConfiguration( + withBuilder: ContainerConfig.Builder => ContainerConfig.Builder + ): ContainerSpec = { + withBuilder(builder) + this + } + + def withHostConfiguration( + withBuilder: HostConfig.Builder => HostConfig.Builder + ): ContainerSpec = { + withBuilder(hostConfigBuilder) + this + } + + def name: Option[String] = _name + + def readyChecker: Option[DockerReadyChecker] = _readyChecker + + def containerConfig(): ContainerConfig = { + builder.hostConfig(hostConfigBuilder.build()).build() + } + + def toContainer: Container = new Container(this) + + def toManagedContainer: SingleContainer = SingleContainer(this.toContainer) +} diff --git a/core/src/main/scala/com/whisk/docker/testkit/DockerContainerManager.scala b/core/src/main/scala/com/whisk/docker/testkit/DockerContainerManager.scala new file mode 100644 index 0000000..f410eb9 --- /dev/null +++ b/core/src/main/scala/com/whisk/docker/testkit/DockerContainerManager.scala @@ -0,0 +1,142 @@ +package com.whisk.docker.testkit + +import java.util.concurrent.{ConcurrentHashMap, TimeUnit} + +import com.spotify.docker.client.exceptions.ImageNotFoundException +import com.spotify.docker.client.messages.ContainerCreation +import org.slf4j.LoggerFactory + +import scala.concurrent.{Await, ExecutionContext, Future} +import scala.collection.JavaConverters._ +import scala.language.postfixOps + +trait ManagedContainers + +case class SingleContainer(container: BaseContainer) extends ManagedContainers + +case class ContainerGroup(containers: Seq[BaseContainer]) extends ManagedContainers { + require(containers.nonEmpty, "container group should be non-empty") +} + +object ContainerGroup { + + def of(containers: BaseContainer*): ContainerGroup = ContainerGroup(containers) +} + +class DockerContainerManager( + managedContainers: ManagedContainers, + executor: ContainerCommandExecutor, + dockerTestTimeouts: DockerTestTimeouts, + executionContext: ExecutionContext +) { + + private implicit val ec: ExecutionContext = executionContext + + private lazy val log = LoggerFactory.getLogger(this.getClass) + + private val registeredContainers = new ConcurrentHashMap[String, String]() + + private def waitUntilReady(container: BaseContainer): Future[Unit] = { + container.spec.readyChecker match { + case None => + Future.successful(()) + case Some(checker) => + checker(container)(executor, executionContext) + } + } + + private def printWarningsIfExist(creation: ContainerCreation): Unit = { + Option(creation.warnings()) + .map(_.asScala.toList) + .getOrElse(Nil) + .foreach(w => log.warn(s"creating container: $w")) + } + + private def ensureImage(image: String): Future[Unit] = { + Future(scala.concurrent.blocking(executor.client.inspectImage(image))) + .map(_ => ()) + .recoverWith { + case x: ImageNotFoundException => + log.info(s"image [$image] not found. pulling...") + Future(scala.concurrent.blocking(executor.client.pull(image))) + } + } + + //TODO log listeners + def startContainer(container: BaseContainer): Future[Unit] = { + val image = container.spec.image + val startTime = System.nanoTime() + log.debug("Starting container: {}", image) + for { + creation <- executor.createContainer(container.spec) + id = creation.id() + _ = registeredContainers.put(id, image) + _ = container.created(id) + _ = printWarningsIfExist(creation) + _ = log.info(s"starting container with id: $id") + _ <- executor.startContainer(id) + _ = container.starting(id) + _ = log.info(s"container is starting. id=$id") + runningContainer <- executor.runningContainer(id) + _ = log.debug(s"container entered running state. id=$id") + _ = container.running(runningContainer) + _ = log.debug(s"preparing to execute ready check for container") + res <- waitUntilReady(container) + _ = log.debug(s"container is ready. id=$id") + } yield { + container.ready(runningContainer) + val timeTaken = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime) + log.info(s"container $image is ready after ${timeTaken / 1000.0}s") + res + } + + } + + def start(): Unit = { + log.debug("Starting containers") + val containers: Seq[BaseContainer] = managedContainers match { + case SingleContainer(c) => Seq(c) + case ContainerGroup(cs) => cs + case _ => throw new Exception("unsupported type of managed containers") + } + + val imagesF = Future.traverse(containers.map(_.spec.image))(ensureImage) + Await.result(imagesF, dockerTestTimeouts.pull) + + val startedContainersF = Future.traverse(containers)(startContainer) + + sys.addShutdownHook( + stop() + ) + + try { + Await.result(startedContainersF, dockerTestTimeouts.init) + } catch { + case e: Exception => + log.error("Exception during container initialization", e) + stop() + throw new RuntimeException("Cannot run all required containers") + } + } + + def stop(): Unit = { + try { + Await.ready(stopRmAll(), dockerTestTimeouts.stop) + } catch { + case e: Throwable => + log.error(e.getMessage, e) + } + } + + def stopRmAll(): Future[Unit] = { + val future = Future.traverse(registeredContainers.asScala.toSeq) { + case (cid, _) => + executor.remove(cid, force = true, removeVolumes = true) + } + future.onComplete { _ => + executor.close() + } + future.map(_ => ()) + } + +} diff --git a/core/src/main/scala/com/whisk/docker/testkit/DockerReadyChecker.scala b/core/src/main/scala/com/whisk/docker/testkit/DockerReadyChecker.scala new file mode 100644 index 0000000..f8793da --- /dev/null +++ b/core/src/main/scala/com/whisk/docker/testkit/DockerReadyChecker.scala @@ -0,0 +1,233 @@ +package com.whisk.docker.testkit + +import java.net.{HttpURLConnection, URL} +import java.sql.DriverManager +import java.util.{Timer, TimerTask} + +import scala.concurrent.duration.FiniteDuration +import scala.concurrent.{ExecutionContext, Future, Promise, TimeoutException} + +class FailFastCheckException(m: String) extends Exception(m) + +trait DockerReadyChecker { + + def apply( + container: BaseContainer + )(implicit docker: ContainerCommandExecutor, ec: ExecutionContext): Future[Unit] + + def within(duration: FiniteDuration): DockerReadyChecker = { + DockerReadyChecker.TimeLimited(this, duration) + } + + def looped(attempts: Int, delay: FiniteDuration): DockerReadyChecker = { + DockerReadyChecker.Looped(this, attempts, delay) + } +} + +object RetryUtils { + + def withDelay[T](delay: Long)(f: => Future[T]): Future[T] = { + val timer = new Timer() + val promise = Promise[T]() + timer.schedule( + new TimerTask { + override def run(): Unit = { + promise.completeWith(f) + timer.cancel() + } + }, + delay + ) + promise.future + } + + def runWithin[T](future: => Future[T], deadline: FiniteDuration)(implicit + ec: ExecutionContext + ): Future[T] = { + val bail = Promise[T]() + withDelay(deadline.toMillis)( + bail + .tryCompleteWith(Future.failed(new TimeoutException(s"timed out after $deadline"))) + .future + ) + Future.firstCompletedOf(future :: bail.future :: Nil) + } + + def looped[T](future: => Future[T], attempts: Int, delay: FiniteDuration)(implicit + ec: ExecutionContext + ): Future[T] = { + def attempt(rest: Int): Future[T] = { + future.recoverWith { + case e: FailFastCheckException => Future.failed(e) + case e if rest > 0 => + withDelay(delay.toMillis)(attempt(rest - 1)) + case e => + Future.failed(e) + } + } + + attempt(attempts) + } +} + +object DockerReadyChecker { + + case class And(r1: DockerReadyChecker, r2: DockerReadyChecker) extends DockerReadyChecker { + + override def apply( + container: BaseContainer + )(implicit docker: ContainerCommandExecutor, ec: ExecutionContext): Future[Unit] = { + val aF = r1(container) + val bF = r2(container) + for { + a <- aF + b <- bF + } yield { + () + } + } + } + + object Always extends DockerReadyChecker { + override def apply( + container: BaseContainer + )(implicit docker: ContainerCommandExecutor, ec: ExecutionContext): Future[Unit] = + Future.successful(()) + } + + case class HttpResponseCode( + port: Int, + path: String = "/", + host: Option[String] = None, + code: Int = 200 + ) extends DockerReadyChecker { + + override def apply( + container: BaseContainer + )(implicit docker: ContainerCommandExecutor, ec: ExecutionContext): Future[Unit] = { + + val p = container.mappedPorts()(port) + val url = new URL("http", host.getOrElse(docker.client.getHost), p, path) + Future { + scala.concurrent.blocking { + val con = url.openConnection().asInstanceOf[HttpURLConnection] + try { + if (con.getResponseCode != code) + throw new Exception("unexpected response code: " + con.getResponseCode) + } catch { + case e: java.net.ConnectException => + throw e + } + } + } + } + } + + case class LogLineContains(str: String) extends DockerReadyChecker { + + override def apply( + container: BaseContainer + )(implicit docker: ContainerCommandExecutor, ec: ExecutionContext): Future[Unit] = { + container.state() match { + case ContainerState.Ready(_) => + Future.successful(()) + case state: ContainerState.HasId => + docker + .withLogStreamLinesRequirement(state.id, withErr = true)(_.contains(str)) + .map(_ => ()) + case _ => + Future.failed( + new FailFastCheckException("can't initialise LogStream to container without Id") + ) + } + } + } + + private[docker] case class TimeLimited(underlying: DockerReadyChecker, duration: FiniteDuration) + extends DockerReadyChecker { + + override def apply( + container: BaseContainer + )(implicit docker: ContainerCommandExecutor, ec: ExecutionContext): Future[Unit] = { + RetryUtils.runWithin(underlying(container), duration) + } + } + + private[docker] case class Looped( + underlying: DockerReadyChecker, + attempts: Int, + delay: FiniteDuration + ) extends DockerReadyChecker { + + override def apply( + container: BaseContainer + )(implicit docker: ContainerCommandExecutor, ec: ExecutionContext): Future[Unit] = { + + def attempt(attemptsLeft: Int): Future[Unit] = { + underlying(container) + .recoverWith { + case e: FailFastCheckException => Future.failed(e) + case e if attemptsLeft > 0 => + RetryUtils.withDelay(delay.toMillis)(attempt(attemptsLeft - 1)) + case e => + Future.failed(e) + } + } + + attempt(attempts) + } + } + + private[docker] case class F(f: BaseContainer => Future[Unit]) extends DockerReadyChecker { + override def apply( + container: BaseContainer + )(implicit docker: ContainerCommandExecutor, ec: ExecutionContext): Future[Unit] = + f(container) + } + + case class Jdbc( + driverClass: String, + user: String, + password: Option[String], + database: Option[String] = None, + port: Option[Int] = None + ) extends DockerReadyChecker { + + private val driverLower = driverClass.toLowerCase + private[Jdbc] val dbms: String = if (driverLower.contains("mysql")) { + "mysql" + } else if (driverLower.contains("postgres")) { + "postgresql" + } else { + throw new IllegalArgumentException("unsupported database for ready check") + } + + override def apply( + container: BaseContainer + )(implicit docker: ContainerCommandExecutor, ec: ExecutionContext): Future[Unit] = { + + Future(scala.concurrent.blocking { + try { + Class.forName(driverClass) + val p = port match { + case Some(v) => container.mappedPort(v) + case None => container.mappedPorts().head._2 + } + + val url = "jdbc:" + dbms + "://" + docker.client.getHost + ":" + p + "/" + database + .getOrElse("") + + val connection = Option(DriverManager.getConnection(url, user, password.orNull)) + connection.foreach(_.close()) + if (connection.isEmpty) { + throw new Exception(s"can't establish jdbc connection to $url") + } + } catch { + case e: ClassNotFoundException => + throw new FailFastCheckException(s"jdbc class $driverClass not found") + } + }) + } + } + +} diff --git a/core/src/main/scala/com/whisk/docker/testkit/DockerTestTimeouts.scala b/core/src/main/scala/com/whisk/docker/testkit/DockerTestTimeouts.scala new file mode 100644 index 0000000..85f641d --- /dev/null +++ b/core/src/main/scala/com/whisk/docker/testkit/DockerTestTimeouts.scala @@ -0,0 +1,14 @@ +package com.whisk.docker.testkit + +import scala.concurrent.duration._ + +case class DockerTestTimeouts( + pull: FiniteDuration = 5.minutes, + init: FiniteDuration = 60.seconds, + stop: FiniteDuration = 10.seconds +) + +object DockerTestTimeouts { + + val Default = DockerTestTimeouts() +} diff --git a/core/src/main/scala/com/whisk/docker/testkit/package.scala b/core/src/main/scala/com/whisk/docker/testkit/package.scala new file mode 100644 index 0000000..34a1c7a --- /dev/null +++ b/core/src/main/scala/com/whisk/docker/testkit/package.scala @@ -0,0 +1,38 @@ +package com.whisk.docker + +import java.util.concurrent.atomic.AtomicBoolean + +import scala.concurrent.{Future, Promise} + +/** + * General utility functions + */ +package object testkit { + implicit class OptionalOps[A](val content: A) extends AnyVal { + def withOption[B](optional: Option[B])(f: (A, B) => A): A = + optional match { + case None => content + case Some(x) => f(content, x) + } + } + + private[docker] class SinglePromise[T] { + val promise: Promise[T] = Promise[T]() + + def future: Future[T] = promise.future + + val flag = new AtomicBoolean(false) + + def init(f: => Future[T]): Future[T] = { + if (!flag.getAndSet(true)) { + promise.tryCompleteWith(f) + } + future + } + } + + private[docker] object SinglePromise { + def apply[T] = new SinglePromise[T] + } + +} diff --git a/impl/docker-java/src/main/scala/com/whisk/docker/impl/dockerjava/Docker.scala b/impl/docker-java/src/main/scala/com/whisk/docker/impl/dockerjava/Docker.scala deleted file mode 100644 index 1ba081f..0000000 --- a/impl/docker-java/src/main/scala/com/whisk/docker/impl/dockerjava/Docker.scala +++ /dev/null @@ -1,14 +0,0 @@ -package com.whisk.docker.impl.dockerjava - -import java.net.InetAddress - -import com.github.dockerjava.api.command.DockerCmdExecFactory -import com.github.dockerjava.core.{DockerClientBuilder, DockerClientConfig} -import com.github.dockerjava.jaxrs.JerseyDockerCmdExecFactory - -class Docker(val config: DockerClientConfig, - protected val factory: DockerCmdExecFactory = new JerseyDockerCmdExecFactory) { - val client = DockerClientBuilder.getInstance(config).withDockerCmdExecFactory(factory).build() - val host = - Option(config.getDockerHost.getHost).getOrElse(InetAddress.getLoopbackAddress.getHostAddress) -} diff --git a/impl/docker-java/src/main/scala/com/whisk/docker/impl/dockerjava/DockerJavaExecutor.scala b/impl/docker-java/src/main/scala/com/whisk/docker/impl/dockerjava/DockerJavaExecutor.scala deleted file mode 100644 index 8d04610..0000000 --- a/impl/docker-java/src/main/scala/com/whisk/docker/impl/dockerjava/DockerJavaExecutor.scala +++ /dev/null @@ -1,195 +0,0 @@ -package com.whisk.docker.impl.dockerjava - -import java.util.concurrent.TimeUnit - -import com.github.dockerjava.api.DockerClient -import com.github.dockerjava.api.exception.NotFoundException -import com.github.dockerjava.api.model.{ContainerPort => _, PortBinding => _, _} -import com.github.dockerjava.core.command.{LogContainerResultCallback, PullImageResultCallback} -import com.google.common.io.Closeables -import com.whisk.docker._ -import scala.collection.JavaConverters._ -import scala.concurrent.duration.{Duration, FiniteDuration} -import scala.concurrent.{ExecutionContext, Future, Promise} -import scala.reflect.ClassTag - -class DockerJavaExecutor(override val host: String, client: DockerClient) - extends DockerCommandExecutor { - - override def createContainer(spec: DockerContainer)( - implicit ec: ExecutionContext, timeout: Duration): Future[String] = { - val volumeToBind: Seq[(Volume, Bind)] = spec.volumeMappings.map { mapping => - val volume: Volume = new Volume(mapping.container) - (volume, new Bind(mapping.host, volume, AccessMode.fromBoolean(mapping.rw))) - } - - val hostConfig = new com.github.dockerjava.api.model.HostConfig() - .withOption(spec.networkMode)({ case (config, value) => config.withNetworkMode(value) }) - .withPortBindings(spec.bindPorts.foldLeft(new Ports()) { - case (ps, (guestPort, DockerPortMapping(Some(hostPort), address))) => - ps.bind(ExposedPort.tcp(guestPort), Ports.Binding.bindPort(hostPort)) - ps - case (ps, (guestPort, DockerPortMapping(None, address))) => - ps.bind(ExposedPort.tcp(guestPort), Ports.Binding.empty()) - ps - }) - .withLinks( - new Links(spec.links.map { - case ContainerLink(container, alias) => - new Link(container.name.get, alias) - }: _*) - ) - .withBinds(new Binds(volumeToBind.map(_._2): _*)) - .withOption(spec.hostConfig.flatMap(_.memory)) { - case (config, memory) => config.withMemory(memory) - } - .withOption(spec.hostConfig.flatMap(_.memoryReservation)) { - case (config, memoryReservation) => config.withMemoryReservation(memoryReservation) - } - .withOption(spec.hostConfig.map(_.privileged)) { - case (config, privileged) => config.withPrivileged(privileged) - } - - val cmd = client - .createContainerCmd(spec.image) - .withHostConfig(hostConfig) - .withPortSpecs(spec.bindPorts - .map({ - case (guestPort, DockerPortMapping(Some(hostPort), address)) => - s"$address:$hostPort:$guestPort" - case (guestPort, DockerPortMapping(None, address)) => s"$address::$guestPort" - }) - .toSeq: _*) - .withExposedPorts(spec.bindPorts.keys.map(ExposedPort.tcp).toSeq: _*) - .withTty(spec.tty) - .withStdinOpen(spec.stdinOpen) - .withEnv(spec.env: _*) - .withVolumes(volumeToBind.map(_._1): _*) - .withOption(spec.user) { case (config, user) => config.withUser(user) } - .withOption(spec.hostname) { case (config, hostName) => config.withHostName(hostName) } - .withOption(spec.name) { case (config, name) => config.withName(name) } - .withOption(spec.command) { case (config, c) => config.withCmd(c: _*) } - .withOption(spec.entrypoint) { - case (config, entrypoint) => config.withEntrypoint(entrypoint: _*) - } - - PerishableFuture(cmd.exec()).map { resp => - if (resp.getId != null && resp.getId != "") { - resp.getId - } else { - throw new RuntimeException( - s"Cannot run container ${spec.image}: ${resp.getWarnings.mkString(", ")}") - } - } - } - - override def startContainer(id: String)(implicit ec: ExecutionContext, timeout: Duration): Future[Unit] = { - PerishableFuture(client.startContainerCmd(id).exec()).map(_ => ()) - } - - override def inspectContainer(id: String)( - implicit ec: ExecutionContext, timeout: Duration): Future[Option[InspectContainerResult]] = { - val resp = PerishableFuture(Some(client.inspectContainerCmd(id).exec())).recover { - case x: NotFoundException => None - } - val future = resp.map(_.map { result => - val containerBindings = Option(result.getNetworkSettings.getPorts) - .map(_.getBindings.asScala.toMap) - .getOrElse(Map()) - val portMap = containerBindings.collect { - case (exposedPort, bindings) if Option(bindings).isDefined => - val p = - ContainerPort(exposedPort.getPort, - PortProtocol.withName(exposedPort.getProtocol.toString.toUpperCase)) - val hostBindings: Seq[PortBinding] = bindings.map { b => - PortBinding(b.getHostIp, b.getHostPortSpec.toInt) - } - p -> hostBindings - } - - val addresses: Iterable[String] = for { - networks <- Option(result.getNetworkSettings.getNetworks).map(_.asScala).toSeq - (key, network) <- networks - ip <- Option(network.getIpAddress) - } yield { - ip - } - - InspectContainerResult(running = true, - ports = portMap, - name = result.getName, - ipAddresses = addresses.toSeq) - }) - RetryUtils.looped( - future.flatMap { - case Some(x) if x.running => Future.successful(Some(x)) - case None => Future.successful(None) - case _ => Future.failed(throw new Exception("container is not running")) - }, - 5, - FiniteDuration(2, TimeUnit.SECONDS) - ) - } - - override def withLogStreamLines(id: String, withErr: Boolean)(f: String => Unit)( - implicit docker: DockerCommandExecutor, - ec: ExecutionContext, timeout: Duration - ): Unit = { - val cmd = - client.logContainerCmd(id).withStdOut(true).withStdErr(withErr).withFollowStream(true) - - cmd.exec(new LogContainerResultCallback { - override def onNext(item: Frame): Unit = { - super.onNext(item) - f(s"[$id] ${item.toString}") - } - }) - } - - override def withLogStreamLinesRequirement(id: String, withErr: Boolean)(f: String => Boolean)( - implicit docker: DockerCommandExecutor, - ec: ExecutionContext, timeout: Duration): Future[Unit] = { - - val cmd = - client.logContainerCmd(id).withStdOut(true).withStdErr(withErr).withFollowStream(true) - for { - - res <- { - val p = Promise[Unit]() - cmd.exec(new LogContainerResultCallback { - override def onNext(item: Frame): Unit = { - super.onNext(item) - if (f(item.toString)) { - p.trySuccess(()) - onComplete() - } - } - }) - p.future - } - } yield { - res - } - } - - override def listImages()(implicit ec: ExecutionContext, timeout: Duration): Future[Set[String]] = { - PerishableFuture( - client - .listImagesCmd() - .exec() - .asScala - .flatMap(img => Option(img.getRepoTags).getOrElse(Array.empty(ClassTag(classOf[String])))) - .toSet) - } - - override def pullImage(image: String)(implicit ec: ExecutionContext, timeout: Duration): Future[Unit] = { - PerishableFuture(client.pullImageCmd(image).exec(new PullImageResultCallback()).awaitSuccess()) - } - - override def remove(id: String, force: Boolean, removeVolumes: Boolean)( - implicit ec: ExecutionContext, timeout: Duration): Future[Unit] = { - PerishableFuture(client.removeContainerCmd(id).withForce(force).withRemoveVolumes(true).exec()) - } - - override def close(): Unit = Closeables.close(client, true) -} diff --git a/impl/docker-java/src/main/scala/com/whisk/docker/impl/dockerjava/DockerJavaExecutorFactory.scala b/impl/docker-java/src/main/scala/com/whisk/docker/impl/dockerjava/DockerJavaExecutorFactory.scala deleted file mode 100644 index 275a32b..0000000 --- a/impl/docker-java/src/main/scala/com/whisk/docker/impl/dockerjava/DockerJavaExecutorFactory.scala +++ /dev/null @@ -1,9 +0,0 @@ -package com.whisk.docker.impl.dockerjava - -import com.whisk.docker.{DockerCommandExecutor, DockerFactory} - -class DockerJavaExecutorFactory(docker: Docker) extends DockerFactory { - override def createExecutor(): DockerCommandExecutor = { - new DockerJavaExecutor(docker.host, docker.client) - } -} diff --git a/impl/docker-java/src/main/scala/com/whisk/docker/impl/dockerjava/DockerKitDockerJava.scala b/impl/docker-java/src/main/scala/com/whisk/docker/impl/dockerjava/DockerKitDockerJava.scala deleted file mode 100644 index 5f03853..0000000 --- a/impl/docker-java/src/main/scala/com/whisk/docker/impl/dockerjava/DockerKitDockerJava.scala +++ /dev/null @@ -1,10 +0,0 @@ -package com.whisk.docker.impl.dockerjava - -import com.github.dockerjava.core.DefaultDockerClientConfig -import com.whisk.docker.{DockerFactory, DockerKit} - -trait DockerKitDockerJava extends DockerKit { - - override implicit val dockerFactory: DockerFactory = new DockerJavaExecutorFactory( - new Docker(DefaultDockerClientConfig.createDefaultConfigBuilder().build())) -} diff --git a/impl/spotify/src/main/scala/com/whisk/docker/impl/spotify/DockerKitSpotify.scala b/impl/spotify/src/main/scala/com/whisk/docker/impl/spotify/DockerKitSpotify.scala deleted file mode 100644 index 7026a53..0000000 --- a/impl/spotify/src/main/scala/com/whisk/docker/impl/spotify/DockerKitSpotify.scala +++ /dev/null @@ -1,10 +0,0 @@ -package com.whisk.docker.impl.spotify - -import com.spotify.docker.client.DefaultDockerClient -import com.whisk.docker.{DockerFactory, DockerKit} - -trait DockerKitSpotify extends DockerKit { - - override implicit val dockerFactory: DockerFactory = new SpotifyDockerFactory( - DefaultDockerClient.fromEnv().build()) -} diff --git a/impl/spotify/src/main/scala/com/whisk/docker/impl/spotify/SpotifyDockerCommandExecutor.scala b/impl/spotify/src/main/scala/com/whisk/docker/impl/spotify/SpotifyDockerCommandExecutor.scala deleted file mode 100644 index a51bac2..0000000 --- a/impl/spotify/src/main/scala/com/whisk/docker/impl/spotify/SpotifyDockerCommandExecutor.scala +++ /dev/null @@ -1,209 +0,0 @@ -package com.whisk.docker.impl.spotify - -import java.nio.charset.StandardCharsets -import java.util -import java.util.Collections -import java.util.concurrent.TimeUnit -import java.util.function.Consumer - -import com.google.common.io.Closeables -import com.spotify.docker.client.DockerClient.{AttachParameter, RemoveContainerParam} -import com.spotify.docker.client.exceptions.ContainerNotFoundException -import com.spotify.docker.client.messages.{ContainerConfig, HostConfig, PortBinding} -import com.spotify.docker.client.{DockerClient, LogMessage} -import com.whisk.docker._ - -import scala.collection.JavaConverters._ -import scala.concurrent.duration.{Duration, FiniteDuration} -import scala.concurrent.{ExecutionContext, Future, Promise} - -class SpotifyDockerCommandExecutor(override val host: String, client: DockerClient) - extends DockerCommandExecutor { - - override def createContainer(spec: DockerContainer)( - implicit ec: ExecutionContext, timeout: Duration): Future[String] = { - val portBindings: Map[String, util.List[PortBinding]] = spec.bindPorts.map { - case (guestPort, DockerPortMapping(Some(hostPort), address)) => - guestPort.toString -> Collections.singletonList(PortBinding.of(address, hostPort)) - case (guestPort, DockerPortMapping(None, address)) => - guestPort.toString -> Collections.singletonList(PortBinding.randomPort(address)) - } - val binds: Seq[String] = spec.volumeMappings.map { volumeMapping => - val rw = if (volumeMapping.rw) ":rw" else "" - volumeMapping.host + ":" + volumeMapping.container + rw - } - - val hostConfig = { - val hostConfigBase = - HostConfig.builder().portBindings(portBindings.asJava).binds(binds.asJava) - - val links = spec.links.map { - case ContainerLink(container, alias) => s"${container.name.get}:$alias" - } - - val hostConfigBuilder = - if (links.isEmpty) hostConfigBase else hostConfigBase.links(links.asJava) - hostConfigBuilder - .withOption(spec.networkMode) { - case (config, networkMode) => config.networkMode(networkMode) - } - .withOption(spec.hostConfig.flatMap(_.tmpfs)) { - case (config, value) => config.tmpfs(value.asJava) - } - .withOption(spec.hostConfig.flatMap(_.memory)) { - case (config, memory) => config.memory(memory) - } - .withOption(spec.hostConfig.flatMap(_.memoryReservation)) { - case (config, reservation) => config.memoryReservation(reservation) - } - .withOption(spec.hostConfig.map(_.privileged)) { - case (config, privileged) => config.privileged(privileged) - } - .build() - } - - val containerConfig = ContainerConfig - .builder() - .image(spec.image) - .hostConfig(hostConfig) - .exposedPorts(spec.bindPorts.map(_._1.toString).toSeq: _*) - .tty(spec.tty) - .attachStdin(spec.stdinOpen) - .env(spec.env: _*) - .withOption(spec.user) { case (config, user) => config.user(user) } - .withOption(spec.hostname) { case (config, hostname) => config.hostname(hostname) } - .withOption(spec.command) { case (config, command) => config.cmd(command: _*) } - .withOption(spec.entrypoint) { - case (config, entrypoint) => config.entrypoint(entrypoint: _*) - } - .build() - - val creation = PerishableFuture( - spec.name.fold(client.createContainer(containerConfig))( - client.createContainer(containerConfig, _)) - ) - - creation.map(_.id) - } - - override def startContainer(id: String)(implicit ec: ExecutionContext, timeout: Duration): Future[Unit] = { - PerishableFuture(client.startContainer(id)) - } - - override def inspectContainer(id: String)( - implicit ec: ExecutionContext, timeout: Duration): Future[Option[InspectContainerResult]] = { - - def inspect() = - PerishableFuture(client.inspectContainer(id)) - .flatMap { info => - val networkPorts = Option(info.networkSettings().ports()) - networkPorts match { - case Some(p) => - val ports = info - .networkSettings() - .ports() - .asScala - .collect { - case (cPort, bindings) if Option(bindings).exists(!_.isEmpty) => - val binds = bindings.asScala - .map(b => com.whisk.docker.PortBinding(b.hostIp(), b.hostPort().toInt)) - .toList - ContainerPort.parse(cPort) -> binds - } - .toMap - - val addresses: Iterable[String] = for { - networks <- Option(info.networkSettings().networks()).map(_.asScala).toSeq - (key, network) <- networks - ip <- Option(network.ipAddress) - } yield { - ip - } - - Future.successful( - Some( - InspectContainerResult(info.state().running(), - ports, - info.name(), - addresses.toSeq))) - case None => - Future.failed(new Exception("can't extract ports")) - } - } - .recover { - case t: ContainerNotFoundException => - None - } - - RetryUtils.looped(inspect(), attempts = 5, delay = FiniteDuration(1, TimeUnit.SECONDS)) - } - - override def withLogStreamLines(id: String, withErr: Boolean)( - f: String => Unit)(implicit docker: DockerCommandExecutor, ec: ExecutionContext, timeout: Duration): Unit = { - val baseParams = List(AttachParameter.STDOUT, AttachParameter.STREAM, AttachParameter.LOGS) - val logParams = if (withErr) AttachParameter.STDERR :: baseParams else baseParams - val streamF = PerishableFuture(client.attachContainer(id, logParams: _*)) - - streamF.flatMap { stream => - PerishableFuture { - stream.forEachRemaining(new Consumer[LogMessage] { - override def accept(t: LogMessage): Unit = { - val str = StandardCharsets.US_ASCII.decode(t.content()).toString - f(s"[$id] $str") - } - }) - } - } - } - - override def withLogStreamLinesRequirement(id: String, withErr: Boolean)(f: (String) => Boolean)( - implicit docker: DockerCommandExecutor, - ec: ExecutionContext, timeout: Duration): Future[Unit] = { - - val baseParams = List(AttachParameter.STDOUT, AttachParameter.STREAM, AttachParameter.LOGS) - val logParams = if (withErr) AttachParameter.STDERR :: baseParams else baseParams - - val streamF = PerishableFuture(client.attachContainer(id, logParams: _*)) - - streamF.flatMap { stream => - val p = Promise[Unit]() - PerishableFuture { - stream.forEachRemaining(new Consumer[LogMessage] { - override def accept(t: LogMessage): Unit = { - val str = StandardCharsets.US_ASCII.decode(t.content()).toString - if (f(str)) { - p.trySuccess(()) - Closeables.close(stream, true) - } - } - }) - } - p.future - } - } - - override def listImages()(implicit ec: ExecutionContext, timeout: Duration): Future[Set[String]] = { - PerishableFuture( - client - .listImages() - .asScala - .flatMap(img => Option(img.repoTags()).map(_.asScala).getOrElse(Seq.empty)) - .toSet) - } - - override def pullImage(image: String)(implicit ec: ExecutionContext, timeout: Duration): Future[Unit] = { - PerishableFuture(client.pull(image)) - } - - override def remove(id: String, force: Boolean, removeVolumes: Boolean)( - implicit ec: ExecutionContext, timeout: Duration): Future[Unit] = { - PerishableFuture( - client.removeContainer(id, - RemoveContainerParam.forceKill(force), - RemoveContainerParam.removeVolumes(removeVolumes))) - } - - override def close(): Unit = { - Closeables.close(client, true) - } -} diff --git a/impl/spotify/src/main/scala/com/whisk/docker/impl/spotify/SpotifyDockerFactory.scala b/impl/spotify/src/main/scala/com/whisk/docker/impl/spotify/SpotifyDockerFactory.scala deleted file mode 100644 index ba9d866..0000000 --- a/impl/spotify/src/main/scala/com/whisk/docker/impl/spotify/SpotifyDockerFactory.scala +++ /dev/null @@ -1,11 +0,0 @@ -package com.whisk.docker.impl.spotify - -import com.spotify.docker.client.DockerClient -import com.whisk.docker.{DockerCommandExecutor, DockerFactory} - -class SpotifyDockerFactory(client: DockerClient) extends DockerFactory { - - override def createExecutor(): DockerCommandExecutor = { - new SpotifyDockerCommandExecutor(client.getHost, client) - } -} diff --git a/notes/0.10.0.markdown b/notes/0.10.0.markdown new file mode 100644 index 0000000..ff1852e --- /dev/null +++ b/notes/0.10.0.markdown @@ -0,0 +1,11 @@ +#scala #docker #integration-testing + +## Highlights + +* global inner refactoring, packages were changed +* removed publishing of next artifacts: + - docker-testkit-config + - docker-testkit-impl-spotify + - docker-testkit-impl-spotify-shaded + - docker-testkit-impl-docker-java +* `"com.spotify" % "docker-client" % "8.16.0"` used as default backend diff --git a/samples/src/main/scala/com/whisk/docker/DockerCassandraService.scala b/samples/src/main/scala/com/whisk/docker/DockerCassandraService.scala deleted file mode 100644 index 1975c5b..0000000 --- a/samples/src/main/scala/com/whisk/docker/DockerCassandraService.scala +++ /dev/null @@ -1,12 +0,0 @@ -package com.whisk.docker - -trait DockerCassandraService extends DockerKit { - - val DefaultCqlPort = 9042 - - val cassandraContainer = DockerContainer("whisk/cassandra:2.1.8") - .withPorts(DefaultCqlPort -> None) - .withReadyChecker(DockerReadyChecker.LogLineContains("Starting listening for CQL clients on")) - - abstract override def dockerContainers = cassandraContainer :: super.dockerContainers -} diff --git a/samples/src/main/scala/com/whisk/docker/DockerElasticsearchService.scala b/samples/src/main/scala/com/whisk/docker/DockerElasticsearchService.scala deleted file mode 100644 index b895465..0000000 --- a/samples/src/main/scala/com/whisk/docker/DockerElasticsearchService.scala +++ /dev/null @@ -1,23 +0,0 @@ -package com.whisk.docker - -import scala.concurrent.duration._ - -trait DockerElasticsearchService extends DockerKit { - - val DefaultElasticsearchHttpPort = 9200 - val DefaultElasticsearchClientPort = 9300 - - val elasticsearchContainer: DockerContainer = DockerContainer("docker.elastic.co/elasticsearch/elasticsearch-oss:6.2.4") - .withPortMapping( - DefaultElasticsearchHttpPort -> DockerPortMapping(Some(DefaultElasticsearchHttpPort)), - DefaultElasticsearchClientPort -> DockerPortMapping(Some(DefaultElasticsearchClientPort))) - .withEnv("discovery.type=single-node") - .withReadyChecker( - DockerReadyChecker - .HttpResponseCode(DefaultElasticsearchHttpPort, "/", Some("0.0.0.0")) - .within(100.millis) - .looped(20, 1250.millis)) - - abstract override def dockerContainers: List[DockerContainer] = - elasticsearchContainer :: super.dockerContainers -} diff --git a/samples/src/main/scala/com/whisk/docker/DockerKafkaService.scala b/samples/src/main/scala/com/whisk/docker/DockerKafkaService.scala deleted file mode 100644 index 2359fa5..0000000 --- a/samples/src/main/scala/com/whisk/docker/DockerKafkaService.scala +++ /dev/null @@ -1,15 +0,0 @@ -package com.whisk.docker - -trait DockerKafkaService extends DockerKit { - - def KafkaAdvertisedPort = 9092 - val ZookeeperDefaultPort = 2181 - - lazy val kafkaContainer = DockerContainer("spotify/kafka") - .withPorts(KafkaAdvertisedPort -> Some(KafkaAdvertisedPort), ZookeeperDefaultPort -> None) - .withEnv(s"ADVERTISED_PORT=$KafkaAdvertisedPort", s"ADVERTISED_HOST=${dockerExecutor.host}") - .withReadyChecker(DockerReadyChecker.LogLineContains("kafka entered RUNNING state")) - - abstract override def dockerContainers: List[DockerContainer] = - kafkaContainer :: super.dockerContainers -} diff --git a/samples/src/main/scala/com/whisk/docker/DockerMongodbService.scala b/samples/src/main/scala/com/whisk/docker/DockerMongodbService.scala deleted file mode 100644 index b5cb30c..0000000 --- a/samples/src/main/scala/com/whisk/docker/DockerMongodbService.scala +++ /dev/null @@ -1,14 +0,0 @@ -package com.whisk.docker - -trait DockerMongodbService extends DockerKit { - - val DefaultMongodbPort = 27017 - - val mongodbContainer = DockerContainer("mongo:3.0.6") - .withPorts(DefaultMongodbPort -> None) - .withReadyChecker(DockerReadyChecker.LogLineContains("waiting for connections on port")) - .withCommand("mongod", "--nojournal", "--smallfiles", "--syncdelay", "0") - - abstract override def dockerContainers: List[DockerContainer] = - mongodbContainer :: super.dockerContainers -} diff --git a/samples/src/main/scala/com/whisk/docker/DockerNeo4jService.scala b/samples/src/main/scala/com/whisk/docker/DockerNeo4jService.scala deleted file mode 100644 index a2c32f3..0000000 --- a/samples/src/main/scala/com/whisk/docker/DockerNeo4jService.scala +++ /dev/null @@ -1,21 +0,0 @@ -package com.whisk.docker - -import scala.concurrent.duration._ - -trait DockerNeo4jService extends DockerKit { - - val DefaultNeo4jHttpPort = 7474 - - val neo4jContainer = DockerContainer("neo4j:3.0.3") - .withPorts(DefaultNeo4jHttpPort -> None) - .withEnv("NEO4J_AUTH=none") - .withReadyChecker( - DockerReadyChecker - .HttpResponseCode(DefaultNeo4jHttpPort, "/db/data/") - .within(100.millis) - .looped(20, 1250.millis) - ) - - abstract override def dockerContainers: List[DockerContainer] = - neo4jContainer :: super.dockerContainers -} diff --git a/samples/src/main/scala/com/whisk/docker/DockerPostgresService.scala b/samples/src/main/scala/com/whisk/docker/DockerPostgresService.scala deleted file mode 100644 index 83b013f..0000000 --- a/samples/src/main/scala/com/whisk/docker/DockerPostgresService.scala +++ /dev/null @@ -1,41 +0,0 @@ -package com.whisk.docker - -import java.sql.DriverManager - -import scala.concurrent.ExecutionContext -import scala.concurrent.duration.Duration -import scala.util.Try - -trait DockerPostgresService extends DockerKit { - import scala.concurrent.duration._ - def PostgresAdvertisedPort = 5432 - def PostgresExposedPort = 44444 - val PostgresUser = "nph" - val PostgresPassword = "suitup" - - val postgresContainer = DockerContainer("postgres:9.5.3") - .withPorts((PostgresAdvertisedPort, Some(PostgresExposedPort))) - .withEnv(s"POSTGRES_USER=$PostgresUser", s"POSTGRES_PASSWORD=$PostgresPassword") - .withReadyChecker( - new PostgresReadyChecker(PostgresUser, PostgresPassword, Some(PostgresExposedPort)) - .looped(15, 1.second) - ) - - abstract override def dockerContainers: List[DockerContainer] = - postgresContainer :: super.dockerContainers -} - -class PostgresReadyChecker(user: String, password: String, port: Option[Int] = None) - extends DockerReadyChecker { - - override def apply(container: DockerContainerState)(implicit docker: DockerCommandExecutor, - ec: ExecutionContext, timeout: Duration) = - container - .getPorts() - .map(ports => - Try { - Class.forName("org.postgresql.Driver") - val url = s"jdbc:postgresql://${docker.host}:${port.getOrElse(ports.values.head)}/" - Option(DriverManager.getConnection(url, user, password)).map(_.close).isDefined - }.getOrElse(false)) -} diff --git a/samples/src/main/scala/com/whisk/docker/DockerZookeeperService.scala b/samples/src/main/scala/com/whisk/docker/DockerZookeeperService.scala deleted file mode 100644 index f90ea37..0000000 --- a/samples/src/main/scala/com/whisk/docker/DockerZookeeperService.scala +++ /dev/null @@ -1,11 +0,0 @@ -package com.whisk.docker - -trait DockerZookeeperService extends DockerKit { - - val zookeeperContainer = DockerContainer("jplock/zookeeper:3.4.6") - .withPorts(2181 -> None) - .withReadyChecker(DockerReadyChecker.LogLineContains("binding to port")) - - abstract override def dockerContainers: List[DockerContainer] = - zookeeperContainer :: super.dockerContainers -} diff --git a/samples/src/main/scala/com/whisk/docker/testkit/DockerElasticsearchService.scala b/samples/src/main/scala/com/whisk/docker/testkit/DockerElasticsearchService.scala new file mode 100644 index 0000000..6c71e3c --- /dev/null +++ b/samples/src/main/scala/com/whisk/docker/testkit/DockerElasticsearchService.scala @@ -0,0 +1,38 @@ +package com.whisk.docker.testkit + +import java.util.UUID + +import com.whisk.docker.testkit.scalatest.DockerTestKitForAll +import org.scalatest.Suite + +import scala.concurrent.duration._ + +trait DockerElasticsearchService extends DockerTestKitForAll { + self: Suite => + + val DefaultElasticsearchHttpPort = 9200 + val DefaultElasticsearchClientPort = 9300 + val EsClusterName: String = UUID.randomUUID().toString + + protected val elasticsearchContainer = + ContainerSpec("docker.elastic.co/elasticsearch/elasticsearch:6.2.4") + .withExposedPorts(DefaultElasticsearchHttpPort, DefaultElasticsearchClientPort) + .withEnv( + "http.host=0.0.0.0", + "xpack.security.enabled=false", + "http.cors.enabled: true", + "http.cors.allow-origin: \"*\"", + s"cluster.name=$EsClusterName", + "discovery.type=single-node", + "ES_JAVA_OPTS=-Xms512m -Xmx512m" + ) + .withReadyChecker( + DockerReadyChecker + .HttpResponseCode(DefaultElasticsearchHttpPort, "/") + .within(100.millis) + .looped(20, 1250.millis) + ) + .toContainer + + override val managedContainers: ManagedContainers = elasticsearchContainer.toManagedContainer +} diff --git a/samples/src/main/scala/com/whisk/docker/testkit/DockerMongodbService.scala b/samples/src/main/scala/com/whisk/docker/testkit/DockerMongodbService.scala new file mode 100644 index 0000000..03411f5 --- /dev/null +++ b/samples/src/main/scala/com/whisk/docker/testkit/DockerMongodbService.scala @@ -0,0 +1,17 @@ +package com.whisk.docker.testkit + +import com.whisk.docker.testkit.scalatest.DockerTestKitForAll +import org.scalatest.Suite + +trait DockerMongodbService extends DockerTestKitForAll { + self: Suite => + + val DefaultMongodbPort = 27017 + + val mongodbContainer = ContainerSpec("mongo:3.4.8") + .withExposedPorts(DefaultMongodbPort) + .withReadyChecker(DockerReadyChecker.LogLineContains("waiting for connections on port")) + .toContainer + + override val managedContainers: ManagedContainers = mongodbContainer.toManagedContainer +} diff --git a/samples/src/main/scala/com/whisk/docker/testkit/DockerMysqlService.scala b/samples/src/main/scala/com/whisk/docker/testkit/DockerMysqlService.scala new file mode 100644 index 0000000..c3e5188 --- /dev/null +++ b/samples/src/main/scala/com/whisk/docker/testkit/DockerMysqlService.scala @@ -0,0 +1,36 @@ +package com.whisk.docker.testkit + +import com.whisk.docker.testkit.scalatest.DockerTestKitForAll +import org.scalatest.Suite + +import scala.concurrent.duration._ + +class MysqlContainer(image: String) extends BaseContainer { + + val AdvertisedPort = 3306 + val User = "root" + val Password = "test" + val Database = "test" + + override val spec: ContainerSpec = { + ContainerSpec(image) + .withExposedPorts(AdvertisedPort) + .withReadyChecker( + DockerReadyChecker + .Jdbc( + driverClass = "com.mysql.jdbc.Driver", + user = User, + password = Some(Password), + database = Some(Database) + ) + .looped(25, 1.second) + ) + } +} + +trait DockerMysqlService extends DockerTestKitForAll { self: Suite => + + val mysqlContainer = new MysqlContainer("quay.io/whisk/fastboot-mysql:5.7.19") + + override val managedContainers: ManagedContainers = mysqlContainer.toManagedContainer +} diff --git a/samples/src/main/scala/com/whisk/docker/testkit/DockerPostgresService.scala b/samples/src/main/scala/com/whisk/docker/testkit/DockerPostgresService.scala new file mode 100644 index 0000000..2eb0fb0 --- /dev/null +++ b/samples/src/main/scala/com/whisk/docker/testkit/DockerPostgresService.scala @@ -0,0 +1,31 @@ +package com.whisk.docker.testkit + +import com.spotify.docker.client.messages.PortBinding +import com.whisk.docker.testkit.scalatest.DockerTestKitForAll +import org.scalatest.Suite + +import scala.concurrent.duration._ + +trait DockerPostgresService extends DockerTestKitForAll { self: Suite => + + def PostgresAdvertisedPort = 5432 + def PostgresExposedPort = 44444 + val PostgresUser = "nph" + val PostgresPassword = "suitup" + + val postgresContainer = ContainerSpec("postgres:9.6.5") + .withPortBindings((PostgresAdvertisedPort, PortBinding.of("0.0.0.0", PostgresExposedPort))) + .withEnv(s"POSTGRES_USER=$PostgresUser", s"POSTGRES_PASSWORD=$PostgresPassword") + .withReadyChecker( + DockerReadyChecker + .Jdbc( + driverClass = "org.postgresql.Driver", + user = PostgresUser, + password = Some(PostgresPassword) + ) + .looped(15, 1.second) + ) + .toContainer + + override val managedContainers: ManagedContainers = postgresContainer.toManagedContainer +} diff --git a/scalatest/src/main/scala/com/whisk/docker/scalatest/DockerTestKit.scala b/scalatest/src/main/scala/com/whisk/docker/scalatest/DockerTestKit.scala deleted file mode 100644 index aac8267..0000000 --- a/scalatest/src/main/scala/com/whisk/docker/scalatest/DockerTestKit.scala +++ /dev/null @@ -1,29 +0,0 @@ -package com.whisk.docker.scalatest - -import com.whisk.docker.DockerKit -import org.scalatest.concurrent.ScalaFutures -import org.scalatest.time._ -import org.scalatest.{BeforeAndAfterAll, Suite} -import org.slf4j.LoggerFactory - -trait DockerTestKit extends BeforeAndAfterAll with ScalaFutures with DockerKit { self: Suite => - - private lazy val log = LoggerFactory.getLogger(this.getClass) - - def dockerInitPatienceInterval = - PatienceConfig(scaled(Span(20, Seconds)), scaled(Span(10, Millis))) - - def dockerPullImagesPatienceInterval = - PatienceConfig(scaled(Span(1200, Seconds)), scaled(Span(250, Millis))) - - override def beforeAll(): Unit = { - super.beforeAll() - startAllOrFail() - } - - override def afterAll(): Unit = { - stopAllQuietly() - super.afterAll() - - } -} diff --git a/scalatest/src/main/scala/com/whisk/docker/testkit/scalatest/DockerTestKitForAll.scala b/scalatest/src/main/scala/com/whisk/docker/testkit/scalatest/DockerTestKitForAll.scala new file mode 100644 index 0000000..b13d5ec --- /dev/null +++ b/scalatest/src/main/scala/com/whisk/docker/testkit/scalatest/DockerTestKitForAll.scala @@ -0,0 +1,50 @@ +package com.whisk.docker.testkit.scalatest + +import java.util.concurrent.ForkJoinPool + +import com.spotify.docker.client.{DefaultDockerClient, DockerClient} +import com.whisk.docker.testkit._ +import org.scalatest.{Args, Status, Suite, SuiteMixin} + +import scala.concurrent.ExecutionContext +import scala.language.implicitConversions + +trait DockerTestKitForAll extends SuiteMixin { self: Suite => + + val dockerClient: DockerClient = DefaultDockerClient.fromEnv().build() + + val dockerExecutionContext: ExecutionContext = ExecutionContext.fromExecutor(new ForkJoinPool()) + + val managedContainers: ManagedContainers + + val dockerTestTimeouts: DockerTestTimeouts = DockerTestTimeouts.Default + + implicit lazy val dockerExecutor: ContainerCommandExecutor = + new ContainerCommandExecutor(dockerClient) + + lazy val containerManager = new DockerContainerManager( + managedContainers, + dockerExecutor, + dockerTestTimeouts, + dockerExecutionContext + ) + + abstract override def run(testName: Option[String], args: Args): Status = { + containerManager.start() + afterStart() + try { + super.run(testName, args) + } finally { + try { + beforeStop() + } finally { + containerManager.stop() + } + } + } + + def afterStart(): Unit = {} + + def beforeStop(): Unit = {} + +} diff --git a/scalatest/src/test/resources/logback-test.xml b/scalatest/src/test/resources/logback-test.xml deleted file mode 100644 index aea0412..0000000 --- a/scalatest/src/test/resources/logback-test.xml +++ /dev/null @@ -1,14 +0,0 @@ - - - - - - %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n - - - - - - - \ No newline at end of file diff --git a/scalatest/src/test/scala/com/whisk/docker/AllAtOnceSpec.scala b/scalatest/src/test/scala/com/whisk/docker/AllAtOnceSpec.scala deleted file mode 100644 index 5a5a900..0000000 --- a/scalatest/src/test/scala/com/whisk/docker/AllAtOnceSpec.scala +++ /dev/null @@ -1,24 +0,0 @@ -package com.whisk.docker - -import com.whisk.docker.impl.spotify.DockerKitSpotify -import org.scalatest.time.{Second, Seconds, Span} -import org.scalatest.flatspec.AnyFlatSpec -import org.scalatest.matchers.should.Matchers - -class AllAtOnceSpec - extends AnyFlatSpec - with Matchers - with DockerKitSpotify - with DockerElasticsearchService - with DockerCassandraService - with DockerNeo4jService - with DockerMongodbService - with PingContainerKit { - - implicit val pc: PatienceConfig = PatienceConfig(Span(20, Seconds), Span(1, Second)) - - "all containers" should "be ready at the same time" in { - dockerContainers.map(_.image).foreach(println) - dockerContainers.forall(c => isContainerReady(c).futureValue) shouldBe true - } -} diff --git a/scalatest/src/test/scala/com/whisk/docker/CassandraServiceSpec.scala b/scalatest/src/test/scala/com/whisk/docker/CassandraServiceSpec.scala deleted file mode 100644 index 1b0352a..0000000 --- a/scalatest/src/test/scala/com/whisk/docker/CassandraServiceSpec.scala +++ /dev/null @@ -1,26 +0,0 @@ -package com.whisk.docker - -import com.github.dockerjava.core.DefaultDockerClientConfig -import com.github.dockerjava.netty.NettyDockerCmdExecFactory -import com.whisk.docker.impl.dockerjava.{Docker, DockerJavaExecutorFactory} -import com.whisk.docker.scalatest.DockerTestKit -import org.scalatest.time.{Second, Seconds, Span} -import org.scalatest.flatspec.AnyFlatSpec -import org.scalatest.matchers.should.Matchers - -class CassandraServiceSpec - extends AnyFlatSpec - with Matchers - with DockerCassandraService - with DockerTestKit { - - implicit val pc: PatienceConfig = PatienceConfig(Span(20, Seconds), Span(1, Second)) - - override implicit val dockerFactory: DockerFactory = new DockerJavaExecutorFactory( - new Docker(DefaultDockerClientConfig.createDefaultConfigBuilder().build(), - factory = new NettyDockerCmdExecFactory())) - - "cassandra node" should "be ready with log line checker" in { - isContainerReady(cassandraContainer).futureValue shouldBe true - } -} diff --git a/scalatest/src/test/scala/com/whisk/docker/DependencyGraphReadyCheckSpec.scala b/scalatest/src/test/scala/com/whisk/docker/DependencyGraphReadyCheckSpec.scala deleted file mode 100644 index a843dad..0000000 --- a/scalatest/src/test/scala/com/whisk/docker/DependencyGraphReadyCheckSpec.scala +++ /dev/null @@ -1,67 +0,0 @@ -package com.whisk.docker - -import com.whisk.docker.impl.spotify.DockerKitSpotify -import org.slf4j.LoggerFactory - -import scala.concurrent.Await -import scala.concurrent.duration._ -import scala.language.postfixOps -import org.scalatest.flatspec.AnyFlatSpec -import org.scalatest.matchers.should.Matchers - -class DependencyGraphReadyCheckSpec extends AnyFlatSpec with Matchers with DockerKitSpotify { - - override val StartContainersTimeout = 45 seconds - - private lazy val log = LoggerFactory.getLogger(this.getClass) - - val zookeeperContainer = - DockerContainer("confluentinc/cp-zookeeper:3.1.2", name = Some("zookeeper")) - .withEnv("ZOOKEEPER_TICK_TIME=2000", "ZOOKEEPER_CLIENT_PORT=2181") - .withReadyChecker(DockerReadyChecker.LogLineContains("binding to port")) - - val kafkaContainer = DockerContainer("confluentinc/cp-kafka:3.1.2", name = Some("kafka")) - .withEnv("KAFKA_BROKER_ID=1", - "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181", - "KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092") - .withLinks(ContainerLink(zookeeperContainer, "zookeeper")) - .withHostname("kafka") - .withReadyChecker(DockerReadyChecker.LogLineContains("[Kafka Server 1], started")) - - val schemaRegistryContainer = DockerContainer("confluentinc/cp-schema-registry:3.1.2", - name = Some("schema_registry")) - .withEnv("SCHEMA_REGISTRY_HOST_NAME=schema_registry", - "SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181") - .withLinks(ContainerLink(zookeeperContainer, "zookeeper"), - ContainerLink(kafkaContainer, "kafka")) - .withReadyChecker(DockerReadyChecker.LogLineContains("Session establishment complete on server")) - - override def dockerContainers = - schemaRegistryContainer :: kafkaContainer :: zookeeperContainer :: super.dockerContainers - - "all containers except the leaves of the dependency graph" should "be ready after initialization" in { - startAllOrFail() - - try { - containerManager.isReady(zookeeperContainer).isCompleted shouldBe true - containerManager.isReady(kafkaContainer).isCompleted shouldBe true - containerManager.isReady(schemaRegistryContainer).isCompleted shouldBe false - - Await.ready(containerManager.isReady(schemaRegistryContainer), 45 seconds) - - containerManager.isReady(schemaRegistryContainer).isCompleted shouldBe true - } catch { - case e: RuntimeException => log.error("Test failed during readychecks", e) - } finally { - Await.ready(containerManager.stopRmAll(StopContainersTimeout), StopContainersTimeout) - } - } - - override def startAllOrFail(): Unit = { - Await.result(containerManager.pullImages(PullImagesTimeout), PullImagesTimeout) - containerManager.initReadyAll(StartContainersTimeout).map(_.map(_._2).forall(identity)) - sys.addShutdownHook( - containerManager.stopRmAll(StopContainersTimeout) - ) - } -} diff --git a/scalatest/src/test/scala/com/whisk/docker/DockerContainerLinkingSpec.scala b/scalatest/src/test/scala/com/whisk/docker/DockerContainerLinkingSpec.scala deleted file mode 100644 index dc830db..0000000 --- a/scalatest/src/test/scala/com/whisk/docker/DockerContainerLinkingSpec.scala +++ /dev/null @@ -1,46 +0,0 @@ -package com.whisk.docker - -import org.scalatest._ -import time._ - -import impl.dockerjava._ -import impl.spotify._ -import scalatest.DockerTestKit -import org.scalatest.flatspec.AnyFlatSpec -import org.scalatest.matchers.should.Matchers -import scala.concurrent.duration.Duration - -abstract class DockerContainerLinkingSpec extends AnyFlatSpec with Matchers with DockerTestKit { - - lazy val cmdExecutor = implicitly[DockerCommandExecutor] - implicit val pc: PatienceConfig = PatienceConfig(Span(20, Seconds), Span(1, Second)) - implicit val defaultOpsTimeout: Duration = Duration.Inf - - val pingName = "ping" - val pongName = "pong" - val pingAlias = "pang" - - val pingService = DockerContainer("nginx:1.7.11", name = Some(pingName)) - - val pongService = DockerContainer("nginx:1.7.11", name = Some(pongName)) - .withLinks(ContainerLink(pingService, pingAlias)) - - override def dockerContainers = pingService :: pongService :: super.dockerContainers - - "A DockerContainer" should "be linked to the specified containers upon start" in { - val ping = cmdExecutor.inspectContainer(pingName) - val pongPing = cmdExecutor.inspectContainer(s"$pongName/$pingAlias") - - whenReady(ping) { pingState => - whenReady(pongPing) { pongPingState => - pingState should not be empty - pingState shouldBe pongPingState - } - } - } -} - -class SpotifyDockerContainerLinkingSpec extends DockerContainerLinkingSpec with DockerKitSpotify -class DockerJavaDockerContainerLinkingSpec - extends DockerContainerLinkingSpec - with DockerKitDockerJava diff --git a/scalatest/src/test/scala/com/whisk/docker/DockerContainerManagerSpec.scala b/scalatest/src/test/scala/com/whisk/docker/DockerContainerManagerSpec.scala deleted file mode 100644 index dd40e3f..0000000 --- a/scalatest/src/test/scala/com/whisk/docker/DockerContainerManagerSpec.scala +++ /dev/null @@ -1,115 +0,0 @@ -package com.whisk.docker - -import com.whisk.docker.DockerContainerManager._ -import com.whisk.docker.impl.dockerjava._ -import org.scalatest._ -import org.scalatest.matchers.should.Matchers -import org.scalatest.wordspec.AnyWordSpecLike - -class DockerContainerManagerSpec extends AnyWordSpecLike with Matchers { - - "The DockerContainerManager" should { - "a list of containers with dependencies" should { - val linkedContainer1 = DockerContainer("nginx:1.7.11", name = Some("linkedContainer1")) - val linkedContainer2a = DockerContainer("nginx:1.7.11", name = Some("linkedContainer2a")) - .withLinks(ContainerLink(linkedContainer1, "linkedContainer1")) - val linkedContainer2b = DockerContainer("nginx:1.7.11", name = Some("linkedContainer2b")) - .withLinks(ContainerLink(linkedContainer1, "linkedContainer1")) - val linkedContainer3 = DockerContainer("nginx:1.7.11", name = Some("linkedContainer3")) - .withLinks(ContainerLink(linkedContainer2a, "linkedContainer2a")) - val linkedContainer4 = DockerContainer("nginx:1.7.11", name = Some("linkedContainer4")) - .withLinks(ContainerLink(linkedContainer3, "linkedContainer4")) - val linkedContainer5 = DockerContainer("nginx:1.7.11", name = Some("linkedContainer5")) - val linkedContainers = List(linkedContainer1, - linkedContainer2a, - linkedContainer2b, - linkedContainer3, - linkedContainer4, - linkedContainer5) - - "build a dependency graph from a list of containers with dependencies" in { - buildDependencyGraph(linkedContainers) shouldBe ContainerDependencyGraph( - containers = Seq(linkedContainer1, linkedContainer5), - dependants = Some( - ContainerDependencyGraph( - containers = Seq(linkedContainer2a, linkedContainer2b), - dependants = Some( - ContainerDependencyGraph( - containers = Seq(linkedContainer3), - dependants = Some(ContainerDependencyGraph( - containers = Seq(linkedContainer4) - )) - )) - )) - ) - } - - "build the dependency graph from an empty list of containers" in { - buildDependencyGraph(Seq.empty) shouldBe ContainerDependencyGraph( - containers = Seq.empty, - dependants = None - ) - } - - "initialize all containers taking into account their dependencies" in { - val dockerKit = new DockerKit with DockerKitDockerJava { - override def dockerContainers = linkedContainers ++ super.dockerContainers - } - dockerKit.startAllOrFail() - dockerKit.stopAllQuietly() - } - } - - "a list of containers with links" should { - - val unlinkedContainer1 = DockerContainer("nginx:1.7.11", name = Some("unlinkedContainer1")) - val unlinkedContainer2a = DockerContainer("nginx:1.7.11", name = Some("unlinkedContainer2a")) - .withUnlinkedDependencies(unlinkedContainer1) - val unlinkedContainer2b = DockerContainer("nginx:1.7.11", name = Some("unlinkedContainer2b")) - .withUnlinkedDependencies(unlinkedContainer1) - val unlinkedContainer3 = DockerContainer("nginx:1.7.11", name = Some("unlinkedContainer3")) - .withUnlinkedDependencies(unlinkedContainer2a) - val unlinkedContainer4 = DockerContainer("nginx:1.7.11", name = Some("unlinkedContainer4")) - .withUnlinkedDependencies(unlinkedContainer3) - val unlinkedContainer5 = DockerContainer("nginx:1.7.11", name = Some("unlinkedContainer5")) - val unlinkedContainers = List(unlinkedContainer1, - unlinkedContainer2a, - unlinkedContainer2b, - unlinkedContainer3, - unlinkedContainer4, - unlinkedContainer5) - - "build a dependency graph from a list of containers" in { - buildDependencyGraph(unlinkedContainers) shouldBe ContainerDependencyGraph( - containers = Seq(unlinkedContainer1, unlinkedContainer5), - dependants = Some( - ContainerDependencyGraph( - containers = Seq(unlinkedContainer2a, unlinkedContainer2b), - dependants = Some( - ContainerDependencyGraph( - containers = Seq(unlinkedContainer3), - dependants = Some(ContainerDependencyGraph( - containers = Seq(unlinkedContainer4) - )) - )) - )) - ) - } - - "build the dependency graph from an empty list of containers" in { - buildDependencyGraph(Seq.empty) shouldBe ContainerDependencyGraph( - containers = Seq.empty, - dependants = None - ) - } - - "initialize all containers taking into account their dependencies" in { - val dockerKit = new DockerKit with DockerKitDockerJava { - override def dockerContainers = unlinkedContainers ++ super.dockerContainers - } - dockerKit.startAllOrFail() - dockerKit.stopAllQuietly() - } - } - } -} diff --git a/scalatest/src/test/scala/com/whisk/docker/ElasticsearchServiceSpec.scala b/scalatest/src/test/scala/com/whisk/docker/ElasticsearchServiceSpec.scala deleted file mode 100644 index cc1f90b..0000000 --- a/scalatest/src/test/scala/com/whisk/docker/ElasticsearchServiceSpec.scala +++ /dev/null @@ -1,28 +0,0 @@ -package com.whisk.docker - -import com.whisk.docker.impl.dockerjava.DockerKitDockerJava -import com.whisk.docker.scalatest.DockerTestKit -import org.scalatest._ -import org.scalatest.time._ -import org.scalatest.flatspec.AnyFlatSpec -import org.scalatest.matchers.should.Matchers - -import scala.concurrent.duration.Duration - -class ElasticsearchServiceSpec - extends AnyFlatSpec - with Matchers - with DockerElasticsearchService - with DockerTestKit - with DockerKitDockerJava { - - implicit val pc: PatienceConfig = PatienceConfig(Span(20, Seconds), Span(1, Second)) - implicit val defaultOpsTimeout: Duration = Duration.Inf - - "elasticsearch container" should "be ready" in { - isContainerReady(elasticsearchContainer).futureValue shouldBe true - elasticsearchContainer.getPorts().futureValue.get(9300) should not be empty - elasticsearchContainer.getIpAddresses().futureValue should not be (Seq.empty) - } - -} diff --git a/scalatest/src/test/scala/com/whisk/docker/KafkaServiceSpec.scala b/scalatest/src/test/scala/com/whisk/docker/KafkaServiceSpec.scala deleted file mode 100644 index f532eb9..0000000 --- a/scalatest/src/test/scala/com/whisk/docker/KafkaServiceSpec.scala +++ /dev/null @@ -1,23 +0,0 @@ -package com.whisk.docker - -import com.whisk.docker.impl.spotify.DockerKitSpotify -import com.whisk.docker.scalatest.DockerTestKit -import org.scalatest._ -import org.scalatest.time._ -import org.scalatest.flatspec.AnyFlatSpec -import org.scalatest.matchers.should.Matchers - -class KafkaServiceSpec - extends AnyFlatSpec - with Matchers - with DockerKafkaService - with DockerTestKit - with DockerKitSpotify { - - implicit val pc: PatienceConfig = PatienceConfig(Span(20, Seconds), Span(1, Second)) - - "kafka container" should "be ready" in { - isContainerReady(kafkaContainer).futureValue shouldBe true - } - -} diff --git a/scalatest/src/test/scala/com/whisk/docker/MongodbServiceSpec.scala b/scalatest/src/test/scala/com/whisk/docker/MongodbServiceSpec.scala deleted file mode 100644 index cf11427..0000000 --- a/scalatest/src/test/scala/com/whisk/docker/MongodbServiceSpec.scala +++ /dev/null @@ -1,26 +0,0 @@ -package com.whisk.docker - -import com.whisk.docker.impl.spotify.DockerKitSpotify -import com.whisk.docker.scalatest.DockerTestKit -import org.scalatest.time.{Second, Seconds, Span} -import org.scalatest.flatspec.AnyFlatSpec -import org.scalatest.matchers.should.Matchers - -import scala.concurrent.duration.Duration - -class MongodbServiceSpec - extends AnyFlatSpec - with Matchers - with DockerTestKit - with DockerKitSpotify - with DockerMongodbService { - - implicit val pc: PatienceConfig = PatienceConfig(Span(20, Seconds), Span(1, Second)) - implicit val defaultOpsTimeout: Duration = Duration.Inf - - "mongodb node" should "be ready with log line checker" in { - isContainerReady(mongodbContainer).futureValue shouldBe true - mongodbContainer.getPorts().futureValue.get(27017) should not be empty - mongodbContainer.getIpAddresses().futureValue should not be Seq.empty - } -} diff --git a/scalatest/src/test/scala/com/whisk/docker/Neo4jServiceSpec.scala b/scalatest/src/test/scala/com/whisk/docker/Neo4jServiceSpec.scala deleted file mode 100644 index 5e2458b..0000000 --- a/scalatest/src/test/scala/com/whisk/docker/Neo4jServiceSpec.scala +++ /dev/null @@ -1,23 +0,0 @@ -package com.whisk.docker - -import com.spotify.docker.client.DefaultDockerClient -import com.whisk.docker.impl.spotify.SpotifyDockerFactory -import com.whisk.docker.scalatest.DockerTestKit -import org.scalatest._ -import org.scalatest.concurrent.ScalaFutures -import org.scalatest.time._ -import org.scalatest.flatspec.AnyFlatSpec -import org.scalatest.matchers.should.Matchers - -class Neo4jServiceSpec extends AnyFlatSpec with Matchers with DockerTestKit with DockerNeo4jService { - - implicit val pc: PatienceConfig = PatienceConfig(Span(20, Seconds), Span(1, Second)) - - override implicit val dockerFactory: DockerFactory = new SpotifyDockerFactory( - DefaultDockerClient.fromEnv().build()) - - "neo4j container" should "be ready" in { - isContainerReady(neo4jContainer).futureValue shouldBe true - } - -} diff --git a/scalatest/src/test/scala/com/whisk/docker/PingContainerKit.scala b/scalatest/src/test/scala/com/whisk/docker/PingContainerKit.scala deleted file mode 100644 index 6fd3742..0000000 --- a/scalatest/src/test/scala/com/whisk/docker/PingContainerKit.scala +++ /dev/null @@ -1,16 +0,0 @@ -package com.whisk.docker - -import com.whisk.docker.scalatest.DockerTestKit -import org.scalatest.Suite - -trait PingContainerKit extends DockerTestKit { self: Suite => - - val pingContainer = DockerContainer("nginx:1.7.11") - - val pongContainer = DockerContainer("nginx:1.7.11") - .withPorts(80 -> None) - .withReadyChecker( - DockerReadyChecker.HttpResponseCode(port = 80, path = "/", host = None, code = 200)) - - abstract override def dockerContainers = pingContainer :: pongContainer :: super.dockerContainers -} diff --git a/scalatest/src/test/scala/com/whisk/docker/PostgresServiceSpec.scala b/scalatest/src/test/scala/com/whisk/docker/PostgresServiceSpec.scala deleted file mode 100644 index cec9d18..0000000 --- a/scalatest/src/test/scala/com/whisk/docker/PostgresServiceSpec.scala +++ /dev/null @@ -1,24 +0,0 @@ -package com.whisk.docker - -import com.spotify.docker.client.DefaultDockerClient -import com.whisk.docker.impl.spotify.SpotifyDockerFactory -import com.whisk.docker.scalatest.DockerTestKit -import org.scalatest.time.{Second, Seconds, Span} -import org.scalatest.flatspec.AnyFlatSpec -import org.scalatest.matchers.should.Matchers - -class PostgresServiceSpec - extends AnyFlatSpec - with Matchers - with DockerTestKit - with DockerPostgresService { - - implicit val pc: PatienceConfig = PatienceConfig(Span(20, Seconds), Span(1, Second)) - - override implicit val dockerFactory: DockerFactory = new SpotifyDockerFactory( - DefaultDockerClient.fromEnv().build()) - - "postgres node" should "be ready with log line checker" in { - isContainerReady(postgresContainer).futureValue shouldBe true - } -} diff --git a/scalatest/src/test/scala/com/whisk/docker/ZookeeperServiceSpec.scala b/scalatest/src/test/scala/com/whisk/docker/ZookeeperServiceSpec.scala deleted file mode 100644 index 7aed8a4..0000000 --- a/scalatest/src/test/scala/com/whisk/docker/ZookeeperServiceSpec.scala +++ /dev/null @@ -1,23 +0,0 @@ -package com.whisk.docker - -import com.whisk.docker.impl.dockerjava.DockerKitDockerJava -import com.whisk.docker.scalatest.DockerTestKit -import org.scalatest._ -import org.scalatest.time._ -import org.scalatest.flatspec.AnyFlatSpec -import org.scalatest.matchers.should.Matchers - -class ZookeeperServiceSpec - extends AnyFlatSpec - with Matchers - with DockerZookeeperService - with DockerTestKit - with DockerKitDockerJava { - - implicit val pc: PatienceConfig = PatienceConfig(Span(20, Seconds), Span(1, Second)) - - "zookeeper container" should "be ready" in { - isContainerReady(zookeeperContainer).futureValue shouldBe true - } - -} diff --git a/specs2/src/main/scala/com/whisk/docker/specs2/BeforeAfterAllStopOnError.scala b/specs2/src/main/scala/com/whisk/docker/specs2/BeforeAfterAllStopOnError.scala deleted file mode 100644 index c96edee..0000000 --- a/specs2/src/main/scala/com/whisk/docker/specs2/BeforeAfterAllStopOnError.scala +++ /dev/null @@ -1,18 +0,0 @@ -package com.whisk.docker.specs2 - -import org.specs2.specification.core.{Fragments, SpecificationStructure} -import org.specs2.specification.create.FragmentsFactory - -trait BeforeAfterAllStopOnError extends SpecificationStructure with FragmentsFactory { - - def beforeAll(): Unit - def afterAll(): Unit - - override def map(fs: => Fragments) = - super - .map(fs) - .prepend( - fragmentFactory.step(beforeAll()).stopOnError - ) - .append(fragmentFactory.step(afterAll())) -} diff --git a/specs2/src/main/scala/com/whisk/docker/specs2/DockerTestKit.scala b/specs2/src/main/scala/com/whisk/docker/specs2/DockerTestKit.scala deleted file mode 100644 index 9cdf503..0000000 --- a/specs2/src/main/scala/com/whisk/docker/specs2/DockerTestKit.scala +++ /dev/null @@ -1,16 +0,0 @@ -package com.whisk.docker.specs2 - -import com.whisk.docker.DockerKit -import org.slf4j.LoggerFactory - -trait DockerTestKit extends BeforeAfterAllStopOnError with DockerKit { - private lazy val log = LoggerFactory.getLogger(this.getClass) - - def beforeAll(): Unit = { - startAllOrFail() - } - - def afterAll(): Unit = { - stopAllQuietly() - } -} diff --git a/specs2/src/test/scala/com/whisk/docker/AllAtOnceSpec.scala b/specs2/src/test/scala/com/whisk/docker/AllAtOnceSpec.scala deleted file mode 100644 index faf5eb8..0000000 --- a/specs2/src/test/scala/com/whisk/docker/AllAtOnceSpec.scala +++ /dev/null @@ -1,29 +0,0 @@ -package com.whisk.docker - -import org.specs2._ -import org.specs2.specification.core.Env -import scala.concurrent._ -import scala.concurrent.duration._ - -import org.specs2.concurrent.ExecutionEnv - -class AllAtOnceSpec(env: Env) - extends Specification - with DockerTestKitDockerJava - with DockerElasticsearchService - with DockerCassandraService - with DockerNeo4jService - with DockerMongodbService - with PingContainerKit { - - implicit val ee: ExecutionEnv = env.executionEnv - implicit val ec: ExecutionContext = env.executionContext - - def is = s2""" - The all containers should be ready at the same time $x1 - """ - def x1 = { - dockerContainers.map(_.image).foreach(println) - Future.sequence(dockerContainers.map(isContainerReady)) must contain(beTrue).forall.await - } -} diff --git a/specs2/src/test/scala/com/whisk/docker/CassandraServiceSpec.scala b/specs2/src/test/scala/com/whisk/docker/CassandraServiceSpec.scala deleted file mode 100644 index c711146..0000000 --- a/specs2/src/test/scala/com/whisk/docker/CassandraServiceSpec.scala +++ /dev/null @@ -1,22 +0,0 @@ -package com.whisk.docker - -import com.whisk.docker.specs2.DockerTestKit -import org.specs2._ -import org.specs2.concurrent.ExecutionEnv -import org.specs2.specification.core.Env - -class CassandraServiceSpec(env: Env) - extends Specification - with DockerTestKitDockerJava - with DockerCassandraService - with DockerTestKit { - - implicit val ee: ExecutionEnv = env.executionEnv - - def is = - s2""" - The cassandra node should be ready with log line checker $x1 - """ - - def x1 = isContainerReady(cassandraContainer) must beTrue.await -} diff --git a/specs2/src/test/scala/com/whisk/docker/DockerTestKitDockerJava.scala b/specs2/src/test/scala/com/whisk/docker/DockerTestKitDockerJava.scala deleted file mode 100644 index dfe28ca..0000000 --- a/specs2/src/test/scala/com/whisk/docker/DockerTestKitDockerJava.scala +++ /dev/null @@ -1,6 +0,0 @@ -package com.whisk.docker - -import com.whisk.docker.impl.dockerjava.DockerKitDockerJava -import com.whisk.docker.specs2.DockerTestKit - -trait DockerTestKitDockerJava extends DockerTestKit with DockerKitDockerJava {} diff --git a/specs2/src/test/scala/com/whisk/docker/ElasticsearchServiceSpec.scala b/specs2/src/test/scala/com/whisk/docker/ElasticsearchServiceSpec.scala deleted file mode 100644 index a647253..0000000 --- a/specs2/src/test/scala/com/whisk/docker/ElasticsearchServiceSpec.scala +++ /dev/null @@ -1,24 +0,0 @@ -package com.whisk.docker - -import com.whisk.docker.specs2.DockerTestKit -import org.specs2._ -import org.specs2.specification.core.Env -import scala.concurrent._ - -import org.specs2.concurrent.ExecutionEnv - -class ElasticsearchServiceSpec(env: Env) - extends Specification - with DockerTestKitDockerJava - with DockerElasticsearchService - with DockerTestKit { - - implicit val ee: ExecutionEnv = env.executionEnv - - def is = - s2""" - The elasticsearch container should be ready $x1 - """ - - def x1 = isContainerReady(elasticsearchContainer) must beTrue.await -} diff --git a/specs2/src/test/scala/com/whisk/docker/KafkaServiceSpec.scala b/specs2/src/test/scala/com/whisk/docker/KafkaServiceSpec.scala deleted file mode 100644 index df5322b..0000000 --- a/specs2/src/test/scala/com/whisk/docker/KafkaServiceSpec.scala +++ /dev/null @@ -1,22 +0,0 @@ -package com.whisk.docker - -import com.whisk.docker.specs2.DockerTestKit -import org.specs2._ -import org.specs2.concurrent.ExecutionEnv -import org.specs2.specification.core.Env - -class KafkaServiceSpec(env: Env) - extends Specification - with DockerTestKitDockerJava - with DockerKafkaService - with DockerTestKit { - - implicit val ee: ExecutionEnv = env.executionEnv - - def is = - s2""" - The Kafka container should be ready $x1 - """ - - def x1 = isContainerReady(kafkaContainer) must beTrue.await -} diff --git a/specs2/src/test/scala/com/whisk/docker/MongodbServiceSpec.scala b/specs2/src/test/scala/com/whisk/docker/MongodbServiceSpec.scala deleted file mode 100644 index 93ec332..0000000 --- a/specs2/src/test/scala/com/whisk/docker/MongodbServiceSpec.scala +++ /dev/null @@ -1,23 +0,0 @@ -package com.whisk.docker - -import com.whisk.docker.specs2.DockerTestKit -import org.specs2._ -import org.specs2.specification.core.Env -import scala.concurrent._ - -import org.specs2.concurrent.ExecutionEnv - -class MongodbServiceSpec(env: Env) - extends Specification - with DockerTestKitDockerJava - with DockerTestKit - with DockerMongodbService { - - implicit val ee: ExecutionEnv = env.executionEnv - - def is = s2""" - The mongodb container should be ready $x1 - """ - - def x1 = isContainerReady(mongodbContainer) must beTrue.await -} diff --git a/specs2/src/test/scala/com/whisk/docker/Neo4jServiceSpec.scala b/specs2/src/test/scala/com/whisk/docker/Neo4jServiceSpec.scala deleted file mode 100644 index aa7a241..0000000 --- a/specs2/src/test/scala/com/whisk/docker/Neo4jServiceSpec.scala +++ /dev/null @@ -1,26 +0,0 @@ -package com.whisk.docker - -import com.whisk.docker.specs2.DockerTestKit -import org.specs2._ -import org.specs2.specification.core.Env -import scala.concurrent._ -import scala.concurrent.duration._ - -import org.specs2.concurrent.ExecutionEnv - -class Neo4jServiceSpec(env: Env) - extends Specification - with DockerTestKitDockerJava - with DockerTestKit - with DockerNeo4jService { - - implicit val ee: ExecutionEnv = env.executionEnv - implicit val ec: ExecutionContext = env.executionContext - - def is = s2""" - The neo4j container should - be ready $x1 - """ - - def x1 = isContainerReady(neo4jContainer) must beTrue.await -} diff --git a/specs2/src/test/scala/com/whisk/docker/PingContainerKit.scala b/specs2/src/test/scala/com/whisk/docker/PingContainerKit.scala deleted file mode 100644 index a8dfcb1..0000000 --- a/specs2/src/test/scala/com/whisk/docker/PingContainerKit.scala +++ /dev/null @@ -1,15 +0,0 @@ -package com.whisk.docker - -import com.whisk.docker.specs2.DockerTestKit - -trait PingContainerKit extends DockerTestKit { - - val pingContainer = DockerContainer("nginx:1.7.11") - - val pongContainer = DockerContainer("nginx:1.7.11") - .withPorts(80 -> None) - .withReadyChecker( - DockerReadyChecker.HttpResponseCode(port = 80, path = "/", host = None, code = 200)) - - abstract override def dockerContainers = pingContainer :: pongContainer :: super.dockerContainers -} diff --git a/specs2/src/test/scala/com/whisk/docker/PostgresServiceSpec.scala b/specs2/src/test/scala/com/whisk/docker/PostgresServiceSpec.scala deleted file mode 100644 index c543630..0000000 --- a/specs2/src/test/scala/com/whisk/docker/PostgresServiceSpec.scala +++ /dev/null @@ -1,19 +0,0 @@ -package com.whisk.docker - -import org.specs2._ -import org.specs2.concurrent.ExecutionEnv -import org.specs2.specification.core.Env - -class PostgresServiceSpec(env: Env) - extends Specification - with DockerTestKitDockerJava - with DockerPostgresService { - - implicit val ee: ExecutionEnv = env.executionEnv - - def is = s2""" - The Postgres node should be ready with log line checker $x1 - """ - - def x1 = isContainerReady(postgresContainer) must beTrue.await -} diff --git a/specs2/src/test/scala/com/whisk/docker/ZookeeperServiceSpec.scala b/specs2/src/test/scala/com/whisk/docker/ZookeeperServiceSpec.scala deleted file mode 100644 index f603486..0000000 --- a/specs2/src/test/scala/com/whisk/docker/ZookeeperServiceSpec.scala +++ /dev/null @@ -1,19 +0,0 @@ -package com.whisk.docker - -import org.specs2._ -import org.specs2.concurrent.ExecutionEnv -import org.specs2.specification.core.Env - -class ZookeeperServiceSpec(env: Env) - extends Specification - with DockerTestKitDockerJava - with DockerZookeeperService { - - implicit val ee: ExecutionEnv = env.executionEnv - - def is = s2""" - The Zookeeper container should be ready $x1 - """ - - def x1 = isContainerReady(zookeeperContainer) must beTrue.await -} diff --git a/tests/src/test/scala/com/whisk/docker/testkit/test/ElasticsearchServiceTest.scala b/tests/src/test/scala/com/whisk/docker/testkit/test/ElasticsearchServiceTest.scala new file mode 100644 index 0000000..e42b2cb --- /dev/null +++ b/tests/src/test/scala/com/whisk/docker/testkit/test/ElasticsearchServiceTest.scala @@ -0,0 +1,13 @@ +package com.whisk.docker.testkit.test + +import com.whisk.docker.testkit.{ContainerState, DockerElasticsearchService} +import org.scalatest.funsuite.AnyFunSuite + +class ElasticsearchServiceTest extends AnyFunSuite with DockerElasticsearchService { + + test("test container started") { + assert(elasticsearchContainer.state().isInstanceOf[ContainerState.Ready], + "elasticsearch container is ready") + assert(elasticsearchContainer.mappedPortOpt(9200).nonEmpty, "elasticsearch port is exposed") + } +} diff --git a/tests/src/test/scala/com/whisk/docker/testkit/test/MongodbServiceTest.scala b/tests/src/test/scala/com/whisk/docker/testkit/test/MongodbServiceTest.scala new file mode 100644 index 0000000..716e03e --- /dev/null +++ b/tests/src/test/scala/com/whisk/docker/testkit/test/MongodbServiceTest.scala @@ -0,0 +1,12 @@ +package com.whisk.docker.testkit.test + +import com.whisk.docker.testkit.{ContainerState, DockerMongodbService} +import org.scalatest.funsuite.AnyFunSuite + +class MongodbServiceTest extends AnyFunSuite with DockerMongodbService { + + test("test container started") { + assert(mongodbContainer.state().isInstanceOf[ContainerState.Ready], "mongodb is ready") + assert(mongodbContainer.mappedPortOpt(27017).nonEmpty, "port 2017 is exposed") + } +} diff --git a/tests/src/test/scala/com/whisk/docker/testkit/test/MultiContainerTest.scala b/tests/src/test/scala/com/whisk/docker/testkit/test/MultiContainerTest.scala new file mode 100644 index 0000000..4d4af1d --- /dev/null +++ b/tests/src/test/scala/com/whisk/docker/testkit/test/MultiContainerTest.scala @@ -0,0 +1,21 @@ +package com.whisk.docker.testkit.test + +import com.whisk.docker.testkit._ +import org.scalatest.funsuite.AnyFunSuite + +class MultiContainerTest + extends AnyFunSuite + with DockerElasticsearchService + with DockerMongodbService { + + override val managedContainers = ContainerGroup.of(elasticsearchContainer, mongodbContainer) + + test("both containers should be ready") { + assert(elasticsearchContainer.state().isInstanceOf[ContainerState.Ready], + "elasticsearch container is ready") + assert(elasticsearchContainer.mappedPortOpt(9200).nonEmpty, "elasticsearch port is exposed") + + assert(mongodbContainer.state().isInstanceOf[ContainerState.Ready], "mongodb is ready") + assert(mongodbContainer.mappedPortOpt(27017).nonEmpty, "port 2017 is exposed") + } +} diff --git a/tests/src/test/scala/com/whisk/docker/testkit/test/MysqlServiceTest.scala b/tests/src/test/scala/com/whisk/docker/testkit/test/MysqlServiceTest.scala new file mode 100644 index 0000000..61b438a --- /dev/null +++ b/tests/src/test/scala/com/whisk/docker/testkit/test/MysqlServiceTest.scala @@ -0,0 +1,13 @@ +package com.whisk.docker.testkit.test + +import com.whisk.docker.testkit.{ContainerState, DockerMysqlService} +import org.scalatest.funsuite.AnyFunSuite + +class MysqlServiceTest extends AnyFunSuite with DockerMysqlService { + + test("test container started") { + assert(mysqlContainer.state().isInstanceOf[ContainerState.Ready], "mysql is ready") + assert(mysqlContainer.mappedPortOpt(mysqlContainer.AdvertisedPort).nonEmpty, + "mysql port exposed") + } +} diff --git a/tests/src/test/scala/com/whisk/docker/testkit/test/PostgresServiceTest.scala b/tests/src/test/scala/com/whisk/docker/testkit/test/PostgresServiceTest.scala new file mode 100644 index 0000000..f1ce660 --- /dev/null +++ b/tests/src/test/scala/com/whisk/docker/testkit/test/PostgresServiceTest.scala @@ -0,0 +1,13 @@ +package com.whisk.docker.testkit.test + +import com.whisk.docker.testkit.{ContainerState, DockerPostgresService} +import org.scalatest.funsuite.AnyFunSuite + +class PostgresServiceTest extends AnyFunSuite with DockerPostgresService { + + test("test container started") { + assert(postgresContainer.state().isInstanceOf[ContainerState.Ready], "postgres is ready") + assert(postgresContainer.mappedPortOpt(PostgresAdvertisedPort) === Some(PostgresExposedPort), + "postgres port exposed") + } +}