"""
Class to read DBF files.
"""
import os
import sys
import datetime
import collections
from .ifiles import ifind
from .struct_parser import StructParser
from .field_parser import FieldParser
from .memo import find_memofile, open_memofile, FakeMemoFile, BinaryMemo
from .codepages import guess_encoding
from .dbversions import get_dbversion_string
from .exceptions import *
# Layout of the 32-byte DBF file header (little-endian).
# Field names follow the on-disk structure; ``reserved*`` bytes are kept
# so the struct size matches the format but are otherwise unused here.
DBFHeader = StructParser(
    'DBFHeader',
    '<BBBBLHHHBBLLLBBH',
    ['dbversion',
     'year',              # 2-digit year of last update (see expand_year())
     'month',
     'day',
     'numrecords',
     'headerlen',         # offset of the first record
     'recordlen',         # size of one record, incl. the deletion flag byte
     'reserved1',
     'incomplete_transaction',
     'encryption_flag',
     'free_record_thread',
     'reserved2',
     'reserved3',
     'mdx_flag',
     'language_driver',   # code page id, used to guess the text encoding
     'reserved4',
     ])
# Layout of one 32-byte field descriptor following the file header.
DBFField = StructParser(
    'DBFField',
    '<11scLBBHBBBB7sB',
    ['name',              # 11 bytes, b'\0'-terminated
     'type',              # single character, e.g. 'C', 'N', 'M'
     'address',
     'length',            # field width in bytes within each record
     'decimal_count',
     'reserved1',
     'workarea_id',
     'reserved2',
     'reserved3',
     'set_fields_flag',
     'reserved4',
     'index_field_flag',
     ])
def expand_year(year):
    """Convert 2-digit year to 4-digit year.

    DBF headers store the year of last update as a single byte.  Values
    below 80 are taken to mean 20xx, the rest 19xx, covering 1980-2079.
    """
    if year < 80:
        return 2000 + year
    else:
        return 1900 + year
class RecordIterator(object):
    """Iterator over records of one type in a table.

    ``record_type`` is the record separator byte: ``b' '`` for active
    records, ``b'*'`` for deleted ones.  Iteration and ``len()`` are
    delegated to the owning table, so each pass reflects the file's
    current contents rather than a cached snapshot.
    """

    def __init__(self, table, record_type):
        self._record_type = record_type
        self._table = table

    def __iter__(self):
        return self._table._iter_records(self._record_type)

    def __len__(self):
        return self._table._count_records(self._record_type)
class DBF(object):
    """DBF table.

    Reads the file header and field descriptors on construction.
    Records are streamed from disk on demand (via ``RecordIterator``),
    or held in memory as lists after :meth:`load`.
    """

    def __init__(self, filename, encoding=None, ignorecase=True,
                 lowernames=False,
                 parserclass=FieldParser,
                 recfactory=collections.OrderedDict,
                 load=False,
                 raw=False,
                 ignore_missing_memofile=False):
        self.encoding = encoding
        self.ignorecase = ignorecase
        self.lowernames = lowernames
        self.parserclass = parserclass
        self.raw = raw
        self.ignore_missing_memofile = ignore_missing_memofile

        if recfactory is None:
            # With no factory, each record is the plain list of
            # (name, value) items.
            self.recfactory = lambda items: items
        else:
            self.recfactory = recfactory

        # Name part before .dbf is the table name.
        self.name = os.path.basename(filename)
        self.name = os.path.splitext(self.name)[0].lower()

        self._records = None
        self._deleted = None

        if ignorecase:
            # Case-insensitive search for the file on disk.
            self.filename = ifind(filename)
            if not self.filename:
                raise DBFNotFound('could not find file {!r}'.format(filename))
        else:
            self.filename = filename

        # Filled in by self._read_header() / self._read_field_headers().
        self.memofilename = None
        self.header = None
        self.fields = []       # namedtuples
        self.field_names = []  # strings

        with open(self.filename, mode='rb') as infile:
            self._read_header(infile)
            self._read_field_headers(infile)
            self._check_headers()

            try:
                self.date = datetime.date(expand_year(self.header.year),
                                          self.header.month,
                                          self.header.day)
            except ValueError:
                # Invalid date or '\x00\x00\x00'.
                self.date = None

        self.memofilename = self._get_memofilename()

        if load:
            self.load()

    @property
    def dbversion(self):
        """Human-readable name of the DBF version byte in the header."""
        return get_dbversion_string(self.header.dbversion)

    def _get_memofilename(self):
        """Return the path of the memo file, or ``None`` when not needed.

        Raises ``MissingMemoFile`` if the table has memo-style fields
        but no memo file is found, unless ``ignore_missing_memofile``.
        """
        # Does the table have a memo field?
        field_types = [field.type for field in self.fields]
        if not set(field_types) & set('MGPB'):
            # No memo fields.
            return None

        path = find_memofile(self.filename)
        if path is None:
            if self.ignore_missing_memofile:
                return None
            raise MissingMemoFile('missing memo file for {}'.format(
                self.filename))
        else:
            return path

    @property
    def loaded(self):
        """``True`` if records are loaded into memory."""
        return self._records is not None

    def load(self):
        """Load records into memory.

        This loads both records and deleted records. The ``records``
        and ``deleted`` attributes will now be lists of records.
        """
        if not self.loaded:
            self._records = list(self._iter_records(b' '))
            self._deleted = list(self._iter_records(b'*'))

    def unload(self):
        """Unload records from memory.

        The records and deleted attributes will now be instances of
        ``RecordIterator``, which streams records from disk.
        """
        self._records = None
        self._deleted = None

    @property
    def records(self):
        """Records (not included deleted ones). When loaded a list of records,
        when not loaded a new ``RecordIterator`` object.
        """
        if self.loaded:
            return self._records
        else:
            return RecordIterator(self, b' ')

    @property
    def deleted(self):
        """Deleted records. When loaded a list of records, when not loaded a
        new ``RecordIterator`` object.
        """
        if self.loaded:
            return self._deleted
        else:
            return RecordIterator(self, b'*')

    def _read_header(self, infile):
        """Read the 32-byte file header and pick an encoding if unset."""
        # Todo: more checks?
        self.header = DBFHeader.read(infile)

        if self.encoding is None:
            try:
                self.encoding = guess_encoding(self.header.language_driver)
            except LookupError:
                # Unknown language driver byte; fall back to a safe default.
                self.encoding = 'ascii'

    def _read_field_headers(self, infile):
        """Read field descriptors until the header terminator byte."""
        while True:
            sep = infile.read(1)
            if sep in (b'\r', b'\n', b''):
                # End of field headers
                break

            field = DBFField.unpack(sep + infile.read(DBFField.size - 1))

            field.type = chr(ord(field.type))

            # Field name is b'\0' terminated.
            field.name = field.name.split(b'\0')[0].decode(self.encoding)
            if self.lowernames:
                field.name = field.name.lower()

            self.field_names.append(field.name)
            self.fields.append(field)

    def _open_memofile(self):
        """Open the memo file, or a no-op stand-in when none is needed."""
        if self.memofilename and not self.raw:
            return open_memofile(self.memofilename, self.header.dbversion)
        else:
            return FakeMemoFile(self.memofilename)

    def _check_headers(self):
        """Check headers for possible format errors."""
        field_parser = self.parserclass(self)

        for field in self.fields:
            if field.type == 'I' and field.length != 4:
                message = 'Field type I must have length 4 (was {})'
                raise ValueError(message.format(field.length))

            elif field.type == 'L' and field.length != 1:
                message = 'Field type L must have length 1 (was {})'
                raise ValueError(message.format(field.length))

            elif not field_parser.field_type_supported(field.type):
                # Todo: return as byte string?
                raise ValueError('Unknown field type: {!r}'.format(field.type))

    def _skip_record(self, infile):
        """Seek past the current record's data.

        -1 for the record separator which was already read.
        """
        infile.seek(self.header.recordlen - 1, 1)

    def _count_records(self, record_type=b' '):
        """Count records of ``record_type`` with a single scan of the file."""
        count = 0

        with open(self.filename, 'rb') as infile:
            # Skip to first record.
            infile.seek(self.header.headerlen, 0)

            while True:
                sep = infile.read(1)
                if sep == record_type:
                    count += 1
                    self._skip_record(infile)
                elif sep in (b'\x1a', b''):
                    # End of records.
                    break
                else:
                    self._skip_record(infile)

        return count

    def _iter_records(self, record_type=b' '):
        """Yield records of ``record_type``, parsed unless ``self.raw``."""
        with open(self.filename, 'rb') as infile, \
                self._open_memofile() as memofile:

            # Skip to first record.
            infile.seek(self.header.headerlen, 0)

            if not self.raw:
                field_parser = self.parserclass(self, memofile)
                parse = field_parser.parse

            # Shortcuts for speed.
            skip_record = self._skip_record
            read = infile.read

            while True:
                sep = read(1)

                if sep == record_type:
                    if self.raw:
                        items = [(field.name, read(field.length))
                                 for field in self.fields]
                    else:
                        items = [(field.name,
                                  parse(field, read(field.length)))
                                 for field in self.fields]

                    yield self.recfactory(items)

                elif sep in (b'\x1a', b''):
                    # End of records.
                    break

                else:
                    skip_record(infile)

    def __iter__(self):
        if self.loaded:
            return list.__iter__(self._records)
        else:
            return self._iter_records()

    def __len__(self):
        return len(self.records)

    def __repr__(self):
        if self.loaded:
            status = 'loaded'
        else:
            status = 'unloaded'
        return '<{} DBF table {!r}>'.format(status, self.filename)

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        self.unload()
        return False