Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to support GraphQL Search API #74

Merged
merged 8 commits into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,48 @@
# custom
li2u-output
.vscode
geckodriver.log

# MacOS
.DS_Store

# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
*.manifest
*.spec
pip-log.txt
pip-delete-this-directory.txt
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
164 changes: 102 additions & 62 deletions linkedin2username.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,25 +35,57 @@

"""

# The dictionary below is a best-effort attempt to spread a search load
# across sets of geographic locations. This can bypass the 1000 result
# search limit as we are now allowed 1000 per geo set.
# developer.linkedin.com/docs/v1/companies/targeting-company-shares#additionalcodes
# The dictionary below contains geo region codes. Because we are limited to 1000 results per search,
# we can use this to batch searches across regions and get more results.
# I found this in some random JS, so who knows if it will change.
# https://static.licdn.com/aero-v1/sc/h/6pw526ylxpzsa7nu7ht18bo8y
GEO_REGIONS = {
'r0': 'us:0',
'r1': 'ca:0',
'r2': 'gb:0',
'r3': 'au:0|nz:0',
'r4': 'cn:0|hk:0',
'r5': 'jp:0|kr:0|my:0|np:0|ph:0|sg:0|lk:0|tw:0|th:0|vn:0',
'r6': 'in:0',
'r7': 'at:0|be:0|bg:0|hr:0|cz:0|dk:0|fi:0',
'r8': 'fr:0|de:0',
'r9': 'gr:0|hu:0|ie:0|it:0|lt:0|nl:0|no:0|pl:0|pt:0',
'r10': 'ro:0|ru:0|rs:0|sk:0|es:0|se:0|ch:0|tr:0|ua:0',
'r11': ('ar:0|bo:0|br:0|cl:0|co:0|cr:0|do:0|ec:0|gt:0|mx:0|pa:0|pe:0'
'|pr:0|tt:0|uy:0|ve:0'),
'r12': 'af:0|bh:0|il:0|jo:0|kw:0|pk:0|qa:0|sa:0|ae:0'}
"ar": "100446943",
"at": "103883259",
"au": "101452733",
"be": "100565514",
"bg": "105333783",
"ca": "101174742",
"ch": "106693272",
"cl": "104621616",
"de": "101282230",
"dk": "104514075",
"es": "105646813",
"fi": "100456013",
"fo": "104630756",
"fr": "105015875",
"gb": "101165590",
"gf": "105001561",
"gp": "104232339",
"gr": "104677530",
"gu": "107006862",
"hr": "104688944",
"hu": "100288700",
"is": "105238872",
"it": "103350119",
"li": "100878084",
"lu": "104042105",
"mq": "103091690",
"nl": "102890719",
"no": "103819153",
"nz": "105490917",
"pe": "102927786",
"pl": "105072130",
"pr": "105245958",
"pt": "100364837",
"py": "104065273",
"re": "104265812",
"rs": "101855366",
"ru": "101728296",
"se": "105117694",
"sg": "102454443",
"si": "106137034",
"tw": "104187078",
"ua": "102264497",
"us": "103644278",
"uy": "100867946",
"ve": "101490751"
}


class NameMutator():
Expand Down Expand Up @@ -205,7 +237,7 @@ def parse_arguments():
)
parser.add_argument('-d', '--depth', type=int, action='store',
default=False,
help='Search depth (how many loops of 25). If unset, '
help='Search depth (how many loops of 50). If unset, '
'will try to grab them all.')
parser.add_argument('-s', '--sleep', type=int, action='store', default=0,
help='Seconds to sleep between search loops.'
Expand Down Expand Up @@ -405,9 +437,9 @@ def set_inner_loops(staff_count, args):

"""

# We will look for 25 names on each loop. So, we set a maximum amount of
# loops to the amount of staff / 25 +1 more to catch remainders.
loops = int((staff_count / 25) + 1)
# We will look for 50 names on each loop. So, we set a maximum amount of
# loops to the amount of staff / 50 +1 more to catch remainders.
loops = int((staff_count / 50) + 1)

print(f"[*] Company has {staff_count} profiles to check. Some may be anonymous.")

Expand Down Expand Up @@ -435,7 +467,7 @@ def set_inner_loops(staff_count, args):
" might not get them all.\n\n")
else:
print(f"[*] Setting each iteration to a maximum of {loops} loops of"
" 25 results each.\n\n")
" 50 results each.\n\n")
args.depth = loops

return args.depth, args.geoblast
Expand All @@ -448,25 +480,23 @@ def get_results(session, company_id, page, region, keyword):
scrolling through search results.

The mobile site defaults to using a 'count' of 10, but testing shows that
25 is allowed. This behavior will appear to the web server as someone
50 is allowed. This behavior will appear to the web server as someone
scrolling quickly through all available results.
"""
# When using the --geoblast feature, we need to inject our set of region
# codes into the search parameter.
if region:
region = re.sub(':', '%3A', region) # must URL encode this parameter

# Build the base search URL.
url = ('https://www.linkedin.com'
'/voyager/api/search/hits'
f'?facetCurrentCompany=List({company_id})'
f'&facetGeoRegion=List({region})'
f'&keywords=List({keyword})'
'&q=people&maxFacetValues=15'
'&supportedFacets=List(GEO_REGION,CURRENT_COMPANY)'
'&count=25'
'&origin=organization'
f'&start={page * 25}')
url = ('https://www.linkedin.com/voyager/api/graphql?variables=('
f'start:{page * 50},'
f'query:('
f'{f"keywords:{keyword}," if keyword else ""}'
'flagshipSearchIntent:SEARCH_SRP,'
f'queryParameters:List((key:currentCompany,value:List({company_id})),'
f'{f"(key:geoUrn,value:List({region}))," if region else ""}'
'(key:resultType,value:List(PEOPLE))'
'),'
'includeFiltersInResponse:false'
'),count:50)'
'&queryId=voyagerSearchDashClusters.66adc6056cf4138949ca5dcb31bb1749')

# Perform the search for this iteration.
result = session.get(url)
Expand All @@ -475,9 +505,9 @@ def get_results(session, company_id, page, region, keyword):

def find_employees(result):
"""
Takes the text response of an HTTP query, converst to JSON, and extracts employee details.
Takes the text response of an HTTP query, converts to JSON, and extracts employee details.

Retuns a list of dictionary items, or False if none found.
Returns a list of dictionary items, or False if none found.
"""
found_employees = []

Expand All @@ -491,33 +521,43 @@ def find_employees(result):
print(result[:200])
return False

# When you get to the last page of results, the next page will have an empty
# "elements" list.
if not result_json['elements']:
# Walk the data, being careful to avoid key errors
data = result_json.get('data', {})
search_clusters = data.get('searchDashClustersByAll', {})
elements = search_clusters.get('elements', [])
paging = search_clusters.get('paging', {})
total = paging.get('total', 0)

# If we've ended up with empty dicts or zero results left, bail out
if total == 0:
return False

# The "elements" list is the mini-profile you see when scrolling through a
# company's employees. It does not have all info on the person, like their
# entire job history. It only has some basics.
found_employees = []
for body in result_json.get('elements', []):
profile = (
body.get('hitInfo', {})
.get('com.linkedin.voyager.search.SearchProfile', {})
.get('miniProfile', {})
)
first_name = profile.get('firstName', '').strip()
last_name = profile.get('lastName', '').strip()

# Dont include profiles that have only a single name
if first_name and last_name:
full_name = f"{first_name} {last_name}"
occupation = profile.get('occupation', "")
found_employees.append({'full_name': full_name, 'occupation': occupation})
for element in elements:
# For some reason it's nested
for item_body in element.get('items', []):
# Info we want is all under 'entityResult'
entity = item_body['item']['entityResult']

return found_employees
# There are some useless entries we need to skip over
if not entity:
continue

# There is no first/last name fields anymore so we're taking the full name
full_name = entity['title']['text'].strip()

# The name may include extras like "Dr" at the start, so we do some basic stripping
if full_name[:3] == 'Dr ':
full_name = full_name[3:]

occupation = entity['primarySubtitle']['text']

found_employees.append({'full_name': full_name, 'occupation': occupation})

return found_employees


def do_loops(session, company_id, outer_loops, args):
Expand All @@ -544,10 +584,10 @@ def do_loops(session, company_id, outer_loops, args):
try:
for current_loop in outer_loops:
if args.geoblast:
region_name = 'r' + str(current_loop)
current_region = GEO_REGIONS[region_name]
region_name, region_id = list(GEO_REGIONS.items())[current_loop]
current_region = region_id
current_keyword = ''
print(f"\n[*] Looping through region {current_region}")
print(f"\n[*] Looping through region {region_name}")
elif args.keywords:
current_keyword = args.keywords[current_loop]
current_region = ''
Expand All @@ -556,7 +596,7 @@ def do_loops(session, company_id, outer_loops, args):
current_region = ''
current_keyword = ''

# This is the inner loop. It will search results 25 at a time.
# This is the inner loop. It will search results 50 at a time.
for page in range(0, args.depth):
new_names = 0

Expand Down
Loading