fix: applying update archive

It now works, previously I thought py7zr works the same as zipfile but it's different af so I had to change some code for that.
This commit is contained in:
2024-10-23 20:46:44 +07:00
parent f02f1d5988
commit 7e5a60bb33
4 changed files with 91 additions and 72 deletions

View File

@ -148,7 +148,7 @@ class VoicepackUpdateAll(Command):
game_info = State.game.get_remote_game(pre_download=pre_download)
except Exception as e:
progress.finish(
f"<error>Update checking failed with following error: {e} \n({traceback.format_exc()})</error>"
f"<error>Update checking failed with following error: {e} \n{traceback.format_exc()}</error>"
)
return
if update_diff is None:
@ -192,7 +192,7 @@ class VoicepackUpdateAll(Command):
)
except Exception as e:
progress.finish(
f"<error>Couldn't apply update: {e} \n({traceback.format_exc()})</error>"
f"<error>Couldn't apply update: {e} \n{traceback.format_exc()}</error>"
)
return
progress.finish(
@ -228,7 +228,7 @@ class UpdatePatchCommand(Command):
patcher.update_patch()
except PatchUpdateError as e:
progress.finish(
f"<error>Patch update failed with following error: {e} \n({traceback.format_exc()})</error>"
f"<error>Patch update failed with following error: {e} \n{traceback.format_exc()}</error>"
)
else:
progress.finish("<comment>Patch updated!</comment>")
@ -246,7 +246,7 @@ class PatchInstallCommand(Command):
jadeite_dir = patcher.patch_game(game=State.game)
except PatcherError as e:
progress.finish(
f"<error>Patch installation failed with following error: {e} \n({traceback.format_exc()})</error>"
f"<error>Patch installation failed with following error: {e} \n{traceback.format_exc()}</error>"
)
return
progress.finish("<comment>Patch installed!</comment>")
@ -279,7 +279,7 @@ class PatchInstallCommand(Command):
patcher.patch_game(game=State.game)
except PatcherError as e:
progress.finish(
f"<error>Patch installation failed with following error: {e} \n({traceback.format_exc()})</error>"
f"<error>Patch installation failed with following error: {e} \n{traceback.format_exc()}</error>"
)
return
progress.finish("<comment>Patch installed!</comment>")
@ -344,7 +344,7 @@ class PatchInstallCommand(Command):
patcher.update_patch()
except PatchUpdateError as e:
progress.finish(
f"<error>Patch update failed with following error: {e} \n({traceback.format_exc()})</error>"
f"<error>Patch update failed with following error: {e} \n{traceback.format_exc()}</error>"
)
else:
progress.finish("<comment>Patch updated.</comment>")
@ -437,7 +437,7 @@ class UpdateCommand(Command):
game_info = State.game.get_remote_game(pre_download=pre_download)
except Exception as e:
progress.finish(
f"<error>Update checking failed with following error: {e} \n({traceback.format_exc()})</error>"
f"<error>Update checking failed with following error: {e} \n{traceback.format_exc()}</error>"
)
return
if update_diff is None or isinstance(game_info.major, str | None):
@ -474,7 +474,7 @@ class UpdateCommand(Command):
State.game.apply_update_archive(out_path, auto_repair=auto_repair)
except Exception as e:
progress.finish(
f"<error>Couldn't apply update: {e} \n({traceback.format_exc()})</error>"
f"<error>Couldn't apply update: {e} \n{traceback.format_exc()}</error>"
)
return
progress.finish("<comment>Update applied for base game.</comment>")
@ -485,7 +485,12 @@ class UpdateCommand(Command):
if remote_voicepack.language not in installed_voicepacks:
continue
# Voicepack is installed, update it
archive_file = State.game.cache.joinpath(PurePath(remote_voicepack.url).name)
archive_file = State.game.cache.joinpath(
PurePath(remote_voicepack.url).name
)
self.line(
f"Downloading update package for voicepack language '{remote_voicepack.language.name}'..."
)
try:
download_result = utils.download(
remote_voicepack.url, archive_file, file_len=remote_voicepack.size
@ -505,13 +510,14 @@ class UpdateCommand(Command):
)
except Exception as e:
progress.finish(
f"<error>Couldn't apply update: {e} \n({traceback.format_exc()})</error>"
f"<error>Couldn't apply update: {e} \n{traceback.format_exc()}</error>"
)
return
progress.finish(
f"<comment>Update applied for language {remote_voicepack.language}.</comment>"
f"<comment>Update applied for language {remote_voicepack.language.name}.</comment>"
)
self.line("Setting version config... ")
State.game.version_override = None
State.game.set_version_config()
self.line(
f"The game has been updated to version: <comment>{State.game.get_version_str()}</comment>"
@ -542,7 +548,7 @@ class RepairCommand(Command):
State.game.repair_game()
except Exception as e:
progress.finish(
f"<error>Repairation failed with following error: {e} \n({traceback.format_exc()})</error>"
f"<error>Repairation failed with following error: {e} \n{traceback.format_exc()}</error>"
)
return
progress.finish("<comment>Repairation completed.</comment>")
@ -578,7 +584,7 @@ class UpdateDownloadCommand(Command):
game_info = State.game.get_remote_game(pre_download=pre_download)
except Exception as e:
progress.finish(
f"<error>Update checking failed with following error: {e} \n({traceback.format_exc()})</error>"
f"<error>Update checking failed with following error: {e} \n{traceback.format_exc()}</error>"
)
return
if update_diff is None or isinstance(game_info.major, str | None):
@ -602,7 +608,9 @@ class UpdateDownloadCommand(Command):
update_game_url, out_path, file_len=update_diff.game_pkgs[0].size
)
except Exception as e:
self.line_error(f"<error>Couldn't download update: {e}</error>")
self.line_error(
f"<error>Couldn't download update: {e} \n{traceback.format_exc()}</error>"
)
return
if not download_result:
@ -616,7 +624,12 @@ class UpdateDownloadCommand(Command):
if remote_voicepack.language not in installed_voicepacks:
continue
# Voicepack is installed, update it
archive_file = State.game.cache.joinpath(PurePath(remote_voicepack.url).name)
archive_file = State.game.cache.joinpath(
PurePath(remote_voicepack.url).name
)
self.line(
f"Downloading update package for voicepack language '{remote_voicepack.language.name}'..."
)
try:
download_result = utils.download(
remote_voicepack.url, archive_file, file_len=remote_voicepack.size
@ -652,7 +665,7 @@ class ApplyUpdateArchive(Command):
State.game.apply_update_archive(update_archive, auto_repair=auto_repair)
except Exception as e:
progress.finish(
f"<error>Couldn't apply update: {e} \n({traceback.format_exc()})</error>"
f"<error>Couldn't apply update: {e} \n{traceback.format_exc()}</error>"
)
return
progress.finish("<comment>Update applied.</comment>")

View File

@ -20,13 +20,18 @@ class GamePackage:
self.decompressed_size = decompressed_size
@staticmethod
def from_dict(data: dict) -> "GamePackage":
return GamePackage(
url=data["url"],
md5=data["md5"],
size=int(data["size"]),
decompressed_size=int(data["decompressed_size"]),
)
def from_dict(data: list[dict]) -> list["GamePackage"]:
game_pkgs = []
for pkg in data:
game_pkgs.append(
GamePackage(
url=pkg["url"],
md5=pkg["md5"],
size=int(pkg["size"]),
decompressed_size=int(pkg["decompressed_size"]),
)
)
return game_pkgs
class AudioPackage:
@ -45,14 +50,19 @@ class AudioPackage:
self.decompressed_size = decompressed_size
@staticmethod
def from_dict(data: dict) -> "AudioPackage":
return AudioPackage(
language=VoicePackLanguage.from_remote_str(data["language"]),
url=data["url"],
md5=data["md5"],
size=int(data["size"]),
decompressed_size=int(data["decompressed_size"]),
)
def from_dict(data: list[dict]) -> "AudioPackage":
audio_pkgs = []
for pkg in data:
audio_pkgs.append(
AudioPackage(
language=VoicePackLanguage.from_remote_str(pkg["language"]),
url=pkg["url"],
md5=pkg["md5"],
size=int(pkg["size"]),
decompressed_size=int(pkg["decompressed_size"]),
)
)
return audio_pkgs
class Major:
@ -72,8 +82,8 @@ class Major:
def from_dict(data: dict) -> "Major":
return Major(
version=data["version"],
game_pkgs=[GamePackage(**x) for x in data["game_pkgs"]],
audio_pkgs=[AudioPackage(**x) for x in data["audio_pkgs"]],
game_pkgs=GamePackage.from_dict(data["game_pkgs"]),
audio_pkgs=AudioPackage.from_dict(data["audio_pkgs"]),
res_list_url=data["res_list_url"],
)

View File

@ -45,10 +45,14 @@ def apply_update_archive(
pass
# Think for me a better name for this variable
txtfiles = archive.read(["deletefiles.txt", "hdifffiles.txt"])
# Reset archive to extract files
archive.reset()
try:
# miHoYo loves CRLF
deletebytes = txtfiles["deletefiles.txt"].read()
if deletebytes is bytes:
if deletebytes is not str:
# Typing
deletebytes: bytes
deletebytes = deletebytes.decode()
deletefiles = deletebytes.split("\r\n")
except IOError:
@ -69,7 +73,9 @@ def apply_update_archive(
# Read hdifffiles.txt to get the files to patch
hdifffiles = []
hdiffbytes = txtfiles["hdifffiles.txt"].read()
if hdiffbytes is bytes:
if hdiffbytes is not str:
# Typing
hdiffbytes: bytes
hdiffbytes = hdiffbytes.decode()
for x in hdiffbytes.split("\r\n"):
try:
@ -78,13 +84,9 @@ def apply_update_archive(
pass
# Patch function
def extract_and_patch(file, patch_file):
def patch(file, patch_file):
patchpath = game.cache.joinpath(patch_file)
# Delete old patch file if exists
patchpath.unlink(missing_ok=True)
# Extract patch file
# Spaghetti code :(, fuck my eyes.
archive.extract(game.temppath, [patch_file])
file = file.rename(file.with_suffix(file.suffix + ".bak"))
try:
_hdiff.patch_file(file, file.with_suffix(""), patchpath)
@ -106,18 +108,9 @@ def apply_update_archive(
# Remove old file, since we don't need it anymore.
file.unlink()
def extract_or_repair(file: str):
# Extract file
try:
archive.extract(game.path, [file])
except Exception as e:
# Repair file
if not auto_repair:
raise e
game.repair_file(game.path.joinpath(file))
# Multi-threaded patching
patch_jobs = []
patch_files = []
for file_str in hdifffiles:
file = game.path.joinpath(file_str)
if not file.exists():
@ -126,8 +119,13 @@ def apply_update_archive(
patch_file: str = file_str + ".hdiff"
# Remove hdiff files from files list to extract
files.remove(patch_file)
patch_jobs.append([extract_and_patch, [file, patch_file]])
# Add file to extract list
patch_files.append(patch_file)
patch_jobs.append([patch, [file, patch_file]])
# Extract patch files to temporary dir
archive.extract(game.cache, patch_files)
archive.reset() # For the next extraction
# Create new ThreadPoolExecutor for patching
patch_executor = concurrent.futures.ThreadPoolExecutor()
for job in patch_jobs:
@ -135,15 +133,7 @@ def apply_update_archive(
patch_executor.shutdown(wait=True)
# Extract files from archive after we have filtered out the patch files
# Using ProcessPoolExecutor instead of archive.extractall() because
# archive.extractall() can crash with large archives, and it doesn't
# handle broken files.
# ProcessPoolExecutor is faster than ThreadPoolExecutor, and it shouldn't
# cause any problems here.
extract_executor = concurrent.futures.ProcessPoolExecutor()
for file in files:
extract_executor.submit(extract_or_repair, file)
extract_executor.shutdown(wait=True)
archive.extract(game.path, files)
# Close the archive
archive.close()

View File

@ -106,7 +106,6 @@ class Game(GameABC):
return False
if (
not self._path.joinpath("StarRail.exe").exists()
or not self._path.joinpath("StarRailBase.dll").exists()
or not self._path.joinpath("StarRail_Data").exists()
):
return False
@ -166,11 +165,13 @@ class Game(GameABC):
if not cfg_file.exists():
return (0, 0, 0)
cfg = ConfigFile(cfg_file)
if "General" not in cfg.sections():
# Fk u miHoYo
if "general" in cfg.sections():
version_str = cfg.get("general", "game_version", fallback="0.0.0")
elif "General" in cfg.sections():
version_str = cfg.get("General", "game_version", fallback="0.0.0")
else:
return (0, 0, 0)
if "game_version" not in cfg["General"]:
return (0, 0, 0)
version_str = cfg["General"]["game_version"]
if version_str.count(".") != 2:
return (0, 0, 0)
try:
@ -196,10 +197,14 @@ class Game(GameABC):
{
"General": {
"channel": 1,
"cps": "hoyoverse_PC",
"cps": "hyp_hoyoverse",
"game_version": self.get_version_str(),
"sub_channel": 1,
"plugin_2_version": "0.0.1",
"uapc": {
"hkrpg_global": {"uapc": "f5c7c6262812_"},
"hyp": {"uapc": ""},
}, # Honestly what's this?
}
}
)
@ -294,19 +299,22 @@ class Game(GameABC):
if not self.is_installed():
raise GameNotInstalledError("Game is not installed.")
voicepacks = []
blacklisted_words = ["SFX"]
for child in (
self.data_folder()
.joinpath("Persistent/Audio/AudioPackage/Windows/")
.iterdir()
):
if child.is_dir():
if child.resolve().is_dir() and child.name not in blacklisted_words:
try:
voicepacks.append(VoicePackLanguage[child.name])
except ValueError:
pass
return voicepacks
def get_remote_game(self, pre_download: bool = False) -> resource.Main | resource.PreDownload:
def get_remote_game(
self, pre_download: bool = False
) -> resource.Main | resource.PreDownload:
"""
Gets the current game information from remote.
@ -352,9 +360,7 @@ class Game(GameABC):
def _repair_file(self, file: PathLike, game: resource.Main) -> None:
# .replace("\\", "/") is needed because Windows uses backslashes :)
relative_file = file.relative_to(self._path)
url = (
game.major.res_list_url + "/" + str(relative_file).replace("\\", "/")
)
url = game.major.res_list_url + "/" + str(relative_file).replace("\\", "/")
# Backup the file
if file.exists():
backup_file = file.with_suffix(file.suffix + ".bak")