From 84d6082406c7b6550ddf13f31ead78f78e6013eb Mon Sep 17 00:00:00 2001 From: Isaak Date: Tue, 20 Aug 2024 18:50:06 -0400 Subject: [PATCH 1/3] #3042 added put method to Python client Endpoint --- .../streampipes/endpoint/endpoint.py | 20 ++++++++++++++++ .../tests/client/test_endpoint.py | 23 +++++++++++++++++++ 2 files changed, 43 insertions(+) diff --git a/streampipes-client-python/streampipes/endpoint/endpoint.py b/streampipes-client-python/streampipes/endpoint/endpoint.py index 2c491c6dd1..8f5d0d18b4 100644 --- a/streampipes-client-python/streampipes/endpoint/endpoint.py +++ b/streampipes-client-python/streampipes/endpoint/endpoint.py @@ -236,6 +236,26 @@ def post(self, resource: Resource) -> None: headers={"Content-type": "application/json"}, ) + def put(self, resource: Resource) -> None: + """Allows to put a resource to the StreamPipes API. + + Parameters + ---------- + resource: Resource + The resource to be updated. + + Returns + ------- + None + """ + + self._make_request( + request_method=self._parent_client.request_session.put, + url=f"{self.build_url()}", + data=json.dumps(resource.to_dict(use_source_names=True)), + headers={"Content-type": "application/json"}, + ) + class MessagingEndpoint(Endpoint): """Abstract implementation of a StreamPipes messaging endpoint. diff --git a/streampipes-client-python/tests/client/test_endpoint.py b/streampipes-client-python/tests/client/test_endpoint.py index 69ab569b8d..a7d1dab224 100644 --- a/streampipes-client-python/tests/client/test_endpoint.py +++ b/streampipes-client-python/tests/client/test_endpoint.py @@ -241,6 +241,29 @@ def test_endpoint_post(self, server_version: MagicMock, http_session: MagicMock) headers={"Content-type": "application/json"}, ) + @patch("streampipes.client.client.Session", autospec=True) + @patch("streampipes.client.client.StreamPipesClient._get_server_version", autospec=True) + def test_endpoint_put(self, server_version: MagicMock, http_session: MagicMock): + http_session_mock = MagicMock() + http_session.return_value = http_session_mock + + server_version.return_value = {"backendVersion": "0.x.y"} + + client = StreamPipesClient( + client_config=StreamPipesClientConfig( + credential_provider=StreamPipesApiKeyCredentials(username="user", api_key="key"), + host_address="localhost", + ) + ) + + client.dataStreamApi.put(DataStream(**self.data_stream_get)) + + http_session_mock.put.assert_called_with( + url="https://localhost:80/streampipes-backend/api/v2/streams", + data=json.dumps(self.data_stream_get), + headers={"Content-type": "application/json"}, + ) + @patch("streampipes.client.client.Session", autospec=True) @patch("streampipes.client.client.StreamPipesClient._get_server_version", autospec=True) def test_endpoint_data_stream_happy_path(self, server_version: MagicMock, http_session: MagicMock): From a5947484cace24ed55ffc2573e46ecf6292bccbd Mon Sep 17 00:00:00 2001 From: Isaak Date: Tue, 20 Aug 2024 18:50:36 -0400 Subject: [PATCH 2/3] #3042 added Put endpoint to Data Streams back-end --- .../rest/impl/pe/DataStreamResource.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/pe/DataStreamResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/pe/DataStreamResource.java index 70822846fe..2ecab6d806 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/pe/DataStreamResource.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/pe/DataStreamResource.java @@ -34,6 +34,7 @@ import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.PutMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @@ -89,6 +90,20 @@ public ResponseEntity addDataStream(@RequestBody SpDataStream dataStream) { } } + @PutMapping( + produces = MediaType.APPLICATION_JSON_VALUE, + consumes = MediaType.APPLICATION_JSON_VALUE + ) + @PreAuthorize(AuthConstants.HAS_WRITE_PIPELINE_ELEMENT_PRIVILEGE) + public ResponseEntity updateDataStream(@RequestBody SpDataStream dataStream) { + try { + getDataStreamResourceManager().update(dataStream); + return ok(); + } catch (IllegalArgumentException e) { + return badRequest(e.getMessage()); + } + } + private DataStreamResourceManager getDataStreamResourceManager() { return getSpResourceManager().manageDataStreams(); } From cfceff7e47cf38212835735d7bdb758abed69b41 Mon Sep 17 00:00:00 2001 From: Isaak Date: Fri, 23 Aug 2024 14:25:53 -0400 Subject: [PATCH 3/3] #3042 added pipeline update --- .../management/AdapterUpdateManagement.java | 8 ++++++ .../rest/impl/pe/DataStreamResource.java | 26 ++++++++++++++++--- 2 files changed, 31 insertions(+), 3 deletions(-) diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java index 9aae62888d..bc9d57738a 100644 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java @@ -105,6 +105,14 @@ public void updateAdapter(AdapterDescription ad) } } + public void updateDataStream(SpDataStream dataStream) throws AdapterException { + var correspondingAdapter = adapterMasterManagement.getAdapter(dataStream.getCorrespondingAdapterId()); + dataStreamResourceManager.update(dataStream); + + correspondingAdapter.setDataStream(dataStream); + updateAdapter(correspondingAdapter); + } + public List checkPipelineMigrations(AdapterDescription adapterDescription) { var affectedPipelines = PipelineManager .getPipelinesContainingElements(adapterDescription.getCorrespondingDataStreamElementId()); diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/pe/DataStreamResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/pe/DataStreamResource.java index 2ecab6d806..b5b6ab452a 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/pe/DataStreamResource.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/pe/DataStreamResource.java @@ -18,13 +18,19 @@ package org.apache.streampipes.rest.impl.pe; +import org.apache.streampipes.commons.exceptions.connect.AdapterException; +import org.apache.streampipes.commons.prometheus.adapter.AdapterMetricsManager; +import org.apache.streampipes.connect.management.management.AdapterMasterManagement; +import org.apache.streampipes.connect.management.management.AdapterUpdateManagement; import org.apache.streampipes.model.SpDataStream; import org.apache.streampipes.model.message.Message; import org.apache.streampipes.model.message.NotificationType; import org.apache.streampipes.model.monitoring.SpLogMessage; import org.apache.streampipes.resource.management.DataStreamResourceManager; -import org.apache.streampipes.rest.core.base.impl.AbstractAuthGuardedRestResource; +import org.apache.streampipes.resource.management.SpResourceManager; +import org.apache.streampipes.rest.impl.connect.AbstractAdapterResource; import org.apache.streampipes.rest.security.AuthConstants; +import org.apache.streampipes.storage.management.StorageDispatcher; import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; @@ -43,7 +49,17 @@ @RestController @RequestMapping("/api/v2/streams") -public class DataStreamResource extends AbstractAuthGuardedRestResource { +public class DataStreamResource extends AbstractAdapterResource { + + public DataStreamResource() { + super(() -> new AdapterMasterManagement( + StorageDispatcher.INSTANCE.getNoSqlStore() + .getAdapterInstanceStorage(), + new SpResourceManager().manageAdapters(), + new SpResourceManager().manageDataStreams(), + AdapterMetricsManager.INSTANCE.getAdapterMetrics() + )); + } @GetMapping(path = "/available", produces = MediaType.APPLICATION_JSON_VALUE) @PreAuthorize(AuthConstants.HAS_READ_PIPELINE_ELEMENT_PRIVILEGE) @@ -98,8 +114,12 @@ public ResponseEntity addDataStream(@RequestBody SpDataStream dataStream) { public ResponseEntity updateDataStream(@RequestBody SpDataStream dataStream) { try { getDataStreamResourceManager().update(dataStream); + var updateManager = new AdapterUpdateManagement(managementService); + + updateManager.updateDataStream(dataStream); + return ok(); - } catch (IllegalArgumentException e) { + } catch (AdapterException e) { return badRequest(e.getMessage()); } }