【Python】BitflyerのTickerをSQLiteに突っ込む

2018年3月30日Python,開発

おはようございます。

今回は、前回までに取得できるようにした Bitflyer の Ticker情報を、そのままSQLiteに保存してみました。

プログラムは前回のものを流用します。

【Python】pubnub と WebSocket で更にリアルタイムで情報を取得する

スポンサーリンク

新規クラスの追加

Utils/SQLiteUtil.py

import sqlite3
import logging
from contextlib import closing


class SQLiteUtil:
    u""" SQLite 操作用クラス """
    
    def __init__(self):
        """
        インスタンス作成時にテーブルを作る
        """
        self.db_name = "Ticker.db"
        self.create_db()

    def create_db(self):
        u""" データベース、及び必要なテーブルを作成します. """

        logging.info("create database")
        with closing(sqlite3.connect(self.db_name)) as conn:

            c = conn.cursor()

            # データクリア
            sql = "DELETE FROM TBL_TICKER"
            c.execute(sql)

            # ティッカーテーブル
            sql = "CREATE TABLE IF NOT EXISTS TBL_TICKER ("
            sql += "  PRODUCT_CODE TEXT"
            sql += ", TIME_STAMP TEXT"
            sql += ", TICK_ID TEXT"
            sql += ", BEST_BID REAL"
            sql += ", BEST_ASK REAL"
            sql += ", BEST_BID_SIZE REAL"
            sql += ", BEST_ASK_SIZE REAL"
            sql += ", TOTAL_BID_DEPTH REAL"
            sql += ", TOTAL_ASK_DEPTH REAL"
            sql += ", LTP REAL"
            sql += ", VOLUME REAL"
            sql += ", VOLUME_BY_PRODUCT REAL"
            sql += ", PRIMARY KEY (TICK_ID)"
            sql += ")"
            c.execute(sql)

            c.close()
            conn.commit()

    def insert_data(self, ticker):
        u""" 渡されたタプルデータを登録します """

        with closing(sqlite3.connect(self.db_name)) as conn:

            c = conn.cursor()
            # 猫データ
            sql = "INSERT INTO TBL_TICKER VALUES (?,?,?,?,?,?,?,?,?,?,?,?)"
            c.execute(sql, ticker)

            c.close()
            conn.commit()

プログラムの修正

BfApi.py

ライブラリ使用宣言の追加

    from dateutil import parser
    from datetime import timedelta
    
    from Utils.SQLiteUtil import SQLiteUtil

pubnubのコールバッククラスで、メッセージ受信時の処理を修正

class MySubscriberCallback(SubscribeCallback):
    """
    Pubnub登録のコールバッククラス
    """

    def __init__(self, client):
        self.cl = client
        self.sqlite_util = SQLiteUtil()

    def presence(self, pubnub, presence):
        pass  # handle incoming presence data

    def status(self, pubnub, status):
        if status.category == PNStatusCategory.PNUnexpectedDisconnectCategory:
            pass  # This event happens when radio / connectivity is lost

        elif status.category == PNStatusCategory.PNConnectedCategory:
            # Connect event. You can do stuff like publish, and know you'll get it.
            # Or just use the connected event to confirm you are subscribed for
            # UI / internal notifications, etc
            pass
        elif status.category == PNStatusCategory.PNReconnectedCategory:
            pass
            # Happens as part of our regular operation. This event happens when
            # radio / connectivity is lost, then regained.
        elif status.category == PNStatusCategory.PNDecryptionErrorCategory:
            pass
            # Handle message decryption error. Probably client configured to
            # encrypt messages and on live data feed it received plain text.

    def message(self, pubnub, message):
        """
        登録したチャンネルからメッセージを受信した際の処理
        :param pubnub:
        :param message:
        :return:
        """
        # WEBソケットを利用してクライアントに配信
        for c in self.cl:
            d = message.message
            self.sqlite_util.insert_data((
                d["product_code"]
                , self.parse_date(d["timestamp"])
                , d["tick_id"]
                , d["best_bid"]
                , d["best_ask"]
                , d["best_bid_size"]
                , d["best_ask_size"]
                , d["total_bid_depth"]
                , d["total_ask_depth"]
                , d["ltp"]
                , d["volume"]
                , d["volume_by_product"]
            ))

            c.write_message(message.message)

    def parse_date(self, iso_date):
        date_time = parser.parse(iso_date) + timedelta(hours=9)
        return date_time.strftime("%Y/%m/%d %H:%M:%S")

データを追加してみる

起動して一覧の更新をスタートしてみてから、
A5:SQL Mk2 にてデータの確認をしました。

A5SQL

無事にデータが追加されています。

まとめ

単純にpubnubだけ動かすツールを作って、データを収集するだけでもよさそうですね。
ただ、Ticker情報だとデータ量が膨大になるため、1分、5分、10分などで処理やテーブルを分けてしまってもいいかもしれません。

ではでは。

スポンサーリンク


関連するコンテンツ