【Python】BitflyerのTickerをSQLiteに突っ込む
おはようございます。
今回は、前回までに取得できるようにした Bitflyer の Ticker情報を、そのままSQLiteに保存してみました。
プログラムは前回のものを流用します。
【Python】pubnub と WebSocket で更にリアルタイムで情報を取得する
スポンサーリンク
新規クラスの追加
Utils/SQLiteUtil.py
import sqlite3
import logging
from contextlib import closing
class SQLiteUtil:
u""" SQLite 操作用クラス """
def __init__(self):
"""
インスタンス作成時にテーブルを作る
"""
self.db_name = "Ticker.db"
self.create_db()
def create_db(self):
u""" データベース、及び必要なテーブルを作成します. """
logging.info("create database")
with closing(sqlite3.connect(self.db_name)) as conn:
c = conn.cursor()
# データクリア
sql = "DELETE FROM TBL_TICKER"
c.execute(sql)
# ティッカーテーブル
sql = "CREATE TABLE IF NOT EXISTS TBL_TICKER ("
sql += " PRODUCT_CODE TEXT"
sql += ", TIME_STAMP TEXT"
sql += ", TICK_ID TEXT"
sql += ", BEST_BID REAL"
sql += ", BEST_ASK REAL"
sql += ", BEST_BID_SIZE REAL"
sql += ", BEST_ASK_SIZE REAL"
sql += ", TOTAL_BID_DEPTH REAL"
sql += ", TOTAL_ASK_DEPTH REAL"
sql += ", LTP REAL"
sql += ", VOLUME REAL"
sql += ", VOLUME_BY_PRODUCT REAL"
sql += ", PRIMARY KEY (TICK_ID)"
sql += ")"
c.execute(sql)
c.close()
conn.commit()
def insert_data(self, ticker):
u""" 渡されたタプルデータを登録します """
with closing(sqlite3.connect(self.db_name)) as conn:
c = conn.cursor()
# 猫データ
sql = "INSERT INTO TBL_TICKER VALUES (?,?,?,?,?,?,?,?,?,?,?,?)"
c.execute(sql, ticker)
c.close()
conn.commit()プログラムの修正
BfApi.py
ライブラリ使用宣言の追加
from dateutil import parser
from datetime import timedelta
from Utils.SQLiteUtil import SQLiteUtil
pubnubのコールバッククラスで、メッセージ受信時の処理を修正
class MySubscriberCallback(SubscribeCallback):
"""
Pubnub登録のコールバッククラス
"""
def __init__(self, client):
self.cl = client
self.sqlite_util = SQLiteUtil()
def presence(self, pubnub, presence):
pass # handle incoming presence data
def status(self, pubnub, status):
if status.category == PNStatusCategory.PNUnexpectedDisconnectCategory:
pass # This event happens when radio / connectivity is lost
elif status.category == PNStatusCategory.PNConnectedCategory:
# Connect event. You can do stuff like publish, and know you'll get it.
# Or just use the connected event to confirm you are subscribed for
# UI / internal notifications, etc
pass
elif status.category == PNStatusCategory.PNReconnectedCategory:
pass
# Happens as part of our regular operation. This event happens when
# radio / connectivity is lost, then regained.
elif status.category == PNStatusCategory.PNDecryptionErrorCategory:
pass
# Handle message decryption error. Probably client configured to
# encrypt messages and on live data feed it received plain text.
def message(self, pubnub, message):
"""
登録したチャンネルからメッセージを受信した際の処理
:param pubnub:
:param message:
:return:
"""
# WEBソケットを利用してクライアントに配信
for c in self.cl:
d = message.message
self.sqlite_util.insert_data((
d["product_code"]
, self.parse_date(d["timestamp"])
, d["tick_id"]
, d["best_bid"]
, d["best_ask"]
, d["best_bid_size"]
, d["best_ask_size"]
, d["total_bid_depth"]
, d["total_ask_depth"]
, d["ltp"]
, d["volume"]
, d["volume_by_product"]
))
c.write_message(message.message)
def parse_date(self, iso_date):
date_time = parser.parse(iso_date) + timedelta(hours=9)
return date_time.strftime("%Y/%m/%d %H:%M:%S")データを追加してみる
起動して一覧の更新をスタートしてみてから、
A5:SQL Mk2 にてデータの確認をしました。
無事にデータが追加されています。
まとめ
単純にpubnubだけ動かすツールを作って、データを収集するだけでもよさそうですね。
ただ、Ticker情報だとデータ量が膨大になるため、1分、5分、10分などで処理やテーブルを分けてしまってもいいかもしれません。
ではでは。
ディスカッション
コメント一覧
まだ、コメントがありません