Compare commits

...

66 Commits

Author SHA1 Message Date
a084c61cae fix: disable repair command
The new HoYoPlay launcher doesn't have the ability to get scattered files from the remote anymore, "repairing" the game will just download the whole game again.
2025-05-21 04:22:38 +07:00
10d5f3f208 fix(common): merge zip file too 2025-03-03 17:04:05 +07:00
7707a64733 fix(common): bruh 2025-03-03 17:00:40 +07:00
76bccf6a6f fix(common): support zip file 2025-03-03 16:50:51 +07:00
874686a95c chore: change default path to user
So we can use it on Linux
2025-03-02 03:04:02 +07:00
1863fafd85 fix(common): manually close the merged archive 2025-02-26 18:48:46 +07:00
ac76df577b fix(common): install archive for split files
It worked before, but not now because I switched to py7zr backend for a while now.
2025-02-26 18:41:01 +07:00
ed641f890d fix(common): remove installed check 2025-02-02 22:53:27 +07:00
9e64bfc531 docs: fix again 2025-02-02 00:04:39 +07:00
25cfcdf0f0 fix: lol 2025-02-02 00:04:14 +07:00
b2eec5ee30 docs: fix 2025-02-02 00:01:32 +07:00
7dbe890bf3 fix: remove game channel from games
Lol
2025-02-01 23:58:09 +07:00
976308ac85 feat: unify multiple games into one 2025-02-01 23:51:53 +07:00
fba6cecc38 fix(common): add 'hdiffmap.json' to ignore list
So we don't extract the file lol.
2025-01-15 15:13:56 +07:00
f5007b6aa5 fix(common): compat with 'hdiffmap.json'
miHoYo introduced a new hdiff map, so yeah.
2025-01-14 21:51:31 +07:00
da9c930d2e fix(genshin): handle key error 2025-01-01 12:40:39 +07:00
fc3a43bec7 fix(genshin): voicepack english 2024-12-31 11:16:07 +07:00
844453eabc fix(genshin): voicepack detection 2024-12-31 01:52:01 +07:00
d6d1fdee6e fix: handle .zip update archive
Also fixes some minor bug
2024-12-18 19:10:34 +07:00
7440b6041f chore(ci): i forgor 2024-12-18 18:31:22 +07:00
75649df729 chore(ci): use poetry to install docs 2024-12-18 18:29:53 +07:00
707bcc14c3 chore(ci): fix 2024-12-18 18:27:31 +07:00
18aa7935cb chore(ci): fix 2024-12-18 18:21:12 +07:00
ecd204428d chore: docs 2024-12-18 18:18:33 +07:00
d920aea2b8 chore: add proxy game to readme 2024-12-18 17:40:43 +07:00
441a06fb5b fix(zzz); voicepack language detection 2024-12-18 17:39:48 +07:00
a33bdaa473 feat: implement zzz
It literally is just copy-pasting
2024-12-18 17:35:04 +07:00
4a7bc3d0b4 chore(cli): migrate to one file
Easier to maintain but slightly slower execution speed lol
2024-12-18 15:20:24 +07:00
1948d1f741 chore: add whatever spicy thingy here 2024-12-18 13:30:28 +07:00
6f384c1cc5 chore: add some information to readme 2024-12-13 19:42:36 +07:00
ebc0f2f3f5 feat(cli): support genshin
It literally is just copy-pasting lol, since almost all code are shared.
2024-12-13 19:35:51 +07:00
0c83958ee5 feat(cli/hsr): implement 'voicepack install' & other changes
Rename 'voicepack update-all' to 'voicepack update', allowing update only a voicepack instead of having to upate all of them
2024-12-13 19:30:54 +07:00
95ef9409f5 feat(cli/hsr): add 'hsr install download' command
Yeah
2024-12-13 13:22:38 +07:00
659366620b feat(cli/hsr): support game installation 2024-12-13 12:54:59 +07:00
5304cfbde1 chore: fix a bit
Wtf me
2024-12-13 11:59:51 +07:00
2331f9404a feat(game/genshin): initial copy of hsr code
Lol
2024-12-13 11:56:30 +07:00
a3df498444 chore: move repair-related functions to common
Preparation for Genshin Impact support xD
2024-12-13 11:26:48 +07:00
8ba00754ee chore: add more info to readme 2024-12-12 18:20:21 +07:00
4e0c4ed2c0 chore: refactor readme 2024-10-26 00:36:24 +07:00
7e5a60bb33 fix: applying update archive
It now works, previously I thought py7zr works the same as zipfile but it's different af so I had to change some code for that.
2024-10-23 20:46:44 +07:00
f02f1d5988 fix(common/update): zipfile -> py7zr
So py7zr does have some differences from zipfile
2024-10-22 13:10:14 +07:00
0bac04bdbd fix: use py7zr
Game archive now uses .7z, wow mihoyo
2024-10-22 00:34:57 +07:00
4d8f4008f2 fix(cli/hsr): download update now works
Also change e.__context__ to traceback.format_exc() to give better and actually useful error messages.
2024-10-21 13:34:38 +07:00
208c6efd1e fix(hsr/repair): fix the decompressed url
They changed it lol
2024-10-21 13:33:49 +07:00
156c42c1f3 fix: fix predownload object
Wow.
2024-10-21 13:33:17 +07:00
8ff2a388d7 fix: migrate to HoYoPlay
For now, only update game and voicepack are working

This commit also fixes a couple of bugs too. Tbf I enjoyed HoYoPlay until I don't have enough space to update my HSR so yeah 💀
2024-09-10 16:47:20 +07:00
08c51d2fd8 feat(repair): rework the repair feature
Mostly usable now
2024-06-18 02:49:52 +07:00
e8f63f175f Merge branch 'master' of himeko.tretrauit.me:tretrauit/vollerei 2024-06-06 20:21:06 +07:00
acd457babe fix(repair): join game path with pkg_version 2024-06-06 20:19:45 +07:00
bf5bdf7618 fix(repair): join game path with pkg_version 2024-06-06 20:17:22 +07:00
f5e7417cdf feat(hsr): repair game
Apparently doesn't completely work on my setup (NTFS + Btrfs) but yours may. If it doesn't fix the game then copy the file it downloaded from temp to the game directory and it should work.
2024-05-28 10:58:27 +07:00
bbdb8d3596 fix: revert back to ThreadPoolExecutor
ProcessPoolExecutor didn't work for me :(
2024-05-08 17:21:34 +07:00
f4e09a7aad fix: handle hdiffpatch downloading for each architecture 2024-03-27 12:20:29 +07:00
9569019fcf fix(cli/hsr): fix voicepack downloading 2024-03-25 22:57:00 +07:00
8c0e03ebfa feat(cli/hsr): add "update download"
Basically download the update only.
2024-03-25 20:14:14 +07:00
34f65c00f3 feat(cli/hsr): add more commands 2024-02-06 14:39:41 +07:00
a7d763d847 feat(hsr): add voicepack related commands 2024-02-06 14:24:44 +07:00
fa011a11a1 fix(cli): fix 416 response code 2024-02-06 14:24:15 +07:00
c2f81637e2 feat(hsr): add --from-version 2024-02-06 13:34:22 +07:00
7e18185b52 fix(hsr): wrong function call 2024-02-06 13:30:47 +07:00
63f5c36391 chore: fix dependencies 2024-02-06 12:12:42 +07:00
bdf1f8d785 fix: use packaging.version instead of distutils
distutils is dead
2024-02-06 12:04:53 +07:00
70eb0e9443 feat: add hi3 stub
HI3 isn't working atm, don't expect too much
2024-02-06 11:48:16 +07:00
ca03718bf1 chore: rename _path to path 2024-01-28 18:26:48 +07:00
9f9a63c9b0 feat(hsr): implement voicepack update 2024-01-28 17:11:16 +07:00
4c851b999a fix(hsr): proper implement --noconfirm 2024-01-25 20:18:56 +07:00
59 changed files with 4986 additions and 1905 deletions

52
.github/workflows/docs.yml vendored Normal file
View File

@ -0,0 +1,52 @@
name: Build & Deploy
on:
push:
pull_request:
workflow_dispatch:
jobs:
# Build job
build:
# Specify runner + build & upload the static files as an artifact
runs-on: ubuntu-latest
steps:
- name: Checkout code
id: checkout-code
uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.13'
cache: 'pip' # caching pip dependencies
- name: Install dependencies
id: install
run: |
pipx install poetry --python $(which python)
poetry install --only docs
- name: Build docs
id: build
run: |
poetry run mkdocs build
- name: Upload static files as artifact
id: deployment
uses: actions/upload-pages-artifact@v3 # or specific "vX.X.X" version tag for this action
with:
path: site/
# Deployment job
deploy:
# Grant GITHUB_TOKEN the permissions required to make a Pages deployment
permissions:
pages: write # to deploy to Pages
id-token: write # to verify the deployment originates from an appropriate source
environment:
name: github-pages
url: ${{ steps.deployment.outputs.page_url }}
runs-on: ubuntu-latest
needs: build
steps:
- name: Deploy to GitHub Pages
id: deployment
uses: actions/deploy-pages@v4

View File

@ -4,6 +4,17 @@
An open-source launcher for anime games
## Documentation
https://tretrauit.me/vollerei/
## Installation
Assumming you have `pipx` installed, execute the following command:
```bash
pipx install git+https://git.tretrauit.me/tretrauit/vollerei --preinstall tqdm
```
## Why?
I've done [worthless-launcher](https://tretrauit.gitlab.io/worthless-launcher) for an open-world anime game,
@ -11,9 +22,33 @@ since I want to support other anime games and the launcher code is very messy, t
## Features
+ Nothing, I haven't written any code yet.
### Turn-based game / Open-world game / Proxy game
- [x] Cross-platform support
> Tested on Windows and Linux myself, although should work on most major OSes where `HDiffPatch` is supported.
- [x] Does *not* require administrator/root privileges
> Though if issues occur during installation/upgrading process, you can always try running the program with elevated privileges to fix them.
- [x] Download the game update (including pre-downloads if available)
- [x] Get the game version
- [x] Get installed voicepacks
- [x] Installation
- [x] Patch the game for unsupported platforms (with telemetry checking)
> Not applicable for the "Open-world game", since patching is unneeded to play the game.
- [x] Repair the game (Smarter than the official launcher!)
- [x] Update the game
- [x] Update voicepacks
- [ ] Uninstall the game (Just remove the game directory lol)
- [x] Voicepacks installation
#### Advanced features
- [x] Apply the update archives
- [x] Download the update archives
- [x] Easy to use API
### Notes
### Other games (Dreamseeker game)
I haven't developed for them yet, but since most of the code is shared I'll do that when I have the motivation to do so.
~~Help me get motivated by going to https://paypal.me/tretrauit and send me a coffee lol~~
## Notes
This launcher tries to mimic the official launcher behaviour as much as possible but if a ban appears, I will
not be responsible for it. (*Turn-based game* have a ban wave already, see AAGL discord for more info)

7
docs/abc/game.md Normal file
View File

@ -0,0 +1,7 @@
# Game
::: vollerei.abc.launcher.game.GameABC
handler: python
options:
show_root_heading: true
show_source: true

9
docs/common/functions.md Normal file
View File

@ -0,0 +1,9 @@
# Function for all games
Since you should use the specific implementation for the target game instead, these functions may not be correctly documented.
::: vollerei.common.functions
handler: python
options:
show_root_heading: true
show_source: true

View File

@ -0,0 +1,7 @@
# Game
::: vollerei.game.launcher.Game
handler: python
options:
show_root_heading: true
show_source: true

7
docs/genshin/game.md Normal file
View File

@ -0,0 +1,7 @@
# Game
::: vollerei.genshin.Game
handler: python
options:
show_root_heading: true
show_source: true

7
docs/hsr/game.md Normal file
View File

@ -0,0 +1,7 @@
# Game
::: vollerei.hsr.Game
handler: python
options:
show_root_heading: true
show_source: true

7
docs/hsr/patcher.md Normal file
View File

@ -0,0 +1,7 @@
# Patcher
::: vollerei.hsr.Patcher
handler: python
options:
show_root_heading: true
show_source: true

8
docs/hsr/patchtype.md Normal file
View File

@ -0,0 +1,8 @@
# PatchType
::: vollerei.hsr.PatchType
handler: python
options:
show_root_heading: true
show_source: true

17
docs/index.md Normal file
View File

@ -0,0 +1,17 @@
# Welcome to MkDocs
For full documentation visit [mkdocs.org](https://www.mkdocs.org).
## Commands
* `mkdocs new [dir-name]` - Create a new project.
* `mkdocs serve` - Start the live-reloading docs server.
* `mkdocs build` - Build the documentation site.
* `mkdocs -h` - Print help message and exit.
## Project layout
mkdocs.yml # The configuration file.
docs/
index.md # The documentation homepage.
... # Other markdown pages, images and other files.

7
docs/zzz/game.md Normal file
View File

@ -0,0 +1,7 @@
# Game
::: vollerei.zzz.Game
handler: python
options:
show_root_heading: true
show_source: true

38
mkdocs.yml Normal file
View File

@ -0,0 +1,38 @@
site_name: Vollerei
site_url: https://tretrauit.me/vollerei/
site_author: tretrauit
site_description: >-
Documentation for the Vollerei project, a Python library for managing and
interacting with HoYoVerse games.
repo_name: teppyboy/vollerei
repo_url: https://github.com/teppyboy/vollerei
# Copyright
copyright: Copyright © 2023 - 2024 tretrauit
theme:
name: material
palette:
- media: "(prefers-color-scheme)"
toggle:
icon: material/link
name: Switch to light mode
- media: "(prefers-color-scheme: light)"
scheme: default
primary: indigo
accent: indigo
toggle:
icon: material/toggle-switch
name: Switch to dark mode
- media: "(prefers-color-scheme: dark)"
scheme: slate
primary: black
accent: indigo
toggle:
icon: material/toggle-switch-off
name: Switch to system preference
plugins:
- search
- mkdocstrings

2465
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -11,14 +11,30 @@ python = "^3.11"
platformdirs = "^3.5.1"
requests = "^2.31.0"
cleo = "^2.1.0"
packaging = "^23.2"
py7zr = "^0.22.0"
multivolumefile = "^0.2.3"
[tool.poetry.group.cli]
optional = true
[tool.poetry.group.cli.dependencies]
tqdm = "^4.65.0"
[tool.poetry.group.dev]
optional = true
[tool.poetry.group.dev.dependencies]
pytest = "^7.3.1"
pre-commit = "^3.3.3"
[tool.poetry.group.cli.dependencies]
tqdm = "^4.65.0"
[tool.poetry.group.docs.dependencies]
mkdocstrings-python = "^1.12.2"
mkdocs-material = "^9.5.49"
[tool.poetry.scripts]
vollerei = 'vollerei.cli:run'
[build-system]
requires = ["poetry-core"]

View File

@ -2,6 +2,7 @@ from abc import ABC, abstractmethod
from os import PathLike
from pathlib import Path
from typing import Any
from vollerei.common.api import resource
class GameABC(ABC):
@ -10,6 +11,7 @@ class GameABC(ABC):
"""
path: Path
cache: Path
version_override: tuple[int, int, int] | None
channel_override: Any
@ -37,9 +39,6 @@ class GameABC(ABC):
Args:
game_path (PathLike, optional): Path to install the game to.
Returns:
None
"""
pass
@ -69,6 +68,44 @@ class GameABC(ABC):
"""
pass
def repair_file(
self, file: PathLike, pre_download: bool = False, game_info=None
) -> None:
"""
Repairs a game file.
This will automatically handle backup and restore the file if the repair
fails.
Args:
file (PathLike): The file to repair.
pre_download (bool): Whether to get the pre-download version.
Defaults to False.
"""
pass
def repair_files(
self,
files: list[PathLike],
pre_download: bool = False,
game_info: resource.Game = None,
) -> None:
"""
Repairs multiple game files.
This will automatically handle backup and restore the file if the repair
fails.
This method is not multi-threaded, so it may take a while to repair
multiple files.
Args:
files (PathLike): The files to repair.
pre_download (bool): Whether to get the pre-download version.
Defaults to False.
"""
pass
def get_version(self) -> tuple[int, int, int]:
"""
Get the game version
@ -77,15 +114,23 @@ class GameABC(ABC):
"""
pass
def get_update(self):
def get_version_config(self) -> tuple[int, int, int]:
"""
Get the game update
Gets the current installed game version from config.ini.
Using this is not recommended, as only official launcher creates
and uses this file, instead you should use `get_version()`.
This returns (0, 0, 0) if the version could not be found.
Returns:
tuple[int, int, int]: Game version.
"""
pass
def get_voiceover_update(self, language: str):
def get_update(self) -> resource.Patch | None:
"""
Get the voiceover update
Get the game update
"""
pass
@ -94,3 +139,18 @@ class GameABC(ABC):
Get the game channel
"""
pass
def get_remote_game(
self, pre_download: bool = False
) -> resource.Main | resource.PreDownload:
"""
Gets the current game information from remote.
Args:
pre_download (bool): Whether to get the pre-download version.
Defaults to False.
Returns:
A `Game` object that contains the game information.
"""
pass

View File

@ -1,9 +1,9 @@
from cleo.application import Application
from vollerei.cli import hsr
from vollerei.cli import commands
application = Application()
for command in hsr.commands:
application.add(command())
for command in commands.exports:
application.add(command)
def run():

982
vollerei/cli/commands.py Normal file
View File

@ -0,0 +1,982 @@
import copy
import traceback
from cleo.commands.command import Command
from cleo.helpers import option, argument
from pathlib import PurePath
from platform import system
from vollerei.abc.launcher.game import GameABC
from vollerei.common.api import resource
from vollerei.common.enums import GameChannel, VoicePackLanguage
from vollerei.cli import utils
from vollerei.exceptions.game import GameError
from vollerei.exceptions.patcher import PatcherError, PatchUpdateError
from vollerei.genshin import Game as GenshinGame
from vollerei.hsr import Game as HSRGame, Patcher as HSRPatcher
from vollerei.hsr.patcher import PatchType as HSRPatchType
from vollerei.zzz import Game as ZZZGame
from vollerei import paths
patcher = HSRPatcher()
default_options = [
option("channel", "c", description="Game channel", flag=False, default="overseas"),
option("force", "f", description="Force the command to run"),
option(
"game-path",
"g",
description="Path to the game installation",
flag=False,
default=".",
),
option("patch-type", "p", description="Patch type", flag=False),
option("temporary-path", "t", description="Temporary path", flag=False),
option("silent", "s", description="Silent mode"),
option("noconfirm", "y", description="Do not ask for confirmation (yes to all)"),
]
class State:
game: GameABC = None
def callback(
command: Command,
):
"""
Base callback for all commands
"""
game_path = command.option("game-path")
channel = command.option("channel")
silent = command.option("silent")
noconfirm = command.option("noconfirm")
temporary_path = command.option("temporary-path")
if isinstance(channel, str):
channel = GameChannel[channel.capitalize()]
elif isinstance(channel, int):
channel = GameChannel(channel)
if temporary_path:
paths.set_base_path(temporary_path)
if command.name.startswith("hsr"):
State.game = HSRGame(game_path, temporary_path)
patch_type = command.option("patch-type")
if patch_type is None:
patch_type = HSRPatchType.Jadeite
elif isinstance(patch_type, str):
patch_type = HSRPatchType[patch_type]
elif isinstance(patch_type, int):
patch_type = HSRPatchType(patch_type)
patcher.patch_type = patch_type
elif command.name.startswith("genshin"):
State.game = GenshinGame(game_path)
elif command.name.startswith("zzz"):
State.game = ZZZGame(game_path)
else:
raise ValueError("Invalid game type")
if channel:
State.game.channel_override = channel
utils.silent_message = silent
if noconfirm:
utils.no_confirm = noconfirm
def confirm(
question: str, default: bool = False, true_answer_regex: str = r"(?i)^y"
):
command.line(
f"<question>{question} (yes/no)</question> [<comment>{'yes' if default else 'no'}</comment>] y"
)
return True
command.confirm = confirm
command.add_style("warn", fg="yellow")
def set_version_config(self: Command):
self.line("Setting version config... ")
try:
State.game.set_version_config()
except Exception as e:
self.line_error(f"<warn>Couldn't set version config: {e}</warn>")
self.line_error(
"This won't affect the overall experience, but if you're using the official launcher"
)
self.line_error(
"you may have to edit the file 'config.ini' manually to reflect the latest version."
)
self.line(
f"The game has been updated to version: <comment>{State.game.get_version_str()}</comment>"
)
class VoicepackListInstalled(Command):
name = "hsr voicepack list-installed"
description = "Get the installed voicepacks"
options = default_options
def handle(self):
callback(command=self)
installed_voicepacks_str = [
f"<comment>{x.name}</comment>"
for x in State.game.get_installed_voicepacks()
]
self.line(f"Installed voicepacks: {', '.join(installed_voicepacks_str)}")
class VoicepackList(Command):
name = "hsr voicepack list"
description = "Get all available voicepacks"
options = default_options + [
option("pre-download", description="Pre-download the game if available"),
]
def handle(self):
callback(command=self)
pre_download = self.option("pre-download")
remote_game = State.game.get_remote_game(pre_download=pre_download)
available_voicepacks_str = [
f"<comment>{x.language.name} ({x.language.value})</comment>"
for x in remote_game.major.audio_pkgs
]
self.line(f"Available voicepacks: {', '.join(available_voicepacks_str)}")
class VoicepackInstall(Command):
name = "hsr voicepack install"
description = "Installs the specified installed voicepacks"
options = default_options + [
option("pre-download", description="Pre-download the game if available"),
]
arguments = [
argument(
"language", description="Languages to install", multiple=True, optional=True
)
]
def handle(self):
callback(command=self)
pre_download = self.option("pre-download")
# Typing manually because pylance detect it as Any
languages: list[str] = self.argument("language")
# Get installed voicepacks
language_objects = []
for language in languages:
language = language.lower()
try:
language_objects.append(VoicePackLanguage[language.capitalize()])
except KeyError:
try:
language_objects.append(VoicePackLanguage.from_remote_str(language))
except ValueError:
self.line_error(f"<error>Invalid language: {language}</error>")
if len(language_objects) == 0:
self.line_error(
"<error>No valid languages specified, you must specify a language to install</error>"
)
return
progress = utils.ProgressIndicator(self)
progress.start("Fetching install package information... ")
try:
game_info = State.game.get_remote_game(pre_download=pre_download)
except Exception as e:
progress.finish(
f"<error>Fetching failed with following error: {e} \n{traceback.format_exc()}</error>"
)
return
progress.finish(
"<comment>Installation information fetched successfully.</comment>"
)
if not self.confirm("Do you want to install the specified voicepacks?"):
self.line("<error>Installation aborted.</error>")
return
# Voicepack update
for remote_voicepack in game_info.major.audio_pkgs:
if remote_voicepack.language not in language_objects:
continue
self.line(
f"Downloading install package for language: <comment>{remote_voicepack.language.name}</comment>... "
)
archive_file = State.game.cache.joinpath(
PurePath(remote_voicepack.url).name
)
try:
download_result = utils.download(
remote_voicepack.url, archive_file, file_len=remote_voicepack.size
)
except Exception as e:
self.line_error(f"<error>Couldn't download package: {e}</error>")
return
if not download_result:
self.line_error("<error>Download failed.</error>")
return
self.line("Download completed.")
progress = utils.ProgressIndicator(self)
progress.start("Installing package...")
try:
State.game.install_archive(archive_file)
except Exception as e:
progress.finish(
f"<error>Couldn't apply package: {e} \n{traceback.format_exc()}</error>"
)
return
progress.finish(
f"<comment>Package applied for language {remote_voicepack.language.name}.</comment>"
)
self.line(
f"The voicepacks have been installed to version: <comment>{State.game.get_version_str()}</comment>"
)
class VoicepackUpdate(Command):
name = "hsr voicepack update"
description = (
"Updates the specified installed voicepacks, if not specified, updates all"
)
options = default_options + [
option(
"auto-repair", "R", description="Automatically repair the game if needed"
),
option("pre-download", description="Pre-download the game if available"),
option(
"from-version", description="Update from a specific version", flag=False
),
]
arguments = [
argument(
"language", description="Languages to update", multiple=True, optional=True
)
]
def handle(self):
callback(command=self)
auto_repair = self.option("auto-repair")
pre_download = self.option("pre-download")
from_version = self.option("from-version")
# Typing manually because pylance detect it as Any
languages: list[str] = self.argument("language")
if auto_repair:
self.line("<comment>Auto-repair is enabled.</comment>")
if from_version:
self.line(f"Updating from version: <comment>{from_version}</comment>")
State.game.version_override = from_version
# Get installed voicepacks
if len(languages) == 0:
self.line(
"<comment>No languages specified, updating all installed voicepacks...</comment>"
)
installed_voicepacks = State.game.get_installed_voicepacks()
if len(languages) > 0:
languages = [x.lower() for x in languages]
# Support both English and en-us and en
installed_voicepacks = [
x
for x in installed_voicepacks
if x.name.lower() in languages
or x.value.lower() in languages
or x.name.lower()[:2] in languages
]
installed_voicepacks_str = [
f"<comment>{str(x.name)}</comment>" for x in installed_voicepacks
]
self.line(f"Updating voicepacks: {', '.join(installed_voicepacks_str)}")
progress = utils.ProgressIndicator(self)
progress.start("Checking for updates... ")
try:
update_diff: resource.Patch | None = State.game.get_update(
pre_download=pre_download
)
game_info = State.game.get_remote_game(pre_download=pre_download)
except Exception as e:
progress.finish(
f"<error>Update checking failed with following error: {e} \n{traceback.format_exc()}</error>"
)
return
if update_diff is None:
progress.finish("<comment>Game is already updated.</comment>")
return
progress.finish("<comment>Update available.</comment>")
self.line(
f"The current version is: <comment>{State.game.get_version_str()}</comment>"
)
self.line(
f"The latest version is: <comment>{game_info.major.version}</comment>"
)
if not self.confirm("Do you want to update the game?"):
self.line("<error>Update aborted.</error>")
return
# Voicepack update
for remote_voicepack in update_diff.audio_pkgs:
if remote_voicepack.language not in installed_voicepacks:
continue
# Voicepack is installed, update it
archive_file = State.game.cache.joinpath(
PurePath(remote_voicepack.url).name
)
self.line(
f"Downloading update package for voicepack language '{remote_voicepack.language.name}'..."
)
try:
download_result = utils.download(
remote_voicepack.url, archive_file, file_len=remote_voicepack.size
)
except Exception as e:
self.line_error(f"<error>Couldn't download update: {e}</error>")
return
if not download_result:
self.line_error("<error>Download failed.</error>")
return
self.line("Download completed.")
progress = utils.ProgressIndicator(self)
progress.start("Applying update package...")
try:
State.game.apply_update_archive(
archive_file=archive_file, auto_repair=auto_repair
)
except Exception as e:
progress.finish(
f"<error>Couldn't apply update: {e} \n{traceback.format_exc()}</error>"
)
return
progress.finish(
f"<comment>Update applied for language {remote_voicepack.language.name}.</comment>"
)
State.game.version_override = game_info.major.version
set_version_config(self=self)
State.game.version_override = None
class PatchTypeCommand(Command):
name = "hsr patch type"
description = "Get the patch type of the game"
options = default_options
def handle(self):
callback(command=self)
self.line(f"Patch type: <comment>{patcher.patch_type.name}</comment>")
class UpdatePatchCommand(Command):
name = "hsr patch update"
description = "Updates the patch"
options = default_options
def handle(self):
callback(command=self)
progress = utils.ProgressIndicator(self)
progress.start("Updating patch... ")
try:
patcher.update_patch()
except PatchUpdateError as e:
progress.finish(
f"<error>Patch update failed with following error: {e} \n{traceback.format_exc()}</error>"
)
else:
progress.finish("<comment>Patch updated!</comment>")
class PatchInstallCommand(Command):
name = "hsr patch install"
description = "Installs the patch"
options = default_options
def jadeite(self):
progress = utils.ProgressIndicator(self)
progress.start("Installing patch... ")
try:
jadeite_dir = patcher.patch_game(game=State.game)
except PatcherError as e:
progress.finish(
f"<error>Patch installation failed with following error: {e} \n{traceback.format_exc()}</error>"
)
return
progress.finish("<comment>Patch installed!</comment>")
print()
exe_path = jadeite_dir.joinpath("jadeite.exe")
self.line(f"Jadeite executable is located at: <question>{exe_path}</question>")
self.line(
"You need to <warn>run the game using Jadeite</warn> to use the patch."
)
self.line(f'E.g: <question>{exe_path} "{State.game.path}"</question>')
print()
self.line(
"To activate the experimental patching method, set the environment variable BREAK_CATHACK=1"
)
self.line(
"Read more about it here: https://codeberg.org/mkrsym1/jadeite/issues/37"
)
print()
self.line(
"Please don't spread this project to public, we just want to play the game."
)
self.line(
"And for your own sake, please only <warn>use test accounts</warn>, as there is an <warn>extremely high risk of getting banned.</warn>"
)
def astra(self):
progress = utils.ProgressIndicator(self)
progress.start("Installing patch... ")
try:
patcher.patch_game(game=State.game)
except PatcherError as e:
progress.finish(
f"<error>Patch installation failed with following error: {e} \n{traceback.format_exc()}</error>"
)
return
progress.finish("<comment>Patch installed!</comment>")
self.line()
self.line(
"Please don't spread this project to public, we just want to play the game."
)
self.line(
"And for your own sake, please only use testing accounts, as there is an extremely high risk of getting banned."
)
def handle(self):
callback(command=self)
if system() == "Windows":
self.line(
"Windows is <comment>officialy supported</comment> by the game, so no patching is needed."
)
self.line(
"By patching the game, <warn>you are violating the ToS of the game.</warn>"
)
if not self.confirm("Do you want to patch the game?"):
self.line("<error>Patching aborted.</error>")
return
progress = utils.ProgressIndicator(self)
progress.start("Checking telemetry hosts... ")
telemetry_list = patcher.check_telemetry()
if telemetry_list:
progress.finish("<warn>Telemetry hosts were found.</warn>")
self.line("Below is the list of telemetry hosts that need to be blocked:")
print()
for host in telemetry_list:
self.line(f"{host}")
print()
self.line(
"To prevent the game from sending data about the patch, "
+ "we need to <comment>block these hosts.</comment>"
)
if not self.confirm("Do you want to block them?"):
self.line("<error>Patching aborted.</error>")
self.line(
"<error>Please block these hosts manually then try again.</error>"
)
return
try:
patcher.block_telemetry(telemetry_list=telemetry_list)
except Exception:
self.line_error(
f"<error>Couldn't block telemetry hosts: {traceback.format_exc()}</error>"
)
# There's a good reason for this.
if system() != "Windows":
self.line(
"<error>Cannot continue, please block them manually then try again.</error>"
)
return
self.line("<warn>Continuing anyway...</warn>")
else:
progress.finish("<comment>No telemetry hosts found.</comment>")
progress = utils.ProgressIndicator(self)
progress.start("Updating patch... ")
try:
patcher.update_patch()
except PatchUpdateError as e:
progress.finish(
f"<error>Patch update failed with following error: {e} \n{traceback.format_exc()}</error>"
)
else:
progress.finish("<comment>Patch updated.</comment>")
match patcher.patch_type:
case HSRPatchType.Jadeite:
self.jadeite()
case HSRPatchType.Astra:
self.astra()
PatchCommand = copy.deepcopy(PatchInstallCommand)
PatchCommand.name = "hsr patch"
class PatchTelemetryCommand(Command):
name = "hsr patch telemetry"
description = "Checks for telemetry hosts and block them."
options = default_options
def handle(self):
progress = utils.ProgressIndicator(self)
progress.start("Checking telemetry hosts... ")
telemetry_list = patcher.check_telemetry()
if telemetry_list:
progress.finish("<warn>Telemetry hosts were found.</warn>")
self.line("Below is the list of telemetry hosts that need to be blocked:")
print()
for host in telemetry_list:
self.line(f"{host}")
print()
self.line(
"To prevent the game from sending data about the patch, "
+ "we need to <comment>block these hosts.</comment>"
)
if not self.confirm("Do you want to block them?"):
self.line("<error>Blocking aborted.</error>")
return
try:
patcher.block_telemetry(telemetry_list=telemetry_list)
except Exception:
self.line_error(
f"<error>Couldn't block telemetry hosts: {traceback.format_exc()}</error>"
)
else:
progress.finish("<comment>No telemetry hosts found.</comment>")
class GetVersionCommand(Command):
name = "hsr version"
description = "Gets the local game version"
options = default_options
def handle(self):
callback(command=self)
try:
self.line(
f"<comment>Version:</comment> {'.'.join(str(x) for x in State.game.get_version())}"
)
except GameError as e:
self.line_error(f"<error>Couldn't get game version: {e}</error>")
class InstallCommand(Command):
name = "hsr install"
description = (
"Installs the latest version of the game to the specified path (default: current directory). "
+ "Note that this will not install the default voicepack (English), you need to install it manually."
)
options = default_options + [
option("pre-download", description="Pre-download the game if available"),
]
def handle(self):
callback(command=self)
pre_download = self.option("pre-download")
progress = utils.ProgressIndicator(self)
progress.start("Fetching install package information... ")
try:
game_info = State.game.get_remote_game(pre_download=pre_download)
except Exception as e:
progress.finish(
f"<error>Fetching failed with following error: {e} \n{traceback.format_exc()}</error>"
)
return
progress.finish(
"<comment>Installation information fetched successfully.</comment>"
)
if not self.confirm("Do you want to install the game?"):
self.line("<error>Installation aborted.</error>")
return
self.line("Downloading install package...")
first_pkg_out_path = None
for game_pkg in game_info.major.game_pkgs:
out_path = State.game.cache.joinpath(PurePath(game_pkg.url).name)
if not first_pkg_out_path:
first_pkg_out_path = out_path
try:
download_result = utils.download(
game_pkg.url, out_path, file_len=game_pkg.size
)
except Exception as e:
self.line_error(
f"<error>Couldn't download install package: {e}</error>"
)
return
if not download_result:
self.line_error("<error>Download failed.</error>")
return
self.line("Download completed.")
progress = utils.ProgressIndicator(self)
progress.start("Installing package...")
try:
State.game.install_archive(first_pkg_out_path)
except Exception as e:
progress.finish(
f"<error>Couldn't install package: {e} \n{traceback.format_exc()}</error>"
)
return
progress.finish("<comment>Package applied for the base game.</comment>")
self.line("Setting version config... ")
State.game.version_override = game_info.major.version
set_version_config(self=self)
State.game.version_override = None
self.line(
f"The game has been installed to version: <comment>{State.game.get_version_str()}</comment>"
)
class UpdateCommand(Command):
name = "hsr update"
description = "Updates the local game if available"
options = default_options + [
option(
"auto-repair", "R", description="Automatically repair the game if needed"
),
option("pre-download", description="Pre-download the game if available"),
option(
"from-version", description="Update from a specific version", flag=False
),
]
def handle(self):
callback(command=self)
auto_repair = self.option("auto-repair")
pre_download = self.option("pre-download")
from_version = self.option("from-version")
if auto_repair:
self.line("<comment>Auto-repair is enabled.</comment>")
if from_version:
self.line(f"Updating from version: <comment>{from_version}</comment>")
State.game.version_override = from_version
progress = utils.ProgressIndicator(self)
progress.start("Checking for updates... ")
try:
update_diff = State.game.get_update(pre_download=pre_download)
game_info = State.game.get_remote_game(pre_download=pre_download)
except Exception as e:
progress.finish(
f"<error>Update checking failed with following error: {e} \n{traceback.format_exc()}</error>"
)
return
if update_diff is None or isinstance(game_info.major, str | None):
progress.finish("<comment>Game is already updated.</comment>")
return
progress.finish("<comment>Update available.</comment>")
self.line(
f"The current version is: <comment>{State.game.get_version_str()}</comment>"
)
self.line(
f"The latest version is: <comment>{game_info.major.version}</comment>"
)
if not self.confirm("Do you want to update the game?"):
self.line("<error>Update aborted.</error>")
return
self.line("Downloading update package...")
update_game_url = update_diff.game_pkgs[0].url
out_path = State.game.cache.joinpath(PurePath(update_game_url).name)
try:
download_result = utils.download(
update_game_url, out_path, file_len=update_diff.game_pkgs[0].size
)
except Exception as e:
self.line_error(f"<error>Couldn't download update: {e}</error>")
return
if not download_result:
self.line_error("<error>Download failed.</error>")
return
self.line("Download completed.")
progress = utils.ProgressIndicator(self)
progress.start("Applying update package...")
try:
State.game.apply_update_archive(out_path, auto_repair=auto_repair)
except Exception as e:
progress.finish(
f"<error>Couldn't apply update: {e} \n{traceback.format_exc()}</error>"
)
return
progress.finish("<comment>Update applied for base game.</comment>")
# Get installed voicepacks
installed_voicepacks = State.game.get_installed_voicepacks()
# Voicepack update
for remote_voicepack in update_diff.audio_pkgs:
if remote_voicepack.language not in installed_voicepacks:
continue
# Voicepack is installed, update it
archive_file = State.game.cache.joinpath(
PurePath(remote_voicepack.url).name
)
self.line(
f"Downloading update package for voicepack language '{remote_voicepack.language.name}'..."
)
try:
download_result = utils.download(
remote_voicepack.url, archive_file, file_len=remote_voicepack.size
)
except Exception as e:
self.line_error(f"<error>Couldn't download update: {e}</error>")
return
if not download_result:
self.line_error("<error>Download failed.</error>")
return
self.line("Download completed.")
progress = utils.ProgressIndicator(self)
progress.start("Applying update package...")
try:
State.game.apply_update_archive(
archive_file=archive_file, auto_repair=auto_repair
)
except Exception as e:
progress.finish(
f"<error>Couldn't apply update: {e} \n{traceback.format_exc()}</error>"
)
return
progress.finish(
f"<comment>Update applied for language {remote_voicepack.language.name}.</comment>"
)
self.line("Setting version config... ")
State.game.version_override = game_info.major.version
set_version_config(self=self)
State.game.version_override = None
self.line(
f"The game has been updated to version: <comment>{State.game.get_version_str()}</comment>"
)
class RepairCommand(Command):
name = "hsr repair"
description = "Tries to repair the local game"
options = default_options
def handle(self):
callback(command=self)
self.line(
"This command will try to repair the game by downloading missing/broken files."
)
self.line(
"There will be no progress available, so please be patient and just wait."
)
if not self.confirm(
"Do you want to repair the game (this will take a long time!)?"
):
self.line("<error>Repairation aborted.</error>")
return
progress = utils.ProgressIndicator(self)
progress.start("Repairing game files (no progress available)... ")
try:
State.game.repair_game()
except Exception as e:
progress.finish(
f"<error>Repairation failed with following error: {e} \n{traceback.format_exc()}</error>"
)
return
progress.finish("<comment>Repairation completed.</comment>")
class InstallDownloadCommand(Command):
name = "hsr install download"
description = (
"Downloads the latest version of the game. "
+ "Note that this will not download the default voicepack (English), you need to download it manually."
)
options = default_options + [
option("pre-download", description="Pre-download the game if available"),
]
def handle(self):
callback(command=self)
pre_download = self.option("pre-download")
progress = utils.ProgressIndicator(self)
progress.start("Fetching install package information... ")
try:
game_info = State.game.get_remote_game(pre_download=pre_download)
except Exception as e:
progress.finish(
f"<error>Fetching failed with following error: {e} \n{traceback.format_exc()}</error>"
)
return
progress.finish(
"<comment>Installation information fetched successfully.</comment>"
)
if not self.confirm("Do you want to download the game?"):
self.line("<error>Download aborted.</error>")
return
self.line("Downloading install package...")
first_pkg_out_path = None
for game_pkg in game_info.major.game_pkgs:
out_path = State.game.cache.joinpath(PurePath(game_pkg.url).name)
if not first_pkg_out_path:
first_pkg_out_path = out_path
try:
download_result = utils.download(
game_pkg.url, out_path, file_len=game_pkg.size
)
except Exception as e:
self.line_error(
f"<error>Couldn't download install package: {e}</error>"
)
return
if not download_result:
self.line_error("<error>Download failed.</error>")
return
self.line("Download completed.")
class UpdateDownloadCommand(Command):
name = "hsr update download"
description = "Download the update for the local game if available"
options = default_options + [
option(
"auto-repair", "R", description="Automatically repair the game if needed"
),
option("pre-download", description="Pre-download the game if available"),
option(
"from-version", description="Update from a specific version", flag=False
),
]
def handle(self):
callback(command=self)
auto_repair = self.option("auto-repair")
pre_download = self.option("pre-download")
from_version = self.option("from-version")
if auto_repair:
self.line("<comment>Auto-repair is enabled.</comment>")
if from_version:
self.line(f"Updating from version: <comment>{from_version}</comment>")
State.game.version_override = from_version
progress = utils.ProgressIndicator(self)
progress.start("Checking for updates... ")
try:
update_diff = State.game.get_update(pre_download=pre_download)
game_info = State.game.get_remote_game(pre_download=pre_download)
except Exception as e:
progress.finish(
f"<error>Update checking failed with following error: {e} \n{traceback.format_exc()}</error>"
)
return
if update_diff is None or isinstance(game_info.major, str | None):
progress.finish("<comment>Game is already updated.</comment>")
return
progress.finish("<comment>Update available.</comment>")
self.line(
f"The current version is: <comment>{State.game.get_version_str()}</comment>"
)
self.line(
f"The latest version is: <comment>{game_info.major.version}</comment>"
)
if not self.confirm("Do you want to download the update?"):
self.line("<error>Download aborted.</error>")
return
self.line("Downloading update package...")
update_game_url = update_diff.game_pkgs[0].url
out_path = State.game.cache.joinpath(PurePath(update_game_url).name)
try:
download_result = utils.download(
update_game_url, out_path, file_len=update_diff.game_pkgs[0].size
)
except Exception as e:
self.line_error(
f"<error>Couldn't download update: {e} \n{traceback.format_exc()}</error>"
)
return
if not download_result:
self.line_error("<error>Download failed.</error>")
return
self.line("Download completed.")
# Get installed voicepacks
installed_voicepacks = State.game.get_installed_voicepacks()
# Voicepack update
for remote_voicepack in update_diff.audio_pkgs:
if remote_voicepack.language not in installed_voicepacks:
continue
# Voicepack is installed, update it
archive_file = State.game.cache.joinpath(
PurePath(remote_voicepack.url).name
)
self.line(
f"Downloading update package for voicepack language '{remote_voicepack.language.name}'..."
)
try:
download_result = utils.download(
remote_voicepack.url, archive_file, file_len=remote_voicepack.size
)
except Exception as e:
self.line_error(f"<error>Couldn't download update: {e}</error>")
return
if not download_result:
self.line_error("<error>Download failed.</error>")
return
self.line("Download completed.")
class ApplyInstallArchive(Command):
name = "hsr install apply-archive"
description = "Applies the install archive"
arguments = [argument("path", description="Path to the install archive")]
options = default_options
def handle(self):
callback(command=self)
install_archive = self.argument("path")
progress = utils.ProgressIndicator(self)
progress.start("Applying install package...")
try:
State.game.install_archive(install_archive)
except Exception as e:
progress.finish(
f"<error>Couldn't apply package: {e} \n{traceback.format_exc()}</error>"
)
return
progress.finish("<comment>Package applied.</comment>")
set_version_config(self=self)
class ApplyUpdateArchive(Command):
name = "hsr update apply-archive"
description = "Applies the update archive to the local game"
arguments = [argument("path", description="Path to the update archive")]
options = default_options + [
option(
"auto-repair", "R", description="Automatically repair the game if needed"
),
]
def handle(self):
callback(command=self)
update_archive = self.argument("path")
auto_repair = self.option("auto-repair")
progress = utils.ProgressIndicator(self)
progress.start("Applying update package...")
try:
State.game.apply_update_archive(update_archive, auto_repair=auto_repair)
except Exception as e:
progress.finish(
f"<error>Couldn't apply update: {e} \n{traceback.format_exc()}</error>"
)
return
progress.finish("<comment>Update applied.</comment>")
set_version_config(self=self)
# This is the list for HSR commands, we'll add Genshin commands later
classes = [
ApplyInstallArchive,
ApplyUpdateArchive,
GetVersionCommand,
InstallCommand,
InstallDownloadCommand,
PatchCommand,
PatchInstallCommand,
PatchTelemetryCommand,
PatchTypeCommand,
# RepairCommand,
UpdatePatchCommand,
UpdateCommand,
UpdateDownloadCommand,
VoicepackInstall,
VoicepackList,
VoicepackListInstalled,
VoicepackUpdate,
]
exports = []
for command in classes:
init_command = command()
exports.append(init_command)
if "patch" in command.name:
continue
command_name = command.name[4:]
genshin_init_command = copy.deepcopy(init_command)
genshin_init_command.name = f"genshin {command_name}"
exports.append(genshin_init_command)
zzz_init_command = copy.deepcopy(init_command)
zzz_init_command.name = f"zzz {command_name}"
exports.append(zzz_init_command)

View File

@ -1,386 +0,0 @@
from cleo.commands.command import Command
from cleo.helpers import option, argument
from copy import deepcopy
from platform import system
from vollerei.hsr.launcher.enums import GameChannel
from vollerei.cli import utils
from vollerei.exceptions.game import GameError
from vollerei.hsr import Game, Patcher
from vollerei.exceptions.patcher import PatcherError, PatchUpdateError
from vollerei.hsr.patcher import PatchType
patcher = Patcher()
default_options = [
option("channel", "c", description="Game channel", flag=False, default="overseas"),
option("force", "f", description="Force the command to run"),
option(
"game-path",
"g",
description="Path to the game installation",
flag=False,
default=".",
),
option("patch-type", "p", description="Patch type", flag=False),
option("temporary-path", "t", description="Temporary path", flag=False),
option("silent", "s", description="Silent mode"),
option("noconfirm", "y", description="Do not ask for confirmation"),
]
class State:
game: Game = None
def callback(
command: Command,
):
"""
Base callback for all commands
"""
game_path = command.option("game-path")
patch_type = command.option("patch-type")
channel = command.option("channel")
silent = command.option("silent")
noconfirm = command.option("noconfirm")
temporary_path = command.option("temporary-path")
if isinstance(channel, str):
channel = GameChannel[channel.capitalize()]
elif isinstance(channel, int):
channel = GameChannel(channel)
State.game: Game = Game(game_path, temporary_path)
if channel:
State.game.channel_override = channel
if patch_type is None:
patch_type = PatchType.Jadeite
elif isinstance(patch_type, str):
patch_type = PatchType[patch_type]
elif isinstance(patch_type, int):
patch_type = PatchType(patch_type)
patcher.patch_type = patch_type
utils.silent_message = silent
if noconfirm:
utils.no_confirm = noconfirm
command.add_style("warn", fg="yellow")
class PatchTypeCommand(Command):
name = "hsr patch type"
description = "Get the patch type of the game"
options = default_options
def handle(self):
callback(command=self)
self.line(f"<comment>Patch type:</comment> {patcher.patch_type.name}")
class UpdatePatchCommand(Command):
name = "hsr patch update"
description = "Updates the patch"
options = default_options
def handle(self):
callback(command=self)
progress = utils.ProgressIndicator(self)
progress.start("Updating patch... ")
try:
patcher.update_patch()
except PatchUpdateError as e:
progress.finish(
f"<error>Patch update failed with following error: {e} ({e.__context__})</error>"
)
else:
progress.finish("<comment>Patch updated!</comment>")
class PatchInstallCommand(Command):
name = "hsr patch install"
description = "Installs the patch"
options = default_options
def jadeite(self):
progress = utils.ProgressIndicator(self)
progress.start("Installing patch... ")
try:
jadeite_dir = patcher.patch_game(game=State.game)
except PatcherError as e:
progress.finish(
f"<error>Patch installation failed with following error: {e} ({e.__context__})</error>"
)
return
progress.finish("<comment>Patch installed!</comment>")
print()
exe_path = jadeite_dir.joinpath("jadeite.exe")
self.line(f"Jadeite executable is located at: <question>{exe_path}</question>")
self.line(
"You need to <warn>run the game using Jadeite</warn> to use the patch."
)
self.line(
f'E.g: <question>I_WANT_A_BAN=1 {exe_path} "{State.game.path}"</question>'
)
print()
self.line(
"Please don't spread this project to public, we just want to play the game."
)
self.line(
"And for your own sake, please only <warn>use test accounts</warn>, as there is an <warn>extremely high risk of getting banned.</warn>"
)
def astra(self):
progress = utils.ProgressIndicator(self)
progress.start("Installing patch... ")
try:
patcher.patch_game(game=State.game)
except PatcherError as e:
progress.finish(
f"<error>Patch installation failed with following error: {e} ({e.__context__})</error>"
)
return
progress.finish("<comment>Patch installed!</comment>")
self.line()
self.line(
"Please don't spread this project to public, we just want to play the game."
)
self.line(
"And for your own sake, please only use testing accounts, as there is an extremely high risk of getting banned."
)
def handle(self):
callback(command=self)
if system() == "Windows":
self.line(
"Windows is <comment>officialy supported</comment> by the game, so no patching is needed."
)
self.line(
"By patching the game, <warn>you are violating the ToS of the game.</warn>"
)
if not self.confirm("Do you want to patch the game?"):
self.line("<error>Patching aborted.</error>")
return
progress = utils.ProgressIndicator(self)
progress.start("Checking telemetry hosts... ")
telemetry_list = patcher.check_telemetry()
if telemetry_list:
progress.finish("<warn>Telemetry hosts were found.</warn>")
self.line("Below is the list of telemetry hosts that need to be blocked:")
print()
for host in telemetry_list:
self.line(f"{host}")
print()
self.line(
"To prevent the game from sending data about the patch, "
+ "we need to <comment>block these hosts.</comment>"
)
if not self.confirm("Do you want to block them?"):
self.line("<error>Patching aborted.</error>")
self.line(
"<error>Please block these hosts manually then try again.</error>"
)
return
try:
patcher.block_telemetry(telemetry_list=telemetry_list)
except Exception as e:
self.line_error(
f"<error>Couldn't block telemetry hosts: {e.__context__}</error>"
)
# There's a good reason for this.
if system() != "Windows":
self.line(
"<error>Cannot continue, please block them manually then try again.</error>"
)
return
self.line("<warn>Continuing anyway...</warn>")
else:
progress.finish("<comment>No telemetry hosts found.</comment>")
progress = utils.ProgressIndicator(self)
progress.start("Updating patch... ")
try:
patcher.update_patch()
except PatchUpdateError as e:
progress.finish(
f"<error>Patch update failed with following error: {e} ({e.__context__})</error>"
)
else:
progress.finish("<comment>Patch updated.</comment>")
match patcher.patch_type:
case PatchType.Jadeite:
self.jadeite()
case PatchType.Astra:
self.astra()
PatchCommand = deepcopy(PatchInstallCommand)
PatchCommand.name = "hsr patch"
class PatchTelemetryCommand(Command):
name = "hsr patch telemetry"
description = "Checks for telemetry hosts and block them."
options = default_options
def handle(self):
progress = utils.ProgressIndicator(self)
progress.start("Checking telemetry hosts... ")
telemetry_list = patcher.check_telemetry()
if telemetry_list:
progress.finish("<warn>Telemetry hosts were found.</warn>")
self.line("Below is the list of telemetry hosts that need to be blocked:")
print()
for host in telemetry_list:
self.line(f"{host}")
print()
self.line(
"To prevent the game from sending data about the patch, "
+ "we need to <comment>block these hosts.</comment>"
)
if not self.confirm("Do you want to block them?"):
self.line("<error>Blocking aborted.</error>")
return
try:
patcher.block_telemetry(telemetry_list=telemetry_list)
except Exception as e:
self.line_error(
f"<error>Couldn't block telemetry hosts: {e.__context__}</error>"
)
else:
progress.finish("<comment>No telemetry hosts found.</comment>")
class GetVersionCommand(Command):
name = "hsr version"
description = "Gets the local game version"
options = default_options
def handle(self):
callback(command=self)
try:
self.line(
f"<comment>Version:</comment> {'.'.join(str(x) for x in State.game.get_version())}"
)
except GameError as e:
self.line_error(f"<error>Couldn't get game version: {e}</error>")
class UpdateCommand(Command):
name = "hsr update"
description = "Updates the local game if available"
options = default_options + [
option(
"auto-repair", "R", description="Automatically repair the game if needed"
),
option("pre-download", description="Pre-download the game if available"),
]
def handle(self):
callback(command=self)
auto_repair = self.option("auto-repair")
pre_download = self.option("pre-download")
if auto_repair:
self.line("<comment>Auto-repair is enabled.</comment>")
progress = utils.ProgressIndicator(self)
progress.start("Checking for updates... ")
try:
update_diff = State.game.get_update(pre_download=pre_download)
game_info = State.game.get_remote_game(pre_download=pre_download)
except Exception as e:
progress.finish(
f"<error>Update checking failed with following error: {e} ({e.__context__})</error>"
)
return
if update_diff is None:
progress.finish("<comment>Game is already updated.</comment>")
return
progress.finish("<comment>Update available.</comment>")
self.line(
f"The current version is: <comment>{State.game.get_version_str()}</comment>"
)
self.line(
f"The latest version is: <comment>{game_info.latest.version}</comment>"
)
if not self.confirm("Do you want to update the game?"):
self.line("<error>Update aborted.</error>")
return
self.line("Downloading update package...")
out_path = State.game._cache.joinpath(update_diff.name)
try:
download_result = utils.download(
update_diff.path, out_path, file_len=update_diff.size
)
except Exception as e:
self.line_error(f"<error>Couldn't download update: {e}</error>")
return
if not download_result:
self.line_error("<error>Download failed.</error>")
return
self.line("Download completed.")
progress = utils.ProgressIndicator(self)
progress.start("Applying update package...")
try:
State.game.apply_update_archive(out_path, auto_repair=auto_repair)
except Exception as e:
progress.finish(
f"<error>Couldn't apply update: {e} ({e.__context__})</error>"
)
return
progress.finish("<comment>Update applied.</comment>")
self.line("Setting version config... ")
self.set_version_config()
self.line(
f"The game has been updated to version: <comment>{State.game.get_version_str()}</comment>"
)
class ApplyUpdateArchive(Command):
name = "hsr update apply-archive"
description = "Applies the update archive to the local game"
arguments = [argument("path", description="Path to the update archive")]
options = default_options + [
option(
"auto-repair", "R", description="Automatically repair the game if needed"
),
]
def handle(self):
callback(command=self)
auto_repair = self.option("auto-repair")
update_archive = self.argument("path")
if auto_repair:
self.line("<comment>Auto-repair is enabled.</comment>")
progress = utils.ProgressIndicator(self)
progress.start("Applying update package...")
try:
State.game.apply_update_archive(update_archive, auto_repair=auto_repair)
except Exception as e:
progress.finish(
f"<error>Couldn't apply update: {e} ({e.__context__})</error>"
)
return
progress.finish("<comment>Update applied.</comment>")
self.line("Setting version config... ")
try:
State.game.set_version_config()
except Exception as e:
self.line_error(f"<warn>Couldn't set version config: {e}</warn>")
self.line_error(
"This won't affect the overall experience, but if you're using the official launcher"
)
self.line_error(
"you may have to edit the file 'config.ini' manually to reflect the latest version."
)
self.line(
f"The game has been updated to version: <comment>{State.game.get_version_str()}</comment>"
)
commands = [
ApplyUpdateArchive,
GetVersionCommand,
PatchCommand,
PatchInstallCommand,
PatchTelemetryCommand,
PatchTypeCommand,
UpdatePatchCommand,
UpdateCommand,
]

View File

@ -47,6 +47,7 @@ class ProgressIndicator:
interval=interval, values=values
)
self.thread = Thread(target=self.auto_advance)
self.thread.daemon = True
def start(self, message: str):
"""
@ -73,9 +74,9 @@ def download(url, out: Path, file_len: int = None, overwrite: bool = False) -> b
out.touch()
# Streaming, so we can iterate over the response.
response = requests.get(url=url, headers=headers, stream=True)
response.raise_for_status()
if response.status_code == 416:
return
return True
response.raise_for_status()
# Sizes in bytes.
total_size = int(response.headers.get("content-length", 0))
block_size = 32768

View File

@ -1,4 +1,35 @@
from vollerei.common.api.resource import Resource
import requests
from vollerei.common.api import resource
from vollerei.common.enums import GameChannel
from vollerei.constants import LAUNCHER_API
__all__ = ["Resource"]
__all__ = ["GamePackage"]
def get_game_packages(
channel: GameChannel = GameChannel.Overseas,
) -> list[resource.GameInfo]:
"""
Get game packages information from the launcher API.
Default channel is overseas.
Args:
channel: Game channel to get the resource information from.
Returns:
Resource: Game resource information.
"""
resource_path: dict = None
match channel:
case GameChannel.Overseas:
resource_path = LAUNCHER_API.OS
case GameChannel.China:
resource_path = LAUNCHER_API.CN
return resource.from_dict(
requests.get(
resource_path["url"] + LAUNCHER_API.RESOURCE_PATH,
params=resource_path["params"],
).json()["data"]
)

View File

@ -1,406 +1,156 @@
"""
Class wrapper for API endpoint /resource
"""
from vollerei.common.enums import VoicePackLanguage
class Segment:
"""
A segment of the game archive.
Attributes:
path (str): Segment download path.
md5 (str): Segment md5 checksum.
package_size (int | None): Segment package size.
"""
path: str
md5: str
# str -> int and checked if int is 0 then None
package_size: int | None
def __init__(self, path: str, md5: str, package_size: int | None) -> None:
self.path = path
self.md5 = md5
self.package_size = package_size
@staticmethod
def from_dict(data: dict) -> "Segment":
return Segment(
data["path"],
data["md5"],
int(data["package_size"])
if data["package_size"] and data["package_size"] != "0"
else None,
)
class VoicePack:
"""
Voice pack information
`name` maybe converted from `path` if the server returns empty string.
Attributes:
language (VoicePackLanguage): Language of the voice pack.
name (str): Voice pack archive name.
path (str): Voice pack download path.
size (int): Voice pack size.
md5 (str): Voice pack md5 checksum.
package_size (int): Voice pack package size.
"""
language: VoicePackLanguage
name: str
path: str
# str -> int
size: int
md5: str
# str -> int
package_size: int
def __init__(
self,
language: VoicePackLanguage,
name: str,
path: str,
size: int,
md5: str,
package_size: int,
) -> None:
self.language = language
self.name = name
self.path = path
self.size = size
self.md5 = md5
self.package_size = package_size
@staticmethod
def from_dict(data: dict) -> "VoicePack":
return VoicePack(
VoicePackLanguage.from_remote_str(data["language"]),
data["name"],
data["path"],
int(data["size"]),
data["md5"],
int(data["package_size"]),
)
class Diff:
"""
Game resource diff from a version to latest information
Attributes:
TODO
"""
name: str
version: str
path: str
# str -> int
size: int
md5: str
is_recommended_update: bool
voice_packs: list[VoicePack]
# str -> int
package_size: int
def __init__(
self,
name: str,
version: str,
path: str,
size: int,
md5: str,
is_recommended_update: bool,
voice_packs: list[VoicePack],
package_size: int,
) -> None:
self.name = name
self.version = version
self.path = path
self.size = size
self.md5 = md5
self.is_recommended_update = is_recommended_update
self.voice_packs = voice_packs
self.package_size = package_size
@staticmethod
def from_dict(data: dict) -> "Diff":
return Diff(
data["name"],
data["version"],
data["path"],
int(data["size"]),
data["md5"],
data["is_recommended_update"],
[VoicePack.from_dict(i) for i in data["voice_packs"]],
int(data["package_size"]),
)
class Latest:
"""
Latest game resource information
`name` maybe converted from `path` if the server returns empty string,
and if `path` is empty too then it'll convert the name from the first
segment of `segments` list.
`path` maybe None if the server returns empty string, in that case
you'll have to download the game using `segments` list and merge them.
`voice_packs` will be empty for Star Rail, they force you to download
in-game instead.
`decompressed_path` is useful for repairing game files by only having
to re-download the corrupted files.
`segments` is a list of game archive segments, you'll have to download
them and merge them together to get the full game archive. Not available
on Star Rail.
Attributes:
name (str): Game archive name.
version (str): Game version in the archive.
path (str | None): Game archive download path.
size (int): Game archive size in bytes.
md5 (str): Game archive MD5 checksum.
entry (str): Game entry file (e.g. GenshinImpact.exe).
voice_packs (list[VoicePack]): Game voice packs.
decompressed_path (str | None): Game archive decompressed path.
segments (list[Segment]): Game archive segments.
package_size (int): Game archive package size in bytes.
"""
name: str
version: str
path: str | None
# str -> int
size: int
md5: str
entry: str
voice_packs: list[VoicePack]
# str but checked for empty string
decompressed_path: str | None
segments: list[Segment]
# str -> int
package_size: int
def __init__(
self,
name: str,
version: str,
path: str,
size: int,
md5: str,
entry: str,
voice_packs: list[VoicePack],
decompressed_path: str | None,
segments: list[Segment],
package_size: int,
) -> None:
self.name = name
self.version = version
self.path = path
self.size = size
self.md5 = md5
self.entry = entry
self.voice_packs = voice_packs
self.decompressed_path = decompressed_path
self.segments = segments
self.package_size = package_size
@staticmethod
def from_dict(data: dict) -> "Latest":
if data["name"] == "":
if data["path"] == "":
data["name"] = data["segments"][0]["path"].split("/")[-1]
else:
data["name"] = data["path"].split("/")[-1]
return Latest(
data["name"],
data["version"],
data["path"] if data["path"] != "" else None,
int(data["size"]),
data["md5"],
data["entry"],
[VoicePack.from_dict(i) for i in data["voice_packs"]],
data["decompressed_path"] if data["decompressed_path"] != "" else None,
[Segment.from_dict(i) for i in data["segments"]],
int(data["package_size"]),
)
from typing import Union
class Game:
latest: Latest
diffs: list[Diff]
def __init__(self, latest: Latest, diffs: list[Diff]) -> None:
self.latest = latest
self.diffs = diffs
def __init__(self, id: str, biz: str):
self.id = id
self.biz = biz
@staticmethod
def from_dict(data: dict) -> "Game":
return Game(
Latest.from_dict(data["latest"]), [Diff.from_dict(i) for i in data["diffs"]]
)
return Game(id=data["id"], biz=data["biz"])
class Plugin:
name: str
# str but checked for empty string
version: str | None
path: str
# str -> int
size: int
md5: str
# str but checked for empty string
entry: str | None
# str -> int
package_size: int
def __init__(
self,
name: str,
version: str | None,
path: str,
size: int,
md5: str,
entry: str | None,
package_size: int,
) -> None:
self.name = name
self.version = version
self.path = path
class GamePackage:
def __init__(self, url: str, md5: str, size: int, decompressed_size: int):
self.url = url
self.md5 = md5
self.size = size
self.md5 = md5
self.entry = entry
self.package_size = package_size
self.decompressed_size = decompressed_size
@staticmethod
def from_dict(data: dict) -> "Plugin":
return Plugin(
data["name"],
data["version"] if data["version"] != "" else None,
data["path"],
int(data["size"]),
data["md5"],
data["entry"] if data["entry"] != "" else None,
int(data["package_size"]),
)
def from_dict(data: list[dict]) -> list["GamePackage"]:
game_pkgs = []
for pkg in data:
game_pkgs.append(
GamePackage(
url=pkg["url"],
md5=pkg["md5"],
size=int(pkg["size"]),
decompressed_size=int(pkg["decompressed_size"]),
)
)
return game_pkgs
class LauncherPlugin:
plugins: list[Plugin]
# str -> int
version: int
def __init__(self, plugins: list[Plugin], version: int) -> None:
self.plugins = plugins
self.version = version
@staticmethod
def from_dict(data: dict) -> "LauncherPlugin":
return LauncherPlugin(
[Plugin.from_dict(i) for i in data["plugins"]], int(data["version"])
)
class DeprecatedPackage:
name: str
md5: str
def __init__(self, name: str, md5: str) -> None:
self.name = name
self.md5 = md5
@staticmethod
def from_dict(data: dict) -> "DeprecatedPackage":
return DeprecatedPackage(data["name"], data["md5"])
class DeprecatedFile:
path: str
# str but checked for empty string
md5: str | None
def __init__(self, path: str, md5: str | None) -> None:
self.path = path
self.md5 = md5
@staticmethod
def from_dict(data: dict) -> "DeprecatedFile":
return DeprecatedFile(data["path"], data["md5"] if data["md5"] != "" else None)
class Resource:
"""
Data class for /resource endpoint
I'm still unclear about `force_update` and `sdk` attributes, so I'll
leave them as None for now.
Attributes:
game (Game): Game resource information.
plugin (LauncherPlugin): Launcher plugin information.
web_url (str): Game official launcher web URL.
force_update (None): Not used by official launcher I guess?
pre_download_game (Game | None): Pre-download game resource information.
deprecated_packages (list[DeprecatedPackage]): Deprecated game packages.
sdk (None): Maybe for Bilibili version of Genshin?
deprecated_files (list[DeprecatedFile]): Deprecated game files.
"""
# I'm generous enough to convert the string into int
# for you guys, wtf Mihoyo?
game: Game
# ?? Mihoyo for plugin["plugins"] which is a list of Plugin objects
plugin: LauncherPlugin
web_url: str
# ?? Mihoyo
force_update: None
# Will be a Game object if a pre-download is available.
pre_download_game: Game | None
deprecated_packages: list[DeprecatedPackage]
# Maybe a SDK for Bilibili version in Genshin?
sdk: None
deprecated_files: list[DeprecatedFile]
class AudioPackage:
def __init__(
self,
game: Game,
plugin: Plugin,
web_url: str,
force_update: None,
pre_download_game: Game | None,
deprecated_packages: list[DeprecatedPackage],
sdk: None,
deprecated_files: list[DeprecatedFile],
) -> None:
self.game = game
self.plugin = plugin
self.web_url = web_url
self.force_update = force_update
self.pre_download_game = pre_download_game
self.deprecated_packages = deprecated_packages
self.sdk = sdk
self.deprecated_files = deprecated_files
language: VoicePackLanguage,
url: str,
md5: str,
size: int,
decompressed_size: int,
):
self.language = language
self.url = url
self.md5 = md5
self.size = size
self.decompressed_size = decompressed_size
@staticmethod
def from_dict(json: dict) -> "Resource":
return Resource(
Game.from_dict(json["game"]),
LauncherPlugin.from_dict(json["plugin"]),
json["web_url"],
json["force_update"],
Game.from_dict(json["pre_download_game"])
if json["pre_download_game"]
else None,
[DeprecatedPackage.from_dict(x) for x in json["deprecated_packages"]],
json["sdk"],
[DeprecatedFile.from_dict(x) for x in json["deprecated_files"]],
def from_dict(data: list[dict]) -> "AudioPackage":
audio_pkgs = []
for pkg in data:
audio_pkgs.append(
AudioPackage(
language=VoicePackLanguage.from_remote_str(pkg["language"]),
url=pkg["url"],
md5=pkg["md5"],
size=int(pkg["size"]),
decompressed_size=int(pkg["decompressed_size"]),
)
)
return audio_pkgs
class Major:
def __init__(
self,
version: str,
game_pkgs: list[GamePackage],
audio_pkgs: list[AudioPackage],
res_list_url: str,
):
self.version = version
self.game_pkgs = game_pkgs
self.audio_pkgs = audio_pkgs
self.res_list_url = res_list_url
@staticmethod
def from_dict(data: dict) -> "Major":
return Major(
version=data["version"],
game_pkgs=GamePackage.from_dict(data["game_pkgs"]),
audio_pkgs=AudioPackage.from_dict(data["audio_pkgs"]),
res_list_url=data["res_list_url"],
)
# Currently patch has the same fields as major
Patch = Major
class Main:
def __init__(self, major: Major, patches: list[Patch]):
self.major = major
self.patches = patches
@staticmethod
def from_dict(data: dict) -> "Main":
return Main(
major=Major.from_dict(data["major"]),
patches=[Patch.from_dict(x) for x in data["patches"]],
)
class PreDownload:
def __init__(self, major: Major | str | None, patches: list[Patch]):
self.major = major
self.patches = patches
# Union to fix the typing issue.
@staticmethod
def from_dict(data: dict | None) -> Union["PreDownload", None]:
# pre_download can be null in the server for certain games
# e.g. HI3:
# "pre_download": null
# while in GI it is the following:
# "pre_download": {
# "major": null,
# "patches": []
# }
if data is None:
return None
return PreDownload(
major=(
data["major"]
if isinstance(data["major"], str | None)
else Major.from_dict(data["major"])
),
patches=[Patch.from_dict(x) for x in data["patches"]],
)
# Why miHoYo uses the same name "game_packages" for this big field and smol field
class GameInfo:
def __init__(self, game: Game, main: Main, pre_download: PreDownload):
self.game = game
self.main = main
self.pre_download = pre_download
@staticmethod
def from_dict(data: dict) -> "GameInfo":
return GameInfo(
game=Game.from_dict(data["game"]),
main=Main.from_dict(data["main"]),
pre_download=PreDownload.from_dict(data["pre_download"]),
)
def from_dict(data: dict) -> list[GameInfo]:
game_pkgs = []
for pkg in data["game_packages"]:
game_pkgs.append(GameInfo.from_dict(pkg))
return game_pkgs

View File

@ -1,6 +1,18 @@
from enum import Enum
class GameType(Enum):
Genshin = 0
HSR = 1
ZZZ = 3
HI3 = 4
class GameChannel(Enum):
Overseas = 0
China = 1
class VoicePackLanguage(Enum):
Japanese = "ja-jp"
Chinese = "zh-cn"
@ -25,3 +37,23 @@ class VoicePackLanguage(Enum):
return VoicePackLanguage.English
else:
raise ValueError(f"Invalid language string: {s}")
@staticmethod
def from_zzz_name(s: str) -> "VoicePackLanguage":
"""
Converts a language string from ZZZ file name to a VoicePackLanguage enum.
Only English is tested for now.
"""
if s == "Jp":
return VoicePackLanguage.Japanese
elif s == "Cn":
return VoicePackLanguage.Chinese
elif s == "Tw":
return VoicePackLanguage.Taiwanese
elif s == "Kr":
return VoicePackLanguage.Korean
elif s == "En":
return VoicePackLanguage.English
else:
raise ValueError(f"Invalid language string: {s}")

View File

@ -1,15 +1,50 @@
import concurrent.futures
from io import IOBase
import json
from pathlib import Path
import hashlib
import multivolumefile
import py7zr
import zipfile
from io import IOBase
from os import PathLike
from pathlib import Path
from shutil import move
from vollerei.abc.launcher.game import GameABC
from vollerei.utils import HDiffPatch, HPatchZPatchError
from vollerei.common.api import resource
from vollerei.exceptions.game import (
RepairError,
GameNotInstalledError,
ScatteredFilesNotAvailableError,
)
from vollerei.utils import HDiffPatch, HPatchZPatchError, download
_hdiff = HDiffPatch()
def _extract_files(
archive: py7zr.SevenZipFile | zipfile.ZipFile, files, path: PathLike
):
if isinstance(archive, py7zr.SevenZipFile):
# .7z archive
archive.extract(path, files)
else:
# .zip archive
archive.extractall(path, files)
def _open_archive(file: Path | IOBase) -> py7zr.SevenZipFile | zipfile.ZipFile:
archive: py7zr.SevenZipFile | zipfile.ZipFile = None
try:
archive = py7zr.SevenZipFile(file, "r")
except py7zr.exceptions.Bad7zFile:
# Try to open it as a zip file
try:
archive = zipfile.ZipFile(file, "r")
except zipfile.BadZipFile:
raise ValueError("Archive is not a valid 7z or zip file.")
return archive
def apply_update_archive(
game: GameABC, archive_file: Path | IOBase, auto_repair: bool = True
) -> None:
@ -18,7 +53,8 @@ def apply_update_archive(
voicepack update.
Because this function is shared for all games, you should use the game's
`apply_update_archive()` method instead.
`apply_update_archive()` method instead, which additionally applies required
methods for that game.
"""
# Most code here are copied from worthless-launcher.
# worthless-launcher uses asyncio for multithreading while this one uses
@ -28,25 +64,45 @@ def apply_update_archive(
# Install HDiffPatch
_hdiff.hpatchz()
# Open archive
archive = zipfile.ZipFile(archive_file, "r")
def reset_if_py7zr(archive):
if isinstance(archive, py7zr.SevenZipFile):
archive.reset()
archive = _open_archive(archive_file)
# Get files list (we don't want to extract all of them)
files = archive.namelist()
# Don't extract these files (they're useless and if the game isn't patched then
# it'll raise 31-4xxx error in Genshin)
for file in ["deletefiles.txt", "hdifffiles.txt"]:
for file in ["deletefiles.txt", "hdifffiles.txt", "hdiffmap.json"]:
try:
files.remove(file)
except ValueError:
pass
# Think for me a better name for this variable
txtfiles = None
if isinstance(archive, py7zr.SevenZipFile):
txtfiles = archive.read(["deletefiles.txt", "hdifffiles.txt", "hdiffmap.json"])
# Reset archive to extract files
archive.reset()
try:
# miHoYo loves CRLF
deletefiles = archive.read("deletefiles.txt").decode().split("\r\n")
except IOError:
if txtfiles is not None:
deletebytes = txtfiles["deletefiles.txt"].read()
else:
deletebytes = archive.read("deletefiles.txt")
if deletebytes is not str:
# Typing
deletebytes: bytes
deletebytes = deletebytes.decode()
deletefiles = deletebytes.split("\r\n")
except (IOError, KeyError):
pass
else:
for file_str in deletefiles:
file = game.path.joinpath(file)
file = game.path.joinpath(file_str)
if file == game.path:
# Don't delete the game folder
continue
@ -58,63 +114,89 @@ def apply_update_archive(
# hdiffpatch implementation
# Read hdifffiles.txt to get the files to patch
hdifffiles = []
for x in archive.read("hdifffiles.txt").decode().split("\r\n"):
try:
hdifffiles.append(json.loads(x.strip())["remoteName"])
except json.JSONDecodeError:
pass
# Hdifffile format is [(source file, target file)]
# While the patch file is named as target file + ".hdiff"
hdifffiles: list[tuple[str, str]] = []
new_hdiff_map = False
if txtfiles is not None:
old_hdiff_map = txtfiles.get("hdifffiles.txt")
if old_hdiff_map is not None:
hdiffbytes = old_hdiff_map.read()
else:
new_hdiff_map = True
hdiffbytes = txtfiles["hdiffmap.json"].read()
else:
# Archive file must be a zip file
if zipfile.Path(archive).joinpath("hdifffiles.txt").is_file():
hdiffbytes = archive.read("hdifffiles.txt")
else:
new_hdiff_map = True
hdiffbytes = archive.read("hdiffmap.json")
if hdiffbytes is not str:
# Typing
hdiffbytes: bytes
hdiffbytes = hdiffbytes.decode()
if new_hdiff_map:
mapping = json.loads(hdiffbytes)
for diff in mapping["diff_map"]:
hdifffiles.append((diff["source_file_name"], diff["target_file_name"]))
else:
for x in hdiffbytes.split("\r\n"):
try:
name = json.loads(x.strip())["remoteName"]
hdifffiles.append((name, name))
except json.JSONDecodeError:
pass
# Patch function
def extract_and_patch(file, patch_file):
patchpath = game._cache.joinpath(patch_file)
# Delete old patch file if exists
patchpath.unlink(missing_ok=True)
# Extract patch file
archive.extract(patch_file, game.temppath)
file = file.rename(file.with_suffix(file.suffix + ".bak"))
def patch(source_file: Path, target_file: Path, patch_file: str):
patch_path = game.cache.joinpath(patch_file)
# Spaghetti code :(, fuck my eyes.
bak_src_file = source_file.rename(
source_file.with_suffix(source_file.suffix + ".bak")
)
try:
_hdiff.patch_file(file, file.with_suffix(""), patchpath)
_hdiff.patch_file(bak_src_file, target_file, patch_path)
except HPatchZPatchError:
if auto_repair:
try:
game.repair_file(game.path.joinpath(file.with_suffix("")))
# The game repairs file by downloading the latest file, in this case we want the target file
# instead of source file. Honestly I haven't tested this but I hope it works.
game.repair_file(target_file)
except Exception:
# Let the game download the file.
file.rename(file.with_suffix(""))
bak_src_file.rename(file.with_suffix(""))
else:
file.unlink()
bak_src_file.unlink()
else:
# Let the game download the file.
file.rename(file.with_suffix(""))
bak_src_file.rename(file.with_suffix(""))
return
else:
# Remove old file, since we don't need it anymore.
bak_src_file.unlink()
finally:
patchpath.unlink()
# Remove old file, since we don't need it anymore.
file.unlink()
def extract_or_repair(file):
# Extract file
try:
archive.extract(file, game.path)
except Exception as e:
# Repair file
if not auto_repair:
raise e
game.repair_file(game.path.joinpath(file))
patch_path.unlink()
# Multi-threaded patching
patch_jobs = []
for file_str in hdifffiles:
file = game.path.joinpath(file_str)
if not file.exists():
patch_files = []
for source_file, target_file in hdifffiles:
source_path = game.path.joinpath(source_file)
if not source_path.exists():
# Not patching since we don't have the file
continue
patch_file: str = file_str + ".hdiff"
target_path = game.path.joinpath(target_file)
patch_file: str = target_file + ".hdiff"
# Remove hdiff files from files list to extract
files.remove(patch_file)
patch_jobs.append([extract_and_patch, [file, patch_file]])
# Add file to extract list
patch_files.append(patch_file)
patch_jobs.append([patch, [source_path, target_path, patch_file]])
# Extract patch files to temporary dir
_extract_files(archive, patch_files, game.cache)
reset_if_py7zr(archive) # For the next extraction
# Create new ThreadPoolExecutor for patching
patch_executor = concurrent.futures.ThreadPoolExecutor()
for job in patch_jobs:
@ -122,13 +204,200 @@ def apply_update_archive(
patch_executor.shutdown(wait=True)
# Extract files from archive after we have filtered out the patch files
# Using ThreadPoolExecutor instead of archive.extractall() because
# archive.extractall() can crash with large archives, and it doesn't
# handle broken files.
extract_executor = concurrent.futures.ThreadPoolExecutor()
for file in files:
extract_executor.submit(extract_or_repair, file)
extract_executor.shutdown(wait=True)
_extract_files(archive, files, game.path)
# Close the archive
archive.close()
def install_archive(game: GameABC, archive_file: Path | IOBase) -> None:
"""
Applies an install archive to the game, it can be the game itself or a
voicepack one.
Args:
game (GameABC): The game to install the archive for.
archive_file (Path | IOBase): The archive file to install, if it's a
split archive then this is the first part.
Because this function is shared for all games, you should use the game's
`install_archive()` method instead, which additionally applies required
methods for that game.
"""
archive: py7zr.SevenZipFile | zipfile.ZipFile = None
archive_path = Path(archive_file)
target_archive = None
if archive_path.suffix == ".001":
archive_path_merged = archive_path.with_suffix("")
target_archive = multivolumefile.open(archive_path_merged, mode='rb')
if archive_path_merged.suffix == ".zip":
# .zip split archive
archive = zipfile.ZipFile(target_archive, "r")
else:
archive = py7zr.SevenZipFile(target_archive, "r")
else:
archive = _open_archive(archive_file)
archive.extractall(game.path)
archive.close()
if target_archive:
target_archive.close()
def _repair_file(game: GameABC, file: PathLike, game_info: resource.Main) -> None:
# .replace("\\", "/") is needed because Windows uses backslashes :)
relative_file = file.relative_to(game.path)
url = game_info.major.res_list_url + "/" + str(relative_file).replace("\\", "/")
# Backup the file
if file.exists():
backup_file = file.with_suffix(file.suffix + ".bak")
if backup_file.exists():
backup_file.unlink()
file.rename(backup_file)
dest_file = file.with_suffix("")
else:
dest_file = file
try:
# Download the file
temp_file = game.cache.joinpath(relative_file)
dest_file.parent.mkdir(parents=True, exist_ok=True)
print(f"Downloading repair file {url} to {temp_file}")
download(url, temp_file, overwrite=True, stream=True)
# Move the file
move(temp_file, dest_file)
print("OK")
except Exception as e:
# Restore the backup
print("Failed", e)
if file.exists():
file.rename(file.with_suffix(""))
raise e
# Delete the backup
if file.exists():
file.unlink(missing_ok=True)
def repair_files(
game: GameABC,
files: list[PathLike],
pre_download: bool = False,
game_info: resource.Game = None,
) -> None:
"""
Repairs multiple game files.
This will automatically handle backup and restore the file if the repair
fails.
Args:
game (GameABC): The game to repair the files for.
files (PathLike): The files to repair.
pre_download (bool): Whether to get the pre-download version.
Defaults to False.
"""
if not game.is_installed():
raise GameNotInstalledError("Game is not installed.")
files_path = [Path(file) for file in files]
for file in files_path:
if not file.is_relative_to(game.path):
raise ValueError("File is not in the game folder.")
if not game_info:
game_info = game.get_remote_game(pre_download=pre_download)
if game_info.latest.decompressed_path is None:
raise ScatteredFilesNotAvailableError("Scattered files are not available.")
executor = concurrent.futures.ThreadPoolExecutor()
for file in files_path:
executor.submit(_repair_file, file, game=game_info)
# self._repair_file(file, game=game)
executor.shutdown(wait=True)
def repair_game(
game: GameABC,
pre_download: bool = False,
) -> None:
"""
Tries to repair the game by reading "pkg_version" file and downloading the
mismatched files from the server.
Because this function is shared for all games, you should use the game's
`repair_game()` method instead, which additionally applies required
methods for that game.
"""
# Most code here are copied from worthless-launcher.
# worthless-launcher uses asyncio for multithreading while this one uses
# ThreadPoolExecutor, probably better for this use case.
if not game.is_installed():
raise GameNotInstalledError("Game is not installed.")
game_info = game.get_remote_game(pre_download=pre_download)
pkg_version_file = game.path.joinpath("pkg_version")
pkg_version: dict[str, dict[str, str]] = {}
if not pkg_version_file.is_file():
try:
game.repair_file(game.path.joinpath("pkg_version"), game_info=game_info)
except Exception as e:
raise RepairError(
"pkg_version file not found, most likely you need to download the full game again."
) from e
with pkg_version_file.open("r") as f:
for line in f.readlines():
line = line.strip()
if not line:
continue
line_json = json.loads(line)
pkg_version[line_json["remoteName"]] = {
"md5": line_json["md5"],
"fileSize": line_json["fileSize"],
}
read_needed_files: list[Path] = []
target_files: list[Path] = []
repair_executor = concurrent.futures.ThreadPoolExecutor()
for file in game.path.rglob("*"):
# Ignore webCaches folder (because it's user data)
if file.is_dir():
continue
if "webCaches" in str(file):
continue
def verify(file_path: Path):
nonlocal target_files
nonlocal pkg_version
relative_path = file_path.relative_to(game.path)
relative_path_str = str(relative_path).replace("\\", "/")
# print(relative_path_str)
# Wtf mihoyo, you build this game for Windows and then use Unix path separator :moyai:
try:
target_file = pkg_version.pop(relative_path_str)
if target_file:
with file_path.open("rb", buffering=0) as f:
file_hash = hashlib.file_digest(f, "md5").hexdigest()
if file_hash == target_file["md5"]:
return
print(
f"Hash mismatch for {target_file['remoteName']} ({file_hash}; expected {target_file['md5']})"
)
target_files.append(file_path)
except KeyError:
# File not found in pkg_version
read_needed_files.append(file_path)
repair_executor.submit(verify, file)
repair_executor.shutdown(wait=True)
for file in read_needed_files:
try:
with file.open("rb", buffering=0) as f:
# We only need to read 4 bytes to see if the file is readable or not
f.read(4)
except Exception:
print(f"File '{file}' is corrupted.")
target_files.append(file)
# value not used for now
for key, _ in pkg_version.items():
target_file = game.path.joinpath(key)
if target_file.is_file():
continue
print(f"{key} not found.")
target_files.append(target_file)
if not target_files:
return
print("Begin repairing files...")
game.repair_files(target_files, game_info=game_info)

View File

@ -1,3 +1,21 @@
class LAUNCHER_API:
"""Launcher API constants."""
RESOURCE_PATH: str = "hyp/hyp-connect/api/getGamePackages"
OS: dict = {
"url": "https://sg-hyp-api.hoyoverse.com/",
"params": {
"launcher_id": "VYTpXlbWo8",
},
}
CN: dict = {
"url": "https://hyp-api.mihoyo.com/",
"params": {
"launcher_id": "jGHBHlcOq1",
},
}
TELEMETRY_HOSTS = [
# Global
"log-upload-os.hoyoverse.com",

View File

@ -25,7 +25,13 @@ class GameAlreadyInstalledError(GameError):
pass
class ScatteredFilesNotAvailableError(GameError):
class RepairError(GameError):
"""Error occurred while repairing the game."""
pass
class ScatteredFilesNotAvailableError(RepairError):
"""Scattered files are not available."""
pass

View File

@ -0,0 +1,86 @@
from vollerei.common.enums import GameChannel
from vollerei.abc.launcher.game import GameABC
from vollerei.exceptions.game import GameNotInstalledError
def get_channel(game: GameABC) -> GameChannel:
"""
Gets the current game channel.
Returns:
GameChannel: The current game channel.
"""
if game.channel_override:
return game.channel_override
if not game.is_installed():
raise GameNotInstalledError("Game path is not set.")
if game.path.joinpath("YuanShen.exe").is_file():
return GameChannel.China
return GameChannel.Overseas
def get_version(game: GameABC) -> tuple[int, int, int]:
"""
Gets the current installed game version.
Credits to An Anime Team for the code that does the magic:
https://github.com/an-anime-team/anime-game-core/blob/main/src/games/genshin/game.rs#L52
If the above method fails, it'll fallback to read the config.ini file
for the version, which is not recommended (as described in
`get_version_config()` docs)
This returns (0, 0, 0) if the version could not be found
(usually indicates the game is not installed), and in fact `is_installed()` uses
this method to check if the game is installed too.
Returns:
tuple[int, int, int]: The version as a tuple of integers.
"""
data_file = game.data_folder().joinpath("globalgamemanagers")
if not data_file.exists():
return game.get_version_config()
def bytes_to_int(byte_array: list[bytes]) -> int:
bytes_as_int = int.from_bytes(byte_array, byteorder="big")
actual_int = bytes_as_int - 48 # 48 is the ASCII code for 0
return actual_int
version_bytes: list[list[bytes]] = [[], [], []]
version_ptr = 0
correct = True
try:
with data_file.open("rb") as f:
f.seek(4000)
for byte in f.read(10000):
match byte:
case 0:
version_bytes = [[], [], []]
version_ptr = 0
correct = True
case 46:
version_ptr += 1
if version_ptr > 2:
correct = False
case 95:
if (
correct
and len(version_bytes[0]) > 0
and len(version_bytes[1]) > 0
and len(version_bytes[2]) > 0
):
return (
bytes_to_int(version_bytes[0]),
bytes_to_int(version_bytes[1]),
bytes_to_int(version_bytes[2]),
)
case _:
if correct and byte in b"0123456789":
version_bytes[version_ptr].append(byte)
else:
correct = False
except Exception:
pass
# Fallback to config.ini
return game.get_version_config()

View File

@ -0,0 +1,15 @@
MD5SUMS = {
"1.0.5": {
"cn": {
"StarRailBase.dll": "66c42871ce82456967d004ccb2d7cf77",
"UnityPlayer.dll": "0c866c44bb3752031a8c12ffe935b26f",
},
"os": {
"StarRailBase.dll": "8aa3790aafa3dd176678392f3f93f435",
"UnityPlayer.dll": "f17b9b7f9b8c9cbd211bdff7771a80c2",
},
}
}
# Patches
ASTRA_REPO = "https://notabug.org/mkrsym1/astra"
JADEITE_REPO = "https://codeberg.org/mkrsym1/jadeite/"

View File

@ -0,0 +1,98 @@
from hashlib import md5
from vollerei.common.enums import GameChannel
from vollerei.abc.launcher.game import GameABC
from vollerei.game.hsr.constants import MD5SUMS
def get_channel(game: GameABC) -> GameChannel | None:
"""
Gets the current game channel.
Only works for Star Rail version 1.0.5, other versions will return the
overridden channel or GameChannel.Overseas if no channel is overridden.
This is not needed for game patching, since the patcher will automatically
detect the channel.
Returns:
GameChannel: The current game channel.
"""
version = game.version_override or game.get_version()
if version == (1, 0, 5):
for channel, v in MD5SUMS["1.0.5"].values():
for file, md5sum in v.values():
if md5(game.path.joinpath(file).read_bytes()).hexdigest() != md5sum:
continue
match channel:
case "cn":
return GameChannel.China
case "os":
return GameChannel.Overseas
return None
def get_version(game: GameABC) -> tuple[int, int, int]:
"""
Gets the current installed game version.
Credits to An Anime Team for the code that does the magic:
https://github.com/an-anime-team/anime-game-core/blob/main/src/games/star_rail/game.rs#L49
If the above method fails, it'll fallback to read the config.ini file
for the version, which is not recommended (as described in
`get_version_config()` docs)
This returns (0, 0, 0) if the version could not be found
(usually indicates the game is not installed), and in fact `is_installed()` uses
this method to check if the game is installed too.
Returns:
tuple[int, int, int]: The version as a tuple of integers.
"""
data_file = game.data_folder().joinpath("data.unity3d")
if not data_file.exists():
return game.get_version_config()
def bytes_to_int(byte_array: list[bytes]) -> int:
bytes_as_int = int.from_bytes(byte_array, byteorder="big")
actual_int = bytes_as_int - 48 # 48 is the ASCII code for 0
return actual_int
version_bytes: list[list[bytes]] = [[], [], []]
version_ptr = 0
correct = True
try:
with data_file.open("rb") as f:
f.seek(0x7D0) # 2000 in decimal
for byte in f.read(10000):
match byte:
case 0:
version_bytes = [[], [], []]
version_ptr = 0
correct = True
case 46:
version_ptr += 1
if version_ptr > 2:
correct = False
case 38:
if (
correct
and len(version_bytes[0]) > 0
and len(version_bytes[1]) > 0
and len(version_bytes[2]) > 0
):
return (
bytes_to_int(version_bytes[0]),
bytes_to_int(version_bytes[1]),
bytes_to_int(version_bytes[2]),
)
case _:
if correct and byte in b"0123456789":
version_bytes[version_ptr].append(byte)
else:
correct = False
except Exception:
pass
# Fallback to config.ini
return game.get_version_config()

View File

@ -0,0 +1,5 @@
# Re-exports
from vollerei.game.launcher.manager import Game
__all__ = ["Game"]

View File

@ -0,0 +1,32 @@
from vollerei.common.api import get_game_packages, resource
from vollerei.common.enums import GameChannel, GameType
def get_game_package(
game_type: GameType, channel: GameChannel = GameChannel.Overseas
) -> resource.GameInfo:
"""
Get game package information from the launcher API.
Doesn't work with HI3 but well, we haven't implemented anything for that game yet.
Default channel is overseas.
Args:
channel: Game channel to get the resource information from.
Returns:
GameInfo: Game resource information.
"""
find_str: str
match game_type:
case GameType.HSR:
find_str = "hkrpg"
case GameType.Genshin:
find_str = "hk4e"
case GameType.ZZZ:
find_str = "nap"
game_packages = get_game_packages(channel=channel)
for package in game_packages:
if find_str in package.game.biz:
return package

View File

@ -0,0 +1,510 @@
from configparser import ConfigParser
from io import IOBase
from os import PathLike
from pathlib import Path, PurePath
from vollerei.abc.launcher.game import GameABC
from vollerei.common import ConfigFile, functions
from vollerei.common.api import resource
from vollerei.common.enums import GameType, VoicePackLanguage, GameChannel
from vollerei.exceptions.game import (
GameAlreadyUpdatedError,
GameNotInstalledError,
PreDownloadNotAvailable,
)
from vollerei.game.launcher import api
from vollerei.game.hsr import functions as hsr_functions
from vollerei.game.genshin import functions as genshin_functions
from vollerei.game.zzz import functions as zzz_functions
from vollerei import paths
from vollerei.utils import download
class Game(GameABC):
"""
Manages the game installation
For Star Rail and Zenless Zone Zero:
Since channel detection isn't implemented yet, most functions assume you're
using the overseas version of the game. You can override channel by setting
the property `channel_override` to the channel you want to use.
"""
def __init__(
self, game_type: GameType, path: PathLike = None, cache_path: PathLike = None
):
self._path: Path | None = Path(path) if path else None
if not cache_path:
cache_path = paths.cache_path
cache_path = Path(cache_path)
self._game_type = game_type
self.cache: Path = cache_path.joinpath(f"game/{self._game_type.name.lower()}/")
self.cache.mkdir(parents=True, exist_ok=True)
self._version_override: tuple[int, int, int] | None = None
self._channel_override: GameChannel | None = None
@property
def version_override(self) -> tuple[int, int, int] | None:
"""
Overrides the game version.
This can be useful if you want to override the version of the game
and additionally working around bugs.
"""
return self._version_override
@version_override.setter
def version_override(self, version: tuple[int, int, int] | str | None):
if isinstance(version, str):
version = tuple(int(i) for i in version.split("."))
self._version_override = version
@property
def channel_override(self) -> GameChannel | None:
"""
Overrides the game channel.
Because game channel detection isn't implemented yet, you may need
to use this for some functions to work.
This can be useful if you want to override the channel of the game
and additionally working around bugs.
"""
return self._channel_override
@channel_override.setter
def channel_override(self, channel: GameChannel | str | None):
if isinstance(channel, str):
channel = GameChannel[channel]
self._channel_override = channel
@property
def path(self) -> Path | None:
"""
Paths to the game folder.
"""
return self._path
@path.setter
def path(self, path: PathLike):
self._path = Path(path)
def data_folder(self) -> Path:
"""
Paths to the game data folder.
Returns:
Path: The path to the game data folder.
"""
try:
match self._game_type:
case GameType.Genshin:
match self.get_channel():
case GameChannel.China:
return self._path.joinpath("YuanShen_Data")
case GameChannel.Overseas:
return self._path.joinpath("GenshinImpact_Data")
case GameType.HSR:
return self._path.joinpath("StarRail_Data")
case GameType.ZZZ:
return self._path.joinpath("ZenlessZoneZero_Data")
except AttributeError:
raise GameNotInstalledError("Game path is not set.")
def is_installed(self) -> bool:
"""
Checks if the game is installed.
Returns:
bool: True if the game is installed, False otherwise.
"""
if self._path is None:
return False
match self._game_type:
case GameType.Genshin:
match self.get_channel():
case GameChannel.China:
if not self._path.joinpath("YuanShen.exe").exists():
return False
case GameChannel.Overseas:
if not self._path.joinpath("GenshinImpact.exe").exists():
return False
case GameType.HSR:
if not self._path.joinpath("StarRail.exe").exists():
return False
case GameType.ZZZ:
if not self._path.joinpath("ZenlessZoneZero.exe").exists():
return False
if not self.data_folder().is_dir():
return False
if self.get_version() == (0, 0, 0):
return False
return True
def get_channel(self) -> GameChannel:
"""
Gets the current game channel.
Only works for Genshin and Star Rail version 1.0.5, other versions will return
the overridden channel or GameChannel.Overseas if no channel is overridden.
This is not needed for game patching, since the patcher will automatically
detect the channel.
Returns:
GameChannel: The current game channel.
"""
match self._game_type:
case GameType.HSR:
return (
hsr_functions.get_channel(self)
or self._channel_override
or GameChannel.Overseas
)
case GameType.Genshin:
return (
genshin_functions.get_channel(self)
or self._channel_override
or GameChannel.Overseas
)
case _:
return self._channel_override or GameChannel.Overseas
def get_version_config(self) -> tuple[int, int, int]:
"""
Gets the current installed game version from config.ini.
Using this is not recommended, as only official launcher creates
and uses this file, instead you should use `get_version()`.
This returns (0, 0, 0) if the version could not be found.
Returns:
tuple[int, int, int]: Game version.
"""
cfg_file = self._path.joinpath("config.ini")
if not cfg_file.exists():
return (0, 0, 0)
cfg = ConfigFile(cfg_file)
# Fk u miHoYo
if "general" in cfg.sections():
version_str = cfg.get("general", "game_version", fallback="0.0.0")
elif "General" in cfg.sections():
version_str = cfg.get("General", "game_version", fallback="0.0.0")
else:
return (0, 0, 0)
if version_str.count(".") != 2:
return (0, 0, 0)
try:
version = tuple(int(i) for i in version_str.split("."))
except Exception:
return (0, 0, 0)
return version
def set_version_config(self):
"""
Sets the current installed game version to config.ini.
Only works for the global version of the game, not the Chinese one (since I don't have
them installed to test).
This method is meant to keep compatibility with the official launcher only.
"""
cfg_file = self._path.joinpath("config.ini")
if cfg_file.exists():
cfg = ConfigFile(cfg_file)
cfg.set("general", "game_version", self.get_version_str())
cfg.save()
else:
cfg_dict = {
"general": {
"channel": 1,
"cps": "hyp_hoyoverse",
"game_version": self.get_version_str(),
"sub_channel": 0,
# This probably should be fetched from the server but well
"plugin_n06mjyc2r3_version": "1.1.0",
"uapc": None, # Honestly what's this?
}
}
match self._game_type:
case GameType.Genshin:
cfg_dict["general"]["uapc"] = {
"hk4e_global": {"uapc": "f55586a8ce9f_"},
"hyp": {"uapc": "f55586a8ce9f_"},
}
case GameType.HSR:
cfg_dict["general"]["uapc"] = {
"hkrpg_global": {"uapc": "f5c7c6262812_"},
"hyp": {"uapc": "f55586a8ce9f_"},
}
case GameType.ZZZ:
cfg_dict["general"]["uapc"] = {
"nap_global": {"uapc": "f55586a8ce9f_"},
"hyp": {"uapc": "f55586a8ce9f_"},
}
cfg = ConfigParser()
cfg.read_dict(cfg_dict)
cfg.write(cfg_file.open("w"))
def get_version(self) -> tuple[int, int, int]:
"""
Gets the current installed game version.
Credits to An Anime Team for the code that does the magic, see the source
in `hsr/functions.py`, `genshin/functions.py` and `zzz/functions.py` for more info
If the above method fails, it'll fallback to read the config.ini file
for the version, which is not recommended (as described in
`get_version_config()` docs)
This returns (0, 0, 0) if the version could not be found
(usually indicates the game is not installed), and in fact `is_installed()` uses
this method to check if the game is installed too.
Returns:
tuple[int, int, int]: The version as a tuple of integers.
"""
match self._game_type:
case GameType.HSR:
return hsr_functions.get_version(self)
case GameType.Genshin:
return genshin_functions.get_version(self)
case GameType.ZZZ:
return zzz_functions.get_version(self)
case _:
return self.get_version_config()
def get_version_str(self) -> str:
"""
Gets the current installed game version as a string.
Because this method uses `get_version()`, you should read the docs of
that method too.
Returns:
str: The version as a string.
"""
return ".".join(str(i) for i in self.get_version())
def get_installed_voicepacks(self) -> list[VoicePackLanguage]:
"""
Gets the installed voicepacks.
Returns:
list[VoicePackLanguage]: A list of installed voicepacks.
"""
if not self.is_installed():
raise GameNotInstalledError("Game is not installed.")
voicepacks = []
blacklisted_words = ["SFX"]
audio_package: Path
match self._game_type:
case GameType.Genshin:
audio_package = self.data_folder().joinpath(
"StreamingAssets/AudioAssets/"
)
if audio_package.joinpath("AudioPackage").is_dir():
audio_package = audio_package.joinpath("AudioPackage")
case GameType.HSR:
audio_package = self.data_folder().joinpath(
"Persistent/Audio/AudioPackage/Windows/"
)
case GameType.ZZZ:
audio_package = self.data_folder().joinpath(
"StreamingAssets/Audio/Windows/Full/"
)
for child in audio_package.iterdir():
if child.resolve().is_dir() and child.name not in blacklisted_words:
name = child.name
if name.startswith("English"):
name = "English"
voicepack: VoicePackLanguage
try:
if self._game_type == GameType.ZZZ:
voicepack = VoicePackLanguage.from_zzz_name(child.name)
else:
voicepack = VoicePackLanguage[name]
voicepacks.append(voicepack)
except (ValueError, KeyError):
pass
return voicepacks
def get_remote_game(
self, pre_download: bool = False
) -> resource.Main | resource.PreDownload:
"""
Gets the current game information from remote.
Args:
pre_download (bool): Whether to get the pre-download version.
Defaults to False.
Returns:
A `Main` or `PreDownload` object that contains the game information.
"""
channel = self._channel_override or self.get_channel()
if pre_download:
game = api.get_game_package(
game_type=self._game_type, channel=channel
).pre_download
if not game:
raise PreDownloadNotAvailable("Pre-download version is not available.")
return game
return api.get_game_package(game_type=self._game_type, channel=channel).main
def get_update(self, pre_download: bool = False) -> resource.Patch | None:
"""
Gets the current game update.
Args:
pre_download (bool): Whether to get the pre-download version.
Defaults to False.
Returns:
A `Patch` object that contains the update information or
`None` if the game is not installed or already up-to-date.
"""
if not self.is_installed():
return None
version = (
".".join(str(x) for x in self._version_override)
if self._version_override
else self.get_version_str()
)
for patch in self.get_remote_game(pre_download=pre_download).patches:
if patch.version == version:
return patch
return None
def repair_file(
self,
file: PathLike,
pre_download: bool = False,
game_info: resource.Game = None,
) -> None:
"""
Repairs a game file.
This will automatically handle backup and restore the file if the repair
fails.
Args:
file (PathLike): The file to repair.
pre_download (bool): Whether to get the pre-download version.
Defaults to False.
"""
return self.repair_files([file], pre_download=pre_download, game_info=game_info)
def repair_files(
self,
files: list[PathLike],
pre_download: bool = False,
game_info: resource.Game = None,
) -> None:
"""
Repairs multiple game files.
This will automatically handle backup and restore the file if the repair
fails.
Args:
files (PathLike): The files to repair.
pre_download (bool): Whether to get the pre-download version.
Defaults to False.
game_info (resource.Game): The game information to use for repair.
"""
functions.repair_files(
self, files, pre_download=pre_download, game_info=game_info
)
def repair_game(self) -> None:
"""
Tries to repair the game by reading "pkg_version" file and downloading the
mismatched files from the server.
"""
functions.repair_game(self)
def install_archive(self, archive_file: PathLike | IOBase) -> None:
"""
Applies an install archive to the game, it can be the game itself or a
voicepack one.
`archive_file` can be a path to the archive file or a file-like object,
like if you have very high amount of RAM and want to download the archive
to memory instead of disk, this can be useful for you.
Args:
archive_file (PathLike | IOBase): The archive file.
"""
if not isinstance(archive_file, IOBase):
archive_file = Path(archive_file)
functions.install_archive(self, archive_file)
def apply_update_archive(
self, archive_file: PathLike | IOBase, auto_repair: bool = True
) -> None:
"""
Applies an update archive to the game, it can be the game update or a
voicepack update.
`archive_file` can be a path to the archive file or a file-like object,
like if you have very high amount of RAM and want to download the update
to memory instead of disk, this can be useful for you.
`auto_repair` is used to determine whether to repair the file if it's
broken. If it's set to False, then it'll raise an exception if the file
is broken.
Args:
archive_file (PathLike | IOBase): The archive file.
auto_repair (bool, optional): Whether to repair the file if it's broken.
Defaults to True.
"""
if not self.is_installed():
raise GameNotInstalledError("Game is not installed.")
if not isinstance(archive_file, IOBase):
archive_file = Path(archive_file)
# Hello hell again, dealing with HDiffPatch and all the things again.
functions.apply_update_archive(self, archive_file, auto_repair=auto_repair)
def install_update(
self, update_info: resource.Patch = None, auto_repair: bool = True
):
"""
Installs an update from a `Patch` object.
You may want to download the update manually and pass it to
`apply_update_archive()` instead for better control, and after that
execute `set_version_config()` to set the game version.
Args:
update_info (Diff, optional): The update information. Defaults to None.
auto_repair (bool, optional): Whether to repair the file if it's broken.
Defaults to True.
"""
if not self.is_installed():
raise GameNotInstalledError("Game is not installed.")
if not update_info:
update_info = self.get_update()
if not update_info or update_info.version == self.get_version_str():
raise GameAlreadyUpdatedError("Game is already updated.")
update_url = update_info.game_pkgs[0].url
# Base game update
archive_file = self.cache.joinpath(PurePath(update_url).name)
download(update_url, archive_file)
self.apply_update_archive(archive_file=archive_file, auto_repair=auto_repair)
# Get installed voicepacks
installed_voicepacks = self.get_installed_voicepacks()
# Voicepack update
for remote_voicepack in update_info.audio_pkgs:
if remote_voicepack.language not in installed_voicepacks:
continue
# Voicepack is installed, update it
archive_file = self.cache.joinpath(PurePath(remote_voicepack.url).name)
download(remote_voicepack.url, archive_file)
self.apply_update_archive(
archive_file=archive_file, auto_repair=auto_repair
)
self.set_version_config()

223
vollerei/game/patcher.py Normal file
View File

@ -0,0 +1,223 @@
from enum import Enum
from shutil import copy2, rmtree
from packaging import version
from vollerei.abc.patcher import PatcherABC
from vollerei.common import telemetry
from vollerei.exceptions.game import GameNotInstalledError
from vollerei.exceptions.patcher import (
VersionNotSupportedError,
PatcherError,
PatchUpdateError,
)
from vollerei.game.launcher.manager import Game, GameChannel
from vollerei.utils import download_and_extract, Git, Xdelta3
from vollerei.paths import tools_data_path
from vollerei.hsr.constants import ASTRA_REPO, JADEITE_REPO
class PatchType(Enum):
"""
Patch type
Astra: The old patch which patch the game directly (not recommended).
Jadeite: The new patch which patch the game in memory by DLL injection.
"""
Astra = 0
Jadeite = 1
class Patcher(PatcherABC):
"""
Patch helper for HSR and HI3.
By default this will use Jadeite as it is maintained and more stable.
"""
def __init__(self, patch_type: PatchType = PatchType.Jadeite):
self._patch_type: PatchType = patch_type
self._path = tools_data_path.joinpath("patcher")
self._path.mkdir(parents=True, exist_ok=True)
self._jadeite = self._path.joinpath("jadeite")
self._astra = self._path.joinpath("astra")
self._git = Git()
self._xdelta3 = Xdelta3()
@property
def patch_type(self) -> PatchType:
"""
Patch type, can be either Astra or Jadeite
"""
return self._patch_type
@patch_type.setter
def patch_type(self, value: PatchType):
self._patch_type = value
def _update_astra(self):
self._git.pull_or_clone(ASTRA_REPO, self._astra)
def _update_jadeite(self):
release_info = self._git.get_latest_release(JADEITE_REPO)
file = self._git.get_latest_release_dl(release_info)[0]
file_version = release_info["tag_name"][1:] # Remove "v" prefix
current_version = None
if self._jadeite.joinpath("version").exists():
with open(self._jadeite.joinpath("version"), "r") as f:
current_version = f.read()
if current_version:
if version.parse(file_version) <= version.parse(current_version):
return
download_and_extract(file, self._jadeite)
with open(self._jadeite.joinpath("version"), "w") as f:
f.write(file_version)
def update_patch(self):
"""
Update the patch
"""
try:
match self._patch_type:
case PatchType.Astra:
self._update_astra()
case PatchType.Jadeite:
self._update_jadeite()
except Exception as e:
raise PatchUpdateError("Failed to update patch.") from e
def _patch_astra(self, game: Game):
if game.get_version() != (1, 0, 5):
raise VersionNotSupportedError(
"Only version 1.0.5 is supported by Astra patch."
)
self._update_astra()
file_type = None
match game.get_channel():
case GameChannel.China:
file_type = "cn"
case GameChannel.Overseas:
file_type = "os"
# Backup and patch
for file in ["UnityPlayer.dll", "StarRailBase.dll"]:
game.path.joinpath(file).rename(game.path.joinpath(f"{file}.bak"))
self._xdelta3.patch_file(
self._astra.joinpath(f"{file_type}/diffs/{file}.vcdiff"),
game.path.joinpath(f"{file}.bak"),
game.path.joinpath(file),
)
# Copy files
for file in self._astra.joinpath(f"{file_type}/files/").rglob("*"):
if file.suffix == ".bat":
continue
if file.is_dir():
game.path.joinpath(
file.relative_to(self._astra.joinpath(f"{file_type}/files/"))
).mkdir(parents=True, exist_ok=True)
copy2(
file,
game.path.joinpath(
file.relative_to(self._astra.joinpath(f"{file_type}/files/"))
),
)
def _patch_jadeite(self):
"""
"Patch" the game with Jadeite patch.
Unlike Astra patch, Jadeite patch does not modify the game files directly
but uses DLLs to patch the game in memory and it has an injector to do that
automatically.
"""
self._update_jadeite()
return self._jadeite
def _unpatch_astra(self, game: Game):
if game.get_version() != (1, 0, 5):
raise VersionNotSupportedError(
"Only version 1.0.5 is supported by Astra patch."
)
self._update_astra()
file_type = None
match game.get_channel():
case GameChannel.China:
file_type = "cn"
case GameChannel.Overseas:
file_type = "os"
# Restore
for file in ["UnityPlayer.dll", "StarRailBase.dll"]:
if game.path.joinpath(f"{file}.bak").exists():
game.path.joinpath(file).unlink()
game.path.joinpath(f"{file}.bak").rename(game.path.joinpath(file))
# Remove files
for file in self._astra.joinpath(f"{file_type}/files/").rglob("*"):
if file.suffix == ".bat":
continue
file_rel = file.relative_to(self._astra.joinpath(f"{file_type}/files/"))
game_path = game.path.joinpath(file_rel)
if game_path.is_file():
game_path.unlink()
elif game_path.is_dir():
try:
game_path.rmdir()
except OSError:
pass
def _unpatch_jadeite(self):
rmtree(self._jadeite, ignore_errors=True)
def patch_game(self, game: Game):
"""
Patch the game
If you use Jadeite (by default), this will just download Jadeite files
and won't actually patch the game because Jadeite will do that automatically.
Args:
game (Game): The game to patch
"""
if not game.is_installed():
raise PatcherError(GameNotInstalledError("Game is not installed"))
match self._patch_type:
case PatchType.Astra:
self._patch_astra(game)
case PatchType.Jadeite:
return self._patch_jadeite()
def unpatch_game(self, game: Game):
"""
Unpatch the game
If you use Jadeite (by default), this will just delete Jadeite files.
Note that Honkai Impact 3rd uses Jadeite too, so executing this will
delete the files needed by both games.
Args:
game (Game): The game to unpatch
"""
if not game.is_installed():
raise PatcherError(GameNotInstalledError("Game is not installed"))
match self._patch_type:
case PatchType.Astra:
self._unpatch_astra(game)
case PatchType.Jadeite:
self._unpatch_jadeite()
def check_telemetry(self) -> list[str]:
"""
Check if telemetry servers are accessible by the user
Returns:
list[str]: A list of telemetry servers that are accessible
"""
return telemetry.check_telemetry()
def block_telemetry(self, telemetry_list: list[str] = None):
"""
Block the telemetry servers
If telemetry_list is not provided, it will be checked automatically.
Args:
telemetry_list (list[str], optional): A list of telemetry servers to block.
"""
telemetry.block_telemetry(telemetry_list)

View File

@ -0,0 +1,66 @@
from vollerei.abc.launcher.game import GameABC
def get_version(game: GameABC) -> tuple[int, int, int]:
"""
Gets the current installed game version.
Credits to An Anime Team for the code that does the magic:
https://github.com/an-anime-team/anime-game-core/blob/main/src/games/zzz/game.rs#L49
If the above method fails, it'll fallback to read the config.ini file
for the version, which is not recommended (as described in
`get_version_config()` docs)
This returns (0, 0, 0) if the version could not be found
(usually indicates the game is not installed), and in fact `is_installed()` uses
this method to check if the game is installed too.
Returns:
tuple[int, int, int]: The version as a tuple of integers.
"""
data_file = game.data_folder().joinpath("globalgamemanagers")
if not data_file.exists():
return game.get_version_config()
def bytes_to_int(byte_array: list[bytes]) -> int:
bytes_as_int = int.from_bytes(byte_array, byteorder="big")
actual_int = bytes_as_int - 48 # 48 is the ASCII code for 0
return actual_int
version_bytes: list[list[bytes]] = [[], [], []]
version_ptr = 0
correct = True
try:
with data_file.open("rb") as f:
f.seek(4000)
for byte in f.read(10000):
match byte:
case 0:
if (
correct
and len(version_bytes[0]) > 0
and len(version_bytes[1]) > 0
and len(version_bytes[2]) > 0
):
found_version = tuple(
bytes_to_int(i) for i in version_bytes
)
return found_version
version_bytes = [[], [], []]
version_ptr = 0
correct = True
case b".":
version_ptr += 1
if version_ptr > 2:
correct = False
case _:
if correct and byte in b"0123456789":
version_bytes[version_ptr].append(byte)
else:
correct = False
except Exception:
pass
# Fallback to config.ini
return game.get_version_config()

View File

@ -0,0 +1,5 @@
# Re-exports
from vollerei.genshin.launcher import Game
__all__ = ["Game"]

View File

@ -0,0 +1,5 @@
# Re-exports
from vollerei.genshin.launcher.game import Game
__all__ = ["Game"]

View File

@ -0,0 +1,20 @@
from vollerei.common.api import get_game_packages, resource
from vollerei.common.enums import GameChannel
def get_game_package(channel: GameChannel = GameChannel.Overseas) -> resource.GameInfo:
"""
Get game package information from the launcher API.
Default channel is overseas.
Args:
channel: Game channel to get the resource information from.
Returns:
Resource: Game resource information.
"""
game_packages = get_game_packages(channel=channel)
for package in game_packages:
if "hk4e" in package.game.biz:
return package

View File

@ -0,0 +1,12 @@
from os import PathLike
from vollerei.game.launcher.manager import Game as CommonGame
from vollerei.common.enums import GameType
class Game(CommonGame):
"""
Manages the game installation
"""
def __init__(self, path: PathLike = None, cache_path: PathLike = None):
super().__init__(GameType.Genshin, path, cache_path)

View File

@ -0,0 +1,73 @@
from configparser import ConfigParser
from io import IOBase
from os import PathLike
from pathlib import Path
from vollerei.abc.launcher.game import GameABC
from vollerei.common import ConfigFile, functions
from vollerei.common.api import resource
from vollerei.common.enums import VoicePackLanguage
from vollerei.exceptions.game import (
GameAlreadyUpdatedError,
GameNotInstalledError,
PreDownloadNotAvailable,
ScatteredFilesNotAvailableError,
)
from vollerei.hi3.launcher.enums import GameChannel
from vollerei.hsr.launcher import api
from vollerei import paths
from vollerei.utils import download
class Game(GameABC):
"""
Manages the game installation
Since channel detection isn't implemented yet, most functions assume you're
using the overseas version of the game. You can override channel by setting
the property `channel_override` to the channel you want to use.
"""
def __init__(self, path: PathLike = None, cache_path: PathLike = None):
self._path: Path | None = Path(path) if path else None
if not cache_path:
cache_path = paths.cache_path
cache_path = Path(cache_path)
self.cache: Path = cache_path.joinpath("game/hi3/")
self.cache.mkdir(parents=True, exist_ok=True)
self._version_override: tuple[int, int, int] | None = None
self._channel_override: GameChannel | None = None
@property
def version_override(self) -> tuple[int, int, int] | None:
"""
Overrides the game version.
This can be useful if you want to override the version of the game
and additionally working around bugs.
"""
return self._version_override
@version_override.setter
def version_override(self, version: tuple[int, int, int] | str | None):
if isinstance(version, str):
version = tuple(int(i) for i in version.split("."))
self._version_override = version
@property
def channel_override(self) -> GameChannel | None:
"""
Overrides the game channel.
Because game channel detection isn't implemented yet, you may need
to use this for some functions to work.
This can be useful if you want to override the channel of the game
and additionally working around bugs.
"""
return self._channel_override
@channel_override.setter
def channel_override(self, channel: GameChannel | str | None):
if isinstance(channel, str):
channel = GameChannel[channel]
self._channel_override = channel

6
vollerei/hi3/patcher.py Normal file
View File

@ -0,0 +1,6 @@
from vollerei.hsr.patcher import Patcher, PatchType
# Re-exports Patcher and PatchType from HSR because they use the same patcher
# which is Jadeite.
__all__ = ["Patcher", "PatchType"]

View File

@ -1,6 +1,6 @@
# Re-exports
from vollerei.hsr.patcher import Patcher, PatchType
from vollerei.hsr.launcher import Game, GameChannel
from vollerei.hsr.launcher import Game
__all__ = ["Patcher", "PatchType", "Game", "GameChannel"]
__all__ = ["Patcher", "PatchType", "Game"]

View File

@ -1,38 +1 @@
class LAUNCHER_API:
"""Launcher API constants."""
RESOURCE_PATH: str = "mdk/launcher/api/resource"
OS: dict = {
"url": "https://hkrpg-launcher-static.hoyoverse.com/hkrpg_global/",
"params": {
"channel_id": 1,
"key": "vplOVX8Vn7cwG8yb",
"launcher_id": 35,
},
}
CN: dict = {
"url": "https://api-launcher.mihoyo.com/hkrpg_cn/mdk/launcher/api/resource",
"params": {
"channel_id": 1,
"key": "6KcVuOkbcqjJomjZ",
"launcher_id": 33,
},
}
LATEST_VERSION = (1, 6, 0)
MD5SUMS = {
"1.0.5": {
"cn": {
"StarRailBase.dll": "66c42871ce82456967d004ccb2d7cf77",
"UnityPlayer.dll": "0c866c44bb3752031a8c12ffe935b26f",
},
"os": {
"StarRailBase.dll": "8aa3790aafa3dd176678392f3f93f435",
"UnityPlayer.dll": "f17b9b7f9b8c9cbd211bdff7771a80c2",
},
}
}
# Patches
ASTRA_REPO = "https://notabug.org/mkrsym1/astra"
JADEITE_REPO = "https://codeberg.org/mkrsym1/jadeite/"
from vollerei.game.hsr.constants import * # noqa: F403 because we just want to re-export

View File

@ -1,5 +1,5 @@
# Re-exports
from vollerei.hsr.launcher.game import Game, GameChannel
from vollerei.hsr.launcher.game import Game
__all__ = ["Game", "GameChannel"]
__all__ = ["Game"]

View File

@ -1,13 +1,10 @@
import requests
from vollerei.common.api import Resource
from vollerei.hsr.constants import LAUNCHER_API
from vollerei.hsr.launcher.enums import GameChannel
from vollerei.common.api import get_game_packages, resource
from vollerei.common.enums import GameChannel
def get_resource(channel: GameChannel = GameChannel.Overseas) -> Resource:
def get_game_package(channel: GameChannel = GameChannel.Overseas) -> resource.GameInfo:
"""
Get game resource information from the launcher API.
Get game package information from the launcher API.
Default channel is overseas.
@ -17,15 +14,7 @@ def get_resource(channel: GameChannel = GameChannel.Overseas) -> Resource:
Returns:
Resource: Game resource information.
"""
resource_path: dict = None
match channel:
case GameChannel.Overseas:
resource_path = LAUNCHER_API.OS
case GameChannel.China:
resource_path = LAUNCHER_API.CN
return Resource.from_dict(
requests.get(
resource_path["url"] + LAUNCHER_API.RESOURCE_PATH,
params=resource_path["params"],
).json()["data"]
)
game_packages = get_game_packages(channel=channel)
for package in game_packages:
if "hkrpg" in package.game.biz:
return package

View File

@ -1,6 +0,0 @@
from enum import Enum
class GameChannel(Enum):
Overseas = 0
China = 1

View File

@ -1,24 +1,9 @@
from hashlib import md5
from io import IOBase
from os import PathLike
from pathlib import Path
from vollerei.abc.launcher.game import GameABC
from vollerei.common import ConfigFile, functions
from vollerei.common.api import resource
from vollerei.exceptions.game import (
GameAlreadyUpdatedError,
GameNotInstalledError,
PreDownloadNotAvailable,
ScatteredFilesNotAvailableError,
)
from vollerei.hsr.constants import MD5SUMS
from vollerei.hsr.launcher.enums import GameChannel
from vollerei.hsr.launcher import api
from vollerei import paths
from vollerei.utils import download
from vollerei.game.launcher.manager import Game as CommonGame
from vollerei.common.enums import GameType
class Game(GameABC):
class Game(CommonGame):
"""
Manages the game installation
@ -28,372 +13,4 @@ class Game(GameABC):
"""
def __init__(self, path: PathLike = None, cache_path: PathLike = None):
self._path: Path | None = Path(path) if path else None
if not cache_path:
cache_path = paths.cache_path
cache_path = Path(cache_path)
self._cache: Path = cache_path.joinpath("game/hsr/")
self._cache.mkdir(parents=True, exist_ok=True)
self._version_override: tuple[int, int, int] | None = None
self._channel_override: GameChannel | None = None
@property
def version_override(self) -> tuple[int, int, int] | None:
"""
Overrides the game version.
This can be useful if you want to override the version of the game
and additionally working around bugs.
"""
return self._version_override
@version_override.setter
def version_override(self, version: tuple[int, int, int] | str | None):
if isinstance(version, str):
version = tuple(int(i) for i in version.split("."))
self._version_override = version
@property
def channel_override(self) -> GameChannel | None:
"""
Overrides the game channel.
Because game channel detection isn't implemented yet, you may need
to use this for some functions to work.
This can be useful if you want to override the channel of the game
and additionally working around bugs.
"""
return self._channel_override
@channel_override.setter
def channel_override(self, channel: GameChannel | str | None):
if isinstance(channel, str):
channel = GameChannel[channel]
self._channel_override = channel
@property
def path(self) -> Path | None:
"""
Paths to the game folder.
"""
return self._path
@path.setter
def path(self, path: PathLike):
self._path = Path(path)
def data_folder(self) -> Path:
"""
Paths to the game data folder.
"""
try:
return self._path.joinpath("StarRail_Data")
except AttributeError:
raise GameNotInstalledError("Game path is not set.")
def is_installed(self) -> bool:
"""
Checks if the game is installed.
Returns:
bool: True if the game is installed, False otherwise.
"""
if self._path is None:
return False
if (
not self._path.joinpath("StarRail.exe").exists()
or not self._path.joinpath("StarRailBase.dll").exists()
or not self._path.joinpath("StarRail_Data").exists()
):
return False
if self.get_version() == (0, 0, 0):
return False
return True
def get_version_config(self) -> tuple[int, int, int]:
"""
Gets the current installed game version from config.ini.
Using this is not recommended, as only official launcher creates
and uses this file, instead you should use `get_version()`.
This returns (0, 0, 0) if the version could not be found.
Returns:
tuple[int, int, int]: Game version.
"""
cfg_file = self._path.joinpath("config.ini")
if not cfg_file.exists():
return (0, 0, 0)
cfg = ConfigFile(cfg_file)
if "General" not in cfg.sections():
return (0, 0, 0)
if "game_version" not in cfg["General"]:
return (0, 0, 0)
version_str = cfg["General"]["game_version"]
if version_str.count(".") != 2:
return (0, 0, 0)
try:
version = tuple(int(i) for i in version_str.split("."))
except Exception:
return (0, 0, 0)
return version
def set_version_config(self):
"""
Sets the current installed game version to config.ini.
This method is meant to keep compatibility with the official launcher only.
"""
cfg_file = self._path.joinpath("config.ini")
if not cfg_file.exists():
raise FileNotFoundError("config.ini not found.")
cfg = ConfigFile(cfg_file)
cfg.set("General", "game_version", self.get_version_str())
cfg.save()
def get_version(self) -> tuple[int, int, int]:
"""
Gets the current installed game version.
Credits to An Anime Team for the code that does the magic:
https://github.com/an-anime-team/anime-game-core/blob/main/src/games/star_rail/game.rs#L49
If the above method fails, it'll fallback to read the config.ini file
for the version, which is not recommended (as described in
`get_version_config()` docs)
This returns (0, 0, 0) if the version could not be found
(usually indicates the game is not installed), and in fact `is_installed()` uses
this method to check if the game is installed too.
Returns:
tuple[int, int, int]: The version as a tuple of integers.
"""
data_file = self.data_folder().joinpath("data.unity3d")
if not data_file.exists():
return (0, 0, 0)
def bytes_to_int(byte_array: list[bytes]) -> int:
bytes_as_int = int.from_bytes(byte_array, byteorder="big")
actual_int = bytes_as_int - 48 # 48 is the ASCII code for 0
return actual_int
allowed = [48, 49, 50, 51, 52, 53, 54, 55, 56, 57]
version_bytes: list[list[bytes]] = [[], [], []]
version_ptr = 0
correct = True
try:
with self.data_folder().joinpath("data.unity3d").open("rb") as f:
f.seek(0x7D0) # 2000 in decimal
for byte in f.read(10000):
match byte:
case 0:
version_bytes = [[], [], []]
version_ptr = 0
correct = True
case 46:
version_ptr += 1
if version_ptr > 2:
correct = False
case 38:
if (
correct
and len(version_bytes[0]) > 0
and len(version_bytes[1]) > 0
and len(version_bytes[2]) > 0
):
return (
bytes_to_int(version_bytes[0]),
bytes_to_int(version_bytes[1]),
bytes_to_int(version_bytes[2]),
)
case _:
if correct and byte in allowed:
version_bytes[version_ptr].append(byte)
else:
correct = False
except Exception:
pass
# Fallback to config.ini
return self.get_version_config()
def get_version_str(self) -> str:
"""
Gets the current installed game version as a string.
Because this method uses `get_version()`, you should read the docs of
that method too.
Returns:
str: The version as a string.
"""
return ".".join(str(i) for i in self.get_version())
def get_channel(self) -> GameChannel:
"""
Gets the current game channel.
Only works for Star Rail version 1.0.5, other versions will return the
overridden channel or GameChannel.Overseas if no channel is overridden.
This is not needed for game patching, since the patcher will automatically
detect the channel.
Returns:
GameChannel: The current game channel.
"""
version = self._version_override or self.get_version()
if version == (1, 0, 5):
for channel, v in MD5SUMS["1.0.5"].values():
for file, md5sum in v.values():
if (
md5(self._path.joinpath(file).read_bytes()).hexdigest()
!= md5sum
):
continue
match channel:
case "cn":
return GameChannel.China
case "os":
return GameChannel.Overseas
else:
# if self._path.joinpath("StarRail_Data").is_dir():
# return GameChannel.Overseas
# elif self._path.joinpath("StarRail_Data").exists():
# return GameChannel.China
# No reliable method there, so we'll just return the overridden channel or
# fallback to overseas.
return self._channel_override or GameChannel.Overseas
def get_remote_game(self, pre_download: bool = False) -> resource.Game:
"""
Gets the current game information from remote.
Args:
pre_download (bool): Whether to get the pre-download version.
Defaults to False.
Returns:
A `Game` object that contains the game information.
"""
channel = self._channel_override or self.get_channel()
if pre_download:
game = api.get_resource(channel=channel).pre_download_game
if not game:
raise PreDownloadNotAvailable("Pre-download version is not available.")
return game
return api.get_resource(channel=channel).game
def get_update(self, pre_download: bool = False) -> resource.Diff | None:
"""
Gets the current game update.
Args:
pre_download (bool): Whether to get the pre-download version.
Defaults to False.
Returns:
A `Diff` object that contains the update information or
`None` if the game is not installed or already up-to-date.
"""
if not self.is_installed():
return None
version = (
".".join(str(x) for x in self._version_override)
if self._version_override
else self.get_version_str()
)
for diff in self.get_remote_game(pre_download=pre_download).diffs:
if diff.version == version:
return diff
return None
def repair_file(self, file: PathLike, pre_download: bool = False) -> None:
"""
Repairs a game file.
This will automatically handle backup and restore the file if the repair
fails.
Args:
file (PathLike): The file to repair.
pre_download (bool): Whether to get the pre-download version.
Defaults to False.
"""
if not self.is_installed():
raise GameNotInstalledError("Game is not installed.")
file = Path(file)
if not file.is_relative_to(self._path):
raise ValueError("File is not in the game folder.")
game = self.get_remote_game(pre_download=pre_download)
if game.latest.decompressed_path is None:
raise ScatteredFilesNotAvailableError("Scattered files are not available.")
url = game.latest.decompressed_path + "/" + file.relative_to(self._path)
# Backup the file
file.rename(file.with_suffix(file.suffix + ".bak"))
try:
# Download the file
download(url, file.with_suffix(""))
except Exception:
# Restore the backup
file.rename(file.with_suffix(""))
raise
else:
# Delete the backup
file.unlink(missing_ok=True)
def apply_update_archive(
self, archive_file: PathLike | IOBase, auto_repair: bool = True
) -> None:
"""
Applies an update archive to the game, it can be the game update or a
voicepack update.
`archive_file` can be a path to the archive file or a file-like object,
like if you have very high amount of RAM and want to download the update
to memory instead of disk, this can be useful for you.
`auto_repair` is used to determine whether to repair the file if it's
broken. If it's set to False, then it'll raise an exception if the file
is broken.
Args:
archive_file (PathLike | IOBase): The archive file.
auto_repair (bool, optional): Whether to repair the file if it's broken.
Defaults to True.
"""
if not self.is_installed():
raise GameNotInstalledError("Game is not installed.")
if not isinstance(archive_file, IOBase):
archive_file = Path(archive_file)
# Hello hell again, dealing with HDiffPatch and all the things again.
functions.apply_update_archive(self, archive_file, auto_repair=auto_repair)
def install_update(
self, update_info: resource.Diff = None, auto_repair: bool = True
):
"""
Installs an update from a `Diff` object.
You may want to download the update manually and pass it to
`apply_update_archive()` instead for better control, and after that
execute `set_version_config()` to set the game version.
Args:
update_info (Diff, optional): The update information. Defaults to None.
auto_repair (bool, optional): Whether to repair the file if it's broken.
Defaults to True.
"""
if not self.is_installed():
raise GameNotInstalledError("Game is not installed.")
if not update_info:
update_info = self.get_update()
if not update_info or update_info.version == self.get_version_str():
raise GameAlreadyUpdatedError("Game is already updated.")
archive_file = self._cache.joinpath(update_info.name)
download(update_info.path, archive_file)
self.apply_update_archive(archive_file=archive_file, auto_repair=auto_repair)
self.set_version_config()
super().__init__(GameType.HSR, path, cache_path)

View File

@ -1,15 +1,16 @@
from enum import Enum
from shutil import copy2, rmtree
from distutils.version import StrictVersion
from packaging import version
from vollerei.abc.patcher import PatcherABC
from vollerei.common import telemetry
from vollerei.common.enums import GameChannel
from vollerei.exceptions.game import GameNotInstalledError
from vollerei.exceptions.patcher import (
VersionNotSupportedError,
PatcherError,
PatchUpdateError,
)
from vollerei.hsr.launcher.game import Game, GameChannel
from vollerei.hsr.launcher.game import Game
from vollerei.utils import download_and_extract, Git, Xdelta3
from vollerei.paths import tools_data_path
from vollerei.hsr.constants import ASTRA_REPO, JADEITE_REPO
@ -23,8 +24,8 @@ class PatchType(Enum):
Jadeite: The new patch which patch the game in memory by DLL injection.
"""
Astra: int = 0
Jadeite: int = 1
Astra = 0
Jadeite = 1
class Patcher(PatcherABC):
@ -66,7 +67,7 @@ class Patcher(PatcherABC):
with open(self._jadeite.joinpath("version"), "r") as f:
current_version = f.read()
if current_version:
if StrictVersion(file_version) <= StrictVersion(current_version):
if version.parse(file_version) <= version.parse(current_version):
return
download_and_extract(file, self._jadeite)
with open(self._jadeite.joinpath("version"), "w") as f:
@ -188,6 +189,8 @@ class Patcher(PatcherABC):
Unpatch the game
If you use Jadeite (by default), this will just delete Jadeite files.
Note that Honkai Impact 3rd uses Jadeite too, so executing this will
delete the files needed by both games.
Args:
game (Game): The game to unpatch

View File

@ -9,8 +9,8 @@ class Paths:
"""
base_paths = PlatformDirs("vollerei", "tretrauit", roaming=True)
cache_path = base_paths.site_cache_path
data_path = base_paths.site_data_path
cache_path = base_paths.user_cache_path
data_path = base_paths.user_data_path
tools_data_path = data_path.joinpath("tools")
tools_cache_path = cache_path.joinpath("tools")
launcher_cache_path = cache_path.joinpath("launcher")
@ -20,7 +20,7 @@ class Paths:
def set_base_path(path: PathLike):
path = Path(path)
Paths.base_paths = path
Paths.cache_path = Paths.base_paths.joinpath("Cache")
Paths.cache_path = Paths.base_paths.joinpath("cache")
Paths.data_path = Paths.base_paths
Paths.tools_data_path = Paths.data_path.joinpath("tools")
Paths.tools_cache_path = Paths.cache_path.joinpath("tools")

View File

@ -1,5 +1,6 @@
import requests
import platform
import shutil
from zipfile import ZipFile
from io import BytesIO
from pathlib import Path
@ -53,7 +54,11 @@ __all__ = [
def download(
url: str, out: Path, file_len: int = None, overwrite: bool = False
url: str,
out: Path,
file_len: int = None,
overwrite: bool = False,
stream: bool = True,
) -> None:
"""
Download to a path.
@ -65,22 +70,22 @@ def download(
if overwrite:
out.unlink(missing_ok=True)
headers = {}
mode = "a+b"
if out.exists():
cur_len = (out.stat()).st_size
headers |= {"Range": f"bytes={cur_len}-{file_len if file_len else ''}"}
else:
mode = "w+b"
out.parent.mkdir(parents=True, exist_ok=True)
out.touch()
# Streaming, so we can iterate over the response.
response = requests.get(url=url, headers=headers, stream=True)
response.raise_for_status()
if response.status == 416:
response = requests.get(url=url, headers=headers, stream=stream)
if response.status_code == 416:
print(f"File already downloaded: {out}")
return
# Sizes in bytes.
block_size = 32768
with out.open("ab") as file:
for data in response.iter_content(block_size):
file.write(data)
response.raise_for_status()
with open(out, mode) as file:
shutil.copyfileobj(response.raw, file)
return True

View File

@ -1,3 +1,4 @@
from os import PathLike
import platform
import subprocess
from zipfile import ZipFile
@ -26,22 +27,35 @@ class HDiffPatch:
@staticmethod
def _get_platform_arch():
processor = platform.machine()
match platform.system():
case "Windows":
match platform.architecture()[0]:
case "32bit":
match processor:
case "i386":
return "windows32"
case "64bit":
case "x86_64":
return "windows64"
case "AMD64":
return "windows64"
case "arm":
return "windows_arm32"
case "arm64":
return "windows_arm64"
case "Linux":
match platform.architecture()[0]:
case "32bit":
match processor:
case "i386":
return "linux32"
case "64bit":
case "x86_64":
return "linux64"
case "arm":
return "linux_arm32"
case "arm64":
return "linux_arm64"
case "Darwin":
return "macos"
# TODO: Add support for Android & other architectures
# Rip BSD they need to use Linux compatibility layer to run this
# (or use Wine if they prefer that)
raise PlatformNotSupportedError(
@ -73,9 +87,11 @@ class HDiffPatch:
def hpatchz(self) -> str | None:
return self._get_binary("hpatchz")
def patch_file(self, in_file, out_file, patch_file):
def patch_file(self, in_file: PathLike, out_file: PathLike, patch_file: PathLike):
try:
subprocess.check_call([self.hpatchz(), "-f", in_file, patch_file, out_file])
subprocess.check_call(
[self.hpatchz(), "-f", str(in_file), str(patch_file), str(out_file)]
)
except subprocess.CalledProcessError as e:
raise HPatchZPatchError("Patch error") from e
@ -88,16 +104,11 @@ class HDiffPatch:
params={"Headers": "Accept: application/vnd.github.v3+json"},
)
rsp.raise_for_status()
archive_processor = self._get_platform_arch()
for asset in rsp.json()["assets"]:
if not asset["name"].endswith(".zip"):
continue
if "linux" in asset["name"]:
continue
if "windows" in asset["name"]:
continue
if "macos" in asset["name"]:
continue
if "android" in asset["name"]:
if archive_processor not in asset["name"]:
continue
return asset

5
vollerei/zzz/__init__.py Normal file
View File

@ -0,0 +1,5 @@
# Re-exports
from vollerei.zzz.launcher import Game
__all__ = ["Game"]

View File

@ -0,0 +1,5 @@
# Re-exports
from vollerei.zzz.launcher.game import Game
__all__ = ["Game"]

View File

@ -0,0 +1,20 @@
from vollerei.common.api import get_game_packages, resource
from vollerei.common.enums import GameChannel
def get_game_package(channel: GameChannel = GameChannel.Overseas) -> resource.GameInfo:
"""
Get game package information from the launcher API.
Default channel is overseas.
Args:
channel: Game channel to get the resource information from.
Returns:
Resource: Game resource information.
"""
game_packages = get_game_packages(channel=channel)
for package in game_packages:
if "nap" in package.game.biz:
return package

View File

@ -0,0 +1,16 @@
from os import PathLike
from vollerei.game.launcher.manager import Game as CommonGame
from vollerei.common.enums import GameType
class Game(CommonGame):
"""
Manages the game installation
Since channel detection isn't implemented yet, most functions assume you're
using the overseas version of the game. You can override channel by setting
the property `channel_override` to the channel you want to use.
"""
def __init__(self, path: PathLike = None, cache_path: PathLike = None):
super().__init__(GameType.ZZZ, path, cache_path)

View File