ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Avro] Avro Schema 알아보기
    BigData 2024. 3. 6. 22:33
    728x90
    반응형

     

    - 목차

     

    들어가며.

    이번 글에서는 Avro Schema 에 대한 다양한 예시들을 살펴볼 예정입니다.

    Python 과 "avro-python3==1.10.2" 모듈을 사용하여 다양한 데이터셋을 다루어보도록 하겠습니다.

    전반적인 데이터들은 Kaggle 에서 제공되는 데이터를 활용하도록 하겠습니다.

     

    Enum.

    제한된 갯수를 가지는 Categorical Data 를 대상으로 Enum 타입을 적용할 수 있습니다.

    예를 들어, 날씨 정보가 ['drizzle' 'rain' 'sun' 'snow' 'fog'] 와 같은 5가지 경우를 가진다고 한다면 Enum 타입을 을 사용할 수 있습니다.

    아래 Kaggle 의 seattle_weather 데이터셋을 통해서 자세한 내용을 살펴보도록 하겠습니다.

     

    https://www.kaggle.com/datasets/ananthr1/weather-prediction?select=seattle-weather.csv

     

    WEATHER PREDICTION

    Predicting it will Rain or not using some Weather Conditons..

    www.kaggle.com

     

    대략적인 데이터의 형식은 아래와 같구요.

                date  precipitation  temp_max  temp_min  wind  weather
    0     2012-01-01            0.0      12.8       5.0   4.7  drizzle
    1     2012-01-02           10.9      10.6       2.8   4.5     rain
    2     2012-01-03            0.8      11.7       7.2   2.3     rain
    3     2012-01-04           20.3      12.2       5.6   4.7     rain
    4     2012-01-05            1.3       8.9       2.8   6.1     rain
    ...          ...            ...       ...       ...   ...      ...
    1456  2015-12-27            8.6       4.4       1.7   2.9     rain
    1457  2015-12-28            1.5       5.0       1.7   1.3     rain
    1458  2015-12-29            0.0       7.2       0.6   2.6      fog
    1459  2015-12-30            0.0       5.6      -1.0   3.4      sun
    1460  2015-12-31            0.0       5.6      -2.1   3.5      sun
    
    [1461 rows x 6 columns]

     

    seattle_weather.csv 를 Avro 파일로 표현하기 위한 Schema 는 아래와 같습니다.

    < schema.avsc >

    {
      "name": "seattle_weather",
      "namespace": "seattle.weather",
      "type": "record",
      "fields": [
        {"name": "date", "type": "string"},
        {"name": "precipitation", "type": "float"},
        {"name": "temp_max", "type": "float"},
        {"name": "temp_min", "type": "float"},
        {"name": "wind", "type": "float"},
        {"name": "weather", "type": {
            "type": "enum",
            "name": "weather",
            "symbols": ["drizzle", "rain", "sun", "snow", "fog"]
          }
        }
      ]
    }

     

    avro-python3 와 Pandas 모듈을 통해서 아래와 같이 Avro 파일을 생성할 수 있습니다.

    import avro
    import pandas as pd
    from avro.datafile import DataFileWriter
    from avro.io import DatumWriter
    
    schema = avro.schema.parse(open("schema.avsc", "rb").read())
    writer = DataFileWriter(open("seattle_weather.avro", "wb"), DatumWriter(), schema)
    file_path = "./seattle-weather.csv"
    df = pd.read_csv(file_path)
    for index, row in df.iterrows():
        row_dict = row.to_dict()
        writer.append(row_dict)
    writer.close()

     

     

    Nullable Type ( Optional ).

    세상의 많은 데이터들은 Null 상태인 경우가 많습니다.

    이러한 상태는 Nullable, Empty, Falsy, Sparse 등의 여러 단어로 표현되는데요.

    Avro 에서는 이러한 Nullable 한 상태의 데이터를 Optional 한 방식으로 처리할 수 있습니다.

    Avro Schema 에서는 이러한 상태의 Type 을 Optional 방식으로 표현하며,

    문법은 "type" : ["null":, "string"] 와 같은 형식으로 사용됩니다.

     

    Nullable 상태의 데이터셋을 사용하기 위해서 아래의 Kaggle 데이터셋을 사용합니다.

    서비스 고객 만족도에 대한 데이터로써 서비스-고객 간의 상호작용이 이뤄지지 않은 경우에 Nullable 한 값이 많습니다.

     

    https://www.kaggle.com/datasets/ddosad/ecommerce-customer-service-satisfaction

     

    eCommerce Customer Service Satisfaction

    Real World Data for EDA, Classification

    www.kaggle.com

    schema.avsc

    스키마는 아래와 같이 표현됩니다.

    string, int, float 타입의 칼럼들이 존재하구요.

    Nullable 한 칼럼은 Union 타입으로 지정할 수 있습니다.

    {
      "name": "service_customer_satisfaction",
      "namespace": "service.customer.satisfaction",
      "type" : "record",
      "fields" : [
        {"name": "Unique id", "type" : "string"},
        {"name": "channel_name", "type" : "string"},
        {"name": "category", "type" : "string"},
        {"name": "Sub-category", "type" : "string"},
        {"name": "Customer Remarks", "type" : ["string", "null"], "default":  null},
        {"name": "Order_id", "type" : ["string", "null"], "default":  null},
        {"name": "order_date_time", "type" : ["string", "null"], "default":  null},
        {"name": "Issue_reported at", "type" : "string"},
        {"name": "issue_responded", "type" : "string"},
        {"name": "Survey_response_Date", "type" : "string"},
        {"name": "Customer_City", "type" : ["string", "null"], "default":  null},
        {"name": "Product_category", "type" : ["string", "null"], "default":  null},
        {"name": "Item_price", "type" : ["float", "null"], "default":  null},
        {"name": "connected_handling_time", "type" : ["float", "null"], "default":  null},
        {"name": "Agent_name", "type" : "string"},
        {"name": "Supervisor", "type" : "string"},
        {"name": "Manager", "type" : "string"},
        {"name": "Tenure Bucket", "type" : "string"},
        {"name": "Agent Shift", "type" : "string"},
        {"name": "CSAT Score", "type" : "int"}
      ]
    }

     

    String 타입의 Nullable 칼럼은 아래와 같이 설정합니다.

    {"name": "Customer Remarks", "type" : ["string", "null"], "default":  null}

     

    그 외 int, float 타입의 Nullable 칼럼도 이와 같은 방식으로써 지정할 수 있습니다.

    {"name": "connected_handling_time", "type" : ["float", "null"], "default":  null},

     

    avro_writer.py

    Kaggle 의 csv 파일을 Avro 파일로 변형하는 코드 예시는 아래와 같습니다.

    저는 csv 파일을 손쉽게 Python Object 로 변환하기 위해서 Pandas 를 활용하였구요.

    이 과정에서 빈값에 해당하는 칼럼이 nan 으로 자동 변경되었습니다.

    따라서 DataFrame 의 replace 함수를 통해 nan -> None 으로 치환하는 코드가 추가됩니다.

    import pandas as pd
    import numpy as np
    import avro
    from avro.datafile import DataFileWriter
    from avro.io import DatumWriter
    
    file_path = "./Customer_support_data.csv"
    df = pd.read_csv(file_path)
    print(df.isna().sum())
    
    schema = avro.schema.parse(open("schema.avsc", "rb").read())
    writer = DataFileWriter(open("service_customer_satisfaction.avro", "wb"), DatumWriter(), schema)
    for index, row in df.iterrows():
        row.replace({np.nan: None}, inplace=True)
        row_dict = row.to_dict()
        writer.append(row_dict)
    writer.close()

     

     

     

    반응형

    'BigData' 카테고리의 다른 글

    Docker 로 Minio Storage 구현하기  (0) 2024.01.12
    Checksum 알아보기 (Data Integrity, 데이터 무결성, 체크섬)  (0) 2023.12.23
    ASCII 코드 알아보기  (2) 2023.12.22
    Trino 도커로 따라하기  (0) 2023.12.03
    Thrift 알아보기  (0) 2023.11.04
Designed by Tistory.