fix: handle .zip update archive

Also fixes some minor bug
This commit is contained in:
2024-12-18 19:10:34 +07:00
parent 7440b6041f
commit d6d1fdee6e
6 changed files with 76 additions and 50 deletions

View File

@ -1,4 +1,4 @@
# Common function for all games
# Function for all games
Since you should use the specific implementation for the target game instead, these functions may not be correctly documented.

View File

@ -114,7 +114,7 @@ class GameABC(ABC):
"""
pass
def get_update(self):
def get_update(self) -> resource.Patch | None:
"""
Get the game update
"""
@ -126,7 +126,9 @@ class GameABC(ABC):
"""
pass
def get_remote_game(self, pre_download: bool = False) -> resource.Main | resource.PreDownload:
def get_remote_game(
self, pre_download: bool = False
) -> resource.Main | resource.PreDownload:
"""
Gets the current game information from remote.

View File

@ -5,6 +5,7 @@ from cleo.helpers import option, argument
from pathlib import PurePath
from platform import system
from vollerei.abc.launcher.game import GameABC
from vollerei.common.api import resource
from vollerei.common.enums import GameChannel, VoicePackLanguage
from vollerei.cli import utils
from vollerei.exceptions.game import GameError
@ -141,9 +142,7 @@ class VoicepackList(Command):
class VoicepackInstall(Command):
name = "hsr voicepack install"
description = (
"Installs the specified installed voicepacks"
)
description = "Installs the specified installed voicepacks"
options = default_options + [
option("pre-download", description="Pre-download the game if available"),
]
@ -196,7 +195,9 @@ class VoicepackInstall(Command):
self.line(
f"Downloading install package for language: <comment>{remote_voicepack.language.name}</comment>... "
)
archive_file = State.game.cache.joinpath(PurePath(remote_voicepack.url).name)
archive_file = State.game.cache.joinpath(
PurePath(remote_voicepack.url).name
)
try:
download_result = utils.download(
remote_voicepack.url, archive_file, file_len=remote_voicepack.size
@ -280,7 +281,7 @@ class VoicepackUpdate(Command):
progress = utils.ProgressIndicator(self)
progress.start("Checking for updates... ")
try:
update_diff = State.game.get_update(pre_download=pre_download)
update_diff: resource.Patch | None = State.game.get_update(pre_download=pre_download)
game_info = State.game.get_remote_game(pre_download=pre_download)
except Exception as e:
progress.finish(
@ -295,23 +296,25 @@ class VoicepackUpdate(Command):
f"The current version is: <comment>{State.game.get_version_str()}</comment>"
)
self.line(
f"The latest version is: <comment>{game_info.latest.version}</comment>"
f"The latest version is: <comment>{game_info.major.version}</comment>"
)
if not self.confirm("Do you want to update the game?"):
self.line("<error>Update aborted.</error>")
return
# Voicepack update
for remote_voicepack in update_diff.voice_packs:
for remote_voicepack in update_diff.audio_pkgs:
if remote_voicepack.language not in installed_voicepacks:
continue
# Voicepack is installed, update it
self.line(
f"Downloading update package for language: <comment>{remote_voicepack.language.name}</comment>... "
archive_file = State.game.cache.joinpath(
PurePath(remote_voicepack.url).name
)
self.line(
f"Downloading update package for voicepack language '{remote_voicepack.language.name}'..."
)
archive_file = State.game.cache.joinpath(remote_voicepack.name)
try:
download_result = utils.download(
remote_voicepack.path, archive_file, file_len=update_diff.size
remote_voicepack.url, archive_file, file_len=remote_voicepack.size
)
except Exception as e:
self.line_error(f"<error>Couldn't download update: {e}</error>")
@ -334,10 +337,9 @@ class VoicepackUpdate(Command):
progress.finish(
f"<comment>Update applied for language {remote_voicepack.language.name}.</comment>"
)
State.game.version_override = game_info.major.version
set_version_config(self=self)
self.line(
f"The game has been updated to version: <comment>{State.game.get_version_str()}</comment>"
)
State.game.version_override = None
class PatchTypeCommand(Command):
@ -601,7 +603,7 @@ class InstallCommand(Command):
progress.finish("<comment>Package applied for the base game.</comment>")
self.line("Setting version config... ")
State.game.version_override = game_info.major.version
set_version_config()
set_version_config(self=self)
State.game.version_override = None
self.line(
f"The game has been installed to version: <comment>{State.game.get_version_str()}</comment>"
@ -719,7 +721,7 @@ class UpdateCommand(Command):
)
self.line("Setting version config... ")
State.game.version_override = game_info.major.version
set_version_config()
set_version_config(self=self)
State.game.version_override = None
self.line(
f"The game has been updated to version: <comment>{State.game.get_version_str()}</comment>"
@ -940,7 +942,8 @@ class ApplyUpdateArchive(Command):
)
return
progress.finish("<comment>Update applied.</comment>")
set_version_config()
set_version_config(self=self)
# This is the list for HSR commands, we'll add Genshin commands later
classes = [

View File

@ -2,6 +2,7 @@ import concurrent.futures
import json
import hashlib
import py7zr
import zipfile
from io import IOBase
from os import PathLike
from pathlib import Path
@ -39,9 +40,30 @@ def apply_update_archive(
# Install HDiffPatch
_hdiff.hpatchz()
# Open archive
# archive = zipfile.ZipFile(archive_file, "r")
archive = py7zr.SevenZipFile(archive_file, "r")
def reset_if_py7zr(archive):
if isinstance(archive, py7zr.SevenZipFile):
archive.reset()
def extract_files(archive: py7zr.SevenZipFile | zipfile.ZipFile, files, path: PathLike):
if isinstance(archive, py7zr.SevenZipFile):
# .7z archive
archive.extract(path, files)
else:
# .zip archive
archive.extractall(path, files)
archive: py7zr.SevenZipFile | zipfile.ZipFile = None
try:
archive = py7zr.SevenZipFile(archive_file, "r")
except py7zr.exceptions.Bad7zFile:
# Try to open it as a zip file
try:
archive = zipfile.ZipFile(archive_file, "r")
except zipfile.BadZipFile:
raise ValueError("Archive is not a valid 7z or zip file.")
# Get files list (we don't want to extract all of them)
files = archive.namelist()
# Don't extract these files (they're useless and if the game isn't patched then
@ -52,18 +74,23 @@ def apply_update_archive(
except ValueError:
pass
# Think for me a better name for this variable
txtfiles = archive.read(["deletefiles.txt", "hdifffiles.txt"])
# Reset archive to extract files
archive.reset()
txtfiles = None
if isinstance(archive, py7zr.SevenZipFile):
txtfiles = archive.read(["deletefiles.txt", "hdifffiles.txt"])
# Reset archive to extract files
archive.reset()
try:
# miHoYo loves CRLF
deletebytes = txtfiles["deletefiles.txt"].read()
if txtfiles is not None:
deletebytes = txtfiles["deletefiles.txt"].read()
else:
deletebytes = archive.read("deletefiles.txt")
if deletebytes is not str:
# Typing
deletebytes: bytes
deletebytes = deletebytes.decode()
deletefiles = deletebytes.split("\r\n")
except IOError:
except (IOError, KeyError):
pass
else:
for file_str in deletefiles:
@ -80,7 +107,10 @@ def apply_update_archive(
# hdiffpatch implementation
# Read hdifffiles.txt to get the files to patch
hdifffiles = []
hdiffbytes = txtfiles["hdifffiles.txt"].read()
if txtfiles is not None:
hdiffbytes = txtfiles["hdifffiles.txt"].read()
else:
hdiffbytes = archive.read("hdifffiles.txt")
if hdiffbytes is not str:
# Typing
hdiffbytes: bytes
@ -132,8 +162,8 @@ def apply_update_archive(
patch_jobs.append([patch, [file, patch_file]])
# Extract patch files to temporary dir
archive.extract(game.cache, patch_files)
archive.reset() # For the next extraction
extract_files(archive, patch_files, game.cache)
reset_if_py7zr(archive) # For the next extraction
# Create new ThreadPoolExecutor for patching
patch_executor = concurrent.futures.ThreadPoolExecutor()
for job in patch_jobs:
@ -141,7 +171,7 @@ def apply_update_archive(
patch_executor.shutdown(wait=True)
# Extract files from archive after we have filtered out the patch files
archive.extract(game.path, files)
extract_files(archive, files, game.path)
# Close the archive
archive.close()

View File

@ -30,7 +30,7 @@ class Game(GameABC):
if not cache_path:
cache_path = paths.cache_path
cache_path = Path(cache_path)
self.cache: Path = cache_path.joinpath("game/genshin/")
self.cache: Path = cache_path.joinpath("game/zzz/")
self.cache.mkdir(parents=True, exist_ok=True)
self._version_override: tuple[int, int, int] | None = None
self._channel_override: GameChannel | None = None
@ -192,7 +192,7 @@ class Game(GameABC):
Gets the current installed game version.
Credits to An Anime Team for the code that does the magic:
https://github.com/an-anime-team/anime-game-core/blob/main/src/games/genshin/game.rs#L52
https://github.com/an-anime-team/anime-game-core/blob/main/src/games/zzz/game.rs#L49
If the above method fails, it'll fallback to read the config.ini file
for the version, which is not recommended (as described in
@ -224,25 +224,18 @@ class Game(GameABC):
for byte in f.read(10000):
match byte:
case 0:
if correct and len(version_bytes[0]) > 0 and len(version_bytes[1]) > 0 and len(version_bytes[2]) > 0:
found_version = tuple(
bytes_to_int(i) for i in version_bytes
)
return found_version
version_bytes = [[], [], []]
version_ptr = 0
correct = True
case 46:
case b'.':
version_ptr += 1
if version_ptr > 2:
correct = False
case 95:
if (
correct
and len(version_bytes[0]) > 0
and len(version_bytes[1]) > 0
and len(version_bytes[2]) > 0
):
return (
bytes_to_int(version_bytes[0]),
bytes_to_int(version_bytes[1]),
bytes_to_int(version_bytes[2]),
)
case _:
if correct and byte in b"0123456789":
version_bytes[version_ptr].append(byte)
@ -276,9 +269,7 @@ class Game(GameABC):
raise GameNotInstalledError("Game is not installed.")
voicepacks = []
for child in (
self.data_folder()
.joinpath("StreamingAssets/Audio/Windows/Full/")
.iterdir()
self.data_folder().joinpath("StreamingAssets/Audio/Windows/Full/").iterdir()
):
if child.resolve().is_dir():
try: