Skip to content

Commit

Permalink
Adding new endpoint to fetch trust monitor events from the admin api (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
nomadmtb authored Oct 30, 2020
1 parent 1f274c9 commit ce5accf
Show file tree
Hide file tree
Showing 5 changed files with 204 additions and 1 deletion.
93 changes: 93 additions & 0 deletions duo_client/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -2876,3 +2876,96 @@ def sync_user(self, username, directory_key):
'/admin/v1/users/directorysync/{directory_key}/syncuser').format(
directory_key=directory_key)
return self.json_api_call('POST', path, params)

def get_trust_monitor_events_iterator(
self,
mintime,
maxtime,
event_type=None,
):
"""
Returns a generator which yields trust monitor events.
Params:
mintime (int) - Return events that have a surfaced timestamp greater
than or equal to mintime. Timestamp is represented as
a unix timestamp in milliseconds.
maxtime (int) - Return events that have a surfaced timestamp less
than or equal to maxtime. Timestamp is represented as
a unix timestamp in milliseconds.
event_type (str, optional) - Limit the events returned by a supplied
event type represented as a string. If
not supplied, the caller will recieve all
event types. Check the Duo Admin API
documentation for expected values for
this parameter.
Returns: Generator which yields trust monitor events.
"""

params = {
"mintime": "{}".format(mintime),
"maxtime": "{}".format(maxtime),
}

if event_type is not None:
params["type"] = event_type

return self.json_cursor_api_call(
"GET",
"/admin/v1/trust_monitor/events",
params,
lambda resp: resp["events"],
)

def get_trust_monitor_events_by_offset(
self,
mintime,
maxtime,
limit=None,
offset=None,
event_type=None,
):
"""
Fetch Duo Trust Monitor Events from the Admin API.
Params:
mintime (int) - Return events that have a surfaced timestamp greater
than or equal to mintime. Timestamp is represented as
a unix timestamp in milliseconds.
maxtime (int) - Return events that have a surfaced timestamp less
than or equal to maxtime. Timestamp is represented as
a unix timestamp in milliseconds.
limit (int, optional) - Limit the number of events returned.
offset (str, optional) - Provide an offset from a previous request's
response metadata["next_offset"].
event_type (str, optional) - Limit the events returned by a supplied
event type represented as a string. If
not supplied, the caller will recieve all
event types. Check the Duo Admin API
documentation for expected values for
this parameter.
Returns: response containing a list of Duo Trust Monitor Events.
"""

params = {
"mintime": "{}".format(mintime),
"maxtime": "{}".format(maxtime),
}

if limit is not None:
params["limit"] = "{}".format(limit)

if offset is not None:
params["offset"] = "{}".format(offset)

if event_type is not None:
params["type"] = event_type

return self.json_api_call(
"GET",
"/admin/v1/trust_monitor/events",
params,
)
40 changes: 40 additions & 0 deletions duo_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,46 @@ def json_paging_api_call(self, method, path, params):
for obj in objects:
yield obj

def json_cursor_api_call(self, method, path, params, get_records_func):
"""
Call a Duo API endpoint which utilizes a cursor in some responses to
page through a set of data. This cursor is supplied through the optional
"offset" parameter. The cursor for the next set of data is in the
response metadata as "next_offset". Callers must also include a
function parameter to extract the iterable of records to yield. This is
slightly different than json_paging_api_call because the first request
does not contain the offset parameter.
:param method: The method to make the request w/ as a string. Ex:
"GET", "POST", "PUT" etc.
:param path: The path to make the request with as a string.
:param params: The dict of parameters to send in the request.
:param get_records_func: Function that can be called to extract an
iterable of records from the parsed response
json.
:returns: Generator which will yield records from the api response(s).
"""

next_offset = None

if 'limit' not in params and self.paging_limit:
params['limit'] = str(self.paging_limit)

while True:
if next_offset is not None:
params['offset'] = str(next_offset)
(http_resp, http_resp_data) = self.api_call(method, path, params)
(response, metadata) = self.parse_json_response_and_metadata(
http_resp,
http_resp_data,
)
for record in get_records_func(response):
yield record
next_offset = metadata.get('next_offset', None)
if next_offset is None:
break

def parse_json_response(self, response, data):
"""
Return the parsed data structure or raise RuntimeError.
Expand Down
11 changes: 11 additions & 0 deletions tests/admin/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,17 @@ def setUp(self):
self.client_authlog._connect = \
lambda: util.MockHTTPConnection(data_response_from_get_authlog=True)

# client to simulate basic structure of a call to fetch Duo Trust
# Monitor events.
self.client_dtm = duo_client.admin.Admin(
'test_ikey',
'test_akey',
'example.com',
)
self.client_dtm.account_id = 'DA012345678901234567'
self.client_dtm._connect = \
lambda: util.MockHTTPConnection(data_response_from_get_dtm_events=True)


if __name__ == '__main__':
unittest.main()
50 changes: 50 additions & 0 deletions tests/admin/test_trust_monitor_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from .. import util
import duo_client.admin
from .base import TestAdmin


MINTIME = 1603399970000
MAXTIME = 1603399973797
LIMIT = 10
NEXT_OFFSET = "99999"
EVENT_TYPE = "auth"


class TestTrustMonitorEvents(TestAdmin):

def test_get_trust_monitor_events_iterator(self):
"""
Test to ensure that the correct parameters are supplied when calling
next on the generator.
"""

generator = self.client_dtm.get_trust_monitor_events_iterator(
MINTIME,
MAXTIME,
event_type=EVENT_TYPE,
)
events = [e for e in generator]
self.assertEqual(events, [{"foo": "bar"},{"bar": "foo"}])

def test_get_trust_monitor_events_by_offset(self):
"""
Test to ensure that the correct parameters are supplied.
"""

res = self.client_list.get_trust_monitor_events_by_offset(
MINTIME,
MAXTIME,
limit=LIMIT,
offset=NEXT_OFFSET,
event_type=EVENT_TYPE,
)[0]
uri, qry_str = res["uri"].split("?")
args = util.params_to_dict(qry_str)

self.assertEqual(res["method"], "GET")
self.assertEqual(uri, "/admin/v1/trust_monitor/events")
self.assertEqual(args["mintime"], ["1603399970000"])
self.assertEqual(args["maxtime"], ["1603399973797"])
self.assertEqual(args["offset"], ["99999"])
self.assertEqual(args["limit"], ["10"])
self.assertEqual(args["type"], ["auth"])
11 changes: 10 additions & 1 deletion tests/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,17 @@ class MockHTTPConnection(object):
"""
status = 200 # success!

def __init__(self, data_response_should_be_list=False, data_response_from_get_authlog=False):
def __init__(
self,
data_response_should_be_list=False,
data_response_from_get_authlog=False,
data_response_from_get_dtm_events=False,
):
# if a response object should be a list rather than
# a dict, then set this flag to true
self.data_response_should_be_list = data_response_should_be_list
self.data_response_from_get_authlog = data_response_from_get_authlog
self.data_response_from_get_dtm_events = data_response_from_get_dtm_events

def dummy(self):
return self
Expand All @@ -44,6 +50,9 @@ def read(self):
if self.data_response_from_get_authlog:
response['authlogs'] = []

if self.data_response_from_get_dtm_events:
response['events'] = [{"foo": "bar"}, {"bar": "foo"}]

return json.dumps({"stat":"OK", "response":response},
cls=MockObjectJsonEncoder)

Expand Down

0 comments on commit ce5accf

Please sign in to comment.