[AITech][Final Project][P stage] 20220527 - 최종 프로젝트 10일차
최종 프로젝트 10일차
Face feature와 Cloth feature를 함께 사용하여 성능이 개선되는 것을 확인하였다!
아래는 팀 노션에 공유된 내용이다.
핵심적인 코드 흐름을 따라가면서 설명하겠습니다. 참고 깃헙은 아래 두 군데입니다.
우선 사용하는 파일을 아래와 같습니다.
person_clustering.ipynb
: 영상 input부터 cluster 후 이미지 저장까지 수행하는 main 코드face_classifier.py
: face detection/encoding, cloth 좌표 계산/encoding, 이미지 저장 등 대부분의 코드가 작성되어 있는 파일calc.py
: cloth feature 추출용 모델 가져오기, feature extraction, pca, cluster 등 실질적인 연산을 위한 함수들이 정의되어 있는 파일icio.py
: pickle/timestamp/image 등을 읽고 쓰는 io 연산을 위한 함수들이 정의되어 있는 파일postproc.py
: 시각화, clustering 결과 저장 등 util 함수들이 정의되어 있는 파일
person_clustering.ipynb
while running:
ret, frame = video.read()
# ...
# 얼굴 디텍션 -> 얼굴 인코딩(frame, face_boxes) -> 옷 디텍션 -> 옷 인코딩(read_frames 코드 + _frame_worker 코드 + get_model 코드 + fingerprint 코드) -> 이미지 저장
# 반환: 상체 이미지(파일명): [얼굴 인코딩 | 옷 인코딩] (fingerprint)
frame_fingerprints = fc.detect_faces(frame)
if frame_fingerprints:
fingerprints.update(frame_fingerprints)
print('Face images: ', len(fingerprints))
cnt += 1
print('frame_cnt: ', cnt)
print()
if cnt >= capture_cnt:
break
person_clustering.ipynb의 코드는 상당히 간단합니다.
face_classifier.py의 FaceClassifier 클래스의 인스턴스를 생성하고, fc.detect_faces 메서드를 호출합니다. 인자로는 frame을 넘깁니다.
face_classifier.py
- detect_faces
def detect_faces(self, frame):
# face locationss
face_boxes = self.locate_faces(frame) # box: (top, right, bottom, left)
# 사람이 2명 ~ 4명 사이일 때만 수행
if len(face_boxes) <= 1 or len(face_boxes) >= 5:
return None
# ...
face_encodings = face_recognition.face_encodings(frame, face_boxes, model='large')
locate_faces 메서드에서 프레임에 존재하는 얼굴들의 좌표값들을 추출하고, 이를 이용해 2명~4명 사이일 때만 이미지 추출을 진행합니다.
face_recognition.face_encodings에 frame과 face boxes를 전달하여 각 face landmarks들을 128차원 벡터로 변환합니다.
이어서 보겠습니다.
# model for cloth encoding
cloth_encoding_model = calc.get_model() # resnet
calc.get_model 함수를 호출하여 cloth feature 추출용 모델을 불러옵니다. 해당 함수는 아래와 같습니다.
calc.py
def get_model():
# ...
base_model = ResNet50(weights='imagenet', include_top=True)
model = keras.Sequential(
[
Model(inputs=base_model.input, outputs=base_model.get_layer('conv3_block4_out').output), # (None, 28, 28, 512)
keras.layers.GlobalAveragePooling2D() # (None, 512)
]
)
return model
현재 image clustering 분야에서 SOTA 모델들은 대부분 backbone으로 resnet18을 사용합니다.
현재는 코드가 tensorflow로 작성되어 있고 keras에서는 resnet18 모델을 제공하지 않아 resnet50 모델의 중간 feature map을 뽑아 와서 GAP로 512 크기의 feature vector를 얻도록 했습니다.
추후 해당 코드를 pytorch로 리팩토링 하면 이는 resnet18로 변경될 수 있습니다.
다시 face_classifier.py로 돌아옵니다.
face_classifier.py
- detect_faces
fingerprints = dict()
for i, face_box in enumerate(face_boxes):
# 1. crop face image
upper_body_image, cloth_image = self.get_face_and_cloth_image(frame, face_box)
# 2. cloth preprocessing
preprocessed_cloth_image = self.preprocess(cloth_image, (224, 224))
# 3. cloth_encodings
cloth_encoding = calc.fingerprint(preprocessed_cloth_image, cloth_encoding_model)
# 4. normalize
normalized_face_encoding = face_encodings[i] / np.linalg.norm(face_encodings[i])
normalized_cloth_encoding = cloth_encoding / np.linalg.norm(cloth_encoding)
# 5. concat features [face | cloth]
encoding = np.concatenate((normalized_face_encoding, normalized_cloth_encoding), axis=0) # 128-d + 512-d
# 6. filename
filename = str_ms + str(i) + ".png"
# 7. save image
filepath = os.path.join(self.save_dir, filename)
cv2.imwrite(filepath, upper_body_image)
print('image saved path: ', filepath)
# 8. save fingerprint
fingerprints[filepath] = encoding
return fingerprints
fingerprints 딕셔너리 변수는 key 값으로 filename, value 값으로 640(128+512)-d feature vector를 갖습니다. 각 이미지 파일에 대한 face+cloth feature vector입니다.
for문 내의 코드 흐름은 직관적입니다.
- get_face_and_cloth_image 메서드를 호출하여 앞서 얻은 face 좌표를 이용해 upper body 좌표와 cloth 좌표를 계산해 반환받습니다. (upper body 이미지는 서비스 시 사용자에게 보여주기 위한 이미지 후보들에 해당합니다)
def get_face_and_cloth_image(self, frame, box):
'''
param:
frame: 프레임 이미지
box: 좌표값 (top, right, bottom, left)
return:
padded_face: 얼굴 이미지 (numpy array)
padded_cloth: 옷 이미지 (numpy array)
'''
img_height, img_width = frame.shape[:2]
(box_top, box_right, box_bottom, box_left) = box # 딱 얼굴 이미지
box_width = box_right - box_left
box_height = box_bottom - box_top
# padding
crop_top = max(box_top - box_height, 0)
pad_top = -min(box_top - box_height, 0)
crop_bottom = min(box_bottom + box_height, img_height - 1)
pad_bottom = max(box_bottom + box_height - img_height, 0)
crop_left = max(box_left - box_width, 0)
pad_left = -min(box_left - box_width, 0)
crop_right = min(box_right + box_width, img_width - 1)
pad_right = max(box_right + box_width - img_width, 0)
# cropping
face_image = frame[crop_top:crop_bottom, crop_left:crop_right]
cloth_image = frame[box_bottom+int(box_height*0.2):crop_bottom, crop_left:crop_right]
# return
if (pad_top == 0 and pad_bottom == 0):
if (pad_left == 0 and pad_right == 0):
return face_image, cloth_image
padded_face = cv2.copyMakeBorder(face_image, pad_top, pad_bottom,
pad_left, pad_right, cv2.BORDER_CONSTANT)
padded_cloth = cv2.copyMakeBorder(cloth_image, pad_top, pad_bottom,
pad_left, pad_right, cv2.BORDER_CONSTANT)
return padded_face, padded_cloth
- preprocess 메서드를 호출하여 cloth_image에 색 변환, 리사이즈 등을 적용합니다.
def preprocess(self, image, size):
try:
img = Image.fromarray(image).convert('RGB').resize(size, resample=3)
arr = tf_image.img_to_array(img, dtype=int)
return arr
except OSError as ex:
print(f"skipping file...: {ex}")
return None
- calc.fingerprint 함수를 호출하여 cloth feature를 반환받습니다.
def fingerprint(image, model):
# (224, 224, 1) -> (224, 224, 3)
#
# Simple hack to convert a grayscale image to fake RGB by replication of
# the image data to all 3 channels.
#
# Deep learning models may have learned color-specific filters, but the
# assumption is that structural image features (edges etc) contibute more to
# the image representation than color, such that this hack makes it possible
# to process gray-scale images with nets trained on color images (like
# VGG16).
#
# We assme channels_last here. Fix if needed.
if image.shape[2] == 1:
image = image.repeat(3, axis=2)
# (1, 224, 224, 3)
arr4d = np.expand_dims(image, axis=0)
# (1, 224, 224, 3)
arr4d_pp = preprocess_input(arr4d)
ret = model.predict(arr4d_pp)[0,:]
return ret
- Face feature와 cloth feature의 scale을 맞춰주기 위해 두 feature vector를 각각 normalize 해줍니다.
- normalize까지 마친 face feature와 cloth feature vector를 concat합니다.
- 이미지 파일명을 생성합니다.
- upper body 이미지를 저장합니다.
- 저장된 이미지의 fingerprint(encoding)를 저장합니다.
그리고 해당 frame에 대한 인물들의 fingerprint를 반환합니다.
person_clustering.ipynb
60 프레임(본인이 임의로 지정)의 인물들이 추출되고 나면 while문을 탈출합니다. fingerprints 변수에는 추출된 인물들에 대한 feature vector 값이 저장되어 있습니다.
while running:
# ...
# 얼굴 디텍션 -> 얼굴 인코딩(frame, face_boxes) -> 옷 디텍션 -> 옷 인코딩(read_frames 코드 + _frame_worker 코드 + get_model 코드 + fingerprint 코드) -> 이미지 저장
# 반환: 상체 이미지(파일명): [얼굴 인코딩 | 옷 인코딩] (fingerprint)
frame_fingerprints = fc.detect_faces(frame)
if frame_fingerprints:
fingerprints.update(frame_fingerprints)
print('Face images: ', len(fingerprints))
cnt += 1
print('frame_cnt: ', cnt)
print()
if cnt >= capture_cnt:
break
이후에는 추출된 fingerprints 를 이용하여 clustering을 수행합니다.
clusters = calc.cluster(fingerprints, sim=0.55, min_csize=3) # 높일수록 엄격하게
calc.py의 cluster 함수는 아래와 같이 작성되어 있습니다.
calc.py
def cluster(fingerprints, sim=0.5, timestamps=None, alpha=0.3, method='average',
metric='euclidean', extra_out=False, print_stats=True, min_csize=2):
assert 0 <= sim <= 1, "sim not 0..1"
assert 0 <= alpha <= 1, "alpha not 0..1"
assert min_csize >= 1, "min_csize must be >= 1"
files = list(fingerprints.keys())
# array(list(...)): 2d array
# [[... fingerprint of image1 (4096,) ...],
# [... fingerprint of image2 (4096,) ...],
# ...
# ]
dfps = distance.pdist(np.array(list(fingerprints.values())), metric)
if timestamps is not None:
# Sanity error check as long as we don't have a single data struct to
# keep fingerprints and timestamps, as well as image data. This is not
# pretty, but at least a safety hook.
set_files = set(files)
set_tsfiles = set(timestamps.keys())
set_diff = set_files.symmetric_difference(set_tsfiles)
assert len(set_diff) == 0, (f"files in fingerprints and timestamps do "
f"not match: diff={set_diff}")
# use 'files' to make sure we have the same order as in 'fingerprints'
tsarr = np.array([timestamps[k] for k in files])[:,None]
dts = distance.pdist(tsarr, metric)
dts = dts / dts.max()
dfps = dfps / dfps.max()
dfps = dfps * (1 - alpha) + dts * alpha
# hierarchical/agglomerative clustering (Z = linkage matrix, construct
# dendrogram), plot: scipy.cluster.hierarchy.dendrogram(Z)
Z = hierarchy.linkage(dfps, method=method, metric=metric)
# cut dendrogram, extract clusters
# cut=[12, 3, 29, 14, 28, 27,...]: image i belongs to cluster cut[i]
cut = hierarchy.fcluster(Z, t=dfps.max()*(1.0-sim), criterion='distance')
cluster_dct = dict((iclus, []) for iclus in np.unique(cut))
for iimg,iclus in enumerate(cut):
cluster_dct[iclus].append(files[iimg])
# group all clusters (cluster = list_of_files) of equal size together
# {number_of_files1: [[list_of_files], [list_of_files],...],
# number_of_files2: [[list_of_files],...],
# }
clusters = {}
for cluster in cluster_dct.values():
csize = len(cluster)
if csize >= min_csize:
if not (csize in clusters.keys()):
clusters[csize] = [cluster]
else:
clusters[csize].append(cluster)
if print_stats:
print_cluster_stats(clusters)
if extra_out:
extra = {'Z': Z, 'dfps': dfps, 'cluster_dct': cluster_dct, 'cut': cut}
return clusters, extra
else:
return clusters
복잡해 보이는데, 결국에는 clustering에서 많이 사용되는 HAC(Hierarchical Agglomerative Clustering)를 수행한다는 것만 알면 됩니다.
scipy.cluster.hierarchy에 있는 fcluster 함수를 사용하면 HAC를 수행할 수 있습니다.
다시 main 코드로 돌아오겠습니다.
person_clustering.ipynb
cluster까지 마치고 나면 생성된 cluster 대로 이미지들을 폴더 별로 저장하고, 전체 결과를 보여주는 한 장의 이미지를 저장 및 출력합니다.
postproc.make_links(clusters, os.path.join(result_dir, 'imagecluster/clusters'))
images = icio.read_images(result_dir, size=(224,224))
fig, ax = postproc.plot_clusters(clusters, images)
fig.savefig(os.path.join(result_dir, 'imagecluster/_cluster.png'))
postproc.plt.show()
예시 결과 이미지는 아래와 같습니다.
해당 코드는 현재 Tensorflow로 작성되어 있는 상태라, 기존 코드들과의 통일성을 위해 PyTorch로 변경할 예정이다.
또한 PyTorch에서는 pretrained resnet18 모델을 제공하고 있어 이를 사용하면 성능 향상과 속도 개선을 기대할 수도 있다.
다만, 리팩토링을 잘 수행하여 동일한 결과가 재현될 수 있도록 하는 것이 가장 중요하다.
결론
- Tensorflow로 작성되어 있는 코드들을 PyTorch로 리팩토링한다.
- 결과가 재현될 수 있도록 잘 리팩토링한다.
Leave a comment