diff --git a/Makefile b/Makefile index cc6397c7b6..dcb06eb860 100644 --- a/Makefile +++ b/Makefile @@ -33,6 +33,7 @@ install: $(APPS) deps: @go mod tidy + @cd internal/impl/scylla && go mod tidy SOURCE_FILES = $(shell find internal public cmd -type f) TEMPLATE_FILES = $(shell find internal/impl -type f -name "template_*.yaml") @@ -79,6 +80,7 @@ test-race: $(APPS) test-integration: $(warning WARNING! Running the integration tests in their entirety consumes a huge amount of computing resources and is likely to time out on most machines. It's recommended that you instead run the integration suite for connectors you are working selectively with `go test -run 'TestIntegration/kafka' ./...` and so on.) @go test $(GO_FLAGS) -ldflags "$(LD_FLAGS)" -run "^Test.*Integration.*$$" -timeout 5m ./... + @cd internal/impl/scylla && go test $(GO_FLAGS) -ldflags "$(LD_FLAGS)" -run "^Test.*Integration.*$$" -timeout 5m ./... clean: rm -rf $(PATHINSTBIN) diff --git a/go.mod b/go.mod index 16c202141b..3094e6cf9a 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,9 @@ module github.com/redpanda-data/connect/v4 -replace github.com/99designs/keyring => github.com/Jeffail/keyring v1.2.3 +replace ( + github.com/99designs/keyring => github.com/Jeffail/keyring v1.2.3 + github.com/redpanda-data/connect/v4/internal/impl/scylla => ./internal/impl/scylla +) require ( cloud.google.com/go/bigquery v1.61.0 @@ -95,6 +98,7 @@ require ( github.com/redis/go-redis/v9 v9.5.3 github.com/redpanda-data/benthos/v4 v4.33.0 github.com/redpanda-data/connect/public/bundle/free/v4 v4.29.0 + github.com/redpanda-data/connect/v4/internal/impl/scylla v0.0.0-00010101000000-000000000000 github.com/rs/xid v1.2.1 github.com/sijms/go-ora/v2 v2.8.19 github.com/smira/go-statsd v1.3.3 diff --git a/internal/impl/scylla/go.mod b/internal/impl/scylla/go.mod new file mode 100644 index 0000000000..c97d1db92f --- /dev/null +++ b/internal/impl/scylla/go.mod @@ -0,0 +1,85 @@ +module github.com/redpanda-data/connect/v4/internal/impl/scylla + +go 1.22.0 + +toolchain go1.22.4 + +replace ( + github.com/gocql/gocql => github.com/scylladb/gocql v1.14.2 + github.com/redpanda-data/connect/v4 => ../../../../.. +) + +require ( + github.com/gocql/gocql v0.0.0-00010101000000-000000000000 + github.com/ory/dockertest/v3 v3.10.0 + github.com/redpanda-data/benthos/v4 v4.33.0 + github.com/stretchr/testify v1.9.0 +) + +require ( + cuelang.org/go v0.9.1 // indirect + github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 // indirect + github.com/Jeffail/gabs/v2 v2.7.0 // indirect + github.com/Jeffail/shutdown v1.0.0 // indirect + github.com/Microsoft/go-winio v0.6.0 // indirect + github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect + github.com/OneOfOne/xxhash v1.2.8 // indirect + github.com/cenkalti/backoff/v4 v4.3.0 // indirect + github.com/cockroachdb/apd/v3 v3.2.1 // indirect + github.com/containerd/continuity v0.3.0 // indirect + github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/docker/cli v20.10.17+incompatible // indirect + github.com/docker/docker v20.10.7+incompatible // indirect + github.com/docker/go-connections v0.4.0 // indirect + github.com/docker/go-units v0.4.0 // indirect + github.com/fatih/color v1.17.0 // indirect + github.com/felixge/httpsnoop v1.0.4 // indirect + github.com/fsnotify/fsnotify v1.7.0 // indirect + github.com/go-logr/logr v1.4.1 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/gofrs/uuid v4.4.0+incompatible // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang-jwt/jwt/v5 v5.2.1 // indirect + github.com/golang/snappy v0.0.4 // indirect + github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect + github.com/gorilla/handlers v1.5.2 // indirect + github.com/gorilla/mux v1.8.1 // indirect + github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect + github.com/imdario/mergo v0.3.12 // indirect + github.com/matoous/go-nanoid/v2 v2.1.0 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/mitchellh/mapstructure v1.4.1 // indirect + github.com/moby/term v0.0.0-20201216013528-df9cb8a40635 // indirect + github.com/nsf/jsondiff v0.0.0-20210926074059-1e845ec5d249 // indirect + github.com/opencontainers/go-digest v1.0.0 // indirect + github.com/opencontainers/image-spec v1.1.0 // indirect + github.com/opencontainers/runc v1.1.5 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect + github.com/russross/blackfriday/v2 v2.1.0 // indirect + github.com/segmentio/ksuid v1.0.4 // indirect + github.com/sirupsen/logrus v1.9.3 // indirect + github.com/tilinna/z85 v1.0.0 // indirect + github.com/urfave/cli/v2 v2.27.2 // indirect + github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect + github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect + github.com/xeipuuv/gojsonschema v1.2.0 // indirect + github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913 // indirect + github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a // indirect + go.opentelemetry.io/otel v1.27.0 // indirect + go.opentelemetry.io/otel/metric v1.27.0 // indirect + go.opentelemetry.io/otel/trace v1.27.0 // indirect + golang.org/x/crypto v0.25.0 // indirect + golang.org/x/mod v0.17.0 // indirect + golang.org/x/sync v0.7.0 // indirect + golang.org/x/sys v0.22.0 // indirect + golang.org/x/text v0.16.0 // indirect + golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect + gopkg.in/inf.v0 v0.9.1 // indirect + gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect + gopkg.in/yaml.v2 v2.4.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/internal/impl/scylla/go.sum b/internal/impl/scylla/go.sum new file mode 100644 index 0000000000..a2ece5cf58 --- /dev/null +++ b/internal/impl/scylla/go.sum @@ -0,0 +1,314 @@ +cuelabs.dev/go/oci/ociregistry v0.0.0-20240404174027-a39bec0462d2 h1:BnG6pr9TTr6CYlrJznYUDj6V7xldD1W+1iXPum0wT/w= +cuelabs.dev/go/oci/ociregistry v0.0.0-20240404174027-a39bec0462d2/go.mod h1:pK23AUVXuNzzTpfMCA06sxZGeVQ/75FdVtW249de9Uo= +cuelang.org/go v0.9.1 h1:SkNkBFMcGpDjjYbbEthAogVP86VA48vRt/KvZ2Xb5OU= +cuelang.org/go v0.9.1/go.mod h1:qpAYsLOf7gTM1YdEg6cxh553uZ4q9ZDWlPbtZr9q1Wk= +github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 h1:w+iIsaOQNcT7OZ575w+acHgRric5iCyQh+xv+KJ4HB8= +github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/Jeffail/gabs/v2 v2.7.0 h1:Y2edYaTcE8ZpRsR2AtmPu5xQdFDIthFG0jYhu5PY8kg= +github.com/Jeffail/gabs/v2 v2.7.0/go.mod h1:dp5ocw1FvBBQYssgHsG7I1WYsiLRtkUaB1FEtSwvNUw= +github.com/Jeffail/grok v1.1.0 h1:kiHmZ+0J5w/XUihRgU3DY9WIxKrNQCDjnfAb6bMLFaE= +github.com/Jeffail/grok v1.1.0/go.mod h1:dm0hLksrDwOMa6To7ORXCuLbuNtASIZTfYheavLpsuE= +github.com/Jeffail/shutdown v1.0.0 h1:afYjnY4pksqP/012m3NGJVccDI+WATdSzIMVHZKU8/Y= +github.com/Jeffail/shutdown v1.0.0/go.mod h1:5dT4Y1oe60SJELCkmAB1pr9uQyHBhh6cwDLQTfmuO5U= +github.com/Microsoft/go-winio v0.6.0 h1:slsWYD/zyx7lCXoZVlvQrj0hPTM1HI4+v1sIda2yDvg= +github.com/Microsoft/go-winio v0.6.0/go.mod h1:cTAf44im0RAYeL23bpB+fzCyDH2MJiz2BO69KH/soAE= +github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw= +github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk= +github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8= +github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q= +github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYEDvkta6I8/rnYM5gSdSV2tJ6XbZuEtY= +github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k= +github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= +github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/checkpoint-restore/go-criu/v5 v5.3.0/go.mod h1:E/eQpaFtUKGOOSEBZgmKAcn+zUUwWxqcaKZlF54wK8E= +github.com/cilium/ebpf v0.7.0/go.mod h1:/oI2+1shJiTGAMgl6/RgJr36Eo1jzrRcAWbcXO2usCA= +github.com/cockroachdb/apd/v3 v3.2.1 h1:U+8j7t0axsIgvQUqthuNm82HIrYXodOV2iWLWtEaIwg= +github.com/cockroachdb/apd/v3 v3.2.1/go.mod h1:klXJcjp+FffLTHlhIG69tezTDvdP065naDsHzKhYSqc= +github.com/containerd/console v1.0.3/go.mod h1:7LqA/THxQ86k76b8c/EMSiaJ3h1eZkMkXar0TQ1gf3U= +github.com/containerd/continuity v0.3.0 h1:nisirsYROK15TAMVukJOUyGJjz4BNQJBVsNvAXZJ/eg= +github.com/containerd/continuity v0.3.0/go.mod h1:wJEAIwKOm/pBZuBd0JmeTvnLquTB1Ag8espWhkykbPM= +github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= +github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= +github.com/cpuguy83/go-md2man/v2 v2.0.4 h1:wfIWP927BUkWJb2NmU/kNDYIBTh/ziUX91+lVfRxZq4= +github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/creack/pty v1.1.11 h1:07n33Z8lZxZ2qwegKbObQohDhXDQxiMMz1NOUGYlesw= +github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/cyphar/filepath-securejoin v0.2.3/go.mod h1:aPGpWjXOXUn2NCNjFvBE6aRxGGx79pTxQpKOJNYHHl4= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/docker/cli v20.10.17+incompatible h1:eO2KS7ZFeov5UJeaDmIs1NFEDRf32PaqRpvoEkKBy5M= +github.com/docker/cli v20.10.17+incompatible/go.mod h1:JLrzqnKDaYBop7H2jaqPtU4hHvMKP+vjCwu2uszcLI8= +github.com/docker/docker v20.10.7+incompatible h1:Z6O9Nhsjv+ayUEeI1IojKbYcsGdgYSNqxe1s2MYzUhQ= +github.com/docker/docker v20.10.7+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= +github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ= +github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec= +github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw= +github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= +github.com/emicklei/proto v1.10.0 h1:pDGyFRVV5RvV+nkBK9iy3q67FBy9Xa7vwrOTE+g5aGw= +github.com/emicklei/proto v1.10.0/go.mod h1:rn1FgRS/FANiZdD2djyH7TMA9jdRDcYQ9IEN9yvjX0A= +github.com/fatih/color v1.17.0 h1:GlRw1BRJxkpqUCBKzKOw098ed57fEsKeNjpTe3cSjK4= +github.com/fatih/color v1.17.0/go.mod h1:YZ7TlrGPkiz6ku9fK3TLD/pl3CpsiFyu8N92HLgmosI= +github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= +github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k= +github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= +github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= +github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-quicktest/qt v1.101.0 h1:O1K29Txy5P2OK0dGo59b7b0LR6wKfIhttaAhHUyn7eI= +github.com/go-quicktest/qt v1.101.0/go.mod h1:14Bz/f7NwaXPtdYEgzsx46kqSxVwTbzVZsDC26tQJow= +github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE= +github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= +github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/godbus/dbus/v5 v5.0.6/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/gofrs/uuid v4.4.0+incompatible h1:3qXRTX8/NbyulANqlc0lchS1gqAVxRgsuW1YrTJupqA= +github.com/gofrs/uuid v4.4.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk= +github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4= +github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/handlers v1.5.2 h1:cLTUSsNkgcwhgRqvCNmdbRWG0A3N4F+M2nWKdScwyEE= +github.com/gorilla/handlers v1.5.2/go.mod h1:dX+xVpaxdSw+q0Qek8SSsl3dfMk3jNddUkMzo0GtH0w= +github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= +github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/govalues/decimal v0.1.29 h1:GKC5g9y9oWxKIy51czdHTShOABwHm/shVuOVPwG415M= +github.com/govalues/decimal v0.1.29/go.mod h1:LUlHHucpCmA4rJfNrDvMgrWibDpYnDNWqJuNU1/gxW8= +github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8= +github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4= +github.com/hashicorp/golang-lru/arc/v2 v2.0.7 h1:QxkVTxwColcduO+LP7eJO56r2hFiG8zEbfAAzRv52KQ= +github.com/hashicorp/golang-lru/arc/v2 v2.0.7/go.mod h1:Pe7gBlGdc8clY5LJ0LpJXMt5AmgmWNH1g+oFFVUHOEc= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= +github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU= +github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= +github.com/influxdata/go-syslog/v3 v3.0.0 h1:jichmjSZlYK0VMmlz+k4WeOQd7z745YLsvGMqwtYt4I= +github.com/influxdata/go-syslog/v3 v3.0.0/go.mod h1:tulsOp+CecTAYC27u9miMgq21GqXRW6VdKbOG+QSP4Q= +github.com/itchyny/gojq v0.12.16 h1:yLfgLxhIr/6sJNVmYfQjTIv0jGctu6/DgDoivmxTr7g= +github.com/itchyny/gojq v0.12.16/go.mod h1:6abHbdC2uB9ogMS38XsErnfqJ94UlngIJGlRAIj4jTM= +github.com/itchyny/timefmt-go v0.1.6 h1:ia3s54iciXDdzWzwaVKXZPbiXzxxnv1SPGFfM/myJ5Q= +github.com/itchyny/timefmt-go v0.1.6/go.mod h1:RRDZYC5s9ErkjQvTvvU7keJjxUYzIISJGxm9/mAERQg= +github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= +github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/klauspost/pgzip v1.2.6 h1:8RXeL5crjEUFnR2/Sn6GJNWtSQ3Dk8pq4CL3jvdDyjU= +github.com/klauspost/pgzip v1.2.6/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/linkedin/goavro/v2 v2.13.0 h1:L8eI8GcuciwUkt41Ej62joSZS4kKaYIUdze+6for9NU= +github.com/linkedin/goavro/v2 v2.13.0/go.mod h1:KXx+erlq+RPlGSPmLF7xGo6SAbh8sCQ53x064+ioxhk= +github.com/matoous/go-nanoid/v2 v2.1.0 h1:P64+dmq21hhWdtvZfEAofnvJULaRR1Yib0+PnU669bE= +github.com/matoous/go-nanoid/v2 v2.1.0/go.mod h1:KlbGNQ+FhrUNIHUxZdL63t7tl4LaPkZNpUULS8H4uVM= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mitchellh/go-wordwrap v1.0.1 h1:TLuKupo69TCn6TQSyGxwI1EblZZEsQ0vMlAFQflz0v0= +github.com/mitchellh/go-wordwrap v1.0.1/go.mod h1:R62XHJLzvMFRBbcrT7m7WgmE1eOyTSsCt+hzestvNj0= +github.com/mitchellh/mapstructure v1.4.1 h1:CpVNEelQCZBooIPDn+AR3NpivK/TIKU8bDxdASFVQag= +github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/moby/sys/mountinfo v0.5.0/go.mod h1:3bMD3Rg+zkqx8MRYPi7Pyb0Ie97QEBmdxbhnCLlSvSU= +github.com/moby/term v0.0.0-20201216013528-df9cb8a40635 h1:rzf0wL0CHVc8CEsgyygG0Mn9CNCCPZqOPaz8RiiHYQk= +github.com/moby/term v0.0.0-20201216013528-df9cb8a40635/go.mod h1:FBS0z0QWA44HXygs7VXDUOGoN/1TV3RuWkLO04am3wc= +github.com/mrunalp/fileutils v0.5.0/go.mod h1:M1WthSahJixYnrXQl/DFQuteStB1weuxD2QJNHXfbSQ= +github.com/nsf/jsondiff v0.0.0-20210926074059-1e845ec5d249 h1:NHrXEjTNQY7P0Zfx1aMrNhpgxHmow66XQtm0aQLY0AE= +github.com/nsf/jsondiff v0.0.0-20210926074059-1e845ec5d249/go.mod h1:mpRZBD8SJ55OIICQ3iWH0Yz3cjzA61JdqMLoWXeB2+8= +github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= +github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= +github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug= +github.com/opencontainers/image-spec v1.1.0/go.mod h1:W4s4sFTMaBeK1BQLXbG4AdM2szdn85PY75RI83NrTrM= +github.com/opencontainers/runc v1.1.5 h1:L44KXEpKmfWDcS02aeGm8QNTFXTo2D+8MYGDIJ/GDEs= +github.com/opencontainers/runc v1.1.5/go.mod h1:1J5XiS+vdZ3wCyZybsuxXZWGrgSr8fFJHLXuG2PsnNg= +github.com/opencontainers/runtime-spec v1.0.3-0.20210326190908-1c3f411f0417/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0= +github.com/opencontainers/selinux v1.10.0/go.mod h1:2i0OySw99QjzBBQByd1Gr9gSjvuho1lHsJxIJ3gGbJI= +github.com/ory/dockertest/v3 v3.10.0 h1:4K3z2VMe8Woe++invjaTB7VRyQXQy5UY+loujO4aNE4= +github.com/ory/dockertest/v3 v3.10.0/go.mod h1:nr57ZbRWMqfsdGdFNLHz5jjNdDb7VVFnzAeW1n5N1Lg= +github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= +github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/protocolbuffers/txtpbfmt v0.0.0-20230328191034-3462fbc510c0 h1:sadMIsgmHpEOGbUs6VtHBXRR1OHevnj7hLx9ZcdNGW4= +github.com/protocolbuffers/txtpbfmt v0.0.0-20230328191034-3462fbc510c0/go.mod h1:jgxiZysxFPM+iWKwQwPR+y+Jvo54ARd4EisXxKYpB5c= +github.com/quipo/dependencysolver v0.0.0-20170801134659-2b009cb4ddcc h1:hK577yxEJ2f5s8w2iy2KimZmgrdAUZUNftE1ESmg2/Q= +github.com/quipo/dependencysolver v0.0.0-20170801134659-2b009cb4ddcc/go.mod h1:OQt6Zo5B3Zs+C49xul8kcHo+fZ1mCLPvd0LFxiZ2DHc= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/redpanda-data/benthos/v4 v4.33.0 h1:CrgAjbzgn6G4xKA7Y5Byf3ikpwSkqProNgqKLFKrJBQ= +github.com/redpanda-data/benthos/v4 v4.33.0/go.mod h1:E3ivVEVcwCC/wsvDJ8uDK84FnNXObuAIAdwzyljE6/s= +github.com/rickb777/period v1.0.5 h1:jAzlI2knYam5VMy0X8eYgqJBl0ew57N+J1djJSBOulM= +github.com/rickb777/period v1.0.5/go.mod h1:AmEwpgIShi3EEw34qbafoPJxVeRbv9VVtjLyOeRwK6c= +github.com/rickb777/plural v1.4.2 h1:Kl/syFGLFZ5EbuV8c9SVud8s5HI2HpCCtOMw2U1kS+A= +github.com/rickb777/plural v1.4.2/go.mod h1:kdmXUpmKBJTS0FtG/TFumd//VBWsNTD7zOw7x4umxNw= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= +github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= +github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= +github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/scylladb/gocql v1.14.2 h1:IBPtfJFcRDzifCjXYMtrZ14oQ7OqpqQjwITQCwtGZsc= +github.com/scylladb/gocql v1.14.2/go.mod h1:ZLEJ0EVE5JhmtxIW2stgHq/v1P4fWap0qyyXSKyV8K0= +github.com/seccomp/libseccomp-golang v0.9.2-0.20220502022130-f33da4d89646/go.mod h1:JA8cRccbGaA1s33RQf7Y1+q9gHmZX1yB/z9WDN1C6fg= +github.com/segmentio/ksuid v1.0.4 h1:sBo2BdShXjmcugAMwjugoGUdUV0pcxY5mW4xKRn3v4c= +github.com/segmentio/ksuid v1.0.4/go.mod h1:/XUiZBD3kVx5SmUOl55voK5yeAbBNNIed+2O73XgrPE= +github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= +github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww= +github.com/tilinna/z85 v1.0.0 h1:uqFnJBlD01dosSeo5sK1G1YGbPuwqVHqR+12OJDRjUw= +github.com/tilinna/z85 v1.0.0/go.mod h1:EfpFU/DUY4ddEy6CRvk2l+UQNEzHbh+bqBQS+04Nkxs= +github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= +github.com/urfave/cli/v2 v2.27.2 h1:6e0H+AkS+zDckwPCUrZkKX38mRaau4nL2uipkJpbkcI= +github.com/urfave/cli/v2 v2.27.2/go.mod h1:g0+79LmHHATl7DAcHO99smiR/T7uGLw84w8Y42x+4eM= +github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYppBueQtXaqoE= +github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU= +github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= +github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb h1:zGWFAtiMcyryUHoUjUJX0/lt1H2+i2Ka2n+D3DImSNo= +github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= +github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHovont7NscjpAxXsDA8S8BMYve8Y5+7cuRE7R0= +github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ= +github.com/xeipuuv/gojsonschema v1.2.0 h1:LhYJRs+L4fBtjZUfuSZIKGeVu0QRy8e5Xi7D17UxZ74= +github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y= +github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913 h1:+qGGcbkzsfDQNPPe9UDgpxAWQrhbbBXOYJFQDq/dtJw= +github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913/go.mod h1:4aEEwZQutDLsQv2Deui4iYQ6DWTxR14g6m8Wv88+Xqk= +github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a h1:fZHgsYlfvtyqToslyjUt3VOPF4J7aK/3MPcK7xp3PDk= +github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a/go.mod h1:ul22v+Nro/R083muKhosV54bj5niojjWZvU8xrevuH4= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.opentelemetry.io/otel v1.27.0 h1:9BZoF3yMK/O1AafMiQTVu0YDj5Ea4hPhxCs7sGva+cg= +go.opentelemetry.io/otel v1.27.0/go.mod h1:DMpAK8fzYRzs+bi3rS5REupisuqTheUlSZJ1WnZaPAQ= +go.opentelemetry.io/otel/metric v1.27.0 h1:hvj3vdEKyeCi4YaYfNjv2NUje8FqKqUY8IlF0FxV/ik= +go.opentelemetry.io/otel/metric v1.27.0/go.mod h1:mVFgmRlhljgBiuk/MP/oKylr4hs85GZAylncepAX/ak= +go.opentelemetry.io/otel/trace v1.27.0 h1:IqYb813p7cmbHk0a5y6pD5JPakbVfftRXABGt5/Rscw= +go.opentelemetry.io/otel/trace v1.27.0/go.mod h1:6RiD1hkAprV4/q+yd2ln1HG9GoPx39SuvvstaLBl+l4= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30= +golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA= +golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220526153639-5463443f8c37/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= +golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= +golang.org/x/oauth2 v0.20.0 h1:4mQdhULixXKP1rwYBW0vAijoXnkTG0BLCDRzfe1idMo= +golang.org/x/oauth2 v0.20.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190606203320-7fc4e5ec1444/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191115151921-52ab43148777/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200831180312-196b9ba8737a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210906170528-6f6e22806c34/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211116061358-0a5406a5449c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= +golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190624222133-a101b041ded4/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg= +golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= +gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= +gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= +gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk= +gotest.tools/v3 v3.3.0 h1:MfDY1b1/0xN1CyMlQDac0ziEy9zJQd9CXBRRDHw2jJo= +gotest.tools/v3 v3.3.0/go.mod h1:Mcr9QNxkg0uMvy/YElmo4SpXgJKWgQvYrT7Kw5RzJ1A= +sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo= +sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8= diff --git a/internal/impl/scylla/input.go b/internal/impl/scylla/input.go new file mode 100644 index 0000000000..ac4acf9b35 --- /dev/null +++ b/internal/impl/scylla/input.go @@ -0,0 +1,140 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package scylla + +import ( + "context" + "fmt" + + "github.com/gocql/gocql" + + "github.com/redpanda-data/benthos/v4/public/service" +) + +const ( + ciFieldQuery = "query" +) + +func inputConfigSpec() *service.ConfigSpec { + spec := service.NewConfigSpec(). + Categories("Services"). + Summary("Executes a find query and creates a message for each row received."). + Fields(clientFields()...). + Field(service.NewStringField(ciFieldQuery). + Description("A query to execute.")). + Field(service.NewAutoRetryNacksToggleField()). + Example("Minimal Select (Cassandra/Scylla)", + ` +Let's presume that we have 3 Cassandra nodes, like in this tutorial by Sebastian Sigl from freeCodeCamp: + +https://www.freecodecamp.org/news/the-apache-cassandra-beginner-tutorial/ + +Then if we want to select everything from the table users_by_country, we should use the configuration below. +If we specify the stdin output, the result will look like: + +`+"```json"+` +{"age":23,"country":"UK","first_name":"Bob","last_name":"Sandler","user_email":"bob@email.com"} +`+"```"+` + +This configuration also works for Scylla. +`, + ` +input: + cassandra: + addresses: + - 172.17.0.2 + query: + 'SELECT * FROM learn_cassandra.users_by_country' +`, + ) + return spec +} + +func init() { + err := service.RegisterInput( + "cassandra", inputConfigSpec(), + func(conf *service.ParsedConfig, mgr *service.Resources) (service.Input, error) { + return newCassandraInput(conf) + }) + if err != nil { + panic(err) + } +} + +func newCassandraInput(conf *service.ParsedConfig) (service.Input, error) { + query, err := conf.FieldString(ciFieldQuery) + if err != nil { + return nil, err + } + + clientConf, err := clientConfFromParsed(conf) + if err != nil { + return nil, err + } + + return service.AutoRetryNacksToggled(conf, &cassandraInput{ + query: query, + clientConf: clientConf, + }) +} + +type cassandraInput struct { + query string + clientConf clientConf + + session *gocql.Session + iter *gocql.Iter +} + +func (c *cassandraInput) Connect(ctx context.Context) error { + if c.session != nil { + return nil + } + + conn, err := c.clientConf.Create() + if err != nil { + return err + } + + session, err := conn.CreateSession() + if err != nil { + return fmt.Errorf("creating Cassandra session: %w", err) + } + + c.session = session + c.iter = session.Query(c.query).Iter() + return nil +} + +func (c *cassandraInput) Read(ctx context.Context) (*service.Message, service.AckFunc, error) { + mp := make(map[string]interface{}) + if !c.iter.MapScan(mp) { + return nil, nil, service.ErrEndOfInput + } + + msg := service.NewMessage(nil) + msg.SetStructuredMut(mp) + return msg, func(ctx context.Context, err error) error { + return nil + }, nil +} + +func (c *cassandraInput) Close(ctx context.Context) error { + if c.session != nil { + c.session.Close() + c.session = nil + } + return nil +} diff --git a/internal/impl/scylla/integration_test.go b/internal/impl/scylla/integration_test.go new file mode 100644 index 0000000000..d989bf9265 --- /dev/null +++ b/internal/impl/scylla/integration_test.go @@ -0,0 +1,158 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package scylla + +import ( + "context" + "fmt" + "github.com/ory/dockertest/v3" + "strings" + "testing" + "time" + + "github.com/gocql/gocql" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/redpanda-data/benthos/v4/public/service/integration" +) + +func TestIntegrationScylla(t *testing.T) { + integration.CheckSkip(t) + + t.Parallel() + + pool, err := dockertest.NewPool("") + require.NoError(t, err) + + pool.MaxWait = time.Minute * 3 + resource, err := pool.Run("scylladb/scylla", "6.0.2", nil) + require.NoError(t, err) + t.Cleanup(func() { + assert.NoError(t, pool.Purge(resource)) + }) + + var session *gocql.Session + t.Cleanup(func() { + if session != nil { + session.Close() + } + }) + + _ = resource.Expire(900) + require.NoError(t, pool.Retry(func() error { + if session == nil { + conn := gocql.NewCluster(fmt.Sprintf("localhost:%v", resource.GetPort("9042/tcp"))) + conn.Consistency = gocql.All + var rerr error + if session, rerr = conn.CreateSession(); rerr != nil { + return rerr + } + } + _ = session.Query( + "CREATE KEYSPACE testspace WITH replication = {'class':'SimpleStrategy','replication_factor':1};", + ).Exec() + return session.Query( + "CREATE TABLE testspace.testtable (id int primary key, content text, created_at timestamp);", + ).Exec() + })) + + t.Run("with JSON", func(t *testing.T) { + template := ` +output: + cassandra: + addresses: + - localhost:$PORT + query: 'INSERT INTO testspace.table$ID JSON ?' + args_mapping: 'root = [ this ]' +` + queryGetFn := func(ctx context.Context, testID, messageID string) (string, []string, error) { + var resID int + var resContent string + if err := session.Query( + fmt.Sprintf("select id, content from testspace.table%v where id = ?;", testID), messageID, + ).Scan(&resID, &resContent); err != nil { + return "", nil, err + } + return fmt.Sprintf(`{"content":"%v","id":%v}`, resContent, resID), nil, err + } + suite := integration.StreamTests( + integration.StreamTestOutputOnlySendSequential(10, queryGetFn), + integration.StreamTestOutputOnlySendBatch(10, queryGetFn), + ) + suite.Run( + t, template, + integration.StreamTestOptPort(resource.GetPort("9042/tcp")), + integration.StreamTestOptSleepAfterInput(time.Second*10), + integration.StreamTestOptSleepAfterOutput(time.Second*10), + integration.StreamTestOptPreTest(func(t testing.TB, ctx context.Context, vars *integration.StreamTestConfigVars) { + vars.ID = strings.ReplaceAll(vars.ID, "-", "") + require.NoError(t, session.Query( + fmt.Sprintf( + "CREATE TABLE testspace.table%v (id int primary key, content text, created_at timestamp);", + vars.ID, + ), + ).Exec()) + }), + ) + }) + + t.Run("with values", func(t *testing.T) { + template := ` +output: + cassandra: + addresses: + - localhost:$PORT + query: 'INSERT INTO testspace.table$ID (id, content, created_at, meows) VALUES (?, ?, ?, ?)' + args_mapping: | + root = [ this.id, this.content, now(), [ "first meow", "second meow" ] ] +` + queryGetFn := func(ctx context.Context, testID, messageID string) (string, []string, error) { + var resID int + var resContent string + var createdAt time.Time + var meows []string + if err := session.Query( + fmt.Sprintf("select id, content, created_at, meows from testspace.table%v where id = ?;", testID), messageID, + ).Scan(&resID, &resContent, &createdAt, &meows); err != nil { + return "", nil, err + } + if time.Since(createdAt) > time.Hour || time.Since(createdAt) < 0 { + return "", nil, fmt.Errorf("received bad created_at: %v", createdAt) + } + assert.Equal(t, []string{"first meow", "second meow"}, meows) + return fmt.Sprintf(`{"content":"%v","id":%v}`, resContent, resID), nil, err + } + suite := integration.StreamTests( + integration.StreamTestOutputOnlySendSequential(10, queryGetFn), + integration.StreamTestOutputOnlySendBatch(10, queryGetFn), + ) + suite.Run( + t, template, + integration.StreamTestOptPort(resource.GetPort("9042/tcp")), + integration.StreamTestOptSleepAfterInput(time.Second*10), + integration.StreamTestOptSleepAfterOutput(time.Second*10), + integration.StreamTestOptPreTest(func(t testing.TB, ctx context.Context, vars *integration.StreamTestConfigVars) { + vars.ID = strings.ReplaceAll(vars.ID, "-", "") + require.NoError(t, session.Query( + fmt.Sprintf( + "CREATE TABLE testspace.table%v (id int primary key, content text, created_at timestamp, meows list);", + vars.ID, + ), + ).Exec()) + }), + ) + }) +} diff --git a/internal/impl/scylla/output.go b/internal/impl/scylla/output.go new file mode 100644 index 0000000000..3c96706985 --- /dev/null +++ b/internal/impl/scylla/output.go @@ -0,0 +1,359 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package scylla + +import ( + "context" + "encoding/json" + "fmt" + "math" + "math/rand" + "sync" + "time" + + "github.com/gocql/gocql" + + "github.com/redpanda-data/benthos/v4/public/bloblang" + "github.com/redpanda-data/benthos/v4/public/service" +) + +const ( + coFieldQuery = "query" + coFieldArgsMapping = "args_mapping" + coFieldConsistency = "consistency" + coFieldLoggedBatch = "logged_batch" + coFieldBatching = "batching" +) + +func outputSpec() *service.ConfigSpec { + return service.NewConfigSpec(). + Beta(). + Summary("Runs a query against a Cassandra database for each message in order to insert data."). + Description(` +Query arguments can be set using a bloblang array for the fields using the `+"`args_mapping`"+` field. + +When populating timestamp columns the value must either be a string in ISO 8601 format (2006-01-02T15:04:05Z07:00), or an integer representing unix time in seconds.`+service.OutputPerformanceDocs(true, true)). + Example( + "Basic Inserts", + "If we were to create a table with some basic columns with `CREATE TABLE foo.bar (id int primary key, content text, created_at timestamp);`, and were processing JSON documents of the form `{\"id\":\"342354354\",\"content\":\"hello world\",\"timestamp\":1605219406}` using logged batches, we could populate our table with the following config:", + ` +output: + cassandra: + addresses: + - localhost:9042 + query: 'INSERT INTO foo.bar (id, content, created_at) VALUES (?, ?, ?)' + args_mapping: | + root = [ + this.id, + this.content, + this.timestamp + ] + batching: + count: 500 + period: 1s +`, + ). + Example( + "Insert JSON Documents", + "The following example inserts JSON documents into the table `footable` of the keyspace `foospace` using INSERT JSON (https://cassandra.apache.org/doc/latest/cql/json.html#insert-json).", + ` +output: + cassandra: + addresses: + - localhost:9042 + query: 'INSERT INTO foospace.footable JSON ?' + args_mapping: 'root = [ this ]' + batching: + count: 500 + period: 1s +`, + ). + Fields(clientFields()...). + Fields( + service.NewStringField(coFieldQuery). + Description("A query to execute for each message."), + service.NewBloblangField(coFieldArgsMapping). + Description("A xref:guides:bloblang/about.adoc[Bloblang mapping] that can be used to provide arguments to Cassandra queries. The result of the query must be an array containing a matching number of elements to the query arguments."). + Version("3.55.0"). + Optional(), + service.NewStringEnumField(coFieldConsistency, + "ANY", "ONE", "TWO", "THREE", "QUORUM", "ALL", "LOCAL_QUORUM", "EACH_QUORUM", "LOCAL_ONE"). + Description("The consistency level to use."). + Advanced(). + Default("QUORUM"), + service.NewBoolField(coFieldLoggedBatch). + Description("If enabled the driver will perform a logged batch. Disabling this prompts unlogged batches to be used instead, which are less efficient but necessary for alternative storages that do not support logged batches."). + Advanced(). + Default(true), + service.NewOutputMaxInFlightField(), + service.NewBatchPolicyField(coFieldBatching), + ) +} + +func init() { + err := service.RegisterBatchOutput( + "cassandra", outputSpec(), + func(conf *service.ParsedConfig, mgr *service.Resources) (out service.BatchOutput, batchPolicy service.BatchPolicy, maxInFlight int, err error) { + if maxInFlight, err = conf.FieldMaxInFlight(); err != nil { + return + } + if batchPolicy, err = conf.FieldBatchPolicy(coFieldBatching); err != nil { + return + } + out, err = newCassandraWriter(conf, mgr) + return + }) + if err != nil { + panic(err) + } +} + +type cassandraWriter struct { + log *service.Logger + + query string + clientConf clientConf + argsMapping *bloblang.Executor + batchType gocql.BatchType + consistency gocql.Consistency + + session *gocql.Session + connLock sync.RWMutex +} + +func newCassandraWriter(conf *service.ParsedConfig, mgr *service.Resources) (c *cassandraWriter, err error) { + c = &cassandraWriter{ + log: mgr.Logger(), + } + + if c.query, err = conf.FieldString(coFieldQuery); err != nil { + return + } + + if c.clientConf, err = clientConfFromParsed(conf); err != nil { + return + } + + if aStr, _ := conf.FieldString(coFieldArgsMapping); aStr != "" { + if c.argsMapping, err = conf.FieldBloblang(coFieldArgsMapping); err != nil { + return + } + } + + c.batchType = gocql.UnloggedBatch + if loggedBatch, _ := conf.FieldBool(coFieldLoggedBatch); loggedBatch { + c.batchType = gocql.LoggedBatch + } + + var consistencyStr string + if consistencyStr, err = conf.FieldString(coFieldConsistency); err != nil { + return + } + if c.consistency, err = gocql.ParseConsistencyWrapper(consistencyStr); err != nil { + return nil, fmt.Errorf("parsing consistency: %w", err) + } + + return +} + +func (c *cassandraWriter) Connect(ctx context.Context) error { + c.connLock.Lock() + defer c.connLock.Unlock() + if c.session != nil { + return nil + } + + conn, err := c.clientConf.Create() + if err != nil { + return err + } + conn.Consistency = c.consistency + + session, err := conn.CreateSession() + if err != nil { + return fmt.Errorf("creating Cassandra session: %w", err) + } + + c.session = session + return nil +} + +func (c *cassandraWriter) WriteBatch(ctx context.Context, batch service.MessageBatch) error { + c.connLock.RLock() + session := c.session + c.connLock.RUnlock() + + if c.session == nil { + return service.ErrNotConnected + } + + if len(batch) == 1 { + return c.writeRow(session, batch) + } + return c.writeBatch(session, batch) +} + +func (c *cassandraWriter) writeRow(session *gocql.Session, b service.MessageBatch) error { + var argsExec *service.MessageBatchBloblangExecutor + if c.argsMapping != nil { + argsExec = b.BloblangExecutor(c.argsMapping) + } + values, err := c.mapArgs(0, argsExec) + if err != nil { + return fmt.Errorf("parsing args: %w", err) + } + return session.Query(c.query, values...).Exec() +} + +func (c *cassandraWriter) writeBatch(session *gocql.Session, b service.MessageBatch) error { + batch := session.NewBatch(c.batchType) + + var argsExec *service.MessageBatchBloblangExecutor + if c.argsMapping != nil { + argsExec = b.BloblangExecutor(c.argsMapping) + } + + for i := range b { + values, err := c.mapArgs(i, argsExec) + if err != nil { + return fmt.Errorf("parsing args for part: %d: %w", i, err) + } + batch.Query(c.query, values...) + } + + return session.ExecuteBatch(batch) +} + +func (c *cassandraWriter) mapArgs(index int, exec *service.MessageBatchBloblangExecutor) ([]any, error) { + if exec == nil { + return nil, nil + } + + // We've got an "args_mapping" field, extract values from there. + part, err := exec.Query(index) + if err != nil { + return nil, fmt.Errorf("executing bloblang mapping: %w", err) + } + + jraw, err := part.AsStructured() + if err != nil { + return nil, fmt.Errorf("parsing bloblang mapping result as json: %w", err) + } + + j, ok := jraw.([]any) + if !ok { + return nil, fmt.Errorf("expected bloblang mapping result to be []interface{} but was %T", jraw) + } + + for i, v := range j { + j[i] = genericValue{v: v} + } + return j, nil +} + +func (c *cassandraWriter) Close(context.Context) error { + c.connLock.Lock() + if c.session != nil { + c.session.Close() + c.session = nil + } + c.connLock.Unlock() + return nil +} + +type decorator struct { + NumRetries int + Min, Max time.Duration +} + +func (d *decorator) Attempt(q gocql.RetryableQuery) bool { + if q.Attempts() > d.NumRetries { + return false + } + time.Sleep(getExponentialTime(d.Min, d.Max, q.Attempts())) + return true +} + +func getExponentialTime(min, max time.Duration, attempts int) time.Duration { + minFloat := float64(min) + napDuration := minFloat * math.Pow(2, float64(attempts-1)) + + // Add some jitter + napDuration += rand.Float64()*minFloat - (minFloat / 2) + if napDuration > float64(max) { + return max + } + return time.Duration(napDuration) +} + +func (d *decorator) GetRetryType(err error) gocql.RetryType { + switch t := err.(type) { + // not enough replica alive to perform query with required consistency + case *gocql.RequestErrUnavailable: + if t.Alive > 0 { + return gocql.RetryNextHost + } + return gocql.Retry + // write timeout - uncertain whetever write was successful or not + case *gocql.RequestErrWriteTimeout: + if t.Received > 0 { + return gocql.Ignore + } + return gocql.Retry + default: + return gocql.Rethrow + } +} + +type genericValue struct { + v any +} + +// We get typed values out of mappings. However, gocql performs type checking +// and unfortunately does not like timestamp and some other values as strings: +// https://github.com/gocql/gocql/blob/5913df4d474e0b2492a129d17bbb3c04537a15cd/marshal.go#L1160 +// it's also very strict on numerical types, so we need to do some magic here. +func (g genericValue) MarshalCQL(info gocql.TypeInfo) ([]byte, error) { + switch info.Type() { + case gocql.TypeTimestamp: + t, err := bloblang.ValueAsTimestamp(g.v) + if err != nil { + return nil, err + } + return gocql.Marshal(info, t) + case gocql.TypeDouble: + f, err := bloblang.ValueAsFloat64(g.v) + if err != nil { + return nil, err + } + return gocql.Marshal(info, f) + case gocql.TypeFloat: + f, err := bloblang.ValueAsFloat32(g.v) + if err != nil { + return nil, err + } + return gocql.Marshal(info, f) + case gocql.TypeVarchar: + return gocql.Marshal(info, bloblang.ValueToString(g.v)) + } + if _, isJSONNum := g.v.(json.Number); isJSONNum { + i, err := bloblang.ValueAsInt64(g.v) + if err != nil { + return nil, err + } + return gocql.Marshal(info, i) + } + return gocql.Marshal(info, g.v) +} diff --git a/internal/impl/scylla/shared.go b/internal/impl/scylla/shared.go new file mode 100644 index 0000000000..ce200faa0a --- /dev/null +++ b/internal/impl/scylla/shared.go @@ -0,0 +1,283 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package scylla + +import ( + "crypto/tls" + "fmt" + "strings" + "time" + + "github.com/gocql/gocql" + + "github.com/redpanda-data/benthos/v4/public/service" +) + +const ( + cFieldAddresses = "addresses" + cFieldTLS = "tls" + cFieldPassAuth = "password_authenticator" + cFieldHostPolicy = "host_policy" + cFieldHostPolicyName = "name" + cFieldHostPolicyDCName = "dc_name" + cFieldHostPolicyRackName = "rack_name" + cFieldHostPolicyFallback = "fallback_policy" + cFieldPassAuthEnabled = "enabled" + cFieldPassAuthUsername = "username" + cFieldPassAuthPassword = "password" + cFieldDisableIHL = "disable_initial_host_lookup" + cFieldMaxRetries = "max_retries" + cFieldBackoff = "backoff" + cFieldConsistency = "consistency" + cFieldBackoffInitInterval = "initial_interval" + cFieldBackoffMaxInterval = "max_interval" + cFieldTimeout = "timeout" +) + +func clientFields() []*service.ConfigField { + return []*service.ConfigField{ + service.NewStringListField(cFieldAddresses). + Description("A list of Cassandra nodes to connect to. Multiple comma separated addresses can be specified on a single line."). + Examples( + []string{"localhost:9042"}, + []string{"foo:9042", "bar:9042"}, + []string{"foo:9042,bar:9042"}, + ), + service.NewTLSToggledField(cFieldTLS).Advanced(), + service.NewObjectField(cFieldPassAuth, + service.NewBoolField(cFieldPassAuthEnabled). + Description("Whether to use password authentication"). + Default(false), + service.NewStringField(cFieldPassAuthUsername). + Description("The username to authenticate as."). + Default(""), + service.NewStringField(cFieldPassAuthPassword). + Description("The password to authenticate with."). + Secret(). + Default(""), + ). + Description("Optional configuration of Cassandra authentication parameters."). + Advanced(), + service.NewBoolField(cFieldDisableIHL). + Description("If enabled the driver will not attempt to get host info from the system.peers table. This can speed up queries but will mean that data_centre, rack and token information will not be available."). + Advanced(). + Default(false), + service.NewIntField(cFieldMaxRetries). + Description("The maximum number of retries before giving up on a request."). + Advanced(). + Default(3), + service.NewObjectField(cFieldBackoff, + service.NewDurationField(cFieldBackoffInitInterval). + Description("The initial period to wait between retry attempts."). + Default("1s"), + service.NewDurationField(cFieldBackoffMaxInterval). + Description("The maximum period to wait between retry attempts."). + Default("5s"), + ). + Description("Control time intervals between retry attempts."). + Advanced(), + service.NewDurationField(cFieldTimeout). + Description("The client connection timeout."). + Default("600ms"), + service.NewObjectField(cFieldHostPolicy, + service.NewStringField(cFieldHostPolicyName). + Description("A host selection policy name."). + Examples("round_robin", "rack_aware", "dc_aware", "token_aware"). + Default("round_robin"), + service.NewStringField(cFieldHostPolicyDCName). + Description(""). + Default(""). + Example("dc1"), + service.NewStringField(cFieldPassAuthPassword). + Description("The password to authenticate with."). + Default(""). + Example("rack1"), + service.NewObjectField(cFieldHostPolicyFallback, + service.NewStringField(cFieldHostPolicyName). + Description("A fallback host selection policy name."). + Examples("round_robin", "rack_aware", "dc_aware"). + Default(""), + service.NewStringField(cFieldHostPolicyDCName). + Description(""). + Default(""). + Example("dc1"), + service.NewStringField(cFieldPassAuthPassword). + Description("The password to authenticate with."). + Default(""). + Example("rack1"), + ).Advanced().Description("A fallback host selection policy configuration. Only needed for if main policy is token_aware."), + ).Advanced().Description("A host selection policy configuration."), + } +} + +type clientConf struct { + addresses []string + tlsEnabled bool + tlsConf *tls.Config + authEnabled bool + authUsername string + authPassword string + disableIHL bool + maxRetries int + backoffInitInterval time.Duration + backoffMaxInterval time.Duration + timeout time.Duration + consistencyLevel gocql.Consistency + hostPolicy gocql.HostSelectionPolicy +} + +func (c *clientConf) Create() (*gocql.ClusterConfig, error) { + conn := gocql.NewCluster(c.addresses...) + if c.tlsEnabled { + conn.SslOpts = &gocql.SslOptions{ + Config: c.tlsConf, + } + conn.DisableInitialHostLookup = c.tlsConf.InsecureSkipVerify + } else { + conn.DisableInitialHostLookup = c.disableIHL + } + + if c.authEnabled { + conn.Authenticator = gocql.PasswordAuthenticator{ + Username: c.authUsername, + Password: c.authPassword, + } + } + + conn.Consistency = c.consistencyLevel + + conn.RetryPolicy = &decorator{ + NumRetries: c.maxRetries, + Min: c.backoffInitInterval, + Max: c.backoffMaxInterval, + } + + conn.PoolConfig.HostSelectionPolicy = c.hostPolicy + + conn.Timeout = c.timeout + return conn, nil +} + +func clientConfFromParsed(conf *service.ParsedConfig) (c clientConf, err error) { + var tmpAddresses []string + if tmpAddresses, err = conf.FieldStringList(cFieldAddresses); err != nil { + return + } + for _, a := range tmpAddresses { + c.addresses = append(c.addresses, strings.Split(a, ",")...) + } + + if c.tlsConf, c.tlsEnabled, err = conf.FieldTLSToggled(cFieldTLS); err != nil { + return + } + + { + authConf := conf.Namespace(cFieldPassAuth) + c.authEnabled, _ = authConf.FieldBool(cFieldPassAuthEnabled) + c.authUsername, _ = authConf.FieldString(cFieldPassAuthUsername) + c.authPassword, _ = authConf.FieldString(cFieldPassAuthPassword) + } + + if c.disableIHL, err = conf.FieldBool(cFieldDisableIHL); err != nil { + return + } + if c.maxRetries, err = conf.FieldInt(cFieldMaxRetries); err != nil { + return + } + if c.backoffInitInterval, err = conf.FieldDuration(cFieldBackoff, cFieldBackoffInitInterval); err != nil { + return + } + if c.backoffMaxInterval, err = conf.FieldDuration(cFieldBackoff, cFieldBackoffMaxInterval); err != nil { + return + } + var cl string + cl, _ = conf.FieldString(cFieldConsistency) + if cl != "" { + if c.consistencyLevel, err = gocql.ParseConsistencyWrapper(cl); err != nil { + return + } + } + + hostPolicyNS := conf.Namespace(cFieldHostPolicy) + policyName, _ := hostPolicyNS.FieldString(cFieldHostPolicyName) + dcName, _ := hostPolicyNS.FieldString(cFieldHostPolicyDCName) + rackName, _ := hostPolicyNS.FieldString(cFieldHostPolicyRackName) + + fallbackNS := hostPolicyNS.Namespace(cFieldHostPolicyFallback) + fallbackPolicyName, _ := fallbackNS.FieldString(cFieldHostPolicyName) + fallbackDCName, _ := fallbackNS.FieldString(cFieldHostPolicyDCName) + fallbackRackName, _ := fallbackNS.FieldString(cFieldHostPolicyRackName) + + var fallbackPolicy gocql.HostSelectionPolicy + if fallbackPolicyName != "" { + fallbackPolicy, err = getFallbackPolicy(fallbackPolicyName, fallbackDCName, fallbackRackName) + if err != nil { + return + } + } + + c.hostPolicy, err = getMainPolicy(policyName, dcName, rackName, fallbackPolicy) + if err != nil { + return + } + + if c.timeout, err = conf.FieldDuration(cFieldTimeout); err != nil { + return + } + return +} + +func getMainPolicy(policyName string, dcName, rackName string, fallback gocql.HostSelectionPolicy) (gocql.HostSelectionPolicy, error) { + switch policyName { + case "dc_aware": + if dcName == "" { + return nil, fmt.Errorf("dc_aware policy requires a dc_name") + } + return gocql.DCAwareRoundRobinPolicy(dcName), nil + case "rack_aware": + if dcName == "" || rackName == "" { + return nil, fmt.Errorf("dc_aware policy requires a dc_name and rack_name") + } + return gocql.RackAwareRoundRobinPolicy(dcName, rackName), nil + case "round_robin", "": + return gocql.RoundRobinHostPolicy(), nil + case "token_aware": + if fallback == nil { + return nil, fmt.Errorf("token_aware policy requires a fallback_policy") + } + return gocql.TokenAwareHostPolicy(fallback), nil + default: + return nil, fmt.Errorf("unrecognized policy name: %v", policyName) + } +} + +func getFallbackPolicy(policyName, dcName, rackName string) (gocql.HostSelectionPolicy, error) { + switch policyName { + case "dc_aware": + if dcName == "" { + return nil, fmt.Errorf("dc_aware policy requires a dc_name") + } + return gocql.DCAwareRoundRobinPolicy(dcName), nil + case "rack_aware": + if dcName == "" || rackName == "" { + return nil, fmt.Errorf("dc_aware policy requires a dc_name and rack_name") + } + return gocql.RackAwareRoundRobinPolicy(dcName, rackName), nil + case "round_robin", "": + return gocql.RoundRobinHostPolicy(), nil + default: + return nil, fmt.Errorf("unrecognized policy name: %v", policyName) + } +} diff --git a/public/components/community/package.go b/public/components/community/package.go index eab6ea7d35..cbdcef62af 100644 --- a/public/components/community/package.go +++ b/public/components/community/package.go @@ -59,6 +59,7 @@ import ( _ "github.com/redpanda-data/connect/v4/public/components/pusher" _ "github.com/redpanda-data/connect/v4/public/components/redis" _ "github.com/redpanda-data/connect/v4/public/components/redpanda" + _ "github.com/redpanda-data/connect/v4/public/components/scylla" _ "github.com/redpanda-data/connect/v4/public/components/sentry" _ "github.com/redpanda-data/connect/v4/public/components/sftp" _ "github.com/redpanda-data/connect/v4/public/components/sql" diff --git a/public/components/scylla/package.go b/public/components/scylla/package.go new file mode 100644 index 0000000000..3005e1799c --- /dev/null +++ b/public/components/scylla/package.go @@ -0,0 +1,20 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package scylla + +import ( + // Bring in the internal plugin definitions. + _ "github.com/redpanda-data/connect/v4/internal/impl/scylla" +)