fix(common): compat with 'hdiffmap.json'

miHoYo introduced a new hdiff map, so yeah.
This commit is contained in:
2025-01-14 21:51:31 +07:00
parent da9c930d2e
commit f5007b6aa5
3 changed files with 83 additions and 51 deletions

View File

@ -21,6 +21,30 @@ from vollerei.utils import HDiffPatch, HPatchZPatchError, download
_hdiff = HDiffPatch()
def _extract_files(
archive: py7zr.SevenZipFile | zipfile.ZipFile, files, path: PathLike
):
if isinstance(archive, py7zr.SevenZipFile):
# .7z archive
archive.extract(path, files)
else:
# .zip archive
archive.extractall(path, files)
def _open_archive(file: Path | IOBase) -> py7zr.SevenZipFile | zipfile.ZipFile:
archive: py7zr.SevenZipFile | zipfile.ZipFile = None
try:
archive = py7zr.SevenZipFile(file, "r")
except py7zr.exceptions.Bad7zFile:
# Try to open it as a zip file
try:
archive = zipfile.ZipFile(file, "r")
except zipfile.BadZipFile:
raise ValueError("Archive is not a valid 7z or zip file.")
return archive
def apply_update_archive(
game: GameABC, archive_file: Path | IOBase, auto_repair: bool = True
) -> None:
@ -46,25 +70,7 @@ def apply_update_archive(
if isinstance(archive, py7zr.SevenZipFile):
archive.reset()
def extract_files(
archive: py7zr.SevenZipFile | zipfile.ZipFile, files, path: PathLike
):
if isinstance(archive, py7zr.SevenZipFile):
# .7z archive
archive.extract(path, files)
else:
# .zip archive
archive.extractall(path, files)
archive: py7zr.SevenZipFile | zipfile.ZipFile = None
try:
archive = py7zr.SevenZipFile(archive_file, "r")
except py7zr.exceptions.Bad7zFile:
# Try to open it as a zip file
try:
archive = zipfile.ZipFile(archive_file, "r")
except zipfile.BadZipFile:
raise ValueError("Archive is not a valid 7z or zip file.")
archive = _open_archive(archive_file)
# Get files list (we don't want to extract all of them)
files = archive.namelist()
@ -78,7 +84,7 @@ def apply_update_archive(
# Think for me a better name for this variable
txtfiles = None
if isinstance(archive, py7zr.SevenZipFile):
txtfiles = archive.read(["deletefiles.txt", "hdifffiles.txt"])
txtfiles = archive.read(["deletefiles.txt", "hdifffiles.txt", "hdiffmap.json"])
# Reset archive to extract files
archive.reset()
try:
@ -108,63 +114,88 @@ def apply_update_archive(
# hdiffpatch implementation
# Read hdifffiles.txt to get the files to patch
hdifffiles = []
# Hdifffile format is [(source file, target file)]
# While the patch file is named as target file + ".hdiff"
hdifffiles: list[tuple[str, str]] = []
new_hdiff_map = False
if txtfiles is not None:
hdiffbytes = txtfiles["hdifffiles.txt"].read()
old_hdiff_map = txtfiles.get("hdifffiles.txt")
if old_hdiff_map is not None:
hdiffbytes = old_hdiff_map.read()
else:
new_hdiff_map = True
hdiffbytes = txtfiles["hdiffmap.json"].read()
else:
hdiffbytes = archive.read("hdifffiles.txt")
# Archive file must be a zip file
if zipfile.Path(archive).joinpath("hdifffiles.txt").is_file():
hdiffbytes = archive.read("hdifffiles.txt")
else:
new_hdiff_map = True
hdiffbytes = archive.read("hdiffmap.json")
if hdiffbytes is not str:
# Typing
hdiffbytes: bytes
hdiffbytes = hdiffbytes.decode()
for x in hdiffbytes.split("\r\n"):
try:
hdifffiles.append(json.loads(x.strip())["remoteName"])
except json.JSONDecodeError:
pass
if new_hdiff_map:
mapping = json.loads(hdiffbytes)
for diff in mapping["diff_map"]:
hdifffiles.append((diff["source_file_name"], diff["target_file_name"]))
else:
for x in hdiffbytes.split("\r\n"):
try:
name = json.loads(x.strip())["remoteName"]
hdifffiles.append((name, name))
except json.JSONDecodeError:
pass
# Patch function
def patch(file, patch_file):
patchpath = game.cache.joinpath(patch_file)
def patch(source_file: Path, target_file: Path, patch_file: str):
patch_path = game.cache.joinpath(patch_file)
# Spaghetti code :(, fuck my eyes.
file = file.rename(file.with_suffix(file.suffix + ".bak"))
bak_src_file = source_file.rename(
source_file.with_suffix(source_file.suffix + ".bak")
)
try:
_hdiff.patch_file(file, file.with_suffix(""), patchpath)
_hdiff.patch_file(bak_src_file, target_file, patch_path)
except HPatchZPatchError:
if auto_repair:
try:
game.repair_file(game.path.joinpath(file.with_suffix("")))
# The game repairs file by downloading the latest file, in this case we want the target file
# instead of source file. Honestly I haven't tested this but I hope it works.
game.repair_file(target_file)
except Exception:
# Let the game download the file.
file.rename(file.with_suffix(""))
bak_src_file.rename(file.with_suffix(""))
else:
file.unlink()
bak_src_file.unlink()
else:
# Let the game download the file.
file.rename(file.with_suffix(""))
bak_src_file.rename(file.with_suffix(""))
return
else:
# Remove old file, since we don't need it anymore.
bak_src_file.unlink()
finally:
patchpath.unlink()
# Remove old file, since we don't need it anymore.
file.unlink()
patch_path.unlink()
# Multi-threaded patching
patch_jobs = []
patch_files = []
for file_str in hdifffiles:
file = game.path.joinpath(file_str)
if not file.exists():
for source_file, target_file in hdifffiles:
source_path = game.path.joinpath(source_file)
if not source_path.exists():
# Not patching since we don't have the file
continue
patch_file: str = file_str + ".hdiff"
target_path = game.path.joinpath(target_file)
patch_file: str = target_file + ".hdiff"
# Remove hdiff files from files list to extract
files.remove(patch_file)
# Add file to extract list
patch_files.append(patch_file)
patch_jobs.append([patch, [file, patch_file]])
patch_jobs.append([patch, [source_path, target_path, patch_file]])
# Extract patch files to temporary dir
extract_files(archive, patch_files, game.cache)
_extract_files(archive, patch_files, game.cache)
reset_if_py7zr(archive) # For the next extraction
# Create new ThreadPoolExecutor for patching
patch_executor = concurrent.futures.ThreadPoolExecutor()
@ -173,7 +204,7 @@ def apply_update_archive(
patch_executor.shutdown(wait=True)
# Extract files from archive after we have filtered out the patch files
extract_files(archive, files, game.path)
_extract_files(archive, files, game.path)
# Close the archive
archive.close()
@ -190,8 +221,7 @@ def install_archive(game: GameABC, archive_file: Path | IOBase) -> None:
"""
if game.is_installed():
raise GameAlreadyInstalledError("Game is already installed.")
# It's literally 3 lines but okay
archive = py7zr.SevenZipFile(archive_file, "r")
archive = _open_archive(archive_file)
archive.extractall(game.path)
archive.close()

View File

@ -1,4 +1,3 @@
LATEST_VERSION = (2, 5, 0)
MD5SUMS = {
"1.0.5": {
"cn": {

View File

@ -1,3 +1,4 @@
from os import PathLike
import platform
import subprocess
from zipfile import ZipFile
@ -86,9 +87,11 @@ class HDiffPatch:
def hpatchz(self) -> str | None:
return self._get_binary("hpatchz")
def patch_file(self, in_file, out_file, patch_file):
def patch_file(self, in_file: PathLike, out_file: PathLike, patch_file: PathLike):
try:
subprocess.check_call([self.hpatchz(), "-f", in_file, patch_file, out_file])
subprocess.check_call(
[self.hpatchz(), "-f", str(in_file), str(patch_file), str(out_file)]
)
except subprocess.CalledProcessError as e:
raise HPatchZPatchError("Patch error") from e