Skip to content

Commit

Permalink
added new property 'is_valid' to check if the mmap of the numpy array…
Browse files Browse the repository at this point in the history
… is valid or not - basically this can be queried to see if the ndsharray writer is already initialized or not
  • Loading branch information
monzelr committed Jun 12, 2023
1 parent fcecc3a commit 967a621
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 61 deletions.
2 changes: 1 addition & 1 deletion ndsharray/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

__author__ = 'Rune Monzel'
__email__ = '[email protected]'
__version__ = '1.0.1'
__version__ = '1.1.0'
__all__ = ["NdShArray",
"supported_types"]

142 changes: 82 additions & 60 deletions ndsharray/ndsharray.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ def __init__(self, name: str, array: np.ndarray = np.ndarray((0, ), dtype=np.uin
self._ndarray_mmap: Union[None, mmap.mmap] = None
self._ndarray_fd: Union[None, int] = None

self._is_valid = False

# buffer size of the _mmap_ndarray
_bytes = self._array_to_bytes(self._array)
self._buffer_size: int = len(_bytes)
Expand All @@ -124,7 +126,7 @@ def __init__(self, name: str, array: np.ndarray = np.ndarray((0, ), dtype=np.uin
self._mmap, self._fd = self._create_mmap(self._name, len(self.ndarray_mmap_name), r_w=self._access)

# create ndarray mmap
self._create_ndarray_mmap()
self._is_valid = self._create_ndarray_mmap()

if self._access == "w":
self.write(array) # call write to force saving the array via mmap.flush!
Expand All @@ -147,6 +149,15 @@ def name(self) -> str:
"""
return self._name

@property
def is_valid(self) -> bool:
    """
    Report whether the mmap of the numpy array is currently considered valid.

    This getter performs no check of its own; it only exposes the cached `_is_valid` flag.
    The flag starts as False in the constructor and is then updated from
    `_create_ndarray_mmap`. It can be queried to see whether the ndsharray writer has
    already been initialized.

    :return: True if the last attempt to set up the ndarray mmap was judged valid, otherwise False
    """
    return self._is_valid

@property
def ndarray_mmap_name(self) -> str:
"""
Expand Down Expand Up @@ -333,36 +344,37 @@ def read(self) -> Tuple[bool, np.ndarray]:
self._create_ndarray_mmap()
_recreated_map = True

# first stage of checking if new data have been arrived
self._ndarray_mmap.seek(0)
_bytes = self._ndarray_mmap.read(8)
try:
_write_time = struct.unpack("d", _bytes)[0]
except ValueError:
_write_time = 0
if _write_time <= self._last_write_time and not _recreated_map:
return False, _numpy_array

# without checking, read the whole buffer
_bytes += self._ndarray_mmap.read()

if len(_bytes) != self._buffer_size:
self._create_ndarray_mmap()

_mmap_correct, _validity, _numpy_array = self._bytes_to_array(_bytes)
if not _mmap_correct:
warnings.warn("The mmap of the ndarray seems to be corrupt and the used protocol does not fit.",
BytesWarning)

# for efficiency
self._array = _numpy_array
# for debug purpose
self._read_time_ms = (time.monotonic()-_write_time) * 1000.0
self._last_write_time = _write_time
if self._is_valid:
# first stage of checking if new data have been arrived
self._ndarray_mmap.seek(0)
_bytes = self._ndarray_mmap.read(8)
try:
_write_time = struct.unpack("d", _bytes)[0]
except ValueError:
_write_time = 0
if _write_time <= self._last_write_time and not _recreated_map:
return False, _numpy_array

# without checking, read the whole buffer
_bytes += self._ndarray_mmap.read()

if len(_bytes) != self._buffer_size:
self._create_ndarray_mmap()

_mmap_correct, _validity, _numpy_array = self._bytes_to_array(_bytes)
if not _mmap_correct:
warnings.warn("The mmap of the ndarray seems to be corrupt and the used protocol does not fit.",
BytesWarning)

# for efficiency
self._array = _numpy_array
# for debug purpose
self._read_time_ms = (time.monotonic()-_write_time) * 1000.0
self._last_write_time = _write_time

return _validity, _numpy_array

def _create_ndarray_mmap(self) -> None:
def _create_ndarray_mmap(self) -> bool:
"""
creates two mmap:
- the mmap with tag 'name' just holds the mmap-tag-name of ndarray
Expand All @@ -386,38 +398,48 @@ def _create_ndarray_mmap(self) -> None:
self._mmap.seek(0)
_ndarray_mmap_name = bytes_to_str(self._mmap.read(len(self._name)+33))
self._uuid = _ndarray_mmap_name[-32:]

# create temporary mmap to get the dtype and dimension of the array
_tmp_mmap, _tmp_fd = self._create_mmap(self.ndarray_mmap_name, 8+2*n_bytes_for_int, r_w="r")
_tmp_mmap.seek(0)
_bytes = _tmp_mmap.read(8+2*n_bytes_for_int) # skip the time: +8
idx = 8
_np_dtype = supported_types[bytes_to_int(_bytes[idx:idx+n_bytes_for_int])]
idx += n_bytes_for_int
_np_dim = bytes_to_int(_bytes[idx:idx+n_bytes_for_int])
self._close_mmap(_tmp_mmap, _tmp_fd)

# create temporary mmap to get the shape of the array
_tmp_2_mmap, _tmp_2_fd = self._create_mmap(self.ndarray_mmap_name,
8 + 2 * n_bytes_for_int + _np_dim * n_bytes_for_int,
r_w="r")
_tmp_2_mmap.seek(8+2*n_bytes_for_int) # skip the time, dtype and dimension
# read shape
_bytes += _tmp_2_mmap.read(_np_dim * n_bytes_for_int)
idx = 8 + 2 * n_bytes_for_int
_np_shape = []
for s in range(_np_dim):
_np_shape.append(bytes_to_int(_bytes[idx:idx + n_bytes_for_int]))
idx += n_bytes_for_int
_np_shape = tuple(_np_shape)
self._close_mmap(_tmp_2_mmap, _tmp_2_fd)

# rebuild _array and get the length of the byte array -> super lazy and inefficient...
self._array = np.ndarray(_np_shape, dtype=_np_dtype)
self._buffer_size = len(self._array_to_bytes(self._array))

self._ndarray_mmap, self._ndarray_fd = self._create_mmap(self.ndarray_mmap_name, self._buffer_size,
r_w=self._access)
try:
int(self._uuid, 16)
self._is_valid = True
except ValueError:
self._is_valid = False

try:
if self._is_valid:
# create temporary mmap to get the dtype and dimension of the array
_tmp_mmap, _tmp_fd = self._create_mmap(self.ndarray_mmap_name, 8+2*n_bytes_for_int, r_w="r")
_tmp_mmap.seek(0)
_bytes = _tmp_mmap.read(8+2*n_bytes_for_int) # skip the time: +8
idx = 8
_np_dtype = supported_types[bytes_to_int(_bytes[idx:idx+n_bytes_for_int])]
idx += n_bytes_for_int
_np_dim = bytes_to_int(_bytes[idx:idx+n_bytes_for_int])
self._close_mmap(_tmp_mmap, _tmp_fd)

# create temporary mmap to get the shape of the array
_tmp_2_mmap, _tmp_2_fd = self._create_mmap(self.ndarray_mmap_name,
8 + 2 * n_bytes_for_int + _np_dim * n_bytes_for_int,
r_w="r")
_tmp_2_mmap.seek(8+2*n_bytes_for_int) # skip the time, dtype and dimension
# read shape
_bytes += _tmp_2_mmap.read(_np_dim * n_bytes_for_int)
idx = 8 + 2 * n_bytes_for_int
_np_shape = []
for s in range(_np_dim):
_np_shape.append(bytes_to_int(_bytes[idx:idx + n_bytes_for_int]))
idx += n_bytes_for_int
_np_shape = tuple(_np_shape)
self._close_mmap(_tmp_2_mmap, _tmp_2_fd)

# rebuild _array and get the length of the byte array -> super lazy and inefficient...
self._array = np.ndarray(_np_shape, dtype=_np_dtype)
self._buffer_size = len(self._array_to_bytes(self._array))

self._ndarray_mmap, self._ndarray_fd = self._create_mmap(self.ndarray_mmap_name, self._buffer_size,
r_w=self._access)
except:
self.is_valid = False
return self._is_valid

@staticmethod
def _create_mmap(name: str, buffer_size: int, r_w: str) -> Tuple[mmap.mmap, Union[None, int]]:
Expand Down

0 comments on commit 967a621

Please sign in to comment.