This repository has been archived by the owner on Jul 23, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathutilMultiState.py
236 lines (201 loc) · 9.28 KB
/
utilMultiState.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
#!/usr/bin/python3
from __future__ import annotations

from functools import reduce
from itertools import chain
from os import makedirs, mkdir
from os.path import exists, dirname, basename
from re import compile as reg_compile
from typing import Dict, Optional, Tuple

try:
    from matplotlib import pyplot as plt
    from matplotlib.ticker import MultipleLocator, PercentFormatter
except ImportError as e:
    print('[!]Module Unavailable : {}'.format(str(e)))
    exit(1)
def plotAllCompaniesByStateUsingStatus(dataSet: Dict[str, Dict[str, int]], status: str, targetPath: str) -> bool:
    '''
    Plots a horizontal BAR chart showing, for every state in `dataSet`,
    what percentage of its companies carry the given `status`, and
    exports that chart into `targetPath`

    dataSet    : state name -> { company status -> # of companies }
    status     : company status whose share is to be plotted
    targetPath : file the chart gets written into; its parent directory
                 ( nested ones too ) is created when missing

    Returns True when chart was exported. Returns False when no state has
    any company with `status`, or when any Exception occurs on the way
    ( it's swallowed, not propagated to caller )
    '''
    def __percentageOfStatusInState__(data: Dict[str, int]) -> float:
        # total # of companies we're considering for this state,
        # so that calculation of percentage becomes feasible
        totalInState = sum(data.values())
        # a state holding no company at all contributes 0 %, instead of
        # failing the whole chart with a division by zero
        if not totalInState:
            return 0.0
        return data.get(status, 0) * 100 / totalInState

    try:
        # a bare file name has no directory part, so nothing to create then
        targetDir = dirname(targetPath)
        if targetDir:
            makedirs(targetDir, exist_ok=True)
        # percentage of companies having `status`, per state;
        # walking states in reverse keeps the ordering among states with
        # equal percentage same as it always was in exported chart
        extractedData = {state: __percentageOfStatusInState__(dataSet[state])
                         for state in reversed(list(dataSet))}
        # no state having a single company of this `status` => nothing to draw
        if not any(share > 0 for share in extractedData.values()):
            return False
        y = sorted(extractedData, key=extractedData.get, reverse=True)
        y_pos = range(len(y))
        x = [extractedData[state] for state in y]
        with plt.style.context('ggplot'):
            font = {
                'family': 'serif',
                'color': '#264040',
                'weight': 'normal',
                'size': 12
            }
            # creates a figure of size 2400x1200
            plt.figure(figsize=(24, 12), dpi=100)
            try:
                axes = plt.gca()
                # x axis is a percentage scale : labelled every 10 %, ticked every 1 %
                plt.xlim((0, 100))
                axes.xaxis.set_major_locator(MultipleLocator(10))
                axes.xaxis.set_major_formatter(PercentFormatter())
                axes.xaxis.set_minor_locator(MultipleLocator(1))
                plt.barh(y_pos, x, align='center',
                         color='cornflowerblue', lw=1.6)
                axes.yaxis.set_ticks(y_pos)
                axes.yaxis.set_ticklabels(y)
                plt.xlabel('`{}` Companies'.format(
                    status), fontdict=font, labelpad=12)
                plt.title('`{}` Companies in Different States of India'.format(
                    status), fontdict=font, pad=12)
                plt.tight_layout()
                plt.savefig(targetPath, bbox_inches='tight', pad_inches=.5)
            finally:
                # figure gets released even when drawing / exporting fails
                plt.close()
        return True
    except Exception:
        return False
def plotTopEmailProvidersShare(dataSet: Dict[str, int], total: int, title: str, targetPath: str) -> bool:
    '''
    Plots a PIE chart, showing usage percentage of every email service
    provider present in `dataSet`, and exports it into `targetPath`

    Meant for showing popularity of top email service providers among
    Indian Companies ( only company data collected from data.gov.in
    was used )

    dataSet    : email service provider name -> usage count
    total      : count against which percentage shown in legend is computed
    title      : title of chart
    targetPath : file the chart gets written into

    Returns True when chart was exported, False when any Exception
    occurs ( it's swallowed, not propagated to caller )
    '''
    try:
        wedgeSizes = list(dataSet.values())
        # legend entries look like `Gmail ( 42.00 % )`
        labels = ['{} ( {:.2f} % )'.format(provider.capitalize(), count * 100 / total)
                  for provider, count in dataSet.items()]
        font = {
            'family': 'serif',
            'color': '#264040',
            'weight': 'normal',
            'size': 12
        }
        plt.figure(figsize=(24, 12), dpi=100)
        try:
            patches, _ = plt.pie(wedgeSizes)
            plt.legend(patches, labels, loc='best', fontsize='medium')
            plt.title(title, fontdict=font)
            # keeps the pie circular
            plt.axis('equal')
            plt.tight_layout()
            plt.savefig(targetPath, bbox_inches='tight',
                        pad_inches=.5)
        finally:
            # figure gets released even when drawing / exporting fails
            plt.close()
        return True
    except Exception:
        return False
def __mergeTwoDicts__(first: Dict[str, int], second: Dict[str, int]) -> Dict[str, int]:
    '''
    Merges two count holder dictionaries ( one holding everything
    calculated upto this point & another one holding record for a
    certain state, which we just processed ), adding up counts of
    keys present in both

    Can be used for simply merging any two Dict[str, int]

    Neither of `first` & `second` is modified, a new dictionary is
    always returned. Keys present only in `second` come first in the
    result ( the one iterated last in `second` leading ), followed by
    keys of `first` in their existing order
    '''
    # keys only `second` knows about, latest one first
    merged = {key: second[key]
              for key in reversed(list(second)) if key not in first}
    # then everything `first` holds, topped up with count from `second`
    for key, count in first.items():
        merged[key] = count + second[key] if key in second else count
    return merged
def extractAllCompanyEmailProvider(dataStream: map) -> Optional[Tuple[Dict[str, int], int]]:
    '''
    Finds out most used email service providers, along with their
    usage count

    dataStream is expected to be a chain of generator(s), each of them
    generating a stream of model.corporateStat.Company object(s),
    located in a certain State of India ( only `email` attribute of
    those objects is read here )

    Returns a tuple of
        - Dict[str, int], holding at max 10 email service providers
          & their usage count, highest count first
        - total # of emails, across all states, from which an email
          service provider could be extracted
    or None, when any Exception occurs ( it's swallowed )

    NOTE: every state is cut down to its own 10 most used providers
    before being merged, so a count in returned dictionary leaves out
    those states where that provider didn't make it into state's top 10,
    while returned total considers every extracted provider
    '''
    keepCount = 10
    # text following `@`, upto first `.`, i.e. `gmail` for `someone@gmail.com`
    providerPattern = reg_compile(r'(?<=@)[^.]+(?=\.)')

    # Extracts email service provider's name using regular expression,
    # None if there's nothing to extract
    def __getEmailProvider__(email: str) -> Optional[str]:
        # a missing / non-textual email is skipped, rather than
        # letting it fail extraction for every other company
        if not isinstance(email, str):
            return None
        matchObj = providerPattern.search(email)
        return matchObj.group().lower() if matchObj else None

    # Keeps only `count` elements ( having highest usage count ) of dictionary
    def __keepTop__(holder: Dict[str, int], count: int) -> Dict[str, int]:
        ranked = sorted(holder, key=holder.get, reverse=True)[:count]
        return {provider: holder[provider] for provider in ranked}

    try:
        total = 0
        merged = {}
        # processes each state of India at a time & extracts its top 10
        # email service providers, which get merged into running result
        for state in dataStream:
            stateCounter = {}
            for company in state:
                provider = __getEmailProvider__(company.email)
                if provider:
                    stateCounter[provider] = stateCounter.get(provider, 0) + 1
            # total is taken before cutting state's record down
            total += sum(stateCounter.values())
            merged = __mergeTwoDicts__(
                merged, __keepTop__(stateCounter, keepCount))
        # finally top 10 email service providers across all states
        return __keepTop__(merged, keepCount), total
    except Exception:
        return None
def plotAllRoCStatistics(data: Dict[str, int], targetPath: str) -> bool:
    '''
    Plots `How many companies are registered under which RoC` data,
    in a horizontal BAR chart, which is exported into a target file

    data       : RoC name -> # of companies registered under it;
                 record kept under key `NA` is left out of chart,
                 `data` itself is not modified
    targetPath : file the chart gets written into; its extension
                 is used as export format

    Returns True when chart was exported, False when any Exception
    occurs ( it's swallowed, not propagated to caller )
    '''
    try:
        # leaving out those records which didn't have any useful information
        # regarding their RoC; working on a copy keeps caller's dictionary intact
        usable = {roc: count for roc, count in data.items() if roc != 'NA'}
        y = sorted(usable, key=usable.get, reverse=True)
        y_pos = range(len(y))
        x = [usable[roc] for roc in y]
        with plt.style.context('ggplot'):
            font = {
                'family': 'serif',
                'style': 'normal',
                'color': '#264040',
                'weight': 'regular',
                'size': 16
            }
            plt.figure(figsize=(24, 12), dpi=100)
            try:
                plt.barh(y_pos, x, align='center', color='cornflowerblue',
                         lw=1.6, joinstyle='miter')
                axes = plt.gca()
                axes.yaxis.set_ticks(y_pos)
                axes.yaxis.set_ticklabels(y)
                plt.xlabel('# of Companies Registered',
                           fontdict=font, labelpad=16)
                plt.title('Company Registration under RoC(s)',
                          fontdict=font, pad=12)
                plt.tight_layout()
                plt.savefig(targetPath, bbox_inches='tight',
                            pad_inches=.5, dpi=100, format=basename(targetPath).split('.')[-1])
            finally:
                # figure gets released even when drawing / exporting fails
                plt.close()
        return True
    except Exception:
        return False
def extractRoCStatForAllCompanies(dataStream: map) -> Optional[Dict[str, int]]:
    '''
    Extracts how many companies are registered under which
    RoC ( Registrar of Companies ), all over India

    dataStream is iterated state by state, each state being an iterable
    of objects, of which only `registrarOfCompanies` attribute is read

    Returns Dict[str, int], holding RoC -> # of companies registered
    under it, or None when any Exception occurs ( it's swallowed )
    '''
    try:
        merged = {}
        for state in dataStream:
            stateCounter = {}
            for company in state:
                roc = company.registrarOfCompanies
                stateCounter[roc] = stateCounter.get(roc, 0) + 1
            # handing state's record over with RoC seen latest placed first,
            # so that ordering of keys in final result stays what it always was
            merged = __mergeTwoDicts__(
                merged, dict(reversed(list(stateCounter.items()))))
        return merged
    except Exception:
        return None
if __name__ == '__main__':
    # this module only provides helpers to be imported, there's nothing to run
    print('[!]This module is expected to be used as a backend handler')
    # `exit` is injected by `site` module for interactive use & may be absent,
    # raising SystemExit terminates with same exit status
    raise SystemExit(0)