24 #ifndef FST_EXTENSIONS_FAR_STTABLE_H_ 25 #define FST_EXTENSIONS_FAR_STTABLE_H_ 35 #include <string_view> 48 template <
class T,
class Writer>
52 : stream_(source, std::ios_base::out | std::ios_base::binary),
57 FSTERROR() <<
"STTableWriter::STTableWriter: Error writing to file: " 65 LOG(ERROR) <<
"STTableWriter: Writing to standard out unsupported.";
71 void Add(std::string_view key,
const T &t) {
73 FSTERROR() <<
"STTableWriter::Add: Key empty: " << key;
75 }
else if (key < last_key_) {
76 FSTERROR() <<
"STTableWriter::Add: Key out of order: " << key;
80 last_key_.assign(key.data(), key.size());
81 positions_.push_back(stream_.tellp());
83 entry_writer_(stream_, t);
// Returns the current value of the error_ member. The method is const, so
// querying the error state does not modify the object.
// NOTE(review): the leading "86" looks like a line-number artifact of the
// listing this text was extracted from, not C++ source - confirm.
86 bool Error()
const {
return error_; }
90 WriteType(stream_, static_cast<int64_t>(positions_.size()));
95 std::ofstream stream_;
96 std::vector<int64_t> positions_;
97 std::string last_key_;
111 template <
class T,
class Reader>
115 : sources_(sources), error_(false) {
116 compare_.reset(
new Compare(&keys_));
117 keys_.resize(sources.size());
118 streams_.resize(sources.size(),
nullptr);
119 positions_.resize(sources.size());
120 for (
size_t i = 0; i < sources.size(); ++i) {
121 streams_[i] =
new std::ifstream(
122 sources[i], std::ios_base::in | std::ios_base::binary);
123 if (streams_[i]->fail()) {
124 FSTERROR() <<
"STTableReader::STTableReader: Error reading file: " 129 int32_t magic_number = 0;
130 ReadType(*streams_[i], &magic_number);
131 int32_t file_version = 0;
132 ReadType(*streams_[i], &file_version);
133 if (magic_number != kSTTableMagicNumber) {
134 FSTERROR() <<
"STTableReader::STTableReader: Wrong file type: " 139 if (file_version != kSTTableFileVersion) {
140 FSTERROR() <<
"STTableReader::STTableReader: Wrong file version: " 146 streams_[i]->seekg(-static_cast<int>(
sizeof(int64_t)),
148 ReadType(*streams_[i], &num_entries);
149 if (num_entries > 0) {
151 -static_cast<int>(
sizeof(int64_t)) * (num_entries + 1),
153 positions_[i].resize(num_entries);
154 for (
size_t j = 0; (j < num_entries) && (!streams_[i]->fail()); ++j) {
155 ReadType(*streams_[i], &(positions_[i][j]));
157 streams_[i]->seekg(positions_[i][0]);
158 if (streams_[i]->fail()) {
159 FSTERROR() <<
"STTableReader::STTableReader: Error reading file: " 170 for (
auto &stream : streams_)
delete stream;
174 if (source.empty()) {
175 LOG(ERROR) <<
"STTableReader: Operation not supported on standard input";
178 std::vector<std::string> sources;
179 sources.push_back(source);
184 const std::vector<std::string> &sources) {
190 for (
size_t i = 0; i < streams_.size(); ++i) {
191 if (!positions_[i].empty()) {
192 streams_[i]->seekg(positions_[i].front());
198 bool Find(std::string_view key) {
199 if (error_)
return false;
200 for (
size_t i = 0; i < streams_.size(); ++i) LowerBound(i, key);
202 if (heap_.empty())
return false;
203 return keys_[current_] == key;
// Returns true when iteration cannot continue: either error_ is set or
// heap_ is empty. The error check comes first, so heap_ is not consulted
// once an error has been recorded.
// NOTE(review): heap_ appears to hold the indices of streams that still
// have a pending key (current_ is taken from heap_.back() elsewhere in this
// file) - confirm against the full class.
// NOTE(review): the leading "206" looks like a listing line-number
// artifact, not C++ source - confirm.
206 bool Done()
const {
return error_ || heap_.empty(); }
210 if (streams_[current_]->tellg() <= positions_[current_].back()) {
211 ReadType(*(streams_[current_]), &(keys_[current_]));
212 if (streams_[current_]->fail()) {
213 FSTERROR() <<
"STTableReader: Error reading file: " 214 << sources_[current_];
218 std::push_heap(heap_.begin(), heap_.end(), *compare_);
222 if (!heap_.empty()) PopHeap();
// Returns a const reference to keys_[current_], i.e. the key slot of the
// stream currently selected by current_. No copy is made; the string is
// owned by the keys_ member.
// NOTE(review): there is no bounds or Done() check here; presumably callers
// must only call this while the reader is positioned on an entry - confirm.
// NOTE(review): the leading "225" looks like a listing line-number
// artifact, not C++ source - confirm.
225 const std::string &
GetKey()
const {
return keys_[current_]; }
// Returns the current value of the error_ member. The method is const, so
// querying the error state does not modify the object.
// NOTE(review): the leading "229" looks like a listing line-number
// artifact, not C++ source - confirm.
229 bool Error()
const {
return error_; }
// Constructs the comparator from a pointer to a vector of key strings.
// Only the pointer is stored (the parameter `keys` initializes the member of
// the same name); the vector itself is not copied.
// NOTE(review): the pointed-to vector presumably must outlive this object,
// and ownership is presumably not transferred - confirm against the caller.
// NOTE(review): the leading "234" looks like a listing line-number
// artifact, not C++ source - confirm.
234 explicit Compare(
const std::vector<std::string> *keys) : keys(keys) {}
236 bool operator()(
size_t i,
size_t j)
const {
237 return (*keys)[i] > (*keys)[j];
241 const std::vector<std::string> *keys;
246 void LowerBound(
size_t id, std::string_view find_key) {
247 auto *strm = streams_[id];
248 const auto &positions = positions_[id];
249 if (positions.empty())
return;
251 size_t high = positions.size() - 1;
253 size_t mid = (low + high) / 2;
254 strm->seekg(positions[mid]);
257 if (key > find_key) {
259 }
else if (key < find_key) {
262 for (
size_t i = mid; i > low; --i) {
263 strm->seekg(positions[i - 1]);
265 if (key != find_key) {
266 strm->seekg(positions[i]);
270 strm->seekg(positions[low]);
274 strm->seekg(positions[low]);
280 for (
size_t i = 0; i < streams_.size(); ++i) {
281 if (positions_[i].empty())
continue;
282 ReadType(*streams_[i], &(keys_[i]));
283 if (streams_[i]->fail()) {
284 FSTERROR() <<
"STTableReader: Error reading file: " << sources_[i];
290 if (heap_.empty())
return;
291 std::make_heap(heap_.begin(), heap_.end(), *compare_);
299 std::pop_heap(heap_.begin(), heap_.end(), *compare_);
300 current_ = heap_.back();
301 entry_.reset(entry_reader_(*streams_[current_]));
302 if (!entry_) error_ =
true;
303 if (streams_[current_]->fail()) {
304 FSTERROR() <<
"STTableReader: Error reading entry for key: " 305 << keys_[current_] <<
", file: " << sources_[current_];
310 Reader entry_reader_;
311 std::vector<std::istream *> streams_;
312 std::vector<std::string> sources_;
313 std::vector<std::vector<int64_t>> positions_;
314 std::vector<std::string> keys_;
318 std::unique_ptr<Compare> compare_;
319 mutable std::unique_ptr<T> entry_;
329 template <
class Header>
331 if (source.empty()) {
332 LOG(ERROR) <<
"ReadSTTable: Can't read header from standard input";
335 std::ifstream strm(source, std::ios_base::in | std::ios_base::binary);
337 LOG(ERROR) <<
"ReadSTTableHeader: Could not open file: " << source;
340 int32_t magic_number = 0;
342 int32_t file_version = 0;
344 if (magic_number != kSTTableMagicNumber) {
345 LOG(ERROR) <<
"ReadSTTableHeader: Wrong file type: " << source;
348 if (file_version != kSTTableFileVersion) {
349 LOG(ERROR) <<
"ReadSTTableHeader: Wrong file version: " << source;
353 strm.seekg(-static_cast<int>(
sizeof(int64_t)), std::ios_base::end);
356 LOG(ERROR) <<
"ReadSTTableHeader: Error reading file: " << source;
359 if (i == 0)
return true;
360 strm.seekg(-2 * static_cast<int>(
sizeof(int64_t)), std::ios_base::end);
365 if (!header->Read(strm, source +
":" + key)) {
366 LOG(ERROR) <<
"ReadSTTableHeader: Error reading FstHeader: " << source;
370 LOG(ERROR) <<
"ReadSTTableHeader: Error reading file: " << source;
// Declaration only; the definition is not part of this view.
// Takes a source name by const reference and returns a bool.
// NOTE(review): presumably reports whether `source` names a file in the
// STTable format (cf. the magic-number checks elsewhere in this file) -
// confirm against the definition.
// NOTE(review): the leading "376" looks like a listing line-number
// artifact, not C++ source - confirm.
376 bool IsSTTable(
const std::string &source);
380 #endif // FST_EXTENSIONS_FAR_STTABLE_H_ static STTableReader< T, Reader > * Open(const std::string &source)
constexpr int32_t kSTTableMagicNumber
constexpr int32_t kSTTableFileVersion
static STTableWriter< T, Writer > * Create(const std::string &source)
void Add(std::string_view key, const T &t)
bool Find(std::string_view key)
static STTableReader< T, Reader > * Open(const std::vector< std::string > &sources)
std::ostream & WriteType(std::ostream &strm, const T t)
STTableWriter(const std::string &source)
STTableReader(const std::vector< std::string > &sources)
const std::string & GetKey() const
bool ReadSTTableHeader(const std::string &source, Header *header)
bool IsSTTable(const std::string &source)
const T * GetEntry() const
std::istream & ReadType(std::istream &strm, T *t)