Commit: unzip_files_in_dir

arcangelo7 committed Oct 3, 2022
1 parent 9e4c917 commit c454895

Showing 19 changed files with 222 additions and 193 deletions.
11 changes: 4 additions & 7 deletions examples_generator.py
@@ -13,9 +13,7 @@
# OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.


from math import ceil
from pprint import pprint
from oc_meta.lib.file_manager import get_data, write_csv
from oc_meta.lib.file_manager import get_csv_data, write_csv
from oc_meta.plugins.crossref.crossref_processing import CrossrefProcessing
from pebble import ThreadPool, ProcessFuture
from psutil import Process
@@ -27,7 +25,6 @@
import os
import requests
import requests_cache
import sys


CITING_DOI = '10.1007/978-3-540-88851-2'
@@ -84,13 +81,13 @@ def get_citations_and_metadata(crossref_processing:CrossrefProcessing, doi:str)
citation['cited_publication_date'] = next((el['pub_date'] for el in metadata_csv if f"doi:{citation['cited_id']}" in el['id'].split()), '')
return citations_csv, metadata_csv

def get_data_by_requirements(
def get_csv_data_by_requirements(
crossref_processing:CrossrefProcessing,
file_to_be_processed:str,
citations_number_range:Tuple[int, int]=(5,10),
citing_entity_type:str='journal article',
required_types = {'journal article'}) -> Tuple[List[dict], List[dict]]:
data = get_data(file_to_be_processed)
data = get_csv_data(file_to_be_processed)
for row in data:
if row['type'] == citing_entity_type or not citing_entity_type:
ids = row['id'].split()
@@ -130,7 +127,7 @@ def initializer():
with ThreadPool(max_workers=max_workers, initializer=initializer) as executor:
for file_to_be_processed in files_to_be_processed:
future:ProcessFuture = executor.schedule(
function=get_data_by_requirements,
function=get_csv_data_by_requirements,
args=(crossref_processing, os.path.join(CROSSREF_DUMP_DIR, file_to_be_processed), (9,None), 'journal article', {'journal article', 'other'}))
future.add_done_callback(dump_citations_and_metadata_csvs)

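
For context, the hunk above relies on Pebble's thread pool: each Crossref dump file is scheduled as a task running get_csv_data_by_requirements, and a done-callback dumps the resulting CSVs. A minimal, self-contained sketch of that scheduling pattern follows; process_file and on_done are hypothetical stand-ins for the repository's functions.

from pebble import ThreadPool

def process_file(path: str) -> int:
    # Stand-in for get_csv_data_by_requirements: pretend to process one dump file
    return len(path)

def on_done(future) -> None:
    # Invoked once the scheduled task has finished
    print('processed:', future.result())

if __name__ == '__main__':
    files = ['dump_part_1.json', 'dump_part_2.json']
    with ThreadPool(max_workers=2) as executor:
        for f in files:
            future = executor.schedule(function=process_file, args=(f,))
            future.add_done_callback(on_done)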
70 changes: 0 additions & 70 deletions oc_meta/lib/archive_manager.py

This file was deleted.

99 changes: 95 additions & 4 deletions oc_meta/lib/file_manager.py
@@ -1,13 +1,32 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2022 Arcangelo Massari <[email protected]>
#
# Permission to use, copy, modify, and/or distribute this software for any purpose
# with or without fee is hereby granted, provided that the above copyright notice
# and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
# SOFTWARE.


from contextlib import contextmanager
from oc_meta.lib.cleaner import Cleaner
from pathlib import Path
from typing import List, Dict, Set
from zipfile import ZipFile, ZIP_DEFLATED
import csv
import json
import os
import sys
import zipfile


def get_data(filepath:str) -> List[Dict[str, str]]:
def get_csv_data(filepath:str) -> List[Dict[str, str]]:
field_size_changed = False
cur_field_size = 128
data = list()
@@ -76,7 +95,79 @@ def zipdir(path, ziph):
os.path.join(path, '..')))

def zipit(dir_list:list, zip_name:str) -> None:
zipf = zipfile.ZipFile(zip_name, 'w', zipfile.ZIP_DEFLATED)
zipf = ZipFile(file=zip_name, mode='w', compression=ZIP_DEFLATED, allowZip64=True)
for dir in dir_list:
zipdir(dir, zipf)
zipf.close()

def zip_json_files_in_dir(src_dir:str, dst_dir:str, replace_files:bool=False) -> None:
'''
This function zips JSON files individually in all directories starting from a specified root directory.
In other words, it does not zip the entire folder but each file separately,
while maintaining the folder hierarchy in the specified output directory.
:params src_dir: the source directory
:type src_dir: str
:params dst_dir: the destination directory
:type dst_dir: str
:params replace_files: True if you want to replace the original unzipped files with their zipped versions. The default value is False
:type replace_files: bool
:returns: None
'''
for dirpath, _, filenames in os.walk(src_dir):
for filename in filenames:
src_path = os.path.join(dirpath, filename)
dst_path = os.path.join(
dst_dir,
str(Path(src_path).parent)
.replace(f'{src_dir}{os.sep}', ''))
if not os.path.exists(dst_path):
os.makedirs(dst_path)
with ZipFile(file=os.path.join(dst_path, filename) + '.zip', mode='w', compression=ZIP_DEFLATED, allowZip64=True) as zipf:
zipf.write(src_path, arcname=filename)
if replace_files:
os.remove(src_path)

def unzip_files_in_dir(src_dir:str, dst_dir:str, replace_files:bool=False) -> None:
'''
This function unzips zipped files individually in all directories starting from a specified root directory.
In other words, it does not unzip the entire folder but each file separately,
while maintaining the folder hierarchy in the specified output directory.
:params src_dir: the source directory
:type src_dir: str
:params dst_dir: the destination directory
:type dst_dir: str
:params replace_files: True if you want to replace the original zipped files with their unzipped versions. The default value is False
:type replace_files: bool
:returns: None
'''
for dirpath, _, filenames in os.walk(src_dir):
for filename in filenames:
if os.path.splitext(filename)[1] == '.zip':
src_path = os.path.join(dirpath, filename)
dst_path = os.path.join(
dst_dir,
str(Path(src_path).parent)
.replace(f'{src_dir}{os.sep}', ''))
if not os.path.exists(dst_path):
os.makedirs(dst_path)
with ZipFile(file=os.path.join(dst_path, filename), mode='r') as zipf:
zipf.extractall(dst_path)
if replace_files:
os.remove(src_path)

def read_zipped_json(filepath:str) -> dict:
'''
This function reads a zipped JSON file.
:params filepath: the zipped JSON file path
:type filepath: str
:returns: dict -- It returns the JSON content as a dictionary
'''
with ZipFile(filepath, 'r') as zipf:
for filename in zipf.namelist():
with zipf.open(filename) as f:
json_data = f.read()
json_dict = json.loads(json_data.decode("utf-8"))
return json_dict
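
To make the new helpers concrete, here is a minimal usage sketch. The directory names raw_json, zipped_json and restored_json (and the archive path) are placeholders chosen for the example, not defaults of the library.

from oc_meta.lib.file_manager import zip_json_files_in_dir, unzip_files_in_dir, read_zipped_json

# Zip every JSON file under raw_json/ individually, mirroring the hierarchy in zipped_json/
zip_json_files_in_dir(src_dir='raw_json', dst_dir='zipped_json', replace_files=False)

# Read one of the produced archives back into a dictionary (hypothetical path)
data = read_zipped_json('zipped_json/subdir/example.json.zip')

# Unzip every archive under zipped_json/ individually, mirroring the hierarchy in restored_json/
unzip_files_in_dir(src_dir='zipped_json', dst_dir='restored_json', replace_files=False)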
1 change: 0 additions & 1 deletion oc_meta/lib/finder.py
@@ -8,7 +8,6 @@
from time_agnostic_library.agnostic_entity import AgnosticEntity



class ResourceFinder:

def __init__(self, ts_url, base_iri:str):
18 changes: 9 additions & 9 deletions oc_meta/plugins/multiprocess/prepare_multiprocess.py
@@ -15,8 +15,8 @@
# SOFTWARE.


from oc_meta.lib.file_manager import pathoo, get_data, write_csv, sort_files
from oc_meta.lib.master_of_regex import comma_and_spaces, ids_inside_square_brackets, name_and_ids, semicolon_in_people_field
from oc_meta.lib.file_manager import pathoo, get_csv_data, write_csv, sort_files
from oc_meta.lib.master_of_regex import comma_and_spaces, name_and_ids, semicolon_in_people_field
from oc_meta.core.creator import Creator
from typing import Dict, List
from tqdm import tqdm
@@ -57,7 +57,7 @@ def prepare_relevant_items(csv_dir:str, output_dir:str, items_per_file:int, verb
duplicated_resp_agents = dict()
# Look for all venues, responsible agents, and publishers
for file in files:
data = get_data(file)
data = get_csv_data(file)
_get_duplicated_ids(data=data, ids_found=ids_found, items_by_id=duplicated_ids)
_get_relevant_venues(data=data, ids_found=venues_found, items_by_id=venues_by_id, duplicated_items=duplicated_venues)
_get_resp_agents(data=data, ids_found=resp_agents_found, items_by_id=resp_agents_by_id, duplicated_items=duplicated_resp_agents)
@@ -67,7 +67,7 @@
print('[INFO:prepare_multiprocess] Enriching the duplicated bibliographic resources found')
pbar = tqdm(total=len(files))
for file in files:
data = get_data(file)
data = get_csv_data(file)
_enrich_duplicated_ids_found(data, duplicated_ids)
pbar.update() if verbose else None
pbar.close() if verbose else None
@@ -378,7 +378,7 @@ def __index_all_venues(files:list, verbose:bool) -> dict:
pbar = tqdm(total=len(files))
venues_occurrences = dict()
for file in files:
data = get_data(file)
data = get_csv_data(file)
for row in data:
venues = list()
if row['type'] in VENUES:
@@ -406,7 +406,7 @@ def __split_csvs_by_venues(files:list, venues_occurrences:dict, output_dir:str,
no_venues_outdata = list()
counter = 0
for file in files:
data = get_data(file)
data = get_csv_data(file)
for row in data:
venues = list()
if row['type'] in VENUES:
@@ -443,7 +443,7 @@ def __split_csvs_by_venues(files:list, venues_occurrences:dict, output_dir:str,
chunk_no_venues[no_venues_filepath] = no_venues_outdata
for chunk in [chunk_venues, chunk_no_venues]:
for filepath, dump in chunk.items():
all_data = get_data(filepath) if os.path.exists(filepath) else list()
all_data = get_csv_data(filepath) if os.path.exists(filepath) else list()
all_data.extend(dump)
write_csv(filepath, all_data)
del chunk
@@ -458,7 +458,7 @@ def __split_in_chunks(output_dir:str, chunk_size:int, verbose:bool):
counter = 0
for file in files:
filepath = os.path.join(output_dir, file)
data = get_data(filepath)
data = get_csv_data(filepath)
len_data = len(data)
if len_data > chunk_size:
while len_data > chunk_size:
@@ -487,7 +487,7 @@ def __dump_if_chunk_size(chunk:dict, existing_files:set, pid:psutil.Process) ->
memory_used = pid.memory_info().rss / (1024.0 ** 3)
if memory_used > 10:
for filepath, dump in chunk.items():
all_data = get_data(filepath) if os.path.exists(filepath) else list()
all_data = get_csv_data(filepath) if os.path.exists(filepath) else list()
all_data.extend(dump)
write_csv(filepath, all_data)
existing_files.add(filepath)
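
The last hunk above keeps memory in check by flushing buffered rows to disk once the process exceeds a 10 GB resident set size. A generic sketch of that guard follows, with a deliberately small threshold and a plain csv.DictWriter standing in for the repository's get_csv_data/write_csv pair.

import csv
import os
import psutil

def flush_if_over_threshold(buffers: dict, threshold_gb: float = 0.5) -> None:
    # Resident set size of the current process, in GB
    used_gb = psutil.Process().memory_info().rss / (1024.0 ** 3)
    if used_gb > threshold_gb:
        for filepath, rows in buffers.items():
            if not rows:
                continue
            # Append the buffered rows, writing the header only for a new file
            new_file = not os.path.exists(filepath)
            with open(filepath, 'a', newline='', encoding='utf-8') as f:
                writer = csv.DictWriter(f, fieldnames=list(rows[0].keys()))
                if new_file:
                    writer.writeheader()
                writer.writerows(rows)
        buffers.clear()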
4 changes: 2 additions & 2 deletions oc_meta/run/meta_process.py
@@ -21,7 +21,7 @@
from argparse import ArgumentParser
from datetime import datetime
from itertools import cycle
from oc_meta.lib.file_manager import get_data, normalize_path, pathoo, suppress_stdout, init_cache, sort_files, zipit
from oc_meta.lib.file_manager import get_csv_data, normalize_path, pathoo, suppress_stdout, init_cache, sort_files, zipit
from oc_meta.plugins.multiprocess.resp_agents_creator import RespAgentsCreator
from oc_meta.plugins.multiprocess.resp_agents_curator import RespAgentsCurator
from oc_meta.core.creator import Creator
@@ -95,7 +95,7 @@ def curate_and_create(self, filename:str, cache_path:str, errors_path:str, worke
return {'message': 'skip'}, cache_path, errors_path, filename
try:
filepath = os.path.join(self.input_csv_dir, filename)
data = get_data(filepath)
data = get_csv_data(filepath)
supplier_prefix = f'{self.supplier_prefix}0' if worker_number is None else f'{self.supplier_prefix}{str(worker_number)}0'
# Curator
self.info_dir = os.path.join(self.info_dir, supplier_prefix) if worker_number else self.info_dir
11 changes: 7 additions & 4 deletions oc_meta/run/zip_process.py
@@ -16,14 +16,17 @@


from argparse import ArgumentParser
from oc_meta.lib.archive_manager import JsonArchiveManager
from oc_meta.lib.file_manager import zip_json_files_in_dir, unzip_files_in_dir


if __name__ == '__main__':
arg_parser = ArgumentParser('zip_process.py', description='Zip JSON files individually in all directories starting from a specified root directory')
arg_parser = ArgumentParser('zip_process.py', description='Zip or unzip JSON files individually in all directories starting from a specified root directory')
arg_parser.add_argument('-o', '--operation', dest='operation', required=True, choices=['zip', 'unzip'], help='Specify "zip" to zip the files, "unzip" otherwise')
arg_parser.add_argument('-s', '--source', dest='src_dir', required=True, help='The source directory')
arg_parser.add_argument('-d', '--destination', dest='dst_dir', required=True, help='The destination directory')
arg_parser.add_argument('-r', '--replace', dest='replace_files', action='store_true', default=False, required=False, help='Specify this argument if you want to replace the original unzipped files with their zipped versions')
args = arg_parser.parse_args()
json_archive_manager = JsonArchiveManager()
json_archive_manager.compress_json_files_in_dir(src_dir=args.src_dir, dst_dir=args.dst_dir, replace_files=args.replace_files)
if args.operation == 'zip':
zip_json_files_in_dir(src_dir=args.src_dir, dst_dir=args.dst_dir, replace_files=args.replace_files)
elif args.operation == 'unzip':
unzip_files_in_dir(src_dir=args.src_dir, dst_dir=args.dst_dir, replace_files=args.replace_files)
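
With the new --operation flag the script handles both directions. Illustrative invocations, assuming the oc_meta package is importable (for instance from the repository root) and using placeholder directory names:

python -m oc_meta.run.zip_process --operation zip --source raw_json --destination zipped_json
python -m oc_meta.run.zip_process --operation unzip --source zipped_json --destination restored_json --replace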
8 changes: 4 additions & 4 deletions poetry.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -22,7 +22,7 @@ Pebble = "^4.6.3"
ramose = "^1.0.6"
oc-idmanager = "0.1.1"
time-agnostic-library = "4.3.2"
oc-ocdm = "^7.1.1"
oc-ocdm = "7.1.2"

[tool.poetry.dev-dependencies]
wget = "^3.2"

0 comments on commit c454895
