はじめに

この実験 (lab) では、SQLite 内で JSON データを処理する方法を学びます。SQLite データベース内での JSON データの保存、抽出、フィルタリング、および更新の方法を検証します。この実験 (lab) は、SQLite での JSON データの操作に関する実践的な入門となり、これは現代のデータ管理においてますます価値のあるスキルです。

データベースとテーブルの作成

このステップでは、SQLite データベースと JSON データを格納するためのテーブルを作成します。SQLite は、データを単一のファイルに格納する軽量なデータベースであり、管理が容易です。

まず、ターミナルを開きます。デフォルトのパスは /home/labex/project です。

次に、データベースを格納するためのディレクトリを作成しましょう。

mkdir sqlite_json
cd sqlite_json

これらのコマンドは、sqlite_json という名前のディレクトリを作成し、現在のディレクトリをそのディレクトリに変更します。これにより、プロジェクトファイルが整理されます。

次に、mydatabase.db という名前の SQLite データベースを作成します。

sqlite3 mydatabase.db

このコマンドは SQLite シェルを開き、mydatabase.db データベースに接続します。データベースファイルが存在しない場合、SQLite はそれを作成します。

次に、idname、および details の列を持つ products という名前のテーブルを作成します。details 列は、JSON データをテキストとして格納します。

CREATE TABLE products (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT,
    details TEXT
);

この SQL コマンドは、products テーブルを作成します。

  • id: 各新しい製品に対して自動的にインクリメントされる一意の整数 (unique integer)。
  • name: 製品の名前 (例:Laptop, Smartphone)。
  • details: 製品の JSON データを格納するためのテキストフィールド (text field)。

JSON データの挿入

このステップでは、JSON データを products テーブルに挿入します。

products テーブルに 2 つのサンプルレコードを挿入しましょう。

INSERT INTO products (name, details) VALUES (
    'Laptop',
    '{"brand": "Dell", "model": "XPS 13", "specs": {"cpu": "Intel i7", "memory": "16GB", "storage": "512GB SSD"}}'
);

INSERT INTO products (name, details) VALUES (
    'Smartphone',
    '{"brand": "Samsung", "model": "Galaxy S21", "specs": {"display": "6.2 inch", "camera": "12MP", "storage": "128GB"}}'
);

これらの INSERT ステートメントは、products テーブルに 2 つの行を追加します。details 列には、テキスト文字列 (text string) として JSON データが含まれています。

データが正しく挿入されたことを確認するには、次のクエリを実行します。

SELECT * FROM products;

期待される出力 (Expected Output):

1|Laptop|{"brand": "Dell", "model": "XPS 13", "specs": {"cpu": "Intel i7", "memory": "16GB", "storage": "512GB SSD"}}
2|Smartphone|{"brand": "Samsung", "model": "Galaxy S21", "specs": {"display": "6.2 inch", "camera": "12MP", "storage": "128GB"}}

この出力は、JSON データが products テーブルに正常に格納されたことを確認します。

カスタム関数による JSON フィールドの抽出

SQLite には組み込みの JSON 関数がないため、JSON 文字列からデータを抽出するためのカスタム Python 関数を作成します。

まず、SQLite シェルを終了します。

.exit

次に、json_extractor.py という名前の Python ファイルを作成します。

nano json_extractor.py

以下の Python コードを json_extractor.py ファイルに貼り付けます。

## 必要なライブラリをインポートします
import sqlite3
import json

## パスを使用して JSON 文字列から値を抽出する関数を定義します
def json_extract(json_str, path):
    try:
        ## JSON 文字列を Python の辞書にパースします
        json_data = json.loads(json_str)
        ## パスをコンポーネントに分割します(例:'specs.cpu' は ['specs', 'cpu'] になります)
        path_components = path.split('.')
        ## 完全な JSON オブジェクトから開始します
        value = json_data
        ## パスコンポーネントを使用して JSON オブジェクトをトラバースします
        for component in path_components:
            ## 現在のコンポーネントの値を取得します
            value = value.get(component)
        ## 最終的な値を返します
        return value
    ## JSON が無効であるか、パスが存在しない場合のエラーを処理します
    except (json.JSONDecodeError, AttributeError, TypeError):
        return None

## データベースに接続し、カスタム関数を登録する関数を定義します
def connect_db(db_path):
    ## 指定されたパスの SQLite データベースに接続します
    conn = sqlite3.connect(db_path)
    ## 'json_extract' Python 関数をカスタム SQL 関数として登録します
    ## "json_extract" は SQL での名前、2 は引数の数、json_extract は Python 関数です
    conn.create_function("json_extract", 2, json_extract)
    ## データベース接続を返します
    return conn

## スクリプトが直接実行されたときに実行されるブロックです
if __name__ == '__main__':
    ## データベースに接続し、関数を登録します
    conn = connect_db('mydatabase.db')
    ## SQL クエリを実行するためのカーソルオブジェクトを作成します
    cursor = conn.cursor()

    ## カスタム SQL 関数を使用して、'products' テーブルの 'name' が 'Laptop' である行の 'details' 列から 'brand' を抽出します
    cursor.execute("SELECT json_extract(details, 'brand') FROM products WHERE name = 'Laptop'")
    ## 結果を取得して表示します
    print(cursor.fetchone()[0])

    ## カスタム SQL 関数を使用して、'products' テーブルの 'name' が 'Laptop' である行のネストされた 'specs' オブジェクトから 'cpu' を抽出します
    cursor.execute("SELECT json_extract(details, 'specs.cpu') FROM products WHERE name = 'Laptop'")
    ## 結果を取得して表示します
    print(cursor.fetchone()[0])

    ## データベース接続を閉じます
    conn.close()

この Python コードは、JSON 文字列とパスを入力として受け取り、そのパスの値を取得する json_extract 関数を定義しています。また、SQLite データベースに接続し、json_extract 関数を登録するための connect_db 関数も含まれています。

  • json.loads(json_str): この行は、JSON 文字列を Python の辞書にパースします。
  • path.split('.'): これは、パスをコンポーネントのリストに分割します。例えば、'specs.cpu'['specs', 'cpu'] になります。
  • ループはパスコンポーネントを反復処理し、JSON データ内のネストされた値にアクセスします。

ファイルを保存して nano を終了します。

次に、Python スクリプトを実行します。

python3 json_extractor.py

期待される出力:

Dell
Intel i7

このスクリプトはデータベースに接続し、json_extract 関数を登録してから、それを使用して Laptop のブランドと CPU を抽出します。

JSON クエリを使用したデータのフィルタリング

このステップでは、カスタム json_extract 関数を使用して、JSON フィールド内の値に基づいてデータをフィルタリングします。

json_extractor.py ファイルを再度開きます。

nano json_extractor.py

データベースをクエリするための関数を含めるように json_extractor.py ファイルを変更します。

## 必要なライブラリをインポートします
import sqlite3
import json

## パスを使用して JSON 文字列から値を抽出する関数を定義します
def json_extract(json_str, path):
    try:
        ## JSON 文字列を Python の辞書に解析します
        json_data = json.loads(json_str)
        ## パスをコンポーネントに分割します (例:'specs.cpu' は ['specs', 'cpu'] になります)
        path_components = path.split('.')
        ## 完全な JSON オブジェクトから開始します
        value = json_data
        ## パスコンポーネントを使用して JSON オブジェクトをトラバースします
        for component in path_components:
            ## 現在のコンポーネントの値を取得します
            value = value.get(component)
        ## 最終的な値を返します
        return value
    ## JSON が無効な場合やパスが存在しない場合のエラーを処理します
    except (json.JSONDecodeError, AttributeError, TypeError):
        return None

## データベースに接続し、カスタム関数を登録する関数を定義します
def connect_db(db_path):
    ## 指定されたパスの SQLite データベースに接続します
    conn = sqlite3.connect(db_path)
    ## 'json_extract' Python 関数をカスタム SQL 関数として登録します
    conn.create_function("json_extract", 2, json_extract)
    ## データベース接続を返します
    return conn

## JSON フィールドに基づいて製品をフィルタリングする関数を定義します
def filter_products(db_path, json_path, value):
    ## データベースに接続します
    conn = connect_db(db_path)
    ## カーソルオブジェクトを作成します
    cursor = conn.cursor()
    ## JSON 値でフィルタリングするために f-string を使用して SQL クエリを作成します
    query = f"SELECT * FROM products WHERE json_extract(details, '{json_path}') = '{value}'"
    ## クエリを実行します
    cursor.execute(query)
    ## 一致するすべての結果を取得します
    results = cursor.fetchall()
    ## データベース接続を閉じます
    conn.close()
    ## 結果を返します
    return results

## スクリプトが直接実行されたときに実行されるブロックです
if __name__ == '__main__':
    ## 使用例:
    ## ブランドが 'Dell' の製品をフィルタリングします
    dell_products = filter_products('mydatabase.db', 'brand', 'Dell')
    print("Products with brand 'Dell':", dell_products)

    ## CPU が 'Intel i7' の製品をフィルタリングします
    intel_products = filter_products('mydatabase.db', 'specs.cpu', 'Intel i7')
    print("Products with CPU 'Intel i7':", intel_products)

このコードは、データベースパス、JSON パス、および値を入力として受け取る filter_products 関数を追加します。次に、データベースに接続し、json_extract 関数を登録し、指定された JSON パスにある値が指定された値と一致するすべての製品を見つけるためのクエリを実行します。

ファイルを保存して nano を終了します。

次に、Python スクリプトを実行します。

python3 json_extractor.py

期待される出力:

Products with brand 'Dell': [(1, 'Laptop', '{"brand": "Dell", "model": "XPS 13", "specs": {"cpu": "Intel i7", "memory": "16GB", "storage": "512GB SSD"}}')]
Products with CPU 'Intel i7': [(1, 'Laptop', '{"brand": "Dell", "model": "XPS 13", "specs": {"cpu": "Intel i7", "memory": "16GB", "storage": "512GB SSD"}}')]

この出力は、指定された基準に一致する製品を示しています。

JSON 値の更新

このステップでは、JSON フィールド内の値を更新する方法を学びます。

json_extractor.py ファイルを開きます。

nano json_extractor.py

json_extractor.py ファイルを修正して、JSON とデータベースを更新する関数を含めます。

## 必要なライブラリをインポート
import sqlite3
import json

## パスを使用して JSON 文字列から値を抽出する関数を定義
def json_extract(json_str, path):
    try:
        ## JSON 文字列を Python の辞書に解析
        json_data = json.loads(json_str)
        ## パスをコンポーネントに分割
        path_components = path.split('.')
        ## 完全な JSON オブジェクトから開始
        value = json_data
        ## JSON オブジェクトをトラバース
        for component in path_components:
            value = value.get(component)
        return value
    except (json.JSONDecodeError, AttributeError, TypeError):
        return None

## データベースに接続し、カスタム関数を登録する関数を定義
def connect_db(db_path):
    ## データベースに接続
    conn = sqlite3.connect(db_path)
    ## カスタム SQL 関数を登録
    conn.create_function("json_extract", 2, json_extract)
    return conn

## JSON フィールドに基づいて製品をフィルタリングする関数を定義
def filter_products(db_path, json_path, value):
    ## データベースに接続
    conn = connect_db(db_path)
    ## カーソルオブジェクトを作成
    cursor = conn.cursor()
    ## JSON 値でフィルタリングする SQL クエリを作成
    query = f"SELECT * FROM products WHERE json_extract(details, '{json_path}') = '{value}'"
    ## クエリを実行
    cursor.execute(query)
    ## 一致するすべての結果を取得
    results = cursor.fetchall()
    ## 接続を閉じる
    conn.close()
    return results

## JSON 文字列内の値を更新する関数を定義
def update_json_value(json_str, path, new_value):
    try:
        ## JSON 文字列を Python の辞書に解析
        json_data = json.loads(json_str)
        ## パスをコンポーネントに分割
        path_components = path.split('.')
        ## ターゲット値の親に移動
        target = json_data
        for i in range(len(path_components) - 1):
            target = target.get(path_components[i])

        ## ターゲット値を更新
        target[path_components[-1]] = new_value
        ## Python の辞書を JSON 文字列に戻す
        return json.dumps(json_data)
    except (json.JSONDecodeError, AttributeError, TypeError):
        ## エラーが発生した場合は、元の JSON 文字列を返す
        return json_str

## データベース内の製品の詳細を更新する関数を定義
def update_product_details(db_path, product_name, json_path, new_value):
    ## データベースに接続
    conn = sqlite3.connect(db_path)
    ## カーソルオブジェクトを作成
    cursor = conn.cursor()

    ## 指定された製品の現在の詳細を取得
    ## '?' は SQL インジェクションを防ぐためのプレースホルダーです
    cursor.execute("SELECT details FROM products WHERE name = ?", (product_name,))
    result = cursor.fetchone()
    ## 製品が存在しない場合は、接続を閉じて False を返す
    if not result:
        conn.close()
        return False

    ## 現在の JSON 詳細文字列を取得
    current_details = result[0]

    ## 新しい値で JSON 文字列を更新
    updated_details = update_json_value(current_details, json_path, new_value)

    ## 変更された JSON 文字列でデータベースを更新
    cursor.execute("UPDATE products SET details = ? WHERE name = ?", (updated_details, product_name))
    ## データベースに変更をコミット
    conn.commit()
    ## 接続を閉じる
    conn.close()
    return True

## スクリプトが直接実行されたときに実行されるブロック
if __name__ == '__main__':
    ## 使用例:ラップトップのメモリを 32GB に更新
    update_product_details('mydatabase.db', 'Laptop', 'specs.memory', '32GB')
    print("Laptop memory updated to 32GB")

    ## データを再度取得して更新を確認
    conn = sqlite3.connect('mydatabase.db')
    cursor = conn.cursor()
    cursor.execute("SELECT details FROM products WHERE name = 'Laptop'")
    ## 更新された詳細を取得
    updated_details = cursor.fetchone()[0]
    print("Updated Laptop details:", updated_details)
    ## 接続を閉じる
    conn.close()

このコードは 2 つの関数を追加します。

  • update_json_value: この関数は、JSON 文字列、パス、および新しい値を入力として受け取ります。JSON 文字列を解析し、指定されたパスの値を確認し、更新された JSON 文字列を返します。
  • update_product_details: この関数は、データベースパス、製品名、JSON パス、および新しい値を入力として受け取ります。データベースに接続し、製品の現在の JSON データを取得し、update_json_value を使用して指定されたパスの値を確認し、次に変更された JSON データでデータベースを更新します。

ファイルを保存して nano を終了します。

次に、Python スクリプトを実行します。

python3 json_extractor.py

期待される出力:

Laptop memory updated to 32GB
Updated Laptop details: {"brand": "Dell", "model": "XPS 13", "specs": {"cpu": "Intel i7", "memory": "32GB", "storage": "512GB SSD"}}

この出力は、ラップトップのメモリがデータベースで 32GB に更新されたことを確認します。

まとめ

この実験 (lab) では、SQLite 内で JSON データを処理する方法を学びました。まず、データベースと JSON データを格納するテーブルを作成しました。次に、JSON データをテーブルに挿入する方法を学びました。JSON データから特定のフィールドを抽出するカスタム Python 関数を作成し、この関数を使用して JSON フィールド内の値に基づいてデータをフィルタリングしました。最後に、カスタム Python 関数を使用して JSON フィールド内の値を更新する方法を学びました。これらのスキルは、SQLite データベース内で JSON データを効果的に管理するための基礎となります。