From 58a97d2b47b5fe642870445bce8dfa20d70b417f Mon Sep 17 00:00:00 2001 From: Gaurav Mehta Date: Mon, 3 Feb 2020 19:45:12 +1100 Subject: [PATCH 1/3] Added a minimal sample of using kine with RKE --- samples/minimal.md | 113 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 113 insertions(+) create mode 100644 samples/minimal.md diff --git a/samples/minimal.md b/samples/minimal.md new file mode 100644 index 00000000..73975202 --- /dev/null +++ b/samples/minimal.md @@ -0,0 +1,113 @@ +## Minimal example of using kine +The following example uses kine with a mysql database for persistence. + +We can run mysql on a host: + +``` +docker run --name kine-mysql -p 3306:3306 -e MYSQL_DATABASE=kine -e MYSQL_ROOT_PASSWORD=$PASSWORD -d mysql:latest +``` + +Run kine on the same host as mysql database: +``` +kine --endpoint "mysql://root:$PASSWORD@tcp(localhost:3306)/kine" +``` + + +Use the following RKE cluster.yml sample to boot up the cluster. + +RKE supports using an external etcd endpoint. + +``` +nodes: + - address: 1.1.1.1 + user: ubuntu + role: + - controlplane + - worker + - address: 2.2.2.2 + user: ubuntu + role: + - controlplane + - worker +cluster_name: "kine-demo" +network: + plugin: canal +ignore_docker_version: true +services: + etcd: + path: / + external_urls: + - http://3.3.3.3:2379 + ca_cert: |- + -----BEGIN CERTIFICATE----- + MIIDVTCCAj2gAwIBAgIUZV9P6JhHOgjT5cRHdsX0rUp6q2AwDQYJKoZIhvcNAQEL + BQAwOjELMAkGA1UEBhMCQVUxDDAKBgNVBAgMA1ZJQzEOMAwGA1UECgwFcmFuY2gx + DTALBgNVBAMMBG15Q0EwHhcNMjAwMjAyMjI1NzAzWhcNMjUwMTMxMjI1NzAzWjA6 + MQswCQYDVQQGEwJBVTEMMAoGA1UECAwDVklDMQ4wDAYDVQQKDAVyYW5jaDENMAsG + A1UEAwwEbXlDQTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAOWU/5KI + 3Es7fZeP7zpgl1t/EeIYtZHNQKDozTjuaNgdvaDYp5Ly1ARlrBDBJTJcUkVjUpbN + 33WeAsOew3YKWjNNPw1nC9x8r1BgdOwgJhPOFQ7paHHYVJMLPJ3B1qz31ZSkqjyB + vCT7/qMijY8YJkM70FYIe/MokM9iLfFKP1RLlB/kRHgVOF3LT/uvknPX1Rg0tOmI + 8SrjChAkMwSC0LruBJnzAuAs1z03qLWl0R3uEGnRF3o9P6QzJnEsnXMs692IWwt2 + 4ilxuC5yq9Km/gCuJMzhGsqkRpdRSutXgXnMWUSCk2LQQJK9qLTpEbxWY50LAF+Q + Yctr7kmVC1H69skCAwEAAaNTMFEwHQYDVR0OBBYEFFf6jIEgiCAaA3zNllILqVg+ + fACnMB8GA1UdIwQYMBaAFFf6jIEgiCAaA3zNllILqVg+fACnMA8GA1UdEwEB/wQF + MAMBAf8wDQYJKoZIhvcNAQELBQADggEBAJ3508ltZz0CkoEvMqq4Ux5dn+rE+J/f + eq5kjjE/QB+HZnU1089OLwLilPPF7yMOGPhLt7sZw1/Ymqm7l6yrl2grL90brz2i + DvDHw3N9fxRxnvjUeg61JOk6vOk/It6odJ2Lbht56L1PsgENEe2ih2kYy/i3NC6z + yFcNtg3xR+iSO/2Gp1/UICDWVx7n4VLrbC34AKwuHF+WZDOLymk0MwtL6CV02U0W + KOShfFMtkve95ZZtEojypGr+EhIHePLmaleTwxjgG15WXQLxKcPzDbg9I4je3FII + U/nen9pF8UJ3+H6Mxotw4vOcIuafZrFLM7pXsyv00UUSzbvWiH9+X4I= + -----END CERTIFICATE----- + cert: |- + -----BEGIN CERTIFICATE----- + MIIDXTCCAkWgAwIBAgIUMakvO9NvNAeXHOaxOZdg+7e4evMwDQYJKoZIhvcNAQEL + BQAwOjELMAkGA1UEBhMCQVUxDDAKBgNVBAgMA1ZJQzEOMAwGA1UECgwFcmFuY2gx + DTALBgNVBAMMBG15Q0EwHhcNMjAwMjAyMjMzOTM1WhcNMjUwMTMxMjMzOTM1WjA/ + MQswCQYDVQQGEwJBVTEMMAoGA1UECAwDVklDMQ4wDAYDVQQKDAVyYW5jaDESMBAG + A1UEAwwJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA + oPOFYtnP4hXnDgGl0zN+Q8pwcXr5+fyS4/YVIciAgwG8z8gVSPgf9HakiPhJ2Z7Q + eVbH5xikS0kzzPKS+fdu+nAJtbW6vTvP2XizkJx1xGcuC0jhmNYT3A8vCskzSuno + YplVBGUvNyBBPaoW1PiUWzqLtO39wnleizg8ilcnoTje3owO/OQpxSCFVAezxJEv + 8gAQu0+wjVYPs2PAbmKKrXKo4aqGy6hVL/aIhEOXni4cMmxOl/UDCmV1SsmHKICk + J6/v1kRcRyUdC2Gj7RxU+i0ipCUG9ZqQbnvNEhgQqWVzYdf+jIHu+wEVJEGNejU4 + CfKaC/za8jM98g/ui6bF3wIDAQABo1YwVDAfBgNVHSMEGDAWgBRX+oyBIIggGgN8 + zZZSC6lYPnwApzAJBgNVHRMEAjAAMAsGA1UdDwQEAwIE8DAZBgNVHREEEjAQggNr + M3OCA2s4c4cEfwAAATANBgkqhkiG9w0BAQsFAAOCAQEAmQUrvASk70Kf1T23Dhng + NFuq8mNrsm/IOPgxrVYwzG1nzhop9wa3060YHVQ39B2QSylVnkYwyfDWY0064RXx + OrMOig6hsos+KMZKNhwHnVseZj33quz+FJZWIfByeAFenJ8z4j2Zx8QbxHznytOc + lO+fKw+lbRId3ZyW4E/g4kChK6VXVCZ6+9VXEj/pmp77p8q8NYk5setS4HkQpJxN + Y7x/tDvNyo8efz1VP3yjeNZ4WGANkYUx8O+rOzq31Qwopf6OBLlllumTXeDGU/n/ + fi2eFpSTPmAZZK0nlwu2T1YHbA4hxrX7HvVeqOVxypRmVZ1nLz2poUMUtphP6a+4 + ug== + -----END CERTIFICATE----- + key: |- + -----BEGIN RSA PRIVATE KEY----- + MIIEpAIBAAKCAQEAoPOFYtnP4hXnDgGl0zN+Q8pwcXr5+fyS4/YVIciAgwG8z8gV + SPgf9HakiPhJ2Z7QeVbH5xikS0kzzPKS+fdu+nAJtbW6vTvP2XizkJx1xGcuC0jh + mNYT3A8vCskzSunoYplVBGUvNyBBPaoW1PiUWzqLtO39wnleizg8ilcnoTje3owO + /OQpxSCFVAezxJEv8gAQu0+wjVYPs2PAbmKKrXKo4aqGy6hVL/aIhEOXni4cMmxO + l/UDCmV1SsmHKICkJ6/v1kRcRyUdC2Gj7RxU+i0ipCUG9ZqQbnvNEhgQqWVzYdf+ + jIHu+wEVJEGNejU4CfKaC/za8jM98g/ui6bF3wIDAQABAoIBAGkfxU5UP2BGt/xA + /UMeDelPLvQfw2gRHQwBrbm8EJwApYb9A1H+pjhwyXSg3vNhtH6cPMLnKF/39vp0 + saTMhNLUHLNveAGjMFW1bWsVliHq1nsOZjMCGESSMkKUOYlDj8HerlXJlPYnfhU9 + o94EYjnX2moZS7YaubKqz3f4Bu1Yg4kHQWRRoD07pU0CXRjBZpg1ALhW3aAntuZo + AL2TXa6dharA6wVi4tC64AbPvVoXQ9Pc96sEHs7yNxjwtfPnk2dZtN4XnKF6lcA7 + VnDVnVkcXEEsZ+YQYjfooOiIPHpG0Q0ymqaAydNoQ0xwS9V/0oFvWsPQ++ZwgOek + mSLQctECgYEA0SLO9+tPaBF5t2fbX2TjlIU1/dnfLc7w2H7VHTV3QqYq3zYWRQZF + bTEnBbWM81CHZPYTEmY+JToaUavHs2il1WfpfZQmYyhOUiXg9Jk8w3aYo5b0aVH2 + E/doNN6txuQ3c3xoJF3HzsOMLuinz3cYrnvGW0nQkUj0F5di5aMSRncCgYEAxQSS + XjwIBgl8aR8WjbWJ7kI9Xk5Xx/aTP4FzUTmHSLXY4zxWFv7zRV3vHSzEEsKdxnYr + mxtIRim1n6btNm6KCBqGOCDBHq2bWxuoFMODVZiMwVTYeqZcrOHlqh+vLcKNbJsO + 4jImvNwN8F2h9T0DBiXamL9pVWFt2bAZlY1TDdkCgYEAq0K6AkPsTuigqBSgjMnt + pB5CTJMyNC0XBfM3SigSdb3ltcxxCC1OhVCPCBnYRxhXB9KLY7HeilW+X8swSjcU + NmJVzsSXevPyz0q9oRArtlVUQgLIO8cmoMslxsXjwM/6qNPj5IP3r9Zq4a8cXMTG + rXwmv3L/HTqEyRzrm+mieZkCgYEAxC6gPTvT1YcenmK5h5Ss61aEW2LxoAmVaJhT + px786ldByEytgSqQPZOi5c9M00195ECJfWL2Xf9sfrSu4xPBWP5ohn1/MDg5ScjJ + XxusrNBB4MXG5qLAB9rNYdE5E/z17J6efHjqAAezzZS/ED+XwkhxWsbHcaCZzTnA + 0B2xBUkCgYB0+bFGxDetRYSNEPBUiMxCUvzzwAR0bqXpFronTu8pTsQKGlnddJh3 + tNVseLswaQ9A8j1foxvTGv5/GYrPq7rpcKjPDkJs4CgLFTIBM0/t2v8wNwfvzJa5 + 017peVJ5tbqdqphS8lrfdM0M509ao7ehqTMnu8STYWoLib54ZZG8Gw== + -----END RSA PRIVATE KEY----- +``` + From c7038758bc448cadb50902e6a36e7fbf4ba844ba Mon Sep 17 00:00:00 2001 From: Gaurav Mehta Date: Thu, 9 Jul 2020 19:56:23 +1000 Subject: [PATCH 2/3] Initial commit for sqlserver driver --- go.mod | 19 +- go.sum | 7 + pkg/drivers/generic/generic.go | 63 ++++--- pkg/drivers/sqlserver/sqlserver.go | 277 +++++++++++++++++++++++++++++ pkg/endpoint/endpoint.go | 4 + samples/minimal.md | 113 ------------ 6 files changed, 335 insertions(+), 148 deletions(-) create mode 100644 pkg/drivers/sqlserver/sqlserver.go delete mode 100644 samples/minimal.md diff --git a/go.mod b/go.mod index d9a5ae9c..7cd4f805 100644 --- a/go.mod +++ b/go.mod @@ -3,15 +3,12 @@ module github.com/rancher/kine go 1.12 require ( - github.com/Rican7/retry v0.1.0 - github.com/canonical/go-dqlite v1.5.1 - github.com/go-sql-driver/mysql v1.4.1 - github.com/lib/pq v1.1.1 - github.com/mattn/go-sqlite3 v1.10.0 - github.com/pkg/errors v0.8.1 - github.com/rancher/wrangler v0.4.0 - github.com/sirupsen/logrus v1.4.2 - github.com/urfave/cli v1.21.0 - go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738 - google.golang.org/grpc v1.23.1 + github.com/Rican7/retry v0.1.0 // indirect + github.com/canonical/go-dqlite v1.5.1 // indirect + github.com/denisenkom/go-mssqldb v0.0.0-20200620013148-b91950f658ec // indirect + github.com/go-sql-driver/mysql v1.4.1 // indirect + github.com/lib/pq v1.1.1 // indirect + github.com/mattn/go-sqlite3 v1.10.0 // indirect + github.com/rancher/wrangler v0.4.0 // indirect + github.com/urfave/cli v1.21.0 // indirect ) diff --git a/go.sum b/go.sum index 70effee0..b4834d42 100644 --- a/go.sum +++ b/go.sum @@ -54,6 +54,8 @@ github.com/davecgh/go-spew v0.0.0-20151105211317-5215b55f46b2/go.mod h1:J7Y8YcW2 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/denisenkom/go-mssqldb v0.0.0-20200620013148-b91950f658ec h1:NfhRXXFDPxcF5Cwo06DzeIaE7uuJtAUhsDwH3LNsjos= +github.com/denisenkom/go-mssqldb v0.0.0-20200620013148-b91950f658ec/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27NDyej4t/EjAShU= github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/docker/docker v0.7.3-0.20190327010347-be7ac8be2ae0/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= @@ -128,6 +130,8 @@ github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/gogo/protobuf v1.2.2-0.20190723190241-65acae22fc9d h1:3PaI8p3seN09VjbTYC/QWlUZdZ1qS1zGjy7LH2Wt07I= github.com/gogo/protobuf v1.2.2-0.20190723190241-65acae22fc9d/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= +github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe h1:lXe2qZdvpiX5WZkZR4hgp4KJVfY3nMkvmwbVkpv1rVY= +github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903 h1:LbsanbbD6LieFkXbj9YNNBupiGHJgFeLpO0j0Fza1h8= @@ -251,6 +255,8 @@ github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y8 github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.2 h1:6LJUbpNm42llc4HRCuvApCSWB/WfhuNo9K98Q9sNGfs= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= +github.com/rancher/kine v0.4.0 h1:1IhWy3TzjExG8xnj46eyUEWdzqNAD1WrgL4eEBKm6Uc= +github.com/rancher/kine v0.4.0/go.mod h1:IImtCJ68AIkE+VY/kUI0NkyJL5q5WzO8QvMsSXqbrpA= github.com/rancher/wrangler v0.4.0 h1:iLvuJcZkd38E3RGG74dFMMNEju0PeTzfT1PQiv5okVU= github.com/rancher/wrangler v0.4.0/go.mod h1:1cR91WLhZgkZ+U4fV9nVuXqKurWbgXcIReU4wnQvTN8= github.com/remyoudompheng/bigfft v0.0.0-20170806203942-52369c62f446/go.mod h1:uYEyJGbgTkfkS4+E/PavXkNJcbFIpEtjt2B0KDQ5+9M= @@ -316,6 +322,7 @@ golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnf golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190320223903-b7391e95e576/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190325154230-a5d413f7728c/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190617133340-57b3e21c3d56/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586 h1:7KByu05hhLed2MO29w7p1XfZvZ13m8mub3shuVftRs0= diff --git a/pkg/drivers/generic/generic.go b/pkg/drivers/generic/generic.go index d5da195c..08e4ce2e 100644 --- a/pkg/drivers/generic/generic.go +++ b/pkg/drivers/generic/generic.go @@ -66,7 +66,7 @@ type TranslateErr func(error) error type Generic struct { sync.Mutex - + DriverName string LockWrites bool LastInsertID bool DB *sql.DB @@ -82,11 +82,13 @@ type Generic struct { InsertSQL string FillSQL string InsertLastInsertIDSQL string + RevSQL string + CompactRevSQL string Retry ErrRetry TranslateErr TranslateErr } -func q(sql, param string, numbered bool) string { +func QueryBuilder(sql, param string, numbered bool) string { if param == "?" && !numbered { return sql } @@ -128,7 +130,7 @@ func (d *Generic) Migrate(ctx context.Context) { } } -func openAndTest(driverName, dataSourceName string) (*sql.DB, error) { +func OpenAndTest(driverName, dataSourceName string) (*sql.DB, error) { db, err := sql.Open(driverName, dataSourceName) if err != nil { return nil, err @@ -151,7 +153,7 @@ func Open(ctx context.Context, driverName, dataSourceName string, paramCharacter ) for i := 0; i < 300; i++ { - db, err = openAndTest(driverName, dataSourceName) + db, err = OpenAndTest(driverName, dataSourceName) if err == nil { break } @@ -165,25 +167,25 @@ func Open(ctx context.Context, driverName, dataSourceName string, paramCharacter } return &Generic{ - DB: db, - - GetRevisionSQL: q(fmt.Sprintf(` + DB: db, + DriverName: driverName, + GetRevisionSQL: QueryBuilder(fmt.Sprintf(` SELECT 0, 0, %s FROM kine kv WHERE kv.id = ?`, columns), paramCharacter, numbered), - GetCurrentSQL: q(fmt.Sprintf(listSQL, ""), paramCharacter, numbered), - ListRevisionStartSQL: q(fmt.Sprintf(listSQL, "AND mkv.id <= ?"), paramCharacter, numbered), - GetRevisionAfterSQL: q(fmt.Sprintf(listSQL, idOfKey), paramCharacter, numbered), + GetCurrentSQL: QueryBuilder(fmt.Sprintf(listSQL, ""), paramCharacter, numbered), + ListRevisionStartSQL: QueryBuilder(fmt.Sprintf(listSQL, "AND mkv.id <= ?"), paramCharacter, numbered), + GetRevisionAfterSQL: QueryBuilder(fmt.Sprintf(listSQL, idOfKey), paramCharacter, numbered), - CountSQL: q(fmt.Sprintf(` + CountSQL: QueryBuilder(fmt.Sprintf(` SELECT (%s), COUNT(c.theid) FROM ( %s ) c`, revSQL, fmt.Sprintf(listSQL, "")), paramCharacter, numbered), - AfterSQL: q(fmt.Sprintf(` + AfterSQL: QueryBuilder(fmt.Sprintf(` SELECT (%s), (%s), %s FROM kine kv WHERE @@ -191,23 +193,25 @@ func Open(ctx context.Context, driverName, dataSourceName string, paramCharacter kv.id > ? ORDER BY kv.id ASC`, revSQL, compactRevSQL, columns), paramCharacter, numbered), - DeleteSQL: q(` + DeleteSQL: QueryBuilder(` DELETE FROM kine WHERE id = ?`, paramCharacter, numbered), - UpdateCompactSQL: q(` + UpdateCompactSQL: QueryBuilder(` UPDATE kine SET prev_revision = ? WHERE name = 'compact_rev_key'`, paramCharacter, numbered), - InsertLastInsertIDSQL: q(`INSERT INTO kine(name, created, deleted, create_revision, prev_revision, lease, value, old_value) + InsertLastInsertIDSQL: QueryBuilder(`INSERT INTO kine(name, created, deleted, create_revision, prev_revision, lease, value, old_value) values(?, ?, ?, ?, ?, ?, ?, ?)`, paramCharacter, numbered), - InsertSQL: q(`INSERT INTO kine(name, created, deleted, create_revision, prev_revision, lease, value, old_value) - values(?, ?, ?, ?, ?, ?, ?, ?) RETURNING id`, paramCharacter, numbered), + InsertSQL: QueryBuilder(`INSERT INTO kine(name, created, deleted, create_revision, prev_revision, lease, value, old_value) + values(?, ?, ?, ?, ?, ?, ?, ?)`, paramCharacter, numbered), - FillSQL: q(`INSERT INTO kine(id, name, created, deleted, create_revision, prev_revision, lease, value, old_value) + FillSQL: QueryBuilder(`INSERT INTO kine(id, name, created, deleted, create_revision, prev_revision, lease, value, old_value) values(?, ?, ?, ?, ?, ?, ?, ?, ?)`, paramCharacter, numbered), + RevSQL: revSQL, + CompactRevSQL: compactRevSQL, }, err } @@ -246,7 +250,7 @@ func (d *Generic) execute(ctx context.Context, sql string, args ...interface{}) func (d *Generic) GetCompactRevision(ctx context.Context) (int64, error) { var id int64 - row := d.queryRow(ctx, compactRevSQL) + row := d.queryRow(ctx, d.CompactRevSQL) err := row.Scan(&id) if err == sql.ErrNoRows { return 0, nil @@ -271,7 +275,7 @@ func (d *Generic) DeleteRevision(ctx context.Context, revision int64) error { func (d *Generic) ListCurrent(ctx context.Context, prefix string, limit int64, includeDeleted bool) (*sql.Rows, error) { sql := d.GetCurrentSQL if limit > 0 { - sql = fmt.Sprintf("%s LIMIT %d", sql, limit) + sql = d.applyLimit(sql, limit) } return d.query(ctx, sql, prefix, includeDeleted) } @@ -280,14 +284,14 @@ func (d *Generic) List(ctx context.Context, prefix, startKey string, limit, revi if startKey == "" { sql := d.ListRevisionStartSQL if limit > 0 { - sql = fmt.Sprintf("%s LIMIT %d", sql, limit) + sql = d.applyLimit(sql, limit) } return d.query(ctx, sql, prefix, revision, includeDeleted) } sql := d.GetRevisionAfterSQL if limit > 0 { - sql = fmt.Sprintf("%s LIMIT %d", sql, limit) + sql = d.applyLimit(sql, limit) } return d.query(ctx, sql, prefix, revision, startKey, revision, includeDeleted) } @@ -305,7 +309,7 @@ func (d *Generic) Count(ctx context.Context, prefix string) (int64, int64, error func (d *Generic) CurrentRevision(ctx context.Context) (int64, error) { var id int64 - row := d.queryRow(ctx, revSQL) + row := d.queryRow(ctx, d.RevSQL) err := row.Scan(&id) if err == sql.ErrNoRows { return 0, nil @@ -316,7 +320,7 @@ func (d *Generic) CurrentRevision(ctx context.Context) (int64, error) { func (d *Generic) After(ctx context.Context, prefix string, rev, limit int64) (*sql.Rows, error) { sql := d.AfterSQL if limit > 0 { - sql = fmt.Sprintf("%s LIMIT %d", sql, limit) + sql = d.applyLimit(sql, limit) } return d.query(ctx, sql, prefix, rev) } @@ -360,3 +364,14 @@ func (d *Generic) Insert(ctx context.Context, key string, create, delete bool, c err = row.Scan(&id) return id, err } + +func (d Generic) applyLimit(sql string, limit int64) string { + if d.DriverName != "sqlserver" { + sql = fmt.Sprintf("%s LIMIT %d", sql, limit) + } else { + limitRewrite := fmt.Sprintf("SELECT TOP %d ", limit) + strings.Replace(sql, "SELECT", limitRewrite, 1) + } + + return sql +} diff --git a/pkg/drivers/sqlserver/sqlserver.go b/pkg/drivers/sqlserver/sqlserver.go new file mode 100644 index 00000000..1d1859f0 --- /dev/null +++ b/pkg/drivers/sqlserver/sqlserver.go @@ -0,0 +1,277 @@ +package sqlserver + +import ( + "context" + "database/sql" + "fmt" + "net/url" + "time" + + "github.com/sirupsen/logrus" + + mssql "github.com/denisenkom/go-mssqldb" + "github.com/rancher/kine/pkg/drivers/generic" + "github.com/rancher/kine/pkg/logstructured" + "github.com/rancher/kine/pkg/logstructured/sqllog" + "github.com/rancher/kine/pkg/server" + "github.com/rancher/kine/pkg/tls" +) + +const ( + defaultDSN = "sqlserver://sa:" +) + +var ( + schema = []string{ + `if not exists (SELECT * FROM INFORMATION_SCHEMA.TABLES + WHERE TABLE_NAME = N'kine') + begin + create table kine ( + id int primary key identity (1, 1), + name varchar(630), + created int, + deleted int, + create_revision int, + prev_revision int, + lease int, + value varbinary(max), + old_value varbinary(max) ) + end + `, + `if not exists ( select * from sys.indexes + where name = 'kine_name_index' and + object_id = OBJECT_ID('kine')) + begin + create nonclustered index kine_name_index on kine (name) + end + `, + `if not exists ( + select * + from sys.indexes + where name = 'kine_name_prev_revision_uindex' and + object_id = OBJECT_ID('kine') + ) begin + create unique index kine_name_prev_revision_uindex on kine (name, prev_revision) + end + `, + } + createDB = "create database " + columns = "kv.id as theid, kv.name, kv.created, kv.deleted, kv.create_revision, kv.prev_revision, kv.lease, kv.value, kv.old_value" + revSQL = ` + SELECT TOP 1 rkv.id + FROM kine rkv + ORDER BY rkv.id + DESC` + compactRevSQL = ` + SELECT TOP 1 crkv.prev_revision + FROM kine crkv + WHERE crkv.name = 'compact_rev_key' + ORDER BY crkv.id DESC` + + idOfKey = ` + AND mkv.id <= ? AND mkv.id > ( + SELECT TOP 1 ikv.id + FROM kine ikv + WHERE + ikv.name = ? AND + ikv.id <= ? + ORDER BY ikv.id DESC )` + + listSQL = fmt.Sprintf(`SELECT TOP 100 PERCENT (%s)[a], (%s)[b], %s + FROM kine kv + JOIN ( + SELECT MAX(mkv.id) as id + FROM kine mkv + WHERE + mkv.name LIKE ? + %%s + GROUP BY mkv.name) maxkv + ON maxkv.id = kv.id + WHERE + ( kv.deleted = 0 OR 'true' = ? ) + ORDER BY kv.id ASC + `, revSQL, compactRevSQL, columns) +) + +func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config) (server.Backend, error) { + parsedDSN, err := prepareDSN(dataSourceName, tlsInfo) + if err != nil { + return nil, err + } + + if err := createDBIfNotExist(parsedDSN); err != nil { + return nil, err + } + dialect, err := setupGenericDriver(ctx, "sqlserver", parsedDSN, "@p", true) + if err != nil { + return nil, err + } + dialect.LastInsertID = false + dialect.TranslateErr = func(err error) error { + // Need to verify msqql error code for unique constraint violation + if err, ok := err.(mssql.Error); ok && err.Number == 2627 { + return server.ErrKeyExists + } + return err + } + + if err := setup(dialect.DB); err != nil { + return nil, err + } + + dialect.Migrate(context.Background()) + return logstructured.New(sqllog.New(dialect)), nil + +} + +func setupGenericDriver(ctx context.Context, driverName, dataSourceName string, paramCharacter string, numbered bool) (*generic.Generic, error) { + var ( + db *sql.DB + err error + ) + + for i := 0; i < 300; i++ { + db, err = generic.OpenAndTest(driverName, dataSourceName) + if err == nil { + break + } + + logrus.Errorf("failed to ping connection: %v", err) + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-time.After(time.Second): + } + } + return &generic.Generic{ + DB: db, + DriverName: driverName, + GetRevisionSQL: generic.QueryBuilder(fmt.Sprintf(` + SELECT + 0, 0, %s + FROM kine kv + WHERE kv.id = ?`, columns), paramCharacter, numbered), + + GetCurrentSQL: generic.QueryBuilder(fmt.Sprintf(listSQL, ""), paramCharacter, numbered), + ListRevisionStartSQL: generic.QueryBuilder(fmt.Sprintf(listSQL, "AND mkv.id <= ?"), paramCharacter, numbered), + GetRevisionAfterSQL: generic.QueryBuilder(fmt.Sprintf(listSQL, idOfKey), paramCharacter, numbered), + + CountSQL: generic.QueryBuilder(fmt.Sprintf(` + SELECT (%s), COUNT(c.theid) + FROM ( + %s + ) c`, revSQL, fmt.Sprintf(listSQL, "")), paramCharacter, numbered), + + AfterSQL: generic.QueryBuilder(fmt.Sprintf(` + SELECT (%s), (%s), %s + FROM kine kv + WHERE + kv.name LIKE ? AND + kv.id > ? + ORDER BY kv.id ASC`, revSQL, compactRevSQL, columns), paramCharacter, numbered), + + DeleteSQL: generic.QueryBuilder(` + DELETE FROM kine + WHERE id = ?`, paramCharacter, numbered), + + UpdateCompactSQL: generic.QueryBuilder(` + UPDATE kine + SET prev_revision = ? + WHERE name = 'compact_rev_key'`, paramCharacter, numbered), + + InsertLastInsertIDSQL: generic.QueryBuilder(`INSERT INTO kine(name, created, deleted, create_revision, prev_revision, lease, value, old_value) + values(?, ?, ?, ?, ?, ?, ?, ?);select SCOPE_IDENTITY()`, paramCharacter, numbered), + + InsertSQL: generic.QueryBuilder(`INSERT INTO kine(name, created, deleted, create_revision, prev_revision, lease, value, old_value) + values(?, ?, ?, ?, ?, ?, ?, ?); select SCOPE_IDENTITY()`, paramCharacter, numbered), + + FillSQL: generic.QueryBuilder(`INSERT INTO kine(id, name, created, deleted, create_revision, prev_revision, lease, value, old_value) + values(?, ?, ?, ?, ?, ?, ?, ?, ?);select SCOPE_IDENTITY()`, paramCharacter, numbered), + RevSQL: revSQL, + }, err + +} + +func setup(db *sql.DB) error { + for _, stmt := range schema { + _, err := db.Exec(stmt) + if err != nil { + return err + } + } + + return nil +} + +func generateConnector(dataSourceName string) (*mssql.Connector, error) { + conn, err := mssql.NewConnector(dataSourceName) + if err != nil { + return nil, err + } + + return conn, nil +} + +func createDBIfNotExist(dataSourceName string) error { + u, err := url.Parse(dataSourceName) + if err != nil { + return err + } + + dbName := u.Query().Get("database") + db, err := sql.Open("sqlserver", dataSourceName) + if err != nil { + return err + } + defer db.Close() + + err = db.Ping() + + if _, ok := err.(mssql.Error); !ok { + return err + } + + if err := err.(mssql.Error); err.Number != 1801 { // 1801 = database already exists + db, err := sql.Open("sqlserver", u.String()) + if err != nil { + return err + } + defer db.Close() + _, err = db.Exec(createDB + dbName + ":") + if err != nil { + return err + } + } + return nil +} + +func prepareDSN(dataSourceName string, tlsInfo tls.Config) (string, error) { + if len(dataSourceName) == 0 { + return "", fmt.Errorf("invalid dsn") + } else { + dataSourceName = "sqlserver://" + dataSourceName + } + + u, err := url.Parse(dataSourceName) + if err != nil { + return "", err + } + + queryMap := u.Query() + params := url.Values{} + + if _, ok := queryMap["certificate"]; tlsInfo.CertFile != "" && !ok { + params.Add("certificate", tlsInfo.CAFile) + } + + if _, ok := queryMap["database"]; !ok { + params.Add("database", "kubernetes") + } + + for k, v := range queryMap { + params.Add(k, v[0]) + } + + u.RawQuery = params.Encode() + return u.String(), nil +} diff --git a/pkg/endpoint/endpoint.go b/pkg/endpoint/endpoint.go index e8fc74f7..b188d7de 100644 --- a/pkg/endpoint/endpoint.go +++ b/pkg/endpoint/endpoint.go @@ -12,6 +12,7 @@ import ( "github.com/rancher/kine/pkg/drivers/mysql" "github.com/rancher/kine/pkg/drivers/pgsql" "github.com/rancher/kine/pkg/drivers/sqlite" + "github.com/rancher/kine/pkg/drivers/sqlserver" "github.com/rancher/kine/pkg/server" "github.com/rancher/kine/pkg/tls" "github.com/sirupsen/logrus" @@ -25,6 +26,7 @@ const ( ETCDBackend = "etcd3" MySQLBackend = "mysql" PostgresBackend = "postgres" + SQLServer = "sqlserver" ) type Config struct { @@ -131,6 +133,8 @@ func getKineStorageBackend(ctx context.Context, driver, dsn string, cfg Config) backend, err = pgsql.New(ctx, dsn, cfg.Config) case MySQLBackend: backend, err = mysql.New(ctx, dsn, cfg.Config) + case SQLServer: + backend, err = sqlserver.New(ctx, dsn, cfg.Config) default: return false, nil, fmt.Errorf("storage backend is not defined") } diff --git a/samples/minimal.md b/samples/minimal.md deleted file mode 100644 index 73975202..00000000 --- a/samples/minimal.md +++ /dev/null @@ -1,113 +0,0 @@ -## Minimal example of using kine -The following example uses kine with a mysql database for persistence. - -We can run mysql on a host: - -``` -docker run --name kine-mysql -p 3306:3306 -e MYSQL_DATABASE=kine -e MYSQL_ROOT_PASSWORD=$PASSWORD -d mysql:latest -``` - -Run kine on the same host as mysql database: -``` -kine --endpoint "mysql://root:$PASSWORD@tcp(localhost:3306)/kine" -``` - - -Use the following RKE cluster.yml sample to boot up the cluster. - -RKE supports using an external etcd endpoint. - -``` -nodes: - - address: 1.1.1.1 - user: ubuntu - role: - - controlplane - - worker - - address: 2.2.2.2 - user: ubuntu - role: - - controlplane - - worker -cluster_name: "kine-demo" -network: - plugin: canal -ignore_docker_version: true -services: - etcd: - path: / - external_urls: - - http://3.3.3.3:2379 - ca_cert: |- - -----BEGIN CERTIFICATE----- - MIIDVTCCAj2gAwIBAgIUZV9P6JhHOgjT5cRHdsX0rUp6q2AwDQYJKoZIhvcNAQEL - BQAwOjELMAkGA1UEBhMCQVUxDDAKBgNVBAgMA1ZJQzEOMAwGA1UECgwFcmFuY2gx - DTALBgNVBAMMBG15Q0EwHhcNMjAwMjAyMjI1NzAzWhcNMjUwMTMxMjI1NzAzWjA6 - MQswCQYDVQQGEwJBVTEMMAoGA1UECAwDVklDMQ4wDAYDVQQKDAVyYW5jaDENMAsG - A1UEAwwEbXlDQTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAOWU/5KI - 3Es7fZeP7zpgl1t/EeIYtZHNQKDozTjuaNgdvaDYp5Ly1ARlrBDBJTJcUkVjUpbN - 33WeAsOew3YKWjNNPw1nC9x8r1BgdOwgJhPOFQ7paHHYVJMLPJ3B1qz31ZSkqjyB - vCT7/qMijY8YJkM70FYIe/MokM9iLfFKP1RLlB/kRHgVOF3LT/uvknPX1Rg0tOmI - 8SrjChAkMwSC0LruBJnzAuAs1z03qLWl0R3uEGnRF3o9P6QzJnEsnXMs692IWwt2 - 4ilxuC5yq9Km/gCuJMzhGsqkRpdRSutXgXnMWUSCk2LQQJK9qLTpEbxWY50LAF+Q - Yctr7kmVC1H69skCAwEAAaNTMFEwHQYDVR0OBBYEFFf6jIEgiCAaA3zNllILqVg+ - fACnMB8GA1UdIwQYMBaAFFf6jIEgiCAaA3zNllILqVg+fACnMA8GA1UdEwEB/wQF - MAMBAf8wDQYJKoZIhvcNAQELBQADggEBAJ3508ltZz0CkoEvMqq4Ux5dn+rE+J/f - eq5kjjE/QB+HZnU1089OLwLilPPF7yMOGPhLt7sZw1/Ymqm7l6yrl2grL90brz2i - DvDHw3N9fxRxnvjUeg61JOk6vOk/It6odJ2Lbht56L1PsgENEe2ih2kYy/i3NC6z - yFcNtg3xR+iSO/2Gp1/UICDWVx7n4VLrbC34AKwuHF+WZDOLymk0MwtL6CV02U0W - KOShfFMtkve95ZZtEojypGr+EhIHePLmaleTwxjgG15WXQLxKcPzDbg9I4je3FII - U/nen9pF8UJ3+H6Mxotw4vOcIuafZrFLM7pXsyv00UUSzbvWiH9+X4I= - -----END CERTIFICATE----- - cert: |- - -----BEGIN CERTIFICATE----- - MIIDXTCCAkWgAwIBAgIUMakvO9NvNAeXHOaxOZdg+7e4evMwDQYJKoZIhvcNAQEL - BQAwOjELMAkGA1UEBhMCQVUxDDAKBgNVBAgMA1ZJQzEOMAwGA1UECgwFcmFuY2gx - DTALBgNVBAMMBG15Q0EwHhcNMjAwMjAyMjMzOTM1WhcNMjUwMTMxMjMzOTM1WjA/ - MQswCQYDVQQGEwJBVTEMMAoGA1UECAwDVklDMQ4wDAYDVQQKDAVyYW5jaDESMBAG - A1UEAwwJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA - oPOFYtnP4hXnDgGl0zN+Q8pwcXr5+fyS4/YVIciAgwG8z8gVSPgf9HakiPhJ2Z7Q - eVbH5xikS0kzzPKS+fdu+nAJtbW6vTvP2XizkJx1xGcuC0jhmNYT3A8vCskzSuno - YplVBGUvNyBBPaoW1PiUWzqLtO39wnleizg8ilcnoTje3owO/OQpxSCFVAezxJEv - 8gAQu0+wjVYPs2PAbmKKrXKo4aqGy6hVL/aIhEOXni4cMmxOl/UDCmV1SsmHKICk - J6/v1kRcRyUdC2Gj7RxU+i0ipCUG9ZqQbnvNEhgQqWVzYdf+jIHu+wEVJEGNejU4 - CfKaC/za8jM98g/ui6bF3wIDAQABo1YwVDAfBgNVHSMEGDAWgBRX+oyBIIggGgN8 - zZZSC6lYPnwApzAJBgNVHRMEAjAAMAsGA1UdDwQEAwIE8DAZBgNVHREEEjAQggNr - M3OCA2s4c4cEfwAAATANBgkqhkiG9w0BAQsFAAOCAQEAmQUrvASk70Kf1T23Dhng - NFuq8mNrsm/IOPgxrVYwzG1nzhop9wa3060YHVQ39B2QSylVnkYwyfDWY0064RXx - OrMOig6hsos+KMZKNhwHnVseZj33quz+FJZWIfByeAFenJ8z4j2Zx8QbxHznytOc - lO+fKw+lbRId3ZyW4E/g4kChK6VXVCZ6+9VXEj/pmp77p8q8NYk5setS4HkQpJxN - Y7x/tDvNyo8efz1VP3yjeNZ4WGANkYUx8O+rOzq31Qwopf6OBLlllumTXeDGU/n/ - fi2eFpSTPmAZZK0nlwu2T1YHbA4hxrX7HvVeqOVxypRmVZ1nLz2poUMUtphP6a+4 - ug== - -----END CERTIFICATE----- - key: |- - -----BEGIN RSA PRIVATE KEY----- - MIIEpAIBAAKCAQEAoPOFYtnP4hXnDgGl0zN+Q8pwcXr5+fyS4/YVIciAgwG8z8gV - SPgf9HakiPhJ2Z7QeVbH5xikS0kzzPKS+fdu+nAJtbW6vTvP2XizkJx1xGcuC0jh - mNYT3A8vCskzSunoYplVBGUvNyBBPaoW1PiUWzqLtO39wnleizg8ilcnoTje3owO - /OQpxSCFVAezxJEv8gAQu0+wjVYPs2PAbmKKrXKo4aqGy6hVL/aIhEOXni4cMmxO - l/UDCmV1SsmHKICkJ6/v1kRcRyUdC2Gj7RxU+i0ipCUG9ZqQbnvNEhgQqWVzYdf+ - jIHu+wEVJEGNejU4CfKaC/za8jM98g/ui6bF3wIDAQABAoIBAGkfxU5UP2BGt/xA - /UMeDelPLvQfw2gRHQwBrbm8EJwApYb9A1H+pjhwyXSg3vNhtH6cPMLnKF/39vp0 - saTMhNLUHLNveAGjMFW1bWsVliHq1nsOZjMCGESSMkKUOYlDj8HerlXJlPYnfhU9 - o94EYjnX2moZS7YaubKqz3f4Bu1Yg4kHQWRRoD07pU0CXRjBZpg1ALhW3aAntuZo - AL2TXa6dharA6wVi4tC64AbPvVoXQ9Pc96sEHs7yNxjwtfPnk2dZtN4XnKF6lcA7 - VnDVnVkcXEEsZ+YQYjfooOiIPHpG0Q0ymqaAydNoQ0xwS9V/0oFvWsPQ++ZwgOek - mSLQctECgYEA0SLO9+tPaBF5t2fbX2TjlIU1/dnfLc7w2H7VHTV3QqYq3zYWRQZF - bTEnBbWM81CHZPYTEmY+JToaUavHs2il1WfpfZQmYyhOUiXg9Jk8w3aYo5b0aVH2 - E/doNN6txuQ3c3xoJF3HzsOMLuinz3cYrnvGW0nQkUj0F5di5aMSRncCgYEAxQSS - XjwIBgl8aR8WjbWJ7kI9Xk5Xx/aTP4FzUTmHSLXY4zxWFv7zRV3vHSzEEsKdxnYr - mxtIRim1n6btNm6KCBqGOCDBHq2bWxuoFMODVZiMwVTYeqZcrOHlqh+vLcKNbJsO - 4jImvNwN8F2h9T0DBiXamL9pVWFt2bAZlY1TDdkCgYEAq0K6AkPsTuigqBSgjMnt - pB5CTJMyNC0XBfM3SigSdb3ltcxxCC1OhVCPCBnYRxhXB9KLY7HeilW+X8swSjcU - NmJVzsSXevPyz0q9oRArtlVUQgLIO8cmoMslxsXjwM/6qNPj5IP3r9Zq4a8cXMTG - rXwmv3L/HTqEyRzrm+mieZkCgYEAxC6gPTvT1YcenmK5h5Ss61aEW2LxoAmVaJhT - px786ldByEytgSqQPZOi5c9M00195ECJfWL2Xf9sfrSu4xPBWP5ohn1/MDg5ScjJ - XxusrNBB4MXG5qLAB9rNYdE5E/z17J6efHjqAAezzZS/ED+XwkhxWsbHcaCZzTnA - 0B2xBUkCgYB0+bFGxDetRYSNEPBUiMxCUvzzwAR0bqXpFronTu8pTsQKGlnddJh3 - tNVseLswaQ9A8j1foxvTGv5/GYrPq7rpcKjPDkJs4CgLFTIBM0/t2v8wNwfvzJa5 - 017peVJ5tbqdqphS8lrfdM0M509ao7ehqTMnu8STYWoLib54ZZG8Gw== - -----END RSA PRIVATE KEY----- -``` - From 31ab3f829fbf9d28c2ed17647237c83bb737d1a7 Mon Sep 17 00:00:00 2001 From: Gaurav Mehta Date: Sat, 11 Jul 2020 09:41:00 +1000 Subject: [PATCH 3/3] Changes to generic driver to allow custom limit function to be passed as part of the driver. Also dropped DriverName from Generic driver type as its not needed anymore for switching between driver types --- go.mod | 5 ++++- pkg/drivers/generic/generic.go | 28 ++++++++++------------------ pkg/drivers/sqlserver/sqlserver.go | 9 +++++++-- 3 files changed, 21 insertions(+), 21 deletions(-) diff --git a/go.mod b/go.mod index 7cd4f805..9d473fc4 100644 --- a/go.mod +++ b/go.mod @@ -3,12 +3,15 @@ module github.com/rancher/kine go 1.12 require ( - github.com/Rican7/retry v0.1.0 // indirect + github.com/Rican7/retry v0.1.0 github.com/canonical/go-dqlite v1.5.1 // indirect github.com/denisenkom/go-mssqldb v0.0.0-20200620013148-b91950f658ec // indirect github.com/go-sql-driver/mysql v1.4.1 // indirect github.com/lib/pq v1.1.1 // indirect github.com/mattn/go-sqlite3 v1.10.0 // indirect github.com/rancher/wrangler v0.4.0 // indirect + github.com/sirupsen/logrus v1.4.2 github.com/urfave/cli v1.21.0 // indirect + go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738 + google.golang.org/grpc v1.23.1 ) diff --git a/pkg/drivers/generic/generic.go b/pkg/drivers/generic/generic.go index 08e4ce2e..c2ced418 100644 --- a/pkg/drivers/generic/generic.go +++ b/pkg/drivers/generic/generic.go @@ -66,7 +66,6 @@ type TranslateErr func(error) error type Generic struct { sync.Mutex - DriverName string LockWrites bool LastInsertID bool DB *sql.DB @@ -86,6 +85,7 @@ type Generic struct { CompactRevSQL string Retry ErrRetry TranslateErr TranslateErr + ApplyLimit func(string, int64) string } func QueryBuilder(sql, param string, numbered bool) string { @@ -167,8 +167,7 @@ func Open(ctx context.Context, driverName, dataSourceName string, paramCharacter } return &Generic{ - DB: db, - DriverName: driverName, + DB: db, GetRevisionSQL: QueryBuilder(fmt.Sprintf(` SELECT 0, 0, %s @@ -212,6 +211,10 @@ func Open(ctx context.Context, driverName, dataSourceName string, paramCharacter values(?, ?, ?, ?, ?, ?, ?, ?, ?)`, paramCharacter, numbered), RevSQL: revSQL, CompactRevSQL: compactRevSQL, + ApplyLimit: func(sql string, limit int64) string { + sql = fmt.Sprintf("%s LIMIT %d", sql, limit) + return sql + }, }, err } @@ -275,7 +278,7 @@ func (d *Generic) DeleteRevision(ctx context.Context, revision int64) error { func (d *Generic) ListCurrent(ctx context.Context, prefix string, limit int64, includeDeleted bool) (*sql.Rows, error) { sql := d.GetCurrentSQL if limit > 0 { - sql = d.applyLimit(sql, limit) + sql = d.ApplyLimit(sql, limit) } return d.query(ctx, sql, prefix, includeDeleted) } @@ -284,14 +287,14 @@ func (d *Generic) List(ctx context.Context, prefix, startKey string, limit, revi if startKey == "" { sql := d.ListRevisionStartSQL if limit > 0 { - sql = d.applyLimit(sql, limit) + sql = d.ApplyLimit(sql, limit) } return d.query(ctx, sql, prefix, revision, includeDeleted) } sql := d.GetRevisionAfterSQL if limit > 0 { - sql = d.applyLimit(sql, limit) + sql = d.ApplyLimit(sql, limit) } return d.query(ctx, sql, prefix, revision, startKey, revision, includeDeleted) } @@ -320,7 +323,7 @@ func (d *Generic) CurrentRevision(ctx context.Context) (int64, error) { func (d *Generic) After(ctx context.Context, prefix string, rev, limit int64) (*sql.Rows, error) { sql := d.AfterSQL if limit > 0 { - sql = d.applyLimit(sql, limit) + sql = d.ApplyLimit(sql, limit) } return d.query(ctx, sql, prefix, rev) } @@ -364,14 +367,3 @@ func (d *Generic) Insert(ctx context.Context, key string, create, delete bool, c err = row.Scan(&id) return id, err } - -func (d Generic) applyLimit(sql string, limit int64) string { - if d.DriverName != "sqlserver" { - sql = fmt.Sprintf("%s LIMIT %d", sql, limit) - } else { - limitRewrite := fmt.Sprintf("SELECT TOP %d ", limit) - strings.Replace(sql, "SELECT", limitRewrite, 1) - } - - return sql -} diff --git a/pkg/drivers/sqlserver/sqlserver.go b/pkg/drivers/sqlserver/sqlserver.go index 1d1859f0..2777e13f 100644 --- a/pkg/drivers/sqlserver/sqlserver.go +++ b/pkg/drivers/sqlserver/sqlserver.go @@ -5,6 +5,7 @@ import ( "database/sql" "fmt" "net/url" + "strings" "time" "github.com/sirupsen/logrus" @@ -144,8 +145,7 @@ func setupGenericDriver(ctx context.Context, driverName, dataSourceName string, } } return &generic.Generic{ - DB: db, - DriverName: driverName, + DB: db, GetRevisionSQL: generic.QueryBuilder(fmt.Sprintf(` SELECT 0, 0, %s @@ -188,6 +188,11 @@ func setupGenericDriver(ctx context.Context, driverName, dataSourceName string, FillSQL: generic.QueryBuilder(`INSERT INTO kine(id, name, created, deleted, create_revision, prev_revision, lease, value, old_value) values(?, ?, ?, ?, ?, ?, ?, ?, ?);select SCOPE_IDENTITY()`, paramCharacter, numbered), RevSQL: revSQL, + ApplyLimit: func(sql string, limit int64) string { + limitRewrite := fmt.Sprintf("SELECT TOP %d ", limit) + sql = strings.Replace(sql, "SELECT TOP 100 PERCENT", limitRewrite, 1) + return sql + }, }, err }