-
Notifications
You must be signed in to change notification settings - Fork 140
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #181 from gkaur793/newEndpoint
Adding new endpoint
- Loading branch information
Showing
7 changed files
with
226 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
#!/usr/bin/env python | ||
from __future__ import absolute_import | ||
from __future__ import print_function | ||
import sys | ||
import csv | ||
import duo_client | ||
from six.moves import input | ||
|
||
argv_iter = iter(sys.argv[1:]) | ||
def get_next_arg(prompt): | ||
try: | ||
return next(argv_iter) | ||
except StopIteration: | ||
return input(prompt) | ||
|
||
# Configuration and information about objects to create. | ||
admin_api = duo_client.Admin( | ||
ikey=get_next_arg('Admin API integration key ("DI..."): '), | ||
skey=get_next_arg('integration secret key: '), | ||
host=get_next_arg('API hostname ("api-....duosecurity.com"): '), | ||
) | ||
kwargs = {} | ||
#get arguments | ||
mintime = get_next_arg('Mintime: ') | ||
if mintime: | ||
kwargs['mintime'] = mintime | ||
|
||
maxtime = get_next_arg('Maxtime: ') | ||
if maxtime: | ||
kwargs['maxtime'] = maxtime | ||
|
||
limit_arg = get_next_arg('Limit (Default = 100, Max = 1000): ') | ||
if limit_arg: | ||
kwargs['limit'] = limit_arg | ||
|
||
next_offset_arg = get_next_arg('Next_offset: ') | ||
if next_offset_arg: | ||
kwargs['next_offset'] = next_offset_arg | ||
|
||
sort_arg = get_next_arg('Sort (Default - ts:desc) :') | ||
if sort_arg: | ||
kwargs['sort'] = sort_arg | ||
|
||
reporter = csv.writer(sys.stdout) | ||
|
||
logs = admin_api.get_activity_logs(**kwargs) | ||
|
||
print("==============================") | ||
print("Next offset from response : ", logs.get('metadata').get('next_offset')) | ||
|
||
reporter.writerow(('activity_id', 'ts', 'action', 'actor_name', 'target_name')) | ||
|
||
for log in logs['items']: | ||
activity = log['activity_id'], | ||
ts = log['ts'] | ||
action = log['action'] | ||
actor_name = log.get('actor', {}).get('name', None) | ||
target_name = log.get('target', {}).get('name', None) | ||
reporter.writerow([ | ||
activity, | ||
ts, | ||
action, | ||
actor_name, | ||
target_name, | ||
]) | ||
|
||
print("==============================") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,3 +3,4 @@ flake8 | |
pytz>=2022.1 | ||
mock | ||
dlint | ||
freezegun |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
from .. import util | ||
from .base import TestAdmin | ||
from datetime import datetime, timedelta | ||
from freezegun import freeze_time | ||
import pytz | ||
|
||
|
||
class TestEndpoints(TestAdmin): | ||
|
||
def test_get_activity_log(self): | ||
""" Test to get activities log. | ||
""" | ||
response = self.client_activity.get_activity_logs(maxtime='1663131599000', mintime='1662958799000') | ||
uri, args = response['uri'].split('?') | ||
|
||
self.assertEqual(response['method'], 'GET') | ||
self.assertEqual(uri, '/admin/v2/logs/activity') | ||
self.assertEqual( | ||
util.params_to_dict(args)['account_id'], | ||
[self.client_activity.account_id]) | ||
|
||
@freeze_time('2022-10-01') | ||
def test_get_activity_log_with_no_args(self): | ||
freezed_time = datetime(2022,10,1,0,0,0, tzinfo=pytz.utc) | ||
expected_mintime = str(int((freezed_time-timedelta(days=180)).timestamp()*1000)) | ||
expected_maxtime = str(int(freezed_time.timestamp() * 1000)) | ||
response = self.client_activity.get_activity_logs() | ||
uri, args = response['uri'].split('?') | ||
param_dict = util.params_to_dict(args) | ||
self.assertEqual(response['method'], 'GET') | ||
self.assertEqual(uri, '/admin/v2/logs/activity') | ||
self.assertEqual( | ||
param_dict['mintime'], | ||
[expected_mintime]) | ||
self.assertEqual( | ||
param_dict['maxtime'], | ||
[expected_maxtime]) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters