FunctionsStringSearch.h 27.3 KB
Newer Older
A
Alexey Milovidov 已提交
1 2
#pragma once

3
#include <Poco/Mutex.h>
4 5
#include <boost/concept_check.hpp>
#include <boost/smart_ptr/shared_ptr.hpp>
6

A
Alexey Milovidov 已提交
7 8 9 10 11 12
#include <statdaemons/OptimizedRegularExpression.h>

#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataTypes/DataTypeString.h>
#include <DB/Columns/ColumnString.h>
#include <DB/Columns/ColumnConst.h>
13
#include <DB/Common/Volnitsky.h>
A
Alexey Milovidov 已提交
14 15 16 17 18 19 20 21 22 23
#include <DB/Functions/IFunction.h>


namespace DB
{

/** Функции поиска и замены в строках:
  *
  * position(haystack, needle)	- обычный поиск подстроки в строке, возвращает позицию (в байтах) найденной подстроки, начиная с 1, или 0, если подстрока не найдена.
  * positionUTF8(haystack, needle) - то же самое, но позиция вычисляется в кодовых точках, при условии, что строка в кодировке UTF-8.
A
Alexey Milovidov 已提交
24 25 26 27
  * 
  * like(haystack, pattern)		- поиск по регулярному выражению LIKE; возвращает 0 или 1. Регистронезависимое, но только для латиницы.
  * notLike(haystack, pattern)
  *
A
Alexey Milovidov 已提交
28
  * match(haystack, pattern)	- поиск по регулярному выражению re2; возвращает 0 или 1.
29
  *
30 31 32 33 34
  * Применяет регексп re2 и достаёт:
  * - первый subpattern, если в regexp-е есть subpattern;
  * - нулевой subpattern (сматчившуюся часть, иначе);
  * - если не сматчилось - пустую строку.
  * extract(haystack, pattern)
A
Alexey Milovidov 已提交
35
  *
A
Alexey Milovidov 已提交
36 37 38
  * replaceOne(haystack, pattern, replacement) - замена шаблона по заданным правилам, только первое вхождение.
  * replaceAll(haystack, pattern, replacement) - замена шаблона по заданным правилам, все вхождения.
  *
39
  * Внимание! На данный момент, аргументы needle, pattern, n, replacement обязаны быть константами.
A
Alexey Milovidov 已提交
40 41 42 43 44
  */


struct PositionImpl
{
A
Alexey Milovidov 已提交
45 46 47
	typedef UInt64 ResultType;

	/// Предполагается, что res нужного размера и инициализирован нулями.
48
	static void vector(const ColumnString::Chars_t & data, const ColumnString::Offsets_t & offsets,
A
Alexey Milovidov 已提交
49
		const std::string & needle,
50
		PODArray<UInt64> & res)
A
Alexey Milovidov 已提交
51 52 53 54 55 56 57 58
	{
		const UInt8 * begin = &data[0];
		const UInt8 * pos = begin;
		const UInt8 * end = pos + data.size();

		/// Текущий индекс в массиве строк.
		size_t i = 0;

59 60
		Volnitsky searcher(needle.data(), needle.size(), end - pos);

A
Alexey Milovidov 已提交
61
		/// Искать будем следующее вхождение сразу во всех строках.
62
		while (pos < end && end != (pos = searcher.search(pos, end - pos)))
A
Alexey Milovidov 已提交
63 64 65
		{
			/// Определим, к какому индексу оно относится.
			while (begin + offsets[i] < pos)
66 67
			{
				res[i] = 0;
A
Alexey Milovidov 已提交
68
				++i;
69
			}
A
Alexey Milovidov 已提交
70 71 72 73

			/// Проверяем, что вхождение не переходит через границы строк.
			if (pos + needle.size() < begin + offsets[i])
				res[i] = (i != 0) ? pos - begin - offsets[i - 1] + 1 : (pos - begin + 1);
74 75
			else
				res[i] = 0;
A
Alexey Milovidov 已提交
76

A
Alexey Milovidov 已提交
77
			pos = begin + offsets[i];
A
Alexey Milovidov 已提交
78 79
			++i;
		}
80 81

		memset(&res[i], 0, (res.size() - i) * sizeof(res[0]));
A
Alexey Milovidov 已提交
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
	}

	static void constant(const std::string & data, const std::string & needle, UInt64 & res)
	{
		res = data.find(needle);
		if (res == std::string::npos)
			res = 0;
		else
			++res;
	}
};


struct PositionUTF8Impl
{
	typedef UInt64 ResultType;
	
99
	static void vector(const ColumnString::Chars_t & data, const ColumnString::Offsets_t & offsets,
A
Alexey Milovidov 已提交
100
		const std::string & needle,
101
		PODArray<UInt64> & res)
A
Alexey Milovidov 已提交
102 103 104 105
	{
		const UInt8 * begin = &data[0];
		const UInt8 * pos = begin;
		const UInt8 * end = pos + data.size();
A
Alexey Milovidov 已提交
106

A
Alexey Milovidov 已提交
107 108 109
		/// Текущий индекс в массиве строк.
		size_t i = 0;

110 111
		Volnitsky searcher(needle.data(), needle.size(), end - pos);

A
Alexey Milovidov 已提交
112
		/// Искать будем следующее вхождение сразу во всех строках.
113
		while (pos < end && end != (pos = searcher.search(pos, end - pos)))
A
Alexey Milovidov 已提交
114 115 116
		{
			/// Определим, к какому индексу оно относится.
			while (begin + offsets[i] < pos)
117 118
			{
				res[i] = 0;
A
Alexey Milovidov 已提交
119
				++i;
120
			}
A
Alexey Milovidov 已提交
121 122 123 124 125 126 127 128 129 130

			/// Проверяем, что вхождение не переходит через границы строк.
			if (pos + needle.size() < begin + offsets[i])
			{
				/// А теперь надо найти, сколько кодовых точек находится перед pos.
				res[i] = 1;
				for (const UInt8 * c = begin + (i != 0 ? offsets[i - 1] : 0); c < pos; ++c)
					if (*c <= 0x7F || *c >= 0xC0)
						++res[i];
			}
131 132
			else
				res[i] = 0;
A
Alexey Milovidov 已提交
133 134 135 136

			pos = begin + offsets[i];
			++i;
		}
137 138

		memset(&res[i], 0, (res.size() - i) * sizeof(res[0]));
A
Alexey Milovidov 已提交
139 140 141 142 143 144 145 146 147 148 149 150 151
	}

	static void constant(const std::string & data, const std::string & needle, UInt64 & res)
	{
		res = data.find(needle);
		if (res == std::string::npos)
			res = 0;
		else
			++res;
	}
};


A
Alexey Milovidov 已提交
152
/// Переводит выражение LIKE в regexp re2. Например, abc%def -> ^abc.*def$
A
Alexey Milovidov 已提交
153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200
inline String likePatternToRegexp(const String & pattern)
{
	String res = "^";
	res.reserve(pattern.size() * 2);
	const char * pos = pattern.data();
	const char * end = pos + pattern.size();

	while (pos < end)
	{
		switch (*pos)
		{
			case '^': case '$': case '.': case '[': case '|': case '(': case ')': case '?': case '*': case '+': case '{':
				res += '\\';
				res += *pos;
				break;
			case '%':
				res += ".*";
				break;
			case '_':
				res += ".";
				break;
			case '\\':
				++pos;
				if (pos == end)
					res += "\\\\";
				else
				{
					if (*pos == '%' || *pos == '_')
						res += *pos;
					else
					{
						res += '\\';
						res += *pos;
					}
				}
				break;
			default:
				res += *pos;
				break;
		}
		++pos;
	}

	res += '$';
	return res;
}


A
Alexey Milovidov 已提交
201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240
/// Сводится ли выражение LIKE к поиску подстроки в строке?
inline bool likePatternIsStrstr(const String & pattern, String & res)
{
	res = "";

	if (pattern.size() < 2 || *pattern.begin() != '%' || *pattern.rbegin() != '%')
		return false;

	res.reserve(pattern.size() * 2);

	const char * pos = pattern.data();
	const char * end = pos + pattern.size();

	++pos;
	--end;

	while (pos < end)
	{
		switch (*pos)
		{
			case '%': case '_':
				return false;
			case '\\':
				++pos;
				if (pos == end)
					return false;
				else
					res += *pos;
				break;
			default:
				res += *pos;
				break;
		}
		++pos;
	}

	return true;
}


A
Alexey Milovidov 已提交
241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267
struct Regexps
{
	typedef std::map<String, OptimizedRegularExpression> KnownRegexps;

	static const OptimizedRegularExpression & get(const std::string & pattern)
	{
		/// В GCC thread safe statics.
		static KnownRegexps known_regexps;
		static Poco::FastMutex mutex;
		Poco::ScopedLock<Poco::FastMutex> lock(mutex);

		KnownRegexps::const_iterator it = known_regexps.find(pattern);
		if (known_regexps.end() == it)
			it = known_regexps.insert(std::make_pair(pattern, OptimizedRegularExpression(pattern))).first;

		return it->second;
	}

	static const OptimizedRegularExpression & getLike(const std::string & pattern)
	{
		/// В GCC thread safe statics.
		static KnownRegexps known_regexps;
		static Poco::FastMutex mutex;
		Poco::ScopedLock<Poco::FastMutex> lock(mutex);

		KnownRegexps::const_iterator it = known_regexps.find(pattern);
		if (known_regexps.end() == it)
A
Alexey Milovidov 已提交
268
 			it = known_regexps.insert(std::make_pair(pattern, OptimizedRegularExpression(likePatternToRegexp(pattern)))).first;
A
Alexey Milovidov 已提交
269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284

		return it->second;
	}
};


/** like - использовать выражения LIKE, если true; использовать выражения re2, если false.
  * Замечание: хотелось бы запускать регексп сразу над всем массивом, аналогично функции position,
  *  но для этого пришлось бы сделать поддержку символов \0 в движке регулярных выражений,
  *  и их интерпретацию как начал и концов строк.
  */
template <bool like, bool revert = false>
struct MatchImpl
{
	typedef UInt8 ResultType;

285
	static void vector(const ColumnString::Chars_t & data, const ColumnString::Offsets_t & offsets,
A
Alexey Milovidov 已提交
286
		const std::string & pattern,
287
		PODArray<UInt8> & res)
A
Alexey Milovidov 已提交
288
	{
A
Alexey Milovidov 已提交
289 290 291 292 293 294 295 296 297 298
		String strstr_pattern;
		/// Простой случай, когда выражение LIKE сводится к поиску подстроки в строке
		if (like && likePatternIsStrstr(pattern, strstr_pattern))
		{
			const UInt8 * begin = &data[0];
			const UInt8 * pos = begin;
			const UInt8 * end = pos + data.size();

			/// Текущий индекс в массиве строк.
			size_t i = 0;
A
Alexey Milovidov 已提交
299

300 301
			Volnitsky searcher(strstr_pattern.data(), strstr_pattern.size(), end - pos);

A
Alexey Milovidov 已提交
302
			/// Искать будем следующее вхождение сразу во всех строках.
303
			while (pos < end && end != (pos = searcher.search(pos, end - pos)))
A
Alexey Milovidov 已提交
304 305 306
			{
				/// Определим, к какому индексу оно относится.
				while (begin + offsets[i] < pos)
307 308
				{
					res[i] = revert;
A
Alexey Milovidov 已提交
309
					++i;
310
				}
A
Alexey Milovidov 已提交
311 312 313

				/// Проверяем, что вхождение не переходит через границы строк.
				if (pos + strstr_pattern.size() < begin + offsets[i])
314
					res[i] = !revert;
315 316
				else
					res[i] = revert;
A
Alexey Milovidov 已提交
317 318 319 320

				pos = begin + offsets[i];
				++i;
			}
321 322

			memset(&res[i], revert, (res.size() - i) * sizeof(res[0]));
A
Alexey Milovidov 已提交
323 324 325 326 327 328 329
		}
		else
		{
			const OptimizedRegularExpression & regexp = like ? Regexps::getLike(pattern) : Regexps::get(pattern);

			size_t size = offsets.size();
			for (size_t i = 0; i < size; ++i)
330
				res[i] = revert ^ regexp.match(reinterpret_cast<const char *>(&data[i != 0 ? offsets[i - 1] : 0]), (i != 0 ? offsets[i] - offsets[i - 1] : offsets[0]) - 1);
A
Alexey Milovidov 已提交
331
		}
A
Alexey Milovidov 已提交
332 333 334 335 336 337 338 339 340 341
	}

	static void constant(const std::string & data, const std::string & pattern, UInt8 & res)
	{
		const OptimizedRegularExpression & regexp = like ? Regexps::getLike(pattern) : Regexps::get(pattern);
		res = revert ^ regexp.match(data);
	}
};


342 343
struct ExtractImpl
{
344
	static void vector(const ColumnString::Chars_t & data, const ColumnString::Offsets_t & offsets,
345
					   const std::string & pattern,
346
					   ColumnString::Chars_t & res_data, ColumnString::Offsets_t & res_offsets)
347
	{
348
		res_data.reserve(data.size() / 5);
349 350 351 352
		res_offsets.resize(offsets.size());
		
		const OptimizedRegularExpression & regexp = Regexps::get(pattern);

353
		unsigned capture = regexp.getNumberOfSubpatterns() > 0 ? 1 : 0;
354 355 356 357 358 359 360 361 362
		OptimizedRegularExpression::MatchVec matches;
		matches.reserve(capture + 1);
		size_t prev_offset = 0;
		size_t res_offset = 0;

		for (size_t i = 0; i < offsets.size(); ++i)
		{
			size_t cur_offset = offsets[i];
			
363 364
			unsigned count = regexp.match(reinterpret_cast<const char *>(&data[prev_offset]), cur_offset - prev_offset - 1, matches, capture + 1);
			if (count > capture && matches[capture].offset != std::string::npos)
365
			{
366
				const OptimizedRegularExpression::Match & match = matches[capture];
367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385
				res_data.resize(res_offset + match.length + 1);
				memcpy(&res_data[res_offset], &data[prev_offset + match.offset], match.length);
				res_offset += match.length;
			}
			else
			{
				res_data.resize(res_offset + 1);
			}
			
			res_data[res_offset] = 0;
			++res_offset;
			res_offsets[i] = res_offset;

			prev_offset = cur_offset;
		}
	}
};


386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630
/** Заменить все вхождения подстроки needle на строку replacement. needle и replacement - константы.
  */
template <bool replaceOne = false>
struct ReplaceImpl
{
	static void vector(const ColumnString::Chars_t & data, const ColumnString::Offsets_t & offsets,
		const std::string & needle, const std::string & replacement,
		ColumnString::Chars_t & res_data, ColumnString::Offsets_t & res_offsets)
	{
		const UInt8 * begin = &data[0];
		const UInt8 * pos = begin;
		const UInt8 * end = pos + data.size();

		ColumnString::Offset_t res_offset = 0;
		res_data.reserve(data.size());
		size_t size = offsets.size();
		res_offsets.resize(size);

		/// Текущий индекс в массиве строк.
		size_t i = 0;

		Volnitsky searcher(needle.data(), needle.size(), end - pos);

		/// Искать будем следующее вхождение сразу во всех строках.
		while (pos < end)
		{
			const UInt8 * match = searcher.search(pos, end - pos);

			/// Копируем данные без изменения
			res_data.resize(res_data.size() + (match - pos));
			memcpy(&res_data[res_offset], pos, match - pos);

			/// Определим, к какому индексу оно относится.
			while (i < offsets.size() && begin + offsets[i] < match)
			{
				res_offsets[i] = res_offset + ((begin + offsets[i]) - pos);
				++i;
			}
			res_offset += (match - pos);

			/// Если дошли до конца, пора остановиться
			if (i == offsets.size())
				break;

			/// Правда ли, что с этой строкой больше не надо выполнять преобразования.
			bool can_finish_current_string = false;

			/// Проверяем, что вхождение не переходит через границы строк.
			if (match + needle.size() < begin + offsets[i])
			{
				res_data.resize(res_data.size() + replacement.size());
				memcpy(&res_data[res_offset], replacement.data(), replacement.size());
				res_offset += replacement.size();
				pos = match + needle.size();
				if (replaceOne)
					can_finish_current_string = true;
			}
			else
				can_finish_current_string = true;

			if (can_finish_current_string)
			{
				res_data.resize(res_data.size() + (begin + offsets[i] - match));
				memcpy(&res_data[res_offset], match, (begin + offsets[i] - match));
				res_offsets[i] = res_offset + (begin + offsets[i] - match);
				pos = begin + offsets[i];
			}
		}
	}

	static void vector_fixed(const ColumnString::Chars_t & data, size_t n,
		const std::string & needle, const std::string & replacement,
		ColumnString::Chars_t & res_data, ColumnString::Offsets_t & res_offsets)
	{
		const UInt8 * begin = &data[0];
		const UInt8 * pos = begin;
		const UInt8 * end = pos + data.size();

		ColumnString::Offset_t res_offset = 0;
		size_t size = data.size() / n;
		res_data.reserve(data.size());
		res_offsets.resize(size);

		/// Текущий индекс в массиве строк.
		size_t i = 0;

		Volnitsky searcher(needle.data(), needle.size(), end - pos);

		/// Искать будем следующее вхождение сразу во всех строках.
		while (pos < end)
		{
			const UInt8 * match = searcher.search(pos, end - pos);

			/// Копируем данные без изменения
			res_data.resize(res_data.size() + (match - pos));
			memcpy(&res_data[res_offset], pos, match - pos);

			/// Определим, к какому индексу оно относится.
			while (i < size && begin + n * (i + 1) < match)
			{
				res_offsets[i] = res_offset + ((begin + n * (i + 1)) - pos);
				++i;
			}
			res_offset += (match - pos);

			/// Если дошли до конца, пора остановиться
			if (i == size)
				break;

			/// Правда ли, что с этой строкой больше не надо выполнять преобразования.
			bool can_finish_current_string = false;

			/// Проверяем, что вхождение не переходит через границы строк.
			if (match + needle.size() < begin + n * (i + 1))
			{
				res_data.resize(res_data.size() + replacement.size());
				memcpy(&res_data[res_offset], replacement.data(), replacement.size());
				res_offset += replacement.size();
				pos = match + needle.size();
				if (replaceOne)
					can_finish_current_string = true;
			}
			else
				can_finish_current_string = true;

			if (can_finish_current_string)
			{
				res_data.resize(res_data.size() + (begin + n * (i + 1) - match));
				memcpy(&res_data[res_offset], match, (begin + n * (i + 1) - match));
				res_offsets[i] = res_offset + (begin + n * (i + 1) - match);
				pos = begin + n * (i + 1);
			}
		}
	}

	static void constant(const std::string & data, const std::string & needle, const std::string & replacement,
		std::string & res_data)
	{
		res_data = "";
		int replace_cnt = 0;
		for (size_t i = 0; i < data.size(); ++i)
		{
			bool match = true;
			if (i + needle.size() > data.size() || (replaceOne && replace_cnt > 0))
				match = false;
			for (size_t j = 0; match && j < needle.size(); ++j)
				if (data[i + j] != needle[j])
					match = false;
			if (match)
			{
				replace_cnt ++;
				res_data += replacement;
				i = i + needle.size() - 1;
			} else
				res_data += data[i];
		}
	}
};


template <typename Impl, typename Name>
class FunctionStringReplace : public IFunction
{
public:
	/// Получить имя функции.
	String getName() const
	{
		return Name::get();
	}

	/// Получить тип результата по типам аргументов. Если функция неприменима для данных аргументов - кинуть исключение.
	DataTypePtr getReturnType(const DataTypes & arguments) const
	{
		if (arguments.size() != 3)
			throw Exception("Number of arguments for function " + getName() + " doesn't match: passed "
				+ toString(arguments.size()) + ", should be 3.",
				ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

		if (!dynamic_cast<const DataTypeString *>(&*arguments[0]) && !dynamic_cast<const DataTypeFixedString *>(&*arguments[0]))
			throw Exception("Illegal type " + arguments[0]->getName() + " of first argument of function " + getName(),
				ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);

		if (!dynamic_cast<const DataTypeString *>(&*arguments[0]) && !dynamic_cast<const DataTypeFixedString *>(&*arguments[0]))
			throw Exception("Illegal type " + arguments[1]->getName() + " of second argument of function " + getName(),
				ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);

		if (!dynamic_cast<const DataTypeString *>(&*arguments[0]) && !dynamic_cast<const DataTypeFixedString *>(&*arguments[0]))
			throw Exception("Illegal type " + arguments[2]->getName() + " of third argument of function " + getName(),
				ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);

		return new DataTypeString;
	}

	/// Выполнить функцию над блоком.
	void execute(Block & block, const ColumnNumbers & arguments, size_t result)
	{
		const ColumnPtr column_src = block.getByPosition(arguments[0]).column;
		const ColumnPtr column_needle = block.getByPosition(arguments[1]).column;
		const ColumnPtr column_replacement = block.getByPosition(arguments[2]).column;

		if (!column_needle->isConst() || !column_replacement->isConst())
			throw Exception("2nd and 3rd arguments of function " + getName() + " must be constants.");


		const IColumn * c1 = &*block.getByPosition(arguments[1]).column;
		const IColumn * c2 = &*block.getByPosition(arguments[2]).column;
		const ColumnConstString * c1_const = dynamic_cast<const ColumnConstString *>(c1);
		const ColumnConstString * c2_const = dynamic_cast<const ColumnConstString *>(c2);
		String needle = c1_const->getData();
		String replacement = c2_const->getData();

		if (needle.size() == 0)
			throw Exception("Length of the second argument of function replace must be greater than 0.", ErrorCodes::ARGUMENT_OUT_OF_BOUND);

		if (const ColumnString * col = dynamic_cast<const ColumnString *>(&*column_src))
		{
			ColumnString * col_res = new ColumnString;
			block.getByPosition(result).column = col_res;
			Impl::vector(col->getChars(), col->getOffsets(),
				needle, replacement,
				col_res->getChars(), col_res->getOffsets());
		}
		else if (const ColumnFixedString * col = dynamic_cast<const ColumnFixedString *>(&*column_src))
		{
			ColumnString * col_res = new ColumnString;
			block.getByPosition(result).column = col_res;
			Impl::vector_fixed(col->getChars(), col->getN(),
				needle, replacement,
				col_res->getChars(), col_res->getOffsets());
		}
		else if (const ColumnConstString * col = dynamic_cast<const ColumnConstString *>(&*column_src))
		{
			String res;
			Impl::constant(col->getData(), needle, replacement, res);
			ColumnConstString * col_res = new ColumnConstString(col->size(), res);
			block.getByPosition(result).column = col_res;
		}
		else
		   throw Exception("Illegal column " + block.getByPosition(arguments[0]).column->getName()
				+ " of first argument of function " + getName(),
				ErrorCodes::ILLEGAL_COLUMN);
	}
};


A
Alexey Milovidov 已提交
631 632
template <typename Impl, typename Name>
class FunctionsStringSearch : public IFunction
A
Alexey Milovidov 已提交
633 634 635 636 637
{
public:
	/// Получить имя функции.
	String getName() const
	{
A
Alexey Milovidov 已提交
638
		return Name::get();
A
Alexey Milovidov 已提交
639 640 641 642 643 644 645
	}

	/// Получить тип результата по типам аргументов. Если функция неприменима для данных аргументов - кинуть исключение.
	DataTypePtr getReturnType(const DataTypes & arguments) const
	{
		if (arguments.size() != 2)
			throw Exception("Number of arguments for function " + getName() + " doesn't match: passed "
646
				+ toString(arguments.size()) + ", should be 2.",
A
Alexey Milovidov 已提交
647 648 649 650 651 652 653 654 655 656
				ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

		if (!dynamic_cast<const DataTypeString *>(&*arguments[0]))
			throw Exception("Illegal type " + arguments[0]->getName() + " of argument of function " + getName(),
				ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);

		if (!dynamic_cast<const DataTypeString *>(&*arguments[1]))
			throw Exception("Illegal type " + arguments[1]->getName() + " of argument of function " + getName(),
				ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);

A
Alexey Milovidov 已提交
657
		return new typename DataTypeFromFieldType<typename Impl::ResultType>::Type;
A
Alexey Milovidov 已提交
658 659 660 661 662
	}

	/// Выполнить функцию над блоком.
	void execute(Block & block, const ColumnNumbers & arguments, size_t result)
	{
A
Alexey Milovidov 已提交
663
		typedef typename Impl::ResultType ResultType;
A
Alexey Milovidov 已提交
664 665 666 667 668 669 670 671 672 673 674 675 676
		
		const ColumnPtr column = block.getByPosition(arguments[0]).column;
		const ColumnPtr column_needle = block.getByPosition(arguments[1]).column;

		const ColumnConstString * col_needle = dynamic_cast<const ColumnConstString *>(&*column_needle);
		if (!col_needle)
			throw Exception("Second argument of function " + getName() + " must be constant string.", ErrorCodes::ILLEGAL_COLUMN);
		
		if (const ColumnString * col = dynamic_cast<const ColumnString *>(&*column))
		{
			ColumnVector<ResultType> * col_res = new ColumnVector<ResultType>;
			block.getByPosition(result).column = col_res;

A
Alexey Milovidov 已提交
677
			typename ColumnVector<ResultType>::Container_t & vec_res = col_res->getData();
A
Alexey Milovidov 已提交
678
			vec_res.resize(col->size());
679
			Impl::vector(col->getChars(), col->getOffsets(), col_needle->getData(), vec_res);
A
Alexey Milovidov 已提交
680 681 682 683
		}
		else if (const ColumnConstString * col = dynamic_cast<const ColumnConstString *>(&*column))
		{
			ResultType res = 0;
A
Alexey Milovidov 已提交
684
			Impl::constant(col->getData(), col_needle->getData(), res);
A
Alexey Milovidov 已提交
685 686 687 688 689 690 691 692 693 694 695

			ColumnConst<ResultType> * col_res = new ColumnConst<ResultType>(col->size(), res);
			block.getByPosition(result).column = col_res;
		}
		else
		   throw Exception("Illegal column " + block.getByPosition(arguments[0]).column->getName()
				+ " of argument of function " + getName(),
				ErrorCodes::ILLEGAL_COLUMN);
	}
};

A
Alexey Milovidov 已提交
696

697 698 699 700 701 702 703 704 705 706 707 708 709 710 711
template <typename Impl, typename Name>
class FunctionsStringSearchToString : public IFunction
{
public:
	/// Получить имя функции.
	String getName() const
	{
		return Name::get();
	}
	
	/// Получить тип результата по типам аргументов. Если функция неприменима для данных аргументов - кинуть исключение.
	DataTypePtr getReturnType(const DataTypes & arguments) const
	{
		if (arguments.size() != 2)
			throw Exception("Number of arguments for function " + getName() + " doesn't match: passed "
712
			+ toString(arguments.size()) + ", should be 2.",
713
							ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
714 715 716 717 718 719 720 721 722 723
		
		if (!dynamic_cast<const DataTypeString *>(&*arguments[0]))
			throw Exception("Illegal type " + arguments[0]->getName() + " of argument of function " + getName(),
			ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
		
		if (!dynamic_cast<const DataTypeString *>(&*arguments[1]))
			throw Exception("Illegal type " + arguments[1]->getName() + " of argument of function " + getName(),
			ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
		
		return new DataTypeString;
724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740
	}
	
	/// Выполнить функцию над блоком.
	void execute(Block & block, const ColumnNumbers & arguments, size_t result)
	{
		const ColumnPtr column = block.getByPosition(arguments[0]).column;
		const ColumnPtr column_needle = block.getByPosition(arguments[1]).column;
		
		const ColumnConstString * col_needle = dynamic_cast<const ColumnConstString *>(&*column_needle);
		if (!col_needle)
			throw Exception("Second argument of function " + getName() + " must be constant string.", ErrorCodes::ILLEGAL_COLUMN);
		
		if (const ColumnString * col = dynamic_cast<const ColumnString *>(&*column))
		{
			ColumnString * col_res = new ColumnString;
			block.getByPosition(result).column = col_res;
			
741
			ColumnString::Chars_t & vec_res = col_res->getChars();
742
			ColumnString::Offsets_t & offsets_res = col_res->getOffsets();
743
			Impl::vector(col->getChars(), col->getOffsets(), col_needle->getData(), vec_res, offsets_res);
744 745 746
		}
		else if (const ColumnConstString * col = dynamic_cast<const ColumnConstString *>(&*column))
		{
747
			const std::string & data = col->getData();
748 749 750
			ColumnString::Chars_t vdata(
				reinterpret_cast<const ColumnString::Chars_t::value_type *>(data.c_str()),
				reinterpret_cast<const ColumnString::Chars_t::value_type *>(data.c_str() + data.size() + 1));
751
			ColumnString::Offsets_t offsets(1, vdata.size());
752
			ColumnString::Chars_t res_vdata;
753
			ColumnString::Offsets_t res_offsets;
754 755
			Impl::vector(vdata, offsets, col_needle->getData(), res_vdata, res_offsets);
			
756 757 758 759
			std::string res;

			if (!res_offsets.empty())
				res.assign(&res_vdata[0], &res_vdata[res_vdata.size() - 1]);
760 761 762 763 764 765 766 767 768 769 770 771
			
			ColumnConstString * col_res = new ColumnConstString(col->size(), res);
			block.getByPosition(result).column = col_res;
		}
		else
			throw Exception("Illegal column " + block.getByPosition(arguments[0]).column->getName()
			+ " of argument of function " + getName(),
							ErrorCodes::ILLEGAL_COLUMN);
	}
};


A
Alexey Milovidov 已提交
772 773 774 775 776
struct NamePosition 		{ static const char * get() { return "position"; } };
struct NamePositionUTF8		{ static const char * get() { return "positionUTF8"; } };
struct NameMatch			{ static const char * get() { return "match"; } };
struct NameLike				{ static const char * get() { return "like"; } };
struct NameNotLike			{ static const char * get() { return "notLike"; } };
777
struct NameExtract			{ static const char * get() { return "extract"; } };
778 779
struct NameReplaceOne			{ static const char * get() { return "replaceOne"; } };
struct NameReplaceAll			{ static const char * get() { return "replaceAll"; } };
A
Alexey Milovidov 已提交
780 781 782 783

typedef FunctionsStringSearch<PositionImpl, 			NamePosition> 		FunctionPosition;
typedef FunctionsStringSearch<PositionUTF8Impl, 		NamePositionUTF8> 	FunctionPositionUTF8;
typedef FunctionsStringSearch<MatchImpl<false>, 		NameMatch> 			FunctionMatch;
784
typedef FunctionsStringSearch<MatchImpl<true>, 			NameLike> 			FunctionLike;
A
Alexey Milovidov 已提交
785
typedef FunctionsStringSearch<MatchImpl<true, true>, 	NameNotLike> 		FunctionNotLike;
786
typedef FunctionsStringSearchToString<ExtractImpl, 		NameExtract> 		FunctionExtract;
787 788
typedef FunctionStringReplace<ReplaceImpl<true>,		NameReplaceOne>		FunctionReplaceOne;
typedef FunctionStringReplace<ReplaceImpl<false>,		NameReplaceAll>		FunctionReplaceAll;
A
Alexey Milovidov 已提交
789

A
Alexey Milovidov 已提交
790
}