I want to run a Binance live-stream function that receives a freshly generated symbol from another function each time. 1- Top gainer function: scans the market and returns the top price-gainer coin at a fixed interval. 2- Live stream function: takes the symbol from the first function, then starts a websocket to pull data, switching to the new symbol every time the top gainer function returns one.
Here is the code:
<
import json,time, asyncio,websockets
import pandas as pd
from binance.client import Client
import numpy as np
# REST client used by the market scan.
# NOTE(review): pkey / skey are not defined in the visible part of the file --
# presumably the API key pair is set elsewhere; confirm before running.
# NOTE(review): testnet=True here, while run() connects to stream.binance.com;
# confirm that mixing the two endpoints is intended.
client = Client(
    api_key=pkey,
    api_secret=skey,
    tld='com',
    testnet=True,
)

# Kline interval substituted into the websocket stream name in run().
interval = '1m'

# Module-level table that on_message() appends one row to per completed kline.
data = pd.DataFrame(
    columns=[
        'Date',
        'Coin',
        'Open',
        'High',
        'Low',
        'Close',
        'Volume',
        'no_trades',
        'Quote_Volume',
        'Complete',
    ]
)
async def top_3gainer_coins():
    """Scan USDT spot pairs and return the symbol(s) with the largest price gain.

    Waits 90 seconds, then fetches the last twelve 5-minute klines for every
    USDT-quoted spot symbol whose status is ``TRADING``, computes the
    candle-to-candle percentage change of the close price, and ranks the two
    most recent candles of every symbol by that change.

    The wait and every REST call are awaited (the blocking client calls run in
    the default executor), so other tasks on the event loop -- such as an open
    websocket stream -- keep running while the scan is in progress.

    Returns:
        list[str]: The symbol of the current top gainer (a one-element list),
        or an empty list when the scan failed or produced no data.
    """
    # Bound up front so the final return can never hit an unbound local when
    # the scan raises or yields nothing.
    top_gainer_coins = []
    try:
        print("Updating top gainer coins...")
        # Non-blocking pause; time.sleep() here would freeze the whole loop.
        await asyncio.sleep(90)
        loop = asyncio.get_running_loop()
        exchange_info = await loop.run_in_executor(None, client.get_exchange_info)
        spot_symbols = [
            symbol['symbol'] for symbol in exchange_info['symbols']
            if symbol['isSpotTradingAllowed']
            and symbol['quoteAsset'] == 'USDT'
            and symbol['status'] == 'TRADING'  # Ensure the symbol is actively trading
        ]
        frames = []
        for symbol in spot_symbols:
            try:
                klines = await loop.run_in_executor(
                    None,
                    lambda s=symbol: client.get_klines(symbol=s, interval='5m', limit=12),
                )
                # Columns kept: 0 = timestamp, 4 = close, 5 = volume,
                # 8 = number of trades.
                frame = pd.DataFrame(klines).iloc[:, [0, 4, 5, 8]].copy()
                frame.columns = ['Date', 'Close', 'Volume', 'NoTrades']
                frame['Date'] = pd.to_datetime(frame['Date'], unit='ms')
                for column in frame.columns[1:]:
                    frame[column] = pd.to_numeric(frame[column])
                frame['pricegain'] = (frame.Close.pct_change() * 100).fillna(0)
                frame['VGain'] = (frame.Volume.pct_change() * 100).fillna(0)
                # NOTE(review): divides by 60 although the candles are 5 minutes
                # long -- confirm whether trades-per-second should use 300.
                frame['TPS'] = (frame.NoTrades / 60).fillna(0)
                frame.insert(1, 'Symbol', symbol)
                frame = frame.iloc[-2:]
                # A zero previous value makes pct_change produce +/-inf.
                frame = frame.replace([np.inf, -np.inf], np.nan)
                frame = frame.reset_index(drop=True)
                frames.append(frame)
            except Exception as e:
                # Best effort: one bad symbol must not abort the whole scan.
                print(f"Error fetching data for symbol {symbol}: {e}")
                continue
        if frames:
            ranking = pd.concat(frames)
            ranking = ranking.sort_values(by='pricegain', ascending=False)
            ranking.reset_index(drop=True, inplace=True)
            ranking.index += 1  # 1-based rank for the printed table
            top_gainer = ranking.head(1)  # Only the single best row is kept
            top_gainer_coins = top_gainer['Symbol'].tolist()
            print(top_gainer_coins)
            print(f"Top 3 gainer coins\n{top_gainer.to_string()}")
    except Exception as e:
        print(f"Error: {e}")
    return top_gainer_coins
def on_open(ws):
    """Connection-opened callback; deliberately does nothing.

    Args:
        ws: The connection that was opened (ignored).
    """
    # print(f'Opened connection for {ws.symbol}')
    return None
def on_close(ws):
    """Print a notice that the connection for ``ws.symbol`` was closed.

    Args:
        ws: Connection object; its ``symbol`` attribute is read for the message.
    """
    notice = f'Closed connection for {ws.symbol}'
    print(notice)
async def on_message(ws, msg):
    """Parse one kline websocket message and record the candle when complete.

    Args:
        ws: The websocket the message arrived on (not used here).
        msg: Raw JSON text of a kline event; the fields read are ``s`` and
            the ``t``, ``o``, ``h``, ``l``, ``c``, ``v``, ``n``, ``q``, ``x``
            entries of ``k``.

    When the ``x`` flag of the kline is true, one row is appended to the
    module-level ``data`` frame and the frame is printed. Any error while
    parsing or storing is printed rather than raised, so a single bad message
    does not stop the stream.
    """
    # Bound before parsing so the error report below can always name a symbol,
    # even when the payload is not valid JSON or has no 's' field.
    symbol = 'unknown'
    try:
        msg = json.loads(msg)
        symbol = msg['s']
        kline = msg['k']
        start_time = pd.to_datetime(kline['t'], unit='ms')
        open_price = float(kline['o'])
        high = float(kline['h'])
        low = float(kline['l'])
        close = float(kline['c'])
        volume = float(kline['v'])
        no_trades = float(kline['n'])
        quote_volume = float(kline['q'])
        complete = kline['x']
        if complete:
            data.loc[len(data)] = [
                start_time, symbol, open_price, high, low, close,
                volume, no_trades, quote_volume, complete,
            ]
            print(data)
    except Exception as e:
        print(f"Error processing message for symbol {symbol}: {e}")
async def run(symbol):
    """Stream klines for ``symbol`` over a websocket and hand each message to ``on_message``.

    Args:
        symbol: Trading pair name (e.g. ``"BTCUSDT"``); it is lower-cased to
            build the stream name ``<symbol>@kline_<interval>``.

    The coroutine keeps receiving until the connection fails or the task is
    cancelled. Errors are printed and end the coroutine instead of propagating.
    """
    print('system starts')
    try:
        symbol = symbol.lower()
        print(f'Start working on coin: {symbol}')
        socket = f"wss://stream.binance.com:9443/ws/{symbol}@kline_{interval}"
        async with websockets.connect(socket) as ws:
            while True:
                response = await ws.recv()
                # on_message is a coroutine function: without the await its body
                # never runs and the message is silently dropped.
                await on_message(ws, response)
                # No extra sleep: recv() already suspends until the next
                # message, so sleeping here would only delay processing.
    except Exception as e:
        print(f'error on run : {e}')
async def main():
    """Keep a live kline stream running for whatever the scanner currently reports.

    Repeatedly awaits ``top_3gainer_coins()``. After every scan the set of
    running ``run(symbol)`` tasks is reconciled with the returned symbols:
    streams for symbols that are no longer reported (or whose task has
    finished) are cancelled, and a new stream task is started for every
    reported symbol that has none. The stream tasks run in the background, so
    the next scan starts without waiting for them -- awaiting them directly
    would block forever because ``run`` loops until its connection fails.

    An empty scan result leaves the current streams untouched. All stream
    tasks are cancelled when this coroutine exits.
    """
    streams = {}  # symbol -> task running run(symbol)
    try:
        while True:
            try:
                symbols = await top_3gainer_coins()
            except Exception as e:
                print(f'error: {e}')
                # Short pause so a scanner that fails instantly cannot spin.
                await asyncio.sleep(1)
                continue
            if not symbols:
                continue

            wanted = set(symbols)
            retired = []
            for symbol in list(streams):
                task = streams[symbol]
                # A finished task means run() gave up (connection error), so
                # it is dropped here and restarted below if still wanted.
                if symbol not in wanted or task.done():
                    retired.append(streams.pop(symbol))
            for task in retired:
                task.cancel()
            # Let cancelled streams close their sockets before opening new ones.
            await asyncio.gather(*retired, return_exceptions=True)

            for symbol in symbols:
                if symbol not in streams:
                    streams[symbol] = asyncio.create_task(run(symbol))
    finally:
        for task in streams.values():
            task.cancel()
# Script entry point: drive main() on a fresh event loop and report any
# exception that escapes it instead of showing a traceback.
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception as exc:
        print(f'error: {exc}')