Blog logo콜로리 블로그
개발 > 데이터
2달 전

시계열 데이터베이스 InfluxDB 사용해보기 - 4편 - 데이터 다루기 Part. 2

Post thumbnailImage

이전글 - 시계열 데이터베이스 InfluxDB 사용해보기 - 4편 - 데이터 다루기


서론

지난 파트에서는 데이터를 다루기 위한 방법들을 알아보았다.

이번에는 파이썬(Python)을 이용해 프로그래밍 방식으로 데이터를 직접 다루어보자.


주식의 종목별 일자 가격 정보를 저장하는 것을 예시로

DB에 접속하는 방법과 CRUD 기본 예시를 준비했다.


참고로 InfluxDB는 스키마리스(Schema less)로 RDBMS와 같이 테이블을 정의하는 과정이 없으므로,

DB 구성만 되어 있다면 바로 아래 코드를 참조하여 데이터를 다룰 수 있다.




가이드


1. 토큰(Token) 확인

지난 2편 설정 에서 메모해두었던 토큰값을 이용하게 된다.

혹시나 잊어버렸다면 Influx UI 의 API Token에서 새로 생성이 가능하다.

(보안상의 이유로 만들어진 토큰 정보를 볼 수는 없는듯하다.)




2. 커넥션 연결

DB 연결을 위한 커넥션을 맺고 client를 반환하는 함수다.

실제 CRUD 조작시에는 해당 client 에서 용도별 api를 반환받아 이용하게 된다.


bucket 정보는 데이터 조작시에 지정하게되고,

org도 DB 정보를 세팅할 때 입력하게 되는 값이므로 잘 입력해주자.

# python
BUCKET='{BUCKET}'
ORG='{ORG}'
MEASUREMENT = 'stock_tickers'

def get_client():
  token = '{TOKEN}'
  url = 'http://localhost:8086'

  return influxdb_client.InfluxDBClient(url=url, token=token, org=ORG)




3. 데이터 생성 & 수정

아래 코드에서 main() -> insert() 순으로 보면 된다.

위의 get_client() 함수의 반환값으로 write_api() 함수를 이용해 write() 함수로 데이터를 생성&수정 할 수 있게 된다.


get_client() 함수와 with 절을 같이 사용하는 이유는 처리후 커넥션을 반환하기 위한 close() 함수를 자동으로 호출하기 위함이다.



가상의 주식 종목 A, B를 일자별로 저장하는 예시 코드이다.

A라는 종목은 2024. 04. 01 일자부터 5일동안 가격을 저장하고, B라는 종목은 3일간 가격 정보를 저장한다.


추가로 Update(수정) 예시는 # update 부분을 참조하면 된다.

update 함수가 따로 있는것이 아니라 write() 함수를 같이 사용하게 된다.

Influx DB에서의 PK는 시간과 Tag의 조합으로 이해하면 좋은데,

Update는 PK기준으로 데이터를 덮어씌우는 개념인 것이다.


# python
def insert(client: InfluxDBClient):
  items = []

  for i in range(5):
    items.append(
      Point('stock_tickers')
        .tag('stock_code', 'A')
        .field('close_price', 100 + i)
        .field('volume', 1000 + i)
        .time(datetime(2024, 4, i + 1, 0, 0, 0))
    )

  for i in range(3):
    items.append(
      Point('stock_tickers')
        .tag('stock_code', 'B')
        .field('close_price', 200 + i)
        .field('volume', 2000 + i)
        .time(datetime(2024, 4, i + 1, 0, 0, 0))
    )

  # update
  items.append(
    Point('stock_tickers')
      .tag('stock_code', 'B')
      .field('close_price', 999)
      .field('volume', 999999)
      .time(datetime(2024, 4, 1, 0, 0, 0))
  )

  with client.write_api(write_options=SYNCHRONOUS) as writer:
    writer.write(bucket=BUCKET, record=items)


def main():
  with get_client() as client:
    insert(client)

if __name__ == '__main__':
  main()




4. 조회

조회는 client에서 query_api() 함수를 호출해 api를 이용할 수 있다.


주가 종목 데이터이므로 종목 코드(stock_code)가 PK 컬럼이다.

stock_code를 인자로 넘겨받아 A 혹은 B 종목의 주가를 조회하기 위한 함수이다.


설명

  • range
  • 조회하려는 measurement에서 시간 범위이며 -5d 와 같은 시간 단위로 현재 시간 기준으로 범위 조회도 가능하다.
  • filter
  • 조건을 입력하는 부분이며 Lambda 식의 함수와 유사하게 생겼다. _measurement 에는 테이블 개념과 같은 measurement의 이름을 입력하고 필요한 컬럼의 조건을 입력할 수 있다.
  • 쿼리된 결과는 아래 for문을 참조하자.


# python
def get_tickers_by_stock(client: InfluxDBClient, stock_code: str):
  query_api = client.query_api()

  query = f"""from(bucket: "{BUCKET}")
  |> range(start: 2024-04-01T00:00:00Z, stop: 2024-04-30T00:00:00Z)
  |> filter(fn: (r) => r._measurement == "{MEASUREMENT}" and r.stock_code == "{stock_code}")"""
  tables = query_api.query(query)

  for table in tables:
    for record in table.records:
        values = record.values

        print(f"Measurement: {record.get_measurement()}, Time: {record.get_time()}, Stock: {values['stock_code']}, Name: {record.get_field()}, Value: {record.get_value()}")



print로 실행된 출력문

# Measurement: stock_tickers, Time: 2024-04-01 00:00:00+00:00, Stock: B, Name: close_price, Value: 999
# Measurement: stock_tickers, Time: 2024-04-02 00:00:00+00:00, Stock: B, Name: close_price, Value: 201
# Measurement: stock_tickers, Time: 2024-04-03 00:00:00+00:00, Stock: B, Name: close_price, Value: 202
# Measurement: stock_tickers, Time: 2024-04-01 00:00:00+00:00, Stock: B, Name: volume, Value: 999999
# Measurement: stock_tickers, Time: 2024-04-02 00:00:00+00:00, Stock: B, Name: volume, Value: 2001
# Measurement: stock_tickers, Time: 2024-04-03 00:00:00+00:00, Stock: B, Name: volume, Value: 2002




5. 조회 + Pandas Dataframe

위에서 query_api 를 통해 받아온 결과는 column 기준으로 조회되어 불편한점이 있다.

그리고 python의 pandas 라이브러리를 이용하면 Dataframe으로 데이터를 쉽게 조작 가능한 장점이 있는데,

InfluxDB의 쿼리를 바로 dataframe으로 받아올 수 있다.


참고로 pandas 라이브러리는 pip로 별도로 설치해주어야 한다.

pip install pandas



위의 get_tickers_by_stock() 함수와 거의 비슷하지만 query_data_frame 함수로 쿼리 결과를 받아오며,

쿼리 내용에서 pivot 부분이 추가가 되었다.

# python
def get_all_tickers(client: InfluxDBClient):
  query_api = client.query_api()

  query = f"""from(bucket: "{BUCKET}")
  |> range(start: 2024-04-01T00:00:00Z, stop: 2024-04-30T00:00:00Z)
  |> filter(fn: (r) => r._measurement == "{MEASUREMENT}")
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
  """
  df = query_api.query_data_frame(query)

  print(df)



Dataframe으로 받아온 출력이며 익숙한 row단위로 조회가 가능해졌다.

#     result  table                    _start                     _stop                     _time   _measurement stock_code  close_price  volume
# 0  _result      0 2024-04-01 00:00:00+00:00 2024-04-30 00:00:00+00:00 2024-04-01 00:00:00+00:00  stock_tickers          A          100    1000
# 1  _result      0 2024-04-01 00:00:00+00:00 2024-04-30 00:00:00+00:00 2024-04-02 00:00:00+00:00  stock_tickers          A          101    1001
# 2  _result      0 2024-04-01 00:00:00+00:00 2024-04-30 00:00:00+00:00 2024-04-03 00:00:00+00:00  stock_tickers          A          102    1002
# 3  _result      0 2024-04-01 00:00:00+00:00 2024-04-30 00:00:00+00:00 2024-04-04 00:00:00+00:00  stock_tickers          A          103    1003
# 4  _result      0 2024-04-01 00:00:00+00:00 2024-04-30 00:00:00+00:00 2024-04-05 00:00:00+00:00  stock_tickers          A          104    1004
# 5  _result      1 2024-04-01 00:00:00+00:00 2024-04-30 00:00:00+00:00 2024-04-01 00:00:00+00:00  stock_tickers          B          999  999999
# 6  _result      1 2024-04-01 00:00:00+00:00 2024-04-30 00:00:00+00:00 2024-04-02 00:00:00+00:00  stock_tickers          B          201    2001
# 7  _result      1 2024-04-01 00:00:00+00:00 2024-04-30 00:00:00+00:00 2024-04-03 00:00:00+00:00  stock_tickers          B          202    2002



6. 삭제

InfluxDB에서 삭제는 권장되지 않지만 기능은 지원된다.

정말 불가피한 경우에만 주의해서 사용하는것으로 하자.

measurement를 선택하고 시간 범위를 입력해 삭제를 할 수 있다.

def delete_tickers(client: InfluxDBClient, start: str, stop: str):
  delete_api = client.delete_api()
  delete_api.delete(
    start=start,
    stop=stop,
    bucket='app',
    predicate=f'_measurement="{MEASUREMENT}"',
  )





#시계열 DB#Influx DB#데이터 다루기 실전