Sempre crie uma frase de segurança única para jogos, testnets ou airdrops e evite usar sua carteira principal.
Este artigo apresenta um guia completo para conectar-se a um banco de dados PostgreSQL usando Python, manipular intervalos de chaves privadas Bitcoin SV, e realizar operações de verificação paralela utilizando múltiplas threads. O código ilustrado exemplifica boas práticas em conexão de banco, manipulação de dados, uso de criptografia, e otimização para operações intensivas.
O script começa limpando o terminal para facilitar a leitura das saídas, seguido pela definição da URL do banco de dados com os parâmetros necessários para a conexão segura:
O script implementa funções para criar tabelas específicas dinamicamente, armazenar subintervalos de chaves privadas em hexadecimal, e selecionar linhas aleatórias para processamento incremental:
O script divide o intervalo de chaves em partes iguais para processamento em paralelo usando concurrent.futures.ThreadPoolExecutor. Essa abordagem melhora o desempenho significativamente para verificação de grandes volumes de chaves.
Após executar a função principal, o usuário seleciona o puzzle (identificador) que deseja trabalhar. O script exibe informações sobre subranges e progresso e então inicia a análise das chaves, mostrando o total de chaves válidas e inválidas no final.
Esse fluxo é ideal para quem trabalha com segurança em blockchain, desenvolve ferramentas de análise de chaves, ou gerencia bancos de dados de puzzles criptográficos.
import pg8000
import urllib.parse
import random
import time
import os
import datetime
import gc
import hashlib
from ecdsa import SECP256k1
import sys
import concurrent.futures
# Limpa o terminal para uma visualização mais limpa
os.system('cls' if os.name == 'nt' else 'clear')
# URL do banco de dados
DATABASE_URL = "postgresql://neondb_owner:.....us-east-1.aws.neon.tech/neondb?sslmode=require"
url = urllib.parse.urlparse(DATABASE_URL)
# Função para conectar ao banco de dados
def conectar_banco():
try:
return pg8000.connect(
user=url.username,
password=url.password,
host=url.hostname,
port=5432,
database=url.path[1:],
ssl_context=True
)
except Exception as e:
print(f"Erro ao conectar ao banco de dados: {e}")
raise
# Função para buscar as identificações dos puzzles
def buscar_identificacoes():
try:
with conectar_banco() as conn:
cursor = conn.cursor()
cursor.execute("SELECT identificacao FROM lista_puzzle;")
return [puzzle[0] for puzzle in cursor.fetchall()]
except Exception as e:
print(f"Erro ao buscar identificações: {e}")
return []
# Função para buscar o puzzle selecionado
def buscar_puzzle(user_choice):
try:
with conectar_banco() as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM lista_puzzle WHERE identificacao = %s;", (user_choice,))
return cursor.fetchone()
except Exception as e:
print(f"Erro ao buscar puzzle: {e}")
return None
# Função para criar a tabela no banco de dados
def criar_tabela(table_name):
try:
with conectar_banco() as conn:
cursor = conn.cursor()
cursor.execute(f"""
CREATE TABLE IF NOT EXISTS {table_name} (
id SERIAL PRIMARY KEY,
inicio_subrange TEXT,
final_subrange TEXT,
progresso_subrange TEXT,
datahora TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
""")
cursor.execute(f"""
CREATE UNIQUE INDEX IF NOT EXISTS unique_inicio_subrange ON {table_name} (inicio_subrange);
""")
conn.commit()
except Exception as e:
print(f"Erro ao criar tabela {table_name}: {e}")
# Função para inserir partes na tabela
def inserir_partes(table_name, indices_aleatorios, inicio_intervalo, tamanho_parte, num_partes):
tentativas = 0
sucesso = False
while tentativas < 3 and not sucesso:
parte_inicio = inicio_intervalo + (indices_aleatorios[0] * tamanho_parte)
parte_fim = parte_inicio + tamanho_parte
parte_inicio_hex = hex(parte_inicio)
parte_fim_hex = hex(parte_fim)
try:
with conectar_banco() as conn:
cursor = conn.cursor()
cursor.execute(f"SELECT to_regclass('{table_name}')")
if cursor.fetchone()[0] is None:
criar_tabela(table_name)
cursor.execute(f"SELECT * FROM {table_name} WHERE inicio_subrange = %s;", (parte_inicio_hex,))
if cursor.fetchone() is None:
datahora_atual = datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
cursor.execute(f"""
INSERT INTO {table_name} (inicio_subrange, final_subrange, progresso_subrange, datahora)
VALUES (%s, %s, %s, %s);
""", (parte_inicio_hex, parte_fim_hex, None, datahora_atual))
sucesso = True
except Exception as e:
print(f"Erro ao inserir partes na tabela {table_name}: {e}")
tentativas += 1
indices_aleatorios = random.sample(range(num_partes), 1)
# Função para escolher uma linha aleatória da tabela
def escolher_linha_aleatoria(table_name):
try:
with conectar_banco() as conn:
cursor = conn.cursor()
cursor.execute(f"SELECT inicio_subrange, final_subrange, progresso_subrange, datahora FROM {table_name} ORDER BY RANDOM() LIMIT 1;")
return cursor.fetchone()
except Exception as e:
print(f"Erro ao escolher linha aleatória: {e}")
return None
# Função para preencher variáveis com base na linha aleatória escolhida
def procedimento_preencher_variaveis(table_name):
tentativas = 0
while tentativas < 20:
random_row = escolher_linha_aleatoria(table_name)
if not random_row:
return None
datahora_db = random_row[3]
if not datahora_db:
return None
if isinstance(datahora_db, str):
datahora_db = datetime.datetime.strptime(datahora_db, '%Y-%m-%d %H:%M:%S')
if not isinstance(datahora_db, datetime.datetime):
return None
tempo_atual = datetime.datetime.utcnow()
delta_tempo = tempo_atual - datahora_db
segundos_diff = delta_tempo.total_seconds()
if segundos_diff >= 60:
return random_row
tentativas += 1
time.sleep(1)
return None
# Função para gerar a chave pública a partir de uma chave privada (comprimida)
def private_key_to_public_key_compressed(private_key):
#print(f'0x{hex(private_key)[2:].zfill(64)}')
public_key_point = SECP256k1.generator * private_key
if public_key_point.y() % 2 == 0:
public_key_bytes = b'\x02' + public_key_point.x().to_bytes(32, byteorder='big')
else:
public_key_bytes = b'\x03' + public_key_point.x().to_bytes(32, byteorder='big')
return public_key_bytes
# Função para gerar o endereço Bitcoin SV (com base na chave pública comprimida)
def public_key_to_address(public_key):
sha256_hash = hashlib.sha256(public_key).digest()
ripemd160_hash = hashlib.new("ripemd160", sha256_hash).digest()
prefixed = b'\x00' + ripemd160_hash
checksum = hashlib.sha256(hashlib.sha256(prefixed).digest()).digest()[:4]
address = prefixed + checksum
alphabet = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
num = int.from_bytes(address, 'big')
encoded_address = ''
while num > 0:
num, mod = divmod(num, 58)
encoded_address = alphabet[mod] + encoded_address
n = 0
for byte in address:
if byte == 0:
n += 1
else:
break
return alphabet[0] * n + encoded_address
# Função para verificar se uma chave privada é válida
def is_valid_private_key(private_key_int):
if private_key_int <= 0 or private_key_int >= SECP256k1.order:
return False
public_key = private_key_to_public_key_compressed(private_key_int)
address = public_key_to_address(public_key)
return address[0] == '1'
# Função para verificar chaves em um intervalo
def verificar_chaves_no_intervalo(inicio, fim):
valid_keys_local = 0
invalid_keys_local = 0
for private_key_int in range(inicio, fim + 1):
if is_valid_private_key(private_key_int):
valid_keys_local += 1
else:
invalid_keys_local += 1
return valid_keys_local, invalid_keys_local
# Função principal do script
def main():
identificacoes = buscar_identificacoes()
colunas = 12
linhas = len(identificacoes) // colunas + (1 if len(identificacoes) % colunas != 0 else 0)
for i in range(linhas):
linha = [str(identificacoes[i + j * linhas]) for j in range(colunas) if i + j * linhas < len(identificacoes)]
print(" |\t ".join(linha))
user_choice = input("\nQual puzzle deseja executar? (Escolha o identificador): ")
selected_puzzle = buscar_puzzle(user_choice)
if selected_puzzle:
identificacao, inicio_range, fim_range, carteira, ativo = selected_puzzle
else:
print("Puzzle não encontrado!")
return
inicio_intervalo = int(f'0x{inicio_range}', 16)
fim_intervalo = int(f'0x{fim_range}', 16)
diferenca = fim_intervalo - inicio_intervalo
num_partes = 1000000000
tamanho_parte = diferenca // num_partes
table_name = f"puzzle{identificacao}"
criar_tabela(table_name)
indices_aleatorios = random.sample(range(num_partes), 1)
inserir_partes(table_name, indices_aleatorios, inicio_intervalo, tamanho_parte, num_partes)
random_row = procedimento_preencher_variaveis(table_name)
gc.collect()
base_inicial = inicio_intervalo
base_final = fim_intervalo
if random_row:
print("\nDados da linha com diferença de tempo maior que 1 minuto:")
print(f"Início do Subrange: {random_row[0]}")
print(f"Final do Subrange: {random_row[1]}")
print(f"Progresso Subrange: {random_row[2]}")
print(f"Data e Hora do Registro: {random_row[3]}")
base_inicial = int(random_row[0], 16) if random_row[0] else base_inicial
base_final = int(random_row[1], 16) if random_row[1] else base_final
else:
print("Nenhuma linha encontrada com a diferença de tempo requerida.")
print(f"Base Inicial: {hex(base_inicial)} - Base Final: {hex(base_final)}")
valid_keys = 0
invalid_keys = 0
start_time = time.time()
total_keys = base_final - base_inicial + 1
chaves_contar = 100000 # A cada 100.000 chaves, imprime o progresso
# Aqui, dividimos o trabalho em múltiplos threads
num_threads = 8 # Por exemplo, 8 threads (ajuste conforme seu sistema)
step = (base_final - base_inicial) // num_threads
with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
futures = []
for i in range(num_threads):
start = base_inicial + i * step
end = base_inicial + (i + 1) * step - 1 if i < num_threads - 1 else base_final
futures.append(executor.submit(verificar_chaves_no_intervalo, start, end))
for future in concurrent.futures.as_completed(futures):
valid_keys_local, invalid_keys_local = future.result()
valid_keys += valid_keys_local
invalid_keys += invalid_keys_local
# Exibe os resultados finais
print(f"\nTotal de chaves válidas: {valid_keys}")
print(f"Total de chaves inválidas: {invalid_keys}")
# Inicia o processo
if __name__ == "__main__":
main()
Comentários
Comente só assim vamos crescer juntos!