souffle  2.0.2-371-g6315b36
ReadStreamCSV.h
Go to the documentation of this file.
1 /*
2  * Souffle - A Datalog Compiler
3  * Copyright (c) 2013, 2014, Oracle and/or its affiliates. All rights reserved
4  * Licensed under the Universal Permissive License v 1.0 as shown at:
5  * - https://opensource.org/licenses/UPL
6  * - <souffle root>/licenses/SOUFFLE-UPL.txt
7  */
8 
9 /************************************************************************
10  *
11  * @file ReadStreamCSV.h
12  *
13  ***********************************************************************/
14 
15 #pragma once
16 
17 #include "souffle/RamTypes.h"
18 #include "souffle/SymbolTable.h"
19 #include "souffle/io/ReadStream.h"
23 
24 #ifdef USE_LIBZ
25 #include "souffle/io/gzfstream.h"
26 #else
27 #include <fstream>
28 #endif
29 
30 #include <algorithm>
31 #include <cassert>
32 #include <cstddef>
33 #include <cstdint>
34 #include <iostream>
35 #include <map>
36 #include <memory>
37 #include <sstream>
38 #include <stdexcept>
39 #include <string>
40 #include <vector>
41 
42 namespace souffle {
43 class RecordTable;
44 
45 class ReadStreamCSV : public ReadStream {
46 public:
47  ReadStreamCSV(std::istream& file, const std::map<std::string, std::string>& rwOperation,
48  SymbolTable& symbolTable, RecordTable& recordTable)
50  delimiter(getOr(rwOperation, "delimiter", "\t")), file(file), lineNumber(0),
51  inputMap(getInputColumnMap(rwOperation, static_cast<unsigned int>(arity))) {
52  while (inputMap.size() < arity) {
53  int size = static_cast<int>(inputMap.size());
54  inputMap[size] = size;
55  }
56  }
57 
58 protected:
59  /**
60  * Read and return the next tuple.
61  *
62  * Returns nullptr if no tuple was readable.
63  * @return
64  */
65  Own<RamDomain[]> readNextTuple() override {
66  if (file.eof()) {
67  return nullptr;
68  }
69  std::string line;
70  Own<RamDomain[]> tuple = std::make_unique<RamDomain[]>(typeAttributes.size());
71 
72  if (!getline(file, line)) {
73  return nullptr;
74  }
75  // Handle Windows line endings on non-Windows systems
76  if (!line.empty() && line.back() == '\r') {
77  line = line.substr(0, line.length() - 1);
78  }
79  ++lineNumber;
80 
81  size_t start = 0;
82  size_t end = 0;
83  size_t columnsFilled = 0;
84  for (uint32_t column = 0; columnsFilled < arity; column++) {
85  size_t charactersRead = 0;
86  std::string element = nextElement(line, start, end);
87  if (inputMap.count(column) == 0) {
88  continue;
89  }
90  ++columnsFilled;
91 
92  try {
93  auto&& ty = typeAttributes.at(inputMap[column]);
94  switch (ty[0]) {
95  case 's': {
96  tuple[inputMap[column]] = symbolTable.unsafeLookup(element);
97  charactersRead = element.size();
98  break;
99  }
100  case 'r': {
101  tuple[inputMap[column]] = readRecord(element, ty, 0, &charactersRead);
102  break;
103  }
104  case '+': {
105  tuple[inputMap[column]] = readADT(element, ty, 0, &charactersRead);
106  break;
107  }
108  case 'i': {
109  tuple[inputMap[column]] = RamSignedFromString(element, &charactersRead);
110  break;
111  }
112  case 'u': {
113  tuple[inputMap[column]] = ramBitCast(readRamUnsigned(element, charactersRead));
114  break;
115  }
116  case 'f': {
117  tuple[inputMap[column]] = ramBitCast(RamFloatFromString(element, &charactersRead));
118  break;
119  }
120  default: fatal("invalid type attribute: `%c`", ty[0]);
121  }
122  // Check if everything was read.
123  if (charactersRead != element.size()) {
124  throw std::invalid_argument(
125  "Expected: " + delimiter + " or \\n. Got: " + element[charactersRead]);
126  }
127  } catch (...) {
128  std::stringstream errorMessage;
129  errorMessage << "Error converting <" + element + "> in column " << column + 1 << " in line "
130  << lineNumber << "; ";
131  throw std::invalid_argument(errorMessage.str());
132  }
133  }
134 
135  return tuple;
136  }
137 
138  /**
139  * Read an unsigned element. Possible bases are 2, 10, 16
140  * Base is indicated by the first two chars.
141  */
142  RamUnsigned readRamUnsigned(const std::string& element, size_t& charactersRead) {
143  // Sanity check
144  assert(element.size() > 0);
145 
146  RamSigned value = 0;
147 
148  // Check prefix and parse the input.
149  if (isPrefix("0b", element)) {
150  value = RamUnsignedFromString(element, &charactersRead, 2);
151  } else if (isPrefix("0x", element)) {
152  value = RamUnsignedFromString(element, &charactersRead, 16);
153  } else {
154  value = RamUnsignedFromString(element, &charactersRead);
155  }
156  return value;
157  }
158 
159  std::string nextElement(const std::string& line, size_t& start, size_t& end) {
160  std::string element;
161 
162  // Handle record/tuple delimiter coincidence.
163  if (delimiter.find(',') != std::string::npos) {
164  int record_parens = 0;
165  size_t next_delimiter = line.find(delimiter, start);
166 
167  // Find first delimiter after the record.
168  while (end < std::min(next_delimiter, line.length()) || record_parens != 0) {
169  // Track the number of parenthesis.
170  if (line[end] == '[') {
171  ++record_parens;
172  } else if (line[end] == ']') {
173  --record_parens;
174  }
175 
176  // Check for unbalanced parenthesis.
177  if (record_parens < 0) {
178  break;
179  };
180 
181  ++end;
182 
183  // Find a next delimiter if the old one is invalid.
184  // But only if inside the unbalance parenthesis.
185  if (end == next_delimiter && record_parens != 0) {
186  next_delimiter = line.find(delimiter, end);
187  }
188  }
189 
190  // Handle the end-of-the-line case where parenthesis are unbalanced.
191  if (record_parens != 0) {
192  std::stringstream errorMessage;
193  errorMessage << "Unbalanced record parenthesis " << lineNumber << "; ";
194  throw std::invalid_argument(errorMessage.str());
195  }
196  } else {
197  end = std::min(line.find(delimiter, start), line.length());
198  }
199 
200  // Check for missing value.
201  if (start > end) {
202  std::stringstream errorMessage;
203  errorMessage << "Values missing in line " << lineNumber << "; ";
204  throw std::invalid_argument(errorMessage.str());
205  }
206 
207  element = line.substr(start, end - start);
208  start = end + delimiter.size();
209 
210  return element;
211  }
212 
213  std::map<int, int> getInputColumnMap(
214  const std::map<std::string, std::string>& rwOperation, const unsigned arity_) const {
215  std::string columnString = getOr(rwOperation, "columns", "");
216  std::map<int, int> inputColumnMap;
217 
218  if (!columnString.empty()) {
219  std::istringstream iss(columnString);
220  std::string mapping;
221  int index = 0;
222  while (std::getline(iss, mapping, ':')) {
223  inputColumnMap[stoi(mapping)] = index++;
224  }
225  if (inputColumnMap.size() < arity_) {
226  throw std::invalid_argument("Invalid column set was given: <" + columnString + ">");
227  }
228  } else {
229  while (inputColumnMap.size() < arity_) {
230  int size = static_cast<int>(inputColumnMap.size());
231  inputColumnMap[size] = size;
232  }
233  }
234  return inputColumnMap;
235  }
236 
237  const std::string delimiter;
238  std::istream& file;
239  size_t lineNumber;
240  std::map<int, int> inputMap;
241 };
242 
243 class ReadFileCSV : public ReadStreamCSV {
244 public:
245  ReadFileCSV(const std::map<std::string, std::string>& rwOperation, SymbolTable& symbolTable,
248  baseName(souffle::baseName(getFileName(rwOperation))),
249  fileHandle(getFileName(rwOperation), std::ios::in | std::ios::binary) {
250  if (!fileHandle.is_open()) {
251  throw std::invalid_argument("Cannot open fact file " + baseName + "\n");
252  }
253  // Strip headers if we're using them
254  if (getOr(rwOperation, "headers", "false") == "true") {
255  std::string line;
256  getline(file, line);
257  }
258  }
259 
260  /**
261  * Read and return the next tuple.
262  *
263  * Returns nullptr if no tuple was readable.
264  * @return
265  */
266  Own<RamDomain[]> readNextTuple() override {
267  try {
269  } catch (std::exception& e) {
270  std::stringstream errorMessage;
271  errorMessage << e.what();
272  errorMessage << "cannot parse fact file " << baseName << "!\n";
273  throw std::invalid_argument(errorMessage.str());
274  }
275  }
276 
277  ~ReadFileCSV() override = default;
278 
279 protected:
280  /**
281  * Return given filename or construct from relation name.
282  * Default name is [configured path]/[relation name].facts
283  *
284  * @param rwOperation map of IO configuration options
285  * @return input filename
286  */
287  static std::string getFileName(const std::map<std::string, std::string>& rwOperation) {
288  auto name = getOr(rwOperation, "filename", rwOperation.at("name") + ".facts");
289  if (name.front() != '/') {
290  name = getOr(rwOperation, "fact-dir", ".") + "/" + name;
291  }
292  return name;
293  }
294 
295  std::string baseName;
296 #ifdef USE_LIBZ
297  gzfstream::igzfstream fileHandle;
298 #else
299  std::ifstream fileHandle;
300 #endif
301 };
302 
304 public:
305  Own<ReadStream> getReader(const std::map<std::string, std::string>& rwOperation, SymbolTable& symbolTable,
306  RecordTable& recordTable) override {
307  return mk<ReadStreamCSV>(std::cin, rwOperation, symbolTable, recordTable);
308  }
309 
310  const std::string& getName() const override {
311  static const std::string name = "stdin";
312  return name;
313  }
314  ~ReadCinCSVFactory() override = default;
315 };
316 
317 class ReadFileCSVFactory : public ReadStreamFactory {
318 public:
319  Own<ReadStream> getReader(const std::map<std::string, std::string>& rwOperation, SymbolTable& symbolTable,
320  RecordTable& recordTable) override {
321  return mk<ReadFileCSV>(rwOperation, symbolTable, recordTable);
322  }
323 
324  const std::string& getName() const override {
325  static const std::string name = "file";
326  return name;
327  }
328 
329  ~ReadFileCSVFactory() override = default;
330 };
331 
332 } /* namespace souffle */
souffle::RamUnsigned
uint32_t RamUnsigned
Definition: RamTypes.h:58
souffle::RamSignedFromString
RamSigned RamSignedFromString(const std::string &str, std::size_t *position=nullptr, const int base=10)
Converts a string to a RamSigned.
Definition: StringUtil.h:51
souffle::ReadStreamCSV::ReadStreamCSV
ReadStreamCSV(std::istream &file, const std::map< std::string, std::string > &rwOperation, SymbolTable &symbolTable, RecordTable &recordTable)
Definition: ReadStreamCSV.h:51
TCB_SPAN_NAMESPACE_NAME::detail::size
constexpr auto size(const C &c) -> decltype(c.size())
Definition: span.h:198
souffle::ReadFileCSVFactory::getName
const std::string & getName() const override
Definition: ReadStreamCSV.h:328
souffle::SerialisationStream< false >::recordTable
RO< RecordTable > & recordTable
Definition: SerialisationStream.h:72
souffle::ReadStreamCSV::readRamUnsigned
RamUnsigned readRamUnsigned(const std::string &element, size_t &charactersRead)
Read an unsigned element.
Definition: ReadStreamCSV.h:146
souffle::isPrefix
bool isPrefix(const std::string &prefix, const std::string &element)
Determine if one string is a prefix of another.
Definition: StringUtil.h:292
souffle::ReadFileCSV
Definition: ReadStreamCSV.h:247
SymbolTable.h
souffle::RecordTable
Definition: RecordTable.h:114
souffle::ReadCinCSVFactory::getName
const std::string & getName() const override
Definition: ReadStreamCSV.h:314
e
l j a showGridBackground &&c b raw series this eventEmitter e
Definition: htmlJsChartistMin.h:15
souffle::Own
std::unique_ptr< A > Own
Definition: ContainerUtil.h:42
souffle::ReadFileCSV::fileHandle
std::ifstream fileHandle
Definition: ReadStreamCSV.h:303
gzfstream.h
souffle::SerialisationStream< false >::symbolTable
RO< SymbolTable > & symbolTable
Definition: SerialisationStream.h:71
souffle::ReadStreamFactory
Definition: ReadStream.h:311
souffle::SerialisationStream< false >::typeAttributes
std::vector< std::string > typeAttributes
Definition: SerialisationStream.h:74
souffle::ReadFileCSV::getFileName
static std::string getFileName(const std::map< std::string, std::string > &rwOperation)
Return given filename or construct from relation name.
Definition: ReadStreamCSV.h:291
souffle::ReadFileCSV::ReadFileCSV
ReadFileCSV(const std::map< std::string, std::string > &rwOperation, SymbolTable &symbolTable, RecordTable &recordTable)
Definition: ReadStreamCSV.h:249
souffle::getOr
C::mapped_type const & getOr(const C &container, typename C::key_type key, const typename C::mapped_type &defaultValue)
Get value for a given key; if not found, return default value.
Definition: ContainerUtil.h:111
souffle::ReadStreamCSV
Definition: ReadStreamCSV.h:49
ReadStream.h
ContainerUtil.h
souffle::ReadFileCSVFactory::getReader
Own< ReadStream > getReader(const std::map< std::string, std::string > &rwOperation, SymbolTable &symbolTable, RecordTable &recordTable) override
Definition: ReadStreamCSV.h:323
StringUtil.h
souffle::SymbolTable
Definition: SymbolTable.h:48
souffle::ReadFileCSV::readNextTuple
Own< RamDomain[]> readNextTuple() override
Read and return the next tuple.
Definition: ReadStreamCSV.h:270
souffle::ReadFileCSVFactory
Definition: ReadStreamCSV.h:321
souffle::ReadStreamCSV::nextElement
std::string nextElement(const std::string &line, size_t &start, size_t &end)
Definition: ReadStreamCSV.h:163
souffle::ReadCinCSVFactory::~ReadCinCSVFactory
~ReadCinCSVFactory() override=default
souffle::ReadStreamCSV::getInputColumnMap
std::map< int, int > getInputColumnMap(const std::map< std::string, std::string > &rwOperation, const unsigned arity_) const
Definition: ReadStreamCSV.h:217
souffle::ReadCinCSVFactory::getReader
Own< ReadStream > getReader(const std::map< std::string, std::string > &rwOperation, SymbolTable &symbolTable, RecordTable &recordTable) override
Definition: ReadStreamCSV.h:309
std
Definition: Brie.h:3053
RamTypes.h
souffle::ReadStreamCSV::readNextTuple
Own< RamDomain[]> readNextTuple() override
Read and return the next tuple.
Definition: ReadStreamCSV.h:69
souffle::ReadFileCSV::baseName
std::string baseName
Definition: ReadStreamCSV.h:299
souffle::fatal
void fatal(const char *format, const Args &... args)
Definition: MiscUtil.h:198
souffle::ReadFileCSVFactory::~ReadFileCSVFactory
~ReadFileCSVFactory() override=default
FileUtil.h
souffle
Definition: AggregateOp.h:25
souffle::RamUnsignedFromString
RamUnsigned RamUnsignedFromString(const std::string &str, std::size_t *position=nullptr, const int base=10)
Converts a string to a RamUnsigned.
Definition: StringUtil.h:110
souffle::SerialisationStream< false >::arity
size_t arity
Definition: SerialisationStream.h:76
souffle::ramBitCast
To ramBitCast(From source)
In C++20 there will be a new way to cast between types by reinterpreting bits (std::bit_cast),...
Definition: RamTypes.h:87
souffle::RamSigned
RamDomain RamSigned
Definition: RamTypes.h:57
souffle::ReadStreamCSV::file
std::istream & file
Definition: ReadStreamCSV.h:242
souffle::ReadStreamCSV::lineNumber
size_t lineNumber
Definition: ReadStreamCSV.h:243
souffle::RamFloatFromString
RamFloat RamFloatFromString(const std::string &str, std::size_t *position=nullptr)
Converts a string to a RamFloat.
Definition: StringUtil.h:93
souffle::ReadStream::readADT
RamDomain readADT(const std::string &source, const std::string &adtName, size_t pos=0, size_t *charactersRead=nullptr)
Definition: ReadStream.h:143
souffle::ReadCinCSVFactory
Definition: ReadStreamCSV.h:307
souffle::ReadStream::readRecord
RamDomain readRecord(const std::string &source, const std::string &recordTypeName, size_t pos=0, size_t *charactersRead=nullptr)
Read a record from a string.
Definition: ReadStream.h:71
souffle::tuple
Defines a tuple for the OO interface such that relations with varying columns can be accessed.
Definition: SouffleInterface.h:443
souffle::ReadStreamCSV::delimiter
const std::string delimiter
Definition: ReadStreamCSV.h:241
souffle::ReadStreamCSV::inputMap
std::map< int, int > inputMap
Definition: ReadStreamCSV.h:244
souffle::ReadStream
Definition: ReadStream.h:40
souffle::ReadFileCSV::~ReadFileCSV
~ReadFileCSV() override=default