Compare commits

...

79 Commits

Author SHA1 Message Date
a084c61cae fix: disable repair command
The new HoYoPlay launcher doesn't have the ability to get scattered files from the remote anymore, "repairing" the game will just download the whole game again.
2025-05-21 04:22:38 +07:00
10d5f3f208 fix(common): merge zip file too 2025-03-03 17:04:05 +07:00
7707a64733 fix(common): bruh 2025-03-03 17:00:40 +07:00
76bccf6a6f fix(common): support zip file 2025-03-03 16:50:51 +07:00
874686a95c chore: change default path to user
So we can use it on Linux
2025-03-02 03:04:02 +07:00
1863fafd85 fix(common): manually close the merged archive 2025-02-26 18:48:46 +07:00
ac76df577b fix(common): install archive for split files
It worked before, but not now because I switched to py7zr backend for a while now.
2025-02-26 18:41:01 +07:00
ed641f890d fix(common): remove installed check 2025-02-02 22:53:27 +07:00
9e64bfc531 docs: fix again 2025-02-02 00:04:39 +07:00
25cfcdf0f0 fix: lol 2025-02-02 00:04:14 +07:00
b2eec5ee30 docs: fix 2025-02-02 00:01:32 +07:00
7dbe890bf3 fix: remove game channel from games
Lol
2025-02-01 23:58:09 +07:00
976308ac85 feat: unify multiple games into one 2025-02-01 23:51:53 +07:00
fba6cecc38 fix(common): add 'hdiffmap.json' to ignore list
So we don't extract the file lol.
2025-01-15 15:13:56 +07:00
f5007b6aa5 fix(common): compat with 'hdiffmap.json'
miHoYo introduced a new hdiff map, so yeah.
2025-01-14 21:51:31 +07:00
da9c930d2e fix(genshin): handle key error 2025-01-01 12:40:39 +07:00
fc3a43bec7 fix(genshin): voicepack english 2024-12-31 11:16:07 +07:00
844453eabc fix(genshin): voicepack detection 2024-12-31 01:52:01 +07:00
d6d1fdee6e fix: handle .zip update archive
Also fixes some minor bug
2024-12-18 19:10:34 +07:00
7440b6041f chore(ci): i forgor 2024-12-18 18:31:22 +07:00
75649df729 chore(ci): use poetry to install docs 2024-12-18 18:29:53 +07:00
707bcc14c3 chore(ci): fix 2024-12-18 18:27:31 +07:00
18aa7935cb chore(ci): fix 2024-12-18 18:21:12 +07:00
ecd204428d chore: docs 2024-12-18 18:18:33 +07:00
d920aea2b8 chore: add proxy game to readme 2024-12-18 17:40:43 +07:00
441a06fb5b fix(zzz); voicepack language detection 2024-12-18 17:39:48 +07:00
a33bdaa473 feat: implement zzz
It literally is just copy-pasting
2024-12-18 17:35:04 +07:00
4a7bc3d0b4 chore(cli): migrate to one file
Easier to maintain but slightly slower execution speed lol
2024-12-18 15:20:24 +07:00
1948d1f741 chore: add whatever spicy thingy here 2024-12-18 13:30:28 +07:00
6f384c1cc5 chore: add some information to readme 2024-12-13 19:42:36 +07:00
ebc0f2f3f5 feat(cli): support genshin
It literally is just copy-pasting lol, since almost all code are shared.
2024-12-13 19:35:51 +07:00
0c83958ee5 feat(cli/hsr): implement 'voicepack install' & other changes
Rename 'voicepack update-all' to 'voicepack update', allowing update only a voicepack instead of having to upate all of them
2024-12-13 19:30:54 +07:00
95ef9409f5 feat(cli/hsr): add 'hsr install download' command
Yeah
2024-12-13 13:22:38 +07:00
659366620b feat(cli/hsr): support game installation 2024-12-13 12:54:59 +07:00
5304cfbde1 chore: fix a bit
Wtf me
2024-12-13 11:59:51 +07:00
2331f9404a feat(game/genshin): initial copy of hsr code
Lol
2024-12-13 11:56:30 +07:00
a3df498444 chore: move repair-related functions to common
Preparation for Genshin Impact support xD
2024-12-13 11:26:48 +07:00
8ba00754ee chore: add more info to readme 2024-12-12 18:20:21 +07:00
4e0c4ed2c0 chore: refactor readme 2024-10-26 00:36:24 +07:00
7e5a60bb33 fix: applying update archive
It now works, previously I thought py7zr works the same as zipfile but it's different af so I had to change some code for that.
2024-10-23 20:46:44 +07:00
f02f1d5988 fix(common/update): zipfile -> py7zr
So py7zr does have some differences from zipfile
2024-10-22 13:10:14 +07:00
0bac04bdbd fix: use py7zr
Game archive now uses .7z, wow mihoyo
2024-10-22 00:34:57 +07:00
4d8f4008f2 fix(cli/hsr): download update now works
Also change e.__context__ to traceback.format_exc() to give better and actually useful error messages.
2024-10-21 13:34:38 +07:00
208c6efd1e fix(hsr/repair): fix the decompressed url
They changed it lol
2024-10-21 13:33:49 +07:00
156c42c1f3 fix: fix predownload object
Wow.
2024-10-21 13:33:17 +07:00
8ff2a388d7 fix: migrate to HoYoPlay
For now, only update game and voicepack are working

This commit also fixes a couple of bugs too. Tbf I enjoyed HoYoPlay until I don't have enough space to update my HSR so yeah 💀
2024-09-10 16:47:20 +07:00
08c51d2fd8 feat(repair): rework the repair feature
Mostly usable now
2024-06-18 02:49:52 +07:00
e8f63f175f Merge branch 'master' of himeko.tretrauit.me:tretrauit/vollerei 2024-06-06 20:21:06 +07:00
acd457babe fix(repair): join game path with pkg_version 2024-06-06 20:19:45 +07:00
bf5bdf7618 fix(repair): join game path with pkg_version 2024-06-06 20:17:22 +07:00
f5e7417cdf feat(hsr): repair game
Apparently doesn't completely work on my setup (NTFS + Btrfs) but yours may. If it doesn't fix the game then copy the file it downloaded from temp to the game directory and it should work.
2024-05-28 10:58:27 +07:00
bbdb8d3596 fix: revert back to ThreadPoolExecutor
ProcessPoolExecutor didn't work for me :(
2024-05-08 17:21:34 +07:00
f4e09a7aad fix: handle hdiffpatch downloading for each architecture 2024-03-27 12:20:29 +07:00
9569019fcf fix(cli/hsr): fix voicepack downloading 2024-03-25 22:57:00 +07:00
8c0e03ebfa feat(cli/hsr): add "update download"
Basically download the update only.
2024-03-25 20:14:14 +07:00
34f65c00f3 feat(cli/hsr): add more commands 2024-02-06 14:39:41 +07:00
a7d763d847 feat(hsr): add voicepack related commands 2024-02-06 14:24:44 +07:00
fa011a11a1 fix(cli): fix 416 response code 2024-02-06 14:24:15 +07:00
c2f81637e2 feat(hsr): add --from-version 2024-02-06 13:34:22 +07:00
7e18185b52 fix(hsr): wrong function call 2024-02-06 13:30:47 +07:00
63f5c36391 chore: fix dependencies 2024-02-06 12:12:42 +07:00
bdf1f8d785 fix: use packaging.version instead of distutils
distutils is dead
2024-02-06 12:04:53 +07:00
70eb0e9443 feat: add hi3 stub
HI3 isn't working atm, don't expect too much
2024-02-06 11:48:16 +07:00
ca03718bf1 chore: rename _path to path 2024-01-28 18:26:48 +07:00
9f9a63c9b0 feat(hsr): implement voicepack update 2024-01-28 17:11:16 +07:00
4c851b999a fix(hsr): proper implement --noconfirm 2024-01-25 20:18:56 +07:00
80f1ea87d7 feat: implement VoicePackLanguage
Enum better than string 8)
2024-01-03 19:54:29 +07:00
a9f3ee2e3e fix(hsr): convert to string first 2024-01-03 19:50:19 +07:00
f8cfbc1100 docs: add information on functions.apply_update_archive 2024-01-03 17:00:58 +07:00
960ba8d746 docs: add some updated info 2024-01-03 12:02:23 +07:00
884236177b fix(cli/hsr): remove redundant strings
I copy-pasted lol
2024-01-03 06:32:22 +07:00
d1bda2dc32 fix(patch): unlink file if auto-repair succeed 2024-01-03 06:31:57 +07:00
7ed2b2e643 feat(hsr): add auto_repair when hpatchz error 2024-01-03 02:46:07 +07:00
6db85bc439 feat(hsr): support continue download & apply-archive 2024-01-02 23:16:58 +07:00
6f030e79ce feat(hsr): download & install updates
WIP WIP WIP, probably will crash at some point.
Also the code in paths isn't working, so i'm merging them to constants in the next commit
2024-01-02 15:27:40 +07:00
fe7b1945ef feat(cli): use cleo
It is much smarter than typer in terms of options parsing, and also once you know how to use it (with the holy outdated documentation) then it suits my purpose pretty well.
2024-01-02 02:31:15 +07:00
4db6e92b54 feat/cli: migrate to typer
Fire is too hard to customize.
2023-07-03 22:44:07 +07:00
5b916025f3 feat(cli): add --silent option 2023-06-30 20:55:53 +07:00
69a53da8ae feat: wrap all objects on /resource endpoint 2023-06-30 20:55:30 +07:00
64 changed files with 5415 additions and 1178 deletions

52
.github/workflows/docs.yml vendored Normal file
View File

@ -0,0 +1,52 @@
name: Build & Deploy
on:
push:
pull_request:
workflow_dispatch:
jobs:
# Build job
build:
# Specify runner + build & upload the static files as an artifact
runs-on: ubuntu-latest
steps:
- name: Checkout code
id: checkout-code
uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.13'
cache: 'pip' # caching pip dependencies
- name: Install dependencies
id: install
run: |
pipx install poetry --python $(which python)
poetry install --only docs
- name: Build docs
id: build
run: |
poetry run mkdocs build
- name: Upload static files as artifact
id: deployment
uses: actions/upload-pages-artifact@v3 # or specific "vX.X.X" version tag for this action
with:
path: site/
# Deployment job
deploy:
# Grant GITHUB_TOKEN the permissions required to make a Pages deployment
permissions:
pages: write # to deploy to Pages
id-token: write # to verify the deployment originates from an appropriate source
environment:
name: github-pages
url: ${{ steps.deployment.outputs.page_url }}
runs-on: ubuntu-latest
needs: build
steps:
- name: Deploy to GitHub Pages
id: deployment
uses: actions/deploy-pages@v4

View File

@ -4,6 +4,17 @@
An open-source launcher for anime games
## Documentation
https://tretrauit.me/vollerei/
## Installation
Assumming you have `pipx` installed, execute the following command:
```bash
pipx install git+https://git.tretrauit.me/tretrauit/vollerei --preinstall tqdm
```
## Why?
I've done [worthless-launcher](https://tretrauit.gitlab.io/worthless-launcher) for an open-world anime game,
@ -11,9 +22,33 @@ since I want to support other anime games and the launcher code is very messy, t
## Features
+ Nothing, I haven't written any code yet.
### Turn-based game / Open-world game / Proxy game
- [x] Cross-platform support
> Tested on Windows and Linux myself, although should work on most major OSes where `HDiffPatch` is supported.
- [x] Does *not* require administrator/root privileges
> Though if issues occur during installation/upgrading process, you can always try running the program with elevated privileges to fix them.
- [x] Download the game update (including pre-downloads if available)
- [x] Get the game version
- [x] Get installed voicepacks
- [x] Installation
- [x] Patch the game for unsupported platforms (with telemetry checking)
> Not applicable for the "Open-world game", since patching is unneeded to play the game.
- [x] Repair the game (Smarter than the official launcher!)
- [x] Update the game
- [x] Update voicepacks
- [ ] Uninstall the game (Just remove the game directory lol)
- [x] Voicepacks installation
#### Advanced features
- [x] Apply the update archives
- [x] Download the update archives
- [x] Easy to use API
### Notes
### Other games (Dreamseeker game)
I haven't developed for them yet, but since most of the code is shared I'll do that when I have the motivation to do so.
~~Help me get motivated by going to https://paypal.me/tretrauit and send me a coffee lol~~
## Notes
This launcher tries to mimic the official launcher behaviour as much as possible but if a ban appears, I will
not be responsible for it. (*Turn-based game* have a ban wave already, see AAGL discord for more info)

7
docs/abc/game.md Normal file
View File

@ -0,0 +1,7 @@
# Game
::: vollerei.abc.launcher.game.GameABC
handler: python
options:
show_root_heading: true
show_source: true

9
docs/common/functions.md Normal file
View File

@ -0,0 +1,9 @@
# Function for all games
Since you should use the specific implementation for the target game instead, these functions may not be correctly documented.
::: vollerei.common.functions
handler: python
options:
show_root_heading: true
show_source: true

View File

@ -0,0 +1,7 @@
# Game
::: vollerei.game.launcher.Game
handler: python
options:
show_root_heading: true
show_source: true

7
docs/genshin/game.md Normal file
View File

@ -0,0 +1,7 @@
# Game
::: vollerei.genshin.Game
handler: python
options:
show_root_heading: true
show_source: true

7
docs/hsr/game.md Normal file
View File

@ -0,0 +1,7 @@
# Game
::: vollerei.hsr.Game
handler: python
options:
show_root_heading: true
show_source: true

7
docs/hsr/patcher.md Normal file
View File

@ -0,0 +1,7 @@
# Patcher
::: vollerei.hsr.Patcher
handler: python
options:
show_root_heading: true
show_source: true

8
docs/hsr/patchtype.md Normal file
View File

@ -0,0 +1,8 @@
# PatchType
::: vollerei.hsr.PatchType
handler: python
options:
show_root_heading: true
show_source: true

17
docs/index.md Normal file
View File

@ -0,0 +1,17 @@
# Welcome to MkDocs
For full documentation visit [mkdocs.org](https://www.mkdocs.org).
## Commands
* `mkdocs new [dir-name]` - Create a new project.
* `mkdocs serve` - Start the live-reloading docs server.
* `mkdocs build` - Build the documentation site.
* `mkdocs -h` - Print help message and exit.
## Project layout
mkdocs.yml # The configuration file.
docs/
index.md # The documentation homepage.
... # Other markdown pages, images and other files.

7
docs/zzz/game.md Normal file
View File

@ -0,0 +1,7 @@
# Game
::: vollerei.zzz.Game
handler: python
options:
show_root_heading: true
show_source: true

38
mkdocs.yml Normal file
View File

@ -0,0 +1,38 @@
site_name: Vollerei
site_url: https://tretrauit.me/vollerei/
site_author: tretrauit
site_description: >-
Documentation for the Vollerei project, a Python library for managing and
interacting with HoYoVerse games.
repo_name: teppyboy/vollerei
repo_url: https://github.com/teppyboy/vollerei
# Copyright
copyright: Copyright © 2023 - 2024 tretrauit
theme:
name: material
palette:
- media: "(prefers-color-scheme)"
toggle:
icon: material/link
name: Switch to light mode
- media: "(prefers-color-scheme: light)"
scheme: default
primary: indigo
accent: indigo
toggle:
icon: material/toggle-switch
name: Switch to dark mode
- media: "(prefers-color-scheme: dark)"
scheme: slate
primary: black
accent: indigo
toggle:
icon: material/toggle-switch-off
name: Switch to system preference
plugins:
- search
- mkdocstrings

2331
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -10,14 +10,31 @@ readme = "README.md"
python = "^3.11"
platformdirs = "^3.5.1"
requests = "^2.31.0"
cleo = "^2.1.0"
packaging = "^23.2"
py7zr = "^0.22.0"
multivolumefile = "^0.2.3"
[tool.poetry.group.cli]
optional = true
[tool.poetry.group.cli.dependencies]
tqdm = "^4.65.0"
[tool.poetry.group.dev]
optional = true
[tool.poetry.group.dev.dependencies]
pytest = "^7.3.1"
pre-commit = "^3.3.3"
[tool.poetry.group.cli.dependencies]
fire = "^0.5.0"
[tool.poetry.group.docs.dependencies]
mkdocstrings-python = "^1.12.2"
mkdocs-material = "^9.5.49"
[tool.poetry.scripts]
vollerei = 'vollerei.cli:run'
[build-system]
requires = ["poetry-core"]

View File

@ -1,5 +1,4 @@
from vollerei.cli import CLI
from fire import Fire
from vollerei.cli import run
Fire(CLI, name="vollerei")
run()

View File

@ -1,6 +1,8 @@
from abc import ABC, abstractmethod
from os import PathLike
from pathlib import Path
from typing import Any
from vollerei.common.api import resource
class GameABC(ABC):
@ -8,6 +10,11 @@ class GameABC(ABC):
Manages the game installation
"""
path: Path
cache: Path
version_override: tuple[int, int, int] | None
channel_override: Any
def __init__(self, path: PathLike = None):
pass
@ -32,9 +39,6 @@ class GameABC(ABC):
Args:
game_path (PathLike, optional): Path to install the game to.
Returns:
None
"""
pass
@ -64,6 +68,44 @@ class GameABC(ABC):
"""
pass
def repair_file(
self, file: PathLike, pre_download: bool = False, game_info=None
) -> None:
"""
Repairs a game file.
This will automatically handle backup and restore the file if the repair
fails.
Args:
file (PathLike): The file to repair.
pre_download (bool): Whether to get the pre-download version.
Defaults to False.
"""
pass
def repair_files(
self,
files: list[PathLike],
pre_download: bool = False,
game_info: resource.Game = None,
) -> None:
"""
Repairs multiple game files.
This will automatically handle backup and restore the file if the repair
fails.
This method is not multi-threaded, so it may take a while to repair
multiple files.
Args:
files (PathLike): The files to repair.
pre_download (bool): Whether to get the pre-download version.
Defaults to False.
"""
pass
def get_version(self) -> tuple[int, int, int]:
"""
Get the game version
@ -72,15 +114,23 @@ class GameABC(ABC):
"""
pass
def get_update(self):
def get_version_config(self) -> tuple[int, int, int]:
"""
Get the game update
Gets the current installed game version from config.ini.
Using this is not recommended, as only official launcher creates
and uses this file, instead you should use `get_version()`.
This returns (0, 0, 0) if the version could not be found.
Returns:
tuple[int, int, int]: Game version.
"""
pass
def get_voiceover_update(self, language: str):
def get_update(self) -> resource.Patch | None:
"""
Get the voiceover update
Get the game update
"""
pass
@ -89,3 +139,18 @@ class GameABC(ABC):
Get the game channel
"""
pass
def get_remote_game(
self, pre_download: bool = False
) -> resource.Main | resource.PreDownload:
"""
Gets the current game information from remote.
Args:
pre_download (bool): Whether to get the pre-download version.
Defaults to False.
Returns:
A `Game` object that contains the game information.
"""
pass

View File

@ -1,28 +1,10 @@
from pathlib import Path
from vollerei import __version__
from vollerei.cli.hsr import HSR
from vollerei.hsr import PatchType
from vollerei.cli import utils
from cleo.application import Application
from vollerei.cli import commands
application = Application()
for command in commands.exports:
application.add(command)
class CLI:
def __init__(
self, game_path: str = None, patch_type=None, noconfirm: bool = False
) -> None:
"""
Vollerei CLI
"""
print(f"Vollerei v{__version__}")
if noconfirm:
print("User requested to automatically answer yes to all questions.")
utils.no_confirm = noconfirm
if not game_path:
game_path = Path.cwd()
game_path = Path(game_path)
if patch_type is None:
patch_type = PatchType.Jadeite
elif isinstance(patch_type, str):
patch_type = PatchType[patch_type]
elif isinstance(patch_type, int):
patch_type = PatchType(patch_type)
self.hsr = HSR(game_path=game_path, patch_type=patch_type)
def run():
application.run()

982
vollerei/cli/commands.py Normal file
View File

@ -0,0 +1,982 @@
import copy
import traceback
from cleo.commands.command import Command
from cleo.helpers import option, argument
from pathlib import PurePath
from platform import system
from vollerei.abc.launcher.game import GameABC
from vollerei.common.api import resource
from vollerei.common.enums import GameChannel, VoicePackLanguage
from vollerei.cli import utils
from vollerei.exceptions.game import GameError
from vollerei.exceptions.patcher import PatcherError, PatchUpdateError
from vollerei.genshin import Game as GenshinGame
from vollerei.hsr import Game as HSRGame, Patcher as HSRPatcher
from vollerei.hsr.patcher import PatchType as HSRPatchType
from vollerei.zzz import Game as ZZZGame
from vollerei import paths
patcher = HSRPatcher()
default_options = [
option("channel", "c", description="Game channel", flag=False, default="overseas"),
option("force", "f", description="Force the command to run"),
option(
"game-path",
"g",
description="Path to the game installation",
flag=False,
default=".",
),
option("patch-type", "p", description="Patch type", flag=False),
option("temporary-path", "t", description="Temporary path", flag=False),
option("silent", "s", description="Silent mode"),
option("noconfirm", "y", description="Do not ask for confirmation (yes to all)"),
]
class State:
game: GameABC = None
def callback(
command: Command,
):
"""
Base callback for all commands
"""
game_path = command.option("game-path")
channel = command.option("channel")
silent = command.option("silent")
noconfirm = command.option("noconfirm")
temporary_path = command.option("temporary-path")
if isinstance(channel, str):
channel = GameChannel[channel.capitalize()]
elif isinstance(channel, int):
channel = GameChannel(channel)
if temporary_path:
paths.set_base_path(temporary_path)
if command.name.startswith("hsr"):
State.game = HSRGame(game_path, temporary_path)
patch_type = command.option("patch-type")
if patch_type is None:
patch_type = HSRPatchType.Jadeite
elif isinstance(patch_type, str):
patch_type = HSRPatchType[patch_type]
elif isinstance(patch_type, int):
patch_type = HSRPatchType(patch_type)
patcher.patch_type = patch_type
elif command.name.startswith("genshin"):
State.game = GenshinGame(game_path)
elif command.name.startswith("zzz"):
State.game = ZZZGame(game_path)
else:
raise ValueError("Invalid game type")
if channel:
State.game.channel_override = channel
utils.silent_message = silent
if noconfirm:
utils.no_confirm = noconfirm
def confirm(
question: str, default: bool = False, true_answer_regex: str = r"(?i)^y"
):
command.line(
f"<question>{question} (yes/no)</question> [<comment>{'yes' if default else 'no'}</comment>] y"
)
return True
command.confirm = confirm
command.add_style("warn", fg="yellow")
def set_version_config(self: Command):
self.line("Setting version config... ")
try:
State.game.set_version_config()
except Exception as e:
self.line_error(f"<warn>Couldn't set version config: {e}</warn>")
self.line_error(
"This won't affect the overall experience, but if you're using the official launcher"
)
self.line_error(
"you may have to edit the file 'config.ini' manually to reflect the latest version."
)
self.line(
f"The game has been updated to version: <comment>{State.game.get_version_str()}</comment>"
)
class VoicepackListInstalled(Command):
name = "hsr voicepack list-installed"
description = "Get the installed voicepacks"
options = default_options
def handle(self):
callback(command=self)
installed_voicepacks_str = [
f"<comment>{x.name}</comment>"
for x in State.game.get_installed_voicepacks()
]
self.line(f"Installed voicepacks: {', '.join(installed_voicepacks_str)}")
class VoicepackList(Command):
name = "hsr voicepack list"
description = "Get all available voicepacks"
options = default_options + [
option("pre-download", description="Pre-download the game if available"),
]
def handle(self):
callback(command=self)
pre_download = self.option("pre-download")
remote_game = State.game.get_remote_game(pre_download=pre_download)
available_voicepacks_str = [
f"<comment>{x.language.name} ({x.language.value})</comment>"
for x in remote_game.major.audio_pkgs
]
self.line(f"Available voicepacks: {', '.join(available_voicepacks_str)}")
class VoicepackInstall(Command):
name = "hsr voicepack install"
description = "Installs the specified installed voicepacks"
options = default_options + [
option("pre-download", description="Pre-download the game if available"),
]
arguments = [
argument(
"language", description="Languages to install", multiple=True, optional=True
)
]
def handle(self):
callback(command=self)
pre_download = self.option("pre-download")
# Typing manually because pylance detect it as Any
languages: list[str] = self.argument("language")
# Get installed voicepacks
language_objects = []
for language in languages:
language = language.lower()
try:
language_objects.append(VoicePackLanguage[language.capitalize()])
except KeyError:
try:
language_objects.append(VoicePackLanguage.from_remote_str(language))
except ValueError:
self.line_error(f"<error>Invalid language: {language}</error>")
if len(language_objects) == 0:
self.line_error(
"<error>No valid languages specified, you must specify a language to install</error>"
)
return
progress = utils.ProgressIndicator(self)
progress.start("Fetching install package information... ")
try:
game_info = State.game.get_remote_game(pre_download=pre_download)
except Exception as e:
progress.finish(
f"<error>Fetching failed with following error: {e} \n{traceback.format_exc()}</error>"
)
return
progress.finish(
"<comment>Installation information fetched successfully.</comment>"
)
if not self.confirm("Do you want to install the specified voicepacks?"):
self.line("<error>Installation aborted.</error>")
return
# Voicepack update
for remote_voicepack in game_info.major.audio_pkgs:
if remote_voicepack.language not in language_objects:
continue
self.line(
f"Downloading install package for language: <comment>{remote_voicepack.language.name}</comment>... "
)
archive_file = State.game.cache.joinpath(
PurePath(remote_voicepack.url).name
)
try:
download_result = utils.download(
remote_voicepack.url, archive_file, file_len=remote_voicepack.size
)
except Exception as e:
self.line_error(f"<error>Couldn't download package: {e}</error>")
return
if not download_result:
self.line_error("<error>Download failed.</error>")
return
self.line("Download completed.")
progress = utils.ProgressIndicator(self)
progress.start("Installing package...")
try:
State.game.install_archive(archive_file)
except Exception as e:
progress.finish(
f"<error>Couldn't apply package: {e} \n{traceback.format_exc()}</error>"
)
return
progress.finish(
f"<comment>Package applied for language {remote_voicepack.language.name}.</comment>"
)
self.line(
f"The voicepacks have been installed to version: <comment>{State.game.get_version_str()}</comment>"
)
class VoicepackUpdate(Command):
name = "hsr voicepack update"
description = (
"Updates the specified installed voicepacks, if not specified, updates all"
)
options = default_options + [
option(
"auto-repair", "R", description="Automatically repair the game if needed"
),
option("pre-download", description="Pre-download the game if available"),
option(
"from-version", description="Update from a specific version", flag=False
),
]
arguments = [
argument(
"language", description="Languages to update", multiple=True, optional=True
)
]
def handle(self):
callback(command=self)
auto_repair = self.option("auto-repair")
pre_download = self.option("pre-download")
from_version = self.option("from-version")
# Typing manually because pylance detect it as Any
languages: list[str] = self.argument("language")
if auto_repair:
self.line("<comment>Auto-repair is enabled.</comment>")
if from_version:
self.line(f"Updating from version: <comment>{from_version}</comment>")
State.game.version_override = from_version
# Get installed voicepacks
if len(languages) == 0:
self.line(
"<comment>No languages specified, updating all installed voicepacks...</comment>"
)
installed_voicepacks = State.game.get_installed_voicepacks()
if len(languages) > 0:
languages = [x.lower() for x in languages]
# Support both English and en-us and en
installed_voicepacks = [
x
for x in installed_voicepacks
if x.name.lower() in languages
or x.value.lower() in languages
or x.name.lower()[:2] in languages
]
installed_voicepacks_str = [
f"<comment>{str(x.name)}</comment>" for x in installed_voicepacks
]
self.line(f"Updating voicepacks: {', '.join(installed_voicepacks_str)}")
progress = utils.ProgressIndicator(self)
progress.start("Checking for updates... ")
try:
update_diff: resource.Patch | None = State.game.get_update(
pre_download=pre_download
)
game_info = State.game.get_remote_game(pre_download=pre_download)
except Exception as e:
progress.finish(
f"<error>Update checking failed with following error: {e} \n{traceback.format_exc()}</error>"
)
return
if update_diff is None:
progress.finish("<comment>Game is already updated.</comment>")
return
progress.finish("<comment>Update available.</comment>")
self.line(
f"The current version is: <comment>{State.game.get_version_str()}</comment>"
)
self.line(
f"The latest version is: <comment>{game_info.major.version}</comment>"
)
if not self.confirm("Do you want to update the game?"):
self.line("<error>Update aborted.</error>")
return
# Voicepack update
for remote_voicepack in update_diff.audio_pkgs:
if remote_voicepack.language not in installed_voicepacks:
continue
# Voicepack is installed, update it
archive_file = State.game.cache.joinpath(
PurePath(remote_voicepack.url).name
)
self.line(
f"Downloading update package for voicepack language '{remote_voicepack.language.name}'..."
)
try:
download_result = utils.download(
remote_voicepack.url, archive_file, file_len=remote_voicepack.size
)
except Exception as e:
self.line_error(f"<error>Couldn't download update: {e}</error>")
return
if not download_result:
self.line_error("<error>Download failed.</error>")
return
self.line("Download completed.")
progress = utils.ProgressIndicator(self)
progress.start("Applying update package...")
try:
State.game.apply_update_archive(
archive_file=archive_file, auto_repair=auto_repair
)
except Exception as e:
progress.finish(
f"<error>Couldn't apply update: {e} \n{traceback.format_exc()}</error>"
)
return
progress.finish(
f"<comment>Update applied for language {remote_voicepack.language.name}.</comment>"
)
State.game.version_override = game_info.major.version
set_version_config(self=self)
State.game.version_override = None
class PatchTypeCommand(Command):
name = "hsr patch type"
description = "Get the patch type of the game"
options = default_options
def handle(self):
callback(command=self)
self.line(f"Patch type: <comment>{patcher.patch_type.name}</comment>")
class UpdatePatchCommand(Command):
name = "hsr patch update"
description = "Updates the patch"
options = default_options
def handle(self):
callback(command=self)
progress = utils.ProgressIndicator(self)
progress.start("Updating patch... ")
try:
patcher.update_patch()
except PatchUpdateError as e:
progress.finish(
f"<error>Patch update failed with following error: {e} \n{traceback.format_exc()}</error>"
)
else:
progress.finish("<comment>Patch updated!</comment>")
class PatchInstallCommand(Command):
name = "hsr patch install"
description = "Installs the patch"
options = default_options
def jadeite(self):
progress = utils.ProgressIndicator(self)
progress.start("Installing patch... ")
try:
jadeite_dir = patcher.patch_game(game=State.game)
except PatcherError as e:
progress.finish(
f"<error>Patch installation failed with following error: {e} \n{traceback.format_exc()}</error>"
)
return
progress.finish("<comment>Patch installed!</comment>")
print()
exe_path = jadeite_dir.joinpath("jadeite.exe")
self.line(f"Jadeite executable is located at: <question>{exe_path}</question>")
self.line(
"You need to <warn>run the game using Jadeite</warn> to use the patch."
)
self.line(f'E.g: <question>{exe_path} "{State.game.path}"</question>')
print()
self.line(
"To activate the experimental patching method, set the environment variable BREAK_CATHACK=1"
)
self.line(
"Read more about it here: https://codeberg.org/mkrsym1/jadeite/issues/37"
)
print()
self.line(
"Please don't spread this project to public, we just want to play the game."
)
self.line(
"And for your own sake, please only <warn>use test accounts</warn>, as there is an <warn>extremely high risk of getting banned.</warn>"
)
def astra(self):
progress = utils.ProgressIndicator(self)
progress.start("Installing patch... ")
try:
patcher.patch_game(game=State.game)
except PatcherError as e:
progress.finish(
f"<error>Patch installation failed with following error: {e} \n{traceback.format_exc()}</error>"
)
return
progress.finish("<comment>Patch installed!</comment>")
self.line()
self.line(
"Please don't spread this project to public, we just want to play the game."
)
self.line(
"And for your own sake, please only use testing accounts, as there is an extremely high risk of getting banned."
)
def handle(self):
callback(command=self)
if system() == "Windows":
self.line(
"Windows is <comment>officialy supported</comment> by the game, so no patching is needed."
)
self.line(
"By patching the game, <warn>you are violating the ToS of the game.</warn>"
)
if not self.confirm("Do you want to patch the game?"):
self.line("<error>Patching aborted.</error>")
return
progress = utils.ProgressIndicator(self)
progress.start("Checking telemetry hosts... ")
telemetry_list = patcher.check_telemetry()
if telemetry_list:
progress.finish("<warn>Telemetry hosts were found.</warn>")
self.line("Below is the list of telemetry hosts that need to be blocked:")
print()
for host in telemetry_list:
self.line(f"{host}")
print()
self.line(
"To prevent the game from sending data about the patch, "
+ "we need to <comment>block these hosts.</comment>"
)
if not self.confirm("Do you want to block them?"):
self.line("<error>Patching aborted.</error>")
self.line(
"<error>Please block these hosts manually then try again.</error>"
)
return
try:
patcher.block_telemetry(telemetry_list=telemetry_list)
except Exception:
self.line_error(
f"<error>Couldn't block telemetry hosts: {traceback.format_exc()}</error>"
)
# There's a good reason for this.
if system() != "Windows":
self.line(
"<error>Cannot continue, please block them manually then try again.</error>"
)
return
self.line("<warn>Continuing anyway...</warn>")
else:
progress.finish("<comment>No telemetry hosts found.</comment>")
progress = utils.ProgressIndicator(self)
progress.start("Updating patch... ")
try:
patcher.update_patch()
except PatchUpdateError as e:
progress.finish(
f"<error>Patch update failed with following error: {e} \n{traceback.format_exc()}</error>"
)
else:
progress.finish("<comment>Patch updated.</comment>")
match patcher.patch_type:
case HSRPatchType.Jadeite:
self.jadeite()
case HSRPatchType.Astra:
self.astra()
PatchCommand = copy.deepcopy(PatchInstallCommand)
PatchCommand.name = "hsr patch"
class PatchTelemetryCommand(Command):
name = "hsr patch telemetry"
description = "Checks for telemetry hosts and block them."
options = default_options
def handle(self):
progress = utils.ProgressIndicator(self)
progress.start("Checking telemetry hosts... ")
telemetry_list = patcher.check_telemetry()
if telemetry_list:
progress.finish("<warn>Telemetry hosts were found.</warn>")
self.line("Below is the list of telemetry hosts that need to be blocked:")
print()
for host in telemetry_list:
self.line(f"{host}")
print()
self.line(
"To prevent the game from sending data about the patch, "
+ "we need to <comment>block these hosts.</comment>"
)
if not self.confirm("Do you want to block them?"):
self.line("<error>Blocking aborted.</error>")
return
try:
patcher.block_telemetry(telemetry_list=telemetry_list)
except Exception:
self.line_error(
f"<error>Couldn't block telemetry hosts: {traceback.format_exc()}</error>"
)
else:
progress.finish("<comment>No telemetry hosts found.</comment>")
class GetVersionCommand(Command):
name = "hsr version"
description = "Gets the local game version"
options = default_options
def handle(self):
callback(command=self)
try:
self.line(
f"<comment>Version:</comment> {'.'.join(str(x) for x in State.game.get_version())}"
)
except GameError as e:
self.line_error(f"<error>Couldn't get game version: {e}</error>")
class InstallCommand(Command):
name = "hsr install"
description = (
"Installs the latest version of the game to the specified path (default: current directory). "
+ "Note that this will not install the default voicepack (English), you need to install it manually."
)
options = default_options + [
option("pre-download", description="Pre-download the game if available"),
]
def handle(self):
callback(command=self)
pre_download = self.option("pre-download")
progress = utils.ProgressIndicator(self)
progress.start("Fetching install package information... ")
try:
game_info = State.game.get_remote_game(pre_download=pre_download)
except Exception as e:
progress.finish(
f"<error>Fetching failed with following error: {e} \n{traceback.format_exc()}</error>"
)
return
progress.finish(
"<comment>Installation information fetched successfully.</comment>"
)
if not self.confirm("Do you want to install the game?"):
self.line("<error>Installation aborted.</error>")
return
self.line("Downloading install package...")
first_pkg_out_path = None
for game_pkg in game_info.major.game_pkgs:
out_path = State.game.cache.joinpath(PurePath(game_pkg.url).name)
if not first_pkg_out_path:
first_pkg_out_path = out_path
try:
download_result = utils.download(
game_pkg.url, out_path, file_len=game_pkg.size
)
except Exception as e:
self.line_error(
f"<error>Couldn't download install package: {e}</error>"
)
return
if not download_result:
self.line_error("<error>Download failed.</error>")
return
self.line("Download completed.")
progress = utils.ProgressIndicator(self)
progress.start("Installing package...")
try:
State.game.install_archive(first_pkg_out_path)
except Exception as e:
progress.finish(
f"<error>Couldn't install package: {e} \n{traceback.format_exc()}</error>"
)
return
progress.finish("<comment>Package applied for the base game.</comment>")
self.line("Setting version config... ")
State.game.version_override = game_info.major.version
set_version_config(self=self)
State.game.version_override = None
self.line(
f"The game has been installed to version: <comment>{State.game.get_version_str()}</comment>"
)
class UpdateCommand(Command):
name = "hsr update"
description = "Updates the local game if available"
options = default_options + [
option(
"auto-repair", "R", description="Automatically repair the game if needed"
),
option("pre-download", description="Pre-download the game if available"),
option(
"from-version", description="Update from a specific version", flag=False
),
]
def handle(self):
callback(command=self)
auto_repair = self.option("auto-repair")
pre_download = self.option("pre-download")
from_version = self.option("from-version")
if auto_repair:
self.line("<comment>Auto-repair is enabled.</comment>")
if from_version:
self.line(f"Updating from version: <comment>{from_version}</comment>")
State.game.version_override = from_version
progress = utils.ProgressIndicator(self)
progress.start("Checking for updates... ")
try:
update_diff = State.game.get_update(pre_download=pre_download)
game_info = State.game.get_remote_game(pre_download=pre_download)
except Exception as e:
progress.finish(
f"<error>Update checking failed with following error: {e} \n{traceback.format_exc()}</error>"
)
return
if update_diff is None or isinstance(game_info.major, str | None):
progress.finish("<comment>Game is already updated.</comment>")
return
progress.finish("<comment>Update available.</comment>")
self.line(
f"The current version is: <comment>{State.game.get_version_str()}</comment>"
)
self.line(
f"The latest version is: <comment>{game_info.major.version}</comment>"
)
if not self.confirm("Do you want to update the game?"):
self.line("<error>Update aborted.</error>")
return
self.line("Downloading update package...")
update_game_url = update_diff.game_pkgs[0].url
out_path = State.game.cache.joinpath(PurePath(update_game_url).name)
try:
download_result = utils.download(
update_game_url, out_path, file_len=update_diff.game_pkgs[0].size
)
except Exception as e:
self.line_error(f"<error>Couldn't download update: {e}</error>")
return
if not download_result:
self.line_error("<error>Download failed.</error>")
return
self.line("Download completed.")
progress = utils.ProgressIndicator(self)
progress.start("Applying update package...")
try:
State.game.apply_update_archive(out_path, auto_repair=auto_repair)
except Exception as e:
progress.finish(
f"<error>Couldn't apply update: {e} \n{traceback.format_exc()}</error>"
)
return
progress.finish("<comment>Update applied for base game.</comment>")
# Get installed voicepacks
installed_voicepacks = State.game.get_installed_voicepacks()
# Voicepack update
for remote_voicepack in update_diff.audio_pkgs:
if remote_voicepack.language not in installed_voicepacks:
continue
# Voicepack is installed, update it
archive_file = State.game.cache.joinpath(
PurePath(remote_voicepack.url).name
)
self.line(
f"Downloading update package for voicepack language '{remote_voicepack.language.name}'..."
)
try:
download_result = utils.download(
remote_voicepack.url, archive_file, file_len=remote_voicepack.size
)
except Exception as e:
self.line_error(f"<error>Couldn't download update: {e}</error>")
return
if not download_result:
self.line_error("<error>Download failed.</error>")
return
self.line("Download completed.")
progress = utils.ProgressIndicator(self)
progress.start("Applying update package...")
try:
State.game.apply_update_archive(
archive_file=archive_file, auto_repair=auto_repair
)
except Exception as e:
progress.finish(
f"<error>Couldn't apply update: {e} \n{traceback.format_exc()}</error>"
)
return
progress.finish(
f"<comment>Update applied for language {remote_voicepack.language.name}.</comment>"
)
self.line("Setting version config... ")
State.game.version_override = game_info.major.version
set_version_config(self=self)
State.game.version_override = None
self.line(
f"The game has been updated to version: <comment>{State.game.get_version_str()}</comment>"
)
class RepairCommand(Command):
name = "hsr repair"
description = "Tries to repair the local game"
options = default_options
def handle(self):
callback(command=self)
self.line(
"This command will try to repair the game by downloading missing/broken files."
)
self.line(
"There will be no progress available, so please be patient and just wait."
)
if not self.confirm(
"Do you want to repair the game (this will take a long time!)?"
):
self.line("<error>Repairation aborted.</error>")
return
progress = utils.ProgressIndicator(self)
progress.start("Repairing game files (no progress available)... ")
try:
State.game.repair_game()
except Exception as e:
progress.finish(
f"<error>Repairation failed with following error: {e} \n{traceback.format_exc()}</error>"
)
return
progress.finish("<comment>Repairation completed.</comment>")
class InstallDownloadCommand(Command):
name = "hsr install download"
description = (
"Downloads the latest version of the game. "
+ "Note that this will not download the default voicepack (English), you need to download it manually."
)
options = default_options + [
option("pre-download", description="Pre-download the game if available"),
]
def handle(self):
callback(command=self)
pre_download = self.option("pre-download")
progress = utils.ProgressIndicator(self)
progress.start("Fetching install package information... ")
try:
game_info = State.game.get_remote_game(pre_download=pre_download)
except Exception as e:
progress.finish(
f"<error>Fetching failed with following error: {e} \n{traceback.format_exc()}</error>"
)
return
progress.finish(
"<comment>Installation information fetched successfully.</comment>"
)
if not self.confirm("Do you want to download the game?"):
self.line("<error>Download aborted.</error>")
return
self.line("Downloading install package...")
first_pkg_out_path = None
for game_pkg in game_info.major.game_pkgs:
out_path = State.game.cache.joinpath(PurePath(game_pkg.url).name)
if not first_pkg_out_path:
first_pkg_out_path = out_path
try:
download_result = utils.download(
game_pkg.url, out_path, file_len=game_pkg.size
)
except Exception as e:
self.line_error(
f"<error>Couldn't download install package: {e}</error>"
)
return
if not download_result:
self.line_error("<error>Download failed.</error>")
return
self.line("Download completed.")
class UpdateDownloadCommand(Command):
name = "hsr update download"
description = "Download the update for the local game if available"
options = default_options + [
option(
"auto-repair", "R", description="Automatically repair the game if needed"
),
option("pre-download", description="Pre-download the game if available"),
option(
"from-version", description="Update from a specific version", flag=False
),
]
def handle(self):
callback(command=self)
auto_repair = self.option("auto-repair")
pre_download = self.option("pre-download")
from_version = self.option("from-version")
if auto_repair:
self.line("<comment>Auto-repair is enabled.</comment>")
if from_version:
self.line(f"Updating from version: <comment>{from_version}</comment>")
State.game.version_override = from_version
progress = utils.ProgressIndicator(self)
progress.start("Checking for updates... ")
try:
update_diff = State.game.get_update(pre_download=pre_download)
game_info = State.game.get_remote_game(pre_download=pre_download)
except Exception as e:
progress.finish(
f"<error>Update checking failed with following error: {e} \n{traceback.format_exc()}</error>"
)
return
if update_diff is None or isinstance(game_info.major, str | None):
progress.finish("<comment>Game is already updated.</comment>")
return
progress.finish("<comment>Update available.</comment>")
self.line(
f"The current version is: <comment>{State.game.get_version_str()}</comment>"
)
self.line(
f"The latest version is: <comment>{game_info.major.version}</comment>"
)
if not self.confirm("Do you want to download the update?"):
self.line("<error>Download aborted.</error>")
return
self.line("Downloading update package...")
update_game_url = update_diff.game_pkgs[0].url
out_path = State.game.cache.joinpath(PurePath(update_game_url).name)
try:
download_result = utils.download(
update_game_url, out_path, file_len=update_diff.game_pkgs[0].size
)
except Exception as e:
self.line_error(
f"<error>Couldn't download update: {e} \n{traceback.format_exc()}</error>"
)
return
if not download_result:
self.line_error("<error>Download failed.</error>")
return
self.line("Download completed.")
# Get installed voicepacks
installed_voicepacks = State.game.get_installed_voicepacks()
# Voicepack update
for remote_voicepack in update_diff.audio_pkgs:
if remote_voicepack.language not in installed_voicepacks:
continue
# Voicepack is installed, update it
archive_file = State.game.cache.joinpath(
PurePath(remote_voicepack.url).name
)
self.line(
f"Downloading update package for voicepack language '{remote_voicepack.language.name}'..."
)
try:
download_result = utils.download(
remote_voicepack.url, archive_file, file_len=remote_voicepack.size
)
except Exception as e:
self.line_error(f"<error>Couldn't download update: {e}</error>")
return
if not download_result:
self.line_error("<error>Download failed.</error>")
return
self.line("Download completed.")
class ApplyInstallArchive(Command):
name = "hsr install apply-archive"
description = "Applies the install archive"
arguments = [argument("path", description="Path to the install archive")]
options = default_options
def handle(self):
callback(command=self)
install_archive = self.argument("path")
progress = utils.ProgressIndicator(self)
progress.start("Applying install package...")
try:
State.game.install_archive(install_archive)
except Exception as e:
progress.finish(
f"<error>Couldn't apply package: {e} \n{traceback.format_exc()}</error>"
)
return
progress.finish("<comment>Package applied.</comment>")
set_version_config(self=self)
class ApplyUpdateArchive(Command):
name = "hsr update apply-archive"
description = "Applies the update archive to the local game"
arguments = [argument("path", description="Path to the update archive")]
options = default_options + [
option(
"auto-repair", "R", description="Automatically repair the game if needed"
),
]
def handle(self):
callback(command=self)
update_archive = self.argument("path")
auto_repair = self.option("auto-repair")
progress = utils.ProgressIndicator(self)
progress.start("Applying update package...")
try:
State.game.apply_update_archive(update_archive, auto_repair=auto_repair)
except Exception as e:
progress.finish(
f"<error>Couldn't apply update: {e} \n{traceback.format_exc()}</error>"
)
return
progress.finish("<comment>Update applied.</comment>")
set_version_config(self=self)
# This is the list for HSR commands, we'll add Genshin commands later
classes = [
ApplyInstallArchive,
ApplyUpdateArchive,
GetVersionCommand,
InstallCommand,
InstallDownloadCommand,
PatchCommand,
PatchInstallCommand,
PatchTelemetryCommand,
PatchTypeCommand,
# RepairCommand,
UpdatePatchCommand,
UpdateCommand,
UpdateDownloadCommand,
VoicepackInstall,
VoicepackList,
VoicepackListInstalled,
VoicepackUpdate,
]
exports = []
for command in classes:
init_command = command()
exports.append(init_command)
if "patch" in command.name:
continue
command_name = command.name[4:]
genshin_init_command = copy.deepcopy(init_command)
genshin_init_command.name = f"genshin {command_name}"
exports.append(genshin_init_command)
zzz_init_command = copy.deepcopy(init_command)
zzz_init_command.name = f"zzz {command_name}"
exports.append(zzz_init_command)

View File

@ -1,117 +0,0 @@
from traceback import print_exc
from platform import system
from vollerei.cli.utils import ask
from vollerei.hsr import Game, Patcher
from vollerei.exceptions.patcher import PatcherError, PatchUpdateError
from vollerei.hsr.patcher import PatchType
class HSR:
def __init__(
self, game_path=None, patch_type: PatchType = PatchType.Jadeite
) -> None:
self._game = Game(game_path)
print("Game directory:", self._game.path)
print("Game version:", self._game.get_version_str())
self._patcher = Patcher()
self._patcher.patch_type = patch_type
# Double _ means private to prevent Fire from invoking it
def patch_type(self):
print("Patch type:", self._patcher.patch_type.name)
def __update_patch(self):
self.patch_type()
print("Updating patch...", end=" ")
try:
self._patcher.update_patch()
except PatchUpdateError as e:
print("FAILED")
print(f"Patch update failed with following error: {e} ({e.__context__})")
return False
print("OK")
return True
def update_patch(self):
self.__update_patch()
def __patch_jadeite(self):
try:
print("Installing patch...", end=" ")
jadeite_dir = self._patcher.patch_game(game=self._game)
except PatcherError as e:
print("FAILED")
print(f"Patching failed with following error: {e}")
return
print("OK")
exe_path = jadeite_dir.joinpath("jadeite.exe")
print("Jadeite executable is located at:", exe_path)
print()
print("=" * 15)
print(
"Installation succeeded, but note that you need to run the game using "
+ "Jadeite to use the patch."
)
print()
print(f'E.g: I_WANT_A_BAN=1 {exe_path} "{self._game.path}"')
print()
print(
"Please don't spread this project to public, we just want to play the game."
)
print(
"And for your own sake, please only use testing accounts, as there is an "
+ "extremely high risk of getting banned."
)
print("=" * 15)
def __patch_astra(self):
try:
print("Patching game...", end=" ")
self._patcher.patch_game(game=self._game)
except PatcherError as e:
print("FAILED")
print(f"Patching failed with following error: {e}")
return
print("OK")
def patch(self):
if system() == "Windows":
print(
"Windows is supported officialy by the game, so no patching is needed."
)
print("By patching you are breaking the ToS, use at your own risk.")
if not ask("Do you want to patch the game?"):
print("Patching aborted.")
return
print("Checking telemetry hosts...", end=" ")
telemetry_list = self._patcher.check_telemetry()
if telemetry_list:
print("FOUND")
print("Telemetry hosts found: ")
for host in telemetry_list:
print(f" - {host}")
print(
"To prevent the game from sending data about the patch, "
+ "we need to block these hosts."
)
if not ask("Do you want to block these hosts?"):
print("Patching aborted.")
print("Please block these hosts manually then try again.")
return
try:
self._patcher.block_telemetry(telemetry_list=telemetry_list)
except Exception as e:
print("Couldn't block telemetry hosts:", e)
if system() != "Windows":
print("Cannot continue, please block them manually then try again.")
return
print("Continuing anyway...")
else:
print("OK")
if not self.__update_patch():
return
match self._patcher.patch_type:
case PatchType.Jadeite:
self.__patch_jadeite()
case PatchType.Astra:
self.__patch_astra()

View File

@ -1,14 +1,101 @@
import requests
from cleo.commands.command import Command
from pathlib import Path
from threading import Thread
from time import sleep
from tqdm import tqdm
no_confirm = False
silent_message = False
def ask(question: str):
if no_confirm:
print(question + " [Y/n]: Y")
def args_to_kwargs(args: list):
"""
Convert a list of arguments to a dict of keyword arguments.
"""
kwargs = {}
cur_key = None
for arg in args:
if "--" == arg[:2]:
arg_key = arg[2:].replace("-", "_")
kwargs[arg_key] = True
cur_key = arg_key
elif cur_key:
kwargs[cur_key] = arg
return kwargs
class ProgressIndicator:
def auto_advance(self):
"""
Automatically advance the progress indicator.
"""
while self.progress._started:
self.progress.advance()
sleep(self.progress._interval / 1000)
def __init__(
self, command: Command, interval: int = None, values: list[str] = None
):
self.command = command
if not interval:
interval = 100
if not values:
values = ["", "", "", "", "", "", "", "", "", ""]
self.progress = self.command.progress_indicator(
interval=interval, values=values
)
self.thread = Thread(target=self.auto_advance)
self.thread.daemon = True
def start(self, message: str):
"""
Start the progress indicator.
"""
self.progress.start(message)
self.thread.start()
def finish(self, message: str, reset_indicator=False):
"""
Finish the progress indicator.
"""
self.progress.finish(message=message, reset_indicator=reset_indicator)
def download(url, out: Path, file_len: int = None, overwrite: bool = False) -> bool:
if overwrite:
out.unlink(missing_ok=True)
headers = {}
if out.exists():
cur_len = (out.stat()).st_size
headers |= {"Range": f"bytes={cur_len}-{file_len if file_len else ''}"}
else:
out.touch()
# Streaming, so we can iterate over the response.
response = requests.get(url=url, headers=headers, stream=True)
if response.status_code == 416:
return True
while True:
answer = input(question + " [Y/n]: ")
if answer.lower().strip() in ["y", "yes", ""]:
return True
# Pacman way, treat all other answers as no
else:
return False
response.raise_for_status()
# Sizes in bytes.
total_size = int(response.headers.get("content-length", 0))
block_size = 32768
with tqdm(total=total_size, unit="KB", unit_scale=True) as progress_bar:
with out.open("ab") as file:
for data in response.iter_content(block_size):
progress_bar.update(len(data))
file.write(data)
if total_size != 0 and progress_bar.n != total_size:
return False
return True
def msg(*args, **kwargs):
"""
Print but silentable
"""
if silent_message:
return
print(*args, **kwargs)

View File

@ -1,4 +1,35 @@
from vollerei.common.api.resource import Resource
import requests
from vollerei.common.api import resource
from vollerei.common.enums import GameChannel
from vollerei.constants import LAUNCHER_API
__all__ = ["Resource"]
__all__ = ["GamePackage"]
def get_game_packages(
channel: GameChannel = GameChannel.Overseas,
) -> list[resource.GameInfo]:
"""
Get game packages information from the launcher API.
Default channel is overseas.
Args:
channel: Game channel to get the resource information from.
Returns:
Resource: Game resource information.
"""
resource_path: dict = None
match channel:
case GameChannel.Overseas:
resource_path = LAUNCHER_API.OS
case GameChannel.China:
resource_path = LAUNCHER_API.CN
return resource.from_dict(
requests.get(
resource_path["url"] + LAUNCHER_API.RESOURCE_PATH,
params=resource_path["params"],
).json()["data"]
)

View File

@ -1,231 +1,156 @@
"""
Class wrapper for API endpoint /resource
"""
class Segment:
path: str
md5: str
# str -> int and checked if int is 0 then None
package_size: int | None
class VoicePack:
"""
Voice pack information
`name` maybe converted from `path` if the server returns empty string.
Attributes:
TODO
"""
language: str
name: str
path: str
# str -> int
size: int
md5: str
# str -> int
package_size: int
class Diff:
"""
Game resource diff from a version to latest information
Attributes:
TODO
"""
name: str
version: str
path: str
# str -> int
size: int
md5: str
is_recommended_update: bool
voice_packs: list[VoicePack]
# str -> int
package_size: int
class Latest:
"""
Latest game resource information
`name` maybe converted from `path` if the server returns empty string,
and if `path` is empty too then it'll convert the name from the first
segment of `segments` list.
Attributes:
TODO
"""
name: str
version: str
path: str
# str -> int
size: int
md5: str
entry: str
voice_packs: list[VoicePack]
# str but checked for empty string
decompressed_path: str | None
segments: list[Segment]
# str -> int
package_size: int
def __init__(
self,
name: str,
version: str,
path: str,
size: int,
md5: str,
entry: str,
voice_packs: list[VoicePack],
decompressed_path: str | None,
segments: list[Segment],
package_size: int,
) -> None:
self.name = name
self.version = version
self.path = path
self.size = size
self.md5 = md5
self.entry = entry
self.voice_packs = voice_packs
self.decompressed_path = decompressed_path
self.segments = segments
self.package_size = package_size
@staticmethod
def from_dict(data: dict) -> "Latest":
if data["name"] == "":
if data["path"] == "":
data["name"] = data["segments"][0]["path"].split("/")[-1]
else:
data["name"] = data["path"].split("/")[-1]
return Latest(
data["name"],
data["version"],
data["path"],
data["size"],
data["md5"],
data["entry"],
[VoicePack.from_dict(i) for i in data["voice_packs"]],
data["decompressed_path"],
[Segment.from_dict(i) for i in data["segments"]],
data["package_size"],
)
from vollerei.common.enums import VoicePackLanguage
from typing import Union
class Game:
latest: Latest
diffs: list[Diff]
def __init__(self, latest: Latest, diffs: list[Diff]) -> None:
self.latest = latest
self.diffs = diffs
def __init__(self, id: str, biz: str):
self.id = id
self.biz = biz
@staticmethod
def from_dict(data: dict) -> "Game":
return Game(
Latest.from_dict(data["latest"]), [Diff.from_dict(i) for i in data["diffs"]]
)
return Game(id=data["id"], biz=data["biz"])
class Plugin:
name: str
# str but checked for empty string
version: str | None
path: str
# str -> int
size: int
md5: str
# str but checked for empty string
entry: str | None
# str -> int
package_size: int
class LauncherPlugin:
plugins: list[Plugin]
# str -> int
version: int
class DeprecatedPackage:
name: str
md5: str
class DeprecatedFile:
path: str
# str but checked for empty string
md5: str | None
class Resource:
"""
Data class for /resource endpoint
"""
# I'm generous enough to convert the string into int
# for you guys, wtf Mihoyo?
game: Game
# ?? Mihoyo for plugin["plugins"] which is a list of Plugin objects
plugin: LauncherPlugin
web_url: str
# ?? Mihoyo
force_update: None
# Will be a Game object if a pre-download is available.
pre_download_game: Game | None
deprecated_packages: list[DeprecatedPackage]
# Maybe a SDK for Bilibili version in Genshin?
sdk: None
deprecated_files: list[DeprecatedFile]
def __init__(
self,
game: Game,
plugin: Plugin,
web_url: str,
force_update: None,
pre_download_game: Game | None,
deprecated_packages: list[DeprecatedPackage],
sdk: None,
deprecated_files: list[DeprecatedFile],
) -> None:
# Fixups
game_latest_path = game.latest.path
if game.latest.name == "":
print("A")
if game_latest_path == "":
game.latest.name = game.latest.segments[0].path.split("/")[-1]
else:
game.latest.name = game_latest_path.split("/")[-1]
self.game = game
self.plugin = plugin
self.web_url = web_url
self.force_update = force_update
self.pre_download_game = pre_download_game
self.deprecated_packages = deprecated_packages
self.sdk = sdk
self.deprecated_files = deprecated_files
class GamePackage:
def __init__(self, url: str, md5: str, size: int, decompressed_size: int):
self.url = url
self.md5 = md5
self.size = size
self.decompressed_size = decompressed_size
@staticmethod
def from_dict(json: dict) -> "Resource":
return Resource(
Game.from_dict(json["game"]),
LauncherPlugin.from_dict(json["plugin"]),
json["web_url"],
json["force_update"],
Game.from_dict(json["pre_download_game"])
if json["pre_download_game"]
else None,
[DeprecatedPackage.from_dict(x) for x in json["deprecated_packages"]],
json["sdk"],
[DeprecatedFile.from_dict(x) for x in json["deprecated_files"]],
def from_dict(data: list[dict]) -> list["GamePackage"]:
game_pkgs = []
for pkg in data:
game_pkgs.append(
GamePackage(
url=pkg["url"],
md5=pkg["md5"],
size=int(pkg["size"]),
decompressed_size=int(pkg["decompressed_size"]),
)
)
return game_pkgs
class AudioPackage:
def __init__(
self,
language: VoicePackLanguage,
url: str,
md5: str,
size: int,
decompressed_size: int,
):
self.language = language
self.url = url
self.md5 = md5
self.size = size
self.decompressed_size = decompressed_size
@staticmethod
def from_dict(data: list[dict]) -> "AudioPackage":
audio_pkgs = []
for pkg in data:
audio_pkgs.append(
AudioPackage(
language=VoicePackLanguage.from_remote_str(pkg["language"]),
url=pkg["url"],
md5=pkg["md5"],
size=int(pkg["size"]),
decompressed_size=int(pkg["decompressed_size"]),
)
)
return audio_pkgs
class Major:
def __init__(
self,
version: str,
game_pkgs: list[GamePackage],
audio_pkgs: list[AudioPackage],
res_list_url: str,
):
self.version = version
self.game_pkgs = game_pkgs
self.audio_pkgs = audio_pkgs
self.res_list_url = res_list_url
@staticmethod
def from_dict(data: dict) -> "Major":
return Major(
version=data["version"],
game_pkgs=GamePackage.from_dict(data["game_pkgs"]),
audio_pkgs=AudioPackage.from_dict(data["audio_pkgs"]),
res_list_url=data["res_list_url"],
)
# Currently patch has the same fields as major
Patch = Major
class Main:
def __init__(self, major: Major, patches: list[Patch]):
self.major = major
self.patches = patches
@staticmethod
def from_dict(data: dict) -> "Main":
return Main(
major=Major.from_dict(data["major"]),
patches=[Patch.from_dict(x) for x in data["patches"]],
)
class PreDownload:
def __init__(self, major: Major | str | None, patches: list[Patch]):
self.major = major
self.patches = patches
# Union to fix the typing issue.
@staticmethod
def from_dict(data: dict | None) -> Union["PreDownload", None]:
# pre_download can be null in the server for certain games
# e.g. HI3:
# "pre_download": null
# while in GI it is the following:
# "pre_download": {
# "major": null,
# "patches": []
# }
if data is None:
return None
return PreDownload(
major=(
data["major"]
if isinstance(data["major"], str | None)
else Major.from_dict(data["major"])
),
patches=[Patch.from_dict(x) for x in data["patches"]],
)
# Why miHoYo uses the same name "game_packages" for this big field and smol field
class GameInfo:
def __init__(self, game: Game, main: Main, pre_download: PreDownload):
self.game = game
self.main = main
self.pre_download = pre_download
@staticmethod
def from_dict(data: dict) -> "GameInfo":
return GameInfo(
game=Game.from_dict(data["game"]),
main=Main.from_dict(data["main"]),
pre_download=PreDownload.from_dict(data["pre_download"]),
)
def from_dict(data: dict) -> list[GameInfo]:
game_pkgs = []
for pkg in data["game_packages"]:
game_pkgs.append(GameInfo.from_dict(pkg))
return game_pkgs

59
vollerei/common/enums.py Normal file
View File

@ -0,0 +1,59 @@
from enum import Enum
class GameType(Enum):
Genshin = 0
HSR = 1
ZZZ = 3
HI3 = 4
class GameChannel(Enum):
Overseas = 0
China = 1
class VoicePackLanguage(Enum):
Japanese = "ja-jp"
Chinese = "zh-cn"
Taiwanese = "zh-tw"
Korean = "ko-kr"
English = "en-us"
@staticmethod
def from_remote_str(s: str) -> "VoicePackLanguage":
"""
Converts a language string from remote server to a VoicePackLanguage enum.
"""
if s == "ja-jp":
return VoicePackLanguage.Japanese
elif s == "zh-cn":
return VoicePackLanguage.Chinese
elif s == "zh-tw":
return VoicePackLanguage.Taiwanese
elif s == "ko-kr":
return VoicePackLanguage.Korean
elif s == "en-us":
return VoicePackLanguage.English
else:
raise ValueError(f"Invalid language string: {s}")
@staticmethod
def from_zzz_name(s: str) -> "VoicePackLanguage":
"""
Converts a language string from ZZZ file name to a VoicePackLanguage enum.
Only English is tested for now.
"""
if s == "Jp":
return VoicePackLanguage.Japanese
elif s == "Cn":
return VoicePackLanguage.Chinese
elif s == "Tw":
return VoicePackLanguage.Taiwanese
elif s == "Kr":
return VoicePackLanguage.Korean
elif s == "En":
return VoicePackLanguage.English
else:
raise ValueError(f"Invalid language string: {s}")

View File

@ -0,0 +1,403 @@
import concurrent.futures
import json
import hashlib
import multivolumefile
import py7zr
import zipfile
from io import IOBase
from os import PathLike
from pathlib import Path
from shutil import move
from vollerei.abc.launcher.game import GameABC
from vollerei.common.api import resource
from vollerei.exceptions.game import (
RepairError,
GameNotInstalledError,
ScatteredFilesNotAvailableError,
)
from vollerei.utils import HDiffPatch, HPatchZPatchError, download
_hdiff = HDiffPatch()
def _extract_files(
archive: py7zr.SevenZipFile | zipfile.ZipFile, files, path: PathLike
):
if isinstance(archive, py7zr.SevenZipFile):
# .7z archive
archive.extract(path, files)
else:
# .zip archive
archive.extractall(path, files)
def _open_archive(file: Path | IOBase) -> py7zr.SevenZipFile | zipfile.ZipFile:
archive: py7zr.SevenZipFile | zipfile.ZipFile = None
try:
archive = py7zr.SevenZipFile(file, "r")
except py7zr.exceptions.Bad7zFile:
# Try to open it as a zip file
try:
archive = zipfile.ZipFile(file, "r")
except zipfile.BadZipFile:
raise ValueError("Archive is not a valid 7z or zip file.")
return archive
def apply_update_archive(
game: GameABC, archive_file: Path | IOBase, auto_repair: bool = True
) -> None:
"""
Applies an update archive to the game, it can be the game update or a
voicepack update.
Because this function is shared for all games, you should use the game's
`apply_update_archive()` method instead, which additionally applies required
methods for that game.
"""
# Most code here are copied from worthless-launcher.
# worthless-launcher uses asyncio for multithreading while this one uses
# ThreadPoolExecutor, probably better for this use case.
# We need `game` for the path and `auto_repair` for the auto repair option.
# Install HDiffPatch
_hdiff.hpatchz()
# Open archive
def reset_if_py7zr(archive):
if isinstance(archive, py7zr.SevenZipFile):
archive.reset()
archive = _open_archive(archive_file)
# Get files list (we don't want to extract all of them)
files = archive.namelist()
# Don't extract these files (they're useless and if the game isn't patched then
# it'll raise 31-4xxx error in Genshin)
for file in ["deletefiles.txt", "hdifffiles.txt", "hdiffmap.json"]:
try:
files.remove(file)
except ValueError:
pass
# Think for me a better name for this variable
txtfiles = None
if isinstance(archive, py7zr.SevenZipFile):
txtfiles = archive.read(["deletefiles.txt", "hdifffiles.txt", "hdiffmap.json"])
# Reset archive to extract files
archive.reset()
try:
# miHoYo loves CRLF
if txtfiles is not None:
deletebytes = txtfiles["deletefiles.txt"].read()
else:
deletebytes = archive.read("deletefiles.txt")
if deletebytes is not str:
# Typing
deletebytes: bytes
deletebytes = deletebytes.decode()
deletefiles = deletebytes.split("\r\n")
except (IOError, KeyError):
pass
else:
for file_str in deletefiles:
file = game.path.joinpath(file_str)
if file == game.path:
# Don't delete the game folder
continue
if not file.relative_to(game.path):
# File is not in the game folder
continue
# Delete the file
file.unlink(missing_ok=True)
# hdiffpatch implementation
# Read hdifffiles.txt to get the files to patch
# Hdifffile format is [(source file, target file)]
# While the patch file is named as target file + ".hdiff"
hdifffiles: list[tuple[str, str]] = []
new_hdiff_map = False
if txtfiles is not None:
old_hdiff_map = txtfiles.get("hdifffiles.txt")
if old_hdiff_map is not None:
hdiffbytes = old_hdiff_map.read()
else:
new_hdiff_map = True
hdiffbytes = txtfiles["hdiffmap.json"].read()
else:
# Archive file must be a zip file
if zipfile.Path(archive).joinpath("hdifffiles.txt").is_file():
hdiffbytes = archive.read("hdifffiles.txt")
else:
new_hdiff_map = True
hdiffbytes = archive.read("hdiffmap.json")
if hdiffbytes is not str:
# Typing
hdiffbytes: bytes
hdiffbytes = hdiffbytes.decode()
if new_hdiff_map:
mapping = json.loads(hdiffbytes)
for diff in mapping["diff_map"]:
hdifffiles.append((diff["source_file_name"], diff["target_file_name"]))
else:
for x in hdiffbytes.split("\r\n"):
try:
name = json.loads(x.strip())["remoteName"]
hdifffiles.append((name, name))
except json.JSONDecodeError:
pass
# Patch function
def patch(source_file: Path, target_file: Path, patch_file: str):
patch_path = game.cache.joinpath(patch_file)
# Spaghetti code :(, fuck my eyes.
bak_src_file = source_file.rename(
source_file.with_suffix(source_file.suffix + ".bak")
)
try:
_hdiff.patch_file(bak_src_file, target_file, patch_path)
except HPatchZPatchError:
if auto_repair:
try:
# The game repairs file by downloading the latest file, in this case we want the target file
# instead of source file. Honestly I haven't tested this but I hope it works.
game.repair_file(target_file)
except Exception:
# Let the game download the file.
bak_src_file.rename(file.with_suffix(""))
else:
bak_src_file.unlink()
else:
# Let the game download the file.
bak_src_file.rename(file.with_suffix(""))
return
else:
# Remove old file, since we don't need it anymore.
bak_src_file.unlink()
finally:
patch_path.unlink()
# Multi-threaded patching
patch_jobs = []
patch_files = []
for source_file, target_file in hdifffiles:
source_path = game.path.joinpath(source_file)
if not source_path.exists():
# Not patching since we don't have the file
continue
target_path = game.path.joinpath(target_file)
patch_file: str = target_file + ".hdiff"
# Remove hdiff files from files list to extract
files.remove(patch_file)
# Add file to extract list
patch_files.append(patch_file)
patch_jobs.append([patch, [source_path, target_path, patch_file]])
# Extract patch files to temporary dir
_extract_files(archive, patch_files, game.cache)
reset_if_py7zr(archive) # For the next extraction
# Create new ThreadPoolExecutor for patching
patch_executor = concurrent.futures.ThreadPoolExecutor()
for job in patch_jobs:
patch_executor.submit(job[0], *job[1])
patch_executor.shutdown(wait=True)
# Extract files from archive after we have filtered out the patch files
_extract_files(archive, files, game.path)
# Close the archive
archive.close()
def install_archive(game: GameABC, archive_file: Path | IOBase) -> None:
"""
Applies an install archive to the game, it can be the game itself or a
voicepack one.
Args:
game (GameABC): The game to install the archive for.
archive_file (Path | IOBase): The archive file to install, if it's a
split archive then this is the first part.
Because this function is shared for all games, you should use the game's
`install_archive()` method instead, which additionally applies required
methods for that game.
"""
archive: py7zr.SevenZipFile | zipfile.ZipFile = None
archive_path = Path(archive_file)
target_archive = None
if archive_path.suffix == ".001":
archive_path_merged = archive_path.with_suffix("")
target_archive = multivolumefile.open(archive_path_merged, mode='rb')
if archive_path_merged.suffix == ".zip":
# .zip split archive
archive = zipfile.ZipFile(target_archive, "r")
else:
archive = py7zr.SevenZipFile(target_archive, "r")
else:
archive = _open_archive(archive_file)
archive.extractall(game.path)
archive.close()
if target_archive:
target_archive.close()
def _repair_file(game: GameABC, file: PathLike, game_info: resource.Main) -> None:
# .replace("\\", "/") is needed because Windows uses backslashes :)
relative_file = file.relative_to(game.path)
url = game_info.major.res_list_url + "/" + str(relative_file).replace("\\", "/")
# Backup the file
if file.exists():
backup_file = file.with_suffix(file.suffix + ".bak")
if backup_file.exists():
backup_file.unlink()
file.rename(backup_file)
dest_file = file.with_suffix("")
else:
dest_file = file
try:
# Download the file
temp_file = game.cache.joinpath(relative_file)
dest_file.parent.mkdir(parents=True, exist_ok=True)
print(f"Downloading repair file {url} to {temp_file}")
download(url, temp_file, overwrite=True, stream=True)
# Move the file
move(temp_file, dest_file)
print("OK")
except Exception as e:
# Restore the backup
print("Failed", e)
if file.exists():
file.rename(file.with_suffix(""))
raise e
# Delete the backup
if file.exists():
file.unlink(missing_ok=True)
def repair_files(
game: GameABC,
files: list[PathLike],
pre_download: bool = False,
game_info: resource.Game = None,
) -> None:
"""
Repairs multiple game files.
This will automatically handle backup and restore the file if the repair
fails.
Args:
game (GameABC): The game to repair the files for.
files (PathLike): The files to repair.
pre_download (bool): Whether to get the pre-download version.
Defaults to False.
"""
if not game.is_installed():
raise GameNotInstalledError("Game is not installed.")
files_path = [Path(file) for file in files]
for file in files_path:
if not file.is_relative_to(game.path):
raise ValueError("File is not in the game folder.")
if not game_info:
game_info = game.get_remote_game(pre_download=pre_download)
if game_info.latest.decompressed_path is None:
raise ScatteredFilesNotAvailableError("Scattered files are not available.")
executor = concurrent.futures.ThreadPoolExecutor()
for file in files_path:
executor.submit(_repair_file, file, game=game_info)
# self._repair_file(file, game=game)
executor.shutdown(wait=True)
def repair_game(
game: GameABC,
pre_download: bool = False,
) -> None:
"""
Tries to repair the game by reading "pkg_version" file and downloading the
mismatched files from the server.
Because this function is shared for all games, you should use the game's
`repair_game()` method instead, which additionally applies required
methods for that game.
"""
# Most code here are copied from worthless-launcher.
# worthless-launcher uses asyncio for multithreading while this one uses
# ThreadPoolExecutor, probably better for this use case.
if not game.is_installed():
raise GameNotInstalledError("Game is not installed.")
game_info = game.get_remote_game(pre_download=pre_download)
pkg_version_file = game.path.joinpath("pkg_version")
pkg_version: dict[str, dict[str, str]] = {}
if not pkg_version_file.is_file():
try:
game.repair_file(game.path.joinpath("pkg_version"), game_info=game_info)
except Exception as e:
raise RepairError(
"pkg_version file not found, most likely you need to download the full game again."
) from e
with pkg_version_file.open("r") as f:
for line in f.readlines():
line = line.strip()
if not line:
continue
line_json = json.loads(line)
pkg_version[line_json["remoteName"]] = {
"md5": line_json["md5"],
"fileSize": line_json["fileSize"],
}
read_needed_files: list[Path] = []
target_files: list[Path] = []
repair_executor = concurrent.futures.ThreadPoolExecutor()
for file in game.path.rglob("*"):
# Ignore webCaches folder (because it's user data)
if file.is_dir():
continue
if "webCaches" in str(file):
continue
def verify(file_path: Path):
nonlocal target_files
nonlocal pkg_version
relative_path = file_path.relative_to(game.path)
relative_path_str = str(relative_path).replace("\\", "/")
# print(relative_path_str)
# Wtf mihoyo, you build this game for Windows and then use Unix path separator :moyai:
try:
target_file = pkg_version.pop(relative_path_str)
if target_file:
with file_path.open("rb", buffering=0) as f:
file_hash = hashlib.file_digest(f, "md5").hexdigest()
if file_hash == target_file["md5"]:
return
print(
f"Hash mismatch for {target_file['remoteName']} ({file_hash}; expected {target_file['md5']})"
)
target_files.append(file_path)
except KeyError:
# File not found in pkg_version
read_needed_files.append(file_path)
repair_executor.submit(verify, file)
repair_executor.shutdown(wait=True)
for file in read_needed_files:
try:
with file.open("rb", buffering=0) as f:
# We only need to read 4 bytes to see if the file is readable or not
f.read(4)
except Exception:
print(f"File '{file}' is corrupted.")
target_files.append(file)
# value not used for now
for key, _ in pkg_version.items():
target_file = game.path.joinpath(key)
if target_file.is_file():
continue
print(f"{key} not found.")
target_files.append(target_file)
if not target_files:
return
print("Begin repairing files...")
game.repair_files(target_files, game_info=game_info)

View File

@ -1,5 +1,5 @@
import requests
import concurrent
import concurrent.futures
from vollerei.utils import write_hosts
from vollerei.constants import TELEMETRY_HOSTS

View File

@ -1,3 +1,21 @@
class LAUNCHER_API:
"""Launcher API constants."""
RESOURCE_PATH: str = "hyp/hyp-connect/api/getGamePackages"
OS: dict = {
"url": "https://sg-hyp-api.hoyoverse.com/",
"params": {
"launcher_id": "VYTpXlbWo8",
},
}
CN: dict = {
"url": "https://hyp-api.mihoyo.com/",
"params": {
"launcher_id": "jGHBHlcOq1",
},
}
TELEMETRY_HOSTS = [
# Global
"log-upload-os.hoyoverse.com",

View File

@ -8,6 +8,42 @@ class GameError(VollereiError):
class GameNotInstalledError(GameError):
"""Exception raised when the game is not installed."""
"""Game is not installed."""
pass
class GameAlreadyUpdatedError(GameError):
"""Game is already updated."""
pass
class GameAlreadyInstalledError(GameError):
"""Game is already installed."""
pass
class RepairError(GameError):
"""Error occurred while repairing the game."""
pass
class ScatteredFilesNotAvailableError(RepairError):
"""Scattered files are not available."""
pass
class GameNotUpdatedError(GameError):
"""Game is not updated."""
pass
class PreDownloadNotAvailable(GameError):
"""Pre-download version is not available."""
pass

View File

@ -0,0 +1,86 @@
from vollerei.common.enums import GameChannel
from vollerei.abc.launcher.game import GameABC
from vollerei.exceptions.game import GameNotInstalledError
def get_channel(game: GameABC) -> GameChannel:
"""
Gets the current game channel.
Returns:
GameChannel: The current game channel.
"""
if game.channel_override:
return game.channel_override
if not game.is_installed():
raise GameNotInstalledError("Game path is not set.")
if game.path.joinpath("YuanShen.exe").is_file():
return GameChannel.China
return GameChannel.Overseas
def get_version(game: GameABC) -> tuple[int, int, int]:
"""
Gets the current installed game version.
Credits to An Anime Team for the code that does the magic:
https://github.com/an-anime-team/anime-game-core/blob/main/src/games/genshin/game.rs#L52
If the above method fails, it'll fallback to read the config.ini file
for the version, which is not recommended (as described in
`get_version_config()` docs)
This returns (0, 0, 0) if the version could not be found
(usually indicates the game is not installed), and in fact `is_installed()` uses
this method to check if the game is installed too.
Returns:
tuple[int, int, int]: The version as a tuple of integers.
"""
data_file = game.data_folder().joinpath("globalgamemanagers")
if not data_file.exists():
return game.get_version_config()
def bytes_to_int(byte_array: list[bytes]) -> int:
bytes_as_int = int.from_bytes(byte_array, byteorder="big")
actual_int = bytes_as_int - 48 # 48 is the ASCII code for 0
return actual_int
version_bytes: list[list[bytes]] = [[], [], []]
version_ptr = 0
correct = True
try:
with data_file.open("rb") as f:
f.seek(4000)
for byte in f.read(10000):
match byte:
case 0:
version_bytes = [[], [], []]
version_ptr = 0
correct = True
case 46:
version_ptr += 1
if version_ptr > 2:
correct = False
case 95:
if (
correct
and len(version_bytes[0]) > 0
and len(version_bytes[1]) > 0
and len(version_bytes[2]) > 0
):
return (
bytes_to_int(version_bytes[0]),
bytes_to_int(version_bytes[1]),
bytes_to_int(version_bytes[2]),
)
case _:
if correct and byte in b"0123456789":
version_bytes[version_ptr].append(byte)
else:
correct = False
except Exception:
pass
# Fallback to config.ini
return game.get_version_config()

View File

@ -0,0 +1,15 @@
MD5SUMS = {
"1.0.5": {
"cn": {
"StarRailBase.dll": "66c42871ce82456967d004ccb2d7cf77",
"UnityPlayer.dll": "0c866c44bb3752031a8c12ffe935b26f",
},
"os": {
"StarRailBase.dll": "8aa3790aafa3dd176678392f3f93f435",
"UnityPlayer.dll": "f17b9b7f9b8c9cbd211bdff7771a80c2",
},
}
}
# Patches
ASTRA_REPO = "https://notabug.org/mkrsym1/astra"
JADEITE_REPO = "https://codeberg.org/mkrsym1/jadeite/"

View File

@ -0,0 +1,98 @@
from hashlib import md5
from vollerei.common.enums import GameChannel
from vollerei.abc.launcher.game import GameABC
from vollerei.game.hsr.constants import MD5SUMS
def get_channel(game: GameABC) -> GameChannel | None:
"""
Gets the current game channel.
Only works for Star Rail version 1.0.5, other versions will return the
overridden channel or GameChannel.Overseas if no channel is overridden.
This is not needed for game patching, since the patcher will automatically
detect the channel.
Returns:
GameChannel: The current game channel.
"""
version = game.version_override or game.get_version()
if version == (1, 0, 5):
for channel, v in MD5SUMS["1.0.5"].values():
for file, md5sum in v.values():
if md5(game.path.joinpath(file).read_bytes()).hexdigest() != md5sum:
continue
match channel:
case "cn":
return GameChannel.China
case "os":
return GameChannel.Overseas
return None
def get_version(game: GameABC) -> tuple[int, int, int]:
"""
Gets the current installed game version.
Credits to An Anime Team for the code that does the magic:
https://github.com/an-anime-team/anime-game-core/blob/main/src/games/star_rail/game.rs#L49
If the above method fails, it'll fallback to read the config.ini file
for the version, which is not recommended (as described in
`get_version_config()` docs)
This returns (0, 0, 0) if the version could not be found
(usually indicates the game is not installed), and in fact `is_installed()` uses
this method to check if the game is installed too.
Returns:
tuple[int, int, int]: The version as a tuple of integers.
"""
data_file = game.data_folder().joinpath("data.unity3d")
if not data_file.exists():
return game.get_version_config()
def bytes_to_int(byte_array: list[bytes]) -> int:
bytes_as_int = int.from_bytes(byte_array, byteorder="big")
actual_int = bytes_as_int - 48 # 48 is the ASCII code for 0
return actual_int
version_bytes: list[list[bytes]] = [[], [], []]
version_ptr = 0
correct = True
try:
with data_file.open("rb") as f:
f.seek(0x7D0) # 2000 in decimal
for byte in f.read(10000):
match byte:
case 0:
version_bytes = [[], [], []]
version_ptr = 0
correct = True
case 46:
version_ptr += 1
if version_ptr > 2:
correct = False
case 38:
if (
correct
and len(version_bytes[0]) > 0
and len(version_bytes[1]) > 0
and len(version_bytes[2]) > 0
):
return (
bytes_to_int(version_bytes[0]),
bytes_to_int(version_bytes[1]),
bytes_to_int(version_bytes[2]),
)
case _:
if correct and byte in b"0123456789":
version_bytes[version_ptr].append(byte)
else:
correct = False
except Exception:
pass
# Fallback to config.ini
return game.get_version_config()

View File

@ -0,0 +1,5 @@
# Re-exports
from vollerei.game.launcher.manager import Game
__all__ = ["Game"]

View File

@ -0,0 +1,32 @@
from vollerei.common.api import get_game_packages, resource
from vollerei.common.enums import GameChannel, GameType
def get_game_package(
game_type: GameType, channel: GameChannel = GameChannel.Overseas
) -> resource.GameInfo:
"""
Get game package information from the launcher API.
Doesn't work with HI3 but well, we haven't implemented anything for that game yet.
Default channel is overseas.
Args:
channel: Game channel to get the resource information from.
Returns:
GameInfo: Game resource information.
"""
find_str: str
match game_type:
case GameType.HSR:
find_str = "hkrpg"
case GameType.Genshin:
find_str = "hk4e"
case GameType.ZZZ:
find_str = "nap"
game_packages = get_game_packages(channel=channel)
for package in game_packages:
if find_str in package.game.biz:
return package

View File

@ -0,0 +1,510 @@
from configparser import ConfigParser
from io import IOBase
from os import PathLike
from pathlib import Path, PurePath
from vollerei.abc.launcher.game import GameABC
from vollerei.common import ConfigFile, functions
from vollerei.common.api import resource
from vollerei.common.enums import GameType, VoicePackLanguage, GameChannel
from vollerei.exceptions.game import (
GameAlreadyUpdatedError,
GameNotInstalledError,
PreDownloadNotAvailable,
)
from vollerei.game.launcher import api
from vollerei.game.hsr import functions as hsr_functions
from vollerei.game.genshin import functions as genshin_functions
from vollerei.game.zzz import functions as zzz_functions
from vollerei import paths
from vollerei.utils import download
class Game(GameABC):
"""
Manages the game installation
For Star Rail and Zenless Zone Zero:
Since channel detection isn't implemented yet, most functions assume you're
using the overseas version of the game. You can override channel by setting
the property `channel_override` to the channel you want to use.
"""
def __init__(
self, game_type: GameType, path: PathLike = None, cache_path: PathLike = None
):
self._path: Path | None = Path(path) if path else None
if not cache_path:
cache_path = paths.cache_path
cache_path = Path(cache_path)
self._game_type = game_type
self.cache: Path = cache_path.joinpath(f"game/{self._game_type.name.lower()}/")
self.cache.mkdir(parents=True, exist_ok=True)
self._version_override: tuple[int, int, int] | None = None
self._channel_override: GameChannel | None = None
@property
def version_override(self) -> tuple[int, int, int] | None:
"""
Overrides the game version.
This can be useful if you want to override the version of the game
and additionally working around bugs.
"""
return self._version_override
@version_override.setter
def version_override(self, version: tuple[int, int, int] | str | None):
if isinstance(version, str):
version = tuple(int(i) for i in version.split("."))
self._version_override = version
@property
def channel_override(self) -> GameChannel | None:
"""
Overrides the game channel.
Because game channel detection isn't implemented yet, you may need
to use this for some functions to work.
This can be useful if you want to override the channel of the game
and additionally working around bugs.
"""
return self._channel_override
@channel_override.setter
def channel_override(self, channel: GameChannel | str | None):
if isinstance(channel, str):
channel = GameChannel[channel]
self._channel_override = channel
@property
def path(self) -> Path | None:
"""
Paths to the game folder.
"""
return self._path
@path.setter
def path(self, path: PathLike):
self._path = Path(path)
def data_folder(self) -> Path:
"""
Paths to the game data folder.
Returns:
Path: The path to the game data folder.
"""
try:
match self._game_type:
case GameType.Genshin:
match self.get_channel():
case GameChannel.China:
return self._path.joinpath("YuanShen_Data")
case GameChannel.Overseas:
return self._path.joinpath("GenshinImpact_Data")
case GameType.HSR:
return self._path.joinpath("StarRail_Data")
case GameType.ZZZ:
return self._path.joinpath("ZenlessZoneZero_Data")
except AttributeError:
raise GameNotInstalledError("Game path is not set.")
def is_installed(self) -> bool:
"""
Checks if the game is installed.
Returns:
bool: True if the game is installed, False otherwise.
"""
if self._path is None:
return False
match self._game_type:
case GameType.Genshin:
match self.get_channel():
case GameChannel.China:
if not self._path.joinpath("YuanShen.exe").exists():
return False
case GameChannel.Overseas:
if not self._path.joinpath("GenshinImpact.exe").exists():
return False
case GameType.HSR:
if not self._path.joinpath("StarRail.exe").exists():
return False
case GameType.ZZZ:
if not self._path.joinpath("ZenlessZoneZero.exe").exists():
return False
if not self.data_folder().is_dir():
return False
if self.get_version() == (0, 0, 0):
return False
return True
def get_channel(self) -> GameChannel:
"""
Gets the current game channel.
Only works for Genshin and Star Rail version 1.0.5, other versions will return
the overridden channel or GameChannel.Overseas if no channel is overridden.
This is not needed for game patching, since the patcher will automatically
detect the channel.
Returns:
GameChannel: The current game channel.
"""
match self._game_type:
case GameType.HSR:
return (
hsr_functions.get_channel(self)
or self._channel_override
or GameChannel.Overseas
)
case GameType.Genshin:
return (
genshin_functions.get_channel(self)
or self._channel_override
or GameChannel.Overseas
)
case _:
return self._channel_override or GameChannel.Overseas
def get_version_config(self) -> tuple[int, int, int]:
"""
Gets the current installed game version from config.ini.
Using this is not recommended, as only official launcher creates
and uses this file, instead you should use `get_version()`.
This returns (0, 0, 0) if the version could not be found.
Returns:
tuple[int, int, int]: Game version.
"""
cfg_file = self._path.joinpath("config.ini")
if not cfg_file.exists():
return (0, 0, 0)
cfg = ConfigFile(cfg_file)
# Fk u miHoYo
if "general" in cfg.sections():
version_str = cfg.get("general", "game_version", fallback="0.0.0")
elif "General" in cfg.sections():
version_str = cfg.get("General", "game_version", fallback="0.0.0")
else:
return (0, 0, 0)
if version_str.count(".") != 2:
return (0, 0, 0)
try:
version = tuple(int(i) for i in version_str.split("."))
except Exception:
return (0, 0, 0)
return version
def set_version_config(self):
"""
Sets the current installed game version to config.ini.
Only works for the global version of the game, not the Chinese one (since I don't have
them installed to test).
This method is meant to keep compatibility with the official launcher only.
"""
cfg_file = self._path.joinpath("config.ini")
if cfg_file.exists():
cfg = ConfigFile(cfg_file)
cfg.set("general", "game_version", self.get_version_str())
cfg.save()
else:
cfg_dict = {
"general": {
"channel": 1,
"cps": "hyp_hoyoverse",
"game_version": self.get_version_str(),
"sub_channel": 0,
# This probably should be fetched from the server but well
"plugin_n06mjyc2r3_version": "1.1.0",
"uapc": None, # Honestly what's this?
}
}
match self._game_type:
case GameType.Genshin:
cfg_dict["general"]["uapc"] = {
"hk4e_global": {"uapc": "f55586a8ce9f_"},
"hyp": {"uapc": "f55586a8ce9f_"},
}
case GameType.HSR:
cfg_dict["general"]["uapc"] = {
"hkrpg_global": {"uapc": "f5c7c6262812_"},
"hyp": {"uapc": "f55586a8ce9f_"},
}
case GameType.ZZZ:
cfg_dict["general"]["uapc"] = {
"nap_global": {"uapc": "f55586a8ce9f_"},
"hyp": {"uapc": "f55586a8ce9f_"},
}
cfg = ConfigParser()
cfg.read_dict(cfg_dict)
cfg.write(cfg_file.open("w"))
def get_version(self) -> tuple[int, int, int]:
"""
Gets the current installed game version.
Credits to An Anime Team for the code that does the magic, see the source
in `hsr/functions.py`, `genshin/functions.py` and `zzz/functions.py` for more info
If the above method fails, it'll fallback to read the config.ini file
for the version, which is not recommended (as described in
`get_version_config()` docs)
This returns (0, 0, 0) if the version could not be found
(usually indicates the game is not installed), and in fact `is_installed()` uses
this method to check if the game is installed too.
Returns:
tuple[int, int, int]: The version as a tuple of integers.
"""
match self._game_type:
case GameType.HSR:
return hsr_functions.get_version(self)
case GameType.Genshin:
return genshin_functions.get_version(self)
case GameType.ZZZ:
return zzz_functions.get_version(self)
case _:
return self.get_version_config()
def get_version_str(self) -> str:
"""
Gets the current installed game version as a string.
Because this method uses `get_version()`, you should read the docs of
that method too.
Returns:
str: The version as a string.
"""
return ".".join(str(i) for i in self.get_version())
def get_installed_voicepacks(self) -> list[VoicePackLanguage]:
"""
Gets the installed voicepacks.
Returns:
list[VoicePackLanguage]: A list of installed voicepacks.
"""
if not self.is_installed():
raise GameNotInstalledError("Game is not installed.")
voicepacks = []
blacklisted_words = ["SFX"]
audio_package: Path
match self._game_type:
case GameType.Genshin:
audio_package = self.data_folder().joinpath(
"StreamingAssets/AudioAssets/"
)
if audio_package.joinpath("AudioPackage").is_dir():
audio_package = audio_package.joinpath("AudioPackage")
case GameType.HSR:
audio_package = self.data_folder().joinpath(
"Persistent/Audio/AudioPackage/Windows/"
)
case GameType.ZZZ:
audio_package = self.data_folder().joinpath(
"StreamingAssets/Audio/Windows/Full/"
)
for child in audio_package.iterdir():
if child.resolve().is_dir() and child.name not in blacklisted_words:
name = child.name
if name.startswith("English"):
name = "English"
voicepack: VoicePackLanguage
try:
if self._game_type == GameType.ZZZ:
voicepack = VoicePackLanguage.from_zzz_name(child.name)
else:
voicepack = VoicePackLanguage[name]
voicepacks.append(voicepack)
except (ValueError, KeyError):
pass
return voicepacks
def get_remote_game(
self, pre_download: bool = False
) -> resource.Main | resource.PreDownload:
"""
Gets the current game information from remote.
Args:
pre_download (bool): Whether to get the pre-download version.
Defaults to False.
Returns:
A `Main` or `PreDownload` object that contains the game information.
"""
channel = self._channel_override or self.get_channel()
if pre_download:
game = api.get_game_package(
game_type=self._game_type, channel=channel
).pre_download
if not game:
raise PreDownloadNotAvailable("Pre-download version is not available.")
return game
return api.get_game_package(game_type=self._game_type, channel=channel).main
def get_update(self, pre_download: bool = False) -> resource.Patch | None:
"""
Gets the current game update.
Args:
pre_download (bool): Whether to get the pre-download version.
Defaults to False.
Returns:
A `Patch` object that contains the update information or
`None` if the game is not installed or already up-to-date.
"""
if not self.is_installed():
return None
version = (
".".join(str(x) for x in self._version_override)
if self._version_override
else self.get_version_str()
)
for patch in self.get_remote_game(pre_download=pre_download).patches:
if patch.version == version:
return patch
return None
def repair_file(
self,
file: PathLike,
pre_download: bool = False,
game_info: resource.Game = None,
) -> None:
"""
Repairs a game file.
This will automatically handle backup and restore the file if the repair
fails.
Args:
file (PathLike): The file to repair.
pre_download (bool): Whether to get the pre-download version.
Defaults to False.
"""
return self.repair_files([file], pre_download=pre_download, game_info=game_info)
def repair_files(
self,
files: list[PathLike],
pre_download: bool = False,
game_info: resource.Game = None,
) -> None:
"""
Repairs multiple game files.
This will automatically handle backup and restore the file if the repair
fails.
Args:
files (PathLike): The files to repair.
pre_download (bool): Whether to get the pre-download version.
Defaults to False.
game_info (resource.Game): The game information to use for repair.
"""
functions.repair_files(
self, files, pre_download=pre_download, game_info=game_info
)
def repair_game(self) -> None:
"""
Tries to repair the game by reading "pkg_version" file and downloading the
mismatched files from the server.
"""
functions.repair_game(self)
def install_archive(self, archive_file: PathLike | IOBase) -> None:
"""
Applies an install archive to the game, it can be the game itself or a
voicepack one.
`archive_file` can be a path to the archive file or a file-like object,
like if you have very high amount of RAM and want to download the archive
to memory instead of disk, this can be useful for you.
Args:
archive_file (PathLike | IOBase): The archive file.
"""
if not isinstance(archive_file, IOBase):
archive_file = Path(archive_file)
functions.install_archive(self, archive_file)
def apply_update_archive(
self, archive_file: PathLike | IOBase, auto_repair: bool = True
) -> None:
"""
Applies an update archive to the game, it can be the game update or a
voicepack update.
`archive_file` can be a path to the archive file or a file-like object,
like if you have very high amount of RAM and want to download the update
to memory instead of disk, this can be useful for you.
`auto_repair` is used to determine whether to repair the file if it's
broken. If it's set to False, then it'll raise an exception if the file
is broken.
Args:
archive_file (PathLike | IOBase): The archive file.
auto_repair (bool, optional): Whether to repair the file if it's broken.
Defaults to True.
"""
if not self.is_installed():
raise GameNotInstalledError("Game is not installed.")
if not isinstance(archive_file, IOBase):
archive_file = Path(archive_file)
# Hello hell again, dealing with HDiffPatch and all the things again.
functions.apply_update_archive(self, archive_file, auto_repair=auto_repair)
def install_update(
self, update_info: resource.Patch = None, auto_repair: bool = True
):
"""
Installs an update from a `Patch` object.
You may want to download the update manually and pass it to
`apply_update_archive()` instead for better control, and after that
execute `set_version_config()` to set the game version.
Args:
update_info (Diff, optional): The update information. Defaults to None.
auto_repair (bool, optional): Whether to repair the file if it's broken.
Defaults to True.
"""
if not self.is_installed():
raise GameNotInstalledError("Game is not installed.")
if not update_info:
update_info = self.get_update()
if not update_info or update_info.version == self.get_version_str():
raise GameAlreadyUpdatedError("Game is already updated.")
update_url = update_info.game_pkgs[0].url
# Base game update
archive_file = self.cache.joinpath(PurePath(update_url).name)
download(update_url, archive_file)
self.apply_update_archive(archive_file=archive_file, auto_repair=auto_repair)
# Get installed voicepacks
installed_voicepacks = self.get_installed_voicepacks()
# Voicepack update
for remote_voicepack in update_info.audio_pkgs:
if remote_voicepack.language not in installed_voicepacks:
continue
# Voicepack is installed, update it
archive_file = self.cache.joinpath(PurePath(remote_voicepack.url).name)
download(remote_voicepack.url, archive_file)
self.apply_update_archive(
archive_file=archive_file, auto_repair=auto_repair
)
self.set_version_config()

223
vollerei/game/patcher.py Normal file
View File

@ -0,0 +1,223 @@
from enum import Enum
from shutil import copy2, rmtree
from packaging import version
from vollerei.abc.patcher import PatcherABC
from vollerei.common import telemetry
from vollerei.exceptions.game import GameNotInstalledError
from vollerei.exceptions.patcher import (
VersionNotSupportedError,
PatcherError,
PatchUpdateError,
)
from vollerei.game.launcher.manager import Game, GameChannel
from vollerei.utils import download_and_extract, Git, Xdelta3
from vollerei.paths import tools_data_path
from vollerei.hsr.constants import ASTRA_REPO, JADEITE_REPO
class PatchType(Enum):
"""
Patch type
Astra: The old patch which patch the game directly (not recommended).
Jadeite: The new patch which patch the game in memory by DLL injection.
"""
Astra = 0
Jadeite = 1
class Patcher(PatcherABC):
"""
Patch helper for HSR and HI3.
By default this will use Jadeite as it is maintained and more stable.
"""
def __init__(self, patch_type: PatchType = PatchType.Jadeite):
self._patch_type: PatchType = patch_type
self._path = tools_data_path.joinpath("patcher")
self._path.mkdir(parents=True, exist_ok=True)
self._jadeite = self._path.joinpath("jadeite")
self._astra = self._path.joinpath("astra")
self._git = Git()
self._xdelta3 = Xdelta3()
@property
def patch_type(self) -> PatchType:
"""
Patch type, can be either Astra or Jadeite
"""
return self._patch_type
@patch_type.setter
def patch_type(self, value: PatchType):
self._patch_type = value
def _update_astra(self):
self._git.pull_or_clone(ASTRA_REPO, self._astra)
def _update_jadeite(self):
release_info = self._git.get_latest_release(JADEITE_REPO)
file = self._git.get_latest_release_dl(release_info)[0]
file_version = release_info["tag_name"][1:] # Remove "v" prefix
current_version = None
if self._jadeite.joinpath("version").exists():
with open(self._jadeite.joinpath("version"), "r") as f:
current_version = f.read()
if current_version:
if version.parse(file_version) <= version.parse(current_version):
return
download_and_extract(file, self._jadeite)
with open(self._jadeite.joinpath("version"), "w") as f:
f.write(file_version)
def update_patch(self):
"""
Update the patch
"""
try:
match self._patch_type:
case PatchType.Astra:
self._update_astra()
case PatchType.Jadeite:
self._update_jadeite()
except Exception as e:
raise PatchUpdateError("Failed to update patch.") from e
def _patch_astra(self, game: Game):
if game.get_version() != (1, 0, 5):
raise VersionNotSupportedError(
"Only version 1.0.5 is supported by Astra patch."
)
self._update_astra()
file_type = None
match game.get_channel():
case GameChannel.China:
file_type = "cn"
case GameChannel.Overseas:
file_type = "os"
# Backup and patch
for file in ["UnityPlayer.dll", "StarRailBase.dll"]:
game.path.joinpath(file).rename(game.path.joinpath(f"{file}.bak"))
self._xdelta3.patch_file(
self._astra.joinpath(f"{file_type}/diffs/{file}.vcdiff"),
game.path.joinpath(f"{file}.bak"),
game.path.joinpath(file),
)
# Copy files
for file in self._astra.joinpath(f"{file_type}/files/").rglob("*"):
if file.suffix == ".bat":
continue
if file.is_dir():
game.path.joinpath(
file.relative_to(self._astra.joinpath(f"{file_type}/files/"))
).mkdir(parents=True, exist_ok=True)
copy2(
file,
game.path.joinpath(
file.relative_to(self._astra.joinpath(f"{file_type}/files/"))
),
)
def _patch_jadeite(self):
"""
"Patch" the game with Jadeite patch.
Unlike Astra patch, Jadeite patch does not modify the game files directly
but uses DLLs to patch the game in memory and it has an injector to do that
automatically.
"""
self._update_jadeite()
return self._jadeite
def _unpatch_astra(self, game: Game):
if game.get_version() != (1, 0, 5):
raise VersionNotSupportedError(
"Only version 1.0.5 is supported by Astra patch."
)
self._update_astra()
file_type = None
match game.get_channel():
case GameChannel.China:
file_type = "cn"
case GameChannel.Overseas:
file_type = "os"
# Restore
for file in ["UnityPlayer.dll", "StarRailBase.dll"]:
if game.path.joinpath(f"{file}.bak").exists():
game.path.joinpath(file).unlink()
game.path.joinpath(f"{file}.bak").rename(game.path.joinpath(file))
# Remove files
for file in self._astra.joinpath(f"{file_type}/files/").rglob("*"):
if file.suffix == ".bat":
continue
file_rel = file.relative_to(self._astra.joinpath(f"{file_type}/files/"))
game_path = game.path.joinpath(file_rel)
if game_path.is_file():
game_path.unlink()
elif game_path.is_dir():
try:
game_path.rmdir()
except OSError:
pass
def _unpatch_jadeite(self):
rmtree(self._jadeite, ignore_errors=True)
def patch_game(self, game: Game):
"""
Patch the game
If you use Jadeite (by default), this will just download Jadeite files
and won't actually patch the game because Jadeite will do that automatically.
Args:
game (Game): The game to patch
"""
if not game.is_installed():
raise PatcherError(GameNotInstalledError("Game is not installed"))
match self._patch_type:
case PatchType.Astra:
self._patch_astra(game)
case PatchType.Jadeite:
return self._patch_jadeite()
def unpatch_game(self, game: Game):
"""
Unpatch the game
If you use Jadeite (by default), this will just delete Jadeite files.
Note that Honkai Impact 3rd uses Jadeite too, so executing this will
delete the files needed by both games.
Args:
game (Game): The game to unpatch
"""
if not game.is_installed():
raise PatcherError(GameNotInstalledError("Game is not installed"))
match self._patch_type:
case PatchType.Astra:
self._unpatch_astra(game)
case PatchType.Jadeite:
self._unpatch_jadeite()
def check_telemetry(self) -> list[str]:
"""
Check if telemetry servers are accessible by the user
Returns:
list[str]: A list of telemetry servers that are accessible
"""
return telemetry.check_telemetry()
def block_telemetry(self, telemetry_list: list[str] = None):
"""
Block the telemetry servers
If telemetry_list is not provided, it will be checked automatically.
Args:
telemetry_list (list[str], optional): A list of telemetry servers to block.
"""
telemetry.block_telemetry(telemetry_list)

View File

@ -0,0 +1,66 @@
from vollerei.abc.launcher.game import GameABC
def get_version(game: GameABC) -> tuple[int, int, int]:
"""
Gets the current installed game version.
Credits to An Anime Team for the code that does the magic:
https://github.com/an-anime-team/anime-game-core/blob/main/src/games/zzz/game.rs#L49
If the above method fails, it'll fallback to read the config.ini file
for the version, which is not recommended (as described in
`get_version_config()` docs)
This returns (0, 0, 0) if the version could not be found
(usually indicates the game is not installed), and in fact `is_installed()` uses
this method to check if the game is installed too.
Returns:
tuple[int, int, int]: The version as a tuple of integers.
"""
data_file = game.data_folder().joinpath("globalgamemanagers")
if not data_file.exists():
return game.get_version_config()
def bytes_to_int(byte_array: list[bytes]) -> int:
bytes_as_int = int.from_bytes(byte_array, byteorder="big")
actual_int = bytes_as_int - 48 # 48 is the ASCII code for 0
return actual_int
version_bytes: list[list[bytes]] = [[], [], []]
version_ptr = 0
correct = True
try:
with data_file.open("rb") as f:
f.seek(4000)
for byte in f.read(10000):
match byte:
case 0:
if (
correct
and len(version_bytes[0]) > 0
and len(version_bytes[1]) > 0
and len(version_bytes[2]) > 0
):
found_version = tuple(
bytes_to_int(i) for i in version_bytes
)
return found_version
version_bytes = [[], [], []]
version_ptr = 0
correct = True
case b".":
version_ptr += 1
if version_ptr > 2:
correct = False
case _:
if correct and byte in b"0123456789":
version_bytes[version_ptr].append(byte)
else:
correct = False
except Exception:
pass
# Fallback to config.ini
return game.get_version_config()

View File

@ -0,0 +1,5 @@
# Re-exports
from vollerei.genshin.launcher import Game
__all__ = ["Game"]

View File

@ -0,0 +1,5 @@
# Re-exports
from vollerei.genshin.launcher.game import Game
__all__ = ["Game"]

View File

@ -0,0 +1,20 @@
from vollerei.common.api import get_game_packages, resource
from vollerei.common.enums import GameChannel
def get_game_package(channel: GameChannel = GameChannel.Overseas) -> resource.GameInfo:
"""
Get game package information from the launcher API.
Default channel is overseas.
Args:
channel: Game channel to get the resource information from.
Returns:
Resource: Game resource information.
"""
game_packages = get_game_packages(channel=channel)
for package in game_packages:
if "hk4e" in package.game.biz:
return package

View File

@ -0,0 +1,12 @@
from os import PathLike
from vollerei.game.launcher.manager import Game as CommonGame
from vollerei.common.enums import GameType
class Game(CommonGame):
"""
Manages the game installation
"""
def __init__(self, path: PathLike = None, cache_path: PathLike = None):
super().__init__(GameType.Genshin, path, cache_path)

View File

@ -0,0 +1,73 @@
from configparser import ConfigParser
from io import IOBase
from os import PathLike
from pathlib import Path
from vollerei.abc.launcher.game import GameABC
from vollerei.common import ConfigFile, functions
from vollerei.common.api import resource
from vollerei.common.enums import VoicePackLanguage
from vollerei.exceptions.game import (
GameAlreadyUpdatedError,
GameNotInstalledError,
PreDownloadNotAvailable,
ScatteredFilesNotAvailableError,
)
from vollerei.hi3.launcher.enums import GameChannel
from vollerei.hsr.launcher import api
from vollerei import paths
from vollerei.utils import download
class Game(GameABC):
"""
Manages the game installation
Since channel detection isn't implemented yet, most functions assume you're
using the overseas version of the game. You can override channel by setting
the property `channel_override` to the channel you want to use.
"""
def __init__(self, path: PathLike = None, cache_path: PathLike = None):
self._path: Path | None = Path(path) if path else None
if not cache_path:
cache_path = paths.cache_path
cache_path = Path(cache_path)
self.cache: Path = cache_path.joinpath("game/hi3/")
self.cache.mkdir(parents=True, exist_ok=True)
self._version_override: tuple[int, int, int] | None = None
self._channel_override: GameChannel | None = None
@property
def version_override(self) -> tuple[int, int, int] | None:
"""
Overrides the game version.
This can be useful if you want to override the version of the game
and additionally working around bugs.
"""
return self._version_override
@version_override.setter
def version_override(self, version: tuple[int, int, int] | str | None):
if isinstance(version, str):
version = tuple(int(i) for i in version.split("."))
self._version_override = version
@property
def channel_override(self) -> GameChannel | None:
"""
Overrides the game channel.
Because game channel detection isn't implemented yet, you may need
to use this for some functions to work.
This can be useful if you want to override the channel of the game
and additionally working around bugs.
"""
return self._channel_override
@channel_override.setter
def channel_override(self, channel: GameChannel | str | None):
if isinstance(channel, str):
channel = GameChannel[channel]
self._channel_override = channel

6
vollerei/hi3/patcher.py Normal file
View File

@ -0,0 +1,6 @@
from vollerei.hsr.patcher import Patcher, PatchType
# Re-exports Patcher and PatchType from HSR because they use the same patcher
# which is Jadeite.
__all__ = ["Patcher", "PatchType"]

View File

@ -1,6 +1,6 @@
# Re-exports
from vollerei.hsr.patcher import Patcher, PatchType
from vollerei.hsr.launcher import Game, GameChannel
from vollerei.hsr.launcher import Game
__all__ = ["Patcher", "PatchType", "Game", "GameChannel"]
__all__ = ["Patcher", "PatchType", "Game"]

View File

@ -1,38 +1 @@
class LAUNCHER_API:
"""Launcher API constants."""
RESOURCE_PATH: str = "mdk/launcher/api/resource"
OS: dict = {
"url": "https://hkrpg-launcher-static.hoyoverse.com/hkrpg_global/",
"params": {
"channel_id": 1,
"key": "vplOVX8Vn7cwG8yb",
"launcher_id": 35,
},
}
CN: dict = {
"url": "https://api-launcher.mihoyo.com/hkrpg_cn/mdk/launcher/api/resource",
"params": {
"channel_id": 1,
"key": "6KcVuOkbcqjJomjZ",
"launcher_id": 33,
},
}
LATEST_VERSION = (1, 1, 0)
MD5SUMS = {
"1.0.5": {
"cn": {
"StarRailBase.dll": "66c42871ce82456967d004ccb2d7cf77",
"UnityPlayer.dll": "0c866c44bb3752031a8c12ffe935b26f",
},
"os": {
"StarRailBase.dll": "8aa3790aafa3dd176678392f3f93f435",
"UnityPlayer.dll": "f17b9b7f9b8c9cbd211bdff7771a80c2",
},
}
}
# Patches
ASTRA_REPO = "https://notabug.org/mkrsym1/astra"
JADEITE_REPO = "https://codeberg.org/mkrsym1/jadeite/"
from vollerei.game.hsr.constants import * # noqa: F403 because we just want to re-export

View File

@ -1,5 +1,5 @@
# Re-exports
from vollerei.hsr.launcher.game import Game, GameChannel
from vollerei.hsr.launcher.game import Game
__all__ = ["Game", "GameChannel"]
__all__ = ["Game"]

View File

@ -1,29 +1,20 @@
import requests
from vollerei.common.api import Resource
from vollerei.hsr.constants import LAUNCHER_API
from vollerei.hsr.launcher.enums import GameChannel
from vollerei.common.api import get_game_packages, resource
from vollerei.common.enums import GameChannel
def get_resource(channel: GameChannel = GameChannel.Overseas) -> Resource:
def get_game_package(channel: GameChannel = GameChannel.Overseas) -> resource.GameInfo:
"""
Get game resource information from the launcher API.
Get game package information from the launcher API.
Default channel is overseas.
Args:
channel: Game channel to get the resource information from. (os, cn)
channel: Game channel to get the resource information from.
Returns:
Resource: Game resource information.
"""
resource_path: dict
match channel:
case GameChannel.Overseas:
resource_path = LAUNCHER_API.OS
case GameChannel.China:
resource_path = LAUNCHER_API.CN
return Resource.from_dict(
requests.get(
resource_path["url"] + LAUNCHER_API.RESOURCE_PATH,
params=resource_path["params"],
).json()["data"]
)
game_packages = get_game_packages(channel=channel)
for package in game_packages:
if "hkrpg" in package.game.biz:
return package

View File

@ -1,6 +0,0 @@
from enum import Enum
class GameChannel(Enum):
Overseas = 0
China = 1

View File

@ -1,207 +1,16 @@
from hashlib import md5
from os import PathLike
from pathlib import Path
from vollerei.hsr.launcher.enums import GameChannel
from vollerei.common import ConfigFile
from vollerei.abc.launcher.game import GameABC
from vollerei.hsr.constants import MD5SUMS
from vollerei.game.launcher.manager import Game as CommonGame
from vollerei.common.enums import GameType
class Game(GameABC):
class Game(CommonGame):
"""
Manages the game installation
Since channel detection isn't implemented yet, most functions assume you're
using the overseas version of the game. You can override channel by setting
the property `channel_override` to the channel you want to use.
"""
def __init__(self, path: PathLike = None):
self._path: Path | None = Path(path) if path else None
self._version_override: tuple[int, int, int] | None = None
self._channel_override: GameChannel | None = None
@property
def version_override(self) -> tuple[int, int, int] | None:
"""
Override the game version.
This can be useful if you want to override the version of the game
and additionally working around bugs.
"""
return self._version_override
@version_override.setter
def version_override(self, version: tuple[int, int, int] | str | None):
if isinstance(version, str):
version = tuple(int(i) for i in version.split("."))
self._version_override = version
@property
def channel_override(self) -> GameChannel | None:
"""
Override the game channel.
Because game channel detection isn't implemented yet, you may need
to use this for some functions to work.
This can be useful if you want to override the channel of the game
and additionally working around bugs.
"""
return self._channel_override
@channel_override.setter
def channel_override(self, channel: GameChannel | str | None):
if isinstance(channel, str):
channel = GameChannel[channel]
self._channel_override = channel
@property
def path(self) -> Path | None:
"""
Path to the game folder.
"""
return self._path
@path.setter
def path(self, path: PathLike):
self._path = Path(path)
def data_folder(self) -> Path:
"""
Path to the game data folder.
"""
return self._path.joinpath("StarRail_Data")
def is_installed(self) -> bool:
"""
Check if the game is installed.
"""
if self._path is None:
return False
if (
not self._path.joinpath("StarRail.exe").exists()
or not self._path.joinpath("StarRailBase.dll").exists()
or not self._path.joinpath("StarRail_Data").exists()
):
return False
if self.get_version() == (0, 0, 0):
return False
return True
def _get_version_config(self) -> tuple[int, int, int]:
cfg_file = self._path.joinpath("config.ini")
if not cfg_file.exists():
return (0, 0, 0)
cfg = ConfigFile(cfg_file)
if "General" not in cfg.sections():
return (0, 0, 0)
if "game_version" not in cfg["General"]:
return (0, 0, 0)
version_str = cfg["General"]["game_version"]
if version_str.count(".") != 2:
return (0, 0, 0)
try:
version = tuple(int(i) for i in version_str.split("."))
except Exception:
return (0, 0, 0)
return version
def get_version(self) -> tuple[int, int, int]:
"""
Get the current installed game version.
Credits to An Anime Team for the code that does the magic:
https://github.com/an-anime-team/anime-game-core/blob/main/src/games/star_rail/game.rs#L49
If the above method fails, it'll fallback to read the config.ini file
for the version. (Doesn't work with AAGL-based launchers)
This returns (0, 0, 0) if the version could not be found
(usually indicates the game is not installed)
Returns:
tuple[int, int, int]: The version as a tuple of integers.
"""
data_file = self.data_folder().joinpath("data.unity3d")
if not data_file.exists():
return (0, 0, 0)
def bytes_to_int(byte_array: list[bytes]) -> int:
bytes_as_int = int.from_bytes(byte_array, byteorder="big")
actual_int = bytes_as_int - 48 # 48 is the ASCII code for 0
return actual_int
allowed = [48, 49, 50, 51, 52, 53, 54, 55, 56, 57]
version_bytes: list[list[bytes]] = [[], [], []]
version_ptr = 0
correct = True
try:
with self.data_folder().joinpath("data.unity3d").open("rb") as f:
f.seek(0x7D0) # 2000 in decimal
for byte in f.read(10000):
match byte:
case 0:
version_bytes = [[], [], []]
version_ptr = 0
correct = True
case 46:
version_ptr += 1
if version_ptr > 2:
correct = False
case 38:
if (
correct
and len(version_bytes[0]) > 0
and len(version_bytes[1]) > 0
and len(version_bytes[2]) > 0
):
return (
bytes_to_int(version_bytes[0]),
bytes_to_int(version_bytes[1]),
bytes_to_int(version_bytes[2]),
)
case _:
if correct and byte in allowed:
version_bytes[version_ptr].append(byte)
else:
correct = False
except Exception:
pass
# Fallback to config.ini
return self._get_version_config()
def get_version_str(self) -> str:
"""
Same as get_version, but returns a string instead.
Returns:
str: The version as a string.
"""
return ".".join(str(i) for i in self.get_version())
def get_channel(self) -> GameChannel:
"""
Get the current game channel.
Only works for Star Rail version 1.0.5, other versions will return None
This is not needed for game patching, since the patcher will automatically
detect the channel.
Returns:
GameChannel: The current game channel.
"""
version = self._version_override or self.get_version()
if version == (1, 0, 5):
for channel, v in MD5SUMS["1.0.5"].values():
for file, md5sum in v.values():
if (
md5(self._path.joinpath(file).read_bytes()).hexdigest()
!= md5sum
):
continue
match channel:
case "cn":
return GameChannel.China
case "os":
return GameChannel.Overseas
else:
return
def __init__(self, path: PathLike = None, cache_path: PathLike = None):
super().__init__(GameType.HSR, path, cache_path)

View File

@ -1,15 +1,16 @@
from enum import Enum
from shutil import copy2, rmtree
from distutils.version import StrictVersion
from packaging import version
from vollerei.abc.patcher import PatcherABC
from vollerei.common import telemetry
from vollerei.common.enums import GameChannel
from vollerei.exceptions.game import GameNotInstalledError
from vollerei.exceptions.patcher import (
VersionNotSupportedError,
PatcherError,
PatchUpdateError,
)
from vollerei.hsr.launcher.game import Game, GameChannel
from vollerei.hsr.launcher.game import Game
from vollerei.utils import download_and_extract, Git, Xdelta3
from vollerei.paths import tools_data_path
from vollerei.hsr.constants import ASTRA_REPO, JADEITE_REPO
@ -23,8 +24,8 @@ class PatchType(Enum):
Jadeite: The new patch which patch the game in memory by DLL injection.
"""
Astra: int = 0
Jadeite: int = 1
Astra = 0
Jadeite = 1
class Patcher(PatcherABC):
@ -66,7 +67,7 @@ class Patcher(PatcherABC):
with open(self._jadeite.joinpath("version"), "r") as f:
current_version = f.read()
if current_version:
if StrictVersion(file_version) <= StrictVersion(current_version):
if version.parse(file_version) <= version.parse(current_version):
return
download_and_extract(file, self._jadeite)
with open(self._jadeite.joinpath("version"), "w") as f:
@ -188,6 +189,8 @@ class Patcher(PatcherABC):
Unpatch the game
If you use Jadeite (by default), this will just delete Jadeite files.
Note that Honkai Impact 3rd uses Jadeite too, so executing this will
delete the files needed by both games.
Args:
game (Game): The game to unpatch

View File

@ -3,21 +3,48 @@ from os import PathLike
from platformdirs import PlatformDirs
base_paths = PlatformDirs("vollerei", "tretrauit", roaming=True)
cache_path = base_paths.site_cache_path
data_path = base_paths.site_data_path
tools_data_path = data_path.joinpath("tools")
tools_cache_path = cache_path.joinpath("tools")
launcher_cache_path = cache_path.joinpath("launcher")
utils_cache_path = cache_path.joinpath("utils")
class Paths:
"""
Manages the paths
"""
def change_base_path(path: PathLike):
path = Path(path)
global base_paths, tools_data_path, tools_cache_path, launcher_cache_path, utils_cache_path, cache_path, data_path
cache_path = path.joinpath("cache")
data_path = path.joinpath("data")
base_paths = PlatformDirs("vollerei", "tretrauit", roaming=True)
cache_path = base_paths.user_cache_path
data_path = base_paths.user_data_path
tools_data_path = data_path.joinpath("tools")
tools_cache_path = cache_path.joinpath("tools")
launcher_cache_path = cache_path.joinpath("launcher")
utils_cache_path = cache_path.joinpath("utils")
@staticmethod
def set_base_path(path: PathLike):
path = Path(path)
Paths.base_paths = path
Paths.cache_path = Paths.base_paths.joinpath("cache")
Paths.data_path = Paths.base_paths
Paths.tools_data_path = Paths.data_path.joinpath("tools")
Paths.tools_cache_path = Paths.cache_path.joinpath("tools")
Paths.launcher_cache_path = Paths.cache_path.joinpath("launcher")
Paths.utils_cache_path = Paths.cache_path.joinpath("utils")
# Aliases
base_paths = Paths.base_paths
cache_path = Paths.cache_path
data_path = Paths.data_path
tools_data_path = Paths.tools_data_path
tools_cache_path = Paths.tools_cache_path
launcher_cache_path = Paths.launcher_cache_path
utils_cache_path = Paths.utils_cache_path
def set_base_path(path: PathLike):
Paths.set_base_path(path)
global base_paths, cache_path, data_path, tools_data_path, tools_cache_path, launcher_cache_path, utils_cache_path
base_paths = Paths.base_paths
cache_path = Paths.cache_path
data_path = Paths.data_path
tools_data_path = Paths.tools_data_path
tools_cache_path = Paths.tools_cache_path
launcher_cache_path = Paths.launcher_cache_path
utils_cache_path = Paths.utils_cache_path

View File

@ -1,5 +1,6 @@
import requests
import platform
import shutil
from zipfile import ZipFile
from io import BytesIO
from pathlib import Path
@ -10,20 +11,92 @@ match platform.system():
case _:
def append_text(text: str, path: Path) -> None:
raise NotImplementedError(
"append_text is not implemented for this platform"
)
# Fallback to our own implementation
# Will NOT work if we don't have permission to write to the file
try:
with path.open("a") as f:
f.write(text)
except FileNotFoundError:
with path.open(path, "w") as f:
f.write(text)
except (PermissionError, OSError) as e:
raise PermissionError(
"You don't have permission to write to the file."
) from e
# Re-exports
from vollerei.utils.git import Git
from vollerei.utils.xdelta3 import Xdelta3
from vollerei.utils.xdelta3 import Xdelta3, Xdelta3NotInstalledError, Xdelta3PatchError
from vollerei.utils.xdelta3.exceptions import Xdelta3Error
from vollerei.utils.hdiffpatch import (
HDiffPatch,
HPatchZPatchError,
NotInstalledError,
PlatformNotSupportedError as HPatchZPlatformNotSupportedError,
)
__all__ = ["Git", "Xdelta3", "download_and_extract"]
__all__ = [
"Git",
"Xdelta3",
"download_and_extract",
"HDiffPatch",
"write_hosts",
"append_text_to_file",
"Xdelta3Error",
"Xdelta3NotInstalledError",
"Xdelta3PatchError",
"HPatchZPatchError",
"NotInstalledError",
"HPatchZPlatformNotSupportedError",
]
def download(
url: str,
out: Path,
file_len: int = None,
overwrite: bool = False,
stream: bool = True,
) -> None:
"""
Download to a path.
Args:
url (str): URL to download from.
path (Path): Path to download to.
"""
if overwrite:
out.unlink(missing_ok=True)
headers = {}
mode = "a+b"
if out.exists():
cur_len = (out.stat()).st_size
headers |= {"Range": f"bytes={cur_len}-{file_len if file_len else ''}"}
else:
mode = "w+b"
out.parent.mkdir(parents=True, exist_ok=True)
out.touch()
# Streaming, so we can iterate over the response.
response = requests.get(url=url, headers=headers, stream=stream)
if response.status_code == 416:
print(f"File already downloaded: {out}")
return
response.raise_for_status()
with open(out, mode) as file:
shutil.copyfileobj(response.raw, file)
return True
def download_and_extract(url: str, path: Path) -> None:
"""
Download and extract a zip file to a path.
Args:
url (str): URL to download from.
path (Path): Path to extract to.
"""
rsp = requests.get(url, stream=True)
rsp.raise_for_status()
with BytesIO() as f:

View File

@ -1,3 +1,4 @@
from os import PathLike
import platform
import subprocess
from zipfile import ZipFile
@ -6,59 +7,95 @@ from io import BytesIO
from shutil import which
from vollerei.constants import HDIFFPATCH_GIT_URL
from vollerei.paths import tools_data_path
from vollerei.utils.hdiffpatch.exceptions import (
HPatchZPatchError,
NotInstalledError,
PlatformNotSupportedError,
)
class HDiffPatch:
"""
Quick wrapper around HDiffPatch binaries
Mostly copied from worthless-launcher
"""
def __init__(self):
self._data = tools_data_path.joinpath("hdiffpatch")
self._data.mkdir(parents=True, exist_ok=True)
self._hdiff = tools_data_path.joinpath("hdiffpatch")
self._hdiff.mkdir(parents=True, exist_ok=True)
@staticmethod
def _get_platform_arch():
processor = platform.machine()
match platform.system():
case "Windows":
match platform.architecture()[0]:
case "32bit":
match processor:
case "i386":
return "windows32"
case "64bit":
case "x86_64":
return "windows64"
case "AMD64":
return "windows64"
case "arm":
return "windows_arm32"
case "arm64":
return "windows_arm64"
case "Linux":
match platform.architecture()[0]:
case "32bit":
match processor:
case "i386":
return "linux32"
case "64bit":
case "x86_64":
return "linux64"
case "arm":
return "linux_arm32"
case "arm64":
return "linux_arm64"
case "Darwin":
return "macos"
# TODO: Add support for Android & other architectures
# Rip BSD they need to use Linux compatibility layer to run this
# (or use Wine if they prefer that)
raise RuntimeError("Only Windows, Linux and macOS are supported by HDiffPatch")
raise PlatformNotSupportedError(
"Only Windows, Linux and macOS are supported by HDiffPatch"
)
def _get_exec(self, exec_name) -> str | None:
def _get_binary(self, exec_name: str, recurse=None) -> str:
if which(exec_name):
return exec_name
if not self.data_path.exists():
return None
if not any(self.data_path.iterdir()):
return None
platform_arch_path = self.data_path.joinpath(self._get_platform_arch())
file = platform_arch_path.joinpath(exec_name)
if file.exists():
file.chmod(0o755)
return str(file)
if platform.system() == "Windows" and not exec_name.endswith(".exe"):
exec_name += ".exe"
if self._hdiff.exists() and any(self._hdiff.iterdir()):
file = self._hdiff.joinpath(self._get_platform_arch(), exec_name)
if file.exists():
if platform.system() != "Windows":
file.chmod(0o755)
return str(file)
if recurse is None:
recurse = 3
elif recurse == 0:
raise NotInstalledError(
"HDiffPatch is not installed and can't be automatically installed"
)
else:
recurse -= 1
self.download()
return self._get_binary(exec_name=exec_name, recurse=recurse)
def hpatchz(self) -> str | None:
hpatchz_name = "hpatchz" + (".exe" if platform.system() == "Windows" else "")
return self._get_exec(hpatchz_name)
return self._get_binary("hpatchz")
def patch_file(self, in_file, out_file, patch_file):
hpatchz = self.hpatchz()
if not hpatchz:
raise RuntimeError("hpatchz executable not found")
subprocess.check_call([hpatchz, "-f", in_file, patch_file, out_file])
def patch_file(self, in_file: PathLike, out_file: PathLike, patch_file: PathLike):
try:
subprocess.check_call(
[self.hpatchz(), "-f", str(in_file), str(patch_file), str(out_file)]
)
except subprocess.CalledProcessError as e:
raise HPatchZPatchError("Patch error") from e
async def _get_latest_release_info(self) -> dict:
def _get_latest_release_info(self) -> dict:
split = HDIFFPATCH_GIT_URL.split("/")
repo = split[-1]
owner = split[-2]
@ -67,38 +104,32 @@ class HDiffPatch:
params={"Headers": "Accept: application/vnd.github.v3+json"},
)
rsp.raise_for_status()
for asset in (await rsp.json())["assets"]:
archive_processor = self._get_platform_arch()
for asset in rsp.json()["assets"]:
if not asset["name"].endswith(".zip"):
continue
if "linux" in asset["name"]:
continue
if "windows" in asset["name"]:
continue
if "macos" in asset["name"]:
continue
if "android" in asset["name"]:
if archive_processor not in asset["name"]:
continue
return asset
async def get_latest_release_url(self):
asset = await self._get_latest_release_info()
def get_latest_release_url(self):
asset = self._get_latest_release_info()
return asset["browser_download_url"]
async def get_latest_release_name(self):
asset = await self._get_latest_release_info()
def get_latest_release_name(self):
asset = self._get_latest_release_info()
return asset["name"]
async def download(self):
def download(self):
"""
Download the latest release of HDiffPatch.
"""
url = await self.get_latest_release_url()
url = self.get_latest_release_url()
if not url:
raise RuntimeError("Unable to find latest release")
file = BytesIO()
with requests.get(url, stream=True) as r:
with open(file, "wb") as f:
for chunk in r.iter_content(chunk_size=32768):
f.write(chunk)
for chunk in r.iter_content(chunk_size=32768):
file.write(chunk)
with ZipFile(file) as z:
z.extractall(self._data)
z.extractall(self._hdiff)

View File

@ -10,13 +10,19 @@ class HPatchZError(HDiffPatchError):
pass
class HPatchZPatchError(HPatchZError):
"""Raised when hpatchz patch fails"""
pass
class NotInstalledError(HPatchZError):
"""Raised when HDiffPatch is not installed"""
pass
class HPatchZPatchError(HPatchZError):
"""Raised when hpatchz patch fails"""
class PlatformNotSupportedError(HPatchZError):
"""Raised when HDiffPatch is not available for your platform"""
pass

View File

@ -29,11 +29,11 @@ def write_text(text, path: str | Path):
"""Write text to a file using pkexec (friendly gui)"""
if isinstance(path, Path):
path = str(path)
exec_su(f'pkexec tee "{path}"', stdin=text)
exec_su(f'tee "{path}"', stdin=text)
def append_text(text, path: str | Path):
"""Append text to a file using pkexec (friendly gui)"""
if isinstance(path, Path):
path = str(path)
exec_su(f'pkexec tee -a "{path}"', stdin=text)
exec_su(f'tee -a "{path}"', stdin=text)

View File

@ -5,7 +5,7 @@ from os import PathLike
from io import BytesIO
from zipfile import ZipFile
from shutil import which
from vollerei.paths import tools_cache_path
from vollerei.paths import tools_data_path
from vollerei.utils.xdelta3.exceptions import (
Xdelta3NotInstalledError,
Xdelta3PatchError,
@ -18,7 +18,7 @@ class Xdelta3:
"""
def __init__(self) -> None:
self._xdelta3_path = tools_cache_path.joinpath("xdelta3")
self._xdelta3_path = tools_data_path.joinpath("xdelta3")
self._xdelta3_path.mkdir(parents=True, exist_ok=True)
def _get_binary(self, recurse=None) -> str:
@ -26,7 +26,7 @@ class Xdelta3:
return "xdelta3"
if platform.system() == "Windows":
for path in self._xdelta3_path.glob("*.exe"):
return path
return str(path)
if recurse is None:
recurse = 3
elif recurse == 0:
@ -36,7 +36,7 @@ class Xdelta3:
else:
recurse -= 1
self.download()
return self.get_binary(recurse=recurse)
return self._get_binary(recurse=recurse)
raise Xdelta3NotInstalledError("xdelta3 is not installed")
def get_binary(self) -> str:
@ -63,9 +63,8 @@ class Xdelta3:
url = "https://github.com/jmacd/xdelta-gpl/releases/download/v3.1.0/xdelta3-3.1.0-i686.exe.zip"
file = BytesIO()
with requests.get(url, stream=True) as r:
with open(file, "wb") as f:
for chunk in r.iter_content(chunk_size=32768):
f.write(chunk)
for chunk in r.iter_content(chunk_size=32768):
file.write(chunk)
with ZipFile(file) as z:
z.extractall(self._xdelta3_path)

5
vollerei/zzz/__init__.py Normal file
View File

@ -0,0 +1,5 @@
# Re-exports
from vollerei.zzz.launcher import Game
__all__ = ["Game"]

View File

@ -0,0 +1,5 @@
# Re-exports
from vollerei.zzz.launcher.game import Game
__all__ = ["Game"]

View File

@ -0,0 +1,20 @@
from vollerei.common.api import get_game_packages, resource
from vollerei.common.enums import GameChannel
def get_game_package(channel: GameChannel = GameChannel.Overseas) -> resource.GameInfo:
"""
Get game package information from the launcher API.
Default channel is overseas.
Args:
channel: Game channel to get the resource information from.
Returns:
Resource: Game resource information.
"""
game_packages = get_game_packages(channel=channel)
for package in game_packages:
if "nap" in package.game.biz:
return package

View File

@ -0,0 +1,16 @@
from os import PathLike
from vollerei.game.launcher.manager import Game as CommonGame
from vollerei.common.enums import GameType
class Game(CommonGame):
"""
Manages the game installation
Since channel detection isn't implemented yet, most functions assume you're
using the overseas version of the game. You can override channel by setting
the property `channel_override` to the channel you want to use.
"""
def __init__(self, path: PathLike = None, cache_path: PathLike = None):
super().__init__(GameType.ZZZ, path, cache_path)

View File