This repository has been archived by the owner on Jul 23, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathutil.py
248 lines (197 loc) · 11.1 KB
/
util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
#!/usr/bin/python3
from __future__ import annotations

from collections import Counter
from functools import reduce
from itertools import chain
from os import makedirs, mkdir
from os.path import dirname, exists
from re import compile as reg_compile, Pattern
from time import localtime, time
from typing import Dict, List, Optional

try:
    from model.companiesUnderState import CountOfCompaniesUnderState, CountOfCompaniesUnderDistrict
    from model.post import PostOfficeGraph, PostOffice
    from matplotlib import pyplot as plt
    from matplotlib.ticker import MultipleLocator, FormatStrFormatter
except ImportError as e:
    print('[!]Module Unavailable : {}'.format(str(e)))
    exit(1)

def plotCategorizedCompanyDataForACertainState(dataSet: Dict[str, int], targetPath: str, title: str) -> bool:
    '''
        Draws a PIE chart out of categorized company counts & exports it as an image.

        dataSet    : mapping of category name -> count of companies, as generated by
                     categorizeAsPerCompany*() function(s) of this module
        targetPath : path on local file system ( an image of `*.png` form ), where
                     generated chart is stored; missing parent directories get created
        title      : title put on top of the chart

        Returns True when chart was exported, False when anything went wrong
        ( every exception raised while preparing / plotting / saving is swallowed ).
    '''
    try:
        # calculating total # of companies we're considering here, for a certain state
        total = sum(dataSet.values())
        if dataSet and total == 0:
            # share of a category can't be expressed as percentage of zero
            return False
        targetDir = dirname(targetPath)
        # empty `targetDir` means current working directory, nothing to create in that case
        if targetDir:
            makedirs(targetDir, exist_ok=True)
        font = {
            'family': 'serif',
            'color': '#264040',
            'weight': 'normal',
            'size': 12
        }
        # biggest category first
        orderedKeys = sorted(dataSet, key=dataSet.get, reverse=True)
        # PIE chart's legend includes a percentage field too, along with category name
        labels = ['{} ( {:.4f} % )'.format(
            key, dataSet[key] * 100 / total) for key in orderedKeys]
        # this is the actual data to be plotted
        data = [dataSet[key] for key in orderedKeys]
        # figure on which pie chart to be drawn ( of size 2400x1200 )
        figure = plt.figure(figsize=(24, 12), dpi=100)
        try:
            patches, _ = plt.pie(data)  # plotting pie chart
            plt.legend(patches, labels, loc='best', fontsize='medium')
            plt.title(title, fontdict=font)
            plt.axis('equal')
            plt.tight_layout()
            # exporting plotted PIE chart
            plt.savefig(targetPath, bbox_inches='tight', pad_inches=.5)
        finally:
            # figure gets released even when plotting / exporting fails
            plt.close(figure)
        return True
    except Exception:
        return False
def categorizeAsPerCompanyStatus(dataSet) -> Dict[str, int]:
    '''
        Takes all companies present in one State ( instances of model.corporateStat.Company,
        only their `status` attribute is read ) & returns a Dict[str, int] holding count
        of companies, categorized as per their STATUS, which is to be used for plotting a PIE chart.

        Companies having status `NA` ( compared ignoring case & surrounding whitespace )
        are not counted. Keys of returned dictionary appear in reverse order of their
        first occurrence in `dataSet`.
    '''
    counts: Dict[str, int] = {}
    for company in dataSet:
        status = company.status
        if status.strip().lower() == 'na':
            continue  # status not available, nothing to count
        counts[status] = counts.get(status, 0) + 1
    # most recently discovered key goes first, same ordering this function always produced
    return dict(reversed(list(counts.items())))
def categorizeAsPerCompanyClass(dataSet) -> Dict[str, int]:
    '''
        Takes all companies present in one State ( instances of model.corporateStat.Company,
        only their `companyClass` attribute is read ) & returns a Dict[str, int] holding count
        of companies, categorized as per their CLASS, which is to be used for plotting a PIE chart.

        Companies having class `NA` ( compared ignoring case & surrounding whitespace )
        are not counted. Keys of returned dictionary appear in reverse order of their
        first occurrence in `dataSet`.
    '''
    counts: Dict[str, int] = {}
    for company in dataSet:
        companyClass = company.companyClass
        if companyClass.strip().lower() == 'na':
            continue  # class not available, nothing to count
        counts[companyClass] = counts.get(companyClass, 0) + 1
    # most recently discovered key goes first, same ordering this function always produced
    return dict(reversed(list(counts.items())))
def categorizeAsPerCompanyCategory(dataSet) -> Dict[str, int]:
    '''
        Takes all companies present in one State ( instances of model.corporateStat.Company,
        only their `category` attribute is read ) & returns a Dict[str, int] holding count
        of companies, categorized as per their CATEGORY, which is to be used for plotting a PIE chart.

        Companies having category `NA` ( compared ignoring case & surrounding whitespace )
        are not counted. Keys of returned dictionary appear in reverse order of their
        first occurrence in `dataSet`.
    '''
    counts: Dict[str, int] = {}
    for company in dataSet:
        category = company.category
        if category.strip().lower() == 'na':
            continue  # category not available, nothing to count
        counts[category] = counts.get(category, 0) + 1
    # most recently discovered key goes first, same ordering this function always produced
    return dict(reversed(list(counts.items())))
def categorizeAsPerCompanySubCategory(dataSet) -> Dict[str, int]:
    '''
        Takes all companies present in one State ( instances of model.corporateStat.Company,
        only their `subCategory` attribute is read ) & returns a Dict[str, int] holding count
        of companies, categorized as per their SUB_CATEGORY, which is to be used for plotting a PIE chart.

        Companies having sub category `NA` ( compared ignoring case & surrounding whitespace )
        are not counted. Keys of returned dictionary appear in reverse order of their
        first occurrence in `dataSet`.
    '''
    counts: Dict[str, int] = {}
    for company in dataSet:
        subCategory = company.subCategory
        if subCategory.strip().lower() == 'na':
            continue  # sub category not available, nothing to count
        counts[subCategory] = counts.get(subCategory, 0) + 1
    # most recently discovered key goes first, same ordering this function always produced
    return dict(reversed(list(counts.items())))
def categorizeAsPerCompanyPrincipalBusinessActivity(dataSet) -> Dict[str, int]:
    '''
        Takes all companies present in one State ( instances of model.corporateStat.Company,
        only their `principalBusinessActivity` attribute is read ) & returns a Dict[str, int]
        holding count of companies, categorized as per their PRINCIPAL_BUSINESS_ACTIVITY,
        which is to be used for plotting a PIE chart.

        Companies having activity `NA` ( compared ignoring case & surrounding whitespace )
        are not counted. Keys of returned dictionary appear in reverse order of their
        first occurrence in `dataSet`.
    '''
    counts: Dict[str, int] = {}
    for company in dataSet:
        activity = company.principalBusinessActivity
        if activity.strip().lower() == 'na':
            continue  # activity not available, nothing to count
        counts[activity] = counts.get(activity, 0) + 1
    # most recently discovered key goes first, same ordering this function always produced
    return dict(reversed(list(counts.items())))
def plotCompanyRegistrationDateWiseCategorizedData(dataSet: Dict[int, int], targetPath: str, title: str) -> bool:
    '''
        Plots a graph of year of registration vs. #-of companies registered
        in that certain year & exports it as an image.

        dataSet    : mapping of year of registration -> count of companies registered
                     in that year, as generated by categorizeAsPerCompanyDateOfRegistration()
        targetPath : path on local file system, where generated graph is stored;
                     missing parent directories get created
        title      : title put on top of the graph

        Returns True when graph was exported. Returns False when `dataSet` holds no
        year which is <= current year ( empty dataset included ) or when anything
        else went wrong ( every exception raised on the way is swallowed ).
    '''
    try:
        # filtering out improper years ( higher than current year ), lets us clean dataset,
        # so that upper end of X axis stays meaningful
        nextYear = localtime(time()).tm_year + 1
        properYears = [year for year in dataSet if year < nextYear]
        if not properYears:
            # nothing plottable; checked before touching file system
            return False
        # a range from `first when a company was registered` to `nearest year upto which we have any status`
        x = range(min(dataSet), max(properYears) + 1)
        # years in which no company was registered are plotted as 0
        y = [dataSet.get(year, 0) for year in x]
        targetDir = dirname(targetPath)
        # empty `targetDir` means current working directory, nothing to create in that case
        if targetDir:
            makedirs(targetDir, exist_ok=True)
        # style `ggplot` is in use
        with plt.style.context('ggplot'):
            font = {
                'family': 'serif',
                'color': '#264040',
                'weight': 'normal',
                'size': 12
            }
            figure = plt.figure(figsize=(24, 12), dpi=100)
            try:
                xAxis = plt.gca().xaxis
                # creating major x-tick locator every 10 years
                xAxis.set_major_locator(MultipleLocator(10))
                # creating x-tick formatter using only year name
                xAxis.set_major_formatter(FormatStrFormatter('%d'))
                # setting minor x-tick locator every 1 year
                xAxis.set_minor_locator(MultipleLocator(1))
                plt.plot(x, y, 'r-', lw=1.5)
                plt.xlabel('Year', fontdict=font, labelpad=16)
                plt.ylabel('# of Companies Registered',
                           fontdict=font, labelpad=16)
                plt.title(title, fontdict=font)
                plt.tight_layout()
                plt.savefig(targetPath, bbox_inches='tight', pad_inches=.5)
            finally:
                # figure gets released even when plotting / exporting fails
                plt.close(figure)
        return True
    except Exception:
        return False
def categorizeAsPerCompanyDateOfRegistration(dataSet) -> Dict[int, int]:
    '''
        Filters out those companies which have `dateOfRegistration` field None
        & classifies remaining ones using year of registration ( `dateOfRegistration.year` ).

        So finally we get a Dict[int, int], holding a mapping between
        year of registration & #-of companies registered in that year,
        which is going to be used for plotting year vs. registration count graph.
        Keys of returned dictionary appear in reverse order of their first
        occurrence in `dataSet`.

        This function is used in both case of processing individual states
        & companies from all states across India
        ( actually we just chain them before invoking this function )
    '''
    counts: Dict[int, int] = {}
    for company in dataSet:
        registeredOn = company.dateOfRegistration
        if registeredOn is None:
            continue  # registration date unknown, can't be classified
        year = registeredOn.year
        counts[year] = counts.get(year, 0) + 1
    # most recently discovered year goes first, same ordering this function always produced
    return dict(reversed(list(counts.items())))
def __extractPinCodeFromAddress__(reg: Pattern, address: str) -> Optional[str]:
    '''
        Extracts Pincode from registered office address of a company & returns so.

        reg     : compiled regular expression describing a pincode
        address : registered office address to be searched; may be None

        Returns text of first match of `reg` in `address`.
        In case of failure ( no match found or no address given ), returns None
    '''
    if address is None:
        # nothing to search in; treated like any other failed extraction
        return None
    matchObj = reg.search(address)
    return matchObj.group() if matchObj else None
def classifyCompaniesUsingPinCodeOfRegisteredAddress(dataStream: chain) -> Counter:
    '''
        Takes an iterable of model.corporateStat.Company & classifies their
        count using `Pincode of their Registered Address`
        ( extracted from `registeredOfficeAddress` field )

        Finally a Counter, holding count of companies registered in different
        PinCode(s) is returned. Companies for which no pincode could be extracted
        are counted under key None.
    '''
    # pincode extraction regular expression : exactly six consecutive digits,
    # look-arounds stop us from picking a slice of some longer number ( phone no. etc. )
    reg = reg_compile(r'(?<!\d)\d{6}(?!\d)')
    return Counter(
        __extractPinCodeFromAddress__(reg, company.registeredOfficeAddress)
        for company in dataStream)
def pincodeToDistrictNameMapper(pincodes: Dict[str, int], poGraph: PostOfficeGraph) -> List[CountOfCompaniesUnderState]:
    '''
        Converts a `Companies registered under a PinCode record` to
        `Companies registered under each District of a certain State
        ( or may be for whole country ) based record`

        pincodes : mapping of pincode -> count of companies registered under that pincode
        poGraph  : used for finding a post office by pincode
                   ( via `findPostOfficeUsingPin` ), which tells us state & district name

        Returns a list holding one CountOfCompaniesUnderState per state discovered,
        in order of discovery. Pincodes for which no post office is found are skipped,
        so is key None ( under which companies lacking a pincode get counted ).
    '''
    states: List[CountOfCompaniesUnderState] = []
    for pincode, companyCount in pincodes.items():
        if pincode is None:
            # no pincode could be extracted for these companies, nothing to look up
            continue
        postOffice: PostOffice = poGraph.findPostOfficeUsingPin(pincode)
        if not postOffice:
            continue
        # record of this state, if we've already come across it ( latest one wins )
        stateRecord = next(
            (state for state in reversed(states)
             if state.name == postOffice.stateName),
            None)
        if stateRecord:
            stateRecord.updateCountForDistrict(
                postOffice.districtName, companyCount)
        else:
            # first pincode of this state, record starts with a single district
            states.append(CountOfCompaniesUnderState(postOffice.stateName, [
                CountOfCompaniesUnderDistrict(postOffice.districtName, companyCount)]))
    return states
if __name__ == '__main__':
    # direct invocation only informs the user & terminates with exit code 0
    print('[!]This module is expected to be used as a backend handler')
    exit(0)