Add option to directly process WARC files

This commit is contained in:
benoit74 2024-07-23 09:27:23 +00:00
parent 459a30a226
commit 8cd1db6eef
No known key found for this signature in database
GPG Key ID: B89606434FC7B530
2 changed files with 103 additions and 22 deletions

View File

@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Add `--custom-behaviors` argument to support path/HTTP(S) URL custom behaviors to pass to the crawler (#313) - Add `--custom-behaviors` argument to support path/HTTP(S) URL custom behaviors to pass to the crawler (#313)
- Add daily automated end-to-end tests of a page with Youtube player (#330) - Add daily automated end-to-end tests of a page with Youtube player (#330)
- Add `--warcs` option to directly process WARC files (#301)
### Changed ### Changed

View File

@ -12,6 +12,7 @@ import shutil
import signal import signal
import subprocess import subprocess
import sys import sys
import tarfile
import tempfile import tempfile
import urllib.parse import urllib.parse
from argparse import ArgumentParser from argparse import ArgumentParser
@ -363,6 +364,14 @@ def run(raw_args):
"individual JS files URL/path separated by a comma", "individual JS files URL/path separated by a comma",
) )
# --warcs lets the user hand over already-captured WARC data instead of crawling.
# The value is one comma-separated string of paths and/or HTTP(S) URLs.
# NOTE: the help text is built from adjacent string literals, which Python joins
# with no separator, so every fragment except the last must end with a space.
parser.add_argument(
    "--warcs",
    help="Directly convert WARC archives to ZIM, by-passing the crawling phase. "
    "This argument must contain the path or HTTP(S) URL to either warc.gz files or "
    "to a tar.gz containing the warc.gz files. Single value with individual "
    "path/URLs separated by comma",
)
zimit_args, warc2zim_args = parser.parse_known_args(raw_args) zimit_args, warc2zim_args = parser.parse_known_args(raw_args)
# pass a scraper suffix to warc2zim so that both zimit and warc2zim versions are # pass a scraper suffix to warc2zim so that both zimit and warc2zim versions are
@ -499,33 +508,104 @@ def run(raw_args):
f"Output to tempdir: {temp_root_dir} - " f"Output to tempdir: {temp_root_dir} - "
f"{'will keep' if zimit_args.keep else 'will delete'}" f"{'will keep' if zimit_args.keep else 'will delete'}"
) )
logger.info(f"Running browsertrix-crawler crawl: {cmd_line}")
crawl = subprocess.run(cmd_args, check=False)
if crawl.returncode == EXIT_CODE_CRAWLER_LIMIT_HIT:
logger.info("crawl interupted by a limit")
elif crawl.returncode != 0:
raise subprocess.CalledProcessError(crawl.returncode, cmd_args)
if zimit_args.collection: # if warc files are passed, do not run browsertrix crawler but fetch the files if
warc_directory = temp_root_dir.joinpath( # they are provided as an HTTP URL + extract the archive if it is a tar.gz
f"collections/{zimit_args.collection}/archive/" warc_files: list[Path] = []
) if zimit_args.warcs:
else: for warc_location in [
warc_dirs = list(temp_root_dir.rglob("collections/crawl-*/archive/")) warc_location.strip() for warc_location in zimit_args.warcs.split(",")
if len(warc_dirs) == 0: ]:
raise RuntimeError( suffix = "".join(Path(urllib.parse.urlparse(warc_location).path).suffixes)
"Failed to find directory where WARC files have been created" if suffix not in {".tar.gz", ".warc", ".warc.gz"}:
raise Exception(f"Unsupported file at {warc_location}")
filename = tempfile.NamedTemporaryFile(
dir=temp_root_dir,
prefix="warc_",
suffix=suffix,
delete_on_close=False,
) )
elif len(warc_dirs) > 1:
logger.info("Found many WARC files directories, only last one will be used") if not re.match(r"^https?\://", warc_location):
for directory in warc_dirs: # warc_location is not a URL, so it is a path, simply add it to the list
logger.info(f"- {directory}") if not Path(warc_location).exists():
warc_directory = warc_dirs[-1] raise Exception(f"Impossible to find file at {warc_location}")
# if it is a plain warc or warc.gz, simply add it to the list
if suffix in {".warc", ".warc.gz"}:
warc_files.append(Path(warc_location))
continue
# otherwise extract tar.gz but do not delete it afterwards
extract_path = temp_root_dir / f"{filename.name}_files"
logger.info(
f"Extracting WARC(s) from {warc_location} to {extract_path}"
)
with tarfile.open(warc_location, "r:gz") as fh:
# Extract all the contents to the specified directory
fh.extractall(path=extract_path, filter="data")
warc_files.append(Path(extract_path))
continue
# warc_location is a URL, let's download it to a temp name to avoid name
# collisions
warc_file = Path(filename.name)
logger.info(f"Downloading WARC(s) from {warc_location} to {warc_file}")
resp = requests.get(warc_location, timeout=REQUESTS_TIMEOUT)
resp.raise_for_status()
warc_file.write_bytes(resp.content)
# if it is a plain warc or warc.gz, simply add it to the list
if suffix in {".warc", ".warc.gz"}:
warc_files.append(warc_file)
continue
# otherwise extract tar.gz and delete it afterwards
extract_path = temp_root_dir / f"{filename.name}_files"
logger.info(f"Extracting WARC(s) from {warc_file} to {extract_path}")
with tarfile.open(warc_file, "r:gz") as fh:
# Extract all the contents to the specified directory
fh.extractall(path=extract_path, filter="data")
logger.info(f"Deleting archive at {warc_file}")
warc_file.unlink()
warc_files.append(Path(extract_path))
else:
logger.info(f"Running browsertrix-crawler crawl: {cmd_line}")
crawl = subprocess.run(cmd_args, check=False)
if crawl.returncode == EXIT_CODE_CRAWLER_LIMIT_HIT:
logger.info("crawl interupted by a limit")
elif crawl.returncode != 0:
raise subprocess.CalledProcessError(crawl.returncode, cmd_args)
if zimit_args.collection:
warc_files = [
temp_root_dir.joinpath(f"collections/{zimit_args.collection}/archive/")
]
else:
warc_dirs = list(temp_root_dir.rglob("collections/crawl-*/archive/"))
if len(warc_dirs) == 0:
raise RuntimeError(
"Failed to find directory where WARC files have been created"
)
elif len(warc_dirs) > 1:
logger.info(
"Found many WARC files directories, only last one will be used"
)
for directory in warc_dirs:
logger.info(f"- {directory}")
warc_files = [warc_dirs[-1]]
logger.info("") logger.info("")
logger.info("----------") logger.info("----------")
logger.info(f"Processing WARC files in {warc_directory}") logger.info(
warc2zim_args.append(str(warc_directory)) f"Processing WARC files in/at "
f'{" ".join(str(warc_file) for warc_file in warc_files)}'
)
warc2zim_args.extend(str(warc_file) for warc_file in warc_files)
logger.info(f"Calling warc2zim with these args: {warc2zim_args}") logger.info(f"Calling warc2zim with these args: {warc2zim_args}")