| | import threading |
| | import os |
| | import contextlib |
| | import torch |
| | import torch.nn as nn |
| | from PIL import Image, ImageDraw, ImageFont, ExifTags |
| | from PIL import __version__ as pil_version |
| | from multiprocessing.pool import ThreadPool |
| | import numpy as np |
| | from itertools import repeat |
| | import glob |
| | import cv2 |
| | import tempfile |
| | import hashlib |
| | from pathlib import Path |
| | import time |
| | import torchvision |
| | import math |
| | import re |
| | from typing import List, Union, Dict |
| | import pkg_resources as pkg |
| | from types import SimpleNamespace |
| | from torch.utils.data import Dataset, DataLoader |
| | from tqdm import tqdm |
| | import random |
| | import yaml |
| | import logging.config |
| | import sys |
| | import pathlib |
| | CURRENT_DIR = pathlib.Path(__file__).parent |
| | sys.path.append(str(CURRENT_DIR)) |
| |
|
| | LOGGING_NAME = 'ultralytics' |
| | LOGGER = logging.getLogger(LOGGING_NAME) |
| | for fn in LOGGER.info, LOGGER.warning: |
| | setattr(LOGGER, fn.__name__, lambda x: fn(x)) |
| | IMG_FORMATS = "bmp", "dng", "jpeg", "jpg", "mpo", "png", "tif", "tiff", "webp", "pfm" |
| | VID_FORMATS = "asf", "avi", "gif", "m4v", "mkv", "mov", "mp4", "mpeg", "mpg", "ts", "wmv" |
| | TQDM_BAR_FORMAT = '{l_bar}{bar:10}{r_bar}' |
| | NUM_THREADS = min(8, os.cpu_count()) |
| | PIN_MEMORY = str(os.getenv("PIN_MEMORY", True)).lower() == "true" |
| | _formats = ["xyxy", "xywh", "ltwh"] |
| | CFG_FLOAT_KEYS = {'warmup_epochs', 'box', 'cls', 'dfl', 'degrees', 'shear'} |
| | CFG_FRACTION_KEYS = { |
| | 'dropout', 'iou', 'lr0', 'lrf', 'momentum', 'weight_decay', 'warmup_momentum', 'warmup_bias_lr', 'fl_gamma', |
| | 'label_smoothing', 'hsv_h', 'hsv_s', 'hsv_v', 'translate', 'scale', 'perspective', 'flipud', 'fliplr', 'mosaic', |
| | 'mixup', 'copy_paste', 'conf', 'iou'} |
| | CFG_INT_KEYS = { |
| | 'epochs', 'patience', 'batch', 'workers', 'seed', 'close_mosaic', 'mask_ratio', 'max_det', 'vid_stride', |
| | 'line_thickness', 'workspace', 'nbs'} |
| | CFG_BOOL_KEYS = { |
| | 'save', 'exist_ok', 'pretrained', 'verbose', 'deterministic', 'single_cls', 'image_weights', 'rect', 'cos_lr', |
| | 'overlap_mask', 'val', 'save_json', 'save_hybrid', 'half', 'dnn', 'plots', 'show', 'save_txt', 'save_conf', |
| | 'save_crop', 'hide_labels', 'hide_conf', 'visualize', 'augment', 'agnostic_nms', 'retina_masks', 'boxes', 'keras', |
| | 'optimize', 'int8', 'dynamic', 'simplify', 'nms', 'v5loader'} |
| | |
| | for orientation in ExifTags.TAGS.keys(): |
| | if ExifTags.TAGS[orientation] == 'Orientation': |
| | break |
| |
|
| | def segments2boxes(segments): |
| | """ |
| | It converts segment labels to box labels, i.e. (cls, xy1, xy2, ...) to (cls, xywh) |
| | |
| | Args: |
| | segments (list): list of segments, each segment is a list of points, each point is a list of x, y coordinates |
| | |
| | Returns: |
| | (np.ndarray): the xywh coordinates of the bounding boxes. |
| | """ |
| | boxes = [] |
| | for s in segments: |
| | x, y = s.T |
| | boxes.append([x.min(), y.min(), x.max(), y.max()]) |
| | return xyxy2xywh(np.array(boxes)) |
| |
|
| |
|
| | def check_version( |
| | current: str = "0.0.0", |
| | minimum: str = "0.0.0", |
| | name: str = "version ", |
| | pinned: bool = False, |
| | hard: bool = False, |
| | verbose: bool = False, |
| | ) -> bool: |
| | """ |
| | Check current version against the required minimum version. |
| | |
| | Args: |
| | current (str): Current version. |
| | minimum (str): Required minimum version. |
| | name (str): Name to be used in warning message. |
| | pinned (bool): If True, versions must match exactly. If False, minimum version must be satisfied. |
| | hard (bool): If True, raise an AssertionError if the minimum version is not met. |
| | verbose (bool): If True, print warning message if minimum version is not met. |
| | |
| | Returns: |
| | bool: True if minimum version is met, False otherwise. |
| | """ |
| | current, minimum = (pkg.parse_version(x) for x in (current, minimum)) |
| | result = (current == minimum) if pinned else (current >= minimum) |
| | warning_message = f"WARNING ⚠️ {name}{minimum} is required by YOLOv8, but {name}{current} is currently installed" |
| | if verbose and not result: |
| | LOGGER.warning(warning_message) |
| | return result |
| |
|
| |
|
| | TORCH_1_9 = check_version(torch.__version__, '1.9.0') |
| |
|
| |
|
| | def smart_inference_mode(): |
| | |
| | def decorate(fn): |
| | return (torch.inference_mode if TORCH_1_9 else torch.no_grad)()(fn) |
| |
|
| | return decorate |
| |
|
| |
|
| | def box_iou(box1, box2, eps=1e-7): |
| | |
| | """ |
| | Return intersection-over-union (Jaccard index) of boxes. |
| | Both sets of boxes are expected to be in (x1, y1, x2, y2) format. |
| | Arguments: |
| | box1 (Tensor[N, 4]) |
| | box2 (Tensor[M, 4]) |
| | Returns: |
| | iou (Tensor[N, M]): the NxM matrix containing the pairwise |
| | IoU values for every element in boxes1 and boxes2 |
| | """ |
| |
|
| | |
| | (a1, a2), (b1, b2) = box1.unsqueeze(1).chunk(2, 2), box2.unsqueeze(0).chunk(2, 2) |
| | inter = (torch.min(a2, b2) - torch.max(a1, b1)).clamp(0).prod(2) |
| |
|
| | |
| | return inter / ((a2 - a1).prod(2) + (b2 - b1).prod(2) - inter + eps) |
| |
|
| |
|
| | class LoadImages: |
| | |
| | def __init__( |
| | self, path, imgsz=640, stride=32, auto=True, transforms=None, vid_stride=1 |
| | ): |
| | |
| | if isinstance(path, str) and Path(path).suffix == ".txt": |
| | path = Path(path).read_text().rsplit() |
| | files = [] |
| | for p in sorted(path) if isinstance(path, (list, tuple)) else [path]: |
| | p = str(Path(p).resolve()) |
| | if "*" in p: |
| | files.extend(sorted(glob.glob(p, recursive=True))) |
| | elif os.path.isdir(p): |
| | files.extend(sorted(glob.glob(os.path.join(p, "*.*")))) |
| | elif os.path.isfile(p): |
| | files.append(p) |
| | else: |
| | raise FileNotFoundError(f"{p} does not exist") |
| | |
| | images = [x for x in files if x.split(".")[-1].lower() in IMG_FORMATS] |
| | videos = [x for x in files if x.split(".")[-1].lower() in VID_FORMATS] |
| | ni, nv = len(images), len(videos) |
| |
|
| | self.imgsz = imgsz |
| | self.stride = stride |
| | self.files = images + videos |
| | self.nf = ni + nv |
| | self.video_flag = [False] * ni + [True] * nv |
| | self.mode = "image" |
| | self.auto = auto |
| | self.transforms = transforms |
| | self.vid_stride = vid_stride |
| | self.bs = 1 |
| | if any(videos): |
| | self.orientation = None |
| | self._new_video(videos[0]) |
| | else: |
| | self.cap = None |
| | if self.nf == 0: |
| | raise FileNotFoundError( |
| | f"No images or videos found in {p}. " |
| | f"Supported formats are:\nimages: {IMG_FORMATS}\nvideos: {VID_FORMATS}" |
| | ) |
| |
|
| | def __iter__(self): |
| | self.count = 0 |
| | return self |
| |
|
| | def __next__(self): |
| | if self.count == self.nf: |
| | raise StopIteration |
| | path = self.files[self.count] |
| |
|
| | if self.video_flag[self.count]: |
| | |
| | self.mode = "video" |
| | for _ in range(self.vid_stride): |
| | self.cap.grab() |
| | success, im0 = self.cap.retrieve() |
| | while not success: |
| | self.count += 1 |
| | self.cap.release() |
| | if self.count == self.nf: |
| | raise StopIteration |
| | path = self.files[self.count] |
| | self._new_video(path) |
| | success, im0 = self.cap.read() |
| |
|
| | self.frame += 1 |
| | s = f"video {self.count + 1}/{self.nf} ({self.frame}/{self.frames}) {path}: " |
| |
|
| | else: |
| | |
| | self.count += 1 |
| | im0 = cv2.imread(path) |
| | if im0 is None: |
| | raise FileNotFoundError(f"Image Not Found {path}") |
| | s = f"image {self.count}/{self.nf} {path}: " |
| |
|
| | if self.transforms: |
| | im = self.transforms(im0) |
| | else: |
| | im = LetterBox(self.imgsz, self.auto, stride=self.stride)(image=im0) |
| | im = im.transpose((2, 0, 1))[::-1] |
| | im = np.ascontiguousarray(im) |
| |
|
| | return path, im, im0, self.cap, s |
| |
|
| | def _new_video(self, path): |
| | |
| | self.frame = 0 |
| | self.cap = cv2.VideoCapture(path) |
| | self.frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT) / self.vid_stride) |
| | if hasattr(cv2, "CAP_PROP_ORIENTATION_META"): |
| | self.orientation = int( |
| | self.cap.get(cv2.CAP_PROP_ORIENTATION_META) |
| | ) |
| | |
| | |
| |
|
| | def _cv2_rotate(self, im): |
| | |
| | if self.orientation == 0: |
| | return cv2.rotate(im, cv2.ROTATE_90_CLOCKWISE) |
| | elif self.orientation == 180: |
| | return cv2.rotate(im, cv2.ROTATE_90_COUNTERCLOCKWISE) |
| | elif self.orientation == 90: |
| | return cv2.rotate(im, cv2.ROTATE_180) |
| | return im |
| |
|
| | def __len__(self): |
| | return self.nf |
| |
|
| |
|
| | class LetterBox: |
| | """Resize image and padding for detection, instance segmentation, pose""" |
| |
|
| | def __init__( |
| | self, new_shape=(640, 640), auto=False, scaleFill=False, scaleup=True, stride=32 |
| | ): |
| | self.new_shape = new_shape |
| | self.auto = auto |
| | self.scaleFill = scaleFill |
| | self.scaleup = scaleup |
| | self.stride = stride |
| |
|
| | def __call__(self, labels=None, image=None): |
| | if labels is None: |
| | labels = {} |
| | img = labels.get("img") if image is None else image |
| | shape = img.shape[:2] |
| | new_shape = labels.pop("rect_shape", self.new_shape) |
| | if isinstance(new_shape, int): |
| | new_shape = (new_shape, new_shape) |
| |
|
| | |
| | r = min(new_shape[0] / shape[0], new_shape[1] / shape[1]) |
| | |
| | if not self.scaleup: |
| | r = min(r, 1.0) |
| |
|
| | |
| | ratio = r, r |
| | new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r)) |
| | dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1] |
| | if self.auto: |
| | dw, dh = np.mod(dw, self.stride), np.mod(dh, self.stride) |
| | elif self.scaleFill: |
| | dw, dh = 0.0, 0.0 |
| | new_unpad = (new_shape[1], new_shape[0]) |
| | ratio = ( |
| | new_shape[1] / shape[1], |
| | new_shape[0] / shape[0], |
| | ) |
| |
|
| | dw /= 2 |
| | dh /= 2 |
| | if labels.get("ratio_pad"): |
| | labels["ratio_pad"] = (labels["ratio_pad"], (dw, dh)) |
| |
|
| | if shape[::-1] != new_unpad: |
| | img = cv2.resize(img, new_unpad, interpolation=cv2.INTER_LINEAR) |
| | top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1)) |
| | left, right = int(round(dw - 0.1)), int(round(dw + 0.1)) |
| | img = cv2.copyMakeBorder( |
| | img, top, bottom, left, right, cv2.BORDER_CONSTANT, value=(114, 114, 114) |
| | ) |
| |
|
| | if len(labels): |
| | labels = self._update_labels(labels, ratio, dw, dh) |
| | labels["img"] = img |
| | labels["resized_shape"] = new_shape |
| | return labels |
| | else: |
| | return img |
| |
|
| | def _update_labels(self, labels, ratio, padw, padh): |
| | """Update labels""" |
| | labels["instances"].convert_bbox(format="xyxy") |
| | labels["instances"].denormalize(*labels["img"].shape[:2][::-1]) |
| | labels["instances"].scale(*ratio) |
| | labels["instances"].add_padding(padw, padh) |
| | return labels |
| |
|
| |
|
| | class Annotator: |
| | |
| | def __init__( |
| | self, |
| | im, |
| | line_width=None, |
| | font_size=None, |
| | font="Arial.ttf", |
| | pil=False, |
| | example="abc", |
| | ): |
| | assert ( |
| | im.data.contiguous |
| | ), "Image not contiguous. Apply np.ascontiguousarray(im) to Annotator() input images." |
| | |
| | non_ascii = not is_ascii(example) |
| | self.pil = pil or non_ascii |
| | if self.pil: |
| | self.pil_9_2_0_check = check_version( |
| | pil_version, "9.2.0" |
| | ) |
| | self.im = im if isinstance(im, Image.Image) else Image.fromarray(im) |
| | self.draw = ImageDraw.Draw(self.im) |
| | self.font = ImageFont.load_default() |
| | else: |
| | self.im = im |
| | self.lw = line_width or max(round(sum(im.shape) / 2 * 0.003), 2) |
| |
|
| | def box_label( |
| | self, box, label="", color=(128, 128, 128), txt_color=(255, 255, 255) |
| | ): |
| | |
| | if isinstance(box, torch.Tensor): |
| | box = box.tolist() |
| | if self.pil or not is_ascii(label): |
| | self.draw.rectangle(box, width=self.lw, outline=color) |
| | if label: |
| | if self.pil_9_2_0_check: |
| | _, _, w, h = self.font.getbbox(label) |
| | else: |
| | w, h = self.font.getsize( |
| | label |
| | ) |
| | outside = box[1] - h >= 0 |
| | self.draw.rectangle( |
| | ( |
| | box[0], |
| | box[1] - h if outside else box[1], |
| | box[0] + w + 1, |
| | box[1] + 1 if outside else box[1] + h + 1, |
| | ), |
| | fill=color, |
| | ) |
| | |
| | self.draw.text( |
| | (box[0], box[1] - h if outside else box[1]), |
| | label, |
| | fill=txt_color, |
| | font=self.font, |
| | ) |
| | else: |
| | p1, p2 = (int(box[0]), int(box[1])), (int(box[2]), int(box[3])) |
| | cv2.rectangle( |
| | self.im, p1, p2, color, thickness=self.lw, lineType=cv2.LINE_AA |
| | ) |
| | if label: |
| | tf = max(self.lw - 1, 1) |
| | |
| | w, h = cv2.getTextSize(label, 0, fontScale=self.lw / 3, thickness=tf)[0] |
| | outside = p1[1] - h >= 3 |
| | p2 = p1[0] + w, p1[1] - h - 3 if outside else p1[1] + h + 3 |
| | cv2.rectangle(self.im, p1, p2, color, -1, cv2.LINE_AA) |
| | cv2.putText( |
| | self.im, |
| | label, |
| | (p1[0], p1[1] - 2 if outside else p1[1] + h + 2), |
| | 0, |
| | self.lw / 3, |
| | txt_color, |
| | thickness=tf, |
| | lineType=cv2.LINE_AA, |
| | ) |
| |
|
| | def rectangle(self, xy, fill=None, outline=None, width=1): |
| | |
| | self.draw.rectangle(xy, fill, outline, width) |
| |
|
| | def text(self, xy, text, txt_color=(255, 255, 255), anchor="top"): |
| | |
| | if anchor == "bottom": |
| | w, h = self.font.getsize(text) |
| | xy[1] += 1 - h |
| | self.draw.text(xy, text, fill=txt_color, font=self.font) |
| |
|
| | def fromarray(self, im): |
| | |
| | self.im = im if isinstance(im, Image.Image) else Image.fromarray(im) |
| | self.draw = ImageDraw.Draw(self.im) |
| |
|
| | def result(self): |
| | |
| | return np.asarray(self.im) |
| |
|
| |
|
| | def non_max_suppression( |
| | prediction, |
| | conf_thres=0.25, |
| | iou_thres=0.45, |
| | classes=None, |
| | agnostic=False, |
| | multi_label=False, |
| | labels=(), |
| | max_det=300, |
| | nm=0, |
| | ): |
| | |
| | assert ( |
| | 0 <= conf_thres <= 1 |
| | ), f"Invalid Confidence threshold {conf_thres}, valid values are between 0.0 and 1.0" |
| | assert ( |
| | 0 <= iou_thres <= 1 |
| | ), f"Invalid IoU {iou_thres}, valid values are between 0.0 and 1.0" |
| | |
| | if isinstance(prediction, (list, tuple)): |
| | prediction = prediction[0] |
| | device = prediction.device |
| | mps = "mps" in device.type |
| | if mps: |
| | prediction = prediction.cpu() |
| | bs = prediction.shape[0] |
| | nc = prediction.shape[1] - nm - 4 |
| | mi = 4 + nc |
| | xc = prediction[:, 4:mi].amax(1) > conf_thres |
| |
|
| | |
| | |
| | max_wh = 7680 |
| | max_nms = 30000 |
| | time_limit = 0.5 + 0.05 * bs |
| | redundant = True |
| | multi_label &= nc > 1 |
| | merge = False |
| |
|
| | t = time.time() |
| | output = [torch.zeros((0, 6 + nm), device=prediction.device)] * bs |
| | for xi, x in enumerate(prediction): |
| | |
| | |
| | x = x.transpose(0, -1)[xc[xi]] |
| |
|
| | |
| | if labels and len(labels[xi]): |
| | lb = labels[xi] |
| | v = torch.zeros((len(lb), nc + nm + 5), device=x.device) |
| | v[:, :4] = lb[:, 1:5] |
| | v[range(len(lb)), lb[:, 0].long() + 4] = 1.0 |
| | x = torch.cat((x, v), 0) |
| |
|
| | |
| | if not x.shape[0]: |
| | continue |
| |
|
| | |
| | box, cls, mask = x.split((4, nc, nm), 1) |
| | |
| | box = xywh2xyxy(box) |
| | if multi_label: |
| | i, j = (cls > conf_thres).nonzero(as_tuple=False).T |
| | x = torch.cat((box[i], x[i, 4 + j, None], j[:, None].float(), mask[i]), 1) |
| | else: |
| | conf, j = cls.max(1, keepdim=True) |
| | x = torch.cat((box, conf, j.float(), mask), 1)[conf.view(-1) > conf_thres] |
| |
|
| | |
| | if classes is not None: |
| | x = x[(x[:, 5:6] == torch.tensor(classes, device=x.device)).any(1)] |
| |
|
| | |
| | n = x.shape[0] |
| | if not n: |
| | continue |
| | |
| | x = x[x[:, 4].argsort(descending=True)[:max_nms]] |
| |
|
| | |
| | c = x[:, 5:6] * (0 if agnostic else max_wh) |
| | |
| | boxes, scores = x[:, :4] + c, x[:, 4] |
| | i = torchvision.ops.nms(boxes, scores, iou_thres) |
| | i = i[:max_det] |
| | if merge and (1 < n < 3e3): |
| | |
| | iou = box_iou(boxes[i], boxes) > iou_thres |
| | weights = iou * scores[None] |
| | x[i, :4] = torch.mm(weights, x[:, :4]).float() / weights.sum( |
| | 1, keepdim=True |
| | ) |
| | if redundant: |
| | i = i[iou.sum(1) > 1] |
| |
|
| | output[xi] = x[i] |
| | if mps: |
| | output[xi] = output[xi].to(device) |
| | if (time.time() - t) > time_limit: |
| | LOGGER.warning(f"WARNING ⚠️ NMS time limit {time_limit:.3f}s exceeded") |
| | break |
| |
|
| | return output |
| |
|
| |
|
| | class Colors: |
| | |
| | def __init__(self): |
| | |
| | hexs = ( |
| | "FF3838", |
| | "FF9D97", |
| | "FF701F", |
| | "FFB21D", |
| | "CFD231", |
| | "48F90A", |
| | "92CC17", |
| | "3DDB86", |
| | "1A9334", |
| | "00D4BB", |
| | "2C99A8", |
| | "00C2FF", |
| | "344593", |
| | "6473FF", |
| | "0018EC", |
| | "8438FF", |
| | "520085", |
| | "CB38FF", |
| | "FF95C8", |
| | "FF37C7", |
| | ) |
| | self.palette = [self.hex2rgb(f"#{c}") for c in hexs] |
| | self.n = len(self.palette) |
| |
|
| | def __call__(self, i, bgr=False): |
| | c = self.palette[int(i) % self.n] |
| | return (c[2], c[1], c[0]) if bgr else c |
| |
|
| | @staticmethod |
| | def hex2rgb(h): |
| | return tuple(int(h[1 + i : 1 + i + 2], 16) for i in (0, 2, 4)) |
| |
|
| |
|
| | colors = Colors() |
| |
|
| |
|
| | def threaded(func): |
| | |
| | def wrapper(*args, **kwargs): |
| | thread = threading.Thread(target=func, args=args, kwargs=kwargs, daemon=True) |
| | thread.start() |
| | return thread |
| |
|
| | return wrapper |
| |
|
| |
|
| | def plot_images( |
| | images, |
| | batch_idx, |
| | cls, |
| | bboxes, |
| | masks=np.zeros(0, dtype=np.uint8), |
| | paths=None, |
| | fname="images.jpg", |
| | names=None, |
| | ): |
| | |
| | if isinstance(images, torch.Tensor): |
| | images = images.cpu().float().numpy() |
| | if isinstance(cls, torch.Tensor): |
| | cls = cls.cpu().numpy() |
| | if isinstance(bboxes, torch.Tensor): |
| | bboxes = bboxes.cpu().numpy() |
| | if isinstance(masks, torch.Tensor): |
| | masks = masks.cpu().numpy().astype(int) |
| | if isinstance(batch_idx, torch.Tensor): |
| | batch_idx = batch_idx.cpu().numpy() |
| |
|
| | max_size = 1920 |
| | max_subplots = 16 |
| | bs, _, h, w = images.shape |
| | bs = min(bs, max_subplots) |
| | ns = np.ceil(bs**0.5) |
| | if np.max(images[0]) <= 1: |
| | images *= 255 |
| |
|
| | |
| | mosaic = np.full((int(ns * h), int(ns * w), 3), 255, dtype=np.uint8) |
| | for i, im in enumerate(images): |
| | if i == max_subplots: |
| | break |
| | x, y = int(w * (i // ns)), int(h * (i % ns)) |
| | im = im.transpose(1, 2, 0) |
| | mosaic[y : y + h, x : x + w, :] = im |
| |
|
| | |
| | scale = max_size / ns / max(h, w) |
| | if scale < 1: |
| | h = math.ceil(scale * h) |
| | w = math.ceil(scale * w) |
| | mosaic = cv2.resize(mosaic, tuple(int(x * ns) for x in (w, h))) |
| |
|
| | |
| | fs = int((h + w) * ns * 0.01) |
| | annotator = Annotator( |
| | mosaic, line_width=2, font_size=fs, pil=True, example=names |
| | ) |
| | for i in range(i + 1): |
| | x, y = int(w * (i // ns)), int(h * (i % ns)) |
| | annotator.rectangle( |
| | [x, y, x + w, y + h], None, (255, 255, 255), width=2 |
| | ) |
| | if paths: |
| | annotator.text( |
| | |
| | (x + 5, y + 5 + h), |
| | text=Path(paths[i]).name[:40], |
| | txt_color=(220, 220, 220), |
| | ) |
| | if len(cls) > 0: |
| | idx = batch_idx == i |
| |
|
| | boxes = xywh2xyxy(bboxes[idx, :4]).T |
| | classes = cls[idx].astype("int") |
| | labels = bboxes.shape[1] == 4 |
| | |
| | conf = None if labels else bboxes[idx, 4] |
| |
|
| | if boxes.shape[1]: |
| | if boxes.max() <= 1.01: |
| | boxes[[0, 2]] *= w |
| | boxes[[1, 3]] *= h |
| | elif scale < 1: |
| | boxes *= scale |
| | boxes[[0, 2]] += x |
| | boxes[[1, 3]] += y |
| | for j, box in enumerate(boxes.T.tolist()): |
| | c = classes[j] |
| | color = colors(c) |
| | c = names[c] if names else c |
| | if labels or conf[j] > 0.25: |
| | label = f"{c}" if labels else f"{c} {conf[j]:.1f}" |
| | annotator.box_label(box, label, color=color) |
| | annotator.im.save(fname) |
| |
|
| |
|
| | def output_to_target(output, max_det=300): |
| | |
| | targets = [] |
| | for i, o in enumerate(output): |
| | box, conf, cls = o[:max_det, :6].cpu().split((4, 1, 1), 1) |
| | j = torch.full((conf.shape[0], 1), i) |
| | targets.append(torch.cat((j, cls, xyxy2xywh(box), conf), 1)) |
| | targets = torch.cat(targets, 0).numpy() |
| | return targets[:, 0], targets[:, 1], targets[:, 2:] |
| |
|
| |
|
| | def is_ascii(s=""): |
| | |
| | s = str(s) |
| | return len(s.encode().decode("ascii", "ignore")) == len(s) |
| |
|
| |
|
| | def xyxy2xywh(x): |
| | """ |
| | Convert bounding box coordinates from (x1, y1, x2, y2) format to (x, y, width, height) format. |
| | |
| | Args: |
| | x (np.ndarray) or (torch.Tensor): The input bounding box coordinates in (x1, y1, x2, y2) format. |
| | Returns: |
| | y (np.ndarray) or (torch.Tensor): The bounding box coordinates in (x, y, width, height) format. |
| | """ |
| | y = x.clone() if isinstance(x, torch.Tensor) else np.copy(x) |
| | y[..., 0] = (x[..., 0] + x[..., 2]) / 2 |
| | y[..., 1] = (x[..., 1] + x[..., 3]) / 2 |
| | y[..., 2] = x[..., 2] - x[..., 0] |
| | y[..., 3] = x[..., 3] - x[..., 1] |
| | return y |
| |
|
| |
|
| | def xywh2xyxy(x): |
| | |
| | y = x.clone() if isinstance(x, torch.Tensor) else np.copy(x) |
| | y[:, 0] = x[:, 0] - x[:, 2] / 2 |
| | y[:, 1] = x[:, 1] - x[:, 3] / 2 |
| | y[:, 2] = x[:, 0] + x[:, 2] / 2 |
| | y[:, 3] = x[:, 1] + x[:, 3] / 2 |
| | return y |
| |
|
| |
|
| | def check_det_dataset(dataset, autodownload=True): |
| | |
| | data = dataset |
| | |
| | extract_dir = '' |
| |
|
| | |
| | if isinstance(data, (str, Path)): |
| | data = yaml_load(data, append_filename=True) |
| |
|
| | |
| | if isinstance(data['names'], (list, tuple)): |
| | data['names'] = dict(enumerate(data['names'])) |
| | data['nc'] = len(data['names']) |
| |
|
| | |
| | path = Path(extract_dir or data.get('path') or Path(data.get('yaml_file', '')).parent) |
| |
|
| | DATASETS_DIR = os.path.abspath('.') |
| | if not path.is_absolute(): |
| | path = (DATASETS_DIR / path).resolve() |
| | data['path'] = path |
| | for k in 'train', 'val', 'test': |
| | if data.get(k): |
| | if isinstance(data[k], str): |
| | x = (path / data[k]).resolve() |
| | if not x.exists() and data[k].startswith('../'): |
| | x = (path / data[k][3:]).resolve() |
| | data[k] = str(x) |
| | else: |
| | data[k] = [str((path / x).resolve()) for x in data[k]] |
| |
|
| | |
| | train, val, test, s = (data.get(x) for x in ('train', 'val', 'test', 'download')) |
| | if val: |
| | val = [Path(x).resolve() for x in (val if isinstance(val, list) else [val])] |
| | if not all(x.exists() for x in val): |
| | msg = f"\nDataset '{dataset}' not found ⚠️, missing paths %s" % [str(x) for x in val if not x.exists()] |
| | if s and autodownload: |
| | LOGGER.warning(msg) |
| | else: |
| | raise FileNotFoundError(msg) |
| | t = time.time() |
| | if s.startswith('bash '): |
| | LOGGER.info(f'Running {s} ...') |
| | r = os.system(s) |
| | else: |
| | r = exec(s, {'yaml': data}) |
| | dt = f'({round(time.time() - t, 1)}s)' |
| | s = f"success ✅ {dt}, saved to {colorstr('bold', DATASETS_DIR)}" if r in (0, None) else f"failure {dt} ❌" |
| | LOGGER.info(f"Dataset download {s}\n") |
| |
|
| | return data |
| |
|
| |
|
| | def yaml_load(file='data.yaml', append_filename=False): |
| | """ |
| | Load YAML data from a file. |
| | |
| | Args: |
| | file (str, optional): File name. Default is 'data.yaml'. |
| | append_filename (bool): Add the YAML filename to the YAML dictionary. Default is False. |
| | |
| | Returns: |
| | dict: YAML data and file name. |
| | """ |
| | with open(file, errors='ignore', encoding='utf-8') as f: |
| | |
| | s = f.read() |
| | if not s.isprintable(): |
| | s = re.sub(r'[^\x09\x0A\x0D\x20-\x7E\x85\xA0-\uD7FF\uE000-\uFFFD\U00010000-\U0010ffff]+', '', s) |
| | return {**yaml.safe_load(s), 'yaml_file': str(file)} if append_filename else yaml.safe_load(s) |
| |
|
| |
|
| | class IterableSimpleNamespace(SimpleNamespace): |
| | """ |
| | Iterable SimpleNamespace class to allow SimpleNamespace to be used with dict() and in for loops |
| | """ |
| |
|
| | def __iter__(self): |
| | return iter(vars(self).items()) |
| |
|
| | def __str__(self): |
| | return '\n'.join(f"{k}={v}" for k, v in vars(self).items()) |
| |
|
| | def get(self, key, default=None): |
| | return getattr(self, key, default) |
| |
|
| |
|
| | def colorstr(*input): |
| | |
| | *args, string = input if len(input) > 1 else ("blue", "bold", input[0]) |
| | colors = { |
| | "black": "\033[30m", |
| | "red": "\033[31m", |
| | "green": "\033[32m", |
| | "yellow": "\033[33m", |
| | "blue": "\033[34m", |
| | "magenta": "\033[35m", |
| | "cyan": "\033[36m", |
| | "white": "\033[37m", |
| | "bright_black": "\033[90m", |
| | "bright_red": "\033[91m", |
| | "bright_green": "\033[92m", |
| | "bright_yellow": "\033[93m", |
| | "bright_blue": "\033[94m", |
| | "bright_magenta": "\033[95m", |
| | "bright_cyan": "\033[96m", |
| | "bright_white": "\033[97m", |
| | "end": "\033[0m", |
| | "bold": "\033[1m", |
| | "underline": "\033[4m"} |
| | return "".join(colors[x] for x in args) + f"{string}" + colors["end"] |
| |
|
| |
|
| | def seed_worker(worker_id): |
| | |
| | worker_seed = torch.initial_seed() % 2 ** 32 |
| | np.random.seed(worker_seed) |
| | random.seed(worker_seed) |
| |
|
| |
|
| | def build_dataloader(cfg, batch, img_path, stride=32, rect=False, names=None, rank=-1, mode="train"): |
| | assert mode in ["train", "val"] |
| | shuffle = mode == "train" |
| | if cfg.rect and shuffle: |
| | LOGGER.warning("WARNING ⚠️ 'rect=True' is incompatible with DataLoader shuffle, setting shuffle=False") |
| | shuffle = False |
| | dataset = YOLODataset( |
| | img_path=img_path, |
| | imgsz=cfg.imgsz, |
| | batch_size=batch, |
| | augment=mode == "train", |
| | hyp=cfg, |
| | rect=cfg.rect or rect, |
| | cache=cfg.cache or None, |
| | single_cls=cfg.single_cls or False, |
| | stride=int(stride), |
| | pad=0.0 if mode == "train" else 0.5, |
| | prefix=colorstr(f"{mode}: "), |
| | use_segments=cfg.task == "segment", |
| | use_keypoints=cfg.task == "keypoint", |
| | names=names) |
| |
|
| | batch = min(batch, len(dataset)) |
| | nd = torch.cuda.device_count() |
| | workers = cfg.workers if mode == "train" else cfg.workers * 2 |
| | nw = min([os.cpu_count() // max(nd, 1), batch if batch > 1 else 0, workers]) |
| |
|
| | if rank == -1: |
| | sampler = None |
| | if cfg.image_weights or cfg.close_mosaic: |
| | loader = DataLoader |
| | generator = torch.Generator() |
| | generator.manual_seed(6148914691236517205) |
| | return loader(dataset=dataset, |
| | batch_size=batch, |
| | shuffle=shuffle and sampler is None, |
| | num_workers=nw, |
| | sampler=sampler, |
| | pin_memory=PIN_MEMORY, |
| | collate_fn=getattr(dataset, "collate_fn", None), |
| | worker_init_fn=seed_worker, |
| | generator=generator), dataset |
| |
|
| |
|
| | class BaseDataset(Dataset): |
| | """Base Dataset. |
| | Args: |
| | img_path (str): image path. |
| | pipeline (dict): a dict of image transforms. |
| | label_path (str): label path, this can also be an ann_file or other custom label path. |
| | """ |
| |
|
| | def __init__( |
| | self, |
| | img_path, |
| | imgsz=640, |
| | cache=False, |
| | augment=True, |
| | hyp=None, |
| | prefix="", |
| | rect=False, |
| | batch_size=None, |
| | stride=32, |
| | pad=0.5, |
| | single_cls=False, |
| | ): |
| | super().__init__() |
| | self.img_path = img_path |
| | self.imgsz = imgsz |
| | self.augment = augment |
| | self.single_cls = single_cls |
| | self.prefix = prefix |
| | self.im_files = self.get_img_files(self.img_path) |
| | self.labels = self.get_labels() |
| | self.ni = len(self.labels) |
| |
|
| | |
| | self.rect = rect |
| | self.batch_size = batch_size |
| | self.stride = stride |
| | self.pad = pad |
| | if self.rect: |
| | assert self.batch_size is not None |
| | self.set_rectangle() |
| |
|
| | |
| | self.ims = [None] * self.ni |
| | self.npy_files = [Path(f).with_suffix(".npy") for f in self.im_files] |
| | if cache: |
| | self.cache_images(cache) |
| |
|
| | |
| | self.transforms = self.build_transforms(hyp=hyp) |
| |
|
| | def get_img_files(self, img_path): |
| | """Read image files.""" |
| | try: |
| | f = [] |
| | for p in img_path if isinstance(img_path, list) else [img_path]: |
| | p = Path(p) |
| | if p.is_dir(): |
| | f += glob.glob(str(p / "**" / "*.*"), recursive=True) |
| | |
| | elif p.is_file(): |
| | with open(p) as t: |
| | t = t.read().strip().splitlines() |
| | parent = str(p.parent) + os.sep |
| | f += [x.replace("./", parent) if x.startswith("./") else x for x in t] |
| | |
| | else: |
| | raise FileNotFoundError(f"{self.prefix}{p} does not exist") |
| | im_files = sorted(x.replace("/", os.sep) for x in f if x.split(".")[-1].lower() in IMG_FORMATS) |
| | |
| | assert im_files, f"{self.prefix}No images found" |
| | except Exception as e: |
| | raise FileNotFoundError(f"{self.prefix}Error loading data from {img_path}\n") from e |
| | return im_files |
| |
|
| | def load_image(self, i): |
| | |
| | im, f, fn = self.ims[i], self.im_files[i], self.npy_files[i] |
| | if im is None: |
| | if fn.exists(): |
| | im = np.load(fn) |
| | else: |
| | im = cv2.imread(f) |
| | if im is None: |
| | raise FileNotFoundError(f"Image Not Found {f}") |
| | h0, w0 = im.shape[:2] |
| | r = self.imgsz / max(h0, w0) |
| | if r != 1: |
| | interp = cv2.INTER_LINEAR if (self.augment or r > 1) else cv2.INTER_AREA |
| | im = cv2.resize(im, (math.ceil(w0 * r), math.ceil(h0 * r)), interpolation=interp) |
| | return im, (h0, w0), im.shape[:2] |
| | return self.ims[i], self.im_hw0[i], self.im_hw[i] |
| |
|
| | def cache_images(self, cache): |
| | |
| | gb = 0 |
| | self.im_hw0, self.im_hw = [None] * self.ni, [None] * self.ni |
| | fcn = self.cache_images_to_disk if cache == "disk" else self.load_image |
| | with ThreadPool(NUM_THREADS) as pool: |
| | results = pool.imap(fcn, range(self.ni)) |
| | pbar = tqdm(enumerate(results), total=self.ni, bar_format=TQDM_BAR_FORMAT) |
| | for i, x in pbar: |
| | if cache == "disk": |
| | gb += self.npy_files[i].stat().st_size |
| | else: |
| | self.ims[i], self.im_hw0[i], self.im_hw[i] = x |
| | gb += self.ims[i].nbytes |
| | pbar.desc = f"{self.prefix}Caching images ({gb / 1E9:.1f}GB {cache})" |
| | pbar.close() |
| |
|
| | def cache_images_to_disk(self, i): |
| | |
| | f = self.npy_files[i] |
| | if not f.exists(): |
| | np.save(f.as_posix(), cv2.imread(self.im_files[i])) |
| |
|
| | def set_rectangle(self): |
| | bi = np.floor(np.arange(self.ni) / self.batch_size).astype(int) |
| | nb = bi[-1] + 1 |
| |
|
| | s = np.array([x.pop("shape") for x in self.labels]) |
| | ar = s[:, 0] / s[:, 1] |
| | irect = ar.argsort() |
| | self.im_files = [self.im_files[i] for i in irect] |
| | self.labels = [self.labels[i] for i in irect] |
| | ar = ar[irect] |
| |
|
| | |
| | shapes = [[1, 1]] * nb |
| | for i in range(nb): |
| | ari = ar[bi == i] |
| | mini, maxi = ari.min(), ari.max() |
| | if maxi < 1: |
| | shapes[i] = [maxi, 1] |
| | elif mini > 1: |
| | shapes[i] = [1, 1 / mini] |
| |
|
| | self.batch_shapes = np.ceil(np.array(shapes) * self.imgsz / self.stride + self.pad).astype(int) * self.stride |
| | self.batch = bi |
| |
|
| | def __getitem__(self, index): |
| | return self.transforms(self.get_label_info(index)) |
| |
|
| | def get_label_info(self, index): |
| | label = self.labels[index].copy() |
| | label.pop("shape", None) |
| | label["img"], label["ori_shape"], label["resized_shape"] = self.load_image(index) |
| | label["ratio_pad"] = ( |
| | label["resized_shape"][0] / label["ori_shape"][0], |
| | label["resized_shape"][1] / label["ori_shape"][1], |
| | ) |
| | if self.rect: |
| | label["rect_shape"] = self.batch_shapes[self.batch[index]] |
| | label = self.update_labels_info(label) |
| | return label |
| |
|
| | def __len__(self): |
| | return len(self.labels) |
| |
|
| | def update_labels_info(self, label): |
| | """custom your label format here""" |
| | return label |
| |
|
| | def build_transforms(self, hyp=None): |
| | """Users can custom augmentations here |
| | like: |
| | if self.augment: |
| | # training transforms |
| | return Compose([]) |
| | else: |
| | # val transforms |
| | return Compose([]) |
| | """ |
| | raise NotImplementedError |
| |
|
| | def get_labels(self): |
| | """Users can custom their own format here. |
| | Make sure your output is a list with each element like below: |
| | dict( |
| | im_file=im_file, |
| | shape=shape, # format: (height, width) |
| | cls=cls, |
| | bboxes=bboxes, # xywh |
| | segments=segments, # xy |
| | keypoints=keypoints, # xy |
| | normalized=True, # or False |
| | bbox_format="xyxy", # or xywh, ltwh |
| | ) |
| | """ |
| | raise NotImplementedError |
| |
|
| |
|
| | def img2label_paths(img_paths): |
| | |
| | sa, sb = f"{os.sep}images{os.sep}", f"{os.sep}labels{os.sep}" |
| | return [sb.join(x.rsplit(sa, 1)).rsplit(".", 1)[0] + ".txt" for x in img_paths] |
| |
|
| |
|
| | def get_hash(paths): |
| | |
| | size = sum(os.path.getsize(p) for p in paths if os.path.exists(p)) |
| | h = hashlib.md5(str(size).encode()) |
| | h.update("".join(paths).encode()) |
| | return h.hexdigest() |
| |
|
| |
|
| | class Compose: |
| |
|
| | def __init__(self, transforms): |
| | self.transforms = transforms |
| |
|
| | def __call__(self, data): |
| | for t in self.transforms: |
| | data = t(data) |
| | return data |
| |
|
| | def append(self, transform): |
| | self.transforms.append(transform) |
| |
|
| | def tolist(self): |
| | return self.transforms |
| |
|
| | def __repr__(self): |
| | format_string = f"{self.__class__.__name__}(" |
| | for t in self.transforms: |
| | format_string += "\n" |
| | format_string += f" {t}" |
| | format_string += "\n)" |
| | return format_string |
| |
|
| |
|
| | class Format: |
| |
|
| | def __init__(self, |
| | bbox_format="xywh", |
| | normalize=True, |
| | return_mask=False, |
| | return_keypoint=False, |
| | mask_ratio=4, |
| | mask_overlap=True, |
| | batch_idx=True): |
| | self.bbox_format = bbox_format |
| | self.normalize = normalize |
| | self.return_mask = return_mask |
| | self.return_keypoint = return_keypoint |
| | self.mask_ratio = mask_ratio |
| | self.mask_overlap = mask_overlap |
| | self.batch_idx = batch_idx |
| |
|
| | def __call__(self, labels): |
| | img = labels.pop("img") |
| | h, w = img.shape[:2] |
| | cls = labels.pop("cls") |
| | instances = labels.pop("instances") |
| | instances.convert_bbox(format=self.bbox_format) |
| | instances.denormalize(w, h) |
| | nl = len(instances) |
| |
|
| | if self.normalize: |
| | instances.normalize(w, h) |
| | labels["img"] = self._format_img(img) |
| | labels["cls"] = torch.from_numpy(cls) if nl else torch.zeros(nl) |
| | labels["bboxes"] = torch.from_numpy(instances.bboxes) if nl else torch.zeros((nl, 4)) |
| | if self.return_keypoint: |
| | labels["keypoints"] = torch.from_numpy(instances.keypoints) if nl else torch.zeros((nl, 17, 2)) |
| | |
| | if self.batch_idx: |
| | labels["batch_idx"] = torch.zeros(nl) |
| | return labels |
| |
|
| | def _format_img(self, img): |
| | if len(img.shape) < 3: |
| | img = np.expand_dims(img, -1) |
| | img = np.ascontiguousarray(img.transpose(2, 0, 1)[::-1]) |
| | img = torch.from_numpy(img) |
| | return img |
| |
|
| | class Bboxes: |
| | """Now only numpy is supported""" |
| |
|
| | def __init__(self, bboxes, format="xyxy") -> None: |
| | assert format in _formats |
| | bboxes = bboxes[None, :] if bboxes.ndim == 1 else bboxes |
| | assert bboxes.ndim == 2 |
| | assert bboxes.shape[1] == 4 |
| | self.bboxes = bboxes |
| | self.format = format |
| |
|
| | def convert(self, format): |
| | assert format in _formats |
| | if self.format == format: |
| | return |
| | elif self.format == "xyxy": |
| | if format == "xywh": |
| | bboxes = xyxy2xywh(self.bboxes) |
| | elif self.format == "xywh": |
| | if format == "xyxy": |
| | bboxes = xywh2xyxy(self.bboxes) |
| | self.bboxes = bboxes |
| | self.format = format |
| |
|
| | def areas(self): |
| | self.convert("xyxy") |
| | return (self.bboxes[:, 2] - self.bboxes[:, 0]) * (self.bboxes[:, 3] - self.bboxes[:, 1]) |
| |
|
| | def mul(self, scale): |
| | """ |
| | Args: |
| | scale (tuple | List | int): the scale for four coords. |
| | """ |
| | assert isinstance(scale, (tuple, list)) |
| | assert len(scale) == 4 |
| | self.bboxes[:, 0] *= scale[0] |
| | self.bboxes[:, 1] *= scale[1] |
| | self.bboxes[:, 2] *= scale[2] |
| | self.bboxes[:, 3] *= scale[3] |
| |
|
| | def add(self, offset): |
| | """ |
| | Args: |
| | offset (tuple | List | int): the offset for four coords. |
| | """ |
| | assert isinstance(offset, (tuple, list)) |
| | assert len(offset) == 4 |
| | self.bboxes[:, 0] += offset[0] |
| | self.bboxes[:, 1] += offset[1] |
| | self.bboxes[:, 2] += offset[2] |
| | self.bboxes[:, 3] += offset[3] |
| |
|
| | def __len__(self): |
| | return len(self.bboxes) |
| |
|
| | @classmethod |
| | def concatenate(cls, boxes_list: List["Bboxes"], axis=0) -> "Bboxes": |
| | """ |
| | Concatenates a list of Boxes into a single Bboxes |
| | |
| | Arguments: |
| | boxes_list (list[Bboxes]) |
| | |
| | Returns: |
| | Bboxes: the concatenated Boxes |
| | """ |
| | assert isinstance(boxes_list, (list, tuple)) |
| | if not boxes_list: |
| | return cls(np.empty(0)) |
| | assert all(isinstance(box, Bboxes) for box in boxes_list) |
| |
|
| | if len(boxes_list) == 1: |
| | return boxes_list[0] |
| | return cls(np.concatenate([b.bboxes for b in boxes_list], axis=axis)) |
| |
|
| | def __getitem__(self, index) -> "Bboxes": |
| | """ |
| | Args: |
| | index: int, slice, or a BoolArray |
| | |
| | Returns: |
| | Bboxes: Create a new :class:`Bboxes` by indexing. |
| | """ |
| | if isinstance(index, int): |
| | return Bboxes(self.bboxes[index].view(1, -1)) |
| | b = self.bboxes[index] |
| | assert b.ndim == 2, f"Indexing on Bboxes with {index} failed to return a matrix!" |
| | return Bboxes(b) |
| |
|
| |
|
| | def resample_segments(segments, n=1000): |
| | """ |
| | Inputs a list of segments (n,2) and returns a list of segments (n,2) up-sampled to n points each. |
| | |
| | Args: |
| | segments (list): a list of (n,2) arrays, where n is the number of points in the segment. |
| | n (int): number of points to resample the segment to. Defaults to 1000 |
| | |
| | Returns: |
| | segments (list): the resampled segments. |
| | """ |
| | for i, s in enumerate(segments): |
| | s = np.concatenate((s, s[0:1, :]), axis=0) |
| | x = np.linspace(0, len(s) - 1, n) |
| | xp = np.arange(len(s)) |
| | segments[i] = np.concatenate([np.interp(x, xp, s[:, i]) for i in range(2)]).reshape(2, -1).T |
| | return segments |
| |
|
| |
|
| | class Instances: |
| |
|
| | def __init__(self, bboxes, segments=None, keypoints=None, bbox_format="xywh", normalized=True) -> None: |
| | """ |
| | Args: |
| | bboxes (ndarray): bboxes with shape [N, 4]. |
| | segments (list | ndarray): segments. |
| | keypoints (ndarray): keypoints with shape [N, 17, 2]. |
| | """ |
| | if segments is None: |
| | segments = [] |
| | self._bboxes = Bboxes(bboxes=bboxes, format=bbox_format) |
| | self.keypoints = keypoints |
| | self.normalized = normalized |
| |
|
| | if len(segments) > 0: |
| | |
| | segments = resample_segments(segments) |
| | |
| | segments = np.stack(segments, axis=0) |
| | else: |
| | segments = np.zeros((0, 1000, 2), dtype=np.float32) |
| | self.segments = segments |
| |
|
| | def convert_bbox(self, format): |
| | self._bboxes.convert(format=format) |
| |
|
| | def bbox_areas(self): |
| | self._bboxes.areas() |
| |
|
| | def scale(self, scale_w, scale_h, bbox_only=False): |
| | """this might be similar with denormalize func but without normalized sign""" |
| | self._bboxes.mul(scale=(scale_w, scale_h, scale_w, scale_h)) |
| | if bbox_only: |
| | return |
| | self.segments[..., 0] *= scale_w |
| | self.segments[..., 1] *= scale_h |
| | if self.keypoints is not None: |
| | self.keypoints[..., 0] *= scale_w |
| | self.keypoints[..., 1] *= scale_h |
| |
|
| | def denormalize(self, w, h): |
| | if not self.normalized: |
| | return |
| | self._bboxes.mul(scale=(w, h, w, h)) |
| | self.segments[..., 0] *= w |
| | self.segments[..., 1] *= h |
| | if self.keypoints is not None: |
| | self.keypoints[..., 0] *= w |
| | self.keypoints[..., 1] *= h |
| | self.normalized = False |
| |
|
| | def normalize(self, w, h): |
| | if self.normalized: |
| | return |
| | self._bboxes.mul(scale=(1 / w, 1 / h, 1 / w, 1 / h)) |
| | self.segments[..., 0] /= w |
| | self.segments[..., 1] /= h |
| | if self.keypoints is not None: |
| | self.keypoints[..., 0] /= w |
| | self.keypoints[..., 1] /= h |
| | self.normalized = True |
| |
|
| | def add_padding(self, padw, padh): |
| | |
| | assert not self.normalized, "you should add padding with absolute coordinates." |
| | self._bboxes.add(offset=(padw, padh, padw, padh)) |
| | self.segments[..., 0] += padw |
| | self.segments[..., 1] += padh |
| | if self.keypoints is not None: |
| | self.keypoints[..., 0] += padw |
| | self.keypoints[..., 1] += padh |
| |
|
| | def __getitem__(self, index) -> "Instances": |
| | """ |
| | Args: |
| | index: int, slice, or a BoolArray |
| | |
| | Returns: |
| | Instances: Create a new :class:`Instances` by indexing. |
| | """ |
| | segments = self.segments[index] if len(self.segments) else self.segments |
| | keypoints = self.keypoints[index] if self.keypoints is not None else None |
| | bboxes = self.bboxes[index] |
| | bbox_format = self._bboxes.format |
| | return Instances( |
| | bboxes=bboxes, |
| | segments=segments, |
| | keypoints=keypoints, |
| | bbox_format=bbox_format, |
| | normalized=self.normalized, |
| | ) |
| |
|
| | def flipud(self, h): |
| | if self._bboxes.format == "xyxy": |
| | y1 = self.bboxes[:, 1].copy() |
| | y2 = self.bboxes[:, 3].copy() |
| | self.bboxes[:, 1] = h - y2 |
| | self.bboxes[:, 3] = h - y1 |
| | else: |
| | self.bboxes[:, 1] = h - self.bboxes[:, 1] |
| | self.segments[..., 1] = h - self.segments[..., 1] |
| | if self.keypoints is not None: |
| | self.keypoints[..., 1] = h - self.keypoints[..., 1] |
| |
|
| | def fliplr(self, w): |
| | if self._bboxes.format == "xyxy": |
| | x1 = self.bboxes[:, 0].copy() |
| | x2 = self.bboxes[:, 2].copy() |
| | self.bboxes[:, 0] = w - x2 |
| | self.bboxes[:, 2] = w - x1 |
| | else: |
| | self.bboxes[:, 0] = w - self.bboxes[:, 0] |
| | self.segments[..., 0] = w - self.segments[..., 0] |
| | if self.keypoints is not None: |
| | self.keypoints[..., 0] = w - self.keypoints[..., 0] |
| |
|
| | def clip(self, w, h): |
| | ori_format = self._bboxes.format |
| | self.convert_bbox(format="xyxy") |
| | self.bboxes[:, [0, 2]] = self.bboxes[:, [0, 2]].clip(0, w) |
| | self.bboxes[:, [1, 3]] = self.bboxes[:, [1, 3]].clip(0, h) |
| | if ori_format != "xyxy": |
| | self.convert_bbox(format=ori_format) |
| | self.segments[..., 0] = self.segments[..., 0].clip(0, w) |
| | self.segments[..., 1] = self.segments[..., 1].clip(0, h) |
| | if self.keypoints is not None: |
| | self.keypoints[..., 0] = self.keypoints[..., 0].clip(0, w) |
| | self.keypoints[..., 1] = self.keypoints[..., 1].clip(0, h) |
| |
|
| | def update(self, bboxes, segments=None, keypoints=None): |
| | new_bboxes = Bboxes(bboxes, format=self._bboxes.format) |
| | self._bboxes = new_bboxes |
| | if segments is not None: |
| | self.segments = segments |
| | if keypoints is not None: |
| | self.keypoints = keypoints |
| |
|
| | def __len__(self): |
| | return len(self.bboxes) |
| |
|
| | @classmethod |
| | def concatenate(cls, instances_list: List["Instances"], axis=0) -> "Instances": |
| | """ |
| | Concatenates a list of Boxes into a single Bboxes |
| | |
| | Arguments: |
| | instances_list (list[Bboxes]) |
| | axis |
| | |
| | Returns: |
| | Boxes: the concatenated Boxes |
| | """ |
| | assert isinstance(instances_list, (list, tuple)) |
| | if not instances_list: |
| | return cls(np.empty(0)) |
| | assert all(isinstance(instance, Instances) for instance in instances_list) |
| |
|
| | if len(instances_list) == 1: |
| | return instances_list[0] |
| |
|
| | use_keypoint = instances_list[0].keypoints is not None |
| | bbox_format = instances_list[0]._bboxes.format |
| | normalized = instances_list[0].normalized |
| |
|
| | cat_boxes = np.concatenate([ins.bboxes for ins in instances_list], axis=axis) |
| | cat_segments = np.concatenate([b.segments for b in instances_list], axis=axis) |
| | cat_keypoints = np.concatenate([b.keypoints for b in instances_list], axis=axis) if use_keypoint else None |
| | return cls(cat_boxes, cat_segments, cat_keypoints, bbox_format, normalized) |
| |
|
| | @property |
| | def bboxes(self): |
| | return self._bboxes.bboxes |
| |
|
| |
|
| | def is_dir_writeable(dir_path: Union[str, Path]) -> bool: |
| | """ |
| | Check if a directory is writeable. |
| | |
| | Args: |
| | dir_path (str) or (Path): The path to the directory. |
| | |
| | Returns: |
| | bool: True if the directory is writeable, False otherwise. |
| | """ |
| | try: |
| | with tempfile.TemporaryFile(dir=dir_path): |
| | pass |
| | return True |
| | except OSError: |
| | return False |
| |
|
| |
|
| | class YOLODataset(BaseDataset): |
| | cache_version = '1.0.1' |
| | rand_interp_methods = [cv2.INTER_NEAREST, cv2.INTER_LINEAR, cv2.INTER_CUBIC, cv2.INTER_AREA, cv2.INTER_LANCZOS4] |
| | """YOLO Dataset. |
| | Args: |
| | img_path (str): image path. |
| | prefix (str): prefix. |
| | """ |
| |
|
| | def __init__(self, |
| | img_path, |
| | imgsz=640, |
| | cache=False, |
| | augment=True, |
| | hyp=None, |
| | prefix="", |
| | rect=False, |
| | batch_size=None, |
| | stride=32, |
| | pad=0.0, |
| | single_cls=False, |
| | use_segments=False, |
| | use_keypoints=False, |
| | names=None): |
| | self.use_segments = use_segments |
| | self.use_keypoints = use_keypoints |
| | self.names = names |
| | assert not (self.use_segments and self.use_keypoints), "Can not use both segments and keypoints." |
| | super().__init__(img_path, imgsz, cache, augment, hyp, prefix, rect, batch_size, stride, pad, single_cls) |
| |
|
| | def cache_labels(self, path=Path("./labels.cache")): |
| | |
| | if path.exists(): |
| | path.unlink() |
| | x = {"labels": []} |
| | nm, nf, ne, nc, msgs = 0, 0, 0, 0, [] |
| | desc = f"{self.prefix}Scanning {path.parent / path.stem}..." |
| | total = len(self.im_files) |
| | with ThreadPool(NUM_THREADS) as pool: |
| | results = pool.imap(func=verify_image_label, |
| | iterable=zip(self.im_files, self.label_files, repeat(self.prefix), |
| | repeat(self.use_keypoints), repeat(len(self.names)))) |
| | pbar = tqdm(results, desc=desc, total=total, bar_format=TQDM_BAR_FORMAT) |
| | for im_file, lb, shape, segments, keypoint, nm_f, nf_f, ne_f, nc_f, msg in pbar: |
| | nm += nm_f |
| | nf += nf_f |
| | ne += ne_f |
| | nc += nc_f |
| | if im_file: |
| | x["labels"].append( |
| | dict( |
| | im_file=im_file, |
| | shape=shape, |
| | cls=lb[:, 0:1], |
| | bboxes=lb[:, 1:], |
| | segments=segments, |
| | keypoints=keypoint, |
| | normalized=True, |
| | bbox_format="xywh")) |
| | if msg: |
| | msgs.append(msg) |
| | pbar.desc = f"{desc} {nf} images, {nm + ne} backgrounds, {nc} corrupt" |
| | pbar.close() |
| |
|
| | if msgs: |
| | LOGGER.info("\n".join(msgs)) |
| | x["hash"] = get_hash(self.label_files + self.im_files) |
| | x["results"] = nf, nm, ne, nc, len(self.im_files) |
| | x["msgs"] = msgs |
| | x["version"] = self.cache_version |
| | self.im_files = [lb["im_file"] for lb in x["labels"]] |
| | if is_dir_writeable(path.parent): |
| | np.save(str(path), x) |
| | path.with_suffix(".cache.npy").rename(path) |
| | LOGGER.info(f"{self.prefix}New cache created: {path}") |
| | else: |
| | LOGGER.warning(f"{self.prefix}WARNING ⚠️ Cache directory {path.parent} is not writeable") |
| | return x |
| |
|
| | def get_labels(self): |
| | self.label_files = img2label_paths(self.im_files) |
| | cache_path = Path(self.label_files[0]).parent.with_suffix(".cache") |
| | try: |
| | cache, exists = np.load(str(cache_path), allow_pickle=True).item(), True |
| | assert cache["version"] == self.cache_version |
| | assert cache["hash"] == get_hash(self.label_files + self.im_files) |
| | except (FileNotFoundError, AssertionError, AttributeError): |
| | cache, exists = self.cache_labels(cache_path), False |
| |
|
| | |
| | nf, nm, ne, nc, n = cache.pop("results") |
| | if exists: |
| | d = f"Scanning {cache_path}... {nf} images, {nm + ne} backgrounds, {nc} corrupt" |
| | tqdm(None, desc=self.prefix + d, total=n, initial=n, bar_format=TQDM_BAR_FORMAT) |
| | if cache["msgs"]: |
| | LOGGER.info("\n".join(cache["msgs"])) |
| |
|
| | |
| | [cache.pop(k) for k in ("hash", "version", "msgs")] |
| | labels = cache["labels"] |
| |
|
| | |
| | len_cls = sum(len(lb["cls"]) for lb in labels) |
| | len_boxes = sum(len(lb["bboxes"]) for lb in labels) |
| | len_segments = sum(len(lb["segments"]) for lb in labels) |
| | if len_segments and len_boxes != len_segments: |
| | LOGGER.warning( |
| | f"WARNING ⚠️ Box and segment counts should be equal, but got len(segments) = {len_segments}, " |
| | f"len(boxes) = {len_boxes}. To resolve this only boxes will be used and all segments will be removed. " |
| | "To avoid this please supply either a detect or segment dataset, not a detect-segment mixed dataset.") |
| | for lb in labels: |
| | lb["segments"] = [] |
| | return labels |
| |
|
| |
|
| | def build_transforms(self, hyp=None): |
| | transforms = Compose([LetterBox(new_shape=(self.imgsz, self.imgsz), scaleup=False)]) |
| | transforms.append( |
| | Format(bbox_format="xywh", |
| | normalize=True, |
| | return_mask=self.use_segments, |
| | return_keypoint=self.use_keypoints, |
| | batch_idx=True, |
| | mask_ratio=hyp.mask_ratio, |
| | mask_overlap=hyp.overlap_mask)) |
| | return transforms |
| |
|
| | def close_mosaic(self, hyp): |
| | hyp.mosaic = 0.0 |
| | hyp.copy_paste = 0.0 |
| | hyp.mixup = 0.0 |
| | self.transforms = self.build_transforms(hyp) |
| |
|
| | def update_labels_info(self, label): |
| | """custom your label format here""" |
| | |
| | |
| | bboxes = label.pop("bboxes") |
| | segments = label.pop("segments") |
| | keypoints = label.pop("keypoints", None) |
| | bbox_format = label.pop("bbox_format") |
| | normalized = label.pop("normalized") |
| | label["instances"] = Instances(bboxes, segments, keypoints, bbox_format=bbox_format, normalized=normalized) |
| | return label |
| |
|
| | @staticmethod |
| | def collate_fn(batch): |
| | new_batch = {} |
| | keys = batch[0].keys() |
| | values = list(zip(*[list(b.values()) for b in batch])) |
| | for i, k in enumerate(keys): |
| | value = values[i] |
| | if k == "img": |
| | value = torch.stack(value, 0) |
| | if k in ["masks", "keypoints", "bboxes", "cls"]: |
| | value = torch.cat(value, 0) |
| | new_batch[k] = value |
| | new_batch["batch_idx"] = list(new_batch["batch_idx"]) |
| | for i in range(len(new_batch["batch_idx"])): |
| | new_batch["batch_idx"][i] += i |
| | new_batch["batch_idx"] = torch.cat(new_batch["batch_idx"], 0) |
| | return new_batch |
| |
|
| |
|
| | class DFL(nn.Module): |
| | |
| | def __init__(self, c1=16): |
| | super().__init__() |
| | self.conv = nn.Conv2d(c1, 1, 1, bias=False).requires_grad_(False) |
| | x = torch.arange(c1, dtype=torch.float) |
| | self.conv.weight.data[:] = nn.Parameter(x.view(1, c1, 1, 1)) |
| | self.c1 = c1 |
| |
|
| | def forward(self, x): |
| | b, c, a = x.shape |
| | return self.conv(x.view(b, 4, self.c1, a).transpose(2, 1).softmax(1)).view( |
| | b, 4, a |
| | ) |
| |
|
| |
|
| | def dist2bbox(distance, anchor_points, xywh=True, dim=-1): |
| | """Transform distance(ltrb) to box(xywh or xyxy).""" |
| | lt, rb = torch.split(distance, 2, dim) |
| | x1y1 = anchor_points - lt |
| | x2y2 = anchor_points + rb |
| | if xywh: |
| | c_xy = (x1y1 + x2y2) / 2 |
| | wh = x2y2 - x1y1 |
| | return torch.cat((c_xy, wh), dim) |
| | return torch.cat((x1y1, x2y2), dim) |
| |
|
| |
|
| | def post_process(x): |
| | dfl = DFL(16) |
| | anchors = torch.tensor( |
| | np.load( |
| | "./anchors.npy", |
| | allow_pickle=True, |
| | ) |
| | ) |
| | strides = torch.tensor( |
| | np.load( |
| | "./strides.npy", |
| | allow_pickle=True, |
| | ) |
| | ) |
| | box, cls = torch.cat([xi.view(x[0].shape[0], 144, -1) for xi in x], 2).split( |
| | (16 * 4, 80), 1 |
| | ) |
| | dbox = dist2bbox(dfl(box), anchors.unsqueeze(0), xywh=True, dim=1) * strides |
| | y = torch.cat((dbox, cls.sigmoid()), 1) |
| | return y, x |
| |
|
| |
|
| | def smooth(y, f=0.05): |
| | |
| | nf = round(len(y) * f * 2) // 2 + 1 |
| | p = np.ones(nf // 2) |
| | yp = np.concatenate((p * y[0], y, p * y[-1]), 0) |
| | return np.convolve(yp, np.ones(nf) / nf, mode='valid') |
| |
|
| |
|
| | def compute_ap(recall, precision): |
| | """ Compute the average precision, given the recall and precision curves |
| | # Arguments |
| | recall: The recall curve (list) |
| | precision: The precision curve (list) |
| | # Returns |
| | Average precision, precision curve, recall curve |
| | """ |
| |
|
| | |
| | mrec = np.concatenate(([0.0], recall, [1.0])) |
| | mpre = np.concatenate(([1.0], precision, [0.0])) |
| |
|
| | |
| | mpre = np.flip(np.maximum.accumulate(np.flip(mpre))) |
| |
|
| | |
| | method = 'interp' |
| | if method == 'interp': |
| | x = np.linspace(0, 1, 101) |
| | ap = np.trapz(np.interp(x, mrec, mpre), x) |
| | else: |
| | i = np.where(mrec[1:] != mrec[:-1])[0] |
| | ap = np.sum((mrec[i + 1] - mrec[i]) * mpre[i + 1]) |
| |
|
| | return ap, mpre, mrec |
| |
|
| |
|
| | def ap_per_class(tp, conf, pred_cls, target_cls, plot=False, save_dir=Path(), names=(), eps=1e-16, prefix=""): |
| | """ Compute the average precision, given the recall and precision curves. |
| | Source: https://github.com/rafaelpadilla/Object-Detection-Metrics. |
| | # Arguments |
| | tp: True positives (nparray, nx1 or nx10). |
| | conf: Objectness value from 0-1 (nparray). |
| | pred_cls: Predicted object classes (nparray). |
| | target_cls: True object classes (nparray). |
| | plot: Plot precision-recall curve at mAP@0.5 |
| | save_dir: Plot save directory |
| | # Returns |
| | The average precision as computed in py-faster-rcnn. |
| | """ |
| |
|
| | |
| | i = np.argsort(-conf) |
| | tp, conf, pred_cls = tp[i], conf[i], pred_cls[i] |
| |
|
| | |
| | unique_classes, nt = np.unique(target_cls, return_counts=True) |
| | nc = unique_classes.shape[0] |
| |
|
| | |
| | px, py = np.linspace(0, 1, 1000), [] |
| | ap, p, r = np.zeros((nc, tp.shape[1])), np.zeros((nc, 1000)), np.zeros((nc, 1000)) |
| | for ci, c in enumerate(unique_classes): |
| | i = pred_cls == c |
| | n_l = nt[ci] |
| | n_p = i.sum() |
| | if n_p == 0 or n_l == 0: |
| | continue |
| |
|
| | |
| | fpc = (1 - tp[i]).cumsum(0) |
| | tpc = tp[i].cumsum(0) |
| |
|
| | |
| | recall = tpc / (n_l + eps) |
| | r[ci] = np.interp(-px, -conf[i], recall[:, 0], left=0) |
| |
|
| | |
| | precision = tpc / (tpc + fpc) |
| | p[ci] = np.interp(-px, -conf[i], precision[:, 0], left=1) |
| |
|
| | |
| | for j in range(tp.shape[1]): |
| | ap[ci, j], mpre, mrec = compute_ap(recall[:, j], precision[:, j]) |
| | if plot and j == 0: |
| | py.append(np.interp(px, mrec, mpre)) |
| |
|
| | |
| | f1 = 2 * p * r / (p + r + eps) |
| | names = [v for k, v in names.items() if k in unique_classes] |
| | names = dict(enumerate(names)) |
| |
|
| | i = smooth(f1.mean(0), 0.1).argmax() |
| | p, r, f1 = p[:, i], r[:, i], f1[:, i] |
| | tp = (r * nt).round() |
| | fp = (tp / (p + eps) - tp).round() |
| | return tp, fp, p, r, f1, ap, unique_classes.astype(int) |
| |
|
| |
|
| | class Metric: |
| |
|
| | def __init__(self) -> None: |
| | self.p = [] |
| | self.r = [] |
| | self.f1 = [] |
| | self.all_ap = [] |
| | self.ap_class_index = [] |
| | self.nc = 0 |
| |
|
| | @property |
| | def ap50(self): |
| | """AP@0.5 of all classes. |
| | Return: |
| | (nc, ) or []. |
| | """ |
| | return self.all_ap[:, 0] if len(self.all_ap) else [] |
| |
|
| | @property |
| | def ap(self): |
| | """AP@0.5:0.95 |
| | Return: |
| | (nc, ) or []. |
| | """ |
| | return self.all_ap.mean(1) if len(self.all_ap) else [] |
| |
|
| | @property |
| | def mp(self): |
| | """mean precision of all classes. |
| | Return: |
| | float. |
| | """ |
| | return self.p.mean() if len(self.p) else 0.0 |
| |
|
| | @property |
| | def mr(self): |
| | """mean recall of all classes. |
| | Return: |
| | float. |
| | """ |
| | return self.r.mean() if len(self.r) else 0.0 |
| |
|
| | @property |
| | def map50(self): |
| | """Mean AP@0.5 of all classes. |
| | Return: |
| | float. |
| | """ |
| | return self.all_ap[:, 0].mean() if len(self.all_ap) else 0.0 |
| |
|
| | @property |
| | def map75(self): |
| | """Mean AP@0.75 of all classes. |
| | Return: |
| | float. |
| | """ |
| | return self.all_ap[:, 5].mean() if len(self.all_ap) else 0.0 |
| |
|
| | @property |
| | def map(self): |
| | """Mean AP@0.5:0.95 of all classes. |
| | Return: |
| | float. |
| | """ |
| | return self.all_ap.mean() if len(self.all_ap) else 0.0 |
| |
|
| | def mean_results(self): |
| | """Mean of results, return mp, mr, map50, map""" |
| | return [self.mp, self.mr, self.map50, self.map] |
| |
|
| | def class_result(self, i): |
| | """class-aware result, return p[i], r[i], ap50[i], ap[i]""" |
| | return self.p[i], self.r[i], self.ap50[i], self.ap[i] |
| |
|
| | @property |
| | def maps(self): |
| | """mAP of each class""" |
| | maps = np.zeros(self.nc) + self.map |
| | for i, c in enumerate(self.ap_class_index): |
| | maps[c] = self.ap[i] |
| | return maps |
| |
|
| | def fitness(self): |
| | |
| | w = [0.0, 0.0, 0.1, 0.9] |
| | return (np.array(self.mean_results()) * w).sum() |
| |
|
| | def update(self, results): |
| | """ |
| | Args: |
| | results: tuple(p, r, ap, f1, ap_class) |
| | """ |
| | self.p, self.r, self.f1, self.all_ap, self.ap_class_index = results |
| |
|
| |
|
| | class DetMetrics: |
| |
|
| | def __init__(self, save_dir=Path("."), plot=False, names=()) -> None: |
| | self.save_dir = save_dir |
| | self.plot = plot |
| | self.names = names |
| | self.box = Metric() |
| |
|
| | def process(self, tp, conf, pred_cls, target_cls): |
| | results = ap_per_class(tp, conf, pred_cls, target_cls, plot=self.plot, save_dir=self.save_dir, |
| | names=self.names)[2:] |
| | self.box.nc = len(self.names) |
| | self.box.update(results) |
| |
|
| | @property |
| | def keys(self): |
| | return ["metrics/precision(B)", "metrics/recall(B)", "metrics/mAP50(B)", "metrics/mAP50-95(B)"] |
| |
|
| | def mean_results(self): |
| | return self.box.mean_results() |
| |
|
| | def class_result(self, i): |
| | return self.box.class_result(i) |
| |
|
| | @property |
| | def maps(self): |
| | return self.box.maps |
| |
|
| | @property |
| | def fitness(self): |
| | return self.box.fitness() |
| |
|
| | @property |
| | def ap_class_index(self): |
| | return self.box.ap_class_index |
| |
|
| | @property |
| | def results_dict(self): |
| | return dict(zip(self.keys + ["fitness"], self.mean_results() + [self.fitness])) |
| |
|
| |
|
| | def increment_path(path, exist_ok=False, sep='', mkdir=False): |
| | """ |
| | Increments a file or directory path, i.e. runs/exp --> runs/exp{sep}2, runs/exp{sep}3, ... etc. |
| | |
| | If the path exists and exist_ok is not set to True, the path will be incremented by appending a number and sep to |
| | the end of the path. If the path is a file, the file extension will be preserved. If the path is a directory, the |
| | number will be appended directly to the end of the path. If mkdir is set to True, the path will be created as a |
| | directory if it does not already exist. |
| | |
| | Args: |
| | path (str or pathlib.Path): Path to increment. |
| | exist_ok (bool, optional): If True, the path will not be incremented and will be returned as-is. Defaults to False. |
| | sep (str, optional): Separator to use between the path and the incrementation number. Defaults to an empty string. |
| | mkdir (bool, optional): If True, the path will be created as a directory if it does not exist. Defaults to False. |
| | |
| | Returns: |
| | pathlib.Path: Incremented path. |
| | """ |
| | path = Path(path) |
| | if path.exists() and not exist_ok: |
| | path, suffix = (path.with_suffix(''), path.suffix) if path.is_file() else (path, '') |
| |
|
| | |
| | for n in range(2, 9999): |
| | p = f'{path}{sep}{n}{suffix}' |
| | if not os.path.exists(p): |
| | break |
| | path = Path(p) |
| |
|
| | if mkdir: |
| | path.mkdir(parents=True, exist_ok=True) |
| |
|
| | return path |
| |
|
| |
|
| | def cfg2dict(cfg): |
| | """ |
| | Convert a configuration object to a dictionary. |
| | |
| | This function converts a configuration object to a dictionary, whether it is a file path, a string, or a SimpleNamespace object. |
| | |
| | Inputs: |
| | cfg (str) or (Path) or (SimpleNamespace): Configuration object to be converted to a dictionary. |
| | |
| | Returns: |
| | cfg (dict): Configuration object in dictionary format. |
| | """ |
| | if isinstance(cfg, (str, Path)): |
| | cfg = yaml_load(cfg) |
| | elif isinstance(cfg, SimpleNamespace): |
| | cfg = vars(cfg) |
| | return cfg |
| |
|
| |
|
| | def get_cfg(cfg: Union[str, Path, Dict, SimpleNamespace] = None, overrides: Dict = None): |
| | """ |
| | Load and merge configuration data from a file or dictionary. |
| | |
| | Args: |
| | cfg (str) or (Path) or (Dict) or (SimpleNamespace): Configuration data. |
| | overrides (str) or (Dict), optional: Overrides in the form of a file name or a dictionary. Default is None. |
| | |
| | Returns: |
| | (SimpleNamespace): Training arguments namespace. |
| | """ |
| | cfg = cfg2dict(cfg) |
| |
|
| | |
| | if overrides: |
| | overrides = cfg2dict(overrides) |
| | cfg = {**cfg, **overrides} |
| |
|
| | |
| | for k in 'project', 'name': |
| | if k in cfg and isinstance(cfg[k], (int, float)): |
| | cfg[k] = str(cfg[k]) |
| |
|
| | |
| | for k, v in cfg.items(): |
| | if v is not None: |
| | if k in CFG_FLOAT_KEYS and not isinstance(v, (int, float)): |
| | raise TypeError(f"'{k}={v}' is of invalid type {type(v).__name__}. " |
| | f"Valid '{k}' types are int (i.e. '{k}=0') or float (i.e. '{k}=0.5')") |
| | elif k in CFG_FRACTION_KEYS: |
| | if not isinstance(v, (int, float)): |
| | raise TypeError(f"'{k}={v}' is of invalid type {type(v).__name__}. " |
| | f"Valid '{k}' types are int (i.e. '{k}=0') or float (i.e. '{k}=0.5')") |
| | if not (0.0 <= v <= 1.0): |
| | raise ValueError(f"'{k}={v}' is an invalid value. " |
| | f"Valid '{k}' values are between 0.0 and 1.0.") |
| | elif k in CFG_INT_KEYS and not isinstance(v, int): |
| | raise TypeError(f"'{k}={v}' is of invalid type {type(v).__name__}. " |
| | f"'{k}' must be an int (i.e. '{k}=0')") |
| | elif k in CFG_BOOL_KEYS and not isinstance(v, bool): |
| | raise TypeError(f"'{k}={v}' is of invalid type {type(v).__name__}. " |
| | f"'{k}' must be a bool (i.e. '{k}=True' or '{k}=False')") |
| |
|
| | |
| | return IterableSimpleNamespace(**cfg) |
| |
|
| |
|
| | def clip_boxes(boxes, shape): |
| | """ |
| | It takes a list of bounding boxes and a shape (height, width) and clips the bounding boxes to the |
| | shape |
| | |
| | Args: |
| | boxes (torch.Tensor): the bounding boxes to clip |
| | shape (tuple): the shape of the image |
| | """ |
| | if isinstance(boxes, torch.Tensor): |
| | boxes[..., 0].clamp_(0, shape[1]) |
| | boxes[..., 1].clamp_(0, shape[0]) |
| | boxes[..., 2].clamp_(0, shape[1]) |
| | boxes[..., 3].clamp_(0, shape[0]) |
| | else: |
| | boxes[..., [0, 2]] = boxes[..., [0, 2]].clip(0, shape[1]) |
| | boxes[..., [1, 3]] = boxes[..., [1, 3]].clip(0, shape[0]) |
| |
|
| |
|
| | def scale_boxes(img1_shape, boxes, img0_shape, ratio_pad=None): |
| | """ |
| | Rescales bounding boxes (in the format of xyxy) from the shape of the image they were originally specified in |
| | (img1_shape) to the shape of a different image (img0_shape). |
| | |
| | Args: |
| | img1_shape (tuple): The shape of the image that the bounding boxes are for, in the format of (height, width). |
| | boxes (torch.Tensor): the bounding boxes of the objects in the image, in the format of (x1, y1, x2, y2) |
| | img0_shape (tuple): the shape of the target image, in the format of (height, width). |
| | ratio_pad (tuple): a tuple of (ratio, pad) for scaling the boxes. If not provided, the ratio and pad will be |
| | calculated based on the size difference between the two images. |
| | |
| | Returns: |
| | boxes (torch.Tensor): The scaled bounding boxes, in the format of (x1, y1, x2, y2) |
| | """ |
| | if ratio_pad is None: |
| | gain = min(img1_shape[0] / img0_shape[0], img1_shape[1] / img0_shape[1]) |
| | pad = (img1_shape[1] - img0_shape[1] * gain) / 2, (img1_shape[0] - img0_shape[0] * gain) / 2 |
| | else: |
| | gain = ratio_pad[0][0] |
| | pad = ratio_pad[1] |
| |
|
| | boxes[..., [0, 2]] -= pad[0] |
| | boxes[..., [1, 3]] -= pad[1] |
| | boxes[..., :4] /= gain |
| | clip_boxes(boxes, img0_shape) |
| | return boxes |
| |
|
| |
|
| | def exif_size(img): |
| | |
| | s = img.size |
| | with contextlib.suppress(Exception): |
| | rotation = dict(img._getexif().items())[orientation] |
| | if rotation in [6, 8]: |
| | s = (s[1], s[0]) |
| | return s |
| |
|
| |
|
| | def verify_image_label(args): |
| | |
| | im_file, lb_file, prefix, keypoint, num_cls = args |
| | |
| | nm, nf, ne, nc, msg, segments, keypoints = 0, 0, 0, 0, "", [], None |
| | try: |
| | |
| | im = Image.open(im_file) |
| | im.verify() |
| | shape = exif_size(im) |
| | shape = (shape[1], shape[0]) |
| | assert (shape[0] > 9) & (shape[1] > 9), f"image size {shape} <10 pixels" |
| | assert im.format.lower() in IMG_FORMATS, f"invalid image format {im.format}" |
| | if im.format.lower() in ("jpg", "jpeg"): |
| | with open(im_file, "rb") as f: |
| | f.seek(-2, 2) |
| | |
| | |
| | if os.path.isfile(lb_file): |
| | nf = 1 |
| | with open(lb_file) as f: |
| | lb = [x.split() for x in f.read().strip().splitlines() if len(x)] |
| | if any(len(x) > 6 for x in lb) and (not keypoint): |
| | classes = np.array([x[0] for x in lb], dtype=np.float32) |
| | segments = [np.array(x[1:], dtype=np.float32).reshape(-1, 2) for x in lb] |
| | lb = np.concatenate((classes.reshape(-1, 1), segments2boxes(segments)), 1) |
| | lb = np.array(lb, dtype=np.float32) |
| | nl = len(lb) |
| | if nl: |
| | if keypoint: |
| | assert lb.shape[1] == 56, "labels require 56 columns each" |
| | assert (lb[:, 5::3] <= 1).all(), "non-normalized or out of bounds coordinate labels" |
| | assert (lb[:, 6::3] <= 1).all(), "non-normalized or out of bounds coordinate labels" |
| | kpts = np.zeros((lb.shape[0], 39)) |
| | for i in range(len(lb)): |
| | kpt = np.delete(lb[i, 5:], np.arange(2, lb.shape[1] - 5, 3)) |
| | kpts[i] = np.hstack((lb[i, :5], kpt)) |
| | lb = kpts |
| | assert lb.shape[1] == 39, "labels require 39 columns each after removing occlusion parameter" |
| | else: |
| | assert lb.shape[1] == 5, f"labels require 5 columns, {lb.shape[1]} columns detected" |
| | assert (lb[:, 1:] <= 1).all(), \ |
| | f"non-normalized or out of bounds coordinates {lb[:, 1:][lb[:, 1:] > 1]}" |
| | |
| | max_cls = int(lb[:, 0].max()) |
| | assert max_cls <= num_cls, \ |
| | f'Label class {max_cls} exceeds dataset class count {num_cls}. ' \ |
| | f'Possible class labels are 0-{num_cls - 1}' |
| | assert (lb >= 0).all(), f"negative label values {lb[lb < 0]}" |
| | _, i = np.unique(lb, axis=0, return_index=True) |
| | if len(i) < nl: |
| | lb = lb[i] |
| | if segments: |
| | segments = [segments[x] for x in i] |
| | msg = f"{prefix}WARNING ⚠️ {im_file}: {nl - len(i)} duplicate labels removed" |
| | else: |
| | ne = 1 |
| | lb = np.zeros((0, 39), dtype=np.float32) if keypoint else np.zeros((0, 5), dtype=np.float32) |
| | else: |
| | nm = 1 |
| | lb = np.zeros((0, 39), dtype=np.float32) if keypoint else np.zeros((0, 5), dtype=np.float32) |
| | if keypoint: |
| | keypoints = lb[:, 5:].reshape(-1, 17, 2) |
| | lb = lb[:, :5] |
| | return im_file, lb, shape, segments, keypoints, nm, nf, ne, nc, msg |
| | except Exception as e: |
| | nc = 1 |
| | msg = f"{prefix}WARNING ⚠️ {im_file}: ignoring corrupt image/label: {e}" |
| | return [None, None, None, None, None, nm, nf, ne, nc, msg] |