Introduction

Last updated: 2022-12-29

Kuzminki is a feature-rich query builder and access library for PostgreSQL, written in Scala.

It is available with integration for ZIO 1 and ZIO 2, and in a version that relies only on Scala's ExecutionContext. The API follows the logic of SQL statements. As a result, the code is easy to read and memorise, and the resulting SQL statement is predictable. It is as type-checked as possible and avoids wildcard types, which result in confusing errors.

Features:

  • Cached queries, meaning that the SQL string is built only once, for improved performance and re-usability.
  • Streaming from and to the database
  • Extensive support for JSONB fields
  • Support for Array fields
  • Rich support for timestamp fields
  • Support for sub-queries
  • Support for transactions

Kuzminki projects on GitHub

On GitHub for ZIO 1 https://github.com/karimagnusson/kuzminki-zio

On GitHub for ZIO 2 https://github.com/karimagnusson/kuzminki-zio-2

On GitHub for EC https://github.com/karimagnusson/kuzminki-ec

About

Kuzminki is a feature-rich query builder and access library for PostgreSQL, written in Scala. The approach of this library is to write SQL statements in Scala with the same structure in which SQL is written. Rather than making the API natural to Scala logic, for example similar to how you work with collections, this approach seeks to make the Scala code as similar to the resulting SQL statement as possible. The goal is to make it easy to read the code and understand the resulting SQL statement. It should also make the API intuitive and easy to memorise.

With this approach, it becomes practical to write quite complicated queries, for example subqueries and queries where column values are modified, while still being able to understand what they do. With a different approach, the user would have to learn the library's own unique logic. But since the logic of SQL is already known, such complexity becomes practical. With a feature-rich API, the user can find solutions and avoid writing raw SQL statements that are not checked by the compiler. The goal of this project is to offer a productive and readable API for working with PostgreSQL and taking advantage of its many features.

Features

Kuzminki supports jsonb and array fields. On insert, update or delete, one or more column values can be returned. It offers many options for searching and returning datetime column values. It supports subqueries, both as a condition in a search and to collect values to be returned to the client. Transactions are supported, both as a way to do bulk operations and to do multiple different operations. Data can be streamed to and from the database. Rows can be delivered to the client as tuples, case classes or vectors. Data for insert can be a tuple or a case class. Types are checked as much as possible, and wildcard types that result in unclear errors are not used. Kuzminki comes with native support for ZIO as a layer.

JSON

PostgreSQL comes with a jsonb field to store and query data in JSON format. Taking advantage of the jsonb field opens up great flexibility in organizing your data: you can work with structured and unstructured data to get the best of both worlds. Kuzminki offers extensive support for querying and updating jsonb fields. Kuzminki can also return rows as JSON strings. This is useful when, for example, you need to serve JSON directly to the client, since you can do so without having to transform the data. You can organise how your object is formed from the table columns, for example if you need some columns to be in a nested object. If you need to create a big object from multiple tables, you can do so with a single query using subqueries. Take a look at the zio-http demo project for examples of these features.

Performance

Statements can be cached for better performance and reusability. This means that the SQL string is created only once. Execution time should be the same as running a raw statement with JDBC. All statements can be cached except for SELECT statements with optional WHERE arguments.
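
The idea of building the SQL string only once can be illustrated with a minimal plain-Scala sketch. It is independent of Kuzminki and is not the library's implementation; the names render, Cached and withArgs are hypothetical and exist only for this illustration. The statement is rendered a single time and then paired with different arguments on each use.

```scala
// Plain-Scala sketch of statement caching; not Kuzminki's implementation.
object CachedStatementSketch {

  // Counts how often the SQL string is rendered.
  var renderCount = 0

  // Hypothetical helper: renders a parameterised SELECT for a table and column.
  def render(table: String, column: String): String = {
    renderCount += 1
    "SELECT * FROM \"" + table + "\" WHERE \"" + column + "\" > ?"
  }

  // A cached statement keeps the rendered SQL and only pairs it with new arguments.
  final case class Cached(sql: String) {
    def withArgs(args: Any*): (String, Seq[Any]) = (sql, args.toSeq)
  }

  // The SQL string is built here, once.
  val stm: Cached = Cached(render("user_profile", "age"))

  // Each use reuses the same string with different arguments.
  val first  = stm.withArgs(25)
  val second = stm.withArgs(40)
}
```

Both first and second carry the same SQL string, and renderCount stays at 1 no matter how many times withArgs is called.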

Only Postgres

Kuzminki supports only PostgreSQL. It could be adapted for use with other databases if there is interest in that. But given that it supports many Postgres-specific features, support for another database would require its own project rather than a one-size-fits-all approach. Therefore, at the moment the goal is to deliver a good library for Postgres. That being said, there are other Postgres-compatible databases that work with Kuzminki, for example CockroachDB. For those looking to scale up, it might be a good choice.


Installation

For ZIO 1
libraryDependencies += "io.github.karimagnusson" % "kuzminki-zio" % "0.9.5-RC4"
For ZIO 2
libraryDependencies += "io.github.karimagnusson" % "kuzminki-zio-2" % "0.9.5-RC4"
For EC
libraryDependencies += "io.github.karimagnusson" % "kuzminki-ec" % "0.9.5-RC4"

This documentation is for version 0.9.5-RC4. If you need an older version of this documentation:

Example

This example is for ZIO 1. For ZIO 2, refer to the respective GitHub page.

import zio._
import zio.console._
import zio.blocking._
import kuzminki.api._

object ExampleApp extends zio.App {

  class Client extends Model("client") {
    val id = column[Int]("id")
    val username = column[String]("username")
    val age = column[Int]("age")
    def all = (id, username, age)
  }

  val client = Model.get[Client]

  val job = for {
    _ <- sql
      .insert(client)
      .cols2(t => (t.username, t.age))
      .values(("Joe", 35))
      .run
    
    _ <- sql
      .update(client)
      .set(_.age ==> 24)
      .where(_.id === 4)
      .run
    
    _ <- sql.delete(client).where(_.id === 7).run
    
    clients <- sql
      .select(client)
      .cols3(_.all)
      .where(_.age > 25)
      .limit(5)
      .run
    
    _ <- ZIO.foreach(clients) {
      case (id, username, age) =>
        putStrLn(s"$id $username $age")
    }
  } yield ()

  val dbConfig = DbConfig.forDb("company")
  val dbLayer = Kuzminki.layer(dbConfig)

  override def run(args: List[String]): ZIO[ZEnv, Nothing, ExitCode] = {
    job.provideCustomLayer(dbLayer).exitCode
  }
}
							

Results

                // query
val stm = sql.select(client).cols3(_.all).where(_.age > 25).limit(5)
stm.run: List[T]
stm.runHead: T
stm.runHeadOpt: Option[T]

// row as type (checked by the compiler)
stm.runType[MyType]: List[MyType]
stm.runHeadType[MyType]: MyType
stm.runHeadOptType[MyType]: Option[MyType]

// modify row
implicit val toMyType: SomeRow => MyType = row => //...
stm.runAs[MyType]: List[MyType]
stm.runHeadAs[MyType]: MyType
stm.runHeadOptAs[MyType]: Option[MyType]

// operation
val stm = sql.update(client).set(_.age ==> 24).where(_.id === 4)
stm.run: Unit
stm.runNum: Int
              

Connecting to the database

Config

								val dbConfig = DbConfig
  .forDb("{DB-NAME}")
  .withMaxPoolSize(10) // default = 10
  .withMinPoolSize(4)  // default = 4
  .withHost("{HOST}")  // default = localhost
  .withPort("{PORT}")  // default = 5432
  .withUser("{USER}")
  .withPassword("{PASSWORD}")
  .withOptions(Map(...))
            	

Layer

Create a layer to make the driver instance accessible under Has[Kuzminki].

For ZIO 1

								object MyApp extends zio.App {
  val job = // ...
  val dbLayer = Kuzminki.layer(DbConfig.forDb("company"))

  override def run(args: List[String]): ZIO[ZEnv, Nothing, ExitCode] = {
    job.provideCustomLayer(dbLayer).exitCode
  }
}
            	

For ZIO 2

                object MyApp extends ZIOAppDefault {
  // ...
  def run = job.provide(dbLayer)
}
              

Split connection

If you wish to have two connection pools, one for SELECT and another for INSERT, UPDATE, DELETE, you can use layerSplit. To create an instance rather than a Layer, use createSplit.

								// for .query .queryHead .queryHeadOpt
val getConfig = DbConfig.forDb("company")

// for .exec .execNum
val setConfig = DbConfig.forDb("company")

val dbLayer = Kuzminki.layerSplit(getConfig, setConfig)
            	

Model

Creating a model

Column types are listed under Data types

								import kuzminki.api._
import java.sql.Timestamp

class User extends Model("user_profile") {
  val id = column[Int]("id")
  val username = column[String]("username")
  val email = column[String]("email")
  val name = column[String]("name")
  val age = column[Int]("age")
  val gender = column[String]("gender")
  val country = column[String]("country")
  val city = column[String]("city")
  val discount = column[Int]("discount")
  val isActive = column[Boolean]("is_active")
  val created = column[Timestamp]("created")
}

Model.register[User]
            	

Custom methods

You can create custom methods to access columns that you regularly use.

								class Customer extends Model("customer") {
  val id = column[Int]("id")
  val userId = column[Int]("user_id")
  val spending = column[Int]("spending")
  def all = (id, userId, spending)
}
            	

Create a model instance

Model.register[User] creates an instance of the model for later use and makes sure there is only one instance of the model. Model.get[User] gets an existing instance of the model. If it does not exist, it is created.

								Model.register[User]
// ...
val user = Model.get[User]
            	

Select

Select query

You select the columns as a tuple of model columns. The query returns tuples of the column types, in this case Seq[Tuple2[Int, String]]. If you need more than 22 columns, you can use colsSeq or colsType. To order by ASC rather than DESC, use orderBy(_.age.asc).

								sql
  .select(user)
  .cols2(t => (
    t.id,
    t.username
  ))
  .where(t => Seq(
    t.gender === "f",
    t.age > 25
  ))
  .orderBy(_.age.desc)
  .limit(10)
  .run
            	
								SELECT
  "id",
  "username"
FROM "user_profile"
WHERE "gender" = 'f'
AND "age" > 25
ORDER BY "age" DESC
LIMIT 10
            	

Row as case class

              case class SimpleUser(id: Int, username: String, email: String)

sql
  .select(user)
  .cols3(t => (
    t.id,
    t.username,
    t.email
  ))
  .where(_.age < 25)
  .runType[SimpleUser]
  // returns List[SimpleUser]
              

Row as Seq

              sql
  .select(user)
  .colsSeq(t => Seq(
    t.id,
    t.username,
    t.email
  ))
  .where(_.age < 25)
  .run
  // returns List[Vector[Any]]
              

Row as JSON

Get the row as a JSON string.

                sql
  .select(user)
  .colsJson(t => Seq(
    t.id,
    t.username
  ))
  .where(_.id === 5)
  .runHead
  // returns {"id": 5, "username": "Joe"}
              

Pick a name for columns and create a nested object.

                .colsJson(t => Seq(
  "id" -> t.id,
  t.username.as("username"),
  Fn.json(Seq(
    t.country,
    t.city
  )).as("location")
))
// returns {"id": 5, "username": "Joe", "location": {"country": "IT", "city": "Rome"}}
              

Result with a nested array from subquery.

                .colsJson(t => Seq(
  t.id,
  t.username,
  sql
    .select(travels)
    .colsJson(s => Seq(
      s.country,
      s.year
    ))
    .where(s => s.userId <=> t.id)
    .orderBy(_.year.desc)
    .asColumn // if you need the first object instead of an array, you can use .first after .asColumn
    .as("travels")
))
              

Get the result as JSON, using your favorite JSON library.

                // See PlayJsonLoader below
implicit val loadJson: Seq[Tuple2[String, Any]] => JsValue = data => PlayJsonLoader.load(data)

sql
  .select(user)
  .colsNamed(t => Seq(
    t.id,        // Column name is used as a key.
    t.username,  // If you want a different key:
    t.email      // ("user_id"  -> t.id)
  ))
  .where(_.age < 25)
  .runAs[JsValue]
  // returns List[JsValue]
              

Write something along these lines to use the JSON library of your choosing.

                import java.util.UUID
import java.sql.Time
import java.sql.Date
import java.sql.Timestamp
import play.api.libs.json._
import kuzminki.api.Jsonb

object PlayJsonLoader {

  val toJsValue: Any => JsValue = {
    case v: String      => JsString(v)
    case v: Boolean     => JsBoolean(v)
    case v: Short       => JsNumber(v)
    case v: Int         => JsNumber(v)
    case v: Long        => JsNumber(v)
    case v: Float       => JsNumber(v)
    case v: Double      => JsNumber(v)
    case v: BigDecimal  => JsNumber(v)
    case v: Time        => Json.toJson(v)
    case v: Date        => Json.toJson(v)
    case v: Timestamp   => Json.toJson(v)
    case v: UUID        => JsString(v.toString)
    case v: Jsonb       => Json.parse(v.value)
    case v: Option[_]   => v.map(toJsValue).getOrElse(JsNull)
    case v: Seq[_]      => JsArray(v.map(toJsValue))
    case v: JsValue     => v
    case _              => throw new Exception("Cannot convert to JsValue")
  }

  def load(data: Seq[Tuple2[String, Any]]): JsValue = {
    JsObject(data.map(p => (p._1, toJsValue(p._2))))
  }
}
              

Where

Refer to Operators for a list of operators.

								.where(_.id > 100)

.where(t => Seq(
  t.gender === "f",
  t.age > 25
))
            	

GROUP BY / HAVING

Refer to Operators for a list of operators.

sql
  .select(user)
  .cols2(t => (
    t.gender,
    Agg.avg(t.age)
  ))
  .where(_.age > 0)
  .groupBy(_.gender)
  .having(_.gender !== "")
  .orderBy(t => Agg.avg(t.age).desc)
  .run

SELECT "gender", avg("age")::numeric
FROM "user_profile" WHERE "age" > 0
GROUP BY "gender"
HAVING "gender" != ''
ORDER BY avg("age")::numeric DESC
              

AND / OR

								import kuzminki.fn._

.where(t => Seq(
  t.age > 25,
  Or(
    t.country === "RU",
    t.country === "FR"
  )
))
// WHERE "age" > 25 AND ("country" = 'RU' OR "country" = 'FR')

.where(t => Or(
  And(
    t.country === "RU",
    t.city === "Moscow"
  ),
  And(
    t.country === "FR",
    t.city === "Paris"
  )
))
// WHERE ("country" = 'RU' AND "city" = 'Moscow') OR ("country" = 'FR' AND "city" = 'Paris')
            	

Optional conditions

Optional conditions, for example from an HTTP GET request. A condition whose argument is None is left out of the statement.

								.whereOpt(_.id > Some(100))

.whereOpt(t => Seq(
  t.gender === None,
  t.age > Some(25)
))
// WHERE "age" > 25

.whereOpt(t => Seq(
  t.age > Some(25),
  Or.opt(
    t.country === Some("RU"),
    t.country === Some("FR")
  )
))
// WHERE "age" > 25 AND ("country" = 'RU' OR "country" = 'FR')
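
The way a None argument drops its condition can be pictured with a minimal sketch in plain Scala. It is independent of Kuzminki and does not show how whereOpt is implemented; the helpers cond and where are hypothetical names used only here. Each condition is rendered only when its argument is defined, and the surviving conditions are joined with AND.

```scala
// Plain-Scala sketch of optional WHERE conditions; not Kuzminki's implementation.
object OptionalWhereSketch {

  // Hypothetical helper: a condition is rendered only when its argument is defined.
  def cond(column: String, op: String, arg: Option[Any]): Option[(String, Any)] =
    arg.map(value => ("\"" + column + "\" " + op + " ?", value))

  // Joins the defined conditions with AND; returns the clause and its arguments.
  def where(conds: Seq[Option[(String, Any)]]): (String, Seq[Any]) = {
    val defined = conds.flatten
    if (defined.isEmpty) ("", Seq.empty)
    else (defined.map(_._1).mkString("WHERE ", " AND ", ""), defined.map(_._2))
  }

  // gender is None, so only the age condition survives.
  val (clause, args) = where(Seq(
    cond("gender", "=", None),
    cond("age", ">", Some(25))
  ))
}
```

If every argument is None, the sketch returns an empty clause, which corresponds to a SELECT without WHERE.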
            	

Distinct

                sql
  .select(user)
  .cols2(t => (
    t.username,
    t.age
  ))
  .distinct
  .all
  .orderBy(_.age.asc)
  .run
              
                SELECT DISTINCT "username", "age"
FROM "user_profile"
ORDER BY "age" ASC

DISTINCT ON

sql
  .select(user)
  .cols2(t => (
    t.username,
    t.age
  ))
  .distinctOn(_.age)
  .all
  .orderBy(_.age.asc)
  .run

SELECT DISTINCT ON ("age") "username", "age"
FROM "user_profile"
ORDER BY "age" ASC
              

Nested query

								class Newsletter extends Model("newsletter") {
  val email = column[String]("email")
  val isSubscribed = column[Boolean]("is_subscribed")
}

val newsletter = Model.get[Newsletter]

sql
  .select(user)
  .cols1(_.username)
  .where(_.email.in(
    sql
      .select(newsletter)
      .cols1(_.email)
      .where(_.isSubscribed === true)
  ))
  .run
            	
								SELECT "username"
FROM "user_profile"
WHERE "email" = ANY(
  SELECT "email"
  FROM "newsletter"
  WHERE "is_subscribed" = true
)
            	

Pages

                val pages = sql
  .select(user)
  .cols3(t => (
    t.id,
    t.firstName,
    t.lastName
  ))
  .orderBy(_.id.asc)
  .asPages(10) // 10 rows per page

val job = for {
  next  <- pages.next
  page3 <- pages.page(3)
} yield (next, page3)
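
One common way to fetch a given page is LIMIT with OFFSET. Whether asPages works this way internally is not stated here, so the following is only a sketch of the arithmetic in plain Scala. It assumes page numbers start at 1, and the names offsetFor and limitOffset are hypothetical.

```scala
// Plain-Scala sketch of LIMIT/OFFSET paging arithmetic; not Kuzminki's implementation.
object PagingSketch {

  // Assumption: page numbers start at 1, so page 1 has offset 0.
  def offsetFor(page: Int, pageSize: Int): Int = {
    require(page >= 1, "page numbers start at 1 in this sketch")
    (page - 1) * pageSize
  }

  // Renders the LIMIT/OFFSET tail of a statement for a given page.
  def limitOffset(page: Int, pageSize: Int): String =
    s"LIMIT $pageSize OFFSET ${offsetFor(page, pageSize)}"
}
```

With 10 rows per page, page 3 under this assumption skips the first 20 rows.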
              

Cache

For increased performance and reusability queries can be cached. The SQL string will be created only once and you get the same performance as you get with raw queries.

								val stm = sql
  .select(user)
  .cols1(_.username)
  .all
  .orderBy(_.age.asc)
  .pickWhere2(t => (
    t.country.use === Arg,
    t.age.use > Arg
  ))
  .cache

stm.run(("CN", 25))
            	
								SELECT "username"
FROM "user_profile"
WHERE "country" = 'CN'
AND "age" > 25
ORDER BY "age" ASC
            	

Cache a query with no arguments.

val stm = sql
  .select(user)
  .cols2(t => (
    t.username,
    t.created
  ))
  .all
  .orderBy(_.created.desc)
  .cache

stm.run
              

Cache nested query.

                val stm = sql
  .select(user)
  .cols1(_.username)
  .all
  .pickWhere2(t => (
    t.age.use > Arg,
    t.email.use.in(
      sql
        .select(newsletter)
        .cols1(_.email)
        .all
        .pickWhere1(_.isSubscribed.use === Arg)
    )
  ))
  .cache

stm.run(25, true)

SELECT "username" FROM "user_profile"
WHERE "age" > 25
AND "email" = ANY(
  SELECT "a"."email"
  FROM "newsletter" "a"
  WHERE "a"."is_subscribed" = true
)
              

Cache WHERE with HAVING

                val stm = sql
  .select(user)
  .cols2(t => (
    t.gender,
    Agg.avg(t.age)
  ))
  .all
  .groupBy(_.gender)
  .having(_.gender !== "")
  .orderBy(t => Agg.avg(t.age).desc)
  .pickWhere1(_.age.use > Arg)
  .cache

stm.run(18)
              
                SELECT "gender", avg("age")::numeric
FROM "user_profile" WHERE "age" > 18
GROUP BY "gender"
HAVING "gender" != ''
ORDER BY avg("age")::numeric DESC
              

Cached with WHERE

You can use normal WHERE conditions with cached queries.

								val stm = sql
  .select(user)
  .cols1(_.username)
  .where(_.age > 25)
  .orderBy(_.age.asc)
  .pickWhere1(_.country.use === Arg)
  .cache

stm.run("CN")
            	
								SELECT "username"
FROM "user_profile"
WHERE "age" > 25
AND "country" = 'CN'
ORDER BY "age" ASC
            	

Join

Select join

To do a join, pass two model instances as arguments. The models are then accessible under a and b.

							sql
  .select(user, customer)
  .cols3(t => (
    t.a.id,
    t.a.username,
    t.b.spending
  ))
  .joinOn(_.id, _.userId)
  .where(t => Seq(
    t.a.age > 25,
    t.b.spending > 1000
  ))
  .orderBy(_.b.spending.desc)
  .limit(10)
  .run
  // returns List[Tuple3[Int, String, Int]]
            	
								SELECT
  "a"."id",
  "a"."username",
  "b"."spending"
FROM "user_profile" "a"
INNER JOIN "customer" "b"
ON "a"."id" = "b"."user_id"
WHERE "a"."age" > 25
AND "b"."spending" > 1000
ORDER BY "b"."spending" DESC
LIMIT 10
            	

Join types

The following joins are available. Refer to the section Null values to avoid problems that may come up with joins.

							.joinOn(_.id, _.userId) // INNER JOIN

.innerJoinOn(_.id, _.userId) // INNER JOIN

.leftJoinOn(_.id, _.userId) // LEFT JOIN

.leftOuterJoinOn(_.id, _.userId) // LEFT OUTER JOIN
 
.rightJoinOn(_.id, _.userId) // RIGHT JOIN

.rightOuterJoinOn(_.id, _.userId) // RIGHT OUTER JOIN

.fullOuterJoinOn(_.id, _.userId) // FULL OUTER JOIN

.crossJoin // CROSS JOIN
            	

Insert

Basic insert

							sql
  .insert(user)
  .cols2(t => (
    t.username,
    t.email
  ))
  .values(("bob", "bob@mail.com"))
  .run
            

If you need to insert more columns than the tuple limit of 22, use data.

              sql
  .insert(user)
  .data(t => Seq(
    t.username ==> "bob",
    t.email ==> "bob@mail.com"
  ))
  .run
            

For increased performance and reusability, queries can be cached.

              val stm = sql
  .insert(user)
  .cols2(t => (
    t.username,
    t.email
  ))
  .cache

stm.run(("bob", "bob@mail.com"))
            	
								INSERT INTO "user_profile" ("username", "email") VALUES ('bob', 'bob@mail.com')
            	

Insert case class

              case class User(name: String, email: String)

val stm = sql
  .insert(user)
  .cols2(t => (
    t.username,
    t.email
  ))
  .valuesType(User("Bob", "bob@mail.com"))
  .run

// cache
val stm = sql
  .insert(user)
  .cols2(t => (
    t.username,
    t.email
  ))
  .cache

stm.run(User("Bob", "bob@mail.com"))

              
INSERT INTO "user_profile" ("username", "email") VALUES ('Bob', 'bob@mail.com')
              

Insert returning

							sql
  .insert(user)
  .cols2(t => (
    t.username,
    t.email
  ))
  .values(("bob", "bob@mail.com"))
  .returning3(t => (
    t.id,
    t.username,
    t.email
  ))
  .runHead

// cache
val stm = sql
  .insert(user)
  .cols2(t => (
    t.username,
    t.email
  ))
  .returning3(t => (
    t.id,
    t.username,
    t.email
  ))
  .cache

stm.run(("bob", "bob@mail.com"))
            	
								INSERT INTO "user_profile"
("username", "email")
VALUES ('bob', 'bob@mail.com')
RETURNING
  "id",
  "username",
  "email"
            	

Insert on conflict do nothing

You can take advantage of ON CONFLICT DO NOTHING to avoid errors on columns with UNIQUE constraint.

							  sql
  .insert(user)
  .cols2(t => (
    t.username,
    t.email
  ))
  .values(("bob", "bob@mail.com"))
  .onConflictDoNothing
  .run

// cache
val stm = sql
  .insert(user)
  .cols2(t => (
    t.username,
    t.email
  ))
  .onConflictDoNothing
  .cache

stm.run(("bob", "bob@mail.com"))
            	
								INSERT INTO "user_profile"
("username", "email")
VALUES ('bob', 'bob@mail.com')
ON CONFLICT DO NOTHING
            	

Upsert

The updated column has to be one of the columns you intend to insert.

							sql
  .insert(user)
  .cols2(t => (
    t.username,
    t.email
  ))
  .values(("bob", "bob@hotmail.com"))
  .onConflictOnColumn(_.username)
  .doUpdate(_.email) // .doNothing is also an option
  .run

// cache
val stm = sql
  .insert(user)
  .cols2(t => (
    t.username,
    t.email
  ))
  .onConflictOnColumn(_.username)
  .doUpdate(_.email)
  .cache

stm.run(("bob", "bob@hotmail.com"))
            	
								INSERT INTO "user_profile"
("username", "email")
VALUES ('bob', 'bob@hotmail.com')
ON CONFLICT ("username")
DO UPDATE SET "email" = 'bob@hotmail.com'
            	

Insert where not exists

If you need to avoid duplication on a column that does not have a unique constraint, you can use whereNotExists. Also, if you are making multiple insert statements concurrently, from a stream for example, you will run into problems using onConflictDoNothing.

							sql
  .insert(user)
  .cols2(t => (
    t.username,
    t.email
  ))
  .values(("bob", "bob@mail.com"))
  .whereNotExists(_.username)
  .run

// cache
val stm = sql
  .insert(user)
  .cols2(t => (
    t.username,
    t.email
  ))
  .whereNotExists(_.username)
  .cache

stm.run(("bob", "bob@mail.com"))
            	
								INSERT INTO "user_profile"
("username", "email")
SELECT 'bob', 'bob@mail.com'
WHERE NOT EXISTS (
  SELECT 1
  FROM "user_profile"
  WHERE "username" = 'bob'
)
            	

Insert from select

							sql
  .insert(newsletter)
  .cols1(_.email)
  .fromSelect(
    sql
      .select(user)
      .cols1(_.email)
      .where(_.isActive === true)
  )
  .run

// cache
val stm = sql
  .insert(newsletter)
  .cols1(_.email)
  .pickSelect(
    sql
      .select(user)
      .cols1(_.email)
      .pickWhere1(_.isActive.use === Arg)
  )
  .cache

stm.run(true)
            	
								INSERT INTO "newsletter" ("email")
SELECT "email"
FROM "user_profile"
WHERE "is_active" = true
            	

Insert many

              val stm = sql
  .insert(user)
  .cols2(t => (
    t.username,
    t.email
  ))
  .cache

stm.runList(Seq(
  ("bob", "bob@mail.com"),
  ("jane", "jane@mail.com"),
  ("jack", "jack@mail.com")
))
              

Update

Update statement

See Update operators

								sql
  .update(user)
  .set(_.country ==> "JP")
  .where(_.id === 103)
  .run

// cache
val stm = sql
  .update(user)
  .pickSet1(_.country.use ==> Arg)
  .pickWhere1(_.id.use === Arg)
  .cache

stm.run("JP", 103)
            	
UPDATE "user_profile" SET "country" = 'JP' WHERE "id" = 103
            	

Update returning

								sql
  .update(user)
  .set(t => Seq(
    t.country ==> "IS",
    t.city ==> "Reykjavik"
  ))
  .where(_.id === 31)
  .returning3(t => (
    t.id,
    t.country,
    t.city
  ))
  .runHeadOpt

// cache
val stm = sql
  .update(user)
  .pickSet2(t => (
    t.country.use ==> Arg,
    t.city.use ==> Arg
  ))
  .pickWhere1(_.id.use === Arg)
  .returning3(t => (
    t.id,
    t.country,
    t.city
  ))
  .cache

stm.runHeadOpt(("IS", "Reykjavik"), 31)
            	
								UPDATE "user_profile"
SET "country" = 'IS',
    "city" = 'Reykjavik'
WHERE "id" = 31
RETURNING
  "id",
  "country",
  "city"
            	

Delete

Delete statement

								sql
  .delete(user)
  .where(_.id === 103)
  .run

// cache
val stm = sql
  .delete(user)
  .pickWhere1(_.id.use === Arg)
  .cache

stm.run(103)
            	
								DELETE FROM "user_profile" WHERE "id" = 103
            	

Delete returning

sql
  .delete(user)
  .where(_.id === 103)
  .returning1(_.email)
  .runHeadOpt

// cache
val stm = sql
  .delete(user)
  .pickWhere1(_.id.use === Arg)
  .returning1(_.email)
  .cache

stm.runHeadOpt(103)
            	
								DELETE FROM "user_profile"
WHERE "id" = 103
RETURNING "email"
            	

Null values

Null values in results

Convert the column in the query from T to Option[T].

.cols1(_.city.asOpt)

Use Postgres's coalesce function to return a default value in case of null.

.cols1(t => t.city.default("No city"))
              

Insert and update null values

                sql
  .insert(client)
  .cols2(t => (t.username, t.age.asOpt))
  .values(("Joe", None))
  .run
    
sql
  .update(client)
  .set(_.age.asOpt ==> None)
  .where(_.id === 4)
  .run
              

Aggregation

Count

								sql
  .count(user)
  .where(_.country === "IT")
  .runHead
            	
								SELECT count(*) FROM "user_profile" WHERE "country" = 'IT'
            	

Avg Max Min

								import kuzminki.api._
import kuzminki.fn._

sql
  .select(user)
  .cols3(t => (
    Agg.avg(t.age),
    Agg.max(t.age),
    Agg.min(t.age)
  ))
  .where(_.country === "US")
  .runHead
            	
								SELECT
  avg("age"),
  max("age"),
  min("age")
FROM "user_profile"
WHERE "country" = 'US'
            	

Streaming

Stream from the database

Streaming is done in batches, by default 100 rows at a time. For a larger batch, use .streamBatch(1000).

                sql
  .select(client)
  .cols3(_.all)
  .all
  .orderBy(_.id.asc)
  .stream // .streamBatch(1000)
  .map(makeLine)
  .run(fileSink(Paths.get("clients.txt")))


Stream into the database

The same logic can be used for UPDATE and DELETE.

val insertStm = sql
  .insert(client)
  .cols2(t => (t.username, t.age))
  .cache

// insert one at a time.
readFileIntoStream("clients.txt")
  .map(makeTupleFromLine)
  .run(insertStm.asSink)

// insert in chunks of 100 using transaction.
readFileIntoStream("clients.txt")
  .map(makeTupleFromLine)
  .transduce(insertStm.collect(100))
  .run(insertStm.asChunkSink)
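
The batching idea behind chunked inserts can be sketched in plain Scala without any database or streaming library. This is only an illustration of splitting rows into fixed-size batches, not the behaviour of collect or asChunkSink; the name batches is hypothetical.

```scala
// Plain-Scala sketch of splitting rows into fixed-size batches.
object BatchSketch {

  // Splits rows into batches of at most batchSize elements, lazily.
  def batches[A](rows: Iterator[A], batchSize: Int): Iterator[Seq[A]] =
    rows.grouped(batchSize).map(_.toSeq)

  // 250 rows in batches of 100: two full batches and one partial batch.
  val sizes: List[Int] = batches((1 to 250).iterator, 100).map(_.size).toList
}
```

The last batch is smaller than the others whenever the row count is not a multiple of the batch size, so a consumer of the batches should not assume a fixed length.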
              

Transaction

Execute multiple operations

Do INSERT, UPDATE and DELETE in one transaction.

                sql.transaction(
  sql.insert(user).cols2(t => (t.username, t.age)).values(("Joe", 25)),
  sql.update(user).set(_.age ==> 31).where(_.id === 45),
  sql.delete(user).where(_.id === 83)
).run
              

Execute a list of cached statements

                val stm = sql
  .delete(user)
  .pickWhere1(_.id.use === Arg)
  .cache

val statements = (1 to 10).map(id => stm.render(id))

sql.transactionList(statements).run
              

Debug

Print the query

                sql
  .select(client)
  .cols3(_.all)
  .where(_.age > 25)
  .limit(5)
  .printSqlWithArgs
  .run
// SELECT "id", "username", "age" FROM "client" WHERE "age" > 25 LIMIT 5

sql
  .update(client)
  .set(_.age ==> 24)
  .where(_.id === 4)
  .printSql
  .run
// UPDATE "client" SET "age" = ? WHERE "id" = ?
              

Fields

Jsonb field

https://www.postgresql.org/docs/11/functions-json.html

                {
  "name": "Angela Barton",
  "is_active": true,
  "company": "Magnafone",
  "address": "178 Howard Place, Gulf, Washington, 702",
  "latitude": 19.793713,
  "longitude": 86.513373,
  "tags": ["enim", "aliquip", "qui"],
  "residents": {
    "name": "Rick",
    "age": 31
  }
}
              
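class Customer extends Model("customer") {
  val id = column[Int]("id")
  val data = column[Jsonb]("data")
  val other = column[Jsonb]("other") // second jsonb column, used in the || example
}

val customer = Model.get[Customer]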

sql
  .insert(customer)
  .cols1(_.data)
  .values(Jsonb(jsonString))
  .run

// select

sql
  .select(customer)
  .cols1(_.data ->> "company")
  .where(_.id === 3)
  .runHead // "Magnafone"


sql
  .select(customer)
  .cols1(_.data #>> Seq("residents", "name"))
  .where(_.id === 3)
  .runHead // "Rick"


sql
  .select(customer)
  .cols1(_.data -> "tags" ->> 1)
  .where(_.id === 3)
  .runHead // "aliquip"


sql
  .select(customer)
  .cols1(_.data -> "residents")
  .where(_.id === 3)
  .runHead // Jsonb({"name" : "Rick", "age" : 31})

sql
  .select(customer)
  .cols1(_.data - "residents")
  .where(_.id === 3)
  .runHead

sql
  .select(customer)
  .cols1(t => t.data || t.other)
  .where(_.id === 3)
  .runHead

// update

sql
  .update(customer)
  .set(_.data += Json.obj("address" -> "Somewhere 12"))
  .where(_.id === 3)
  .run

  
sql
  .update(customer)
  .set(_.data -= "address")
  .where(_.id === 3)
  .run


sql
  .update(customer)
  .set(_.data #-= Seq("residents", "age"))
  .where(_.id === 3)
  .run
              

Array field

Array fields can be created for all the available types.

                class Nums extends Model("demo") {
  val id = column[Int]("id")
  val numbers = column[Seq[Int]]("numbers")
}

val nums = Model.get[Nums]

for {
  id <- sql
    .insert(nums)
    .cols1(_.numbers)
    .values(List(1, 2, 3))
    .returning1(_.id)
    .runHead

  _ <- sql
    .update(nums)
    .set(_.numbers += 4)
    .where(_.id === id)
    .run

  numbers <- sql
    .select(nums)
    .cols1(_.numbers)
    .where(_.numbers ? 2)
    .run
} yield numbers // List[Vector[Int]]
              

Unique and sorted

// add an element and make sure it is unique
.set(_.numbers.add(4))

// add a list of elements and make sure they are unique
.set(_.numbers.add(List(8, 7, 7, 2)))

// same as add but sorted
.set(_.numbers.addAsc(4))
.set(_.numbers.addDesc(4))

// for jsonb array, a key has to be provided that should be unique and used for sorting
.set(_.numbers.addDesc(jsonObj, "index"))

// cache
.pickSet1(_.numbers.addAsc(Arg))
// to use a list as argument
.pickSet1(_.numbers.addAsc(ArgSeq))
              

Timestamp, Date, Time

                class Demo extends Model("demo") {
  val id = column[Int]("id")
  val eventTime = column[Time]("event_time")
  val eventDate = column[Date]("event_date")
  val updatedAt = column[Timestamp]("updated_at")
}

val demo = Model.get[Demo]

sql
  .update(demo)
  .set(t => Seq(
    t.eventTime += Fn.interval(hours = 3, minutes = 10),
    t.eventDate += Fn.interval(years = 1, days = 2),
    t.updatedAt += Fn.interval(months = 4, hours = 5)
  ))
  .where(_.id === 25)
  .run

sql
  .select(demo)
  .cols3(t => (
    t.eventTime.format("HH:MI"),
    t.eventDate.format("DD Mon YYYY"),
    t.updatedAt.format("DD Mon YYYY HH:MI")
  ))
  .where(_.id === 25)
  .runHead

sql
  .select(demo)
  .cols4(t => (
    t.id,
    t.eventDate.month,
    t.updatedAt.week,
    t.updatedAt + Fn.interval(days = 10)
  ))
  .where(t => Seq(
    t.eventDate.year === 2022,
    t.eventDate.quarter === 2
  ))
  .run
              

Functions

Postgres functions

Use Postgres functions to modify results.

                import kuzminki.api._
import kuzminki.fn._

class Profile extends Model("profile") {
  val firstName = column[String]("first_name")
  val lastName = column[String]("last_name")
  val bigNum = column[BigDecimal]("big_num")
}

val profile = Model.get[Profile]

sql
  .select(profile)
  .cols3(t => (
    Fn.concatWs(" ", t.firstName, t.lastName),
    Fn.initcap(t.lastName),
    Cast.asString(t.bigNum)
  ))
  .all
  .run
              

Create functions

Create your own function classes.

                import kuzminki.fn.StringFn

case class FullName(
  title: String,
  first: TypeCol[String],
  second: TypeCol[String]
) extends StringFn {
  val name = "full_name"
  val template = s"concat_ws(' ', '$title', %s, %s)"
  val cols = Vector(first, second)
}

sql
  .select(user)
  .cols2(t => (
    t.id,
    FullName("Mr", t.firstName, t.lastName)
  ))
  .where(_.id === 10)
  .runHead

// If you need to have the driver fill in arguments:

case class FullNameParam(
  title: String,
  first: TypeCol[String],
  second: TypeCol[String]
) extends StringParamsFn {
  val name = "full_name"
  // no Scala interpolation is needed here: ? is filled in by the driver
  val template = "concat_ws(' ', ?, %s, %s)"
  val cols = Vector(first, second)
  val params = Vector(title)
}
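
The two classes above differ in where `title` ends up: the first writes it into the SQL text, the second leaves a `?` in the text and hands the value to the driver. The following sketch is a simplified model of that difference, not Kuzminki's internal rendering. `TemplateSketch`, `Rendered` and `render` are hypothetical names, and the column names are passed in as already-quoted strings.

```scala
object TemplateSketch {

  // Hypothetical stand-in for a rendered fragment: SQL text plus driver arguments.
  final case class Rendered(sql: String, args: Vector[Any])

  // Fill each %s in the template with a column name, in order.
  // Values in `params` are not written into the SQL; they travel separately.
  def render(
    template: String,
    cols: Vector[String],
    params: Vector[Any] = Vector.empty
  ): Rendered =
    Rendered(template.format(cols: _*), params)

  val cols = Vector("\"first_name\"", "\"last_name\"")

  // Like FullName: the title is part of the SQL string itself.
  val inlined = render("concat_ws(' ', 'Mr', %s, %s)", cols)

  // Like FullNameParam: the SQL keeps a ? and the title is a driver argument.
  val parameterised = render("concat_ws(' ', ?, %s, %s)", cols, Vector("Mr"))
}
```

In the first form, a title containing a single quote would end up inside the SQL text and break the statement, so the second form is the safer choice whenever the value is not a fixed literal.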
              

Available functions

https://www.postgresql.org/docs/current/functions-string.html

Function                     Column type
Fn.coalesce(col, default)    String
Fn.concat(col, ...)          String
Fn.concatWs(glue, col, ...)  String
Fn.substr(col, start)        String
Fn.substr(col, start, len)   String
Fn.trim(col)                 String
Fn.upper(col)                String
Fn.lower(col)                String
Fn.initcap(col)              String
Fn.round(col)                BigDecimal
Fn.roundStr(col)             BigDecimal
Cast.asString(col)           Any
Cast.asShort(col)            Any
Cast.asInt(col)              Any
Cast.asLong(col)             Any
Cast.asFloat(col)            Any
Cast.asDouble(col)           Any
Cast.asBigDecimal(col)       Any

Raw SQL

Select

def rawStm(country: String, minAge: Int) =
  rsql"""SELECT * FROM "user_profile" WHERE country = $country AND age > $minAge"""

val job = for {
  users <- db.query(rawStm("TR", 25))
} yield users
							

Operations

val username = "bob"
val email = "bob@mail.com"

db.exec(rsql"""INSERT INTO "user_profile" ("username", "email") VALUES ($username, $email)""")
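
In both examples the values are written inline with `$`, yet they are meant to reach the database as arguments rather than as SQL text. The sketch below shows one way a custom string interpolator can do this in plain Scala. It is an illustration of the general technique, not Kuzminki's implementation of `rsql`. The names `RawSqlSketch`, `RawStatement` and `rawSketch` are hypothetical.

```scala
object RawSqlSketch {

  // Hypothetical result type: SQL text with ? placeholders plus the arguments in order.
  final case class RawStatement(sql: String, args: Vector[Any])

  implicit class RawSqlInterpolator(private val sc: StringContext) extends AnyVal {

    // Join the literal parts with ? and keep the interpolated values as arguments,
    // so that no value is ever concatenated into the SQL string.
    def rawSketch(values: Any*): RawStatement =
      RawStatement(sc.parts.mkString("?"), values.toVector)
  }

  val country = "TR"
  val minAge = 25

  val statement: RawStatement =
    rawSketch"""SELECT * FROM "user_profile" WHERE country = $country AND age > $minAge"""
}
```

Here `statement.sql` contains two `?` placeholders and `statement.args` holds `"TR"` and `25`, which is the shape a JDBC prepared statement expects.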
            	

Data types and operators

Data types

Postgres        Scala
varchar / text  String
bool            Boolean
int2            Short
int4            Int
int8            Long
float4          Float
float8          Double
numeric         BigDecimal
time            java.sql.Time
date            java.sql.Date
timestamp       java.sql.Timestamp
uuid            java.util.UUID
jsonb           kuzminki.api.Jsonb(value: String)
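
The date and time columns use the standard JDBC types, so their values can be built from `java.time` values with the JDK's `valueOf` factory methods. A small sketch using only the JDK and the Scala standard library follows. The object and value names are illustrative, and `kuzminki.api.Jsonb` is left out so that the block runs without the library.

```scala
import java.sql.{Date, Time, Timestamp}
import java.time.{LocalDate, LocalDateTime, LocalTime}
import java.util.UUID

object ColumnValuesSketch {

  // time -> java.sql.Time
  val eventTime: Time = Time.valueOf(LocalTime.of(14, 30, 0))

  // date -> java.sql.Date
  val eventDate: Date = Date.valueOf(LocalDate.of(2022, 12, 29))

  // timestamp -> java.sql.Timestamp
  val updatedAt: Timestamp =
    Timestamp.valueOf(LocalDateTime.of(2022, 12, 29, 14, 30, 0))

  // uuid -> java.util.UUID
  val token: UUID = UUID.fromString("123e4567-e89b-12d3-a456-426614174000")

  // numeric -> scala.math.BigDecimal
  val price: BigDecimal = BigDecimal("19.99")
}
```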

Operators

Operator   Column type       Argument     Explanation
===        Any               T
!==        Any               T
in         Any               T            Seq[T], Subquery[T]
notIn      Any               T            Seq[T], Subquery[T]
>          Numbers and time  T
<          Numbers and time  T
>=         Numbers and time  T
<=         Numbers and time  T
~          String            String       Match
~*         String            String       Match case insensitive
!~         String            String       Not match
!~*        String            String       Not match case insensitive
like       String            String
starts     String            String
ends       String            String
similarTo  String            String
?          Array             T            Exists
!?         Array             T            Not exists
@>         Array             Seq[T]       Contains
!@>        Array             Seq[T]       Not contains
<@         Array             Seq[T]       Contained by
!<@        Array             Seq[T]       Not contained by
&&         Array             Seq[T]       Overlaps
!&&        Array             Seq[T]       Not overlaps
?          Jsonb             String       Exists
!?         Jsonb             String       Not exists
?|         Jsonb             Seq[String]  Exists any
?&         Jsonb             Seq[String]  Exists all
@>         Jsonb             Jsonb        Contains
!@>        Jsonb             Jsonb        Not contains
<@         Jsonb             Jsonb        Contained by
!<@        Jsonb             Jsonb        Not contained by
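
The array operators `@>`, `<@` and `&&` are easy to mix up. The sketch below expresses their meaning on plain Scala sequences, following the Postgres definitions of contains, contained by and overlaps. It only models the semantics and does not use Kuzminki. `ArrayOperatorSketch` and its method names are hypothetical.

```scala
object ArrayOperatorSketch {

  // @> : every element of the argument is present in the column
  def contains[T](column: Seq[T], arg: Seq[T]): Boolean =
    arg.forall(x => column.contains(x))

  // <@ : every element of the column is present in the argument
  def containedBy[T](column: Seq[T], arg: Seq[T]): Boolean =
    column.forall(x => arg.contains(x))

  // && : the column and the argument share at least one element
  def overlaps[T](column: Seq[T], arg: Seq[T]): Boolean =
    column.exists(x => arg.contains(x))
}
```

The negated forms (`!@>`, `!<@`, `!&&`) are the logical negation of these three checks.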

Update operators

Operator  Column type            Argument     Explanation
==>       Any                    T
+=        Numbers                T            Increase
-=        Numbers                T            Decrease
setNow    Timestamp, Date, Time
+=        Timestamp, Date, Time  PGInterval   Increase
-=        Timestamp, Date, Time  PGInterval   Decrease
+=        Array                  T, Seq[T]    Append
:=        Array                  T, Seq[T]    Prepend
-=        Array                  T            Remove
add       Array                  T, Seq[T]    Add unique
addAsc    Array                  T, Seq[T]    Add unique and sort asc
addDesc   Array                  T, Seq[T]    Add unique and sort desc
+=        Jsonb                  Jsonb        Extend
-=        Jsonb                  String, Int  Remove
#-=       Jsonb                  Seq[String]  Remove path

Modifiers

Operator      Column type            Argument                  Explanation
default       Any                    T                         If null, return default
concat        String                 String, ...
concatWs      String                 (String, Any, ...)
substr        String                 Int, (Int, Int)
replace       String                 (String, String)
trim          String
upper         String
lower         String
initcap       String
asShort       String
asInt         String
asLong        String
asFloat       String
asDouble      String
asBigDecimal  String
round         Numbers                Int                       Returns BigDecimal
roundStr      Numbers                Int                       Returns String
age           Timestamp                                        Returns PgInterval
epochSecs     Timestamp
epochMillis   Timestamp
century       Timestamp, Date
decade        Timestamp, Date
year          Timestamp, Date
quarter       Timestamp, Date
month         Timestamp, Date
week          Timestamp, Date
day           Timestamp, Date
dow           Timestamp, Date
doy           Timestamp, Date
isoDow        Timestamp, Date
isoYear       Timestamp, Date
hour          Timestamp, Time
minute        Timestamp, Time
second        Timestamp, Time
microseconds  Timestamp, Time
milliseconds  Timestamp, Time
asDate        Timestamp
asTime        Timestamp
asTimestamp   Timestamp
asString      Timestamp, Date, Time
format        Timestamp, Date, Time  String
unnest        Array
length        Array
trim          Array                  Int                       Get the first x elements
get           Array                  Int                       Get the element at a given 1-based index
pos           Array                  T                         Index of the given element
first         Array
last          Array
join          Array                  String
||            Array                  TypeCol[T]
->            Jsonb                  String, Int
->>           Jsonb                  String, Int
#>            Jsonb                  Seq[String]
#>>           Jsonb                  Seq[String]
||            Array                  TypeCol[Jsonb]
-             Jsonb                  String, Int, Seq[String]
#-            Jsonb                  Seq[String]
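
Because `get` uses a 1-based index, it is easy to be off by one when coming from Scala collections. The sketch below models `get`, `pos` and the array `trim` on plain sequences. It is a rough illustration rather than the library's behaviour: `ArrayModifierSketch` is a hypothetical name, the 1-based result of `pos` is an assumption based on how Postgres indexes arrays, and a missing element is modelled here as `None`.

```scala
object ArrayModifierSketch {

  // get: element at a 1-based index, None if the index is out of range
  def get[T](arr: Seq[T], index: Int): Option[T] =
    arr.lift(index - 1)

  // pos: 1-based index of the first matching element (assumed), None if absent
  def pos[T](arr: Seq[T], elem: T): Option[Int] = {
    val i = arr.indexOf(elem)
    if (i >= 0) Some(i + 1) else None
  }

  // trim: keep only the first `size` elements
  def trim[T](arr: Seq[T], size: Int): Seq[T] =
    arr.take(size)
}
```

In this model `get(Seq("a", "b", "c"), 1)` returns `Some("a")`, where the Scala expression `Seq("a", "b", "c")(1)` would return `"b"`.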