How to use this gateway to read and write files with HTTP only? #101

Closed
opened 2025-12-28 18:00:09 +00:00 by sami · 9 comments
Owner

Originally created by @Hecate2 on GitHub (Mar 4, 2025).

Problem

How to use this gateway to read and write files with HTTP only?

Context

# main.py
import os
import json
import httpx
from base64 import b64decode, b64encode
from hashlib import sha256
from crypto_utils import wif_to_private_key, private_key_to_neo3_public_key_and_address, sign_message, verify_message_signature
from utils import num2VarInt

PRIVATE_KEY_WIF = "KzSDwz8tWQhAbuCdQZGj5vSwPy2XghjSppUPLpqnAeLNvjxQW6cf"
private_key = wif_to_private_key(PRIVATE_KEY_WIF)
# public key 02878528d4e2e39cedf20d9dbc9e5a031afc60cb9c474348ec893834c7921fb0b9
# address Nb2CHYY5wTh2ac58mTue5S3wpG6bQv5hSY
public_key, address = private_key_to_neo3_public_key_and_address(private_key)

BASE_URL = 'https://rest.t5.fs.neo.org/v1/'  # for test only
httpx_client = httpx.Client(base_url=BASE_URL, headers={"Content-Type": "application/json"})

my_containers = httpx_client.get(f'containers?ownerId={address}').json()
print(f'My containers: {my_containers}')

auth_header = {
    'X-Bearer-Owner-Id': address,
    'X-Bearer-Lifetime': "10000",
    'X-Bearer-For-All-Users': "false",
}
auth_content = [{"name":"my-bearer-token","object":[{"action":"ALLOW","filters":[],"operation":"GET","targets":[{"keys":[],"role":"OTHERS"}]}]},{"container":{"verb":"PUT"},"name":"my token to create container"}]
auth_resp: list[dict[str, str|bytes]] = httpx_client.post('auth', headers=auth_header, content=json.dumps(auth_content)).json()
for t in auth_resp:
    t["decoded_token"]: bytes = b64decode(t["token"])
for t in auth_resp:
    t["signed_token"] = sign_message(private_key, t["decoded_token"]).hex()
for t in auth_resp:
    assert verify_message_signature(public_key, t["decoded_token"], bytes.fromhex(t["signed_token"]))
print(auth_resp)

# https://github.com/nspcc-dev/panel-fs-neo-org/blob/30598676da830e8eaff7cc34279d61fa5b939587/src/App.js#L350-L356
# https://github.com/NeoNEXT/neoline/blob/4667e690147352eafb6812301492b7767bc96132/src/app/popup/notification/neo3-signature/neo3-signature.component.ts#L81
msg: str = auth_resp[0]['token']
random_salt = os.urandom(16).hex()  # suitable for cryptography
parameter_hex_string = (random_salt + msg).encode().hex()
assert len(parameter_hex_string) % 2 == 0
length_hex = num2VarInt(len(parameter_hex_string) // 2)
concatenated_string = length_hex + parameter_hex_string
serialized_transaction = '010001f0' + concatenated_string + '0000'
signature = sign_message(private_key, serialized_transaction)
assert verify_message_signature(public_key, serialized_transaction, signature)
signature_hex_str: str = signature.hex()

httpx_client.headers.update({"Authorization": f"Bearer {auth_resp[0]['token']}"})
bearer_header = {
    'X-Bearer-Owner-Id': address,
    'X-Bearer-Signature': signature_hex_str + random_salt,
    'X-Bearer-Signature-Key': public_key,
}
create_container_resp = httpx_client.put('containers?walletConnect=true&name-scope-global=true', headers=bearer_header)
print(create_container_resp)
# crypto_utils.py
import os
import binascii
import json
import base58
from hashlib import scrypt, sha256
from base58 import b58decode_check, b58encode, b58encode_check
from Crypto.Cipher import AES  # pip install pycryptodome
from Crypto.PublicKey import ECC
from Crypto.Hash import SHA256
from Crypto.Signature import DSS
from Crypto.Hash import RIPEMD160
import ecdsa


def wif_to_private_key(wif: str) -> bytes:
    # Step 1: Decode the WIF key using Base58
    decoded = base58.b58decode(wif)

    # Step 2: Verify the length of the decoded data
    if len(decoded) != 38:
        raise ValueError("Invalid WIF length")

    # The first byte is the version, the last 4 bytes are the checksum
    # version_byte = decoded[0]  # This is usually 0x80 for Bitcoin mainnet
    private_key_bytes = decoded[1:-5]  # Extract the private key part
    checksum = decoded[-4:]  # Extract the checksum part

    # Step 3: Calculate the expected checksum
    hash1 = sha256(decoded[:-4]).digest()
    hash2 = sha256(hash1).digest()
    expected_checksum = hash2[:4]

    # Check if the provided checksum matches the calculated checksum
    if checksum != expected_checksum:
        raise ValueError("Invalid WIF checksum")

    return private_key_bytes

def hex_to_private_key(hex: str) -> bytes:
    hex.removeprefix('0x')
    return binascii.unhexlify(hex)

def private_key_to_neo3_public_key_and_address(private_key: bytes) -> (str, str):
    public_key = ECC.construct(curve='secp256r1', d=int.from_bytes(private_key, 'big')).pointQ
    x = public_key.x.to_bytes(32, 'big')
    prefix = b'\x02' if public_key.y % 2 == 0 else b'\x03'
    compressed_public_key = prefix + x
    verification_script = b'\x0c\x21' + compressed_public_key + b'\x41\x56\xe7\xb3\x27'
    ripemd160 = RIPEMD160.new()
    ripemd160.update(sha256(verification_script).digest())
    script_hash = ripemd160.digest()
    address = b58encode_check(b'\x35' + script_hash).decode('utf-8')
    return compressed_public_key.hex(), address

def sign_message(private_key: bytes, message: str | bytes) -> bytes:
    assert len(private_key) == 32
    if type(message) is str:
        message: bytes = bytes.fromhex(message)
    pk = ecdsa.SigningKey.from_string(private_key, curve=ecdsa.NIST256p, hashfunc=sha256)
    signature: bytes = pk.sign_deterministic(message)
    return signature

def verify_message_signature(public_key: str | bytes, message: str | bytes, signature: bytes) -> bool:
    if type(public_key) is str:
        public_key: bytes = bytes.fromhex(public_key)
    assert len(public_key) == 33
    assert public_key.startswith(b'\x02') or public_key.startswith(b'\x03')
    if type(message) is str:
        message: bytes = bytes.fromhex(message)
    public_key: ecdsa.VerifyingKey = ecdsa.VerifyingKey.from_string(public_key, curve=ecdsa.NIST256p, hashfunc=sha256)
    result = public_key.verify(signature, message)
    return result
# utils.py
# https://github.com/CityOfZion/neon-js/blob/master/packages/neon-core/src/u/convert.ts

def num2VarInt(num: int) -> str:
    if num < 0xfd:
        return num2hexstring(num)
    elif num <= 0xffff:
        # uint16
        return "fd" + num2hexstring(num, 2, True)
    elif num <= 0xffffffff:
        # uint32
        return "fe" + num2hexstring(num, 4, True)
    else:
        # uint64
        return "ff" + num2hexstring(num, 8, True)


def num2hexstring(num: int, size: int = 1, little_endian=False) -> str:
    if not isinstance(num, int):
        raise TypeError(f"num2hexstring expected a number but got {type(num)} instead.")
    if num < 0:
        raise ValueError(f"num2hexstring expected a positive integer but got {num} instead.")
    if size % 1 != 0:
        raise ValueError(f"num2hexstring expected a positive integer but got {num} instead.")
    if num > 2 ** 53 - 1:
        raise ValueError(f"num2hexstring expected a safe integer but got {num} instead.")
    
    size *= 2
    hexstring = hex(num)[2:]
    hexstring = hexstring.zfill(size)
    
    if little_endian:
        hexstring = reverse_hex(hexstring)
    return hexstring

def reverse_hex(hexstring: str) -> str:
    return ''.join(reversed([hexstring[i:i + 2] for i in range(0, len(hexstring), 2)]))

requirements:
base58==2.1.1
ecdsa==0.19.0
httpx==0.28.1
pycryptodome==3.21.0

Originally created by @Hecate2 on GitHub (Mar 4, 2025). ## Problem How to use this gateway to read and write files with HTTP only? ## Context ```python # main.py import os import json import httpx from base64 import b64decode, b64encode from hashlib import sha256 from crypto_utils import wif_to_private_key, private_key_to_neo3_public_key_and_address, sign_message, verify_message_signature from utils import num2VarInt PRIVATE_KEY_WIF = "KzSDwz8tWQhAbuCdQZGj5vSwPy2XghjSppUPLpqnAeLNvjxQW6cf" private_key = wif_to_private_key(PRIVATE_KEY_WIF) # public key 02878528d4e2e39cedf20d9dbc9e5a031afc60cb9c474348ec893834c7921fb0b9 # address Nb2CHYY5wTh2ac58mTue5S3wpG6bQv5hSY public_key, address = private_key_to_neo3_public_key_and_address(private_key) BASE_URL = 'https://rest.t5.fs.neo.org/v1/' # for test only httpx_client = httpx.Client(base_url=BASE_URL, headers={"Content-Type": "application/json"}) my_containers = httpx_client.get(f'containers?ownerId={address}').json() print(f'My containers: {my_containers}') auth_header = { 'X-Bearer-Owner-Id': address, 'X-Bearer-Lifetime': "10000", 'X-Bearer-For-All-Users': "false", } auth_content = [{"name":"my-bearer-token","object":[{"action":"ALLOW","filters":[],"operation":"GET","targets":[{"keys":[],"role":"OTHERS"}]}]},{"container":{"verb":"PUT"},"name":"my token to create container"}] auth_resp: list[dict[str, str|bytes]] = httpx_client.post('auth', headers=auth_header, content=json.dumps(auth_content)).json() for t in auth_resp: t["decoded_token"]: bytes = b64decode(t["token"]) for t in auth_resp: t["signed_token"] = sign_message(private_key, t["decoded_token"]).hex() for t in auth_resp: assert verify_message_signature(public_key, t["decoded_token"], bytes.fromhex(t["signed_token"])) print(auth_resp) # https://github.com/nspcc-dev/panel-fs-neo-org/blob/30598676da830e8eaff7cc34279d61fa5b939587/src/App.js#L350-L356 # https://github.com/NeoNEXT/neoline/blob/4667e690147352eafb6812301492b7767bc96132/src/app/popup/notification/neo3-signature/neo3-signature.component.ts#L81 msg: str = auth_resp[0]['token'] random_salt = os.urandom(16).hex() # suitable for cryptography parameter_hex_string = (random_salt + msg).encode().hex() assert len(parameter_hex_string) % 2 == 0 length_hex = num2VarInt(len(parameter_hex_string) // 2) concatenated_string = length_hex + parameter_hex_string serialized_transaction = '010001f0' + concatenated_string + '0000' signature = sign_message(private_key, serialized_transaction) assert verify_message_signature(public_key, serialized_transaction, signature) signature_hex_str: str = signature.hex() httpx_client.headers.update({"Authorization": f"Bearer {auth_resp[0]['token']}"}) bearer_header = { 'X-Bearer-Owner-Id': address, 'X-Bearer-Signature': signature_hex_str + random_salt, 'X-Bearer-Signature-Key': public_key, } create_container_resp = httpx_client.put('containers?walletConnect=true&name-scope-global=true', headers=bearer_header) print(create_container_resp) ``` ```python # crypto_utils.py import os import binascii import json import base58 from hashlib import scrypt, sha256 from base58 import b58decode_check, b58encode, b58encode_check from Crypto.Cipher import AES # pip install pycryptodome from Crypto.PublicKey import ECC from Crypto.Hash import SHA256 from Crypto.Signature import DSS from Crypto.Hash import RIPEMD160 import ecdsa def wif_to_private_key(wif: str) -> bytes: # Step 1: Decode the WIF key using Base58 decoded = base58.b58decode(wif) # Step 2: Verify the length of the decoded data if len(decoded) != 38: raise ValueError("Invalid WIF length") # The first byte is the version, the last 4 bytes are the checksum # version_byte = decoded[0] # This is usually 0x80 for Bitcoin mainnet private_key_bytes = decoded[1:-5] # Extract the private key part checksum = decoded[-4:] # Extract the checksum part # Step 3: Calculate the expected checksum hash1 = sha256(decoded[:-4]).digest() hash2 = sha256(hash1).digest() expected_checksum = hash2[:4] # Check if the provided checksum matches the calculated checksum if checksum != expected_checksum: raise ValueError("Invalid WIF checksum") return private_key_bytes def hex_to_private_key(hex: str) -> bytes: hex.removeprefix('0x') return binascii.unhexlify(hex) def private_key_to_neo3_public_key_and_address(private_key: bytes) -> (str, str): public_key = ECC.construct(curve='secp256r1', d=int.from_bytes(private_key, 'big')).pointQ x = public_key.x.to_bytes(32, 'big') prefix = b'\x02' if public_key.y % 2 == 0 else b'\x03' compressed_public_key = prefix + x verification_script = b'\x0c\x21' + compressed_public_key + b'\x41\x56\xe7\xb3\x27' ripemd160 = RIPEMD160.new() ripemd160.update(sha256(verification_script).digest()) script_hash = ripemd160.digest() address = b58encode_check(b'\x35' + script_hash).decode('utf-8') return compressed_public_key.hex(), address def sign_message(private_key: bytes, message: str | bytes) -> bytes: assert len(private_key) == 32 if type(message) is str: message: bytes = bytes.fromhex(message) pk = ecdsa.SigningKey.from_string(private_key, curve=ecdsa.NIST256p, hashfunc=sha256) signature: bytes = pk.sign_deterministic(message) return signature def verify_message_signature(public_key: str | bytes, message: str | bytes, signature: bytes) -> bool: if type(public_key) is str: public_key: bytes = bytes.fromhex(public_key) assert len(public_key) == 33 assert public_key.startswith(b'\x02') or public_key.startswith(b'\x03') if type(message) is str: message: bytes = bytes.fromhex(message) public_key: ecdsa.VerifyingKey = ecdsa.VerifyingKey.from_string(public_key, curve=ecdsa.NIST256p, hashfunc=sha256) result = public_key.verify(signature, message) return result ``` ```python # utils.py # https://github.com/CityOfZion/neon-js/blob/master/packages/neon-core/src/u/convert.ts def num2VarInt(num: int) -> str: if num < 0xfd: return num2hexstring(num) elif num <= 0xffff: # uint16 return "fd" + num2hexstring(num, 2, True) elif num <= 0xffffffff: # uint32 return "fe" + num2hexstring(num, 4, True) else: # uint64 return "ff" + num2hexstring(num, 8, True) def num2hexstring(num: int, size: int = 1, little_endian=False) -> str: if not isinstance(num, int): raise TypeError(f"num2hexstring expected a number but got {type(num)} instead.") if num < 0: raise ValueError(f"num2hexstring expected a positive integer but got {num} instead.") if size % 1 != 0: raise ValueError(f"num2hexstring expected a positive integer but got {num} instead.") if num > 2 ** 53 - 1: raise ValueError(f"num2hexstring expected a safe integer but got {num} instead.") size *= 2 hexstring = hex(num)[2:] hexstring = hexstring.zfill(size) if little_endian: hexstring = reverse_hex(hexstring) return hexstring def reverse_hex(hexstring: str) -> str: return ''.join(reversed([hexstring[i:i + 2] for i in range(0, len(hexstring), 2)])) ``` requirements: base58==2.1.1 ecdsa==0.19.0 httpx==0.28.1 pycryptodome==3.21.0
sami 2025-12-28 18:00:09 +00:00
Author
Owner

@roman-khimov commented on GitHub (Mar 4, 2025):

@mike-petrov, @cthulhu-rider

@roman-khimov commented on GitHub (Mar 4, 2025): @mike-petrov, @cthulhu-rider
Author
Owner

@cthulhu-rider commented on GitHub (Mar 4, 2025):

@Hecate2 pls share what error do u get and at what stage

@cthulhu-rider commented on GitHub (Mar 4, 2025): @Hecate2 pls share what error do u get and at what stage
Author
Owner

@Hecate2 commented on GitHub (Mar 4, 2025):

@Hecate2 pls share what error do u get and at what stage

I got an HTTP response code 400 at the end of main.py. Also I just do not know what to do next.

@Hecate2 commented on GitHub (Mar 4, 2025): > @Hecate2 pls share what error do u get and at what stage I got an HTTP response code 400 at the end of `main.py`. Also I just do not know what to do next.
Author
Owner

@cthulhu-rider commented on GitHub (Mar 4, 2025):

I got an HTTP response code 400

does the response contain any message? It should be present and indicate why the request is incorrectly formulated and help u in debug

@cthulhu-rider commented on GitHub (Mar 4, 2025): > I got an HTTP response code 400 does the response contain any message? It should be present and indicate why the request is incorrectly formulated and help u in debug
Author
Owner

@Hecate2 commented on GitHub (Mar 4, 2025):

I got an HTTP response code 400

does the response contain any message? It should be present and indicate why the request is incorrectly formulated and help u in debug

It responds that
{'message': "invalid session token: can't unmarshal session token: invalid session UUID version 1", 'type': 'GW'}
I found the message in github.com/nspcc-dev/neofs-sdk-go@v1.0.0-rc.12/session/common.go, return fmt.Errorf("invalid session UUID version %d", ver)
Is it because my signMessage process is wrong? Or am I missing any header?
And, the latest https://github.com/nspcc-dev/neofs-sdk-go/blob/master/session/common.go does not have error message about uuid

@Hecate2 commented on GitHub (Mar 4, 2025): > > I got an HTTP response code 400 > > does the response contain any message? It should be present and indicate why the request is incorrectly formulated and help u in debug It responds that `{'message': "invalid session token: can't unmarshal session token: invalid session UUID version 1", 'type': 'GW'}` I found the message in `github.com/nspcc-dev/neofs-sdk-go@v1.0.0-rc.12/session/common.go`, `return fmt.Errorf("invalid session UUID version %d", ver)` Is it because my signMessage process is wrong? Or am I missing any header? And, the latest https://github.com/nspcc-dev/neofs-sdk-go/blob/master/session/common.go does not have error message about uuid
Author
Owner

@cthulhu-rider commented on GitHub (Mar 4, 2025):

can't unmarshal session token: invalid session UUID version 1

it complains about the token field that you send in the Authorization header. IIUC u attach it here

httpx_client.headers.update({"Authorization": f"Bearer {auth_resp[0]['token']}"})

understanding what bearer token is sent would help

as i can see, @Hecate2 u dont modify UUID field intentionally, so either the GW sends u invalid token or u should handle response in other way

@mike-petrov @smallhive

@cthulhu-rider commented on GitHub (Mar 4, 2025): > can't unmarshal session token: invalid session UUID version 1 it complains about the token field that you send in the `Authorization` header. IIUC u attach it here ```py httpx_client.headers.update({"Authorization": f"Bearer {auth_resp[0]['token']}"}) ``` understanding what bearer token is sent would help as i can see, @Hecate2 u dont modify UUID field intentionally, so either the GW sends u invalid token or u should handle response in other way @mike-petrov @smallhive
Author
Owner

@mike-petrov commented on GitHub (Mar 13, 2025):

@Hecate2 At first we thought the problem was in the logic of your tokens, but it turns out to be simpler than that. You get this response in tokens:

[{
    'name': 'my-bearer-token',
    'token': '...',
    'type': 'object',
    'decoded_token': b '...',
    'signed_token': '...'
}, {
    'name': 'my token to create container',
    'token': '...',
    'type': 'container',
    'decoded_token': b '...',
    'signed_token': '...'
}]

The first one is for object, the second one is for container. But then you refer to the first object even though you're creating a container. So you just need to replace auth_resp[0] with auth_resp[1].

You also need to pass the body for the request with the necessary parameters, your code should look like this:

create_container_content = {
    "containerName": "test",
    "placementPolicy": "REP 3",
    "basicAcl": "eacl-public-read-write",
    "attributes": []
}
create_container_resp = httpx_client.put('containers?walletConnect=true&name-scope-global=true', headers=bearer_header, content=json.dumps(create_container_content)).json()
print(create_container_resp)
@mike-petrov commented on GitHub (Mar 13, 2025): @Hecate2 At first we thought the problem was in the logic of your tokens, but it turns out to be simpler than that. You get this response in tokens: ``` [{ 'name': 'my-bearer-token', 'token': '...', 'type': 'object', 'decoded_token': b '...', 'signed_token': '...' }, { 'name': 'my token to create container', 'token': '...', 'type': 'container', 'decoded_token': b '...', 'signed_token': '...' }] ``` The first one is for object, the second one is for container. But then you refer to the first object even though you're creating a container. So you just need to replace `auth_resp[0]` with `auth_resp[1]`. You also need to pass the body for the request with the necessary parameters, your code should look like this: ``` create_container_content = { "containerName": "test", "placementPolicy": "REP 3", "basicAcl": "eacl-public-read-write", "attributes": [] } create_container_resp = httpx_client.put('containers?walletConnect=true&name-scope-global=true', headers=bearer_header, content=json.dumps(create_container_content)).json() print(create_container_resp) ```
Author
Owner

@mike-petrov commented on GitHub (Mar 13, 2025):

We have a good example of using all the functionality in one interface: https://github.com/nspcc-dev/panel-fs-neo-org, I recommend running it. It can also be used for debugging purposes to see what happens when python scripts are executed.

Image
@mike-petrov commented on GitHub (Mar 13, 2025): We have a good example of using all the functionality in one interface: https://github.com/nspcc-dev/panel-fs-neo-org, I recommend running it. It can also be used for debugging purposes to see what happens when python scripts are executed. <img width="600" alt="Image" src="https://github.com/user-attachments/assets/333b40fe-510b-45d3-8b97-ea07b5778084" />
Author
Owner

@roman-khimov commented on GitHub (Mar 25, 2025):

Seems to be done to me.

@roman-khimov commented on GitHub (Mar 25, 2025): Seems to be done to me.
Sign in to join this conversation.
No milestone
No project
No assignees
1 participant
Notifications
Due date
The due date is invalid or out of range. Please use the format "yyyy-mm-dd".

No due date set.

Dependencies

No dependencies set.

Reference
nspcc-dev/neofs-rest-gw#101
No description provided.