-
-
Notifications
You must be signed in to change notification settings - Fork 45
/
Copy pathredact_sensitive.py
147 lines (118 loc) · 4.24 KB
/
redact_sensitive.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
"""
Interactive script to redact sensitive data.
Be careful to not delete stuff you want to keep!
Issues/improvements:
- If an event matches the sensitive string, only the sensitive field will be redacted (so if the title matches but not the URL, the URL will remain unredacted)
- One might not want to redact to the non-informative 'REDACTED', but instead to a string with secret meaning.
- No preview of the events/strings to be redacted.
"""
import re
import sys
from copy import deepcopy
from typing import (
List,
Pattern,
Set,
Union,
cast,
)
from aw_client import ActivityWatchClient
from aw_core import Event
aw: ActivityWatchClient
REDACTED = "REDACTED"
DRYRUN = True
def main():
global DRYRUN
if "--wet" in sys.argv:
DRYRUN = False
global aw
aw = ActivityWatchClient(testing=True)
buckets = aw.get_buckets()
print("Buckets: ")
print("\n".join([" - " + bid for bid in buckets.keys()]) + "\n")
bid_to_redact = input(
"In which bucket are the events you want to redact? (* for all): "
)
assert bid_to_redact == "*" or bid_to_redact in buckets, "Not a valid option"
regex_or_string = input(
"Do you want to search by regex or string? (regex/string): "
)
assert regex_or_string in ["regex", "string"], "Not a valid option"
print("\nNOTE: Matching is not case sensitive!")
pattern: Union[str, Pattern]
if regex_or_string == "string":
pattern = input("Enter a string indicating sensitive content: ").lower()
else:
pattern = re.compile(
input("Enter a regex indicating sensitive content: ").lower()
)
print("")
if DRYRUN:
print(
"NOTE: Performing a dry run, no events will be modified. Run with --wet to modify events."
)
else:
print(
"WARNING: Note that this performs an operation that cannot be undone. We strongly recommend that you backup/export your data before proceeding."
)
input("Press ENTER to continue, or Ctrl-C to abort")
if bid_to_redact == "*":
for bucket_id in buckets.keys():
if bucket_id.startswith("aw-watcher-afk"):
continue
_redact_bucket(bucket_id, pattern)
else:
_redact_bucket(bid_to_redact, pattern)
def _redact_bucket(bucket_id: str, pattern: Union[str, Pattern]):
print(f"\nChecking bucket: {bucket_id}")
global aw
events = aw.get_events(bucket_id, limit=-1)
sensitive_ids = _find_sensitive(events, pattern)
print(f"Found {len(sensitive_ids)} sensitive events")
if not sensitive_ids:
return
yes_redact = input(
f"Do you want to replace all the matching strings with '{REDACTED}'? (y/N): "
)
if yes_redact == "y":
for e in events:
if e.id in sensitive_ids:
e_before = e
e = _redact_event(e, pattern)
print(f"\nData before: {e_before.data}")
print(f"Data after: {e.data}")
if DRYRUN:
print("DRYRUN, would do: aw.insert_event(bucket_id, e)")
else:
aw.delete_event(bucket_id, cast(int, e_before.id))
aw.insert_event(bucket_id, e)
print("Redacted event")
def _check_event(e: Event, pattern: Union[str, Pattern]) -> bool:
for v in e.data.values():
if isinstance(v, str):
if isinstance(pattern, str):
if pattern in v.lower():
return True
else:
if pattern.findall(v.lower()):
return True
return False
def _redact_event(e: Event, pattern: Union[str, Pattern]) -> Event:
e = deepcopy(e)
for k, v in e.data.items():
if isinstance(v, str):
if isinstance(pattern, str):
if pattern in v.lower():
e.data[k] = REDACTED
else:
if pattern.findall(v.lower()):
e.data[k] = REDACTED
return e
def _find_sensitive(el: List[Event], pattern: Union[str, Pattern]) -> Set:
sensitive_ids = set()
for e in el:
if _check_event(e, pattern):
sensitive_ids.add(e.id)
return sensitive_ids
if __name__ == "__main__":
main()